Remove mutex from RTTaskList, use MPMC queue

This commit is contained in:
Robin Gareus 2022-05-11 23:45:51 +02:00
parent 34678c094a
commit c87a5cdcfd
Signed by: rgareus
GPG Key ID: A090BCE02CF57F04
3 changed files with 75 additions and 76 deletions

View File

@ -19,30 +19,29 @@
#ifndef _ardour_rt_tasklist_h_
#define _ardour_rt_tasklist_h_
#include <list>
#include <boost/function.hpp>
#include <list>
#include "pbd/semutils.h"
#include "pbd/g_atomic_compat.h"
#include "pbd/mpmc_queue.h"
#include "pbd/semutils.h"
#include "ardour/libardour_visibility.h"
#include "ardour/types.h"
#include "ardour/audio_backend.h"
#include "ardour/libardour_visibility.h"
#include "ardour/session_handle.h"
#include "ardour/types.h"
namespace ARDOUR {
namespace ARDOUR
{
class LIBARDOUR_API RTTaskList
{
public:
RTTaskList ();
~RTTaskList ();
// TODO use dedicated allocator of a boost::intrusive::list
typedef std::list<boost::function<void ()> > TaskList;
/** process tasks in list in parallel, wait for them to complete */
void process (TaskList const&);
void process ();
void push_back (boost::function<void ()> fn);
private:
GATOMIC_QUAL gint _threads_active;
@ -50,18 +49,18 @@ private:
void reset_thread_list ();
void drop_threads ();
void process_tasklist ();
static void* _thread_run (void *arg);
void run ();
Glib::Threads::Mutex _process_mutex;
Glib::Threads::Mutex _tasklist_mutex;
static void* _thread_run (void* arg);
PBD::Semaphore _task_run_sem;
PBD::Semaphore _task_end_sem;
TaskList _tasklist;
size_t _n_tasks;
size_t _m_tasks;
size_t _queue_size;
PBD::MPMCQueue<boost::function<void ()>> _tasks;
};
} // namespace ARDOUR

View File

@ -1112,15 +1112,18 @@ PortManager::cycle_start (pframes_t nframes, Session* s)
* A single external source-port may be connected to many ardour
* input-ports. Currently re-sampling is per input.
*/
if (s && s->rt_tasklist () && fabs (Port::speed_ratio ()) != 1.0) {
RTTaskList::TaskList tl;
boost::shared_ptr<RTTaskList> tl;
if (s) {
tl = s->rt_tasklist ();
}
if (tl && fabs (Port::speed_ratio ()) != 1.0) {
for (Ports::iterator p = _cycle_ports->begin (); p != _cycle_ports->end (); ++p) {
if (!(p->second->flags () & TransportSyncPort)) {
tl.push_back (boost::bind (&Port::cycle_start, p->second, nframes));
tl->push_back (boost::bind (&Port::cycle_start, p->second, nframes));
}
}
tl.push_back (boost::bind (&PortManager::run_input_meters, this, nframes, s ? s->nominal_sample_rate () : 0));
s->rt_tasklist ()->process (tl);
tl->push_back (boost::bind (&PortManager::run_input_meters, this, nframes, s ? s->nominal_sample_rate () : 0));
tl->process ();
} else {
for (Ports::iterator p = _cycle_ports->begin (); p != _cycle_ports->end (); ++p) {
if (!(p->second->flags () & TransportSyncPort)) {
@ -1135,14 +1138,17 @@ void
PortManager::cycle_end (pframes_t nframes, Session* s)
{
// see optimzation note in ::cycle_start()
if (0 && s && s->rt_tasklist () && fabs (Port::speed_ratio ()) != 1.0) {
RTTaskList::TaskList tl;
boost::shared_ptr<RTTaskList> tl;
if (s) {
tl = s->rt_tasklist ();
}
if (tl && fabs (Port::speed_ratio ()) != 1.0) {
for (Ports::iterator p = _cycle_ports->begin (); p != _cycle_ports->end (); ++p) {
if (!(p->second->flags () & TransportSyncPort)) {
tl.push_back (boost::bind (&Port::cycle_end, p->second, nframes));
tl->push_back (boost::bind (&Port::cycle_end, p->second, nframes));
}
}
s->rt_tasklist ()->process (tl);
tl->process ();
} else {
for (Ports::iterator p = _cycle_ports->begin (); p != _cycle_ports->end (); ++p) {
if (!(p->second->flags () & TransportSyncPort)) {
@ -1250,14 +1256,17 @@ void
PortManager::cycle_end_fade_out (gain_t base_gain, gain_t gain_step, pframes_t nframes, Session* s)
{
// see optimzation note in ::cycle_start()
if (0 && s && s->rt_tasklist () && fabs (Port::speed_ratio ()) != 1.0) {
RTTaskList::TaskList tl;
boost::shared_ptr<RTTaskList> tl;
if (s) {
tl = s->rt_tasklist ();
}
if (tl && fabs (Port::speed_ratio ()) != 1.0) {
for (Ports::iterator p = _cycle_ports->begin (); p != _cycle_ports->end (); ++p) {
if (!(p->second->flags () & TransportSyncPort)) {
tl.push_back (boost::bind (&Port::cycle_end, p->second, nframes));
tl->push_back (boost::bind (&Port::cycle_end, p->second, nframes));
}
}
s->rt_tasklist ()->process (tl);
tl->process ();
} else {
for (Ports::iterator p = _cycle_ports->begin (); p != _cycle_ports->end (); ++p) {
if (!(p->second->flags () & TransportSyncPort)) {

View File

@ -18,6 +18,7 @@
#include <cstring>
#include "pbd/g_atomic_compat.h"
#include "pbd/pthread_utils.h"
#include "ardour/audioengine.h"
@ -32,6 +33,10 @@ using namespace ARDOUR;
RTTaskList::RTTaskList ()
: _task_run_sem ("rt_task_run", 0)
, _task_end_sem ("rt_task_done", 0)
, _n_tasks (0)
, _m_tasks (0)
, _queue_size (1024)
, _tasks (_queue_size)
{
g_atomic_int_set (&_threads_active, 0);
reset_thread_list ();
@ -45,15 +50,14 @@ RTTaskList::~RTTaskList ()
void
RTTaskList::drop_threads ()
{
Glib::Threads::Mutex::Lock pm (_process_mutex);
g_atomic_int_set (&_threads_active, 0);
uint32_t nt = _threads.size ();
for (uint32_t i = 0; i < nt; ++i) {
_task_run_sem.signal ();
}
for (std::vector<pthread_t>::const_iterator i = _threads.begin (); i != _threads.end (); ++i) {
pthread_join (*i, NULL);
for (auto const& i : _threads) {
pthread_join (i, NULL);
}
_threads.clear ();
_task_run_sem.reset ();
@ -61,9 +65,9 @@ RTTaskList::drop_threads ()
}
/*static*/ void*
RTTaskList::_thread_run (void *arg)
RTTaskList::_thread_run (void* arg)
{
RTTaskList *d = static_cast<RTTaskList *>(arg);
RTTaskList* d = static_cast<RTTaskList*> (arg);
char name[64];
snprintf (name, 64, "RTTask-%p", (void*)DEBUG_THREAD_SELF);
@ -84,20 +88,18 @@ RTTaskList::reset_thread_list ()
return;
}
Glib::Threads::Mutex::Lock pm (_process_mutex);
g_atomic_int_set (&_threads_active, 1);
for (uint32_t i = 0; i < num_threads; ++i) {
int rv = 1;
pthread_t thread_id;
int rv = 1;
if (AudioEngine::instance()->is_realtime ()) {
rv = pbd_realtime_pthread_create (PBD_SCHED_FIFO, AudioEngine::instance()->client_real_time_priority(), PBD_RT_STACKSIZE_HELP, &thread_id, _thread_run, this);
if (AudioEngine::instance ()->is_realtime ()) {
rv = pbd_realtime_pthread_create (PBD_SCHED_FIFO, AudioEngine::instance ()->client_real_time_priority (), PBD_RT_STACKSIZE_HELP, &thread_id, _thread_run, this);
}
if (rv) {
rv = pbd_pthread_create (PBD_RT_STACKSIZE_HELP, &thread_id, _thread_run, this);
}
if (rv) {
PBD::fatal << _("Cannot create thread for TaskList!") << " (" << strerror(rv) << ")" << endmsg;
PBD::fatal << _("Cannot create thread for TaskList!") << " (" << strerror (rv) << ")" << endmsg;
/* NOT REACHED */
}
pbd_mach_set_realtime_policy (thread_id, 5. * 1e-5, false);
@ -108,7 +110,6 @@ RTTaskList::reset_thread_list ()
void
RTTaskList::run ()
{
Glib::Threads::Mutex::Lock tm (_tasklist_mutex, Glib::Threads::NOT_LOCK);
bool wait = true;
while (true) {
@ -124,14 +125,7 @@ RTTaskList::run ()
wait = false;
boost::function<void ()> to_run;
tm.acquire ();
if (!_tasklist.empty ()) {
to_run = _tasklist.front();
_tasklist.pop_front ();
}
tm.release ();
if (!to_run.empty ()) {
if (_tasks.pop_front (to_run)) {
to_run ();
continue;
}
@ -145,42 +139,31 @@ RTTaskList::run ()
}
void
RTTaskList::process (TaskList const& tl)
RTTaskList::push_back (boost::function<void ()> fn)
{
Glib::Threads::Mutex::Lock pm (_process_mutex);
#ifndef NDEBUG
/* must not be called while processing is already running */
Glib::Threads::Mutex::Lock tm (_tasklist_mutex, Glib::Threads::NOT_LOCK);
tm.acquire ();
assert (_tasklist.empty ());
tm.release ();
#endif
_tasklist = tl;
process_tasklist ();
#ifndef NDEBUG
/* ensure that all tasks are processed, and threads are in wait state */
tm.acquire ();
assert (_tasklist.empty ());
tm.release ();
#endif
if (!_tasks.push_back (fn)) {
fn ();
} else {
++_n_tasks;
}
++_m_tasks;
}
void
RTTaskList::process_tasklist ()
RTTaskList::process ()
{
if (0 == g_atomic_int_get (&_threads_active) || _threads.size () == 0) {
for (TaskList::iterator i = _tasklist.begin (); i != _tasklist.end(); ++i) {
(*i)();
boost::function<void ()> to_run;
while (_tasks.pop_front (to_run)) {
to_run ();
--_n_tasks;
}
_tasklist.clear ();
assert (_n_tasks == 0);
_n_tasks = 0;
return;
}
uint32_t nt = std::min (_threads.size (), _tasklist.size ());
uint32_t nt = std::min (_threads.size (), _n_tasks);
for (uint32_t i = 0; i < nt; ++i) {
_task_run_sem.signal ();
@ -188,4 +171,12 @@ RTTaskList::process_tasklist ()
for (uint32_t i = 0; i < nt; ++i) {
_task_end_sem.wait ();
}
/* re-allocate queue if needed */
if (_m_tasks >= _queue_size) {
_queue_size = _tasks.power_of_two_size (_m_tasks + 1);
_tasks.reserve (_queue_size);
}
_n_tasks = 0;
_m_tasks = 0;
}