13
0

Prepare buffer for seeking

Keep track of safe reservation:
Data has been read (or was skipped) previously can be read again
up to the allocated "reservation" (which is never overwritten).
This commit is contained in:
Robin Gareus 2019-02-06 19:00:15 +01:00
parent b3fda6236a
commit 6975b5ca54
Signed by: rgareus
GPG Key ID: A090BCE02CF57F04

View File

@ -34,7 +34,7 @@ class /*LIBPBD_API*/ PlaybackBuffer
public:
PlaybackBuffer (int32_t sz, guint res = 8191)
: reservation (res)
, _writepos_lock ()
, _reservation_lock ()
{
sz += reservation;
@ -46,24 +46,28 @@ public:
buf = new T[size];
read_idx = 0;
reset (0);
reset ();
}
virtual ~PlaybackBuffer () {
delete [] buf;
}
/* non-linear write needs to reset() the buffer and set the
* position that write() will commence at */
void reset (int64_t start = 0) {
/* init (mlock) */
T *buffer () { return buf; }
/* init (mlock) */
guint bufsize () const { return size; }
/* write-thread */
void reset () {
/* writer, when seeking, may block */
Glib::Threads::Mutex::Lock lm (_reset_lock);
SpinLock sl (_writepos_lock);
write_pos = start;
SpinLock sl (_reservation_lock);
g_atomic_int_set (&write_idx, g_atomic_int_get (&read_idx));
g_atomic_int_set (&reserved, 0);
}
/* write-thread */
guint write_space () const {
guint w, r;
@ -92,6 +96,7 @@ public:
return 0;
}
/* read-thread */
guint read_space () const {
guint w, r;
@ -105,37 +110,77 @@ public:
}
}
/* read-thead */
guint read (T *dest, guint cnt, bool commit = true);
/* write-thead */
guint write (T const * src, guint cnt);
/* write-thead */
guint write_zero (guint cnt);
T *buffer () { return buf; }
guint bufsize () const { return size; }
guint get_write_idx () const { return g_atomic_int_get (&write_idx); }
guint get_read_idx () const { return g_atomic_int_get (&read_idx); }
void read_flush () { g_atomic_int_set (&read_idx, g_atomic_int_get (&write_idx)); }
void increment_read_ptr (guint cnt) {
cnt = std::min (cnt, read_space ());
g_atomic_int_set (&read_idx, (g_atomic_int_get (&read_idx) + cnt) & size_mask);
/* read-thead */
void read_flush ()
{
SpinLock sl (_reservation_lock);
g_atomic_int_set (&read_idx, g_atomic_int_get (&write_idx));
g_atomic_int_set (&reserved, 0);
}
protected:
/* read-thead */
guint decrement_read_ptr (guint cnt)
{
SpinLock sl (_reservation_lock);
guint r = g_atomic_int_get (&read_idx);
guint res = g_atomic_int_get (&reserved);
cnt = std::min (cnt, res);
r = (r + size - cnt) & size_mask;
res -= cnt;
g_atomic_int_set (&read_idx, r);
g_atomic_int_set (&reserved, res);
return cnt;
}
/* read-thead */
guint increment_read_ptr (guint cnt)
{
cnt = std::min (cnt, read_space ());
SpinLock sl (_reservation_lock);
g_atomic_int_set (&read_idx, (g_atomic_int_get (&read_idx) + cnt) & size_mask);
g_atomic_int_set (&reserved, std::min (reservation, g_atomic_int_get (&reserved) + cnt));
return cnt;
}
/* read-thead */
bool can_seek (int cnt) {
if (cnt > 0) {
return read_space() >= cnt;
}
else if (cnt < 0) {
return g_atomic_int_get (&reserved) >= cnt;
}
else {
return true;
}
}
private:
T *buf;
guint reservation;
guint size;
guint size_mask;
int64_t write_pos; // samplepos_t
mutable gint write_idx; // corresponds to (write_pos)
mutable gint write_idx;
mutable gint read_idx;
mutable gint reserved;
private:
/* spinlock will be used to update write_pos and write_idx in sync */
mutable spinlock_t _writepos_lock;
/* spinlock will be used to update write_idx and reserved in sync */
spinlock_t _reservation_lock;
/* reset_lock is used to prevent concurrent reading and reset (seek, transport reversal etc). */
Glib::Threads::Mutex _reset_lock;
};
@ -170,11 +215,7 @@ PlaybackBuffer<T>::write (T const *src, guint cnt)
w = n2;
}
{
SpinLock sl (_writepos_lock);
write_pos += to_write;
g_atomic_int_set (&write_idx, w);
}
g_atomic_int_set (&write_idx, w);
return to_write;
}
@ -208,11 +249,7 @@ PlaybackBuffer<T>::write_zero (guint cnt)
w = n2;
}
{
SpinLock sl (_writepos_lock);
write_pos += to_write;
g_atomic_int_set (&write_idx, w);
}
g_atomic_int_set (&write_idx, w);
return to_write;
}
@ -251,10 +288,11 @@ PlaybackBuffer<T>::read (T *dest, guint cnt, bool commit)
}
if (commit) {
/* set read-pointer to position of last read's end */
SpinLock sl (_reservation_lock);
g_atomic_int_set (&read_idx, r);
g_atomic_int_set (&reserved, std::min (reservation, g_atomic_int_get (&reserved) + to_read));
}
return cnt;
return to_read;
}
} /* end namespace */