Commit bfce9a18 authored by Philippe Gerum's avatar Philippe Gerum
Browse files

evl/proxy: enable inband writes



Although writing to a proxy is primarily intended for oob callers,
allowing inband tasks to use this channel too comes in handy as the
sequence of messages relayed by such proxy can be preserved regardless
of the execution stage they originate from, as they percolate through
the same buffer.

Inband writers can block until the output has drained, unless
O_NONBLOCK is set for the proxy.
Signed-off-by: Philippe Gerum's avatarPhilippe Gerum <rpm@xenomai.org>
parent 8af49607
......@@ -9,6 +9,7 @@
#include <linux/kernel.h>
#include <linux/uaccess.h>
#include <linux/file.h>
#include <linux/wait.h>
#include <linux/fs.h>
#include <linux/log2.h>
#include <linux/irq_work.h>
......@@ -31,7 +32,8 @@ struct proxy_ring {
};
struct proxy_out { /* oob_write->write */
struct evl_flag drained;
struct evl_flag oob_drained;
wait_queue_head_t inband_drained;
struct irq_work irq_work;
struct work_struct work;
hard_spinlock_t lock;
......@@ -91,11 +93,15 @@ static void relay_output(struct work_struct *work)
mutex_unlock(&filp->f_pos_lock);
if (!(filp->f_flags & O_NONBLOCK))
evl_raise_flag(&out->drained);
/* Give precedence to oob waiters for wakeups. */
evl_signal_poll_events(&out->poll_head, POLLOUT|POLLWRNORM);
evl_schedule();
if (!(filp->f_flags & O_NONBLOCK)) {
evl_raise_flag(&out->oob_drained);
wake_up(&out->inband_drained);
} else
evl_schedule();
}
static void relay_output_irq(struct irq_work *work)
......@@ -106,13 +112,21 @@ static void relay_output_irq(struct irq_work *work)
schedule_work(&proxy->output.work);
}
static ssize_t proxy_oob_write(struct file *filp,
static bool can_write_buffer(struct proxy_out *out, size_t size)
{
struct proxy_ring *ring = &out->ring;
return atomic_read(&ring->fillsz) +
ring->fillrsvd + size <= ring->bufsz;
}
static ssize_t do_proxy_write(struct file *filp,
const char __user *u_buf, size_t count)
{
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct proxy_out *out = &proxy->output;
struct proxy_ring *ring = &out->ring;
unsigned int wroff, wbytes, n, fillsz;
unsigned int wroff, wbytes, n;
unsigned long flags;
ssize_t ret;
int xret;
......@@ -128,71 +142,86 @@ static ssize_t proxy_oob_write(struct file *filp,
raw_spin_lock_irqsave(&out->lock, flags);
for (;;) {
fillsz = atomic_read(&ring->fillsz) + ring->fillrsvd;
/*
* No short or scattered writes, wait for drain or
* -EAGAIN if there is not enough space.
*/
if (fillsz + count > ring->bufsz)
goto wait;
/* Reserve a write slot into the circular buffer. */
wroff = ring->wroff;
ring->wroff = (wroff + count) % ring->bufsz;
ring->wrpending++;
ring->fillrsvd += count;
wbytes = ret = count;
/* No short or scattered writes. */
if (!can_write_buffer(out, count)) {
ret = -EAGAIN;
goto out;
}
do {
if (wroff + wbytes > ring->bufsz)
n = ring->bufsz - wroff;
else
n = wbytes;
raw_spin_unlock_irqrestore(&out->lock, flags);
xret = raw_copy_from_user(ring->bufmem + wroff, u_buf, n);
raw_spin_lock_irqsave(&out->lock, flags);
if (xret) {
memset(ring->bufmem + wroff + n - xret, 0, xret);
ret = -EFAULT;
break;
}
u_buf += n;
wbytes -= n;
wroff = (wroff + n) % ring->bufsz;
} while (wbytes > 0);
if (--ring->wrpending == 0) {
fillsz = atomic_add_return(ring->fillrsvd, &ring->fillsz);
ring->fillrsvd = 0;
if (fillsz == count)
/* empty -> non-empty transition */
irq_work_queue(&out->irq_work);
}
/* Reserve a write slot into the circular buffer. */
wroff = ring->wroff;
ring->wroff = (wroff + count) % ring->bufsz;
ring->wrpending++;
ring->fillrsvd += count;
wbytes = ret = count;
break;
wait:
if (filp->f_flags & O_NONBLOCK) {
ret = -EAGAIN;
break;
}
do {
if (wroff + wbytes > ring->bufsz)
n = ring->bufsz - wroff;
else
n = wbytes;
raw_spin_unlock_irqrestore(&out->lock, flags);
xret = raw_copy_from_user(ring->bufmem + wroff, u_buf, n);
raw_spin_lock_irqsave(&out->lock, flags);
if (xret) {
memset(ring->bufmem + wroff + n - xret, 0, xret);
ret = -EFAULT;
break;
}
ret = evl_wait_flag(&out->drained);
if (unlikely(ret))
return ret;
u_buf += n;
wbytes -= n;
wroff = (wroff + n) % ring->bufsz;
} while (wbytes > 0);
raw_spin_lock_irqsave(&out->lock, flags);
if (--ring->wrpending == 0) {
n = atomic_add_return(ring->fillrsvd, &ring->fillsz);
ring->fillrsvd = 0;
if (n == count) /* empty -> non-empty transition */
irq_work_queue(&out->irq_work);
}
out:
raw_spin_unlock_irqrestore(&out->lock, flags);
return ret;
}
static ssize_t proxy_oob_write(struct file *filp,
const char __user *u_buf, size_t count)
{
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct proxy_out *out = &proxy->output;
ssize_t ret;
do {
ret = do_proxy_write(filp, u_buf, count);
if (ret != -EAGAIN || filp->f_flags & O_NONBLOCK)
break;
ret = evl_wait_flag(&out->oob_drained);
} while (!ret);
return ret;
}
static ssize_t proxy_write(struct file *filp, const char __user *u_buf,
size_t count, loff_t *ppos)
{
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct proxy_out *out = &proxy->output;
ssize_t ret;
do {
ret = do_proxy_write(filp, u_buf, count);
if (ret != -EAGAIN || filp->f_flags & O_NONBLOCK)
break;
ret = wait_event_interruptible(out->inband_drained,
can_write_buffer(out, count));
} while (ret != -ERESTARTSYS);
return ret;
}
static __poll_t proxy_oob_poll(struct file *filp,
struct oob_poll_wait *wait)
{
......@@ -239,6 +268,7 @@ static const struct file_operations proxy_fops = {
.release = evl_release_element,
.oob_write = proxy_oob_write,
.oob_poll = proxy_oob_poll,
.write = proxy_write,
.mmap = proxy_mmap,
};
......@@ -303,7 +333,8 @@ proxy_factory_build(struct evl_factory *fac, const char *name,
init_irq_work(&proxy->output.irq_work, relay_output_irq);
INIT_WORK(&proxy->output.work, relay_output);
evl_init_poll_head(&proxy->output.poll_head);
evl_init_flag(&proxy->output.drained);
evl_init_flag(&proxy->output.oob_drained);
init_waitqueue_head(&proxy->output.inband_drained);
evl_index_element(&proxy->element);
return &proxy->element;
......@@ -323,7 +354,7 @@ static void proxy_factory_dispose(struct evl_element *e)
struct evl_proxy *proxy;
proxy = container_of(e, struct evl_proxy, element);
evl_destroy_flag(&proxy->output.drained);
evl_destroy_flag(&proxy->output.oob_drained);
evl_unindex_element(&proxy->element);
fput(proxy->filp);
evl_destroy_element(&proxy->element);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment