0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2024-12-22 09:41:54 +00:00

Conflating multiple partial ROA reload requests together

This commit is contained in:
Maria Matejka 2024-06-20 11:58:23 +02:00
parent bd44a13ce5
commit 0b6e752bd9
7 changed files with 131 additions and 58 deletions

View File

@ -117,7 +117,7 @@ lfjour_push_commit(struct lfjour *j)
}
static struct lfjour_item *
lfjour_get_next(struct lfjour *j, struct lfjour_item *last)
lfjour_get_next(struct lfjour *j, const struct lfjour_item *last)
{
/* This is lockless, no domain checks. */
if (!last)
@ -158,36 +158,67 @@ lfjour_get_next(struct lfjour *j, struct lfjour_item *last)
struct lfjour_item *
lfjour_get(struct lfjour_recipient *r)
{
ASSERT_DIE(r->cur == NULL);
struct lfjour *j = lfjour_of_recipient(r);
/* The last pointer may get cleaned up under our hands.
* Indicating that we're using it, by RCU read. */
const struct lfjour_item *last = r->cur;
struct lfjour_item *next = NULL;
rcu_read_lock();
struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
r->cur = lfjour_get_next(j, last);
rcu_read_unlock();
if (last)
next = lfjour_get_next(j, r->cur);
else
{
/* The last pointer may get cleaned up under our hands.
* Indicating that we're using it, by RCU read. */
rcu_read_lock();
last = atomic_load_explicit(&r->last, memory_order_acquire);
next = lfjour_get_next(j, last);
rcu_read_unlock();
}
if (last)
{
lfjour_debug_detailed("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, last %p",
j, r, r->cur, r->cur ? r->cur->seq : 0ULL, last);
j, r, next, next ? next->seq : 0ULL, last);
}
else
{
lfjour_debug("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, clean",
j, r, r->cur, r->cur ? r->cur->seq : 0ULL);
j, r, next, next ? next->seq : 0ULL);
}
return r->cur;
if (!next)
return NULL;
if (!r->first_holding_seq)
r->first_holding_seq = next->seq;
return r->cur = next;
}
void lfjour_release(struct lfjour_recipient *r)
void lfjour_release(struct lfjour_recipient *r, const struct lfjour_item *it)
{
/* This is lockless, no domain checks. */
/* Find out what we actually released last */
rcu_read_lock();
const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
struct lfjour_block *last_block = last ? PAGE_HEAD(last) : NULL;
rcu_read_unlock();
/* This is lockless, no domain checks. */
ASSERT_DIE(r->cur);
/* Partial or full release? */
ASSERT_DIE(r->first_holding_seq);
ASSERT_DIE(it->seq >= r->first_holding_seq);
if (it->seq < r->cur->seq)
{
lfjour_debug("lfjour(%p)_release(recipient=%p) of %p, partial upto seq=%lu",
j, r, it, it->seq);
r->first_holding_seq = it->seq + 1;
atomic_store_explicit(&r->last, it, memory_order_release);
return;
}
struct lfjour_block *block = PAGE_HEAD(r->cur);
u32 end = atomic_load_explicit(&block->end, memory_order_acquire);
@ -210,9 +241,10 @@ void lfjour_release(struct lfjour_recipient *r)
atomic_store_explicit(&r->last, r->cur, memory_order_release);
/* The last block may be available to free */
if (pos + 1 == end)
if ((pos + 1 == end) || last && (last_block != block))
lfjour_schedule_cleanup(j);
r->first_holding_seq = 0;
r->cur = NULL;
}
@ -276,7 +308,7 @@ lfjour_unregister(struct lfjour_recipient *r)
ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));
if (r->cur)
lfjour_release(r);
lfjour_release(r, r->cur);
lfjour_recipient_rem_node(&j->recipients, r);
lfjour_schedule_cleanup(j);
@ -297,7 +329,7 @@ lfjour_cleanup_hook(void *_j)
if (_locked) DG_LOCK(_locked);
u64 min_seq = ~((u64) 0);
struct lfjour_item *last_item_to_free = NULL;
const struct lfjour_item *last_item_to_free = NULL;
struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire);
if (!first)
@ -310,7 +342,7 @@ lfjour_cleanup_hook(void *_j)
WALK_TLIST(lfjour_recipient, r, &j->recipients)
{
struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
if (!last)
/* No last export means that the channel has exported nothing since last cleanup */
@ -333,7 +365,7 @@ lfjour_cleanup_hook(void *_j)
WALK_TLIST(lfjour_recipient, r, &j->recipients)
{
struct lfjour_item *last = last_item_to_free;
const struct lfjour_item *last = last_item_to_free;
/* This either succeeds if this item is the most-behind-one,
* or fails and gives us the actual last for debug output. */
if (atomic_compare_exchange_strong_explicit(

View File

@ -211,7 +211,8 @@ struct lfjour_recipient {
TLIST_DEFAULT_NODE;
event *event; /* Event running when something is in the journal */
event_list *target; /* Event target */
struct lfjour_item * _Atomic last; /* Last item processed */
const struct lfjour_item * _Atomic last; /* Last item processed */
u64 first_holding_seq; /* First item not released yet */
struct lfjour_item *cur; /* Processing this now */
_Atomic u64 recipient_flags; /* LFJOUR_R_* */
};
@ -248,7 +249,7 @@ struct lfjour_item *lfjour_push_prepare(struct lfjour *);
void lfjour_push_commit(struct lfjour *);
struct lfjour_item *lfjour_get(struct lfjour_recipient *);
void lfjour_release(struct lfjour_recipient *);
void lfjour_release(struct lfjour_recipient *, const struct lfjour_item *);
static inline _Bool lfjour_reset_seqno(struct lfjour_recipient *r)
{
return atomic_fetch_and_explicit(&r->recipient_flags, ~LFJOUR_R_SEQ_RESET, memory_order_acq_rel) & LFJOUR_R_SEQ_RESET;

View File

@ -394,17 +394,23 @@ struct roa_subscription {
void (*refeed_hook)(struct channel *, struct rt_feeding_request *);
struct lfjour_recipient digest_recipient;
event update_event;
struct rt_feeding_request rfr;
};
struct roa_reload_request {
struct rt_feeding_request req;
struct roa_subscription *s;
struct lfjour_item *item;
};
static void
channel_roa_reload_done(struct rt_feeding_request *req)
{
SKIP_BACK_DECLARE(struct roa_subscription, s, rfr, req);
ASSERT_DIE(s->c->channel_state == CS_UP);
SKIP_BACK_DECLARE(struct roa_reload_request, rrr, req, req);
ASSERT_DIE(rrr->s->c->channel_state == CS_UP);
lfjour_release(&s->digest_recipient);
ev_send(proto_work_list(s->c->proto), &s->update_event);
lfjour_release(&rrr->s->digest_recipient, rrr->item);
ev_send(proto_work_list(rrr->s->c->proto), &rrr->s->update_event);
mb_free(rrr);
/* FIXME: this should reset import/export filters if ACTION BLOCK */
}
@ -413,22 +419,24 @@ channel_roa_changed(void *_s)
{
struct roa_subscription *s = _s;
if (s->digest_recipient.cur)
return;
for (struct lfjour_item *it; it = lfjour_get(&s->digest_recipient); )
{
SKIP_BACK_DECLARE(struct rt_digest, rd, li, s->digest_recipient.cur);
struct roa_reload_request *rrr = mb_alloc(s->c->proto->pool, sizeof *rrr);
*rrr = (struct roa_reload_request) {
.req = {
.prefilter = {
.mode = TE_ADDR_TRIE,
.trie = rd->trie,
},
.done = channel_roa_reload_done,
},
.s = s,
.item = it,
};
if (!lfjour_get(&s->digest_recipient))
return;
SKIP_BACK_DECLARE(struct rt_digest, rd, li, s->digest_recipient.cur);
s->rfr = (struct rt_feeding_request) {
.prefilter = {
.mode = TE_ADDR_TRIE,
.trie = rd->trie,
},
.done = channel_roa_reload_done,
};
s->refeed_hook(s->c, &s->rfr);
s->refeed_hook(s->c, &rrr->req);
}
}
static inline void (*channel_roa_reload_hook(int dir))(struct channel *, struct rt_feeding_request *)
@ -572,6 +580,8 @@ channel_start_import(struct channel *c)
channel_reset_limit(c, &c->rx_limit, PLD_RX);
channel_reset_limit(c, &c->in_limit, PLD_IN);
bmap_init(&c->imported_map, c->proto->pool, 16);
memset(&c->import_stats, 0, sizeof(struct channel_import_stats));
DBG("%s.%s: Channel start import req=%p\n", c->proto->name, c->name, &c->in_req);
@ -694,9 +704,24 @@ channel_import_stopped(struct rt_import_request *req)
mb_free(c->in_req.name);
c->in_req.name = NULL;
bmap_free(&c->imported_map);
channel_check_stopped(c);
}
static u32
channel_reimport_next_feed_index(struct rt_export_feeder *f, u32 try_this)
{
SKIP_BACK_DECLARE(struct channel, c, reimporter, f);
while (!bmap_test(&c->imported_map, try_this))
if (!(try_this & (try_this - 1))) /* return every power of two to check for maximum */
return try_this;
else
try_this++;
return try_this;
}
static void
channel_do_reload(void *_c)
{
@ -704,6 +729,7 @@ channel_do_reload(void *_c)
RT_FEED_WALK(&c->reimporter, f)
{
_Bool seen = 0;
for (uint i = 0; i < f->count_routes; i++)
{
rte *r = &f->block[i];
@ -721,9 +747,14 @@ channel_do_reload(void *_c)
/* And reload the route */
rte_update(c, r->net, &new, new.src);
seen = 1;
}
}
if (!seen)
bmap_clear(&c->imported_map, f->ni->index);
/* Local data needed no more */
tmp_flush();
@ -739,6 +770,7 @@ channel_setup_in_table(struct channel *c)
c->reimporter = (struct rt_export_feeder) {
.name = mb_sprintf(c->proto->pool, "%s.%s.reimport", c->proto->name, c->name),
.trace_routes = c->debug,
.next_feed_index = channel_reimport_next_feed_index,
};
c->reimport_event = (event) {
.hook = channel_do_reload,

View File

@ -533,6 +533,7 @@ struct channel {
const struct filter *in_filter; /* Input filter */
const struct filter *out_filter; /* Output filter */
const net_addr *out_subprefix; /* Export only subprefixes of this net */
struct bmap imported_map; /* Which nets were touched by our import */
struct bmap export_accepted_map; /* Keeps track which routes were really exported */
struct bmap export_rejected_map; /* Keeps track which routes were rejected by export filter */

View File

@ -158,6 +158,7 @@ struct rt_export_request {
/* Feeding itself */
u32 feed_index; /* Index of the feed in progress */
u32 (*next_feed_index)(struct rt_export_feeder *, u32 try_this);
struct rt_feeding_request {
struct rt_feeding_request *next; /* Next in request chain */
void (*done)(struct rt_feeding_request *);/* Called when this refeed finishes */

View File

@ -52,7 +52,7 @@ rt_export_get(struct rt_export_request *r)
} while (0)
#define NOT_THIS_UPDATE \
lfjour_release(&r->r); \
lfjour_release(&r->r, &update->li); \
continue;
while (1)
@ -200,7 +200,7 @@ rt_export_release(const struct rt_export_union *u)
case RT_EXPORT_UPDATE:
rtex_trace(r, D_ROUTES, "Export %lu released", u->update->seq);
lfjour_release(&r->r);
lfjour_release(&r->r, &u->update->li);
break;
@ -272,16 +272,18 @@ rt_export_get_next_feed(struct rt_export_feeder *f, struct rcu_unwinder *u)
return NULL;
}
#define NEXT_INDEX(f) f->feed_index = f->next_feed_index ? f->next_feed_index(f, f->feed_index + 1) : f->feed_index + 1
#define NOT_THIS_FEED(...) { \
rtex_trace(f, D_ROUTES, __VA_ARGS__); \
f->feed_index++; \
NEXT_INDEX(f); \
continue; \
}
if (!feed)
NOT_THIS_FEED("Nothing found for index %u", f->feed_index);
f->feed_index++;
NEXT_INDEX(f);
return feed;
}
@ -319,18 +321,19 @@ rt_export_next_feed(struct rt_export_feeder *f)
f->feed_index = 0;
if (f->feed_pending)
{
rtex_trace(f, D_STATES, "Feeding done, refeed request pending");
f->feeding = f->feed_pending;
f->feed_pending = NULL;
return rt_export_next_feed(f);
}
else
{
rtex_trace(f, D_STATES, "Feeding done (%u)", f->feed_index);
uint count = 0;
for (struct rt_feeding_request *rfr = f->feed_pending; rfr; rfr = rfr->next)
count++;
rtex_trace(f, D_STATES, "Feeding done, %u refeed request%s pending",
count, (count == 1) ? "" : "s");
if (!f->feed_pending)
return NULL;
}
f->feeding = f->feed_pending;
f->feed_pending = NULL;
return rt_export_next_feed(f);
}
static void

View File

@ -477,7 +477,7 @@ rt_aggregate_roa(void *_rag)
{
struct rt_roa_aggregator *rag = _rag;
RT_EXPORT_WALK(&rag->src, u)
RT_EXPORT_WALK(&rag->src, u) TMP_SAVED
{
const net_addr *nroa = NULL;
struct rte_src *src = NULL;
@ -529,7 +529,7 @@ rt_aggregate_roa(void *_rag)
SKIP_BACK_DECLARE(struct rt_roa_aggregated_adata, rad, ad, ea->u.ptr);
count = ROA_AGGR_COUNT(rad);
rad_new = alloca(sizeof *rad_new + (count + 1) * sizeof rad_new->u[0]);
rad_new = tmp_alloc(sizeof *rad_new + (count + 1) * sizeof rad_new->u[0]);
/* Insertion into a sorted list */
uint p = 0;
@ -559,7 +559,7 @@ rt_aggregate_roa(void *_rag)
else if (src)
{
count = 1;
rad_new = alloca(sizeof *rad_new + sizeof rad_new->u[0]);
rad_new = tmp_alloc(sizeof *rad_new + sizeof rad_new->u[0]);
rad_new->u[0].asn = asn;
rad_new->u[0].max_pxlen = max_pxlen;
}
@ -1988,6 +1988,9 @@ channel_preimport(struct rt_import_request *req, rte *new, const rte *old)
mpls_rte_preimport(new_in ? new : NULL, old_in ? old : NULL);
if (new)
bmap_set(&c->imported_map, NET_TO_INDEX(new->net)->index);
return verdict;
}