From d503204aa0773e847938ddd5d587b1ed20b99ed1 Mon Sep 17 00:00:00 2001 From: Katerina Kubecova Date: Fri, 6 Dec 2024 13:16:18 +0100 Subject: [PATCH] Lockfree journal: Cleanup hook runs only when needed. The function lfjour_cleanup_hook() was scheduled each time any of the journal recipients reached end of a block of journal items or read all of journal items. Because lfjour_cleanup_hook() can clean only journal items every recipient has processed, it was often called uselessly. This commit restricts most of the unuseful scheduling. Only some recipients are given a token alowing them to try to schedule the cleanup hook. When a recipient wants to schedule the cleanup hook, it checks whether it has a token. If yes, it decrements number of tokens the journal has given (issued_tokens) and discards its own token. If issued_tokens reaches zero, the recipient is allowed to schedule the cleanup hook. There is a maximum number of tokens a journal can give to its recipients (max_tokens). A new recipient is given a token in its init, unless the maximum number of tokens is reached. The rest of tokens is given to customers in lfjour_cleanup_hook(). In the cleanup hook, the issued_tokens number is increased in order to avoid calling the hook before it finishes. Then, tokens are given to the slowest recipients (but never to more than max_token recipients). Before leaving lfjour_cleanup_hook(), the issued_tokens number is decreased back. If no other tokens are given, we have to make sure the lfjour_cleanup_hook will be called again. If every item in journal was read by every recipient, tokens are given to random recipients. If all recipients with tokens managed to finish until now, we give the token to the first unfinished customer we find, or we just call the hook again. --- lib/lockfree.c | 159 +++++++++++++++++++++++++++++++++++++++++++++++-- lib/lockfree.h | 3 + 2 files changed, 158 insertions(+), 4 deletions(-) diff --git a/lib/lockfree.c b/lib/lockfree.c index 2d57b46a..c68899d3 100644 --- a/lib/lockfree.c +++ b/lib/lockfree.c @@ -38,6 +38,27 @@ void lfuc_unlock_deferred(struct deferred_call *dc) off / s; \ }) + +static void +lfjour_return_cleanup_token(struct lfjour *j, struct lfjour_recipient *r) +{ + /* Eligible for requesting cleanup */ + if (!(atomic_fetch_and_explicit(&r->recipient_flags, ~LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER)) + return; + + /* Return the lastrunner's token. */ + u64 pings = atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + + /* Is this the last token? */ + if (pings > 1) + return; + + ASSERT_DIE(pings != 0); + + /* No more cleanup tokens issued, request cleanup. */ + lfjour_schedule_cleanup(j); +} + struct lfjour_item * lfjour_push_prepare(struct lfjour *j) { @@ -228,7 +249,7 @@ void lfjour_release(struct lfjour_recipient *r, const struct lfjour_item *it) /* Releasing this export for cleanup routine */ if (pos + 1 == end) - { +{ lfjour_debug("lfjour(%p)_release(recipient=%p) of %p, seq=%lu (end)", j, r, r->cur, r->cur->seq); } @@ -242,7 +263,7 @@ void lfjour_release(struct lfjour_recipient *r, const struct lfjour_item *it) /* The last block may be available to free */ if ((pos + 1 == end) || last && (last_block != block)) - lfjour_schedule_cleanup(j); + lfjour_return_cleanup_token(j, r); r->first_holding_seq = 0; r->cur = NULL; @@ -299,6 +320,13 @@ lfjour_register(struct lfjour *j, struct lfjour_recipient *r) ASSERT_DIE(!r->cur); lfjour_recipient_add_tail(&j->recipients, r); + + if (j->issued_tokens < j->max_tokens) + { + /* Cleanup hook does not run, so we are sure j->issued_tokens is not increasing */ + atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + atomic_fetch_or_explicit(&r->recipient_flags, LFJOUR_R_LAST_RUNNER, memory_order_acq_rel); + } } void @@ -311,7 +339,7 @@ lfjour_unregister(struct lfjour_recipient *r) lfjour_release(r, r->cur); lfjour_recipient_rem_node(&j->recipients, r); - lfjour_schedule_cleanup(j); + lfjour_return_cleanup_token(j, r); } static inline void lfjour_cleanup_unlock_helper(struct domain_generic **dg) @@ -320,6 +348,110 @@ static inline void lfjour_cleanup_unlock_helper(struct domain_generic **dg) DG_UNLOCK(*dg); } +static bool +lfjour_issue_cleanup_token(struct lfjour *j, struct lfjour_recipient* rec) +{ + /* This function keeps the invariant that the total number of issued + * LFJOUR_R_LAST_RUNNER flags is the same as j->issued_tokens. + * + * Returs true if the token was successfully issued. + */ + + /* The journal is not empty and the recipient is on seq_max, + * so it won't call cleanup */ + const struct lfjour_item *last = atomic_load_explicit(&rec->last, memory_order_acquire); + if (last && last->seq == (j->next_seq - 1)) + return false; + + /* Take a token from the pile */ + if (atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel) >= j->max_tokens) + { + /* No more tokens to issue, already at max */ + atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + return false; + } + + /* Trying to give the token */ + if (atomic_fetch_or_explicit(&rec->recipient_flags, LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER) + { + /* This recipient already has the token */ + atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + return false; + } + + /* Has't the recipient finished inbetween? (Recheck.) */ + last = atomic_load_explicit(&rec->last, memory_order_acquire); + if (last && last->seq == (j->next_seq - 1)) + { + /* It has! Retreat! */ + if (atomic_fetch_and_explicit(&rec->recipient_flags, ~LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER) + { + /* The flag was still set, we managed to get it back safely. + * Now the recipient won't call the cleanup. */ + atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + return false; + } + + /* The recipient was quick enough to finish the task and + * grab the flag. Now we are going to get pinged anyway + * so we consider the flag to be issued. */ + } + + /* Now the token is issued. */ + return true; +} + +static void +lfjour_cleanup_done(struct lfjour *j) +{ + /* Returning the cleanup token. */ + u64 pings = atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + + /* Somebody else is also holding a token, nothing to do. + * They'll call us when the time comes. */ + if (pings > 1) + return; + + ASSERT_DIE(pings == 1); + + /* We'll need to know whether the journal is empty or not. */ + struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire); + + /* No recipients, schedule one more cleanup if not empty */ + if (EMPTY_TLIST(lfjour_recipient, &j->recipients)) + { + if (first) + lfjour_schedule_cleanup(j); + return; + } + + /* There are some recipients but nothing to clean. + * Somebody is going to wake us, let's just throw + * the token to the crowd. */ + if (!first) + { + WALK_TLIST(lfjour_recipient, r, &j->recipients) + /* If we failed to issue a cleanup token, there is definitely + * somebody holding it, no more tokens needed. */ + if (! lfjour_issue_cleanup_token(j, r)) + { + ASSERT_DIE(atomic_load_explicit(&j->issued_tokens, memory_order_acquire) > 0); + return; + } + } + + /* We have to find some recipient which has not + * yet finished. When we find it, it will happily + * accept the token so we are done. */ + WALK_TLIST(lfjour_recipient, r, &j->recipients) + if (lfjour_issue_cleanup_token(j, r)) + return; + + /* Nobody needs anything, but the journal is not empty. + * Run cleanup again. */ + lfjour_schedule_cleanup(j); +} + static void lfjour_cleanup_hook(void *_j) { @@ -328,15 +460,22 @@ lfjour_cleanup_hook(void *_j) CLEANUP(lfjour_cleanup_unlock_helper) struct domain_generic *_locked = j->domain; if (_locked) DG_LOCK(_locked); + /* We consider ourselves holding one token to keep recipients + * from scheduling us too early. */ + atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + u64 min_seq = ~((u64) 0); const struct lfjour_item *last_item_to_free = NULL; struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire); if (!first) { + lfjour_cleanup_done(j); + /* Nothing to cleanup, actually, just call the done callback */ ASSERT_DIE(EMPTY_TLIST(lfjour_block, &j->pending)); CALL(j->cleanup_done, j, 0, ~((u64) 0)); + return; } @@ -345,8 +484,14 @@ lfjour_cleanup_hook(void *_j) const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire); if (!last) - /* No last export means that the channel has exported nothing since last cleanup */ + { + /* No last export means that the channel has exported nothing since + * last cleanup. Let's try to give them the token, + * and we're done anyway. */ + lfjour_issue_cleanup_token(j, r); + lfjour_cleanup_done(j); return; + } else if (min_seq > last->seq) { @@ -373,6 +518,9 @@ lfjour_cleanup_hook(void *_j) memory_order_acq_rel, memory_order_acquire)) { lfjour_debug("lfjour(%p)_cleanup(recipient=%p): store last=NULL", j, r); + + /* The most-behind-one gets the cleanup token */ + lfjour_issue_cleanup_token(j, r); } else { @@ -429,6 +577,8 @@ lfjour_cleanup_hook(void *_j) first = next; } + lfjour_cleanup_done(j); + CALL(j->cleanup_done, j, orig_first_seq, first ? first->seq : ~((u64) 0)); } @@ -452,4 +602,5 @@ lfjour_init(struct lfjour *j, struct settle_config *scf) .hook = lfjour_cleanup_hook, .data = j, }; + j->max_tokens = 20; } diff --git a/lib/lockfree.h b/lib/lockfree.h index ab7f7d0e..8dc8f82d 100644 --- a/lib/lockfree.h +++ b/lib/lockfree.h @@ -219,6 +219,7 @@ struct lfjour_recipient { enum lfjour_recipient_flags { LFJOUR_R_SEQ_RESET = 1, /* Signalling of sequence number reset */ + LFJOUR_R_LAST_RUNNER = 2, /* Set if this recipient is supposed to ping cleanup hook */ }; /* Defines lfjour_recipient_list */ @@ -237,6 +238,8 @@ struct lfjour { event announce_kick_event; /* Kicks announce_timer */ struct settle announce_timer; /* Announces changes to recipients */ event cleanup_event; /* Runs the journal cleanup routine */ + u64 max_tokens; /* Maximum number of cleanup tokens to issue */ + _Atomic u64 issued_tokens; /* Current count of issued tokens */ /* Callback on item removal from journal */ void (*item_done)(struct lfjour *, struct lfjour_item *);