Commit 73d5a587 authored by Philippe Gerum's avatar Philippe Gerum
Browse files

evl/xbuf: fix rd/rd and wr/wr races



Concurrent reads or writes to an xbuf causing nested buffer space
reservations could end up in a miscalculation of the number of bytes
remaining to be read or written.
Signed-off-by: Philippe Gerum's avatarPhilippe Gerum <rpm@xenomai.org>
parent eee98654
......@@ -30,8 +30,10 @@ struct xbuf_ring {
size_t fillsz;
unsigned int rdoff;
unsigned int rdrsvd;
int rdpending;
unsigned int wroff;
unsigned int wrrsvd;
int wrpending;
int (*wait_input)(struct xbuf_ring *ring, size_t len);
void (*signal_input)(struct xbuf_ring *ring);
int (*wait_output)(struct xbuf_ring *ring, size_t len);
......@@ -145,6 +147,7 @@ static ssize_t do_xbuf_read(struct xbuf_ring *ring,
/* Reserve a read slot into the circular buffer. */
rdoff = ring->rdoff;
ring->rdoff = (rdoff + len) % ring->bufsz;
ring->rdpending++;
ring->rdrsvd += len;
rbytes = ret = len;
......@@ -173,18 +176,18 @@ static ssize_t do_xbuf_read(struct xbuf_ring *ring,
rdoff = (rdoff + n) % ring->bufsz;
} while (rbytes > 0);
if (ring->fillsz == ring->bufsz)
/* -> writable */
ring->signal_pollable(ring, POLLOUT|POLLWRNORM);
ring->rdrsvd -= len;
ring->fillsz -= len;
/*
* Wake up the thread heading the output wait queue if
* we freed enough room to post its message.
*/
ring->unblock_output(ring);
if (--ring->rdpending == 0) {
ring->fillsz -= ring->rdrsvd;
ring->rdrsvd = 0;
if (ring->fillsz == ring->bufsz)
/* -> writable */
ring->signal_pollable(ring, POLLOUT|POLLWRNORM);
/*
* Wake up the thread heading the output wait queue if
* we freed enough room to post its message.
*/
ring->unblock_output(ring);
}
goto out;
wait:
if (len > ring->bufsz)
......@@ -193,11 +196,12 @@ static ssize_t do_xbuf_read(struct xbuf_ring *ring,
* Check whether writers are already waiting for
* sending data, while we are about to wait for
* receiving some. In such a case, we have a
* pathological use of the buffer. We must allow for a
* short read to prevent a deadlock.
* pathological use of the buffer due to a
* miscalculated size. We must allow for a short read
* to prevent a deadlock.
*/
if (ring->fillsz > 0 && ring->in_output_contention(ring)) {
len = ring->fillsz;
if (avail > 0 && ring->in_output_contention(ring)) {
len = avail;
goto retry;
}
......@@ -248,6 +252,7 @@ static ssize_t do_xbuf_write(struct xbuf_ring *ring,
/* Reserve a write slot into the circular buffer. */
wroff = ring->wroff;
ring->wroff = (wroff + len) % ring->bufsz;
ring->wrpending++;
ring->wrrsvd += len;
wbytes = ret = len;
......@@ -279,14 +284,15 @@ static ssize_t do_xbuf_write(struct xbuf_ring *ring,
wroff = (wroff + n) % ring->bufsz;
} while (wbytes > 0);
ring->fillsz += len;
ring->wrrsvd -= len;
if (--ring->wrpending == 0) {
ring->fillsz += ring->wrrsvd;
ring->wrrsvd = 0;
if (ring->fillsz == len)
/* -> readable */
ring->signal_pollable(ring, POLLIN|POLLRDNORM);
if (ring->fillsz == len)
/* -> readable */
ring->signal_pollable(ring, POLLIN|POLLRDNORM);
ring->signal_input(ring);
ring->signal_input(ring);
}
goto out;
wait:
if (f_flags & O_NONBLOCK) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment