mirror of
https://gitlab.nic.cz/labs/bird.git
synced 2024-12-22 01:31:55 +00:00
b84460432a
reached end of a block of journal items or read all of journal items. lfjour_cleanup_hook() can clean only journal items every recipient has processed, so it was often called uselessly. This commit restricts most of the unusefull scheduling. Only some consumers are given a token alowing them to try to schedule the lfjour_cleanup_hook(). When a consumer wants to schedule the lfjour_cleanup_hook(), it checks whether it has a token. If yes, it decrements number of tokens the journal has given (issued_tokens) and discards its own token. If issued_tokens reaches zero, the consumer is allowed do schedule the lfjour_cleanup_hook(). There is a maximum number of tokens a journal can give to its customers (max_tokens). A new customer is given a token in its init, if the maximum number of tokens was not reached. The rest of tokens is given to customers in lfjour_cleanup_hook(). In lf_jour_cleanup_hook(), it is increased the issued_tokens number in order not to call the hook before it finishes. Then, tokens are given to the slowest recipients (but never to more than max_token recipients). Before leaving lfjour_cleanup_hook(), the issued_tokens number is decreased. If no other tokens are given, we have to make sure the lfjour_cleanup_hook will be called again. If every item in journal was read by every recipient, tokens are given to random recipients. If all recipients with tokens managed to finish until now, we give the token to first unfinished customer we find or we call the hook again.
607 lines
17 KiB
C
607 lines
17 KiB
C
/*
|
|
* BIRD Library -- Generic lock-free structures
|
|
*
|
|
* (c) 2023--2024 Maria Matejka <mq@jmq.cz>
|
|
* (c) 2023--2024 CZ.NIC, z.s.p.o.
|
|
*
|
|
* Can be freely distributed and used under the terms of the GNU GPL.
|
|
*/
|
|
|
|
#include "lib/birdlib.h"
|
|
#include "lib/lockfree.h"
|
|
|
|
#define LOCAL_DEBUG
|
|
|
|
void lfuc_unlock_deferred(struct deferred_call *dc)
|
|
{
|
|
SKIP_BACK_DECLARE(struct lfuc_unlock_queue_item, luqi, dc, dc);
|
|
lfuc_unlock_immediately(luqi->c, luqi->el, luqi->ev);
|
|
}
|
|
|
|
#if 0
|
|
#define lfjour_debug(...) log(L_TRACE __VA_ARGS__)
|
|
#define lfjour_debug_detailed(...) log(L_TRACE __VA_ARGS__)
|
|
#elif 0
|
|
#define lfjour_debug(...) log(L_TRACE __VA_ARGS__)
|
|
#define lfjour_debug_detailed(...)
|
|
#else
|
|
#define lfjour_debug(...)
|
|
#define lfjour_debug_detailed(...)
|
|
#endif
|
|
|
|
#define LBI(j, b, p) ((struct lfjour_item *)(((void *) (b)->_block) + ((j)->item_size * (p))))
|
|
#define LBP(j, b, i) ({ \
|
|
off_t off = ((void *) (i)) - ((void *) (b)->_block); \
|
|
u32 s = (j)->item_size; \
|
|
ASSERT_DIE(off < page_size); \
|
|
ASSERT_DIE((off % s) == 0); \
|
|
off / s; \
|
|
})
|
|
|
|
|
|
static void
|
|
lfjour_return_cleanup_token(struct lfjour *j, struct lfjour_recipient *r)
|
|
{
|
|
/* Eligible for requesting cleanup */
|
|
if (!(atomic_fetch_and_explicit(&r->recipient_flags, ~LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER))
|
|
return;
|
|
|
|
/* Return the lastrunner's token. */
|
|
u64 pings = atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
|
|
|
|
/* Is this the last token? */
|
|
if (pings > 1)
|
|
return;
|
|
|
|
ASSERT_DIE(pings != 0);
|
|
|
|
/* No more cleanup tokens issued, request cleanup. */
|
|
lfjour_schedule_cleanup(j);
|
|
}
|
|
|
|
struct lfjour_item *
|
|
lfjour_push_prepare(struct lfjour *j)
|
|
{
|
|
ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));
|
|
ASSERT_DIE(!j->open);
|
|
|
|
if (EMPTY_TLIST(lfjour_block, &j->pending) &&
|
|
EMPTY_TLIST(lfjour_recipient, &j->recipients))
|
|
return NULL;
|
|
|
|
struct lfjour_block *block = NULL;
|
|
u32 end = 0;
|
|
|
|
if (!EMPTY_TLIST(lfjour_block, &j->pending))
|
|
{
|
|
block = j->pending.last;
|
|
end = atomic_load_explicit(&block->end, memory_order_relaxed);
|
|
if (end >= j->item_count)
|
|
{
|
|
ASSERT_DIE(end == j->item_count);
|
|
block = NULL;
|
|
end = 0;
|
|
}
|
|
}
|
|
|
|
if (!block)
|
|
{
|
|
block = alloc_page();
|
|
lfjour_debug("lfjour(%p)_push_prepare: allocating block %p", j, block);
|
|
*block = (struct lfjour_block) {};
|
|
lfjour_block_add_tail(&j->pending, block);
|
|
}
|
|
|
|
struct lfjour_item *i = LBI(j, block, end);
|
|
*i = (struct lfjour_item) {
|
|
.seq = j->next_seq++,
|
|
};
|
|
|
|
return j->open = i;
|
|
}
|
|
|
|
void
|
|
lfjour_push_commit(struct lfjour *j)
|
|
{
|
|
ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));
|
|
ASSERT_DIE(j->open);
|
|
struct lfjour_block *b = PAGE_HEAD(j->open);
|
|
ASSERT_DIE(b == j->pending.last);
|
|
|
|
lfjour_debug("lfjour(%p)_push_commit of %p, seq=%lu", j, j->open, j->open->seq);
|
|
|
|
u32 end = atomic_fetch_add_explicit(&b->end, 1, memory_order_release);
|
|
ASSERT_DIE(j->open == LBI(j, b, end));
|
|
|
|
if (end == 0)
|
|
{
|
|
struct lfjour_block *prev = b->n.prev;
|
|
bool f = 0;
|
|
if (prev)
|
|
ASSERT_DIE(atomic_compare_exchange_strong_explicit(&prev->not_last, &f, 1,
|
|
memory_order_release, memory_order_relaxed));
|
|
}
|
|
|
|
/* Store the first item to announce (only if this is actually the first one). */
|
|
struct lfjour_item *null_item = NULL;
|
|
if (atomic_compare_exchange_strong_explicit(
|
|
&j->first, &null_item, j->open,
|
|
memory_order_acq_rel, memory_order_relaxed))
|
|
{
|
|
lfjour_debug("lfjour(%p) first set", j);
|
|
}
|
|
|
|
j->open = NULL;
|
|
|
|
if (!ev_active(&j->announce_kick_event))
|
|
ev_send_loop(j->loop, &j->announce_kick_event);
|
|
}
|
|
|
|
static struct lfjour_item *
|
|
lfjour_get_next(struct lfjour *j, const struct lfjour_item *last)
|
|
{
|
|
/* This is lockless, no domain checks. */
|
|
if (!last)
|
|
{
|
|
struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire);
|
|
return first;
|
|
}
|
|
|
|
struct lfjour_block *block = PAGE_HEAD(last);
|
|
ASSERT_DIE(block);
|
|
u32 end = atomic_load_explicit(&block->end, memory_order_acquire);
|
|
u32 pos = LBP(j, block, last);
|
|
ASSERT_DIE(pos < end);
|
|
|
|
/* Next is in the same block. */
|
|
if (++pos < end)
|
|
return LBI(j, block, pos);
|
|
|
|
/* There is another block. */
|
|
if (atomic_load_explicit(&block->not_last, memory_order_acquire))
|
|
{
|
|
/* To avoid rare race conditions, we shall check the current block end once again */
|
|
u32 new_end = atomic_load_explicit(&block->end, memory_order_acquire);
|
|
ASSERT_DIE(new_end >= end);
|
|
if (new_end > end)
|
|
return LBI(j, block, pos);
|
|
|
|
/* Nothing in the previous one, let's move to the next block.
|
|
* This is OK to do non-atomically because of the not_last flag. */
|
|
block = block->n.next;
|
|
return LBI(j, block, 0);
|
|
}
|
|
|
|
/* There is nothing more. */
|
|
return NULL;
|
|
}
|
|
|
|
struct lfjour_item *
|
|
lfjour_get(struct lfjour_recipient *r)
|
|
{
|
|
struct lfjour *j = lfjour_of_recipient(r);
|
|
|
|
const struct lfjour_item *last = r->cur;
|
|
struct lfjour_item *next = NULL;
|
|
|
|
if (last)
|
|
next = lfjour_get_next(j, r->cur);
|
|
else
|
|
{
|
|
/* The last pointer may get cleaned up under our hands.
|
|
* Indicating that we're using it, by RCU read. */
|
|
|
|
rcu_read_lock();
|
|
last = atomic_load_explicit(&r->last, memory_order_acquire);
|
|
next = lfjour_get_next(j, last);
|
|
rcu_read_unlock();
|
|
}
|
|
|
|
if (last)
|
|
{
|
|
lfjour_debug_detailed("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, last %p",
|
|
j, r, next, next ? next->seq : 0ULL, last);
|
|
}
|
|
else
|
|
{
|
|
lfjour_debug("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, clean",
|
|
j, r, next, next ? next->seq : 0ULL);
|
|
}
|
|
|
|
if (!next)
|
|
return NULL;
|
|
|
|
if (!r->first_holding_seq)
|
|
r->first_holding_seq = next->seq;
|
|
|
|
return r->cur = next;
|
|
}
|
|
|
|
void lfjour_release(struct lfjour_recipient *r, const struct lfjour_item *it)
|
|
{
|
|
/* Find out what we actually released last */
|
|
rcu_read_lock();
|
|
const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
|
|
struct lfjour_block *last_block = last ? PAGE_HEAD(last) : NULL;
|
|
rcu_read_unlock();
|
|
|
|
/* This is lockless, no domain checks. */
|
|
ASSERT_DIE(r->cur);
|
|
|
|
/* Partial or full release? */
|
|
ASSERT_DIE(r->first_holding_seq);
|
|
ASSERT_DIE(it->seq >= r->first_holding_seq);
|
|
if (it->seq < r->cur->seq)
|
|
{
|
|
lfjour_debug("lfjour(%p)_release(recipient=%p) of %p, partial upto seq=%lu",
|
|
j, r, it, it->seq);
|
|
r->first_holding_seq = it->seq + 1;
|
|
atomic_store_explicit(&r->last, it, memory_order_release);
|
|
return;
|
|
}
|
|
|
|
struct lfjour_block *block = PAGE_HEAD(r->cur);
|
|
u32 end = atomic_load_explicit(&block->end, memory_order_acquire);
|
|
|
|
struct lfjour *j = lfjour_of_recipient(r);
|
|
u32 pos = LBP(j, block, r->cur);
|
|
ASSERT_DIE(pos < end);
|
|
|
|
/* Releasing this export for cleanup routine */
|
|
if (pos + 1 == end)
|
|
{
|
|
lfjour_debug("lfjour(%p)_release(recipient=%p) of %p, seq=%lu (end)",
|
|
j, r, r->cur, r->cur->seq);
|
|
}
|
|
else
|
|
{
|
|
lfjour_debug_detailed("lfjour(%p)_release(recipient=%p) of %p, seq=%lu (mid)",
|
|
j, r, r->cur, r->cur->seq);
|
|
}
|
|
|
|
atomic_store_explicit(&r->last, r->cur, memory_order_release);
|
|
|
|
/* The last block may be available to free */
|
|
if ((pos + 1 == end) || last && (last_block != block))
|
|
lfjour_return_cleanup_token(j, r);
|
|
|
|
r->first_holding_seq = 0;
|
|
r->cur = NULL;
|
|
}
|
|
|
|
void
|
|
lfjour_announce_now(struct lfjour *j)
|
|
{
|
|
ASSERT_DIE(birdloop_inside(j->loop));
|
|
settle_cancel(&j->announce_timer);
|
|
ev_postpone(&j->announce_kick_event);
|
|
|
|
if (EMPTY_TLIST(lfjour_recipient, &j->recipients))
|
|
return lfjour_schedule_cleanup(j);
|
|
|
|
WALK_TLIST(lfjour_recipient, r, &j->recipients)
|
|
if (r->event)
|
|
ev_send(r->target, r->event);
|
|
}
|
|
|
|
static void
|
|
lfjour_announce_settle_hook(struct settle *s)
|
|
{
|
|
return lfjour_announce_now(SKIP_BACK(struct lfjour, announce_timer, s));
|
|
}
|
|
|
|
static void
|
|
lfjour_announce_kick_hook(void *_j)
|
|
{
|
|
struct lfjour *j = _j;
|
|
settle_kick(&j->announce_timer, j->loop);
|
|
}
|
|
|
|
u64
|
|
lfjour_pending_items(struct lfjour *j)
|
|
{
|
|
ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));
|
|
|
|
struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_relaxed);
|
|
if (!first)
|
|
return 0;
|
|
|
|
ASSERT_DIE(j->next_seq > first->seq);
|
|
return j->next_seq - first->seq;
|
|
}
|
|
|
|
void
|
|
lfjour_register(struct lfjour *j, struct lfjour_recipient *r)
|
|
{
|
|
ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));
|
|
ASSERT_DIE(!r->event == !r->target);
|
|
|
|
atomic_store_explicit(&r->last, NULL, memory_order_relaxed);
|
|
ASSERT_DIE(!r->cur);
|
|
|
|
lfjour_recipient_add_tail(&j->recipients, r);
|
|
|
|
if (j->issued_tokens < j->max_tokens)
|
|
{
|
|
/* Cleanup hook does not run, so we are sure j->issued_tokens is not increasing */
|
|
atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
|
|
atomic_fetch_or_explicit(&r->recipient_flags, LFJOUR_R_LAST_RUNNER, memory_order_acq_rel);
|
|
}
|
|
}
|
|
|
|
void
|
|
lfjour_unregister(struct lfjour_recipient *r)
|
|
{
|
|
struct lfjour *j = lfjour_of_recipient(r);
|
|
ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));
|
|
|
|
if (r->cur)
|
|
lfjour_release(r, r->cur);
|
|
|
|
lfjour_recipient_rem_node(&j->recipients, r);
|
|
lfjour_return_cleanup_token(j, r);
|
|
}
|
|
|
|
static inline void lfjour_cleanup_unlock_helper(struct domain_generic **dg)
|
|
{
|
|
if (!*dg) return;
|
|
DG_UNLOCK(*dg);
|
|
}
|
|
|
|
static bool
|
|
lfjour_issue_cleanup_token(struct lfjour *j, struct lfjour_recipient* rec)
|
|
{
|
|
/* This function keeps the invariant that the total number of issued
|
|
* LFJOUR_R_LAST_RUNNER flags is the same as j->issued_tokens.
|
|
*
|
|
* Returs true if the token was successfully issued.
|
|
*/
|
|
|
|
/* The journal is not empty and the recipient is on seq_max,
|
|
* so it won't call cleanup */
|
|
const struct lfjour_item *last = atomic_load_explicit(&rec->last, memory_order_acquire);
|
|
if (last && last->seq == (j->next_seq - 1))
|
|
return false;
|
|
|
|
/* Take a token from the pile */
|
|
if (atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel) >= j->max_tokens)
|
|
{
|
|
/* No more tokens to issue, already at max */
|
|
atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
|
|
return false;
|
|
}
|
|
|
|
/* Trying to give the token */
|
|
if (atomic_fetch_or_explicit(&rec->recipient_flags, LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER)
|
|
{
|
|
/* This recipient already has the token */
|
|
atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
|
|
return false;
|
|
}
|
|
|
|
/* Has't the recipient finished inbetween? (Recheck.) */
|
|
last = atomic_load_explicit(&rec->last, memory_order_acquire);
|
|
if (last && last->seq == (j->next_seq - 1))
|
|
{
|
|
/* It has! Retreat! */
|
|
if (atomic_fetch_and_explicit(&rec->recipient_flags, ~LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER)
|
|
{
|
|
/* The flag was still set, we managed to get it back safely.
|
|
* Now the recipient won't call the cleanup. */
|
|
atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
|
|
return false;
|
|
}
|
|
|
|
/* The recipient was quick enough to finish the task and
|
|
* grab the flag. Now we are going to get pinged anyway
|
|
* so we consider the flag to be issued. */
|
|
}
|
|
|
|
/* Now the token is issued. */
|
|
return true;
|
|
}
|
|
|
|
static void
|
|
lfjour_cleanup_done(struct lfjour *j)
|
|
{
|
|
/* Returning the cleanup token. */
|
|
u64 pings = atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
|
|
|
|
/* Somebody else is also holding a token, nothing to do.
|
|
* They'll call us when the time comes. */
|
|
if (pings > 1)
|
|
return;
|
|
|
|
ASSERT_DIE(pings == 1);
|
|
|
|
/* We'll need to know whether the journal is empty or not. */
|
|
struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire);
|
|
|
|
/* No recipients, schedule one more cleanup if not empty */
|
|
if (EMPTY_TLIST(lfjour_recipient, &j->recipients))
|
|
{
|
|
if (first)
|
|
lfjour_schedule_cleanup(j);
|
|
return;
|
|
}
|
|
|
|
/* There are some recipients but nothing to clean.
|
|
* Somebody is going to wake us, let's just throw
|
|
* the token to the crowd. */
|
|
if (!first)
|
|
{
|
|
WALK_TLIST(lfjour_recipient, r, &j->recipients)
|
|
/* If we failed to issue a cleanup token, there is definitely
|
|
* somebody holding it, no more tokens needed. */
|
|
if (! lfjour_issue_cleanup_token(j, r))
|
|
{
|
|
ASSERT_DIE(atomic_load_explicit(&j->issued_tokens, memory_order_acquire) > 0);
|
|
return;
|
|
}
|
|
}
|
|
|
|
/* We have to find some recipient which has not
|
|
* yet finished. When we find it, it will happily
|
|
* accept the token so we are done. */
|
|
WALK_TLIST(lfjour_recipient, r, &j->recipients)
|
|
if (lfjour_issue_cleanup_token(j, r))
|
|
return;
|
|
|
|
/* Nobody needs anything, but the journal is not empty.
|
|
* Run cleanup again. */
|
|
lfjour_schedule_cleanup(j);
|
|
}
|
|
|
|
static void
|
|
lfjour_cleanup_hook(void *_j)
|
|
{
|
|
struct lfjour *j = _j;
|
|
|
|
CLEANUP(lfjour_cleanup_unlock_helper) struct domain_generic *_locked = j->domain;
|
|
if (_locked) DG_LOCK(_locked);
|
|
|
|
/* We consider ourselves holding one token to keep recipients
|
|
* from scheduling us too early. */
|
|
atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
|
|
|
|
u64 min_seq = ~((u64) 0);
|
|
const struct lfjour_item *last_item_to_free = NULL;
|
|
struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire);
|
|
|
|
if (!first)
|
|
{
|
|
lfjour_cleanup_done(j);
|
|
|
|
/* Nothing to cleanup, actually, just call the done callback */
|
|
ASSERT_DIE(EMPTY_TLIST(lfjour_block, &j->pending));
|
|
CALL(j->cleanup_done, j, 0, ~((u64) 0));
|
|
|
|
return;
|
|
}
|
|
|
|
WALK_TLIST(lfjour_recipient, r, &j->recipients)
|
|
{
|
|
const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
|
|
|
|
if (!last)
|
|
{
|
|
/* No last export means that the channel has exported nothing since
|
|
* last cleanup. Let's try to give them the token,
|
|
* and we're done anyway. */
|
|
lfjour_issue_cleanup_token(j, r);
|
|
lfjour_cleanup_done(j);
|
|
return;
|
|
}
|
|
|
|
else if (min_seq > last->seq)
|
|
{
|
|
min_seq = last->seq;
|
|
last_item_to_free = last;
|
|
}
|
|
}
|
|
|
|
/* Here we're sure that no receiver is going to use the first pointer soon.
|
|
* It is only used when the receiver's last pointer is NULL, which is avoided by the code above.
|
|
* Thus, we can just move the journal's first pointer forward. */
|
|
struct lfjour_item *next = last_item_to_free ? lfjour_get_next(j, last_item_to_free) : NULL;
|
|
atomic_store_explicit(&j->first, next, memory_order_release);
|
|
|
|
lfjour_debug("lfjour(%p) set first=%p (was %p)", j, next, first);
|
|
|
|
WALK_TLIST(lfjour_recipient, r, &j->recipients)
|
|
{
|
|
const struct lfjour_item *last = last_item_to_free;
|
|
/* This either succeeds if this item is the most-behind-one,
|
|
* or fails and gives us the actual last for debug output. */
|
|
if (atomic_compare_exchange_strong_explicit(
|
|
&r->last, &last, NULL,
|
|
memory_order_acq_rel, memory_order_acquire))
|
|
{
|
|
lfjour_debug("lfjour(%p)_cleanup(recipient=%p): store last=NULL", j, r);
|
|
|
|
/* The most-behind-one gets the cleanup token */
|
|
lfjour_issue_cleanup_token(j, r);
|
|
}
|
|
else
|
|
{
|
|
lfjour_debug("lfjour(%p)_cleanup(recipient=%p): keep last=%p", j, r, last);
|
|
}
|
|
}
|
|
|
|
/* Now some recipients may have old last-pointers. We have to wait
|
|
* until they finish their routine, before we start cleaning up. */
|
|
synchronize_rcu();
|
|
|
|
u64 orig_first_seq = first->seq;
|
|
|
|
/* Now we do the actual cleanup */
|
|
while (first && (first->seq <= min_seq))
|
|
{
|
|
j->item_done(j, first);
|
|
|
|
/* Find next journal item */
|
|
struct lfjour_item *next = lfjour_get_next(j, first);
|
|
if (PAGE_HEAD(next) != PAGE_HEAD(first))
|
|
{
|
|
/* This was the last one in its block */
|
|
struct lfjour_block *block = PAGE_HEAD(first);
|
|
lfjour_debug("lfjour(%p)_cleanup: freeing block %p", j, block);
|
|
ASSERT_DIE(block == j->pending.first);
|
|
|
|
/* Free this block */
|
|
lfjour_block_rem_node(&j->pending, block);
|
|
|
|
/* Wait for possible pending readers of the block */
|
|
synchronize_rcu();
|
|
|
|
/* Now we can finally drop the block */
|
|
#ifdef LOCAL_DEBUG
|
|
memset(block, 0xbe, page_size);
|
|
#endif
|
|
free_page(block);
|
|
|
|
/* If no more blocks are remaining, we shall reset
|
|
* the sequence numbers */
|
|
|
|
if (EMPTY_TLIST(lfjour_block, &j->pending))
|
|
{
|
|
lfjour_debug("lfjour(%p)_cleanup: seq reset", j);
|
|
WALK_TLIST(lfjour_recipient, r, &j->recipients)
|
|
atomic_fetch_or_explicit(&r->recipient_flags, LFJOUR_R_SEQ_RESET, memory_order_acq_rel);
|
|
|
|
j->next_seq = 1;
|
|
}
|
|
}
|
|
|
|
/* And now move on to the next item */
|
|
first = next;
|
|
}
|
|
|
|
lfjour_cleanup_done(j);
|
|
|
|
CALL(j->cleanup_done, j, orig_first_seq, first ? first->seq : ~((u64) 0));
|
|
}
|
|
|
|
void
|
|
lfjour_init(struct lfjour *j, struct settle_config *scf)
|
|
{
|
|
/* Expecting all other fields to be initialized to zeroes by the caller */
|
|
ASSERT_DIE(j->loop);
|
|
ASSERT_DIE(j->item_size >= sizeof(struct lfjour_item));
|
|
|
|
j->item_size = BIRD_CPU_ALIGN(j->item_size);
|
|
j->item_count = (page_size - sizeof(struct lfjour_block)) / j->item_size;
|
|
|
|
j->next_seq = 1;
|
|
j->announce_kick_event = (event) {
|
|
.hook = lfjour_announce_kick_hook,
|
|
.data = j,
|
|
};
|
|
j->announce_timer = SETTLE_INIT(scf, lfjour_announce_settle_hook, j);
|
|
j->cleanup_event = (event) {
|
|
.hook = lfjour_cleanup_hook,
|
|
.data = j,
|
|
};
|
|
j->max_tokens = 20;
|
|
}
|