diff --git a/lib/lockfree.c b/lib/lockfree.c index 2d57b46a..c68899d3 100644 --- a/lib/lockfree.c +++ b/lib/lockfree.c @@ -38,6 +38,27 @@ void lfuc_unlock_deferred(struct deferred_call *dc) off / s; \ }) +/* Give back the LFJOUR_R_LAST_RUNNER token held by recipient r, if it holds one; when that was the last token issued on journal j, schedule the journal cleanup. */ +static void +lfjour_return_cleanup_token(struct lfjour *j, struct lfjour_recipient *r) +{ + /* Only a recipient holding the LAST_RUNNER flag is eligible for requesting cleanup; atomically clear the flag and bail out if it was not set. */ + if (!(atomic_fetch_and_explicit(&r->recipient_flags, ~LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER)) + return; + + /* Return the lastrunner's token. */ + u64 pings = atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + + /* Is this the last token? (pings is the counter value before our decrement.) */ + if (pings > 1) + return; + + ASSERT_DIE(pings != 0); /* a previous value of 0 would mean we returned a token that was never counted */ + + /* No more cleanup tokens issued, request cleanup. */ + lfjour_schedule_cleanup(j); +} + struct lfjour_item * lfjour_push_prepare(struct lfjour *j) { @@ -228,7 +249,7 @@ void lfjour_release(struct lfjour_recipient *r, const struct lfjour_item *it) /* Releasing this export for cleanup routine */ if (pos + 1 == end) - { +{ lfjour_debug("lfjour(%p)_release(recipient=%p) of %p, seq=%lu (end)", j, r, r->cur, r->cur->seq); } @@ -242,7 +263,7 @@ void lfjour_release(struct lfjour_recipient *r, const struct lfjour_item *it) /* The last block may be available to free */ if ((pos + 1 == end) || last && (last_block != block)) - lfjour_schedule_cleanup(j); + lfjour_return_cleanup_token(j, r); r->first_holding_seq = 0; r->cur = NULL; @@ -299,6 +320,13 @@ lfjour_register(struct lfjour *j, struct lfjour_recipient *r) ASSERT_DIE(!r->cur); lfjour_recipient_add_tail(&j->recipients, r); + + if (j->issued_tokens < j->max_tokens) + { + /* Cleanup hook does not run, so we are sure j->issued_tokens is not increasing */ + atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + atomic_fetch_or_explicit(&r->recipient_flags, LFJOUR_R_LAST_RUNNER, memory_order_acq_rel); + } } void @@ -311,7 +339,7 @@ lfjour_unregister(struct lfjour_recipient *r) lfjour_release(r, r->cur); 
lfjour_recipient_rem_node(&j->recipients, r); - lfjour_schedule_cleanup(j); + lfjour_return_cleanup_token(j, r); } static inline void lfjour_cleanup_unlock_helper(struct domain_generic **dg) @@ -320,6 +348,110 @@ static inline void lfjour_cleanup_unlock_helper(struct domain_generic **dg) DG_UNLOCK(*dg); } +static bool +lfjour_issue_cleanup_token(struct lfjour *j, struct lfjour_recipient* rec) +{ + /* This function keeps the invariant that the total number of issued + * LFJOUR_R_LAST_RUNNER flags is the same as j->issued_tokens. + * + * Returns true if the token was successfully issued; false if the recipient has already reached the last sequence number (j->next_seq - 1), the token limit j->max_tokens is reached, or the recipient already holds a token. + */ + + /* The journal is not empty and the recipient is on seq_max, + * so it won't call cleanup */ + const struct lfjour_item *last = atomic_load_explicit(&rec->last, memory_order_acquire); + if (last && last->seq == (j->next_seq - 1)) + return false; + + /* Take a token from the pile */ + if (atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel) >= j->max_tokens) + { + /* No more tokens to issue, already at max; undo our increment */ + atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + return false; + } + + /* Trying to give the token */ + if (atomic_fetch_or_explicit(&rec->recipient_flags, LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER) + { + /* This recipient already has the token; undo our increment */ + atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + return false; + } + + /* Hasn't the recipient finished in between? (Recheck.) */ + last = atomic_load_explicit(&rec->last, memory_order_acquire); + if (last && last->seq == (j->next_seq - 1)) + { + /* It has! Retreat! */ + if (atomic_fetch_and_explicit(&rec->recipient_flags, ~LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER) + { + /* The flag was still set, we managed to get it back safely. + * Now the recipient won't call the cleanup. 
*/ + atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + return false; + } + + /* The recipient was quick enough to finish the task and + * grab the flag. Now we are going to get pinged anyway + * so we consider the flag to be issued. */ + } + + /* Now the token is issued. */ + return true; +} +/* Called by the cleanup hook on each of its exit paths: gives back the token the hook took for itself and, if no other token remains issued, either hands a token to a recipient or schedules another cleanup run. */ +static void +lfjour_cleanup_done(struct lfjour *j) +{ + /* Returning the cleanup token. */ + u64 pings = atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + + /* Somebody else is also holding a token, nothing to do. + * They'll call us when the time comes. */ + if (pings > 1) + return; + + ASSERT_DIE(pings == 1); + + /* We'll need to know whether the journal is empty or not. */ + struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire); + + /* No recipients, schedule one more cleanup if not empty */ + if (EMPTY_TLIST(lfjour_recipient, &j->recipients)) + { + if (first) + lfjour_schedule_cleanup(j); + return; + } + + /* There are some recipients but nothing to clean. + * Somebody is going to wake us, let's just throw + * the token to the crowd. */ + if (!first) + { + WALK_TLIST(lfjour_recipient, r, &j->recipients) + /* If we failed to issue a cleanup token, there is definitely + * somebody holding it, no more tokens needed. */ + if (! lfjour_issue_cleanup_token(j, r)) + { + ASSERT_DIE(atomic_load_explicit(&j->issued_tokens, memory_order_acquire) > 0); + return; + } + } + + /* We have to find some recipient which has not + * yet finished. When we find it, it will happily + * accept the token so we are done. */ + WALK_TLIST(lfjour_recipient, r, &j->recipients) + if (lfjour_issue_cleanup_token(j, r)) + return; + + /* Nobody needs anything, but the journal is not empty. + * Run cleanup again. NOTE(review): this point is also reached with an empty journal when the !first loop above issued a token to every recipient and fell through -- confirm that rescheduling is intended in that case. 
*/ + lfjour_schedule_cleanup(j); +} + static void lfjour_cleanup_hook(void *_j) { @@ -328,15 +460,22 @@ lfjour_cleanup_hook(void *_j) CLEANUP(lfjour_cleanup_unlock_helper) struct domain_generic *_locked = j->domain; if (_locked) DG_LOCK(_locked); + /* We consider ourselves holding one token to keep recipients + * from scheduling us too early. It is given back by lfjour_cleanup_done() on every exit path below. */ + atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel); + u64 min_seq = ~((u64) 0); const struct lfjour_item *last_item_to_free = NULL; struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire); if (!first) { + lfjour_cleanup_done(j); + /* Nothing to cleanup, actually, just call the done callback */ ASSERT_DIE(EMPTY_TLIST(lfjour_block, &j->pending)); CALL(j->cleanup_done, j, 0, ~((u64) 0)); + return; } @@ -345,8 +484,14 @@ lfjour_cleanup_hook(void *_j) const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire); if (!last) - /* No last export means that the channel has exported nothing since last cleanup */ + { + /* No last export means that the channel has exported nothing since + * last cleanup. Let's try to give them the token, + * and we're done anyway. */ + lfjour_issue_cleanup_token(j, r); + lfjour_cleanup_done(j); return; + } else if (min_seq > last->seq) { @@ -373,6 +518,9 @@ lfjour_cleanup_hook(void *_j) memory_order_acq_rel, memory_order_acquire)) { lfjour_debug("lfjour(%p)_cleanup(recipient=%p): store last=NULL", j, r); + + /* The most-behind-one gets the cleanup token */ + lfjour_issue_cleanup_token(j, r); } else { @@ -429,6 +577,8 @@ lfjour_cleanup_hook(void *_j) first = next; } + lfjour_cleanup_done(j); + CALL(j->cleanup_done, j, orig_first_seq, first ? 
first->seq : ~((u64) 0)); } @@ -452,4 +602,5 @@ lfjour_init(struct lfjour *j, struct settle_config *scf) .hook = lfjour_cleanup_hook, .data = j, }; + j->max_tokens = 20; /* NOTE(review): hard-coded token limit; the rationale for 20 is not visible in this change */ } diff --git a/lib/lockfree.h b/lib/lockfree.h index ab7f7d0e..8dc8f82d 100644 --- a/lib/lockfree.h +++ b/lib/lockfree.h @@ -219,6 +219,7 @@ struct lfjour_recipient { enum lfjour_recipient_flags { LFJOUR_R_SEQ_RESET = 1, /* Signalling of sequence number reset */ + LFJOUR_R_LAST_RUNNER = 2, /* Set while this recipient holds a cleanup token and is supposed to ping the cleanup hook */ }; /* Defines lfjour_recipient_list */ @@ -237,6 +238,8 @@ struct lfjour { event announce_kick_event; /* Kicks announce_timer */ struct settle announce_timer; /* Announces changes to recipients */ event cleanup_event; /* Runs the journal cleanup routine */ + u64 max_tokens; /* Maximum number of cleanup tokens to issue */ + _Atomic u64 issued_tokens; /* Current count of issued cleanup tokens (held by recipients or by the running cleanup hook) */ /* Callback on item removal from journal */ void (*item_done)(struct lfjour *, struct lfjour_item *);