Skip to content

Commit

Permalink
Switch to use std::thread instead pthread (no more semaphore), allow …
Browse files Browse the repository at this point in the history
…multiple captures (insert number when needed), remove file when capture time not match
  • Loading branch information
brummer10 committed Nov 25, 2023
1 parent 79cc2dc commit ce0b873
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 123 deletions.
190 changes: 81 additions & 109 deletions plugins/NeuralRecord/profiler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,60 @@ void mtdm_invert (struct MTDM *self)

// --------------------------------------------------------------------------------

ProfilWorker::ProfilWorker()
: _execute(false) {
}

ProfilWorker::~ProfilWorker() {
if( _execute.load(std::memory_order_acquire) ) {
stop();
};
}

void ProfilWorker::stop() {
_execute.store(false, std::memory_order_release);
if (_thd.joinable()) {
cv.notify_one();
_thd.join();
}
}

void ProfilWorker::start(Profil *pt) {
if( _execute.load(std::memory_order_acquire) ) {
stop();
};
_execute.store(true, std::memory_order_release);
_thd = std::thread([this, pt]() {
while (_execute.load(std::memory_order_acquire)) {
std::unique_lock<std::mutex> lk(m);
// pt->busy.store(false, std::memory_order_release);
// wait for signal from dsp that work is to do
cv.wait(lk);
//do work
if (_execute.load(std::memory_order_acquire)) {
pt->run_thread(pt);
}
}
// when done
});
}

bool ProfilWorker::is_running() const noexcept {
return ( _execute.load(std::memory_order_acquire) &&
_thd.joinable() );
}

// --------------------------------------------------------------------------------

template <class T>
inline std::string to_string(const T& t) {
std::stringstream ss;
ss << std::setfill('0') << std::setw(3) << t;
ss << t;
return ss.str();
}

// --------------------------------------------------------------------------------

Profil::Profil(int channel_, std::function<void(const uint32_t , float) > setOutputParameterValue_,
std::function<void(const uint32_t , float) > requestParameterValueChange_)
: recfile(NULL),
Expand All @@ -205,41 +251,20 @@ Profil::Profil(int channel_, std::function<void(const uint32_t , float) > setOut
fRec1(0),
tape(fRec0),
tape1(NULL),
m_pthr(0),
rt_prio(0),
rt_policy(0),
keep_stream(false),
mem_allocated(false),
err(false),
running(false),
time_match(false),
setOutputParameterValue(setOutputParameterValue_),
requestParameterValueChange(requestParameterValueChange_) {
#ifdef _WIN32
m_trig = CreateSemaphore(NULL, 0, 1, "Local\\nrecord");
if(!m_trig){
err = true;
}
#else
sem_unlink("/nrecord");
m_trig = sem_open("/nrecord", O_CREAT|O_EXCL, S_IRWXU, 0);
if(m_trig == SEM_FAILED){
err = true;
}
#endif
worker.start(this);
}


Profil::~Profil() {
stop_thread();
worker.stop();
free(mtdm);
activate(false);
#ifdef _WIN32
if (m_trig) {
CloseHandle(m_trig);
}
#else
sem_unlink("/nrecord");
#endif
}

#ifdef USING_DPF
Expand Down Expand Up @@ -324,7 +349,16 @@ inline std::string Profil::get_path() {

// get the recording path and filename
inline std::string Profil::get_ffilename() {
return get_path() + "target.wav";
struct stat buffer;
std::string name = "target.wav";
int i = 1;
while (stat ((get_path() + name).c_str(), &buffer) == 0) {
if (i > 1) name.erase(name.begin()+6, name.end()-4);
name.insert(6, "_" + to_string(i));
i+=1;
}

return get_path() + name;
}

// check if input.wav is in path, otherwise copy it over and return path + filename
Expand Down Expand Up @@ -386,82 +420,27 @@ inline int Profil::load_from_wave(std::string fname) {

// save the chunks to disk
void Profil::disc_stream() {
for (;running;) {
#ifdef _WIN32
WaitForSingleObject(m_trig, INFINITE);
#else
sem_wait(m_trig);
#endif
if (!running) {
break;
}
if (!recfile) {
std::string fname = get_ffilename();
recfile = open_stream(fname);
}
save_to_wave(recfile, tape, savesize);
filesize +=savesize;
if ((!keep_stream && recfile) || (filesize >MAXFILESIZE)) {
close_stream(&recfile);
filesize = 0;
if (!worker.is_running()) {
return;
}
if (!recfile) {
outputfile = get_ffilename();
recfile = open_stream(outputfile);
}
save_to_wave(recfile, tape, savesize);
filesize +=savesize;
if ((!keep_stream && recfile) || (filesize >MAXFILESIZE)) {
close_stream(&recfile);
filesize = 0;
if (!time_match) {
std::remove(outputfile.c_str());
}
}
}

// get priority and policity from host
void Profil::set_thread_prio(int32_t prio, int32_t policy) {
rt_prio = prio;
rt_policy = policy;
}

// run the recording thread
void *Profil::run_thread(void *p) {
void Profil::run_thread(void *p) {
(reinterpret_cast<Profil *>(p))->disc_stream();
return NULL;
}

// stop the recording thread
void Profil::stop_thread() {
if (running) {
running = false;
#ifdef _WIN32
ReleaseSemaphore(m_trig, 1, NULL);
#else
sem_post(m_trig);
#endif
}
pthread_cancel (m_pthr);
pthread_join (m_pthr, NULL);
}

// start the recording thread
void Profil::start_thread() {
pthread_attr_t attr;
struct sched_param spar;
if (rt_prio == 0) {
rt_prio = sched_get_priority_max(SCHED_FIFO);
}
if ((rt_prio/5) > 0) rt_prio = rt_prio/5;
spar.sched_priority = rt_prio;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE );
pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL);
if (rt_policy == 0) {
pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
} else {
pthread_attr_setschedpolicy(&attr, rt_policy);
}
pthread_attr_setschedparam(&attr, &spar);
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
// pthread_attr_setstacksize(&attr, 0x10000);
running = true;
if (pthread_create(&m_pthr, &attr, run_thread,
reinterpret_cast<void*>(this))) {
err = true;
running = false;
}
pthread_attr_destroy(&attr);
}

// clear all internal buffers on activation
Expand Down Expand Up @@ -501,7 +480,6 @@ inline void Profil::init(unsigned int samplingFreq) {
reset_errors = 0;
fConst0 = (1.0f / float(fmin(192000, fmax(1, fSamplingFreq))));
mtdm = mtdm_new(fSamplingFreq);
start_thread();
if (fSamplingFreq != 48000) {
err = true;
errors = 3.0;
Expand Down Expand Up @@ -539,11 +517,11 @@ SNDFILE *Profil::open_stream(std::string fname) {
// crude normalisation function, barly used
void Profil::normalize() {
if (tape1) { delete[] tape1; tape1 = 0; }
int n = load_from_wave(get_ffilename());
int n = load_from_wave(outputfile);
for (int i = 0;i<n;i++) {
tape1[i] *= nf;
}
SNDFILE * sf = open_stream(get_ffilename());
SNDFILE * sf = open_stream(outputfile);
if (sf) {
save_to_wave(sf, tape1, n);
sf_close(sf);
Expand Down Expand Up @@ -679,18 +657,15 @@ void always_inline Profil::compute(int count, const float *input0, float *output
} else {
fRec0[IOTA++] = fTemp1;
}
time_match = false;
fConst1 = fmax(fConst1, fabsf(fTemp1));
}
if (IOTA > MAXRECSIZE-1) { // when buffer is full, flush to stream
iA = iA ? 0 : 1 ;
tape = iA ? fRec0 : fRec1;
keep_stream = true;
savesize = IOTA;
#ifdef _WIN32
ReleaseSemaphore(m_trig, 1, NULL);
#else
sem_post(m_trig);
#endif
worker.cv.notify_one();
IOTA = 0;
}
// play input.wav file once
Expand All @@ -708,6 +683,7 @@ void always_inline Profil::compute(int count, const float *input0, float *output
roundtrip = 0;
measure = 0;
iSlow0 = 0;
time_match = true;
// switch off the PROFILE button when host support it
requestParameterValueChange((PortIndex)PROFILE, 0.0f);
// check if we need normalization
Expand All @@ -722,11 +698,7 @@ void always_inline Profil::compute(int count, const float *input0, float *output
tape = iA ? fRec1 : fRec0;
savesize = IOTA;
keep_stream = false;
#ifdef _WIN32
ReleaseSemaphore(m_trig, 1, NULL);
#else
sem_post(m_trig);
#endif
worker.cv.notify_one();
IOTA = 0;
iA = 0;
IOTAP = 0;
Expand Down
39 changes: 25 additions & 14 deletions plugins/NeuralRecord/profiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@
#include <windows.h>
#else
#include <dlfcn.h>
#include <semaphore.h>
#endif

#include <fstream>
#include <functional>

#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>


namespace profiler {

Expand Down Expand Up @@ -78,13 +82,31 @@ struct MTDM
struct Freq _freq [13];
};

class Profil;

class ProfilWorker {
private:
std::atomic<bool> _execute;
std::thread _thd;
std::mutex m;

public:
ProfilWorker();
~ProfilWorker();
void stop();
void start(Profil *pt);
bool is_running() const noexcept;
std::condition_variable cv;
};

class Profil {
private:
SNDFILE * recfile;
SNDFILE * playfile;
std::string inputfile;
std::string outputfile;
struct MTDM *mtdm;
ProfilWorker worker;
int fSamplingFreq;
int channel;
float fcheckbox0;
Expand All @@ -107,18 +129,10 @@ class Profil {
float *fRec1;
float *tape;
float *tape1;
#ifdef _WIN32
HANDLE m_trig;
#else
sem_t *m_trig;
#endif
pthread_t m_pthr;
int32_t rt_prio;
int32_t rt_policy;
volatile bool keep_stream;
bool mem_allocated;
bool err;
bool running;
bool time_match;
float fConst0;
float fConst1;
float fConst2;
Expand All @@ -142,12 +156,9 @@ class Profil {
void save_to_wave(SNDFILE * sf, float *tape, int lSize);
SNDFILE *open_stream(std::string fname);
void close_stream(SNDFILE **sf);
void stop_thread();
void start_thread();
void disc_stream();
void connect(uint32_t port, float data);
void normalize();
static void *run_thread(void* p);
inline int load_from_wave(std::string fname);
inline void convert_to_wave(std::string fname, std::string oname);
inline std::string get_path();
Expand All @@ -157,7 +168,7 @@ class Profil {
std::function<void(const uint32_t, float) > requestParameterValueChange;

public:
void set_thread_prio(int32_t prio, int32_t policy);
static void run_thread(void* p);
static void clear_state(Profil*);
static int activate_plugin(bool start, Profil*);
static void set_samplerate(unsigned int samplingFreq, Profil*);
Expand Down

0 comments on commit ce0b873

Please sign in to comment.