Commit 9332f2a3 authored by Philippe Gerum

evl/proxy: enable fixed-size write, polling for output to drain

Connecting a proxy to some special file may impose fixed-size writes
to the latter, e.g. an eventfd requires that only 64-bit values be
written to it. These changes enable the application to set a fixed
granularity which the proxy must abide by when writing to the target
file, so that such a requirement is honored.

In addition:

- oob_write() may block until enough space is available in the
  transfer buffer to complete the request, unless O_NONBLOCK is set
  on the proxy file.

- POLLOUT|POLLWRNORM can be monitored to wait for the output to
  drain (see the usage sketch below).
Signed-off-by: Philippe Gerum <rpm@xenomai.org>
parent 73d5a587
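For context, a minimal userspace sketch of what these changes enable, assuming an eventfd target: the proxy is created with an 8-byte granularity so that only complete 64-bit values ever reach the eventfd, and out-of-band callers push values through oob_write(). Only the attrs layout and the write semantics come from this patch; new_proxy_element() is a hypothetical stand-in for the actual element-creation call, and oob_write() is assumed to be libevl's out-of-band write wrapper.

#include <stdint.h>
#include <unistd.h>
#include <sys/eventfd.h>
#include <linux/types.h>

/* Mirrors the UAPI layout added by the patch below. */
struct evl_proxy_attrs {
	__u32 fd;
	__u32 bufsz;
	__u32 granularity;
};

/*
 * Hypothetical helper standing in for the element-creation call which
 * turns the attrs into a proxy file descriptor (not part of this patch).
 */
extern int new_proxy_element(const char *name, struct evl_proxy_attrs *attrs);

/* Out-of-band write wrapper provided by libevl (assumed available). */
extern ssize_t oob_write(int fd, const void *buf, size_t count);

static int open_event_proxy(void)
{
	struct evl_proxy_attrs attrs;
	int efd;

	efd = eventfd(0, 0);
	if (efd < 0)
		return -1;

	attrs.fd = efd;
	/* eventfd only accepts full 64-bit values. */
	attrs.granularity = sizeof(uint64_t);
	/* The ring must be a multiple of the granule size. */
	attrs.bufsz = 16 * attrs.granularity;

	return new_proxy_element("event-relay", &attrs);
}

static int post_event(int proxyfd)
{
	uint64_t val = 1;

	/*
	 * Blocks until the ring can hold the whole 8-byte value,
	 * unless O_NONBLOCK is set on the proxy file, in which case
	 * the write fails with EAGAIN when the ring is full.
	 */
	return oob_write(proxyfd, &val, sizeof(val)) == sizeof(val) ? 0 : -1;
}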
@@ -10,7 +10,7 @@
#include <linux/types.h>
#include <uapi/evl/sched.h>
#define EVL_ABI_LEVEL 5
#define EVL_ABI_LEVEL 6
#define EVL_CONTROL_DEV "/dev/evl/control"
@@ -14,6 +14,7 @@
struct evl_proxy_attrs {
__u32 fd;
__u32 bufsz;
__u32 granularity;
};
#endif /* !_EVL_UAPI_PROXY_H */
@@ -13,138 +13,197 @@
#include <linux/log2.h>
#include <linux/irq_work.h>
#include <linux/workqueue.h>
#include <linux/circ_buf.h>
#include <linux/atomic.h>
#include <evl/factory.h>
#include <evl/flag.h>
#include <evl/poll.h>
#include <uapi/evl/proxy.h>
struct evl_proxy {
struct file *filp;
struct circ_buf circ_buf;
bool write_flag;
size_t bufsz;
struct proxy_ring {
void *bufmem;
atomic_t fillsz;
int wrpending;
unsigned int bufsz;
unsigned int rdoff;
unsigned int wroff;
unsigned int fillrsvd;
unsigned int granularity;
};
struct proxy_out { /* oob_write->write */
struct evl_flag drained;
struct irq_work irq_work;
struct work_struct work;
hard_spinlock_t lock;
struct evl_poll_head poll_head;
struct proxy_ring ring;
};
struct evl_proxy {
struct file *filp;
struct proxy_out output;
struct evl_element element;
};
static void relay_output(struct work_struct *work)
{
int head, tail, rem, len;
struct evl_proxy *proxy;
struct circ_buf *circ;
struct file *outfilp;
struct evl_proxy *proxy = container_of(work, struct evl_proxy, output.work);
struct proxy_out *out = &proxy->output;
struct proxy_ring *ring = &out->ring;
unsigned int rdoff, count, len, n;
struct file *filp = proxy->filp;
ssize_t ret;
loff_t pos;
proxy = container_of(work, struct evl_proxy, work);
outfilp = proxy->filp;
circ = &proxy->circ_buf;
mutex_lock(&filp->f_pos_lock);
mutex_lock(&outfilp->f_pos_lock);
count = atomic_read(&ring->fillsz);
rdoff = ring->rdoff;
for (;;) {
head = smp_load_acquire(&circ->head);
tail = circ->tail;
rem = CIRC_CNT(head, tail, proxy->bufsz);
if (rem <= 0)
if (count == 0)
break;
len = min(CIRC_CNT_TO_END(head, tail, proxy->bufsz), rem);
len = count;
pos = outfilp->f_pos;
ret = vfs_write(outfilp, circ->buf + tail, len, &pos);
if (ret >= 0)
outfilp->f_pos = pos;
do {
if (rdoff + len > ring->bufsz)
n = ring->bufsz - rdoff;
else
n = len;
smp_store_release(&circ->tail,
(tail + len) & (proxy->bufsz - 1));
if (ring->granularity > 0)
n = min(n, ring->granularity);
pos = filp->f_pos;
ret = vfs_write(filp, ring->bufmem + rdoff, n, &pos);
if (ret >= 0)
filp->f_pos = pos;
len -= n;
rdoff = (rdoff + n) % ring->bufsz;
} while (len > 0);
count = atomic_sub_return(count, &ring->fillsz);
}
mutex_unlock(&outfilp->f_pos_lock);
ring->rdoff = rdoff;
mutex_unlock(&filp->f_pos_lock);
if (!(filp->f_flags & O_NONBLOCK))
evl_raise_flag(&out->drained);
evl_signal_poll_events(&out->poll_head, POLLOUT|POLLWRNORM);
evl_schedule();
}
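The drain loop above walks the pending bytes from the read offset, splitting each transfer at the end of the ring and, when a granularity is set, capping each chunk at the configured granule size so a target like eventfd only ever sees whole 64-bit writes. A standalone sketch of that chunking arithmetic, with made-up values, for illustration only:

#include <stdio.h>

/*
 * Mimic the chunking done by relay_output(): walk 'count' bytes from
 * 'rdoff', never crossing the end of the ring, and never exceeding one
 * granule per write when a granularity is set.
 */
static void show_chunks(unsigned int bufsz, unsigned int granularity,
			unsigned int rdoff, unsigned int count)
{
	unsigned int n;

	while (count > 0) {
		n = rdoff + count > bufsz ? bufsz - rdoff : count;
		if (granularity > 0 && n > granularity)
			n = granularity;
		printf("write %u byte(s) at offset %u\n", n, rdoff);
		rdoff = (rdoff + n) % bufsz;
		count -= n;
	}
}

int main(void)
{
	/* 32-byte ring, 8-byte granules, 24 bytes pending near the end. */
	show_chunks(32, 8, 24, 24);
	return 0;
}

With a 32-byte ring, 8-byte granules and 24 pending bytes starting at offset 24, this prints three 8-byte writes at offsets 24, 0 and 8.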
static void relay_output_irq(struct irq_work *work)
{
struct evl_proxy *proxy;
proxy = container_of(work, struct evl_proxy, irq_work);
schedule_work(&proxy->work);
proxy = container_of(work, struct evl_proxy, output.irq_work);
schedule_work(&proxy->output.work);
}
static ssize_t proxy_oob_write(struct file *filp,
const char __user *u_buf, size_t count)
{
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct circ_buf *circ = &proxy->circ_buf;
ssize_t rem, avail, written = 0;
const char __user *u_ptr;
int head, tail, len, ret;
struct proxy_out *out = &proxy->output;
struct proxy_ring *ring = &out->ring;
unsigned int wroff, wbytes, n, fillsz;
unsigned long flags;
bool kick, race;
ssize_t ret;
int xret;
if (count == 0)
return 0;
if (count >= proxy->bufsz) /* Avail space is bufsz - 1. */
if (count > ring->bufsz)
return -EFBIG;
retry:
u_ptr = u_buf;
rem = count;
raw_spin_lock_irqsave(&proxy->lock, flags);
if (ring->granularity > 1 && count % ring->granularity > 0)
return -EINVAL;
for (;;) {
head = circ->head;
tail = READ_ONCE(circ->tail);
if (rem == 0 || CIRC_SPACE(head, tail, proxy->bufsz) <= 0)
break;
raw_spin_lock_irqsave(&out->lock, flags);
avail = CIRC_SPACE_TO_END(head, tail, proxy->bufsz);
len = min(avail, rem);
race = proxy->write_flag;
if (!race)
proxy->write_flag = true;
raw_spin_unlock_irqrestore(&proxy->lock, flags);
if (race)
goto retry;
ret = raw_copy_from_user(circ->buf + head, u_ptr, len);
raw_spin_lock_irqsave(&proxy->lock, flags);
proxy->write_flag = false;
for (;;) {
fillsz = atomic_read(&ring->fillsz) + ring->fillrsvd;
/*
* No short or scattered writes, wait for drain or
* -EAGAIN if there is not enough space.
*/
if (fillsz + count > ring->bufsz)
goto wait;
/* Reserve a write slot into the circular buffer. */
wroff = ring->wroff;
ring->wroff = (wroff + count) % ring->bufsz;
ring->wrpending++;
ring->fillrsvd += count;
wbytes = ret = count;
do {
if (wroff + wbytes > ring->bufsz)
n = ring->bufsz - wroff;
else
n = wbytes;
raw_spin_unlock_irqrestore(&out->lock, flags);
xret = raw_copy_from_user(ring->bufmem + wroff, u_buf, n);
raw_spin_lock_irqsave(&out->lock, flags);
if (xret) {
memset(ring->bufmem + wroff + n - xret, 0, xret);
ret = -EFAULT;
break;
}
u_buf += n;
wbytes -= n;
wroff = (wroff + n) % ring->bufsz;
} while (wbytes > 0);
if (--ring->wrpending == 0) {
fillsz = atomic_add_return(ring->fillrsvd, &ring->fillsz);
ring->fillrsvd = 0;
if (fillsz == count)
/* empty -> non-empty transition */
irq_work_queue(&out->irq_work);
}
if (ret) {
written = -EFAULT;
break;
wait:
if (filp->f_flags & O_NONBLOCK) {
ret = -EAGAIN;
break;
}
smp_store_release(&circ->head,
(head + len) & (proxy->bufsz - 1));
u_ptr += len;
rem -= len;
written += len;
}
raw_spin_unlock_irqrestore(&out->lock, flags);
kick = CIRC_CNT(head, tail, proxy->bufsz) > 0;
ret = evl_wait_flag(&out->drained);
if (unlikely(ret))
return ret;
raw_spin_unlock_irqrestore(&proxy->lock, flags);
raw_spin_lock_irqsave(&out->lock, flags);
}
if (kick)
irq_work_queue(&proxy->irq_work);
raw_spin_unlock_irqrestore(&out->lock, flags);
return written;
return ret;
}
static ssize_t proxy_write(struct file *filp, const char __user *u_buf,
size_t count, loff_t *ppos)
static __poll_t proxy_oob_poll(struct file *filp,
struct oob_poll_wait *wait)
{
return proxy_oob_write(filp, u_buf, count);
struct evl_proxy *proxy = element_of(filp, struct evl_proxy);
struct proxy_out *out = &proxy->output;
struct proxy_ring *ring = &out->ring;
evl_poll_watch(&out->poll_head, wait, NULL);
return atomic_read(&ring->fillsz) < ring->bufsz ?
POLLOUT|POLLWRNORM : 0;
}
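From userspace, a writer holding the proxy file in non-blocking mode can now wait on POLLOUT|POLLWRNORM instead of spinning on EAGAIN. A sketch of that retry pattern, assuming oob_write() follows the usual -1/errno convention, with wait_for_output() as a hypothetical stand-in for whatever out-of-band polling facility watches the proxy fd:

#include <errno.h>
#include <stdint.h>
#include <unistd.h>

/* Out-of-band write wrapper provided by libevl (assumed available). */
extern ssize_t oob_write(int fd, const void *buf, size_t count);

/*
 * Hypothetical helper: block until the proxy reports
 * POLLOUT|POLLWRNORM through an out-of-band poll set.
 */
extern int wait_for_output(int proxyfd);

/* Post a 64-bit value through a proxy opened with O_NONBLOCK. */
static int post_event_nonblock(int proxyfd)
{
	uint64_t val = 1;
	ssize_t ret;

	for (;;) {
		ret = oob_write(proxyfd, &val, sizeof(val));
		if (ret == (ssize_t)sizeof(val))
			return 0;
		if (ret < 0 && errno != EAGAIN)
			return -1;	/* hard error */
		/* Ring is full: wait for the relay to drain some output. */
		if (wait_for_output(proxyfd))
			return -1;
	}
}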
static int proxy_mmap(struct file *filp, struct vm_area_struct *vma)
@@ -179,7 +238,7 @@ static const struct file_operations proxy_fops = {
.open = evl_open_element,
.release = evl_release_element,
.oob_write = proxy_oob_write,
.write = proxy_write,
.oob_poll = proxy_oob_poll,
.mmap = proxy_mmap,
};
@@ -198,10 +257,17 @@ proxy_factory_build(struct evl_factory *fac, const char *name,
if (ret)
return ERR_PTR(-EFAULT);
bufsz = roundup_pow_of_two(attrs.bufsz);
bufsz = attrs.bufsz;
if (order_base_2(bufsz) > 30) /* LART */
return ERR_PTR(-EINVAL);
/*
* If a granularity is set, the buffer size must be a multiple
* of the granule size.
*/
if (attrs.granularity > 1 && bufsz % attrs.granularity > 0)
return ERR_PTR(-EINVAL);
filp = fget(attrs.fd);
if (filp == NULL)
return ERR_PTR(-EINVAL);
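To make the granularity check a few lines up concrete, the same modulo test applied to made-up attribute values:

#include <stdio.h>

/* Same acceptance test as proxy_factory_build() above, on sample values. */
static int granule_ok(unsigned int bufsz, unsigned int granularity)
{
	return !(granularity > 1 && bufsz % granularity > 0);
}

int main(void)
{
	printf("%d\n", granule_ok(4096, 8));  /* 1: 4096 is a multiple of 8 */
	printf("%d\n", granule_ok(4096, 24)); /* 0: 4096 % 24 == 16, rejected */
	printf("%d\n", granule_ok(4096, 1));  /* 1: no granule constraint */
	return 0;
}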
@@ -230,11 +296,14 @@ proxy_factory_build(struct evl_factory *fac, const char *name,
goto fail_element;
proxy->filp = filp;
proxy->circ_buf.buf = bufmem;
proxy->bufsz = bufsz;
INIT_WORK(&proxy->work, relay_output);
init_irq_work(&proxy->irq_work, relay_output_irq);
raw_spin_lock_init(&proxy->lock);
proxy->output.ring.bufmem = bufmem;
proxy->output.ring.bufsz = bufsz;
proxy->output.ring.granularity = attrs.granularity;
raw_spin_lock_init(&proxy->output.lock);
init_irq_work(&proxy->output.irq_work, relay_output_irq);
INIT_WORK(&proxy->output.work, relay_output);
evl_init_poll_head(&proxy->output.poll_head);
evl_init_flag(&proxy->output.drained);
evl_index_element(&proxy->element);
return &proxy->element;
@@ -254,12 +323,13 @@ static void proxy_factory_dispose(struct evl_element *e)
struct evl_proxy *proxy;
proxy = container_of(e, struct evl_proxy, element);
evl_destroy_flag(&proxy->output.drained);
evl_unindex_element(&proxy->element);
fput(proxy->filp);
evl_destroy_element(&proxy->element);
if (proxy->circ_buf.buf)
kfree(proxy->circ_buf.buf);
if (proxy->output.ring.bufmem)
kfree(proxy->output.ring.bufmem);
kfree_rcu(proxy, element.rcu);
}