Generalize graph processing

This allows to pass any GraphChain to the Graph to process.
It removes the need to use a mutex to swap two dedicated
chains (setup-chain <> active-chain, pending-chain).

Also various special cases pertaining to graph interaction
while auditioning and route-deletion can be removed.

This also unconditionally creates a graph-thread for GraphChains
to be processed, even if the main callback uses a special-cased
sorted RouteList if there is only one process thread.
This commit is contained in:
Robin Gareus 2022-05-01 16:25:12 +02:00
parent 5fa7d481c1
commit f5b280a850
Signed by: rgareus
GPG Key ID: A090BCE02CF57F04
7 changed files with 286 additions and 312 deletions

View File

@ -30,9 +30,9 @@
#include <boost/shared_ptr.hpp>
#include "pbd/g_atomic_compat.h"
#include "pbd/mpmc_queue.h"
#include "pbd/semutils.h"
#include "pbd/g_atomic_compat.h"
#include "ardour/audio_backend.h"
#include "ardour/libardour_visibility.h"
@ -53,31 +53,38 @@ typedef boost::shared_ptr<GraphNode> node_ptr_t;
typedef std::list<node_ptr_t> node_list_t;
typedef std::set<node_ptr_t> node_set_t;
struct GraphChain {
GraphChain (GraphNodeList const&, GraphEdges const&);
~GraphChain ();
void dump () const;
bool plot (std::string const&) const;
node_list_t _nodes_rt;
/** Nodes that are not fed by any other nodes */
node_list_t _init_trigger_list;
/** The number of nodes that do not feed any other node */
int _n_terminal_nodes;
};
class LIBARDOUR_API Graph : public SessionHandleRef
{
public:
Graph (Session& session);
void trigger (GraphNode* n);
void rechain (GraphNodeList const&, GraphEdges const&);
bool plot (std::string const& file_name) const;
/* public API for use by session-process */
int process_routes (boost::shared_ptr<GraphChain> chain, pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool& need_butler);
int routes_no_roll (boost::shared_ptr<GraphChain> chain, pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool non_rt_pending);
void plot (int chain);
bool in_process_thread () const;
uint32_t n_threads () const;
/* called by GraphNode */
void trigger (GraphNode* n);
void reached_terminal_node ();
void helper_thread ();
int process_routes (pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool& need_butler);
int routes_no_roll (pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool non_rt_pending);
/* called by virtual GraphNode::process() */
void process_one_route (Route* route);
void clear_other_chain ();
void swap_process_chain ();
bool in_process_thread () const;
protected:
virtual void session_going_away ();
@ -87,10 +94,8 @@ private:
void run_one ();
void main_thread ();
void prep ();
void dump (int chain) const;
node_list_t _nodes_rt[2];
node_list_t _init_trigger_list[2];
void helper_thread ();
PBD::MPMCQueue<GraphNode*> _trigger_queue; ///< nodes that can be processed
GATOMIC_QUAL guint _trigger_queue_size; ///< number of entries in trigger-queue
@ -108,9 +113,7 @@ private:
/** The number of unprocessed nodes that do not feed any other node; updated during processing */
GATOMIC_QUAL guint _terminal_refcnt;
/** The initial number of nodes that do not feed any other node (for each chain) */
guint _n_terminal_nodes[2];
bool _graph_empty;
bool _graph_empty;
/* number of background worker threads >= 0 */
GATOMIC_QUAL guint _n_workers;
@ -118,13 +121,8 @@ private:
/* flag to terminate background threads */
GATOMIC_QUAL gint _terminate;
/* chain swapping */
Glib::Threads::Cond _cleanup_cond;
mutable Glib::Threads::Mutex _swap_mutex;
GATOMIC_QUAL gint _current_chain;
GATOMIC_QUAL gint _pending_chain;
GATOMIC_QUAL gint _setup_chain;
/* graph chain */
GraphChain const* _graph_chain;
/* parameter caches */
pframes_t _process_nframes;
@ -142,6 +140,6 @@ private:
void engine_stopped ();
};
} // namespace
} // namespace ARDOUR
#endif /* __ardour_graph_h__ */

View File

@ -22,17 +22,21 @@
#define __ardour_graphnode_h__
#include <list>
#include <map>
#include <set>
#include <vector>
#include <boost/shared_ptr.hpp>
#include "pbd/g_atomic_compat.h"
#include "pbd/rcu.h"
#include "ardour/libardour_visibility.h"
namespace ARDOUR
{
class Graph;
class GraphNode;
struct GraphChain;
typedef boost::shared_ptr<GraphNode> node_ptr_t;
typedef std::set<node_ptr_t> node_set_t;
@ -40,12 +44,23 @@ typedef std::list<node_ptr_t> node_list_t;
class LIBARDOUR_API GraphActivision
{
public:
GraphActivision ();
virtual ~GraphActivision () {}
typedef std::map<GraphChain const*, node_set_t> ActivationMap;
typedef std::map<GraphChain const*, int> RefCntMap;
node_set_t const& activation_set (GraphChain const* const g) const;
int init_refcount (GraphChain const* const g) const;
protected:
friend class Graph;
friend struct GraphChain;
/** Nodes that we directly feed */
node_set_t _activation_set[2];
SerializedRCUManager<ActivationMap> _activation_set;
/** The number of nodes that we directly feed us (one count for each chain) */
gint _init_refcount[2];
SerializedRCUManager<RefCntMap> _init_refcount;
};
/** A node on our processing graph, ie a Route */
@ -53,19 +68,15 @@ class LIBARDOUR_API GraphNode : public GraphActivision
{
public:
GraphNode (boost::shared_ptr<Graph> Graph);
virtual ~GraphNode ();
void prep (int chain);
/* API used by Graph */
void prep (GraphChain const*);
void trigger ();
void run (GraphChain const* chain);
void
run (int chain)
{
process ();
finish (chain);
}
/* API used to sort Nodes and create GraphChain */
virtual std::string graph_node_name () const = 0;
virtual bool direct_feeds_according_to_reality (boost::shared_ptr<GraphNode>, bool* via_send_only = 0) = 0;
protected:
@ -74,11 +85,11 @@ protected:
boost::shared_ptr<Graph> _graph;
private:
void finish (int chain);
void finish (GraphChain const*);
GATOMIC_QUAL gint _refcount;
};
}
} // namespace ARDOUR
#endif

View File

@ -141,6 +141,7 @@ class CoreSelection;
class ExportHandler;
class ExportStatus;
class Graph;
struct GraphChain;
class IO;
class IOProcessor;
class ImportStatus;
@ -2253,7 +2254,8 @@ private:
*/
GraphEdges _current_route_graph;
boost::shared_ptr<Graph> _process_graph;
boost::shared_ptr<Graph> _process_graph;
boost::shared_ptr<GraphChain> _graph_chain;
void resort_routes_using (boost::shared_ptr<RouteList>);
bool rechain_process_graph (GraphNodeList&);

View File

@ -65,9 +65,7 @@ Graph::Graph (Session& session)
, _callback_start_sem ("graph_start", 0)
, _callback_done_sem ("graph_done", 0)
, _graph_empty (true)
, _current_chain (0)
, _pending_chain (0)
, _setup_chain (1)
, _graph_chain (0)
{
g_atomic_int_set (&_terminal_refcnt, 0);
g_atomic_int_set (&_terminate, 0);
@ -75,9 +73,6 @@ Graph::Graph (Session& session)
g_atomic_int_set (&_idle_thread_cnt, 0);
g_atomic_int_set (&_trigger_queue_size, 0);
_n_terminal_nodes[0] = 0;
_n_terminal_nodes[1] = 0;
/* pre-allocate memory */
_trigger_queue.reserve (1024);
@ -111,9 +106,6 @@ Graph::reset_thread_list ()
uint32_t num_threads = how_many_dsp_threads ();
guint n_workers = g_atomic_uint_get (&_n_workers);
/* For now, we shouldn't be using the graph code if we only have 1 DSP thread */
assert (num_threads > 1);
/* don't bother doing anything here if we already have the right
* number of threads.
*/
@ -146,25 +138,26 @@ Graph::reset_thread_list ()
}
}
uint32_t
Graph::n_threads () const
{
return 1 + g_atomic_uint_get (&_n_workers);
}
void
Graph::session_going_away ()
{
drop_threads ();
// now drop all references on the nodes.
_nodes_rt[0].clear ();
_nodes_rt[1].clear ();
_init_trigger_list[0].clear ();
_init_trigger_list[1].clear ();
/* now drop all references on the nodes. */
g_atomic_int_set (&_trigger_queue_size, 0);
_trigger_queue.clear ();
_graph_chain = 0;
}
void
Graph::drop_threads ()
{
Glib::Threads::Mutex::Lock ls (_swap_mutex);
/* Flag threads to terminate */
g_atomic_int_set (&_terminate, 1);
@ -203,90 +196,27 @@ Graph::drop_threads ()
#endif
}
/* special case route removal -- called from Session::remove_routes */
void
Graph::clear_other_chain ()
{
Glib::Threads::Mutex::Lock ls (_swap_mutex);
while (1) {
if (_setup_chain != _pending_chain) {
for (node_list_t::iterator ni = _nodes_rt[_setup_chain].begin (); ni != _nodes_rt[_setup_chain].end (); ++ni) {
(*ni)->_activation_set[_setup_chain].clear ();
}
_nodes_rt[_setup_chain].clear ();
_init_trigger_list[_setup_chain].clear ();
break;
}
/* setup chain == pending chain - we have
* to wait till this is no longer true.
*/
_cleanup_cond.wait (_swap_mutex);
}
}
void
Graph::swap_process_chain ()
{
/* Intended to be called Session::process_audition.
* Must not be called while the graph is processing.
*/
if (_swap_mutex.trylock ()) {
/* swap mutex acquired */
if (_current_chain != _pending_chain) {
DEBUG_TRACE (DEBUG::Graph, string_compose ("Graph::swap_process_chain = %1)\n", _pending_chain));
/* use new chain */
_setup_chain = _current_chain;
_current_chain = _pending_chain;
printf ("Graph::swap to %d\n", _current_chain);
_trigger_queue.clear ();
/* ensure that all nodes can be queued */
_trigger_queue.reserve (_nodes_rt[_current_chain].size ());
g_atomic_int_set (&_trigger_queue_size, 0);
_cleanup_cond.signal ();
}
_swap_mutex.unlock ();
}
}
void
Graph::prep ()
{
if (_swap_mutex.trylock ()) {
/* swap mutex acquired */
if (_current_chain != _pending_chain) {
/* use new chain */
DEBUG_TRACE (DEBUG::Graph, string_compose ("Graph::prep chain = %1)\n", _pending_chain));
_setup_chain = _current_chain;
_current_chain = _pending_chain;
/* ensure that all nodes can be queued */
_trigger_queue.reserve (_nodes_rt[_current_chain].size ());
assert (g_atomic_uint_get (&_trigger_queue_size) == 0);
_cleanup_cond.signal ();
}
_swap_mutex.unlock ();
}
assert (_graph_chain);
_graph_empty = true;
int chain = _current_chain;
node_list_t::iterator i;
for (i = _nodes_rt[chain].begin (); i != _nodes_rt[chain].end (); ++i) {
(*i)->prep (chain);
for (auto const& i : _graph_chain->_nodes_rt) {
i->prep (_graph_chain);
_graph_empty = false;
}
assert (g_atomic_uint_get (&_trigger_queue_size) == 0);
assert (_graph_empty != (_n_terminal_nodes[chain] > 0));
assert (_graph_empty != (_graph_chain->_n_terminal_nodes > 0));
g_atomic_int_set (&_terminal_refcnt, _n_terminal_nodes[chain]);
g_atomic_int_set (&_terminal_refcnt, _graph_chain->_n_terminal_nodes);
/* Trigger the initial nodes for processing, which are the ones at the `input' end */
for (i = _init_trigger_list[chain].begin (); i != _init_trigger_list[chain].end (); i++) {
for (auto const& i : _graph_chain->_init_trigger_list) {
g_atomic_int_inc (&_trigger_queue_size);
_trigger_queue.push_back (i->get ());
_trigger_queue.push_back (i.get ());
}
}
@ -339,7 +269,7 @@ Graph::reached_terminal_node ()
* - Reset terminal reference count
* - queue initial nodes
*/
prep ();
prep (); // XXX
if (_graph_empty && !g_atomic_int_get (&_terminate)) {
goto again;
@ -348,76 +278,6 @@ Graph::reached_terminal_node ()
}
}
/** Rechain our stuff using a list of routes (which can be in any order) and
* a directed graph of their interconnections, which is guaranteed to be
* acyclic.
*/
void
Graph::rechain (GraphNodeList const& nodelist, GraphEdges const& edges)
{
Glib::Threads::Mutex::Lock ls (_swap_mutex);
int chain = _setup_chain;
DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1 (current = %2 pending = %3) thread %4\n", chain, _current_chain, _pending_chain, pthread_name()));
/* This will become the number of nodes that do not feed any other node;
* once we have processed this number of those nodes, we have finished.
*/
_n_terminal_nodes[chain] = 0;
/* This will become a list of nodes that are not fed by another node, ie
* those at the `input' end.
*/
_init_trigger_list[chain].clear ();
_nodes_rt[chain].clear ();
/* Clear things out, and make _nodes_rt[chain] a copy of nodelist */
for (auto const& ri : nodelist) {
ri->_init_refcount[chain] = 0;
ri->_activation_set[chain].clear ();
_nodes_rt[chain].push_back (ri);
}
// now add refs for the connections.
for (auto const& ni : _nodes_rt[chain]) {
/* The routes that are directly fed by r */
set<GraphVertex> fed_from_r = edges.from (ni);
/* Hence whether r has an output */
bool const has_output = !fed_from_r.empty ();
/* Set up r's activation set */
for (set<GraphVertex>::iterator i = fed_from_r.begin (); i != fed_from_r.end (); ++i) {
ni->_activation_set[chain].insert (*i);
}
/* ni has an input if there are some incoming edges to r in the graph */
bool const has_input = !edges.has_none_to (ni);
/* Increment the refcount of any route that we directly feed */
for (auto const& ai : ni->_activation_set[chain]) {
ai->_init_refcount[chain] += 1;
}
if (!has_input) {
/* no input, so this node needs to be triggered initially to get things going */
_init_trigger_list[chain].push_back (ni);
}
if (!has_output) {
/* no output, so this is one of the nodes that we can count off to decide
* if we've finished
*/
_n_terminal_nodes[chain] += 1;
}
}
_pending_chain = chain;
dump (chain);
}
/** Called by both the main thread and all helpers. */
void
Graph::run_one ()
@ -470,7 +330,7 @@ Graph::run_one ()
/* Process the graph-node */
g_atomic_int_dec_and_test (&_trigger_queue_size);
to_run->run (_current_chain);
to_run->run (_graph_chain);
DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 has finished run_one()\n", pthread_name ()));
}
@ -562,82 +422,8 @@ again:
delete (pt);
}
void
Graph::dump (int chain) const
{
#ifndef NDEBUG
node_list_t::const_iterator ni;
node_set_t::const_iterator ai;
chain = _pending_chain;
DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
for (auto const& ni : _nodes_rt[chain]) {
DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1 refcount: %2\n", ni->graph_node_name (), ni->_init_refcount[chain]));
for (auto const& ai : ni->_activation_set[chain]) {
DEBUG_TRACE (DEBUG::Graph, string_compose (" triggers: %1\n", ai->graph_node_name ()));
}
}
DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
for (auto const& ni : _init_trigger_list[chain]) {
DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1 refcount: %2\n", ni->graph_node_name (), ni->_init_refcount[chain]));
}
DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _n_terminal_nodes[chain]));
#endif
}
bool
Graph::plot (std::string const& file_name) const
{
Glib::Threads::Mutex::Lock ls (_swap_mutex);
int chain = _current_chain;
node_list_t::const_iterator ni;
node_set_t::const_iterator ai;
stringstream ss;
ss << "digraph {\n";
ss << " node [shape = ellipse];\n";
for (auto const& ni : _nodes_rt[chain]) {
std::string sn = string_compose ("%1 (%2)", ni->graph_node_name (), ni->_init_refcount[chain]);
if (ni->_init_refcount[chain] == 0 && ni->_activation_set[chain].size() == 0) {
ss << " \"" << sn << "\"[style=filled,fillcolor=gold1];\n";
} else if (ni->_init_refcount[chain] == 0) {
ss << " \"" << sn << "\"[style=filled,fillcolor=lightskyblue1];\n";
} else if (ni->_activation_set[chain].size() == 0) {
ss << " \"" << sn << "\"[style=filled,fillcolor=aquamarine2];\n";
}
for (auto const& ai : ni->_activation_set[chain]) {
std::string dn = string_compose ("%1 (%2)", ai->graph_node_name (), ai->_init_refcount[chain]);
bool sends_only = false;
ni->direct_feeds_according_to_reality (ai, &sends_only);
if (sends_only) {
ss << " edge [style=dashed];\n";
}
ss << " \"" << sn << "\" -> \"" << dn << "\"\n";
if (sends_only) {
ss << " edge [style=solid];\n";
}
}
}
ss << "}\n";
GError *err = NULL;
if (!g_file_set_contents (file_name.c_str(), ss.str().c_str(), -1, &err)) {
if (err) {
error << string_compose (_("Could not graph to file (%1)"), err->message) << endmsg;
g_error_free (err);
}
return false;
}
return true;
}
int
Graph::process_routes (pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool& need_butler)
Graph::process_routes (boost::shared_ptr<GraphChain> chain, pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool& need_butler)
{
DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("graph execution from %1 to %2 = %3\n", start_sample, end_sample, nframes));
@ -645,6 +431,7 @@ Graph::process_routes (pframes_t nframes, samplepos_t start_sample, samplepos_t
return 0;
}
_graph_chain = chain.get ();
_process_nframes = nframes;
_process_start_sample = start_sample;
_process_end_sample = end_sample;
@ -664,7 +451,7 @@ Graph::process_routes (pframes_t nframes, samplepos_t start_sample, samplepos_t
}
int
Graph::routes_no_roll (pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool non_rt_pending)
Graph::routes_no_roll (boost::shared_ptr<GraphChain> chain, pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool non_rt_pending)
{
DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_sample, end_sample, nframes));
@ -672,6 +459,7 @@ Graph::routes_no_roll (pframes_t nframes, samplepos_t start_sample, samplepos_t
return 0;
}
_graph_chain = chain.get ();
_process_nframes = nframes;
_process_start_sample = start_sample;
_process_end_sample = end_sample;
@ -688,6 +476,7 @@ Graph::routes_no_roll (pframes_t nframes, samplepos_t start_sample, samplepos_t
return _process_retval;
}
void
Graph::process_one_route (Route* route)
{
@ -709,7 +498,7 @@ Graph::process_one_route (Route* route)
}
if (need_butler) {
_process_need_butler = true;
_process_need_butler = true; // -> atomic
}
}
@ -718,3 +507,151 @@ Graph::in_process_thread () const
{
return AudioEngine::instance ()->in_process_thread ();
}
/* ****************************************************************************/
GraphChain::GraphChain (GraphNodeList const& nodelist, GraphEdges const& edges)
{
DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphChain constructed in thread:%1\n", pthread_name ()));
/* This will become the number of nodes that do not feed any other node;
* once we have processed this number of those nodes, we have finished.
*/
_n_terminal_nodes = 0;
/* This will become a list of nodes that are not fed by another node, ie
* those at the `input' end.
*/
_init_trigger_list.clear ();
_nodes_rt.clear ();
/* copy nodelist to _nodes_rt, prepare GraphNodes for this graph */
for (auto const& ri : nodelist) {
RCUWriter<GraphActivision::ActivationMap> wa (ri->_activation_set);
RCUWriter<GraphActivision::RefCntMap> wr (ri->_init_refcount);
boost::shared_ptr<GraphActivision::ActivationMap> ma (wa.get_copy ());
boost::shared_ptr<GraphActivision::RefCntMap> mr (wr.get_copy ());
(*mr)[this] = 0;
(*ma)[this].clear ();
_nodes_rt.push_back (ri);
}
/* now add refs for the connections. */
for (auto const& ni : _nodes_rt) {
/* The nodes that are directly fed by ni */
set<GraphVertex> fed_from_r = edges.from (ni);
/* Hence whether ni has an output, or is otherwise a terminal node */
bool const has_output = !fed_from_r.empty ();
/* Set up ni's activation set */
if (has_output) {
boost::shared_ptr<GraphActivision::ActivationMap> m (ni->_activation_set.reader ());
for (auto const& i : fed_from_r) {
auto it = (*m)[this].insert (i);
assert (it.second);
/* Increment the refcount of any node that we directly feed */
boost::shared_ptr<GraphActivision::RefCntMap> a ((*it.first)->_init_refcount.reader ());
(*a)[this] += 1;
}
}
/* ni has an input if there are some incoming edges to r in the graph */
bool const has_input = !edges.has_none_to (ni);
if (!has_input) {
/* no input, so this node needs to be triggered initially to get things going */
_init_trigger_list.push_back (ni);
}
if (!has_output) {
/* no output, so this is one of the nodes that we can count off to decide
* if we've finished
*/
_n_terminal_nodes += 1;
}
}
dump ();
}
GraphChain::~GraphChain ()
{
/* clear chain */
DEBUG_TRACE (DEBUG::Graph, string_compose ("~GraphChain destroyed in thread:%1\n", pthread_name ()));
for (auto const& ni : _nodes_rt) {
RCUWriter<GraphActivision::ActivationMap> wa (ni->_activation_set);
RCUWriter<GraphActivision::RefCntMap> wr (ni->_init_refcount);
boost::shared_ptr<GraphActivision::ActivationMap> ma (wa.get_copy ());
boost::shared_ptr<GraphActivision::RefCntMap> mr (wr.get_copy ());
mr->erase (this);
ma->erase (this);
}
}
bool
GraphChain::plot (std::string const& file_name) const
{
node_list_t::const_iterator ni;
node_set_t::const_iterator ai;
stringstream ss;
ss << "digraph {\n";
ss << " node [shape = ellipse];\n";
for (auto const& ni : _nodes_rt) {
std::string sn = string_compose ("%1 (%2)", ni->graph_node_name (), ni->init_refcount (this));
if (ni->init_refcount (this) == 0 && ni->activation_set (this).size () == 0) {
ss << " \"" << sn << "\"[style=filled,fillcolor=gold1];\n";
} else if (ni->init_refcount (this) == 0) {
ss << " \"" << sn << "\"[style=filled,fillcolor=lightskyblue1];\n";
} else if (ni->activation_set (this).size () == 0) {
ss << " \"" << sn << "\"[style=filled,fillcolor=aquamarine2];\n";
}
for (auto const& ai : ni->activation_set (this)) {
std::string dn = string_compose ("%1 (%2)", ai->graph_node_name (), ai->init_refcount (this));
bool sends_only = false;
ni->direct_feeds_according_to_reality (ai, &sends_only);
if (sends_only) {
ss << " edge [style=dashed];\n";
}
ss << " \"" << sn << "\" -> \"" << dn << "\"\n";
if (sends_only) {
ss << " edge [style=solid];\n";
}
}
}
ss << "}\n";
GError* err = NULL;
if (!g_file_set_contents (file_name.c_str (), ss.str ().c_str (), -1, &err)) {
if (err) {
error << string_compose (_("Could not graph to file (%1)"), err->message) << endmsg;
g_error_free (err);
}
return false;
}
return true;
}
void
GraphChain::dump () const
{
#ifndef NDEBUG
DEBUG_TRACE (DEBUG::Graph, "--8<-- Graph dump ----------------------------\n");
for (auto const& ni : _nodes_rt) {
DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1 refcount: %2\n", ni->graph_node_name (), ni->init_refcount (this)));
for (auto const& ai : ni->activation_set (this)) {
DEBUG_TRACE (DEBUG::Graph, string_compose (" triggers: %1\n", ai->graph_node_name ()));
}
}
DEBUG_TRACE (DEBUG::Graph, " --- trigger list ---\n");
for (auto const& ni : _init_trigger_list) {
DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1 refcount: %2\n", ni->graph_node_name (), ni->init_refcount (this)));
}
DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _n_terminal_nodes));
DEBUG_TRACE (DEBUG::Graph, "-->8-- END Graph dump ------------------------\n");
#endif
}

View File

@ -18,27 +18,52 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "ardour/graph.h"
#include "ardour/graphnode.h"
#include "ardour/graph.h"
#include "ardour/route.h"
using namespace ARDOUR;
GraphActivision::GraphActivision ()
: _activation_set (new ActivationMap)
, _init_refcount (new RefCntMap)
{
}
node_set_t const&
GraphActivision::activation_set (GraphChain const* const g) const
{
boost::shared_ptr<ActivationMap> m (_activation_set.reader ());
return m->at (g);
}
int
GraphActivision::init_refcount (GraphChain const* const g) const
{
boost::shared_ptr<RefCntMap> m (_init_refcount.reader ());
return m->at (g);
}
/* ****************************************************************************/
GraphNode::GraphNode (boost::shared_ptr<Graph> graph)
: _graph (graph)
{
g_atomic_int_set (&_refcount, 0);
}
GraphNode::~GraphNode ()
void
GraphNode::prep (GraphChain const* chain)
{
/* This is the number of nodes that directly feed us */
g_atomic_int_set (&_refcount, init_refcount (chain));
}
void
GraphNode::prep (int chain)
GraphNode::run (GraphChain const* chain)
{
/* This is the number of nodes that directly feed us */
g_atomic_int_set (&_refcount, _init_refcount[chain]);
process ();
finish (chain);
}
/** Called by an upstream node, when it has completed processing */
@ -57,14 +82,14 @@ GraphNode::trigger ()
}
void
GraphNode::finish (int chain)
GraphNode::finish (GraphChain const* chain)
{
node_set_t::iterator i;
bool feeds = false;
/* Notify downstream nodes that depend on this node */
for (i = _activation_set[chain].begin (); i != _activation_set[chain].end (); ++i) {
(*i)->trigger ();
for (auto const& i : activation_set (chain)) {
i->trigger ();
feeds = true;
}

View File

@ -547,13 +547,7 @@ Session::immediately_post_engine ()
*/
_rt_tasklist.reset (new RTTaskList ());
if (how_many_dsp_threads () > 1) {
/* For now, only create the graph if we are using >1 DSP threads, as
it is a bit slower than the old code with 1 thread.
*/
_process_graph.reset (new Graph (*this));
}
_process_graph.reset (new Graph (*this));
/* every time we reconnect, recompute worst case output latencies */
@ -676,6 +670,9 @@ Session::destroy ()
/* reset dynamic state version back to default */
Stateful::loading_state_version = 0;
/* drop GraphNode references */
_graph_chain.reset ();
_butler->drop_references ();
delete _butler;
_butler = 0;
@ -2218,8 +2215,18 @@ Session::rechain_process_graph (GraphNodeList& g)
* Note: the process graph chain does not require a
* topologically-sorted list, but hey ho.
*/
if (_process_graph) {
_process_graph->rechain (g, edges);
if (_process_graph->n_threads () > 1) {
/* Ideally we'd use a memory pool to allocate the GraphChain, however node_lists
* inside the change are STL list/set. It was never rt-safe to re-chain the graph.
* Furthermore graph-changes are usually caused by connection changes, which are not
* rt-safe either.
*
* However, the graph-chain may be in use (session process), and the last reference
* be helf by the process-callback. So we delegate deletion to the butler thread.
*/
_graph_chain = boost::shared_ptr<GraphChain> (new GraphChain (g, edges), boost::bind (&rt_safe_delete<GraphChain>, this, _1));
} else {
_graph_chain.reset ();
}
_current_route_graph = edges;
@ -3119,7 +3126,7 @@ Session::add_routes_inner (RouteList& new_routes, bool input_auto_connect, bool
* we will resort when done.
*/
if (!_monitor_out && !loading()) {
if (!_monitor_out && !loading() && !input_auto_connect && !output_auto_connect) {
resort_routes_using (r);
}
}
@ -3444,10 +3451,6 @@ Session::remove_routes (boost::shared_ptr<RouteList> routes_to_remove)
routes.flush (); // maybe unsafe, see below.
resort_routes ();
if (_process_graph && !deletion_in_progress() && _engine.running()) {
_process_graph->clear_other_chain ();
}
/* get rid of it from the dead wood collection in the route list manager */
/* XXX i think this is unsafe as it currently stands, but i am not sure. (pd, october 2nd, 2006) */
@ -6115,7 +6118,7 @@ Session::nstripables (bool with_monitor) const
bool
Session::plot_process_graph (std::string const& file_name) const {
return _process_graph ? _process_graph->plot (file_name) : false;
return _graph_chain ? _graph_chain->plot (file_name) : false;
}
void

View File

@ -203,9 +203,10 @@ Session::no_roll (pframes_t nframes)
_global_locate_pending = locate_pending ();
if (_process_graph) {
boost::shared_ptr<GraphChain> graph_chain = _graph_chain;
if (graph_chain) {
DEBUG_TRACE(DEBUG::ProcessThreads,"calling graph/no-roll\n");
_process_graph->routes_no_roll( nframes, _transport_sample, end_sample, non_realtime_work_pending());
_process_graph->routes_no_roll(graph_chain, nframes, _transport_sample, end_sample, non_realtime_work_pending());
} else {
PT_TIMING_CHECK (10);
for (RouteList::iterator i = r->begin(); i != r->end(); ++i) {
@ -250,9 +251,10 @@ Session::process_routes (pframes_t nframes, bool& need_butler)
_global_locate_pending = locate_pending();
if (_process_graph) {
boost::shared_ptr<GraphChain> graph_chain = _graph_chain;
if (graph_chain) {
DEBUG_TRACE(DEBUG::ProcessThreads,"calling graph/process-routes\n");
if (_process_graph->process_routes (nframes, start_sample, end_sample, need_butler) < 0) {
if (_process_graph->process_routes (graph_chain, nframes, start_sample, end_sample, need_butler) < 0) {
stop_transport ();
return -1;
}
@ -747,10 +749,6 @@ Session::process_audition (pframes_t nframes)
}
}
if (_process_graph) {
_process_graph->swap_process_chain ();
}
/* handle pending events */
while (pending_events.read (&ev, 1) == 1) {