mirror of
https://gitlab.nic.cz/labs/bird.git
synced 2024-12-22 01:31:55 +00:00
b84460432a
reached end of a block of journal items or read all of journal items. lfjour_cleanup_hook() can clean only journal items every recipient has processed, so it was often called uselessly. This commit restricts most of the useless scheduling. Only some consumers are given a token allowing them to try to schedule the lfjour_cleanup_hook(). When a consumer wants to schedule the lfjour_cleanup_hook(), it checks whether it has a token. If yes, it decrements the number of tokens the journal has given (issued_tokens) and discards its own token. If issued_tokens reaches zero, the consumer is allowed to schedule the lfjour_cleanup_hook(). There is a maximum number of tokens a journal can give to its consumers (max_tokens). A new consumer is given a token in its init, if the maximum number of tokens was not reached. The rest of the tokens are given to consumers in lfjour_cleanup_hook(). In lfjour_cleanup_hook(), the issued_tokens number is increased in order not to call the hook before it finishes. Then, tokens are given to the slowest recipients (but never to more than max_tokens recipients). Before leaving lfjour_cleanup_hook(), the issued_tokens number is decreased. If no other tokens are given, we have to make sure the lfjour_cleanup_hook() will be called again. If every item in the journal was read by every recipient, tokens are given to random recipients. If all recipients with tokens managed to finish until now, we give the token to the first unfinished consumer we find or we call the hook again.
288 lines
9.1 KiB
C
288 lines
9.1 KiB
C
/*
|
|
* BIRD Library -- Generic lock-free structures
|
|
*
|
|
* (c) 2023--2024 Maria Matejka <mq@jmq.cz>
|
|
* (c) 2023--2024 CZ.NIC, z.s.p.o.
|
|
*
|
|
* Can be freely distributed and used under the terms of the GNU GPL.
|
|
*/
|
|
|
|
#ifndef _BIRD_LOCKFREE_H_
|
|
#define _BIRD_LOCKFREE_H_
|
|
|
|
#include "lib/defer.h"
|
|
#include "lib/event.h"
|
|
#include "lib/rcu.h"
|
|
#include "lib/settle.h"
|
|
#include "lib/tlists.h"
|
|
#include "lib/io-loop.h"
|
|
|
|
#include <stdatomic.h>
|
|
|
|
/**
|
|
* Lock-free usecounts.
|
|
*/
|
|
|
|
struct lfuc {
|
|
_Atomic u64 uc;
|
|
};
|
|
|
|
#define LFUC_PU_SHIFT 44
|
|
#define LFUC_IN_PROGRESS (1ULL << LFUC_PU_SHIFT)
|
|
|
|
/**
|
|
* lfuc_lock - increase an atomic usecount
|
|
* @c: the usecount structure
|
|
*/
|
|
static inline u64 lfuc_lock(struct lfuc *c)
|
|
{
|
|
/* Locking is trivial; somebody already holds the underlying data structure
|
|
* so we just increase the use count. Nothing can be freed underneath our hands. */
|
|
u64 uc = atomic_fetch_add_explicit(&c->uc, 1, memory_order_acq_rel);
|
|
ASSERT_DIE(uc > 0);
|
|
return uc & (LFUC_IN_PROGRESS - 1);
|
|
}
|
|
|
|
/**
|
|
* lfuc_lock_revive - increase an atomic usecount even if it's zero
|
|
* @c: the usecount structure
|
|
*
|
|
* If the caller is sure that they can't collide with the prune routine,
|
|
* they can call this even on structures with already zeroed usecount.
|
|
* Handy for situations with flapping routes. Use only from the same
|
|
* loop as which runs the prune routine.
|
|
*/
|
|
static inline u64 lfuc_lock_revive(struct lfuc *c)
|
|
{
|
|
u64 uc = atomic_fetch_add_explicit(&c->uc, 1, memory_order_acq_rel);
|
|
return uc & (LFUC_IN_PROGRESS - 1);
|
|
}
|
|
|
|
/**
|
|
* lfuc_unlock_immediately - decrease an atomic usecount
|
|
* @c: the usecount structure
|
|
* @el: prune event list
|
|
* @ev: prune event itself
|
|
*
|
|
* If the usecount reaches zero, a prune event is run to possibly free the object.
|
|
* The prune event MUST use lfuc_finished() to check the object state.
|
|
*/
|
|
static inline void lfuc_unlock_immediately(struct lfuc *c, event_list *el, event *ev)
|
|
{
|
|
/* Unlocking is tricky. We do it lockless so at the same time, the prune
|
|
* event may be running, therefore if the unlock gets us to zero, it must be
|
|
* the last thing in this routine, otherwise the prune routine may find the
|
|
* source's usecount zeroed, freeing it prematurely.
|
|
*
|
|
* The usecount is split into two parts:
|
|
* the top 20 bits are an in-progress indicator
|
|
* the bottom 44 bits keep the actual usecount.
|
|
*
|
|
* Therefore at most 1 million of writers can simultaneously unlock the same
|
|
* structure, while at most ~17T different places can reference it. Both limits
|
|
* are insanely high from the 2022 point of view. Let's suppose that when 17T
|
|
* routes or 1M peers/tables get real, we get also 128bit atomic variables in the
|
|
* C norm. */
|
|
|
|
/* First, we push the in-progress indicator */
|
|
u64 uc = atomic_fetch_add_explicit(&c->uc, LFUC_IN_PROGRESS, memory_order_acq_rel);
|
|
|
|
/* Then we split the indicator to its parts. Remember, we got the value
|
|
* before the operation happened so we're re-doing the operation locally
|
|
* to get a view how the indicator _would_ look if nobody else was interacting.
|
|
*/
|
|
u64 pending = (uc >> LFUC_PU_SHIFT) + 1;
|
|
uc &= LFUC_IN_PROGRESS - 1;
|
|
|
|
/* Obviously, there can't be more pending unlocks than the usecount itself */
|
|
if (uc == pending)
|
|
/* If we're the last unlocker (every owner is already unlocking), schedule
|
|
* the owner's prune event */
|
|
ev_send(el, ev);
|
|
else
|
|
ASSERT_DIE(uc > pending);
|
|
|
|
/* And now, finally, simultaneously pop the in-progress indicator and the
|
|
* usecount, possibly allowing the pruning routine to free this structure */
|
|
uc = atomic_fetch_sub_explicit(&c->uc, LFUC_IN_PROGRESS + 1, memory_order_acq_rel);
|
|
|
|
// return uc - LFUC_IN_PROGRESS - 1;
|
|
}
|
|
|
|
struct lfuc_unlock_queue_item {
|
|
struct deferred_call dc;
|
|
struct lfuc *c;
|
|
event_list *el;
|
|
event *ev;
|
|
};
|
|
|
|
/* Hook which lfuc_unlock() installs into the deferred call of every
 * struct lfuc_unlock_queue_item. Implemented outside this header.
 * NOTE(review): presumably it performs the postponed unlock by means of
 * lfuc_unlock_immediately() -- confirm in the implementation. */
void lfuc_unlock_deferred(struct deferred_call *dc);
|
|
|
|
/**
 * lfuc_unlock - request a deferred decrease of an atomic usecount
 * @c: the usecount structure
 * @el: prune event list
 * @ev: prune event itself
 *
 * Packs the arguments into a struct lfuc_unlock_queue_item with
 * lfuc_unlock_deferred() as its hook and hands the item, together
 * with its size, over to defer_call().
 */
static inline void lfuc_unlock(struct lfuc *c, event_list *el, event *ev)
{
  /* The item lives on our stack; its size is passed along to defer_call()
   * (presumably so that the item can be copied -- see lib/defer.h) */
  struct lfuc_unlock_queue_item luqi = {
    .dc = {
      .hook = lfuc_unlock_deferred,
    },
    .c = c,
    .el = el,
    .ev = ev,
  };

  defer_call(&luqi.dc, sizeof luqi);
}
|
|
|
|
/**
|
|
* lfuc_finished - auxiliary routine for prune event
|
|
* @c: usecount structure
|
|
*
|
|
* This routine simply waits until all unlockers finish their job and leave
|
|
* the critical section of lfuc_unlock(). Then we decide whether the usecount
|
|
* is indeed zero or not, and therefore whether the structure is free to be freed.
|
|
*/
|
|
static inline bool
|
|
lfuc_finished(struct lfuc *c)
|
|
{
|
|
u64 uc;
|
|
/* Wait until all unlockers finish */
|
|
while ((uc = atomic_load_explicit(&c->uc, memory_order_acquire)) >> LFUC_PU_SHIFT)
|
|
birdloop_yield();
|
|
|
|
/* All of them are now done and if the usecount is now zero, then we're
|
|
* the last place to reference the object and we can call it finished. */
|
|
return (uc == 0);
|
|
}
|
|
|
|
/**
|
|
* lfuc_init - auxiliary routine for usecount initialization
|
|
* @c: usecount structure
|
|
*
|
|
* Called on object initialization, sets the usecount to an initial one to make
|
|
* sure that the prune routine doesn't free it before somebody else references it.
|
|
*/
|
|
static inline void
|
|
lfuc_init(struct lfuc *c)
|
|
{
|
|
atomic_store_explicit(&c->uc, 1, memory_order_release);
|
|
}
|
|
|
|
|
|
/**
|
|
* Lock-free journal.
|
|
*/
|
|
|
|
/* Journal item. Put LFJOUR_ITEM_INHERIT(name) into your structure
|
|
* to inherit lfjour_item */
|
|
#define LFJOUR_ITEM \
|
|
u64 seq; \
|
|
|
|
struct lfjour_item {
|
|
LFJOUR_ITEM;
|
|
};
|
|
|
|
#define LFJOUR_ITEM_INHERIT(name) union { \
|
|
struct lfjour_item name; \
|
|
struct { LFJOUR_ITEM; }; \
|
|
}
|
|
|
|
/* Journal item block. Internal structure, no need to check out. */
|
|
#define TLIST_PREFIX lfjour_block
|
|
#define TLIST_TYPE struct lfjour_block
|
|
#define TLIST_ITEM n
|
|
#define TLIST_WANT_ADD_TAIL
|
|
|
|
struct lfjour_block {
|
|
TLIST_DEFAULT_NODE;
|
|
_Atomic u32 end;
|
|
_Atomic bool not_last;
|
|
|
|
struct lfjour_item _block[0];
|
|
};
|
|
|
|
/* Defines lfjour_block_list */
|
|
#include "lib/tlists.h"
|
|
|
|
/* Journal recipient. Inherit this in your implementation. */
|
|
#define TLIST_PREFIX lfjour_recipient
|
|
#define TLIST_TYPE struct lfjour_recipient
|
|
#define TLIST_ITEM n
|
|
#define TLIST_WANT_ADD_TAIL
|
|
#define TLIST_WANT_WALK
|
|
|
|
struct lfjour_recipient {
|
|
TLIST_DEFAULT_NODE;
|
|
event *event; /* Event running when something is in the journal */
|
|
event_list *target; /* Event target */
|
|
const struct lfjour_item * _Atomic last; /* Last item processed */
|
|
u64 first_holding_seq; /* First item not released yet */
|
|
struct lfjour_item *cur; /* Processing this now */
|
|
_Atomic u64 recipient_flags; /* LFJOUR_R_* */
|
|
};
|
|
|
|
/* Bits stored in the recipient_flags member of struct lfjour_recipient */
enum lfjour_recipient_flags {
  LFJOUR_R_SEQ_RESET = 1,	/* Signalling of sequence number reset */
  LFJOUR_R_LAST_RUNNER = 2,	/* Set if this recipient is supposed to ping cleanup hook */
};
|
|
|
|
/* Defines lfjour_recipient_list */
|
|
#include "lib/tlists.h"
|
|
|
|
/* Journal base structure. Include this. */
|
|
struct lfjour {
|
|
struct domain_generic *domain; /* The journal itself belongs to this domain (if different from the loop) */
|
|
struct birdloop *loop; /* Cleanup loop */
|
|
u32 item_size, item_count; /* Allocation parameters */
|
|
struct lfjour_block_list pending; /* List of packed journal blocks */
|
|
struct lfjour_item * _Atomic first; /* First journal item to announce */
|
|
struct lfjour_item *open; /* Journal item in progress */
|
|
u64 next_seq; /* Next export to push has this ID */
|
|
struct lfjour_recipient_list recipients; /* Announce updates to these */
|
|
event announce_kick_event; /* Kicks announce_timer */
|
|
struct settle announce_timer; /* Announces changes to recipients */
|
|
event cleanup_event; /* Runs the journal cleanup routine */
|
|
u64 max_tokens; /* Maximum number of cleanup tokens to issue */
|
|
_Atomic u64 issued_tokens; /* Current count of issued tokens */
|
|
|
|
/* Callback on item removal from journal */
|
|
void (*item_done)(struct lfjour *, struct lfjour_item *);
|
|
|
|
/* Callback when the cleanup routine is ending */
|
|
void (*cleanup_done)(struct lfjour *, u64 begin_seq, u64 end_seq);
|
|
};
|
|
|
|
/* Writer side, implemented outside this header.
 * NOTE(review): presumably lfjour_push_prepare() hands out the item to fill
 * and lfjour_push_commit() publishes it -- confirm in the implementation. */
struct lfjour_item *lfjour_push_prepare(struct lfjour *j);
void lfjour_push_commit(struct lfjour *j);

/* Reader side, implemented outside this header.
 * NOTE(review): presumably lfjour_get() returns the next item for the recipient
 * and lfjour_release() marks the item as processed -- confirm in the implementation. */
struct lfjour_item *lfjour_get(struct lfjour_recipient *r);
void lfjour_release(struct lfjour_recipient *r, const struct lfjour_item *it);
|
|
static inline bool lfjour_reset_seqno(struct lfjour_recipient *r)
|
|
{
|
|
return atomic_fetch_and_explicit(&r->recipient_flags, ~LFJOUR_R_SEQ_RESET, memory_order_acq_rel) & LFJOUR_R_SEQ_RESET;
|
|
}
|
|
|
|
void lfjour_announce_now(struct lfjour *);
|
|
u64 lfjour_pending_items(struct lfjour *);
|
|
|
|
static inline void lfjour_schedule_cleanup(struct lfjour *j)
|
|
{ ev_send_loop(j->loop, &j->cleanup_event); }
|
|
|
|
static inline void lfjour_do_cleanup_now(struct lfjour *j)
|
|
{
|
|
/* This requires the caller to own the cleanup event loop */
|
|
ev_postpone(&j->cleanup_event);
|
|
j->cleanup_event.hook(j->cleanup_event.data);
|
|
}
|
|
|
|
/* Recipient registration, implemented outside this header.
 * NOTE(review): presumably these add the recipient to / remove it from
 * the journal's recipient list -- confirm in the implementation. */
void lfjour_register(struct lfjour *j, struct lfjour_recipient *r);
void lfjour_unregister(struct lfjour_recipient *r);
|
|
static inline uint lfjour_count_recipients(struct lfjour *j)
|
|
{ return TLIST_LENGTH(lfjour_recipient, &j->recipients); }
|
|
|
|
/* Journal initialization, implemented outside this header.
 * NOTE(review): the settle_config presumably configures the journal's
 * announce_timer -- confirm in the implementation. */
void lfjour_init(struct lfjour *, struct settle_config *);
|
|
|
|
|
|
static inline struct lfjour *lfjour_of_recipient(struct lfjour_recipient *r)
|
|
{
|
|
struct lfjour_recipient_list *list = lfjour_recipient_enlisted(r);
|
|
return list ? SKIP_BACK(struct lfjour, recipients, list) : NULL;
|
|
}
|
|
#endif
|