Commit f99ad375 authored by Philippe Gerum

evl/proxy: add read side



A proxy should be usable for reading from an in-band file without
incurring a stage switch, just like it is for the output direction.

These changes introduce the EVL_CLONE_INPUT and EVL_CLONE_OUTPUT creation
flags for a proxy, which can be passed individually or combined in
order to get a bidirectional channel to the target file.

As a by-product, the output direction gains in-band poll() support.
Signed-off-by: Philippe Gerum <rpm@xenomai.org>
parent 49a416ea
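
For illustration, here is a minimal user-space sketch of creating such a
bidirectional proxy. It assumes the libevl evl_create_proxy() helper with a
(targetfd, bufsz, granularity, clone flags, name format) prototype, and
open_log_proxy() is a made-up wrapper for the example; only the
EVL_CLONE_INPUT and EVL_CLONE_OUTPUT flag values come from this commit.

#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <evl/proxy.h>

/* Open a bidirectional proxy channel to a regular log file. */
static int open_log_proxy(const char *path)
{
	int targetfd, proxyfd;

	targetfd = open(path, O_RDWR | O_CREAT, 0644);
	if (targetfd < 0)
		return -errno;

	/*
	 * Assumed prototype: evl_create_proxy(targetfd, bufsz,
	 * granularity, clone_flags, name_fmt, ...). Combining both
	 * direction flags yields a bidirectional channel; passing
	 * only one of them restricts the proxy to that direction.
	 */
	proxyfd = evl_create_proxy(targetfd, 16384, 0,
				EVL_CLONE_INPUT | EVL_CLONE_OUTPUT,
				"logger:%d", getpid());
	if (proxyfd < 0)
		close(targetfd);

	return proxyfd;
}
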
@@ -18,7 +18,7 @@
* latter to the former. CAUTION: a literal value is required for the
* current ABI definition (scripts reading this may be naive).
*/
#define EVL_ABI_LEVEL 23
#define EVL_ABI_LEVEL 24
#define EVL_CONTROL_DEV "/dev/evl/control"
......
@@ -23,6 +23,8 @@ struct evl_element_ids {
#define EVL_CLONE_OBSERVABLE (1 << 17)
#define EVL_CLONE_NONBLOCK (1 << 18)
#define EVL_CLONE_MASTER (1 << 19)
#define EVL_CLONE_INPUT (1 << 20)
#define EVL_CLONE_OUTPUT (1 << 21)
#define EVL_CLONE_COREDEV (1 << 31)
#define EVL_CLONE_MASK (((__u32)-1 << 16) & ~EVL_CLONE_COREDEV)
......
@@ -21,44 +21,64 @@
#include <evl/poll.h>
#include <uapi/evl/proxy.h>
#define EVL_PROXY_CLONE_FLAGS \
(EVL_CLONE_PUBLIC|EVL_CLONE_OUTPUT|EVL_CLONE_INPUT)
struct proxy_ring {
void *bufmem;
atomic_t fillsz;
int wrpending;
int nesting;
unsigned int bufsz;
unsigned int rdoff;
unsigned int wroff;
unsigned int fillrsvd;
unsigned int reserved;
unsigned int granularity;
};
struct proxy_out { /* oob_write->write */
struct evl_flag oob_drained;
wait_queue_head_t inband_drained;
struct evl_flag oob_wait;
wait_queue_head_t inband_wait;
struct evl_work relay_work;
hard_spinlock_t lock;
struct evl_poll_head poll_head;
struct proxy_ring ring;
struct workqueue_struct *wq;
struct mutex worker_lock;
};
struct proxy_out { /* oob_write->write */
struct proxy_ring ring;
};
struct proxy_in { /* read->oob_read */
struct proxy_ring ring;
atomic_t reqsz;
atomic_t on_eof;
int on_error;
};
struct evl_proxy {
struct file *filp;
struct proxy_out output;
struct proxy_in input;
struct evl_element element;
struct evl_poll_head poll_head;
};
static inline bool proxy_is_readable(struct evl_proxy *proxy)
{
return !!(proxy->element.clone_flags & EVL_CLONE_INPUT);
}
static inline bool proxy_is_writable(struct evl_proxy *proxy)
{
return !!(proxy->element.clone_flags & EVL_CLONE_OUTPUT);
}
static void relay_output(struct evl_proxy *proxy)
{
struct proxy_out *out = &proxy->output;
struct proxy_ring *ring = &out->ring;
struct proxy_ring *ring = &proxy->output.ring;
unsigned int rdoff, count, len, n;
struct file *filp = proxy->filp;
loff_t pos, *ppos;
ssize_t ret = 0;
mutex_lock(&out->worker_lock);
mutex_lock(&ring->worker_lock);
count = atomic_read(&ring->fillsz);
rdoff = ring->rdoff;
@@ -88,7 +108,7 @@ static void relay_output(struct evl_proxy *proxy)
rdoff = (rdoff + n) % ring->bufsz;
} while (len > 0 && ret > 0);
/*
* On error, the portion we failed relaying is
* On error, the portion we failed writing is
* lost. Fair enough.
*/
count = atomic_sub_return(count, &ring->fillsz);
@@ -99,46 +119,45 @@ static void relay_output(struct evl_proxy *proxy)
ring->rdoff = rdoff;
mutex_unlock(&out->worker_lock);
mutex_unlock(&ring->worker_lock);
/*
* For proxies, writability means that all pending data was
* sent out without error.
*/
if (count == 0)
evl_signal_poll_events(&out->poll_head, POLLOUT|POLLWRNORM);
evl_signal_poll_events(&proxy->poll_head, POLLOUT|POLLWRNORM);
/*
* Since we are running in-band, make sure to give precedence
* to oob waiters for wakeups.
*/
if (count < ring->bufsz) {
evl_raise_flag(&out->oob_drained); /* Reschedules. */
wake_up(&out->inband_drained);
evl_raise_flag(&ring->oob_wait); /* Reschedules. */
wake_up(&ring->inband_wait);
} else
evl_schedule(); /* Covers evl_signal_poll_events() */
}
static void relay_work(struct evl_work *work)
static void relay_output_work(struct evl_work *work)
{
struct evl_proxy *proxy = container_of(work, struct evl_proxy, output.relay_work);
struct evl_proxy *proxy =
container_of(work, struct evl_proxy, output.ring.relay_work);
relay_output(proxy);
}
static bool can_write_buffer(struct proxy_out *out, size_t size)
static bool can_write_buffer(struct proxy_ring *ring, size_t size)
{
struct proxy_ring *ring = &out->ring;
return atomic_read(&ring->fillsz) +
ring->fillrsvd + size <= ring->bufsz;
ring->reserved + size <= ring->bufsz;
}
static ssize_t do_proxy_write(struct file *filp,
const char __user *u_buf, size_t count)
{
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct proxy_out *out = &proxy->output;
struct proxy_ring *ring = &out->ring;
struct proxy_ring *ring = &proxy->output.ring;
unsigned int wroff, wbytes, n, rsvd;
unsigned long flags;
ssize_t ret;
@@ -153,10 +172,10 @@ static ssize_t do_proxy_write(struct file *filp,
if (ring->granularity > 1 && count % ring->granularity > 0)
return -EINVAL;
raw_spin_lock_irqsave(&out->lock, flags);
raw_spin_lock_irqsave(&ring->lock, flags);
/* No short or scattered writes. */
if (!can_write_buffer(out, count)) {
if (!can_write_buffer(ring, count)) {
ret = -EAGAIN;
goto out;
}
@@ -164,8 +183,8 @@ static ssize_t do_proxy_write(struct file *filp,
/* Reserve a write slot into the circular buffer. */
wroff = ring->wroff;
ring->wroff = (wroff + count) % ring->bufsz;
ring->wrpending++;
ring->fillrsvd += count;
ring->nesting++;
ring->reserved += count;
wbytes = ret = count;
do {
@@ -174,9 +193,9 @@ static ssize_t do_proxy_write(struct file *filp,
else
n = wbytes;
raw_spin_unlock_irqrestore(&out->lock, flags);
raw_spin_unlock_irqrestore(&ring->lock, flags);
xret = raw_copy_from_user(ring->bufmem + wroff, u_buf, n);
raw_spin_lock_irqsave(&out->lock, flags);
raw_spin_lock_irqsave(&ring->lock, flags);
if (xret) {
memset(ring->bufmem + wroff + n - xret, 0, xret);
ret = -EFAULT;
@@ -188,21 +207,182 @@ static ssize_t do_proxy_write(struct file *filp,
wroff = (wroff + n) % ring->bufsz;
} while (wbytes > 0);
if (--ring->wrpending == 0) {
n = atomic_add_return(ring->fillrsvd, &ring->fillsz);
rsvd = ring->fillrsvd;
ring->fillrsvd = 0;
if (--ring->nesting == 0) {
n = atomic_add_return(ring->reserved, &ring->fillsz);
rsvd = ring->reserved;
ring->reserved = 0;
if (n == rsvd) { /* empty -> non-empty transition */
if (running_inband()) {
raw_spin_unlock_irqrestore(&out->lock, flags);
raw_spin_unlock_irqrestore(&ring->lock, flags);
relay_output(proxy);
return ret;
}
evl_call_inband_from(&out->relay_work, out->wq);
evl_call_inband_from(&ring->relay_work, ring->wq);
}
}
out:
raw_spin_unlock_irqrestore(&out->lock, flags);
raw_spin_unlock_irqrestore(&ring->lock, flags);
return ret;
}
static void relay_input(struct evl_proxy *proxy)
{
struct proxy_in *in = &proxy->input;
struct proxy_ring *ring = &in->ring;
unsigned int wroff, count, len, n;
struct file *filp = proxy->filp;
bool exception = false;
loff_t pos, *ppos;
ssize_t ret = 0;
mutex_lock(&ring->worker_lock);
count = atomic_read(&in->reqsz);
wroff = ring->wroff;
ppos = NULL;
if (!(filp->f_mode & FMODE_STREAM)) {
mutex_lock(&filp->f_pos_lock);
ppos = &pos;
pos = filp->f_pos;
}
while (count > 0) {
len = count;
do {
if (wroff + len > ring->bufsz)
n = ring->bufsz - wroff;
else
n = len;
if (ring->granularity > 0)
n = min(n, ring->granularity);
ret = kernel_read(filp, ring->bufmem + wroff, n, ppos);
if (ret <= 0) {
atomic_sub(count - len, &in->reqsz);
if (ret)
in->on_error = ret;
else
atomic_set(&in->on_eof, true);
exception = true;
goto done;
}
if (ppos)
filp->f_pos = *ppos;
atomic_add(ret, &ring->fillsz);
len -= ret;
wroff = (wroff + n) % ring->bufsz;
} while (len > 0);
count = atomic_sub_return(count, &in->reqsz);
}
done:
if (ppos)
mutex_unlock(&filp->f_pos_lock);
ring->wroff = wroff;
mutex_unlock(&ring->worker_lock);
if (atomic_read(&ring->fillsz) > 0 || exception) {
evl_signal_poll_events(&proxy->poll_head, POLLIN|POLLRDNORM);
evl_raise_flag(&ring->oob_wait); /* Reschedules. */
wake_up(&ring->inband_wait);
}
}
static void relay_input_work(struct evl_work *work)
{
struct evl_proxy *proxy =
container_of(work, struct evl_proxy, input.ring.relay_work);
relay_input(proxy);
}
static ssize_t do_proxy_read(struct file *filp,
char __user *u_buf, size_t count)
{
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct proxy_in *in = &proxy->input;
struct proxy_ring *ring = &in->ring;
ssize_t len, ret, rbytes, n;
unsigned int rdoff, avail;
unsigned long flags;
char __user *u_ptr;
int xret;
if (count == 0)
return 0;
if (count > ring->bufsz)
return -EFBIG;
if (ring->granularity > 1 && count % ring->granularity > 0)
return -EINVAL;
len = count;
retry:
u_ptr = u_buf;
for (;;) {
raw_spin_lock_irqsave(&ring->lock, flags);
avail = atomic_read(&ring->fillsz) - ring->reserved;
if (avail < len) {
raw_spin_unlock_irqrestore(&ring->lock, flags);
if (in->on_error)
return in->on_error;
if (!(filp->f_flags & O_NONBLOCK) &&
avail > 0 && ring->granularity <= avail) {
len = ring->granularity > 1 ?
ring->granularity : avail;
goto retry;
}
return -EAGAIN;
}
rdoff = ring->rdoff;
ring->rdoff = (rdoff + len) % ring->bufsz;
ring->nesting++;
ring->reserved += len;
rbytes = ret = len;
do {
if (rdoff + rbytes > ring->bufsz)
n = ring->bufsz - rdoff;
else
n = rbytes;
raw_spin_unlock_irqrestore(&ring->lock, flags);
xret = raw_copy_to_user(u_ptr, ring->bufmem + rdoff, n);
raw_spin_lock_irqsave(&ring->lock, flags);
if (xret) {
ret = -EFAULT;
break;
}
u_ptr += n;
rbytes -= n;
rdoff = (rdoff + n) % ring->bufsz;
} while (rbytes > 0);
if (--ring->nesting == 0) {
atomic_sub(ring->reserved, &ring->fillsz);
ring->reserved = 0;
}
break;
}
raw_spin_unlock_irqrestore(&ring->lock, flags);
evl_schedule();
return ret;
}
@@ -211,48 +391,151 @@ static ssize_t proxy_oob_write(struct file *filp,
const char __user *u_buf, size_t count)
{
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct proxy_out *out = &proxy->output;
struct proxy_ring *ring = &proxy->output.ring;
ssize_t ret;
if (!proxy_is_writable(proxy))
return -ENXIO;
do {
ret = do_proxy_write(filp, u_buf, count);
if (ret != -EAGAIN || filp->f_flags & O_NONBLOCK)
break;
ret = evl_wait_flag(&out->oob_drained);
ret = evl_wait_flag(&ring->oob_wait);
} while (!ret);
return ret == -EIDRM ? -EBADF : ret;
}
static ssize_t proxy_oob_read(struct file *filp,
char __user *u_buf, size_t count)
{
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct proxy_in *in = &proxy->input;
struct proxy_ring *ring = &in->ring;
bool request_done = false;
ssize_t ret;
if (!proxy_is_readable(proxy))
return -ENXIO;
for (;;) {
ret = do_proxy_read(filp, u_buf, count);
if (ret != -EAGAIN || filp->f_flags & O_NONBLOCK)
break;
if (!request_done) {
atomic_add(count, &in->reqsz);
request_done = true;
}
evl_call_inband_from(&ring->relay_work, ring->wq);
ret = evl_wait_flag(&ring->oob_wait);
if (ret)
break;
if (atomic_cmpxchg(&in->on_eof, true, false) == true) {
ret = 0;
break;
}
}
return ret == -EIDRM ? -EBADF : ret;
}
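
For context, a sketch of how an out-of-band consumer could drive this read
side, assuming the oob_read() wrapper from libevl's <evl/syscall.h>;
pull_from_target() is a made-up helper for the example, and the EOF/error
conventions mirror proxy_oob_read() above.

#include <sys/types.h>
#include <evl/syscall.h>

/*
 * Out-of-band consumer: pull data originating from the in-band target
 * file without leaving the out-of-band stage; the relay worker performs
 * the actual kernel_read() on our behalf.
 */
static ssize_t pull_from_target(int proxyfd, void *buf, size_t len)
{
	size_t done = 0;
	ssize_t ret;

	while (done < len) {
		ret = oob_read(proxyfd, (char *)buf + done, len - done);
		if (ret <= 0)	/* 0 means EOF on the target file */
			return done ? (ssize_t)done : ret;
		done += ret;
	}

	return done;
}
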
static __poll_t proxy_oob_poll(struct file *filp,
struct oob_poll_wait *wait)
{
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct proxy_ring *oring = &proxy->output.ring;
struct proxy_ring *iring = &proxy->input.ring;
__poll_t ret = 0;
if (!(proxy_is_readable(proxy) || proxy_is_writable(proxy)))
return POLLERR;
evl_poll_watch(&proxy->poll_head, wait, NULL);
if (proxy_is_writable(proxy) &&
atomic_read(&oring->fillsz) < oring->bufsz)
ret = POLLOUT|POLLWRNORM;
if (proxy_is_readable(proxy) && atomic_read(&iring->fillsz) > 0)
ret |= POLLIN|POLLRDNORM;
return ret;
}
static ssize_t proxy_write(struct file *filp, const char __user *u_buf,
size_t count, loff_t *ppos)
{
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct proxy_out *out = &proxy->output;
struct proxy_ring *ring = &proxy->output.ring;
ssize_t ret;
if (!proxy_is_writable(proxy))
return -ENXIO;
do {
ret = do_proxy_write(filp, u_buf, count);
if (ret != -EAGAIN || filp->f_flags & O_NONBLOCK)
break;
ret = wait_event_interruptible(out->inband_drained,
can_write_buffer(out, count));
ret = wait_event_interruptible(ring->inband_wait,
can_write_buffer(ring, count));
} while (!ret);
return ret;
}
static __poll_t proxy_oob_poll(struct file *filp,
struct oob_poll_wait *wait)
static ssize_t proxy_read(struct file *filp,
char __user *u_buf, size_t count, loff_t *ppos)
{
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct proxy_in *in = &proxy->input;
bool request_done = false;
ssize_t ret;
if (!proxy_is_readable(proxy))
return -ENXIO;
for (;;) {
ret = do_proxy_read(filp, u_buf, count);
if (ret != -EAGAIN || filp->f_flags & O_NONBLOCK)
break;
if (!request_done) {
atomic_add(count, &in->reqsz);
request_done = true;
}
relay_input(proxy);
if (atomic_cmpxchg(&in->on_eof, true, false) == true) {
ret = 0;
break;
}
}
return ret;
}
static __poll_t proxy_poll(struct file *filp, poll_table *wait)
{
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct proxy_out *out = &proxy->output;
struct proxy_ring *ring = &out->ring;
struct proxy_ring *oring = &proxy->output.ring;
struct proxy_ring *iring = &proxy->input.ring;
__poll_t ret = 0;
if (!(proxy_is_readable(proxy) || proxy_is_writable(proxy)))
return POLLERR;
if (proxy_is_writable(proxy)) {
poll_wait(filp, &oring->inband_wait, wait);
if (atomic_read(&oring->fillsz) < oring->bufsz)
ret = POLLOUT|POLLWRNORM;
}
evl_poll_watch(&out->poll_head, wait, NULL);
if (proxy_is_readable(proxy)) {
poll_wait(filp, &iring->inband_wait, wait);
if (atomic_read(&iring->fillsz) > 0)
ret |= POLLIN|POLLRDNORM;
}
return atomic_read(&ring->fillsz) < ring->bufsz ?
POLLOUT|POLLWRNORM : 0;
return ret;
}
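
With proxy_poll() in place, the in-band side can use the regular poll(2)
interface, e.g. to wait for the output ring to drain before closing the
proxy, as the driver recommends further down. A hedged sketch, with
drain_and_close() being a hypothetical helper:

#include <errno.h>
#include <poll.h>
#include <unistd.h>

static void drain_and_close(int proxyfd)
{
	struct pollfd pfd = { .fd = proxyfd, .events = POLLOUT };

	/*
	 * Wait until the proxy reports writability, giving the relay
	 * worker a chance to push pending output to the target file.
	 */
	while (poll(&pfd, 1, -1) < 0 && errno == EINTR)
		;	/* restart if interrupted by a signal */

	close(proxyfd);
}
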
static int proxy_mmap(struct file *filp, struct vm_area_struct *vma)
@@ -286,9 +569,12 @@ static int proxy_mmap(struct file *filp, struct vm_area_struct *vma)
static int proxy_release(struct inode *inode, struct file *filp)
{
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct proxy_out *out = &proxy->output;
evl_flush_flag(&out->oob_drained, T_RMID);
if (proxy_is_writable(proxy))
evl_flush_flag(&proxy->output.ring.oob_wait, T_RMID);
if (proxy_is_readable(proxy))
evl_flush_flag(&proxy->input.ring.oob_wait, T_RMID);
return evl_release_element(inode, filp);
}
@@ -297,25 +583,75 @@ static const struct file_operations proxy_fops = {
.open = evl_open_element,
.release = proxy_release,
.oob_write = proxy_oob_write,
.oob_read = proxy_oob_read,
.oob_poll = proxy_oob_poll,
.write = proxy_write,
.read = proxy_read,
.poll = proxy_poll,
.mmap = proxy_mmap,
};
static int init_ring(struct proxy_ring *ring,
struct evl_proxy *proxy,
size_t bufsz,
unsigned int granularity,
bool is_output)
{
struct workqueue_struct *wq;
void *bufmem;
bufmem = kzalloc(bufsz, GFP_KERNEL);
if (bufmem == NULL)
return -ENOMEM;
wq = alloc_ordered_workqueue("%s.%c", 0,
evl_element_name(&proxy->element),
is_output ? 'O' : 'I');
if (wq == NULL) {
kfree(bufmem);
return -ENOMEM;
}
ring->wq = wq;
ring->bufmem = bufmem;
ring->bufsz = bufsz;
ring->granularity = granularity;
raw_spin_lock_init(&ring->lock);
evl_init_work_safe(&ring->relay_work,
is_output ? relay_output_work : relay_input_work,
&proxy->element);
evl_init_flag(&ring->oob_wait);
init_waitqueue_head(&ring->inband_wait);
mutex_init(&ring->worker_lock);
return 0;
}
static void destroy_ring(struct proxy_ring *ring)
{
evl_destroy_flag(&ring->oob_wait);
/*
* We cannot flush_work() since we may be called from a
* kworker, so we bluntly cancel any pending work instead. If
* output sync has to be guaranteed on closure, polling for
* POLLOUT before closing the target file is your friend.