0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2025-01-10 19:11:54 +00:00

Lock free journal refactored into a separate data structure

This commit is contained in:
Maria Matejka 2024-02-29 14:03:30 +01:00 committed by Katerina Kubecova
parent cb6e7123c4
commit 3787c0f7a5
3 changed files with 534 additions and 3 deletions

View File

@ -1,4 +1,4 @@
src := a-path.c a-set.c bitmap.c bitops.c blake2s.c blake2b.c checksum.c event.c flowspec.c idm.c ip.c lists.c mac.c md5.c mempool.c net.c netindex.c patmatch.c printf.c rcu.c resource.c sha1.c sha256.c sha512.c slab.c slists.c strtoul.c tbf.c timer.c xmalloc.c src := a-path.c a-set.c bitmap.c bitops.c blake2s.c blake2b.c checksum.c event.c flowspec.c idm.c ip.c lists.c lockfree.c mac.c md5.c mempool.c net.c netindex.c patmatch.c printf.c rcu.c resource.c sha1.c sha256.c sha512.c slab.c slists.c strtoul.c tbf.c timer.c xmalloc.c
obj := $(src-o-files) obj := $(src-o-files)
$(all-daemon) $(all-daemon)

413
lib/lockfree.c Normal file
View File

@ -0,0 +1,413 @@
/*
* BIRD Library -- Generic lock-free structures
*
* (c) 2023--2024 Maria Matejka <mq@jmq.cz>
* (c) 2023--2024 CZ.NIC, z.s.p.o.
*
* Can be freely distributed and used under the terms of the GNU GPL.
*/
#include "lib/birdlib.h"
#include "lib/lockfree.h"
#define LOCAL_DEBUG
#if 0
#define lfjour_debug(...) log(L_TRACE __VA_ARGS__)
#define lfjour_debug_detailed(...) log(L_TRACE __VA_ARGS__)
#elif 0
#define lfjour_debug(...) log(L_TRACE __VA_ARGS__)
#define lfjour_debug_detailed(...)
#else
#define lfjour_debug(...)
#define lfjour_debug_detailed(...)
#endif
#define LBI(j, b, p) ((struct lfjour_item *)(((void *) (b)->_block) + ((j)->item_size * (p))))
#define LBP(j, b, i) ({ \
off_t off = ((void *) (i)) - ((void *) (b)->_block); \
u32 s = (j)->item_size; \
ASSERT_DIE(off < page_size); \
ASSERT_DIE((off % s) == 0); \
off / s; \
})
struct lfjour_item *
lfjour_push_prepare(struct lfjour *j)
{
ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));
ASSERT_DIE(!j->open);
if (EMPTY_TLIST(lfjour_block, &j->pending) &&
EMPTY_TLIST(lfjour_recipient, &j->recipients))
return NULL;
struct lfjour_block *block = NULL;
u32 end = 0;
if (!EMPTY_TLIST(lfjour_block, &j->pending))
{
block = j->pending.last;
end = atomic_load_explicit(&block->end, memory_order_relaxed);
if (end >= j->item_count)
{
ASSERT_DIE(end == j->item_count);
block = NULL;
end = 0;
}
}
if (!block)
{
block = alloc_page();
lfjour_debug("lfjour(%p)_push_prepare: allocating block %p", j, block);
*block = (struct lfjour_block) {};
lfjour_block_add_tail(&j->pending, block);
}
struct lfjour_item *i = LBI(j, block, end);
*i = (struct lfjour_item) {
.seq = j->next_seq++,
};
return j->open = i;
}
void
lfjour_push_commit(struct lfjour *j)
{
ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));
ASSERT_DIE(j->open);
struct lfjour_block *b = PAGE_HEAD(j->open);
ASSERT_DIE(b == j->pending.last);
lfjour_debug("lfjour(%p)_push_commit of %p, seq=%lu", j, j->open, j->open->seq);
u32 end = atomic_fetch_add_explicit(&b->end, 1, memory_order_release);
ASSERT_DIE(j->open == LBI(j, b, end));
if (end == 0)
{
struct lfjour_block *prev = b->n.prev;
_Bool f = 0;
if (prev)
ASSERT_DIE(atomic_compare_exchange_strong_explicit(&prev->not_last, &f, 1,
memory_order_release, memory_order_relaxed));
}
/* Store the first item to announce (only if this is actually the first one). */
struct lfjour_item *null_item = NULL;
if (atomic_compare_exchange_strong_explicit(
&j->first, &null_item, j->open,
memory_order_acq_rel, memory_order_relaxed))
{
lfjour_debug("lfjour(%p) first set", j);
}
j->open = NULL;
if (!ev_active(&j->announce_kick_event))
ev_send_loop(j->loop, &j->announce_kick_event);
}
static struct lfjour_item *
lfjour_get_next(struct lfjour *j, struct lfjour_item *last)
{
/* This is lockless, no domain checks. */
if (!last)
{
struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire);
return first;
}
struct lfjour_block *block = PAGE_HEAD(last);
ASSERT_DIE(block);
u32 end = atomic_load_explicit(&block->end, memory_order_acquire);
u32 pos = LBP(j, block, last);
ASSERT_DIE(pos < end);
/* Next is in the same block. */
if (++pos < end)
return LBI(j, block, pos);
/* There is another block. */
if (atomic_load_explicit(&block->not_last, memory_order_acquire))
{
/* To avoid rare race conditions, we shall check the current block end once again */
u32 new_end = atomic_load_explicit(&block->end, memory_order_acquire);
ASSERT_DIE(new_end >= end);
if (new_end > end)
return LBI(j, block, pos);
/* Nothing in the previous one, let's move to the next block.
* This is OK to do non-atomically because of the not_last flag. */
block = block->n.next;
return LBI(j, block, 0);
}
/* There is nothing more. */
return NULL;
}
struct lfjour_item *
lfjour_get(struct lfjour_recipient *r)
{
ASSERT_DIE(r->cur == NULL);
struct lfjour *j = lfjour_of_recipient(r);
/* The last pointer may get cleaned up under our hands.
* Indicating that we're using it, by RCU read. */
rcu_read_lock();
struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
r->cur = lfjour_get_next(j, last);
rcu_read_unlock();
if (last)
{
lfjour_debug_detailed("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, last %p",
j, r, r->cur, r->cur ? r->cur->seq : 0ULL, last);
}
else
{
lfjour_debug("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, clean",
j, r, r->cur, r->cur ? r->cur->seq : 0ULL);
}
return r->cur;
}
void lfjour_release(struct lfjour_recipient *r)
{
/* This is lockless, no domain checks. */
ASSERT_DIE(r->cur);
struct lfjour_block *block = PAGE_HEAD(r->cur);
u32 end = atomic_load_explicit(&block->end, memory_order_acquire);
struct lfjour *j = lfjour_of_recipient(r);
u32 pos = LBP(j, block, r->cur);
ASSERT_DIE(pos < end);
/* Releasing this export for cleanup routine */
if (pos + 1 == end)
{
lfjour_debug("lfjour(%p)_release(recipient=%p) of %p, seq=%lu (end)",
j, r, r->cur, r->cur->seq);
}
else
{
lfjour_debug_detailed("lfjour(%p)_release(recipient=%p) of %p, seq=%lu (mid)",
j, r, r->cur, r->cur->seq);
}
atomic_store_explicit(&r->last, r->cur, memory_order_release);
/* The last block may be available to free */
if (pos + 1 == end)
lfjour_schedule_cleanup(j);
r->cur = NULL;
}
void
lfjour_announce_now(struct lfjour *j)
{
ASSERT_DIE(birdloop_inside(j->loop));
settle_cancel(&j->announce_timer);
ev_postpone(&j->announce_kick_event);
if (EMPTY_TLIST(lfjour_recipient, &j->recipients))
return lfjour_schedule_cleanup(j);
WALK_TLIST(lfjour_recipient, r, &j->recipients)
ev_send(r->target, r->event);
}
static void
lfjour_announce_settle_hook(struct settle *s)
{
return lfjour_announce_now(SKIP_BACK(struct lfjour, announce_timer, s));
}
static void
lfjour_announce_kick_hook(void *_j)
{
struct lfjour *j = _j;
settle_kick(&j->announce_timer, j->loop);
}
u64
lfjour_pending_items(struct lfjour *j)
{
ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));
struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_relaxed);
if (!first)
return 0;
ASSERT_DIE(j->next_seq > first->seq);
return j->next_seq - first->seq;
}
void
lfjour_register(struct lfjour *j, struct lfjour_recipient *r)
{
ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));
ASSERT_DIE(r->event);
ASSERT_DIE(r->target);
atomic_store_explicit(&r->last, NULL, memory_order_relaxed);
ASSERT_DIE(!r->cur);
lfjour_recipient_add_tail(&j->recipients, r);
}
void
lfjour_unregister(struct lfjour_recipient *r)
{
struct lfjour *j = lfjour_of_recipient(r);
ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));
lfjour_recipient_rem_node(&j->recipients, r);
lfjour_schedule_cleanup(j);
}
static inline void lfjour_cleanup_unlock_helper(struct domain_generic **dg)
{
if (!*dg) return;
DG_UNLOCK(*dg);
}
static void
lfjour_cleanup_hook(void *_j)
{
struct lfjour *j = _j;
CLEANUP(lfjour_cleanup_unlock_helper) struct domain_generic *_locked = j->domain;
if (_locked) DG_LOCK(_locked);
u64 min_seq = ~((u64) 0);
struct lfjour_item *last_item_to_free = NULL;
struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire);
if (!first)
{
/* Nothing to cleanup, actually, just call the done callback */
ASSERT_DIE(EMPTY_TLIST(lfjour_block, &j->pending));
CALL(j->cleanup_done, j, 0, ~((u64) 0));
return;
}
WALK_TLIST(lfjour_recipient, r, &j->recipients)
{
struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
if (!last)
/* No last export means that the channel has exported nothing since last cleanup */
return;
else if (min_seq > last->seq)
{
min_seq = last->seq;
last_item_to_free = last;
}
}
/* Here we're sure that no receiver is going to use the first pointer soon.
* It is only used when the receiver's last pointer is NULL, which is avoided by the code above.
* Thus, we can just move the journal's first pointer forward. */
struct lfjour_item *next = last_item_to_free ? lfjour_get_next(j, last_item_to_free) : NULL;
atomic_store_explicit(&j->first, next, memory_order_release);
lfjour_debug("lfjour(%p) set first=%p (was %p)", j, next, first);
WALK_TLIST(lfjour_recipient, r, &j->recipients)
{
struct lfjour_item *last = last_item_to_free;
/* This either succeeds if this item is the most-behind-one,
* or fails and gives us the actual last for debug output. */
if (atomic_compare_exchange_strong_explicit(
&r->last, &last, NULL,
memory_order_acq_rel, memory_order_acquire))
{
lfjour_debug("lfjour(%p)_cleanup(recipient=%p): store last=NULL", j, r);
}
else
{
lfjour_debug("lfjour(%p)_cleanup(recipient=%p): keep last=%p", j, r, last);
}
}
/* Now some recipients may have old last-pointers. We have to wait
* until they finish their routine, before we start cleaning up. */
synchronize_rcu();
u64 orig_first_seq = first->seq;
/* Now we do the actual cleanup */
while (first && (first->seq <= min_seq))
{
j->item_done(j, first);
#ifdef LOCAL_DEBUG
memset(first, 0xbd, j->item_size);
#endif
/* Find next journal item */
struct lfjour_item *next = lfjour_get_next(j, first);
if (PAGE_HEAD(next) != PAGE_HEAD(first))
{
/* This was the last one in its block */
struct lfjour_block *block = PAGE_HEAD(first);
lfjour_debug("lfjour(%p)_cleanup: freeing block %p", j, block);
ASSERT_DIE(block == j->pending.first);
/* Free this block */
lfjour_block_rem_node(&j->pending, block);
#ifdef LOCAL_DEBUG
memset(block, 0xbe, page_size);
#endif
free_page(block);
/* If no more blocks are remaining, we shall reset
* the sequence numbers */
if (EMPTY_TLIST(lfjour_block, &j->pending))
{
lfjour_debug("lfjour(%p)_cleanup: seq reset", j);
WALK_TLIST(lfjour_recipient, r, &j->recipients)
atomic_fetch_or_explicit(&r->recipient_flags, LFJOUR_R_SEQ_RESET, memory_order_acq_rel);
j->next_seq = 1;
}
}
/* And now move on to the next item */
first = next;
}
CALL(j->cleanup_done, j, orig_first_seq, first ? first->seq : ~((u64) 0));
}
void
lfjour_init(struct lfjour *j, struct settle_config *scf)
{
/* Expecting all other fields to be initialized to zeroes by the caller */
ASSERT_DIE(j->loop);
ASSERT_DIE(j->item_size >= sizeof(struct lfjour_item));
j->item_size = BIRD_CPU_ALIGN(j->item_size);
j->item_count = (page_size - sizeof(struct lfjour_block)) / j->item_size;
j->next_seq = 1;
j->announce_kick_event = (event) {
.hook = lfjour_announce_kick_hook,
.data = j,
};
j->announce_timer = SETTLE_INIT(scf, lfjour_announce_settle_hook, j);
j->cleanup_event = (event) {
.hook = lfjour_cleanup_hook,
.data = j,
};
}

View File

@ -1,8 +1,8 @@
/* /*
* BIRD Library -- Generic lock-free structures * BIRD Library -- Generic lock-free structures
* *
* (c) 2023 Maria Matejka <mq@jmq.cz> * (c) 2023--2024 Maria Matejka <mq@jmq.cz>
* (c) 2023 CZ.NIC, z.s.p.o. * (c) 2023--2024 CZ.NIC, z.s.p.o.
* *
* Can be freely distributed and used under the terms of the GNU GPL. * Can be freely distributed and used under the terms of the GNU GPL.
*/ */
@ -12,6 +12,8 @@
#include "lib/event.h" #include "lib/event.h"
#include "lib/rcu.h" #include "lib/rcu.h"
#include "lib/settle.h"
#include "lib/tlists.h"
#include <stdatomic.h> #include <stdatomic.h>
@ -147,4 +149,120 @@ lfuc_init(struct lfuc *c)
atomic_store_explicit(&c->uc, 1, memory_order_release); atomic_store_explicit(&c->uc, 1, memory_order_release);
} }
/**
* Lock-free journal.
*/
/* Journal item. Put LFJOUR_ITEM_INHERIT(name) into your structure
* to inherit lfjour_item */
#define LFJOUR_ITEM \
u64 seq; \
struct lfjour_item {
LFJOUR_ITEM;
};
#define LFJOUR_ITEM_INHERIT(name) union { \
struct lfjour_item name; \
struct { LFJOUR_ITEM; }; \
}
/* Journal item block. Internal structure, no need to check out. */
#define TLIST_PREFIX lfjour_block
#define TLIST_TYPE struct lfjour_block
#define TLIST_ITEM n
#define TLIST_WANT_ADD_TAIL
struct lfjour_block {
TLIST_DEFAULT_NODE;
_Atomic u32 end;
_Atomic _Bool not_last;
struct lfjour_item _block[0];
};
/* Defines lfjour_block_list */
#include "lib/tlists.h"
/* Journal recipient. Inherit this in your implementation. */
#define TLIST_PREFIX lfjour_recipient
#define TLIST_TYPE struct lfjour_recipient
#define TLIST_ITEM n
#define TLIST_WANT_ADD_TAIL
#define TLIST_WANT_WALK
struct lfjour_recipient {
TLIST_DEFAULT_NODE;
event *event; /* Event running when something is in the journal */
event_list *target; /* Event target */
struct lfjour_item * _Atomic last; /* Last item processed */
struct lfjour_item *cur; /* Processing this now */
_Atomic u64 recipient_flags; /* LFJOUR_R_* */
};
enum lfjour_recipient_flags {
LFJOUR_R_SEQ_RESET = 1, /* Signalling of sequence number reset */
};
/* Defines lfjour_recipient_list */
#include "lib/tlists.h"
/* Journal base structure. Include this. */
struct lfjour {
struct domain_generic *domain; /* The journal itself belongs to this domain (if different from the loop) */
struct birdloop *loop; /* Cleanup loop */
u32 item_size, item_count; /* Allocation parameters */
struct lfjour_block_list pending; /* List of packed journal blocks */
struct lfjour_item * _Atomic first; /* First journal item to announce */
struct lfjour_item *open; /* Journal item in progress */
u64 next_seq; /* Next export to push has this ID */
struct lfjour_recipient_list recipients; /* Announce updates to these */
event announce_kick_event; /* Kicks announce_timer */
struct settle announce_timer; /* Announces changes to recipients */
event cleanup_event; /* Runs the journal cleanup routine */
/* Callback on item removal from journal */
void (*item_done)(struct lfjour *, struct lfjour_item *);
/* Callback when the cleanup routine is ending */
void (*cleanup_done)(struct lfjour *, u64 begin_seq, u64 end_seq);
};
struct lfjour_item *lfjour_push_prepare(struct lfjour *);
void lfjour_push_commit(struct lfjour *);
struct lfjour_item *lfjour_get(struct lfjour_recipient *);
void lfjour_release(struct lfjour_recipient *);
static inline _Bool lfjour_reset_seqno(struct lfjour_recipient *r)
{
return atomic_fetch_and_explicit(&r->recipient_flags, ~LFJOUR_R_SEQ_RESET, memory_order_acq_rel) & LFJOUR_R_SEQ_RESET;
}
void lfjour_announce_now(struct lfjour *);
u64 lfjour_pending_items(struct lfjour *);
static inline void lfjour_schedule_cleanup(struct lfjour *j)
{ ev_send_loop(j->loop, &j->cleanup_event); }
static inline void lfjour_do_cleanup_now(struct lfjour *j)
{
/* This requires the caller to own the cleanup event loop */
ev_postpone(&j->cleanup_event);
j->cleanup_event.hook(j->cleanup_event.data);
}
void lfjour_register(struct lfjour *, struct lfjour_recipient *);
void lfjour_unregister(struct lfjour_recipient *);
static inline uint lfjour_count_recipients(struct lfjour *j)
{ return TLIST_LENGTH(lfjour_recipient, &j->recipients); }
void lfjour_init(struct lfjour *, struct settle_config *);
static inline struct lfjour *lfjour_of_recipient(struct lfjour_recipient *r)
{
struct lfjour_recipient_list *list = lfjour_recipient_enlisted(r);
return list ? SKIP_BACK(struct lfjour, recipients, list) : NULL;
}
#endif #endif