/* * BIRD Library -- Generic lock-free structures * * (c) 2023--2024 Maria Matejka * (c) 2023--2024 CZ.NIC, z.s.p.o. * * Can be freely distributed and used under the terms of the GNU GPL. */ #ifndef _BIRD_LOCKFREE_H_ #define _BIRD_LOCKFREE_H_ #include "lib/event.h" #include "lib/rcu.h" #include "lib/settle.h" #include "lib/tlists.h" #include "lib/io-loop.h" #include /** * Lock-free usecounts. */ struct lfuc { _Atomic u64 uc; }; #define LFUC_PU_SHIFT 44 #define LFUC_IN_PROGRESS (1ULL << LFUC_PU_SHIFT) /** * lfuc_lock - increase an atomic usecount * @c: the usecount structure */ static inline u64 lfuc_lock(struct lfuc *c) { /* Locking is trivial; somebody already holds the underlying data structure * so we just increase the use count. Nothing can be freed underneath our hands. */ u64 uc = atomic_fetch_add_explicit(&c->uc, 1, memory_order_acq_rel); ASSERT_DIE(uc > 0); return uc & (LFUC_IN_PROGRESS - 1); } /** * lfuc_lock_revive - increase an atomic usecount even if it's zero * @c: the usecount structure * * If the caller is sure that they can't collide with the prune routine, * they can call this even on structures with already zeroed usecount. * Handy for situations with flapping routes. Use only from the same * loop as which runs the prune routine. */ static inline u64 lfuc_lock_revive(struct lfuc *c) { u64 uc = atomic_fetch_add_explicit(&c->uc, 1, memory_order_acq_rel); return uc & (LFUC_IN_PROGRESS - 1); } /** * lfuc_unlock_immediately - decrease an atomic usecount * @c: the usecount structure * @el: prune event list * @ev: prune event itself * * If the usecount reaches zero, a prune event is run to possibly free the object. * The prune event MUST use lfuc_finished() to check the object state. */ static inline void lfuc_unlock_immediately(struct lfuc *c, event_list *el, event *ev) { /* Unlocking is tricky. We do it lockless so at the same time, the prune * event may be running, therefore if the unlock gets us to zero, it must be * the last thing in this routine, otherwise the prune routine may find the * source's usecount zeroed, freeing it prematurely. * * The usecount is split into two parts: * the top 20 bits are an in-progress indicator * the bottom 44 bits keep the actual usecount. * * Therefore at most 1 million of writers can simultaneously unlock the same * structure, while at most ~17T different places can reference it. Both limits * are insanely high from the 2022 point of view. Let's suppose that when 17T * routes or 1M peers/tables get real, we get also 128bit atomic variables in the * C norm. */ /* First, we push the in-progress indicator */ u64 uc = atomic_fetch_add_explicit(&c->uc, LFUC_IN_PROGRESS, memory_order_acq_rel); /* Then we split the indicator to its parts. Remember, we got the value * before the operation happened so we're re-doing the operation locally * to get a view how the indicator _would_ look if nobody else was interacting. */ u64 pending = (uc >> LFUC_PU_SHIFT) + 1; uc &= LFUC_IN_PROGRESS - 1; /* Obviously, there can't be more pending unlocks than the usecount itself */ if (uc == pending) /* If we're the last unlocker (every owner is already unlocking), schedule * the owner's prune event */ ev_send(el, ev); else ASSERT_DIE(uc > pending); /* And now, finally, simultaneously pop the in-progress indicator and the * usecount, possibly allowing the pruning routine to free this structure */ uc = atomic_fetch_sub_explicit(&c->uc, LFUC_IN_PROGRESS + 1, memory_order_acq_rel); // return uc - LFUC_IN_PROGRESS - 1; } extern _Thread_local struct lfuc_unlock_queue { event e; u32 pos; struct lfuc_unlock_queue_block { struct lfuc *c; event_list *el; event *ev; } block[0]; } *lfuc_unlock_queue; void lfuc_unlock_deferred(void *queue); static inline void lfuc_unlock(struct lfuc *c, event_list *el, event *ev) { static u32 queue_items = 0; if (queue_items == 0) { ASSERT_DIE((u64) page_size > sizeof(struct lfuc_unlock_queue) + sizeof(struct lfuc_unlock_queue_block)); queue_items = (page_size - OFFSETOF(struct lfuc_unlock_queue, block)) / sizeof lfuc_unlock_queue->block[0]; } if (!lfuc_unlock_queue || (lfuc_unlock_queue->pos >= queue_items)) { lfuc_unlock_queue = alloc_page(); *lfuc_unlock_queue = (struct lfuc_unlock_queue) { .e = { .hook = lfuc_unlock_deferred, .data = lfuc_unlock_queue, }, }; ev_send_this_thread(&lfuc_unlock_queue->e); } lfuc_unlock_queue->block[lfuc_unlock_queue->pos++] = (struct lfuc_unlock_queue_block) { .c = c, .el = el, .ev = ev, }; } /** * lfuc_finished - auxiliary routine for prune event * @c: usecount structure * * This routine simply waits until all unlockers finish their job and leave * the critical section of lfuc_unlock(). Then we decide whether the usecount * is indeed zero or not, and therefore whether the structure is free to be freed. */ static inline _Bool lfuc_finished(struct lfuc *c) { u64 uc; /* Wait until all unlockers finish */ while ((uc = atomic_load_explicit(&c->uc, memory_order_acquire)) >> LFUC_PU_SHIFT) birdloop_yield(); /* All of them are now done and if the usecount is now zero, then we're * the last place to reference the object and we can call it finished. */ return (uc == 0); } /** * lfuc_init - auxiliary routine for usecount initialization * @c: usecount structure * * Called on object initialization, sets the usecount to an initial one to make * sure that the prune routine doesn't free it before somebody else references it. */ static inline void lfuc_init(struct lfuc *c) { atomic_store_explicit(&c->uc, 1, memory_order_release); } /** * Lock-free journal. */ /* Journal item. Put LFJOUR_ITEM_INHERIT(name) into your structure * to inherit lfjour_item */ #define LFJOUR_ITEM \ u64 seq; \ struct lfjour_item { LFJOUR_ITEM; }; #define LFJOUR_ITEM_INHERIT(name) union { \ struct lfjour_item name; \ struct { LFJOUR_ITEM; }; \ } /* Journal item block. Internal structure, no need to check out. */ #define TLIST_PREFIX lfjour_block #define TLIST_TYPE struct lfjour_block #define TLIST_ITEM n #define TLIST_WANT_ADD_TAIL struct lfjour_block { TLIST_DEFAULT_NODE; _Atomic u32 end; _Atomic _Bool not_last; struct lfjour_item _block[0]; }; /* Defines lfjour_block_list */ #include "lib/tlists.h" /* Journal recipient. Inherit this in your implementation. */ #define TLIST_PREFIX lfjour_recipient #define TLIST_TYPE struct lfjour_recipient #define TLIST_ITEM n #define TLIST_WANT_ADD_TAIL #define TLIST_WANT_WALK struct lfjour_recipient { TLIST_DEFAULT_NODE; event *event; /* Event running when something is in the journal */ event_list *target; /* Event target */ struct lfjour_item * _Atomic last; /* Last item processed */ struct lfjour_item *cur; /* Processing this now */ _Atomic u64 recipient_flags; /* LFJOUR_R_* */ }; enum lfjour_recipient_flags { LFJOUR_R_SEQ_RESET = 1, /* Signalling of sequence number reset */ }; /* Defines lfjour_recipient_list */ #include "lib/tlists.h" /* Journal base structure. Include this. */ struct lfjour { struct domain_generic *domain; /* The journal itself belongs to this domain (if different from the loop) */ struct birdloop *loop; /* Cleanup loop */ u32 item_size, item_count; /* Allocation parameters */ struct lfjour_block_list pending; /* List of packed journal blocks */ struct lfjour_item * _Atomic first; /* First journal item to announce */ struct lfjour_item *open; /* Journal item in progress */ u64 next_seq; /* Next export to push has this ID */ struct lfjour_recipient_list recipients; /* Announce updates to these */ event announce_kick_event; /* Kicks announce_timer */ struct settle announce_timer; /* Announces changes to recipients */ event cleanup_event; /* Runs the journal cleanup routine */ /* Callback on item removal from journal */ void (*item_done)(struct lfjour *, struct lfjour_item *); /* Callback when the cleanup routine is ending */ void (*cleanup_done)(struct lfjour *, u64 begin_seq, u64 end_seq); }; struct lfjour_item *lfjour_push_prepare(struct lfjour *); void lfjour_push_commit(struct lfjour *); struct lfjour_item *lfjour_get(struct lfjour_recipient *); void lfjour_release(struct lfjour_recipient *); static inline _Bool lfjour_reset_seqno(struct lfjour_recipient *r) { return atomic_fetch_and_explicit(&r->recipient_flags, ~LFJOUR_R_SEQ_RESET, memory_order_acq_rel) & LFJOUR_R_SEQ_RESET; } void lfjour_announce_now(struct lfjour *); u64 lfjour_pending_items(struct lfjour *); static inline void lfjour_schedule_cleanup(struct lfjour *j) { ev_send_loop(j->loop, &j->cleanup_event); } static inline void lfjour_do_cleanup_now(struct lfjour *j) { /* This requires the caller to own the cleanup event loop */ ev_postpone(&j->cleanup_event); j->cleanup_event.hook(j->cleanup_event.data); } void lfjour_register(struct lfjour *, struct lfjour_recipient *); void lfjour_unregister(struct lfjour_recipient *); static inline uint lfjour_count_recipients(struct lfjour *j) { return TLIST_LENGTH(lfjour_recipient, &j->recipients); } void lfjour_init(struct lfjour *, struct settle_config *); static inline struct lfjour *lfjour_of_recipient(struct lfjour_recipient *r) { struct lfjour_recipient_list *list = lfjour_recipient_enlisted(r); return list ? SKIP_BACK(struct lfjour, recipients, list) : NULL; } #endif