diff --git a/libs/pbd/pbd/mpmc_queue.h b/libs/pbd/pbd/mpmc_queue.h new file mode 100644 index 0000000000..eda78473b8 --- /dev/null +++ b/libs/pbd/pbd/mpmc_queue.h @@ -0,0 +1,147 @@ +/* + * (C) 2017, 2019 Robin Gareus + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +#ifndef _pbd_mpc_queue_h_ +#define _pbd_mpc_queue_h_ + +#include +#include +#include + +namespace PBD { + +/** Lock free multiple producer, multiple consumer queue + * + * inspired by http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue + * Kudos to Dmitry Vyukov + */ +template +class /*LIBPBD_API*/ MPMCQueue +{ +public: + MPMCQueue (size_t buffer_size = 8) + : _buffer (0) + , _buffer_mask (0) + { + reserve (buffer_size); + } + + ~MPMCQueue () + { + delete[] _buffer; + } + + static size_t + power_of_two_size (size_t sz) + { + int32_t power_of_two; + for (power_of_two = 1; 1U << power_of_two < sz; ++power_of_two) ; + return 1U << power_of_two; + } + + void + reserve (size_t buffer_size) + { + buffer_size = power_of_two_size (buffer_size); + assert ((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0)); + if (_buffer_mask >= buffer_size - 1) { + return; + } + delete[] _buffer; + _buffer = new cell_t[buffer_size]; + _buffer_mask = buffer_size - 1; + clear (); + } + + void + clear () + { + for (size_t i = 0; i <= _buffer_mask; ++i) { + g_atomic_int_set (&_buffer[i]._sequence, i); + } + g_atomic_int_set (&_enqueue_pos, 0); + g_atomic_int_set (&_dequeue_pos, 0); + } + + bool + push_back (T const& data) + { + cell_t* cell; + guint pos = g_atomic_int_get (&_enqueue_pos); + for (;;) { + cell = &_buffer[pos & _buffer_mask]; + guint seq = g_atomic_int_get (&cell->_sequence); + intptr_t dif = (intptr_t)seq - (intptr_t)pos; + if (dif == 0) { + if (g_atomic_int_compare_and_exchange (&_enqueue_pos, pos, pos + 1)) { + break; + } + } else if (dif < 0) { + assert (0); + return false; + } else { + pos = g_atomic_int_get (&_enqueue_pos); + } + } + + cell->_data = data; + g_atomic_int_set (&cell->_sequence, pos + 1); + return true; + } + + bool + pop_front (T& data) + { + cell_t* cell; + guint pos = g_atomic_int_get (&_dequeue_pos); + for (;;) { + cell = &_buffer[pos & _buffer_mask]; + guint seq = g_atomic_int_get (&cell->_sequence); + intptr_t dif = (intptr_t)seq - (intptr_t) (pos + 1); + if (dif == 0) { + if (g_atomic_int_compare_and_exchange (&_dequeue_pos, pos, pos + 1)) { + break; + } + } else if (dif < 0) { + return false; + } else { + pos = g_atomic_int_get (&_dequeue_pos); + } + } + + data = cell->_data; + g_atomic_int_set (&cell->_sequence, pos + _buffer_mask + 1); + return true; + } + +private: + struct cell_t { + volatile guint _sequence; + T _data; + }; + + cell_t* _buffer; + size_t _buffer_mask; + + volatile guint _enqueue_pos; + volatile guint _dequeue_pos; +}; + +} /* end namespace */ + +#endif