From ce0b8739e6d0ff4c4b75d7ce5e418d86c117e0f8 Mon Sep 17 00:00:00 2001 From: brummer10 Date: Sat, 25 Nov 2023 09:34:13 +0100 Subject: [PATCH] Switch to use std::thread instead pthread (no more semaphore), allow multiple captures (insert number when needed), remove file when capture time not match --- plugins/NeuralRecord/profiler.cc | 190 +++++++++++++------------------ plugins/NeuralRecord/profiler.h | 39 ++++--- 2 files changed, 106 insertions(+), 123 deletions(-) diff --git a/plugins/NeuralRecord/profiler.cc b/plugins/NeuralRecord/profiler.cc index 58f856f..948b1bb 100644 --- a/plugins/NeuralRecord/profiler.cc +++ b/plugins/NeuralRecord/profiler.cc @@ -188,14 +188,60 @@ void mtdm_invert (struct MTDM *self) // -------------------------------------------------------------------------------- +ProfilWorker::ProfilWorker() + : _execute(false) { +} + +ProfilWorker::~ProfilWorker() { + if( _execute.load(std::memory_order_acquire) ) { + stop(); + }; +} + +void ProfilWorker::stop() { + _execute.store(false, std::memory_order_release); + if (_thd.joinable()) { + cv.notify_one(); + _thd.join(); + } +} + +void ProfilWorker::start(Profil *pt) { + if( _execute.load(std::memory_order_acquire) ) { + stop(); + }; + _execute.store(true, std::memory_order_release); + _thd = std::thread([this, pt]() { + while (_execute.load(std::memory_order_acquire)) { + std::unique_lock lk(m); + // pt->busy.store(false, std::memory_order_release); + // wait for signal from dsp that work is to do + cv.wait(lk); + //do work + if (_execute.load(std::memory_order_acquire)) { + pt->run_thread(pt); + } + } + // when done + }); +} + +bool ProfilWorker::is_running() const noexcept { + return ( _execute.load(std::memory_order_acquire) && + _thd.joinable() ); +} + +// -------------------------------------------------------------------------------- template inline std::string to_string(const T& t) { std::stringstream ss; - ss << std::setfill('0') << std::setw(3) << t; + ss << t; return ss.str(); } +// -------------------------------------------------------------------------------- + Profil::Profil(int channel_, std::function setOutputParameterValue_, std::function requestParameterValueChange_) : recfile(NULL), @@ -205,41 +251,20 @@ Profil::Profil(int channel_, std::function setOut fRec1(0), tape(fRec0), tape1(NULL), - m_pthr(0), - rt_prio(0), - rt_policy(0), keep_stream(false), mem_allocated(false), err(false), - running(false), + time_match(false), setOutputParameterValue(setOutputParameterValue_), requestParameterValueChange(requestParameterValueChange_) { -#ifdef _WIN32 - m_trig = CreateSemaphore(NULL, 0, 1, "Local\\nrecord"); - if(!m_trig){ - err = true; - } -#else - sem_unlink("/nrecord"); - m_trig = sem_open("/nrecord", O_CREAT|O_EXCL, S_IRWXU, 0); - if(m_trig == SEM_FAILED){ - err = true; - } -#endif + worker.start(this); } Profil::~Profil() { - stop_thread(); + worker.stop(); free(mtdm); activate(false); -#ifdef _WIN32 - if (m_trig) { - CloseHandle(m_trig); - } -#else - sem_unlink("/nrecord"); -#endif } #ifdef USING_DPF @@ -324,7 +349,16 @@ inline std::string Profil::get_path() { // get the recording path and filename inline std::string Profil::get_ffilename() { - return get_path() + "target.wav"; + struct stat buffer; + std::string name = "target.wav"; + int i = 1; + while (stat ((get_path() + name).c_str(), &buffer) == 0) { + if (i > 1) name.erase(name.begin()+6, name.end()-4); + name.insert(6, "_" + to_string(i)); + i+=1; + } + + return get_path() + name; } // check if input.wav is in path, otherwise copy it over and return path + filename @@ -386,82 +420,27 @@ inline int Profil::load_from_wave(std::string fname) { // save the chunks to disk void Profil::disc_stream() { - for (;running;) { -#ifdef _WIN32 - WaitForSingleObject(m_trig, INFINITE); -#else - sem_wait(m_trig); -#endif - if (!running) { - break; - } - if (!recfile) { - std::string fname = get_ffilename(); - recfile = open_stream(fname); - } - save_to_wave(recfile, tape, savesize); - filesize +=savesize; - if ((!keep_stream && recfile) || (filesize >MAXFILESIZE)) { - close_stream(&recfile); - filesize = 0; + if (!worker.is_running()) { + return; + } + if (!recfile) { + outputfile = get_ffilename(); + recfile = open_stream(outputfile); + } + save_to_wave(recfile, tape, savesize); + filesize +=savesize; + if ((!keep_stream && recfile) || (filesize >MAXFILESIZE)) { + close_stream(&recfile); + filesize = 0; + if (!time_match) { + std::remove(outputfile.c_str()); } } } -// get priority and policity from host -void Profil::set_thread_prio(int32_t prio, int32_t policy) { - rt_prio = prio; - rt_policy = policy; -} - // run the recording thread -void *Profil::run_thread(void *p) { +void Profil::run_thread(void *p) { (reinterpret_cast(p))->disc_stream(); - return NULL; -} - -// stop the recording thread -void Profil::stop_thread() { - if (running) { - running = false; -#ifdef _WIN32 - ReleaseSemaphore(m_trig, 1, NULL); -#else - sem_post(m_trig); -#endif - } - pthread_cancel (m_pthr); - pthread_join (m_pthr, NULL); -} - -// start the recording thread -void Profil::start_thread() { - pthread_attr_t attr; - struct sched_param spar; - if (rt_prio == 0) { - rt_prio = sched_get_priority_max(SCHED_FIFO); - } - if ((rt_prio/5) > 0) rt_prio = rt_prio/5; - spar.sched_priority = rt_prio; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE ); - pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); - if (rt_policy == 0) { - pthread_attr_setschedpolicy(&attr, SCHED_FIFO); - } else { - pthread_attr_setschedpolicy(&attr, rt_policy); - } - pthread_attr_setschedparam(&attr, &spar); - pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); - pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED); - // pthread_attr_setstacksize(&attr, 0x10000); - running = true; - if (pthread_create(&m_pthr, &attr, run_thread, - reinterpret_cast(this))) { - err = true; - running = false; - } - pthread_attr_destroy(&attr); } // clear all internal buffers on activation @@ -501,7 +480,6 @@ inline void Profil::init(unsigned int samplingFreq) { reset_errors = 0; fConst0 = (1.0f / float(fmin(192000, fmax(1, fSamplingFreq)))); mtdm = mtdm_new(fSamplingFreq); - start_thread(); if (fSamplingFreq != 48000) { err = true; errors = 3.0; @@ -539,11 +517,11 @@ SNDFILE *Profil::open_stream(std::string fname) { // crude normalisation function, barly used void Profil::normalize() { if (tape1) { delete[] tape1; tape1 = 0; } - int n = load_from_wave(get_ffilename()); + int n = load_from_wave(outputfile); for (int i = 0;i MAXRECSIZE-1) { // when buffer is full, flush to stream @@ -686,11 +665,7 @@ void always_inline Profil::compute(int count, const float *input0, float *output tape = iA ? fRec0 : fRec1; keep_stream = true; savesize = IOTA; -#ifdef _WIN32 - ReleaseSemaphore(m_trig, 1, NULL); -#else - sem_post(m_trig); -#endif + worker.cv.notify_one(); IOTA = 0; } // play input.wav file once @@ -708,6 +683,7 @@ void always_inline Profil::compute(int count, const float *input0, float *output roundtrip = 0; measure = 0; iSlow0 = 0; + time_match = true; // switch off the PROFILE button when host support it requestParameterValueChange((PortIndex)PROFILE, 0.0f); // check if we need normalization @@ -722,11 +698,7 @@ void always_inline Profil::compute(int count, const float *input0, float *output tape = iA ? fRec1 : fRec0; savesize = IOTA; keep_stream = false; -#ifdef _WIN32 - ReleaseSemaphore(m_trig, 1, NULL); -#else - sem_post(m_trig); -#endif + worker.cv.notify_one(); IOTA = 0; iA = 0; IOTAP = 0; diff --git a/plugins/NeuralRecord/profiler.h b/plugins/NeuralRecord/profiler.h index 1b4c1ee..e7e87ff 100644 --- a/plugins/NeuralRecord/profiler.h +++ b/plugins/NeuralRecord/profiler.h @@ -44,12 +44,16 @@ #include #else #include -#include #endif #include #include +#include +#include +#include +#include + namespace profiler { @@ -78,13 +82,31 @@ struct MTDM struct Freq _freq [13]; }; +class Profil; + +class ProfilWorker { +private: + std::atomic _execute; + std::thread _thd; + std::mutex m; + +public: + ProfilWorker(); + ~ProfilWorker(); + void stop(); + void start(Profil *pt); + bool is_running() const noexcept; + std::condition_variable cv; +}; class Profil { private: SNDFILE * recfile; SNDFILE * playfile; std::string inputfile; + std::string outputfile; struct MTDM *mtdm; + ProfilWorker worker; int fSamplingFreq; int channel; float fcheckbox0; @@ -107,18 +129,10 @@ class Profil { float *fRec1; float *tape; float *tape1; -#ifdef _WIN32 - HANDLE m_trig; -#else - sem_t *m_trig; -#endif - pthread_t m_pthr; - int32_t rt_prio; - int32_t rt_policy; volatile bool keep_stream; bool mem_allocated; bool err; - bool running; + bool time_match; float fConst0; float fConst1; float fConst2; @@ -142,12 +156,9 @@ class Profil { void save_to_wave(SNDFILE * sf, float *tape, int lSize); SNDFILE *open_stream(std::string fname); void close_stream(SNDFILE **sf); - void stop_thread(); - void start_thread(); void disc_stream(); void connect(uint32_t port, float data); void normalize(); - static void *run_thread(void* p); inline int load_from_wave(std::string fname); inline void convert_to_wave(std::string fname, std::string oname); inline std::string get_path(); @@ -157,7 +168,7 @@ class Profil { std::function requestParameterValueChange; public: - void set_thread_prio(int32_t prio, int32_t policy); + static void run_thread(void* p); static void clear_state(Profil*); static int activate_plugin(bool start, Profil*); static void set_samplerate(unsigned int samplingFreq, Profil*);