/* * BIRD Library -- Generic lock-free structures * * (c) 2023--2024 Maria Matejka * (c) 2023--2024 CZ.NIC, z.s.p.o. * * Can be freely distributed and used under the terms of the GNU GPL. */ #include "lib/birdlib.h" #include "lib/lockfree.h" #define LOCAL_DEBUG void lfuc_unlock_deferred(struct deferred_call *dc) { SKIP_BACK_DECLARE(struct lfuc_unlock_queue_item, luqi, dc, dc); lfuc_unlock_immediately(luqi->c, luqi->el, luqi->ev); } #if 0 #define lfjour_debug(...) log(L_TRACE __VA_ARGS__) #define lfjour_debug_detailed(...) log(L_TRACE __VA_ARGS__) #elif 0 #define lfjour_debug(...) log(L_TRACE __VA_ARGS__) #define lfjour_debug_detailed(...) #else #define lfjour_debug(...) #define lfjour_debug_detailed(...) #endif #define LBI(j, b, p) ((struct lfjour_item *)(((void *) (b)->_block) + ((j)->item_size * (p)))) #define LBP(j, b, i) ({ \ off_t off = ((void *) (i)) - ((void *) (b)->_block); \ u32 s = (j)->item_size; \ ASSERT_DIE(off < page_size); \ ASSERT_DIE((off % s) == 0); \ off / s; \ }) static void lfjour_return_cleanup_token(struct lfjour *j, struct lfjour_recipient *r) { /* Eligible for requesting cleanup */ if (!(atomic_fetch_and_explicit(&r->recipient_flags, ~LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER)) return; /* Return the lastrunner's token. */ u64 pings = atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); /* Is this the last token? */ if (pings > 1) return; ASSERT_DIE(pings != 0); /* No more cleanup tokens issued, request cleanup. */ lfjour_schedule_cleanup(j); } struct lfjour_item * lfjour_push_prepare(struct lfjour *j) { ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain)); ASSERT_DIE(!j->open); if (EMPTY_TLIST(lfjour_block, &j->pending) && EMPTY_TLIST(lfjour_recipient, &j->recipients)) return NULL; struct lfjour_block *block = NULL; u32 end = 0; if (!EMPTY_TLIST(lfjour_block, &j->pending)) { block = j->pending.last; end = atomic_load_explicit(&block->end, memory_order_relaxed); if (end >= j->item_count) { ASSERT_DIE(end == j->item_count); block = NULL; end = 0; } } if (!block) { block = alloc_page(); lfjour_debug("lfjour(%p)_push_prepare: allocating block %p", j, block); *block = (struct lfjour_block) {}; lfjour_block_add_tail(&j->pending, block); } struct lfjour_item *i = LBI(j, block, end); *i = (struct lfjour_item) { .seq = j->next_seq++, }; return j->open = i; } void lfjour_push_commit(struct lfjour *j) { ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain)); ASSERT_DIE(j->open); struct lfjour_block *b = PAGE_HEAD(j->open); ASSERT_DIE(b == j->pending.last); lfjour_debug("lfjour(%p)_push_commit of %p, seq=%lu", j, j->open, j->open->seq); u32 end = atomic_fetch_add_explicit(&b->end, 1, memory_order_release); ASSERT_DIE(j->open == LBI(j, b, end)); if (end == 0) { struct lfjour_block *prev = b->n.prev; bool f = 0; if (prev) ASSERT_DIE(atomic_compare_exchange_strong_explicit(&prev->not_last, &f, 1, memory_order_release, memory_order_relaxed)); } /* Store the first item to announce (only if this is actually the first one). */ struct lfjour_item *null_item = NULL; if (atomic_compare_exchange_strong_explicit( &j->first, &null_item, j->open, memory_order_acq_rel, memory_order_relaxed)) { lfjour_debug("lfjour(%p) first set", j); } j->open = NULL; if (!ev_active(&j->announce_kick_event)) ev_send_loop(j->loop, &j->announce_kick_event); } static struct lfjour_item * lfjour_get_next(struct lfjour *j, const struct lfjour_item *last) { /* This is lockless, no domain checks. */ if (!last) { struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire); return first; } struct lfjour_block *block = PAGE_HEAD(last); ASSERT_DIE(block); u32 end = atomic_load_explicit(&block->end, memory_order_acquire); u32 pos = LBP(j, block, last); ASSERT_DIE(pos < end); /* Next is in the same block. */ if (++pos < end) return LBI(j, block, pos); /* There is another block. */ if (atomic_load_explicit(&block->not_last, memory_order_acquire)) { /* To avoid rare race conditions, we shall check the current block end once again */ u32 new_end = atomic_load_explicit(&block->end, memory_order_acquire); ASSERT_DIE(new_end >= end); if (new_end > end) return LBI(j, block, pos); /* Nothing in the previous one, let's move to the next block. * This is OK to do non-atomically because of the not_last flag. */ block = block->n.next; return LBI(j, block, 0); } /* There is nothing more. */ return NULL; } struct lfjour_item * lfjour_get(struct lfjour_recipient *r) { struct lfjour *j = lfjour_of_recipient(r); const struct lfjour_item *last = r->cur; struct lfjour_item *next = NULL; if (last) next = lfjour_get_next(j, r->cur); else { /* The last pointer may get cleaned up under our hands. * Indicating that we're using it, by RCU read. */ rcu_read_lock(); last = atomic_load_explicit(&r->last, memory_order_acquire); next = lfjour_get_next(j, last); rcu_read_unlock(); } if (last) { lfjour_debug_detailed("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, last %p", j, r, next, next ? next->seq : 0ULL, last); } else { lfjour_debug("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, clean", j, r, next, next ? next->seq : 0ULL); } if (!next) return NULL; if (!r->first_holding_seq) r->first_holding_seq = next->seq; return r->cur = next; } void lfjour_release(struct lfjour_recipient *r, const struct lfjour_item *it) { /* Find out what we actually released last */ rcu_read_lock(); const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire); struct lfjour_block *last_block = last ? PAGE_HEAD(last) : NULL; rcu_read_unlock(); /* This is lockless, no domain checks. */ ASSERT_DIE(r->cur); /* Partial or full release? */ ASSERT_DIE(r->first_holding_seq); ASSERT_DIE(it->seq >= r->first_holding_seq); if (it->seq < r->cur->seq) { lfjour_debug("lfjour(%p)_release(recipient=%p) of %p, partial upto seq=%lu", j, r, it, it->seq); r->first_holding_seq = it->seq + 1; atomic_store_explicit(&r->last, it, memory_order_release); return; } struct lfjour_block *block = PAGE_HEAD(r->cur); u32 end = atomic_load_explicit(&block->end, memory_order_acquire); struct lfjour *j = lfjour_of_recipient(r); u32 pos = LBP(j, block, r->cur); ASSERT_DIE(pos < end); /* Releasing this export for cleanup routine */ if (pos + 1 == end) { lfjour_debug("lfjour(%p)_release(recipient=%p) of %p, seq=%lu (end)", j, r, r->cur, r->cur->seq); } else { lfjour_debug_detailed("lfjour(%p)_release(recipient=%p) of %p, seq=%lu (mid)", j, r, r->cur, r->cur->seq); } atomic_store_explicit(&r->last, r->cur, memory_order_release); /* The last block may be available to free */ if ((pos + 1 == end) || last && (last_block != block)) lfjour_return_cleanup_token(j, r); r->first_holding_seq = 0; r->cur = NULL; } void lfjour_announce_now(struct lfjour *j) { ASSERT_DIE(birdloop_inside(j->loop)); settle_cancel(&j->announce_timer); ev_postpone(&j->announce_kick_event); if (EMPTY_TLIST(lfjour_recipient, &j->recipients)) return lfjour_schedule_cleanup(j); WALK_TLIST(lfjour_recipient, r, &j->recipients) if (r->event) ev_send(r->target, r->event); } static void lfjour_announce_settle_hook(struct settle *s) { return lfjour_announce_now(SKIP_BACK(struct lfjour, announce_timer, s)); } static void lfjour_announce_kick_hook(void *_j) { struct lfjour *j = _j; settle_kick(&j->announce_timer, j->loop); } u64 lfjour_pending_items(struct lfjour *j) { ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain)); struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_relaxed); if (!first) return 0; ASSERT_DIE(j->next_seq > first->seq); return j->next_seq - first->seq; } void lfjour_register(struct lfjour *j, struct lfjour_recipient *r) { ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain)); ASSERT_DIE(!r->event == !r->target); atomic_store_explicit(&r->last, NULL, memory_order_relaxed); ASSERT_DIE(!r->cur); lfjour_recipient_add_tail(&j->recipients, r); if (j->issued_tokens < j->max_tokens) { /* Cleanup hook does not run, so we are sure j->issued_tokens is not increasing */ atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel); atomic_fetch_or_explicit(&r->recipient_flags, LFJOUR_R_LAST_RUNNER, memory_order_acq_rel); } } void lfjour_unregister(struct lfjour_recipient *r) { struct lfjour *j = lfjour_of_recipient(r); ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain)); if (r->cur) lfjour_release(r, r->cur); lfjour_recipient_rem_node(&j->recipients, r); lfjour_return_cleanup_token(j, r); } static inline void lfjour_cleanup_unlock_helper(struct domain_generic **dg) { if (!*dg) return; DG_UNLOCK(*dg); } static bool lfjour_issue_cleanup_token(struct lfjour *j, struct lfjour_recipient* rec) { /* This function keeps the invariant that the total number of issued * LFJOUR_R_LAST_RUNNER flags is the same as j->issued_tokens. * * Returs true if the token was successfully issued. */ /* The journal is not empty and the recipient is on seq_max, * so it won't call cleanup */ const struct lfjour_item *last = atomic_load_explicit(&rec->last, memory_order_acquire); if (last && last->seq == (j->next_seq - 1)) return false; /* Take a token from the pile */ if (atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel) >= j->max_tokens) { /* No more tokens to issue, already at max */ atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); return false; } /* Trying to give the token */ if (atomic_fetch_or_explicit(&rec->recipient_flags, LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER) { /* This recipient already has the token */ atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); return false; } /* Has't the recipient finished inbetween? (Recheck.) */ last = atomic_load_explicit(&rec->last, memory_order_acquire); if (last && last->seq == (j->next_seq - 1)) { /* It has! Retreat! */ if (atomic_fetch_and_explicit(&rec->recipient_flags, ~LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER) { /* The flag was still set, we managed to get it back safely. * Now the recipient won't call the cleanup. */ atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); return false; } /* The recipient was quick enough to finish the task and * grab the flag. Now we are going to get pinged anyway * so we consider the flag to be issued. */ } /* Now the token is issued. */ return true; } static void lfjour_cleanup_done(struct lfjour *j) { /* Returning the cleanup token. */ u64 pings = atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); /* Somebody else is also holding a token, nothing to do. * They'll call us when the time comes. */ if (pings > 1) return; ASSERT_DIE(pings == 1); /* We'll need to know whether the journal is empty or not. */ struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire); /* No recipients, schedule one more cleanup if not empty */ if (EMPTY_TLIST(lfjour_recipient, &j->recipients)) { if (first) lfjour_schedule_cleanup(j); return; } /* There are some recipients but nothing to clean. * Somebody is going to wake us, let's just throw * the token to the crowd. */ if (!first) { WALK_TLIST(lfjour_recipient, r, &j->recipients) /* If we failed to issue a cleanup token, there is definitely * somebody holding it, no more tokens needed. */ if (! lfjour_issue_cleanup_token(j, r)) { ASSERT_DIE(atomic_load_explicit(&j->issued_tokens, memory_order_acquire) > 0); return; } } /* We have to find some recipient which has not * yet finished. When we find it, it will happily * accept the token so we are done. */ WALK_TLIST(lfjour_recipient, r, &j->recipients) if (lfjour_issue_cleanup_token(j, r)) return; /* Nobody needs anything, but the journal is not empty. * Run cleanup again. */ lfjour_schedule_cleanup(j); } static void lfjour_cleanup_hook(void *_j) { struct lfjour *j = _j; CLEANUP(lfjour_cleanup_unlock_helper) struct domain_generic *_locked = j->domain; if (_locked) DG_LOCK(_locked); /* We consider ourselves holding one token to keep recipients * from scheduling us too early. */ atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel); u64 min_seq = ~((u64) 0); const struct lfjour_item *last_item_to_free = NULL; struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire); if (!first) { lfjour_cleanup_done(j); /* Nothing to cleanup, actually, just call the done callback */ ASSERT_DIE(EMPTY_TLIST(lfjour_block, &j->pending)); CALL(j->cleanup_done, j, 0, ~((u64) 0)); return; } WALK_TLIST(lfjour_recipient, r, &j->recipients) { const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire); if (!last) { /* No last export means that the channel has exported nothing since * last cleanup. Let's try to give them the token, * and we're done anyway. */ lfjour_issue_cleanup_token(j, r); lfjour_cleanup_done(j); return; } else if (min_seq > last->seq) { min_seq = last->seq; last_item_to_free = last; } } /* Here we're sure that no receiver is going to use the first pointer soon. * It is only used when the receiver's last pointer is NULL, which is avoided by the code above. * Thus, we can just move the journal's first pointer forward. */ struct lfjour_item *next = last_item_to_free ? lfjour_get_next(j, last_item_to_free) : NULL; atomic_store_explicit(&j->first, next, memory_order_release); lfjour_debug("lfjour(%p) set first=%p (was %p)", j, next, first); WALK_TLIST(lfjour_recipient, r, &j->recipients) { const struct lfjour_item *last = last_item_to_free; /* This either succeeds if this item is the most-behind-one, * or fails and gives us the actual last for debug output. */ if (atomic_compare_exchange_strong_explicit( &r->last, &last, NULL, memory_order_acq_rel, memory_order_acquire)) { lfjour_debug("lfjour(%p)_cleanup(recipient=%p): store last=NULL", j, r); /* The most-behind-one gets the cleanup token */ lfjour_issue_cleanup_token(j, r); } else { lfjour_debug("lfjour(%p)_cleanup(recipient=%p): keep last=%p", j, r, last); } } /* Now some recipients may have old last-pointers. We have to wait * until they finish their routine, before we start cleaning up. */ synchronize_rcu(); u64 orig_first_seq = first->seq; /* Now we do the actual cleanup */ while (first && (first->seq <= min_seq)) { j->item_done(j, first); /* Find next journal item */ struct lfjour_item *next = lfjour_get_next(j, first); if (PAGE_HEAD(next) != PAGE_HEAD(first)) { /* This was the last one in its block */ struct lfjour_block *block = PAGE_HEAD(first); lfjour_debug("lfjour(%p)_cleanup: freeing block %p", j, block); ASSERT_DIE(block == j->pending.first); /* Free this block */ lfjour_block_rem_node(&j->pending, block); /* Wait for possible pending readers of the block */ synchronize_rcu(); /* Now we can finally drop the block */ #ifdef LOCAL_DEBUG memset(block, 0xbe, page_size); #endif free_page(block); /* If no more blocks are remaining, we shall reset * the sequence numbers */ if (EMPTY_TLIST(lfjour_block, &j->pending)) { lfjour_debug("lfjour(%p)_cleanup: seq reset", j); WALK_TLIST(lfjour_recipient, r, &j->recipients) atomic_fetch_or_explicit(&r->recipient_flags, LFJOUR_R_SEQ_RESET, memory_order_acq_rel); j->next_seq = 1; } } /* And now move on to the next item */ first = next; } lfjour_cleanup_done(j); CALL(j->cleanup_done, j, orig_first_seq, first ? first->seq : ~((u64) 0)); } void lfjour_init(struct lfjour *j, struct settle_config *scf) { /* Expecting all other fields to be initialized to zeroes by the caller */ ASSERT_DIE(j->loop); ASSERT_DIE(j->item_size >= sizeof(struct lfjour_item)); j->item_size = BIRD_CPU_ALIGN(j->item_size); j->item_count = (page_size - sizeof(struct lfjour_block)) / j->item_size; j->next_seq = 1; j->announce_kick_event = (event) { .hook = lfjour_announce_kick_hook, .data = j, }; j->announce_timer = SETTLE_INIT(scf, lfjour_announce_settle_hook, j); j->cleanup_event = (event) { .hook = lfjour_cleanup_hook, .data = j, }; j->max_tokens = 20; }