13
0

Fix concurrency issue when adding/removing ports of latent plugins

This fixes an issue when changing port-configuration
or toggle strict-i/o of latent multi-out plugins.

`Session::auto_connect_thread_run` can run concurrently
while ports are added/removed from the GUI thread.
The latter invalidates IO::_port PortSet, while
the former thread iterates over ports.

This also changes the IO-Mutex into a RWLock to allow
concurrent access when possible.
This commit is contained in:
Robin Gareus 2023-03-22 22:03:49 +01:00
parent 5453e6f1f7
commit 197157ecf8
Signed by: rgareus
GPG Key ID: A090BCE02CF57F04
2 changed files with 26 additions and 17 deletions

View File

@ -206,7 +206,7 @@ protected:
bool _sendish; bool _sendish;
private: private:
mutable Glib::Threads::Mutex io_lock; mutable Glib::Threads::RWLock _io_lock;
PortSet _ports; PortSet _ports;
void reestablish_port_subscriptions (); void reestablish_port_subscriptions ();

View File

@ -99,11 +99,10 @@ IO::IO (Session& s, const XMLNode& node, DataType dt, bool sendish)
IO::~IO () IO::~IO ()
{ {
Glib::Threads::Mutex::Lock lm (io_lock);
DEBUG_TRACE (DEBUG::Ports, string_compose ("IO %1 unregisters %2 ports\n", name(), _ports.num_ports())); DEBUG_TRACE (DEBUG::Ports, string_compose ("IO %1 unregisters %2 ports\n", name(), _ports.num_ports()));
BLOCK_PROCESS_CALLBACK (); BLOCK_PROCESS_CALLBACK ();
Glib::Threads::RWLock::WriterLock wl (_io_lock);
for (PortSet::iterator i = _ports.begin(); i != _ports.end(); ++i) { for (PortSet::iterator i = _ports.begin(); i != _ports.end(); ++i) {
_session.engine().unregister_port (*i); _session.engine().unregister_port (*i);
@ -122,12 +121,13 @@ IO::connection_change (boost::shared_ptr<Port> a, boost::shared_ptr<Port> b)
we assume that its safely locked by our own ::disconnect(). we assume that its safely locked by our own ::disconnect().
*/ */
Glib::Threads::Mutex::Lock tm (io_lock, Glib::Threads::TRY_LOCK); Glib::Threads::RWLock::WriterLock wl (_io_lock, Glib::Threads::TRY_LOCK);
if (tm.locked()) { if (wl.locked()) {
/* we took the lock, so we cannot be here from inside /* we took the lock, so we cannot be here from inside
* ::disconnect() * ::disconnect()
*/ */
wl.release (); // release lock before emitting signal
if (_ports.contains (a) || _ports.contains (b)) { if (_ports.contains (a) || _ports.contains (b)) {
changed (IOChange (IOChange::ConnectionsChanged), this); /* EMIT SIGNAL */ changed (IOChange (IOChange::ConnectionsChanged), this); /* EMIT SIGNAL */
} }
@ -158,7 +158,7 @@ IO::disconnect (boost::shared_ptr<Port> our_port, string other_port, void* src)
} }
{ {
Glib::Threads::Mutex::Lock lm (io_lock); Glib::Threads::RWLock::ReaderLock rl (_io_lock);
/* check that our_port is really one of ours */ /* check that our_port is really one of ours */
@ -192,7 +192,7 @@ IO::connect (boost::shared_ptr<Port> our_port, string other_port, void* src)
} }
{ {
Glib::Threads::Mutex::Lock lm (io_lock); Glib::Threads::RWLock::ReaderLock rl (_io_lock);
/* check that our_port is really one of ours */ /* check that our_port is really one of ours */
@ -247,7 +247,7 @@ IO::remove_port (boost::shared_ptr<Port> port, void* src)
BLOCK_PROCESS_CALLBACK (); BLOCK_PROCESS_CALLBACK ();
{ {
Glib::Threads::Mutex::Lock lm (io_lock); Glib::Threads::RWLock::WriterLock wl (_io_lock);
if (_ports.remove(port)) { if (_ports.remove(port)) {
change.type = IOChange::Type (change.type | IOChange::ConfigurationChanged); change.type = IOChange::Type (change.type | IOChange::ConfigurationChanged);
@ -318,7 +318,7 @@ IO::add_port (string destination, void* src, DataType type)
{ {
Glib::Threads::Mutex::Lock lm (io_lock); Glib::Threads::RWLock::WriterLock wl (_io_lock);
/* Create a new port */ /* Create a new port */
@ -364,7 +364,7 @@ int
IO::disconnect (void* src) IO::disconnect (void* src)
{ {
{ {
Glib::Threads::Mutex::Lock lm (io_lock); Glib::Threads::RWLock::ReaderLock rl (_io_lock);
for (PortSet::iterator i = _ports.begin(); i != _ports.end(); ++i) { for (PortSet::iterator i = _ports.begin(); i != _ports.end(); ++i) {
i->disconnect_all (); i->disconnect_all ();
@ -487,7 +487,7 @@ IO::ensure_ports (ChanCount count, bool clear, void* src)
change.before = _ports.count (); change.before = _ports.count ();
{ {
Glib::Threads::Mutex::Lock im (io_lock); Glib::Threads::RWLock::WriterLock wl (_io_lock);
if (ensure_ports_locked (count, clear, changed)) { if (ensure_ports_locked (count, clear, changed)) {
return -1; return -1;
} }
@ -536,7 +536,7 @@ IO::state () const
{ {
XMLNode* node = new XMLNode (state_node_name); XMLNode* node = new XMLNode (state_node_name);
int n; int n;
Glib::Threads::Mutex::Lock lm (io_lock); Glib::Threads::RWLock::WriterLock wl (_io_lock);
node->set_property ("name", name()); node->set_property ("name", name());
node->set_property ("id", id ()); node->set_property ("id", id ());
@ -1237,7 +1237,7 @@ IO::latency () const
{ {
samplecnt_t max_latency = 0; samplecnt_t max_latency = 0;
/* io lock not taken - must be protected by other means */ Glib::Threads::RWLock::ReaderLock rl (_io_lock);
for (PortSet::const_iterator i = _ports.begin(); i != _ports.end(); ++i) { for (PortSet::const_iterator i = _ports.begin(); i != _ports.end(); ++i) {
samplecnt_t latency; samplecnt_t latency;
@ -1285,7 +1285,16 @@ IO::public_latency () const
samplecnt_t samplecnt_t
IO::connected_latency (bool for_playback) const IO::connected_latency (bool for_playback) const
{ {
/* io lock not taken - must be protected by other means */ /* may be called concurrently with processing via
*
* Session::auto_connect_thread_run ()
* -> Session::update_latency_compensation ()
* -> Session::update_route_latency ()
* -> Route::update_signal_latency ()
* -> IO::connected_latency ()
*/
Glib::Threads::RWLock::ReaderLock rl (_io_lock);
samplecnt_t max_latency = 0; samplecnt_t max_latency = 0;
bool connected = false; bool connected = false;
@ -1325,7 +1334,7 @@ IO::connect_ports_to_bundle (boost::shared_ptr<Bundle> c, bool exclusive,
BLOCK_PROCESS_CALLBACK (); BLOCK_PROCESS_CALLBACK ();
{ {
Glib::Threads::Mutex::Lock lm2 (io_lock); Glib::Threads::RWLock::ReaderLock rl (_io_lock);
if (exclusive) { if (exclusive) {
for (PortSet::iterator i = _ports.begin(); i != _ports.end(); ++i) { for (PortSet::iterator i = _ports.begin(); i != _ports.end(); ++i) {
@ -1347,7 +1356,7 @@ IO::disconnect_ports_from_bundle (boost::shared_ptr<Bundle> c, void* src)
BLOCK_PROCESS_CALLBACK (); BLOCK_PROCESS_CALLBACK ();
{ {
Glib::Threads::Mutex::Lock lm2 (io_lock); Glib::Threads::RWLock::ReaderLock rl (_io_lock);
c->disconnect (_bundle, _session.engine()); c->disconnect (_bundle, _session.engine());
@ -1738,6 +1747,6 @@ IO::physically_connected () const
bool bool
IO::has_port (boost::shared_ptr<Port> p) const IO::has_port (boost::shared_ptr<Port> p) const
{ {
Glib::Threads::Mutex::Lock lm (io_lock); Glib::Threads::RWLock::ReaderLock rl (_io_lock);
return _ports.contains (p); return _ports.contains (p);
} }