diff --git a/lib/lockfree.c b/lib/lockfree.c
index 3a2ccab7..17c17d18 100644
--- a/lib/lockfree.c
+++ b/lib/lockfree.c
@@ -117,7 +117,7 @@ lfjour_push_commit(struct lfjour *j)
 }

 static struct lfjour_item *
-lfjour_get_next(struct lfjour *j, struct lfjour_item *last)
+lfjour_get_next(struct lfjour *j, const struct lfjour_item *last)
 {
   /* This is lockless, no domain checks. */
   if (!last)
@@ -158,36 +158,67 @@ lfjour_get_next(struct lfjour *j, struct lfjour_item *last)
 struct lfjour_item *
 lfjour_get(struct lfjour_recipient *r)
 {
-  ASSERT_DIE(r->cur == NULL);
   struct lfjour *j = lfjour_of_recipient(r);

-  /* The last pointer may get cleaned up under our hands.
-   * Indicating that we're using it, by RCU read. */
+  const struct lfjour_item *last = r->cur;
+  struct lfjour_item *next = NULL;

-  rcu_read_lock();
-  struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
-  r->cur = lfjour_get_next(j, last);
-  rcu_read_unlock();
+  if (last)
+    next = lfjour_get_next(j, r->cur);
+  else
+  {
+    /* The last pointer may get cleaned up under our hands.
+     * Indicating that we're using it, by RCU read. */
+
+    rcu_read_lock();
+    last = atomic_load_explicit(&r->last, memory_order_acquire);
+    next = lfjour_get_next(j, last);
+    rcu_read_unlock();
+  }

   if (last)
   {
     lfjour_debug_detailed("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, last %p",
-	j, r, r->cur, r->cur ? r->cur->seq : 0ULL, last);
+	j, r, next, next ? next->seq : 0ULL, last);
   }
   else
   {
     lfjour_debug("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, clean",
-	j, r, r->cur, r->cur ? r->cur->seq : 0ULL);
+	j, r, next, next ? next->seq : 0ULL);
   }

-  return r->cur;
+  if (!next)
+    return NULL;
+
+  if (!r->first_holding_seq)
+    r->first_holding_seq = next->seq;
+
+  return r->cur = next;
 }

-void lfjour_release(struct lfjour_recipient *r)
+void lfjour_release(struct lfjour_recipient *r, const struct lfjour_item *it)
 {
-  /* This is lockless, no domain checks. */
+  /* Find out what we actually released last */
+  rcu_read_lock();
+  const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
+  struct lfjour_block *last_block = last ? PAGE_HEAD(last) : NULL;
+  rcu_read_unlock();

+  /* This is lockless, no domain checks. */
   ASSERT_DIE(r->cur);
+
+  /* Partial or full release? */
+  ASSERT_DIE(r->first_holding_seq);
+  ASSERT_DIE(it->seq >= r->first_holding_seq);
+  if (it->seq < r->cur->seq)
+  {
+    lfjour_debug("lfjour(%p)_release(recipient=%p) of %p, partial upto seq=%lu",
+	j, r, it, it->seq);
+    r->first_holding_seq = it->seq + 1;
+    atomic_store_explicit(&r->last, it, memory_order_release);
+    return;
+  }
+
   struct lfjour_block *block = PAGE_HEAD(r->cur);
   u32 end = atomic_load_explicit(&block->end, memory_order_acquire);

@@ -210,9 +241,10 @@ void lfjour_release(struct lfjour_recipient *r)
   atomic_store_explicit(&r->last, r->cur, memory_order_release);

   /* The last block may be available to free */
-  if (pos + 1 == end)
+  if ((pos + 1 == end) || last && (last_block != block))
     lfjour_schedule_cleanup(j);

+  r->first_holding_seq = 0;
   r->cur = NULL;
 }

@@ -276,7 +308,7 @@ lfjour_unregister(struct lfjour_recipient *r)
   ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));

   if (r->cur)
-    lfjour_release(r);
+    lfjour_release(r, r->cur);

   lfjour_recipient_rem_node(&j->recipients, r);
   lfjour_schedule_cleanup(j);
@@ -297,7 +329,7 @@ lfjour_cleanup_hook(void *_j)
   if (_locked) DG_LOCK(_locked);

   u64 min_seq = ~((u64) 0);
-  struct lfjour_item *last_item_to_free = NULL;
+  const struct lfjour_item *last_item_to_free = NULL;
   struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire);

   if (!first)
@@ -310,7 +342,7 @@ lfjour_cleanup_hook(void *_j)

   WALK_TLIST(lfjour_recipient, r, &j->recipients)
   {
-    struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
+    const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);

     if (!last)
       /* No last export means that the channel has exported nothing since last cleanup */
@@ -333,7 +365,7 @@ lfjour_cleanup_hook(void *_j)

   WALK_TLIST(lfjour_recipient, r, &j->recipients)
   {
-    struct lfjour_item *last = last_item_to_free;
+    const struct lfjour_item *last = last_item_to_free;
     /* This either succeeds if this item is the most-behind-one,
      * or fails and gives us the actual last for debug output. */
     if (atomic_compare_exchange_strong_explicit(
diff --git a/lib/lockfree.h b/lib/lockfree.h
index 0553aac1..f99704b3 100644
--- a/lib/lockfree.h
+++ b/lib/lockfree.h
@@ -211,7 +211,8 @@ struct lfjour_recipient {
   TLIST_DEFAULT_NODE;
   event *event;				/* Event running when something is in the journal */
   event_list *target;			/* Event target */
-  struct lfjour_item * _Atomic last;	/* Last item processed */
+  const struct lfjour_item * _Atomic last;	/* Last item processed */
+  u64 first_holding_seq;		/* First item not released yet */
   struct lfjour_item *cur;		/* Processing this now */
   _Atomic u64 recipient_flags;		/* LFJOUR_R_* */
 };
@@ -248,7 +249,7 @@ struct lfjour_item *lfjour_push_prepare(struct lfjour *);
 void lfjour_push_commit(struct lfjour *);

 struct lfjour_item *lfjour_get(struct lfjour_recipient *);
-void lfjour_release(struct lfjour_recipient *);
+void lfjour_release(struct lfjour_recipient *, const struct lfjour_item *);
 static inline _Bool lfjour_reset_seqno(struct lfjour_recipient *r)
 {
   return atomic_fetch_and_explicit(&r->recipient_flags, ~LFJOUR_R_SEQ_RESET, memory_order_acq_rel) & LFJOUR_R_SEQ_RESET;
diff --git a/nest/proto.c b/nest/proto.c
index f74e8063..85344c04 100644
--- a/nest/proto.c
+++ b/nest/proto.c
@@ -394,17 +394,23 @@ struct roa_subscription {
   void (*refeed_hook)(struct channel *, struct rt_feeding_request *);
   struct lfjour_recipient digest_recipient;
   event update_event;
-  struct rt_feeding_request rfr;
+};
+
+struct roa_reload_request {
+  struct rt_feeding_request req;
+  struct roa_subscription *s;
+  struct lfjour_item *item;
 };

 static void
 channel_roa_reload_done(struct rt_feeding_request *req)
 {
-  SKIP_BACK_DECLARE(struct roa_subscription, s, rfr, req);
-  ASSERT_DIE(s->c->channel_state == CS_UP);
+  SKIP_BACK_DECLARE(struct roa_reload_request, rrr, req, req);
+  ASSERT_DIE(rrr->s->c->channel_state == CS_UP);

-  lfjour_release(&s->digest_recipient);
-  ev_send(proto_work_list(s->c->proto), &s->update_event);
+  lfjour_release(&rrr->s->digest_recipient, rrr->item);
+  ev_send(proto_work_list(rrr->s->c->proto), &rrr->s->update_event);
+  mb_free(rrr);

   /* FIXME: this should reset import/export filters if ACTION BLOCK */
 }
@@ -413,22 +419,24 @@ channel_roa_changed(void *_s)
 {
   struct roa_subscription *s = _s;

-  if (s->digest_recipient.cur)
-    return;
+  for (struct lfjour_item *it; it = lfjour_get(&s->digest_recipient); )
+  {
+    SKIP_BACK_DECLARE(struct rt_digest, rd, li, s->digest_recipient.cur);
+    struct roa_reload_request *rrr = mb_alloc(s->c->proto->pool, sizeof *rrr);
+    *rrr = (struct roa_reload_request) {
+      .req = {
+        .prefilter = {
+          .mode = TE_ADDR_TRIE,
+          .trie = rd->trie,
+        },
+        .done = channel_roa_reload_done,
+      },
+      .s = s,
+      .item = it,
+    };

-  if (!lfjour_get(&s->digest_recipient))
-    return;
-
-  SKIP_BACK_DECLARE(struct rt_digest, rd, li, s->digest_recipient.cur);
-  s->rfr = (struct rt_feeding_request) {
-    .prefilter = {
-      .mode = TE_ADDR_TRIE,
-      .trie = rd->trie,
-    },
-    .done = channel_roa_reload_done,
-  };
-
-  s->refeed_hook(s->c, &s->rfr);
+    s->refeed_hook(s->c, &rrr->req);
+  }
 }

 static inline void (*channel_roa_reload_hook(int dir))(struct channel *, struct rt_feeding_request *)
@@ -572,6 +580,8 @@ channel_start_import(struct channel *c)
   channel_reset_limit(c, &c->rx_limit, PLD_RX);
   channel_reset_limit(c, &c->in_limit, PLD_IN);

+  bmap_init(&c->imported_map, c->proto->pool, 16);
+
   memset(&c->import_stats, 0, sizeof(struct channel_import_stats));

   DBG("%s.%s: Channel start import req=%p\n", c->proto->name, c->name, &c->in_req);
@@ -694,9 +704,24 @@ channel_import_stopped(struct rt_import_request *req)
   mb_free(c->in_req.name);
   c->in_req.name = NULL;

+  bmap_free(&c->imported_map);
+
   channel_check_stopped(c);
 }

+static u32
+channel_reimport_next_feed_index(struct rt_export_feeder *f, u32 try_this)
+{
+  SKIP_BACK_DECLARE(struct channel, c, reimporter, f);
+  while (!bmap_test(&c->imported_map, try_this))
+    if (!(try_this & (try_this - 1)))	/* return every power of two to check for maximum */
+      return try_this;
+    else
+      try_this++;
+
+  return try_this;
+}
+
 static void
 channel_do_reload(void *_c)
 {
@@ -704,6 +729,7 @@ channel_do_reload(void *_c)
   RT_FEED_WALK(&c->reimporter, f)
   {
+    _Bool seen = 0;
     for (uint i = 0; i < f->count_routes; i++)
     {
       rte *r = &f->block[i];
@@ -721,9 +747,14 @@ channel_do_reload(void *_c)
        /* And reload the route */
        rte_update(c, r->net, &new, new.src);
+
+       seen = 1;
      }
    }

+    if (!seen)
+      bmap_clear(&c->imported_map, f->ni->index);
+
    /* Local data needed no more */
    tmp_flush();

@@ -739,6 +770,7 @@ channel_setup_in_table(struct channel *c)
   c->reimporter = (struct rt_export_feeder) {
     .name = mb_sprintf(c->proto->pool, "%s.%s.reimport", c->proto->name, c->name),
     .trace_routes = c->debug,
+    .next_feed_index = channel_reimport_next_feed_index,
   };
   c->reimport_event = (event) {
     .hook = channel_do_reload,
diff --git a/nest/protocol.h b/nest/protocol.h
index ad43e9d9..bbb76a8a 100644
--- a/nest/protocol.h
+++ b/nest/protocol.h
@@ -533,6 +533,7 @@ struct channel {
   const struct filter *in_filter;	/* Input filter */
   const struct filter *out_filter;	/* Output filter */
   const net_addr *out_subprefix;	/* Export only subprefixes of this net */
+  struct bmap imported_map;		/* Which nets were touched by our import */
   struct bmap export_accepted_map;	/* Keeps track which routes were really exported */
   struct bmap export_rejected_map;	/* Keeps track which routes were rejected by export filter */

diff --git a/nest/route.h b/nest/route.h
index e64c8d8b..c640a8a2 100644
--- a/nest/route.h
+++ b/nest/route.h
@@ -158,6 +158,7 @@ struct rt_export_request {

   /* Feeding itself */
   u32 feed_index;			/* Index of the feed in progress */
+  u32 (*next_feed_index)(struct rt_export_feeder *, u32 try_this);
   struct rt_feeding_request {
     struct rt_feeding_request *next;	/* Next in request chain */
     void (*done)(struct rt_feeding_request *);/* Called when this refeed finishes */
diff --git a/nest/rt-export.c b/nest/rt-export.c
index 1dd536a5..b991b975 100644
--- a/nest/rt-export.c
+++ b/nest/rt-export.c
@@ -52,7 +52,7 @@ rt_export_get(struct rt_export_request *r)
     } while (0)

 #define NOT_THIS_UPDATE	\
-  lfjour_release(&r->r);	\
+  lfjour_release(&r->r, &update->li);	\
   continue;

   while (1)
@@ -200,7 +200,7 @@ rt_export_release(const struct rt_export_union *u)

     case RT_EXPORT_UPDATE:
       rtex_trace(r, D_ROUTES, "Export %lu released", u->update->seq);
-      lfjour_release(&r->r);
+      lfjour_release(&r->r, &u->update->li);
       break;

@@ -272,16 +272,18 @@ rt_export_get_next_feed(struct rt_export_feeder *f, struct rcu_unwinder *u)
       return NULL;
     }

+#define NEXT_INDEX(f)	f->feed_index = f->next_feed_index ? f->next_feed_index(f, f->feed_index + 1) : f->feed_index + 1
+
 #define NOT_THIS_FEED(...)	{ \
   rtex_trace(f, D_ROUTES, __VA_ARGS__); \
-  f->feed_index++; \
+  NEXT_INDEX(f); \
   continue; \
 }

   if (!feed)
     NOT_THIS_FEED("Nothing found for index %u", f->feed_index);

-  f->feed_index++;
+  NEXT_INDEX(f);
   return feed;
 }

@@ -319,18 +321,19 @@ rt_export_next_feed(struct rt_export_feeder *f)

   f->feed_index = 0;

-  if (f->feed_pending)
-  {
-    rtex_trace(f, D_STATES, "Feeding done, refeed request pending");
-    f->feeding = f->feed_pending;
-    f->feed_pending = NULL;
-    return rt_export_next_feed(f);
-  }
-  else
-  {
-    rtex_trace(f, D_STATES, "Feeding done (%u)", f->feed_index);
+  uint count = 0;
+  for (struct rt_feeding_request *rfr = f->feed_pending; rfr; rfr = rfr->next)
+    count++;
+
+  rtex_trace(f, D_STATES, "Feeding done, %u refeed request%s pending",
+      count, (count == 1) ? "" : "s");
+
+  if (!f->feed_pending)
     return NULL;
-  }
+
+  f->feeding = f->feed_pending;
+  f->feed_pending = NULL;
+  return rt_export_next_feed(f);
 }

 static void
diff --git a/nest/rt-table.c b/nest/rt-table.c
index 276f7b47..4a27bc3a 100644
--- a/nest/rt-table.c
+++ b/nest/rt-table.c
@@ -477,7 +477,7 @@ rt_aggregate_roa(void *_rag)
 {
   struct rt_roa_aggregator *rag = _rag;

-  RT_EXPORT_WALK(&rag->src, u)
+  RT_EXPORT_WALK(&rag->src, u) TMP_SAVED
   {
     const net_addr *nroa = NULL;
     struct rte_src *src = NULL;
@@ -529,7 +529,7 @@ rt_aggregate_roa(void *_rag)
       SKIP_BACK_DECLARE(struct rt_roa_aggregated_adata, rad, ad, ea->u.ptr);
       count = ROA_AGGR_COUNT(rad);

-      rad_new = alloca(sizeof *rad_new + (count + 1) * sizeof rad_new->u[0]);
+      rad_new = tmp_alloc(sizeof *rad_new + (count + 1) * sizeof rad_new->u[0]);

       /* Insertion into a sorted list */
       uint p = 0;
@@ -559,7 +559,7 @@ rt_aggregate_roa(void *_rag)
     else if (src)
     {
       count = 1;
-      rad_new = alloca(sizeof *rad_new + sizeof rad_new->u[0]);
+      rad_new = tmp_alloc(sizeof *rad_new + sizeof rad_new->u[0]);
       rad_new->u[0].asn = asn;
       rad_new->u[0].max_pxlen = max_pxlen;
     }
@@ -1988,6 +1988,9 @@ channel_preimport(struct rt_import_request *req, rte *new, const rte *old)

   mpls_rte_preimport(new_in ? new : NULL, old_in ? old : NULL);

+  if (new)
+    bmap_set(&c->imported_map, NET_TO_INDEX(new->net)->index);
+
   return verdict;
 }
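
Reviewer note (not part of the patch): a minimal sketch of how a journal consumer might use the reworked API above, assuming BIRD's lib/lockfree.h and an already-registered recipient; start_async_work() and my_work_done() are hypothetical placeholders, not BIRD functions. After this patch, lfjour_get() may be called again while earlier items are still held, and lfjour_release() names the item being released: a release covers every held item up to and including that item, so releases must come oldest-first.

  #include "lib/lockfree.h"

  /* Hypothetical completion callback, invoked when asynchronous work on
   * `it' has finished, oldest item first. */
  static void
  my_work_done(struct lfjour_recipient *r, const struct lfjour_item *it)
  {
    /* If it->seq < r->cur->seq, this is a partial release: it only moves
     * r->last and r->first_holding_seq forward. Releasing r->cur itself
     * is a full release, as before the patch. */
    lfjour_release(r, it);
  }

  /* Hypothetical dispatcher for the asynchronous work. */
  void start_async_work(struct lfjour_item *it,
      void (*done)(struct lfjour_recipient *, const struct lfjour_item *));

  /* Pull every pending item; each one stays held until released above. */
  static void
  drain_journal(struct lfjour_recipient *r)
  {
    for (struct lfjour_item *it; (it = lfjour_get(r)); )
      start_async_work(it, my_work_done);
  }

This is the pattern the nest/proto.c hunk follows: channel_roa_changed() now drains the digest journal in a loop, and each channel_roa_reload_done() callback releases exactly the item its refeed request carried.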
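
Reviewer note (not part of the patch): the next_feed_index hook added to struct rt_export_feeder lets a feeder skip indexes it does not care about. channel_reimport_next_feed_index() skips nets whose bit in the new imported_map is clear, but still stops at every power of two so the walker can periodically check the index against the current maximum (the patch's own comment: "return every power of two to check for maximum"). A standalone sketch of that skip logic, with a plain bit array standing in for BIRD's bmap:

  #include <stdbool.h>
  #include <stdint.h>

  /* Stand-in for bmap_test(); `map' is a plain bit array here. */
  static bool bit_test(const uint64_t *map, uint32_t i)
  {
    return (map[i / 64] >> (i % 64)) & 1;
  }

  /* Same shape as channel_reimport_next_feed_index() in the patch:
   * advance past clear bits, but return every power of two (0, 1, 2, 4,
   * 8, ...) unconditionally so the caller can re-check whether it has
   * already walked past the last existing index. */
  static uint32_t next_index(const uint64_t *imported, uint32_t try_this)
  {
    while (!bit_test(imported, try_this))
      if (!(try_this & (try_this - 1)))
        return try_this;
      else
        try_this++;

    return try_this;
  }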