Re-implement RTTaskList using Graph Threads

There is no longer an extra set of rt-threads, but existing
process-graph threads are reused.

There are two main benefits to this approach: graph-threads
have a SessioEvent pool and ProcessThread buffers. They are
also joined to work-groups (on macOS), or  JACK created threads
(cgroups).
This commit is contained in:
Robin Gareus 2022-06-04 16:24:49 +02:00
parent 7219791d22
commit c713841f39
Signed by: rgareus
GPG Key ID: A090BCE02CF57F04
9 changed files with 141 additions and 182 deletions

View File

@ -47,6 +47,7 @@ class Graph;
class IOPlug;
class Route;
class RTTaskList;
class Session;
class GraphEdges;
@ -90,6 +91,9 @@ public:
void process_one_route (Route* route);
void process_one_ioplug (IOPlug*);
/* RTTasks */
void process_tasklist (RTTaskList const&);
protected:
virtual void session_going_away ();

View File

@ -0,0 +1,47 @@
/*
* Copyright (C) 2017,2022 Robin Gareus <robin@gareus.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _ardour_rt_task_h_
#define _ardour_rt_task_h_
#include <boost/function.hpp>
#include "ardour/graphnode.h"
namespace ARDOUR
{
class Graph;
class RTTaskList;
class LIBARDOUR_API RTTask : public ProcessNode
{
public:
RTTask (Graph* g, boost::function<void ()> const& fn);
void prep (GraphChain const*) {}
void run (GraphChain const*);
private:
friend class RTTaskList;
boost::function<void ()> _f;
Graph* _graph;
};
}
#endif

View File

@ -20,46 +20,29 @@
#define _ardour_rt_tasklist_h_
#include <boost/function.hpp>
#include <list>
#include <vector>
#include "pbd/g_atomic_compat.h"
#include "pbd/mpmc_queue.h"
#include "pbd/semutils.h"
#include "ardour/audio_backend.h"
#include "ardour/libardour_visibility.h"
#include "ardour/session_handle.h"
#include "ardour/types.h"
#include "ardour/rt_task.h"
namespace ARDOUR
{
class Graph;
class LIBARDOUR_API RTTaskList
{
public:
RTTaskList ();
~RTTaskList ();
RTTaskList (boost::shared_ptr<Graph>);
/** process tasks in list in parallel, wait for them to complete */
void process ();
void push_back (boost::function<void ()> fn);
std::vector<RTTask> const& tasks () const { return _tasks; }
private:
GATOMIC_QUAL gint _threads_active;
std::vector<pthread_t> _threads;
void reset_thread_list ();
void drop_threads ();
void run ();
static void* _thread_run (void* arg);
PBD::Semaphore _task_run_sem;
PBD::Semaphore _task_end_sem;
size_t _n_tasks;
size_t _m_tasks;
PBD::MPMCQueue<boost::function<void ()>> _tasks;
std::vector<RTTask> _tasks;
boost::shared_ptr<Graph> _graph;
};
} // namespace ARDOUR

View File

@ -36,6 +36,8 @@
#include "ardour/io_plug.h"
#include "ardour/process_thread.h"
#include "ardour/route.h"
#include "ardour/rt_task.h"
#include "ardour/rt_tasklist.h"
#include "ardour/session.h"
#include "ardour/types.h"
@ -200,7 +202,9 @@ Graph::drop_threads ()
void
Graph::prep ()
{
assert (_graph_chain);
if (!_graph_chain) {
return;
}
_graph_empty = true;
node_list_t::iterator i;
@ -571,6 +575,33 @@ Graph::in_process_thread () const
/* ****************************************************************************/
void
Graph::process_tasklist (RTTaskList const& rt)
{
assert (g_atomic_uint_get (&_trigger_queue_size) == 0);
std::vector<RTTask> const& tasks = rt.tasks ();
if (tasks.empty ()) {
return;
}
g_atomic_int_set (&_trigger_queue_size, tasks.size ());
g_atomic_int_set (&_terminal_refcnt, tasks.size ());
_graph_empty = false;
for (auto const& t : tasks) {
_trigger_queue.push_back (const_cast<RTTask*>(&t));
}
_graph_chain = 0;
DEBUG_TRACE (DEBUG::ProcessThreads, "wake graph for RTTask processing\n");
_callback_start_sem.signal ();
_callback_done_sem.wait ();
DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
}
/* ****************************************************************************/
GraphChain::GraphChain (GraphNodeList const& nodelist, GraphEdges const& edges)
{
DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphChain constructed in thread:%1\n", pthread_name ()));

35
libs/ardour/rt_task.cc Normal file
View File

@ -0,0 +1,35 @@
/*
* Copyright (C) 2017,2022 Robin Gareus <robin@gareus.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "ardour/graph.h"
#include "ardour/rt_task.h"
using namespace ARDOUR;
RTTask::RTTask (Graph* g, boost::function<void ()> const& fn)
: _f (fn)
, _graph (g)
{
}
void
RTTask::run (GraphChain const*)
{
_f ();
_graph->reached_terminal_node ();
}

View File

@ -16,171 +16,32 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <cstring>
#include "pbd/g_atomic_compat.h"
#include "pbd/pthread_utils.h"
#include "ardour/audioengine.h"
#include "ardour/debug.h"
#include "ardour/graph.h"
#include "ardour/rt_tasklist.h"
#include "ardour/utils.h"
#include "pbd/i18n.h"
using namespace ARDOUR;
RTTaskList::RTTaskList ()
: _task_run_sem ("rt_task_run", 0)
, _task_end_sem ("rt_task_done", 0)
, _n_tasks (0)
, _m_tasks (0)
, _tasks (256)
RTTaskList::RTTaskList (boost::shared_ptr<Graph> process_graph)
: _graph (process_graph)
{
g_atomic_int_set (&_threads_active, 0);
reset_thread_list ();
}
RTTaskList::~RTTaskList ()
{
drop_threads ();
}
void
RTTaskList::drop_threads ()
{
g_atomic_int_set (&_threads_active, 0);
uint32_t nt = _threads.size ();
for (uint32_t i = 0; i < nt; ++i) {
_task_run_sem.signal ();
}
for (auto const& i : _threads) {
pthread_join (i, NULL);
}
_threads.clear ();
_task_run_sem.reset ();
_task_end_sem.reset ();
}
/*static*/ void*
RTTaskList::_thread_run (void* arg)
{
RTTaskList* d = static_cast<RTTaskList*> (arg);
pbd_mach_set_realtime_policy (pthread_self (), 5. * 1e-5, false);
char name[64];
snprintf (name, 64, "RTTask-%p", (void*)DEBUG_THREAD_SELF);
pthread_set_name (name);
/* TODO: join macOS workgroup (needs backend API update).
* also rt-tasks need to be re-initialized when the engine is restarted
*/
d->run ();
// TODO: leave macOS workgroup
pthread_exit (0);
return 0;
}
void
RTTaskList::reset_thread_list ()
{
drop_threads ();
const uint32_t num_threads = how_many_dsp_threads ();
if (num_threads < 2) {
return;
}
g_atomic_int_set (&_threads_active, 1);
for (uint32_t i = 0; i < num_threads; ++i) {
int rv = 1;
pthread_t thread_id;
if (AudioEngine::instance ()->is_realtime ()) {
rv = pbd_realtime_pthread_create (PBD_SCHED_FIFO, AudioEngine::instance ()->client_real_time_priority (), PBD_RT_STACKSIZE_HELP, &thread_id, _thread_run, this);
}
if (rv) {
rv = pbd_pthread_create (PBD_RT_STACKSIZE_HELP, &thread_id, _thread_run, this);
}
if (rv) {
PBD::fatal << _("Cannot create thread for TaskList!") << " (" << strerror (rv) << ")" << endmsg;
/* NOT REACHED */
}
_threads.push_back (thread_id);
}
}
void
RTTaskList::run ()
{
bool wait = true;
while (true) {
if (wait) {
_task_run_sem.wait ();
}
if (0 == g_atomic_int_get (&_threads_active)) {
_task_end_sem.signal ();
break;
}
wait = false;
boost::function<void ()> to_run;
if (_tasks.pop_front (to_run)) {
to_run ();
continue;
}
if (!wait) {
_task_end_sem.signal ();
}
wait = true;
}
_tasks.reserve (256);
}
void
RTTaskList::push_back (boost::function<void ()> fn)
{
if (!_tasks.push_back (fn)) {
fn ();
} else {
++_n_tasks;
}
++_m_tasks;
_tasks.push_back (RTTask (_graph.get(), fn));
}
void
RTTaskList::process ()
{
if (0 == g_atomic_int_get (&_threads_active) || _threads.size () == 0) {
boost::function<void ()> to_run;
while (_tasks.pop_front (to_run)) {
to_run ();
--_n_tasks;
if (_graph->n_threads () > 1 && _tasks.size () > 2) {
_graph->process_tasklist (*this);
} else {
for (auto const& fn : _tasks) {
fn._f ();
}
assert (_n_tasks == 0);
_n_tasks = 0;
return;
}
uint32_t nt = std::min (_threads.size (), _n_tasks);
for (uint32_t i = 0; i < nt; ++i) {
_task_run_sem.signal ();
}
for (uint32_t i = 0; i < nt; ++i) {
_task_end_sem.wait ();
}
/* re-allocate queue if needed */
if (_tasks.capacity () < _m_tasks) {
_tasks.reserve (_m_tasks);
}
_n_tasks = 0;
_m_tasks = 0;
_tasks.clear ();
}

View File

@ -552,8 +552,8 @@ Session::immediately_post_engine ()
* session or set state for an existing one.
*/
_rt_tasklist.reset (new RTTaskList ());
_process_graph.reset (new Graph (*this));
_rt_tasklist.reset (new RTTaskList (_process_graph));
/* every time we reconnect, recompute worst case output latencies */

View File

@ -760,10 +760,7 @@ Session::process_audition (pframes_t nframes)
boost::shared_ptr<GraphChain> graph_chain = _graph_chain;
if (graph_chain) {
/* Ideally we'd use Session::rt_tasklist, since dependency is irrelevant.
* However the RTTaskList process threads have no ProcessThread buffers
* nor a SessioEvent thread_pool.
*/
/* Ideally we'd use Session::rt_tasklist, since dependency is irrelevant. */
_process_graph->silence_routes (graph_chain, nframes);
} else {
for (RouteList::iterator i = r->begin(); i != r->end(); ++i) {

View File

@ -210,6 +210,7 @@ libardour_sources = [
'route_group.cc',
'route_group_member.cc',
'rb_effect.cc',
'rt_task.cc',
'rt_tasklist.cc',
'scene_change.cc',
'search_paths.cc',