diff --git a/lib/Makefile b/lib/Makefile index fafca6bf..f620f484 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -1,4 +1,4 @@ -src := a-path.c a-set.c bitmap.c bitops.c blake2s.c blake2b.c checksum.c event.c flowspec.c idm.c ip.c lists.c mac.c md5.c mempool.c net.c netindex.c patmatch.c printf.c rcu.c resource.c sha1.c sha256.c sha512.c slab.c slists.c strtoul.c tbf.c timer.c xmalloc.c +src := a-path.c a-set.c bitmap.c bitops.c blake2s.c blake2b.c checksum.c event.c flowspec.c idm.c ip.c lists.c lockfree.c mac.c md5.c mempool.c net.c netindex.c patmatch.c printf.c rcu.c resource.c sha1.c sha256.c sha512.c slab.c slists.c strtoul.c tbf.c timer.c xmalloc.c obj := $(src-o-files) $(all-daemon) diff --git a/lib/lockfree.c b/lib/lockfree.c new file mode 100644 index 00000000..2c2b1b56 --- /dev/null +++ b/lib/lockfree.c @@ -0,0 +1,413 @@ +/* + * BIRD Library -- Generic lock-free structures + * + * (c) 2023--2024 Maria Matejka + * (c) 2023--2024 CZ.NIC, z.s.p.o. + * + * Can be freely distributed and used under the terms of the GNU GPL. + */ + +#include "lib/birdlib.h" +#include "lib/lockfree.h" + +#define LOCAL_DEBUG + +#if 0 +#define lfjour_debug(...) log(L_TRACE __VA_ARGS__) +#define lfjour_debug_detailed(...) log(L_TRACE __VA_ARGS__) +#elif 0 +#define lfjour_debug(...) log(L_TRACE __VA_ARGS__) +#define lfjour_debug_detailed(...) +#else +#define lfjour_debug(...) +#define lfjour_debug_detailed(...) +#endif + +#define LBI(j, b, p) ((struct lfjour_item *)(((void *) (b)->_block) + ((j)->item_size * (p)))) +#define LBP(j, b, i) ({ \ + off_t off = ((void *) (i)) - ((void *) (b)->_block); \ + u32 s = (j)->item_size; \ + ASSERT_DIE(off < page_size); \ + ASSERT_DIE((off % s) == 0); \ + off / s; \ + }) + +struct lfjour_item * +lfjour_push_prepare(struct lfjour *j) +{ + ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain)); + ASSERT_DIE(!j->open); + + if (EMPTY_TLIST(lfjour_block, &j->pending) && + EMPTY_TLIST(lfjour_recipient, &j->recipients)) + return NULL; + + struct lfjour_block *block = NULL; + u32 end = 0; + + if (!EMPTY_TLIST(lfjour_block, &j->pending)) + { + block = j->pending.last; + end = atomic_load_explicit(&block->end, memory_order_relaxed); + if (end >= j->item_count) + { + ASSERT_DIE(end == j->item_count); + block = NULL; + end = 0; + } + } + + if (!block) + { + block = alloc_page(); + lfjour_debug("lfjour(%p)_push_prepare: allocating block %p", j, block); + *block = (struct lfjour_block) {}; + lfjour_block_add_tail(&j->pending, block); + } + + struct lfjour_item *i = LBI(j, block, end); + *i = (struct lfjour_item) { + .seq = j->next_seq++, + }; + + return j->open = i; +} + +void +lfjour_push_commit(struct lfjour *j) +{ + ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain)); + ASSERT_DIE(j->open); + struct lfjour_block *b = PAGE_HEAD(j->open); + ASSERT_DIE(b == j->pending.last); + + lfjour_debug("lfjour(%p)_push_commit of %p, seq=%lu", j, j->open, j->open->seq); + + u32 end = atomic_fetch_add_explicit(&b->end, 1, memory_order_release); + ASSERT_DIE(j->open == LBI(j, b, end)); + + if (end == 0) + { + struct lfjour_block *prev = b->n.prev; + _Bool f = 0; + if (prev) + ASSERT_DIE(atomic_compare_exchange_strong_explicit(&prev->not_last, &f, 1, + memory_order_release, memory_order_relaxed)); + } + + /* Store the first item to announce (only if this is actually the first one). */ + struct lfjour_item *null_item = NULL; + if (atomic_compare_exchange_strong_explicit( + &j->first, &null_item, j->open, + memory_order_acq_rel, memory_order_relaxed)) + { + lfjour_debug("lfjour(%p) first set", j); + } + + j->open = NULL; + + if (!ev_active(&j->announce_kick_event)) + ev_send_loop(j->loop, &j->announce_kick_event); +} + +static struct lfjour_item * +lfjour_get_next(struct lfjour *j, struct lfjour_item *last) +{ + /* This is lockless, no domain checks. */ + if (!last) + { + struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire); + return first; + } + + struct lfjour_block *block = PAGE_HEAD(last); + ASSERT_DIE(block); + u32 end = atomic_load_explicit(&block->end, memory_order_acquire); + u32 pos = LBP(j, block, last); + ASSERT_DIE(pos < end); + + /* Next is in the same block. */ + if (++pos < end) + return LBI(j, block, pos); + + /* There is another block. */ + if (atomic_load_explicit(&block->not_last, memory_order_acquire)) + { + /* To avoid rare race conditions, we shall check the current block end once again */ + u32 new_end = atomic_load_explicit(&block->end, memory_order_acquire); + ASSERT_DIE(new_end >= end); + if (new_end > end) + return LBI(j, block, pos); + + /* Nothing in the previous one, let's move to the next block. + * This is OK to do non-atomically because of the not_last flag. */ + block = block->n.next; + return LBI(j, block, 0); + } + + /* There is nothing more. */ + return NULL; +} + +struct lfjour_item * +lfjour_get(struct lfjour_recipient *r) +{ + ASSERT_DIE(r->cur == NULL); + struct lfjour *j = lfjour_of_recipient(r); + + /* The last pointer may get cleaned up under our hands. + * Indicating that we're using it, by RCU read. */ + + rcu_read_lock(); + struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire); + r->cur = lfjour_get_next(j, last); + rcu_read_unlock(); + + if (last) + { + lfjour_debug_detailed("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, last %p", + j, r, r->cur, r->cur ? r->cur->seq : 0ULL, last); + } + else + { + lfjour_debug("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, clean", + j, r, r->cur, r->cur ? r->cur->seq : 0ULL); + } + + return r->cur; +} + +void lfjour_release(struct lfjour_recipient *r) +{ + /* This is lockless, no domain checks. */ + + ASSERT_DIE(r->cur); + struct lfjour_block *block = PAGE_HEAD(r->cur); + u32 end = atomic_load_explicit(&block->end, memory_order_acquire); + + struct lfjour *j = lfjour_of_recipient(r); + u32 pos = LBP(j, block, r->cur); + ASSERT_DIE(pos < end); + + /* Releasing this export for cleanup routine */ + if (pos + 1 == end) + { + lfjour_debug("lfjour(%p)_release(recipient=%p) of %p, seq=%lu (end)", + j, r, r->cur, r->cur->seq); + } + else + { + lfjour_debug_detailed("lfjour(%p)_release(recipient=%p) of %p, seq=%lu (mid)", + j, r, r->cur, r->cur->seq); + } + + atomic_store_explicit(&r->last, r->cur, memory_order_release); + + /* The last block may be available to free */ + if (pos + 1 == end) + lfjour_schedule_cleanup(j); + + r->cur = NULL; +} + +void +lfjour_announce_now(struct lfjour *j) +{ + ASSERT_DIE(birdloop_inside(j->loop)); + settle_cancel(&j->announce_timer); + ev_postpone(&j->announce_kick_event); + + if (EMPTY_TLIST(lfjour_recipient, &j->recipients)) + return lfjour_schedule_cleanup(j); + + WALK_TLIST(lfjour_recipient, r, &j->recipients) + ev_send(r->target, r->event); +} + +static void +lfjour_announce_settle_hook(struct settle *s) +{ + return lfjour_announce_now(SKIP_BACK(struct lfjour, announce_timer, s)); +} + +static void +lfjour_announce_kick_hook(void *_j) +{ + struct lfjour *j = _j; + settle_kick(&j->announce_timer, j->loop); +} + +u64 +lfjour_pending_items(struct lfjour *j) +{ + ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain)); + + struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_relaxed); + if (!first) + return 0; + + ASSERT_DIE(j->next_seq > first->seq); + return j->next_seq - first->seq; +} + +void +lfjour_register(struct lfjour *j, struct lfjour_recipient *r) +{ + ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain)); + ASSERT_DIE(r->event); + ASSERT_DIE(r->target); + + atomic_store_explicit(&r->last, NULL, memory_order_relaxed); + ASSERT_DIE(!r->cur); + + lfjour_recipient_add_tail(&j->recipients, r); +} + +void +lfjour_unregister(struct lfjour_recipient *r) +{ + struct lfjour *j = lfjour_of_recipient(r); + ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain)); + + lfjour_recipient_rem_node(&j->recipients, r); + lfjour_schedule_cleanup(j); +} + +static inline void lfjour_cleanup_unlock_helper(struct domain_generic **dg) +{ + if (!*dg) return; + DG_UNLOCK(*dg); +} + +static void +lfjour_cleanup_hook(void *_j) +{ + struct lfjour *j = _j; + + CLEANUP(lfjour_cleanup_unlock_helper) struct domain_generic *_locked = j->domain; + if (_locked) DG_LOCK(_locked); + + u64 min_seq = ~((u64) 0); + struct lfjour_item *last_item_to_free = NULL; + struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire); + + if (!first) + { + /* Nothing to cleanup, actually, just call the done callback */ + ASSERT_DIE(EMPTY_TLIST(lfjour_block, &j->pending)); + CALL(j->cleanup_done, j, 0, ~((u64) 0)); + return; + } + + WALK_TLIST(lfjour_recipient, r, &j->recipients) + { + struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire); + + if (!last) + /* No last export means that the channel has exported nothing since last cleanup */ + return; + + else if (min_seq > last->seq) + { + min_seq = last->seq; + last_item_to_free = last; + } + } + + /* Here we're sure that no receiver is going to use the first pointer soon. + * It is only used when the receiver's last pointer is NULL, which is avoided by the code above. + * Thus, we can just move the journal's first pointer forward. */ + struct lfjour_item *next = last_item_to_free ? lfjour_get_next(j, last_item_to_free) : NULL; + atomic_store_explicit(&j->first, next, memory_order_release); + + lfjour_debug("lfjour(%p) set first=%p (was %p)", j, next, first); + + WALK_TLIST(lfjour_recipient, r, &j->recipients) + { + struct lfjour_item *last = last_item_to_free; + /* This either succeeds if this item is the most-behind-one, + * or fails and gives us the actual last for debug output. */ + if (atomic_compare_exchange_strong_explicit( + &r->last, &last, NULL, + memory_order_acq_rel, memory_order_acquire)) + { + lfjour_debug("lfjour(%p)_cleanup(recipient=%p): store last=NULL", j, r); + } + else + { + lfjour_debug("lfjour(%p)_cleanup(recipient=%p): keep last=%p", j, r, last); + } + } + + /* Now some recipients may have old last-pointers. We have to wait + * until they finish their routine, before we start cleaning up. */ + synchronize_rcu(); + + u64 orig_first_seq = first->seq; + + /* Now we do the actual cleanup */ + while (first && (first->seq <= min_seq)) + { + j->item_done(j, first); + +#ifdef LOCAL_DEBUG + memset(first, 0xbd, j->item_size); +#endif + + /* Find next journal item */ + struct lfjour_item *next = lfjour_get_next(j, first); + if (PAGE_HEAD(next) != PAGE_HEAD(first)) + { + /* This was the last one in its block */ + struct lfjour_block *block = PAGE_HEAD(first); + lfjour_debug("lfjour(%p)_cleanup: freeing block %p", j, block); + ASSERT_DIE(block == j->pending.first); + + /* Free this block */ + lfjour_block_rem_node(&j->pending, block); +#ifdef LOCAL_DEBUG + memset(block, 0xbe, page_size); +#endif + free_page(block); + + /* If no more blocks are remaining, we shall reset + * the sequence numbers */ + + if (EMPTY_TLIST(lfjour_block, &j->pending)) + { + lfjour_debug("lfjour(%p)_cleanup: seq reset", j); + WALK_TLIST(lfjour_recipient, r, &j->recipients) + atomic_fetch_or_explicit(&r->recipient_flags, LFJOUR_R_SEQ_RESET, memory_order_acq_rel); + + j->next_seq = 1; + } + } + + /* And now move on to the next item */ + first = next; + } + + CALL(j->cleanup_done, j, orig_first_seq, first ? first->seq : ~((u64) 0)); +} + +void +lfjour_init(struct lfjour *j, struct settle_config *scf) +{ + /* Expecting all other fields to be initialized to zeroes by the caller */ + ASSERT_DIE(j->loop); + ASSERT_DIE(j->item_size >= sizeof(struct lfjour_item)); + + j->item_size = BIRD_CPU_ALIGN(j->item_size); + j->item_count = (page_size - sizeof(struct lfjour_block)) / j->item_size; + + j->next_seq = 1; + j->announce_kick_event = (event) { + .hook = lfjour_announce_kick_hook, + .data = j, + }; + j->announce_timer = SETTLE_INIT(scf, lfjour_announce_settle_hook, j); + j->cleanup_event = (event) { + .hook = lfjour_cleanup_hook, + .data = j, + }; +} diff --git a/lib/lockfree.h b/lib/lockfree.h index 23ab3523..7b2ecf05 100644 --- a/lib/lockfree.h +++ b/lib/lockfree.h @@ -1,8 +1,8 @@ /* * BIRD Library -- Generic lock-free structures * - * (c) 2023 Maria Matejka - * (c) 2023 CZ.NIC, z.s.p.o. + * (c) 2023--2024 Maria Matejka + * (c) 2023--2024 CZ.NIC, z.s.p.o. * * Can be freely distributed and used under the terms of the GNU GPL. */ @@ -12,6 +12,8 @@ #include "lib/event.h" #include "lib/rcu.h" +#include "lib/settle.h" +#include "lib/tlists.h" #include @@ -147,4 +149,120 @@ lfuc_init(struct lfuc *c) atomic_store_explicit(&c->uc, 1, memory_order_release); } + +/** + * Lock-free journal. + */ + +/* Journal item. Put LFJOUR_ITEM_INHERIT(name) into your structure + * to inherit lfjour_item */ +#define LFJOUR_ITEM \ + u64 seq; \ + +struct lfjour_item { + LFJOUR_ITEM; +}; + +#define LFJOUR_ITEM_INHERIT(name) union { \ + struct lfjour_item name; \ + struct { LFJOUR_ITEM; }; \ +} + +/* Journal item block. Internal structure, no need to check out. */ +#define TLIST_PREFIX lfjour_block +#define TLIST_TYPE struct lfjour_block +#define TLIST_ITEM n +#define TLIST_WANT_ADD_TAIL + +struct lfjour_block { + TLIST_DEFAULT_NODE; + _Atomic u32 end; + _Atomic _Bool not_last; + + struct lfjour_item _block[0]; +}; + +/* Defines lfjour_block_list */ +#include "lib/tlists.h" + +/* Journal recipient. Inherit this in your implementation. */ +#define TLIST_PREFIX lfjour_recipient +#define TLIST_TYPE struct lfjour_recipient +#define TLIST_ITEM n +#define TLIST_WANT_ADD_TAIL +#define TLIST_WANT_WALK + +struct lfjour_recipient { + TLIST_DEFAULT_NODE; + event *event; /* Event running when something is in the journal */ + event_list *target; /* Event target */ + struct lfjour_item * _Atomic last; /* Last item processed */ + struct lfjour_item *cur; /* Processing this now */ + _Atomic u64 recipient_flags; /* LFJOUR_R_* */ +}; + +enum lfjour_recipient_flags { + LFJOUR_R_SEQ_RESET = 1, /* Signalling of sequence number reset */ +}; + +/* Defines lfjour_recipient_list */ +#include "lib/tlists.h" + +/* Journal base structure. Include this. */ +struct lfjour { + struct domain_generic *domain; /* The journal itself belongs to this domain (if different from the loop) */ + struct birdloop *loop; /* Cleanup loop */ + u32 item_size, item_count; /* Allocation parameters */ + struct lfjour_block_list pending; /* List of packed journal blocks */ + struct lfjour_item * _Atomic first; /* First journal item to announce */ + struct lfjour_item *open; /* Journal item in progress */ + u64 next_seq; /* Next export to push has this ID */ + struct lfjour_recipient_list recipients; /* Announce updates to these */ + event announce_kick_event; /* Kicks announce_timer */ + struct settle announce_timer; /* Announces changes to recipients */ + event cleanup_event; /* Runs the journal cleanup routine */ + + /* Callback on item removal from journal */ + void (*item_done)(struct lfjour *, struct lfjour_item *); + + /* Callback when the cleanup routine is ending */ + void (*cleanup_done)(struct lfjour *, u64 begin_seq, u64 end_seq); +}; + +struct lfjour_item *lfjour_push_prepare(struct lfjour *); +void lfjour_push_commit(struct lfjour *); + +struct lfjour_item *lfjour_get(struct lfjour_recipient *); +void lfjour_release(struct lfjour_recipient *); +static inline _Bool lfjour_reset_seqno(struct lfjour_recipient *r) +{ + return atomic_fetch_and_explicit(&r->recipient_flags, ~LFJOUR_R_SEQ_RESET, memory_order_acq_rel) & LFJOUR_R_SEQ_RESET; +} + +void lfjour_announce_now(struct lfjour *); +u64 lfjour_pending_items(struct lfjour *); + +static inline void lfjour_schedule_cleanup(struct lfjour *j) +{ ev_send_loop(j->loop, &j->cleanup_event); } + +static inline void lfjour_do_cleanup_now(struct lfjour *j) +{ + /* This requires the caller to own the cleanup event loop */ + ev_postpone(&j->cleanup_event); + j->cleanup_event.hook(j->cleanup_event.data); +} + +void lfjour_register(struct lfjour *, struct lfjour_recipient *); +void lfjour_unregister(struct lfjour_recipient *); +static inline uint lfjour_count_recipients(struct lfjour *j) +{ return TLIST_LENGTH(lfjour_recipient, &j->recipients); } + +void lfjour_init(struct lfjour *, struct settle_config *); + + +static inline struct lfjour *lfjour_of_recipient(struct lfjour_recipient *r) +{ + struct lfjour_recipient_list *list = lfjour_recipient_enlisted(r); + return list ? SKIP_BACK(struct lfjour, recipients, list) : NULL; +} #endif