Commit edb5af7f authored by Philippe Gerum
Browse files

include/tube: fix queue remove/add race



A receiver could get a spurious empty tube status, due to
receive_tube() racing with send_tube(). See the added comments into
the code for details about the resolution.

While at it, guard against load/store tearing on shared pointers.

Pending issue: we still have a potential connectivity issue between
the prep and finish ops when pushing to a tube.
Signed-off-by: Philippe Gerum <rpm@xenomai.org>
parent d9443e0e
......@@ -18,19 +18,83 @@
#include <evl/compiler.h>
#include <evl/atomic.h>
#define tube_push_prepare(__desc, __free) \
({ \
typeof((__desc)->tail) __old_qp; \
atomic_store(&(__free)->next, NULL); \
smp_mb(); /* (1) */ \
do { \
__old_qp = (__desc)->tail; \
} while (__sync_val_compare_and_swap( \
&(__desc)->tail, \
__old_qp, __free) != __old_qp); \
__old_qp; \
/*
* The tricky one: pulling a canister from the tube. The noticeable
* part is how we deal with the end-of-queue situation, temporarily
* snatching the final element - which is not usable for carrying
* payload and cannot be returned to the caller. We have two
* requirements:
*
* - first is that a queue must contain at least one (final) element,
* so that tube_push_item() can always store a new message there,
* adding a free canister at the end of the same queue to compensate
* for the consumed slot. Therefore, consuming the final element with
* a NULL ->next pointer is not allowed, which means that observing
* __desc->head->next == NULL denotes an empty queue.
*
* - second is to not interpret or dereference the value of __next_dq
* until the CAS advancing the head pointer has taken place, so that
* we don't consume such information which might have been updated in
* parallel by a concurrent thread which won the CAS race on advancing
* the same head pointer (i.e. __desc->head = __head_dq->next). We
* solve this by allowing the head pointer to go NULL shortly,
* reverting this move only after the CAS has succeeded, which
* guarantees us that nobody else was granted the same item
* (__head_dq). Another concurrent pull might fetch a NULL head in the
* meantime, which also denotes an empty queue (like seeing
* __head_dq->next == NULL).
*
* The only drawback of this scheme is as follows:
*
* A: tube_pull() sets head = NULL B: tube_push() stores to final element
* B: tube_pull() observes empty queue (head == NULL)
* A: tube_pull() restores head = &final
*
* IOW, when a puller reaches the non-removable final element of a
* queue, a concurrent pusher might still observe an empty queue after
* a successful push, until the puller eventually restores the head
* pointer, revealing the new message. Although not perfect, that
* seems acceptable from the standpoint of the pusher knowing that
* concurrent pullers are present.
*/
#define tube_pull(__desc) \
({ \
typeof((__desc)->head) __next_dq, __head_dq; \
do { \
__head_dq = atomic_load(&(__desc)->head); \
if (__head_dq == NULL) /* head transiently NULL: a concurrent pull owns the final element */ \
break; \
__next_dq = atomic_load(&(__head_dq)->next); \
} while (!__sync_bool_compare_and_swap( \
&(__desc)->head, __head_dq, __next_dq)); \
if (__head_dq && __next_dq == NULL) { /* we snatched the final element: not usable for payload */ \
atomic_store(&(__desc)->head, __head_dq); /* put it back only after winning the CAS */ \
__head_dq = NULL; /* report an empty queue instead */ \
} \
__head_dq; \
})
/*
* The push operation moves the tail to a free canister, which serves
* as the new final element, returning the former tail element which
* will convey the payload (CAS ensures that multiple callers cannot
* get the same element).
*
* FIXME: we still have a queue connectivity issue there if preempted
* indefinitely between the prep and finish ops. Revisit.
*/
#define tube_push_prepare(__desc, __free) \
({ \
typeof((__desc)->tail) __old_qp; \
atomic_store(&(__free)->next, NULL); /* __free becomes the new final element */ \
smp_mb(); /* order ->next = NULL before publishing __free via the tail CAS */ \
do { \
__old_qp = atomic_load(&(__desc)->tail); \
} while (!__sync_bool_compare_and_swap( \
&(__desc)->tail, __old_qp, __free)); \
__old_qp; /* former final element, which will carry the payload */ \
})
/*
* We may directly compete with tube_push_prepare() on the tail
* pointer, and indirectly with tube_pull() which may be walking the
......@@ -41,13 +105,14 @@
*/
/*
 * Finish a push: link the free (new final) canister after the element
 * returned by the prep step, revealing the message to pullers.
 *
 * NOTE(review): the flattened diff left both the removed
 * "smp_mb(); /" "* (1) *" "/" line and its replacement in place,
 * yielding a duplicated barrier; keep a single smp_mb() as the commit
 * intended.
 */
#define tube_push_finish(__desc, __new, __free) \
do { \
smp_mb(); /* order payload writes into __new before linking __free */ \
atomic_store(&(__new)->next, __free); \
} while (0)
/* Convey a message through the tube in one shot: grab the current
 * final element, then link __free after it as the new final one. */
#define tube_push(__desc, __free) \
do { \
typeof((__desc)->tail) __qtail; \
compiler_barrier(); \
__qtail = tube_push_prepare(__desc, __free); \
tube_push_finish(__desc, __qtail, __free); \
} while (0)
......@@ -60,18 +125,6 @@
tube_push_finish(__desc, __new_qi, __free); \
} while (0)
/*
 * NOTE(review): pre-fix version removed by this commit. Per the
 * commit message, plain (non-atomic) loads of head/next allow
 * load tearing, and a receiver racing with send_tube() could
 * observe a spurious empty tube.
 */
#define tube_pull(__desc) \
({ \
typeof((__desc)->head) __next_dq, __old_dq; \
do { \
__old_dq = (__desc)->head; \
__next_dq = __old_dq->next; \
} while (__next_dq && __sync_val_compare_and_swap( \
&(__desc)->head, \
__old_dq, __next_dq) != __old_dq); \
__next_dq ? __old_dq : NULL; \
})
#define DECLARE_EVL_CANISTER(__can_struct, __payload) \
struct __can_struct { \
struct __can_struct *next; \
......@@ -162,24 +215,48 @@
typeof((__desc)->tail) __old_qp; \
uintptr_t __off_qp = __memoff(__base, __free); \
atomic_store(&(__free)->next, 0); \
smp_mb(); /* (1) */ \
smp_mb(); \
do { \
__old_qp = (__desc)->tail; \
__old_qp = atomic_load(&(__desc)->tail); \
} while (__sync_val_compare_and_swap( \
&(__desc)->tail, \
__old_qp, __off_qp) != __old_qp); \
__memptr(__base, __old_qp); \
})
/* See tube_pull() for details. */
/* Same scheme as tube_pull(), but the queue links hold base-relative
 * offsets (uintptr_t) instead of raw pointers; 0 plays the role of
 * NULL, and __memptr()/__memoff() convert between the two forms. */
#define tube_pull_rel(__base, __desc) \
({ \
typeof((__desc)->head) __next_dq, __head_dq; \
typeof((__desc)->first[0]) *__head_dqptr; \
do { \
__head_dq = atomic_load(&(__desc)->head); \
if (__head_dq == 0) { /* empty, or final element transiently snatched */ \
__head_dqptr = NULL; \
break; \
} \
__head_dqptr = (typeof(__head_dqptr)) \
__memptr(__base, __head_dq); /* offset -> pointer for dereferencing ->next */ \
__next_dq = atomic_load(&(__head_dqptr)->next); \
} while (!__sync_bool_compare_and_swap( \
&(__desc)->head, __head_dq, __next_dq)); \
if (__head_dq && __next_dq == 0) { /* grabbed the final element: restore and report empty */ \
atomic_store(&(__desc)->head, __head_dq); \
__head_dqptr = NULL; \
} \
__head_dqptr; \
})
/*
 * Offset-based counterpart of tube_push_finish(): publish the payload
 * by storing __free's base-relative offset into the ->next link of
 * the element returned by the prep step.
 *
 * NOTE(review): the flattened diff kept both the removed and the
 * replacement smp_mb() lines, duplicating the barrier; a single
 * smp_mb() is what the commit intended.
 */
#define tube_push_finish_rel(__base, __desc, __new, __free) \
do { \
smp_mb(); /* order payload writes into __new before linking __free */ \
atomic_store(&(__new)->next, __memoff(__base, __free)); \
} while (0)
#define tube_push_rel(__base, __desc, __free) \
do { \
typeof((__desc)->first[0]) *__new_q; \
compiler_barrier(); \
__new_q = (typeof(__new_q)) \
tube_push_prepare_rel(__base, __desc, __free); \
tube_push_finish_rel(__base, __desc, __new_q, __free); \
......@@ -194,21 +271,6 @@
tube_push_finish_rel(__base, __desc, __new_qi, __free); \
} while (0)
/*
 * NOTE(review): pre-fix version removed by this commit. Like the old
 * tube_pull(), it uses plain loads (tearing-prone per the commit
 * message) and bails out on __next_dq == 0 before any CAS, which the
 * commit identifies as racy against a concurrent send.
 */
#define tube_pull_rel(__base, __desc) \
({ \
typeof((__desc)->head) __next_dq, __old_dq; \
typeof((__desc)->first[0]) *__old_dqptr; \
do { \
__old_dq = (__desc)->head; \
__old_dqptr = (typeof(__old_dqptr)) \
__memptr(__base, __old_dq); \
__next_dq = __old_dqptr->next; \
} while (__next_dq && __sync_val_compare_and_swap( \
&(__desc)->head, \
__old_dq, __next_dq) != __old_dq); \
__next_dq ? __old_dqptr : NULL; \
})
#define DECLARE_EVL_CANISTER_REL(__can_struct, __payload) \
struct __can_struct { \
uintptr_t next; \
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment