0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2025-01-23 17:31:55 +00:00
bird/nest/rt-export.c
Maria Matejka 0a2f92ad20 Table: not feeding twice, once is enough
If there is no feed pending, the requested one should be
activated immediately, otherwise it is activated only after
the full run, effectively running first a full feed and
then the requested one.
2024-12-19 11:54:05 +01:00

601 lines
14 KiB
C

/*
* BIRD -- Route Export Mechanisms
*
* (c) 2024 Maria Matejka <mq@jmq.cz>
*
* Can be freely distributed and used under the terms of the GNU GPL.
*/
#include "nest/bird.h"
#include "nest/route.h"
#include "nest/protocol.h"
struct rt_export_feed rt_feed_index_out_of_range;
#define rtex_trace(_req, _cat, msg, args...) do { \
if ((_req)->trace_routes & _cat) \
log(L_TRACE "%s: " msg, (_req)->name, ##args); \
} while (0)
static inline enum rt_export_state
rt_export_change_state(struct rt_export_request *r, u32 expected_mask, enum rt_export_state state)
{
r->last_state_change = current_time();
enum rt_export_state old = atomic_exchange_explicit(&r->export_state, state, memory_order_acq_rel);
if (!((1 << old) & expected_mask))
bug("Unexpected export state change from %s to %s, expected mask %02x",
rt_export_state_name(old),
rt_export_state_name(state),
expected_mask
);
rtex_trace(r, D_STATES, "Export state changed from %s to %s",
rt_export_state_name(old), rt_export_state_name(state));
return old;
}
const struct rt_export_union *
rt_export_get(struct rt_export_request *r)
{
ASSERT_DIE(!r->cur);
#define EXPORT_FOUND(_kind) do { \
struct rt_export_union *reu = tmp_alloc(sizeof *reu); \
*reu = (struct rt_export_union) { \
.kind = _kind, \
.req = r, \
.update = update, \
.feed = feed, \
}; \
return (r->cur = reu); \
} while (0)
#define NOT_THIS_UPDATE \
lfjour_release(&r->r, &update->li); \
continue;
while (1)
{
enum rt_export_state es = rt_export_get_state(r);
switch (es)
{
case TES_DOWN:
rtex_trace(r, (D_ROUTES|D_STATES), "Export is down");
return NULL;
case TES_STOP:
rtex_trace(r, (D_ROUTES|D_STATES), "Received stop event");
struct rt_export_union *reu = tmp_alloc(sizeof *reu);
*reu = (struct rt_export_union) {
.kind = RT_EXPORT_STOP,
.req = r,
};
return (r->cur = reu);
case TES_PARTIAL:
case TES_FEEDING:
case TES_READY:
break;
case TES_MAX:
bug("invalid export state");
}
/* Process sequence number reset event */
if (lfjour_reset_seqno(&r->r))
bmap_reset(&r->seq_map, 4);
/* Get a new update */
SKIP_BACK_DECLARE(struct rt_export_item, update, li, lfjour_get(&r->r));
SKIP_BACK_DECLARE(struct rt_exporter, e, journal, lfjour_of_recipient(&r->r));
struct rt_export_feed *feed = NULL;
/* No update, try feed */
if (!update)
{
if (es == TES_READY)
{
/* Fed up of feeding */
rtex_trace(r, D_ROUTES, "Export drained");
return NULL;
}
else if (feed = rt_export_next_feed(&r->feeder))
{
/* Feeding more */
rtex_trace(r, D_ROUTES, "Feeding %N", feed->ni->addr);
EXPORT_FOUND(RT_EXPORT_FEED);
}
else if (rt_export_get_state(r) == TES_DOWN)
{
/* Torn down inbetween */
rtex_trace(r, D_STATES, "Export ended itself");
return NULL;
}
else
{
/* No more food */
rt_export_change_state(r, BIT32_ALL(TES_FEEDING, TES_PARTIAL), TES_READY);
rtex_trace(r, D_STATES, "Fed up");
CALL(r->fed, r);
return NULL;
}
}
/* There actually is an update */
if (bmap_test(&r->seq_map, update->seq))
{
/* But this update has been already processed, let's try another one */
rtex_trace(r, D_ROUTES, "Skipping an already processed update %lu", update->seq);
NOT_THIS_UPDATE;
}
/* Is this update allowed by prefilter? */
const net_addr *n = (update->new ?: update->old)->net;
struct netindex *ni = NET_TO_INDEX(n);
if (!rt_prefilter_net(&r->feeder.prefilter, n))
{
rtex_trace(r, D_ROUTES, "Not exporting %N due to prefilter", n);
NOT_THIS_UPDATE;
}
if ((es != TES_READY) && rt_net_is_feeding(r, n))
{
/* But this net shall get a feed first! */
rtex_trace(r, D_ROUTES, "Expediting %N feed due to pending update %lu", n, update->seq);
if (r->feeder.domain.rtable)
{
LOCK_DOMAIN(rtable, r->feeder.domain);
feed = e->feed_net(e, NULL, ni->index, NULL, NULL, update);
UNLOCK_DOMAIN(rtable, r->feeder.domain);
}
else
{
RCU_ANCHOR(u);
feed = e->feed_net(e, u, ni->index, NULL, NULL, update);
}
ASSERT_DIE(feed && (feed != &rt_feed_index_out_of_range));
EXPORT_FOUND(RT_EXPORT_FEED);
}
/* OK, now this actually is an update, thank you for your patience */
rtex_trace(r, D_ROUTES, "Updating %N, seq %lu", n, update->seq);
EXPORT_FOUND(RT_EXPORT_UPDATE);
}
#undef NOT_THIS_UPDATE
#undef EXPORT_FOUND
}
void
rt_export_release(const struct rt_export_union *u)
{
/* May be already released */
if (!u->req)
return;
struct rt_export_request *r = u->req;
/* Must be crosslinked */
ASSERT_DIE(r->cur == u);
r->cur = NULL;
switch (u->kind)
{
case RT_EXPORT_FEED:
for (uint i = 0; i < u->feed->count_exports; i++)
bmap_set(&r->seq_map, u->feed->exports[i]);
bmap_set(&r->feed_map, u->feed->ni->index);
if (!u->update)
break;
/* fall through */
case RT_EXPORT_UPDATE:
rtex_trace(r, D_ROUTES, "Export %lu released", u->update->seq);
lfjour_release(&r->r, &u->update->li);
break;
case RT_EXPORT_STOP:
/* Checking that we have indeed stopped the exporter */
ASSERT_DIE(rt_export_get_state(r) == TES_DOWN);
rtex_trace(r, D_ROUTES, "Export stopped");
break;
default:
bug("strange export kind");
}
}
void
rt_export_processed(struct rt_export_request *r, u64 seq)
{
rtex_trace(r, D_ROUTES, "Marking export %lu as processed", seq);
/* Check sequence number reset event */
if (lfjour_reset_seqno(&r->r))
bmap_reset(&r->seq_map, 4);
ASSERT_DIE(!bmap_test(&r->seq_map, seq));
bmap_set(&r->seq_map, seq);
}
struct rt_export_feed *
rt_alloc_feed(uint routes, uint exports)
{
struct rt_export_feed *feed;
uint size = sizeof *feed
+ routes * sizeof *feed->block + _Alignof(typeof(*feed->block))
+ exports * sizeof *feed->exports + _Alignof(typeof(*feed->exports));
feed = tmp_alloc(size);
feed->count_routes = routes;
feed->count_exports = exports;
BIRD_SET_ALIGNED_POINTER(feed->block, feed->data);
BIRD_SET_ALIGNED_POINTER(feed->exports, &feed->block[routes]);
/* Consistency check */
ASSERT_DIE(((void *) &feed->exports[exports]) <= ((void *) feed) + size);
return feed;
}
static struct rt_export_feed *
rt_export_get_next_feed(struct rt_export_feeder *f, struct rcu_unwinder *u)
{
for (uint retry = 0; retry < (u ? 1024 : ~0U); retry++)
{
ASSERT_DIE(u || DOMAIN_IS_LOCKED(rtable, f->domain));
struct rt_exporter *e = atomic_load_explicit(&f->exporter, memory_order_acquire);
if (!e)
{
rtex_trace(f, (D_ROUTES|D_STATES), "Exporter kicked us away");
return NULL;
}
struct rt_export_feed *feed = e->feed_net(e, u, f->feed_index,
rt_net_is_feeding_feeder, f, NULL);
if (feed == &rt_feed_index_out_of_range)
{
rtex_trace(f, D_ROUTES, "Nothing more to feed", f->feed_index);
f->feed_index = ~0;
return NULL;
}
#define NEXT_INDEX(f) f->feed_index = f->next_feed_index ? f->next_feed_index(f, f->feed_index + 1) : f->feed_index + 1
#define NOT_THIS_FEED(...) { \
rtex_trace(f, D_ROUTES, __VA_ARGS__); \
NEXT_INDEX(f); \
continue; \
}
if (!feed)
NOT_THIS_FEED("Nothing found for index %u", f->feed_index);
NEXT_INDEX(f);
return feed;
}
RCU_RETRY_FAST(u);
}
struct rt_export_feed *
rt_export_next_feed(struct rt_export_feeder *f)
{
ASSERT_DIE(f);
struct rt_export_feed *feed = NULL;
if (f->domain.rtable)
{
LOCK_DOMAIN(rtable, f->domain);
feed = rt_export_get_next_feed(f, NULL);
UNLOCK_DOMAIN(rtable, f->domain);
}
else
{
RCU_ANCHOR(u);
feed = rt_export_get_next_feed(f, u);
}
if (feed)
return feed;
/* Feeding done */
struct rt_feeding_request *reverse = NULL;
while (f->feeding)
{
struct rt_feeding_request *rfr = f->feeding;
f->feeding = rfr->next;
rfr->next = reverse;
reverse = rfr;
}
/* Call the done hook in the same order as requests came in */
while (reverse)
{
struct rt_feeding_request *rfr = reverse;
reverse = rfr->next;
CALL(rfr->done, rfr);
}
f->feed_index = 0;
uint count = 0;
for (struct rt_feeding_request *rfr = f->feed_pending; rfr; rfr = rfr->next)
count++;
rtex_trace(f, D_STATES, "Feeding done, %u refeed request%s pending",
count, (count == 1) ? "" : "s");
if (!f->feed_pending)
return NULL;
f->feeding = f->feed_pending;
f->feed_pending = NULL;
return rt_export_next_feed(f);
}
static void
rt_feeding_request_default_done(struct rt_feeding_request *rfr)
{
mb_free(rfr);
}
void
rt_export_refeed_feeder(struct rt_export_feeder *f, struct rt_feeding_request *rfr)
{
if (!rfr)
return;
if (f->feeding)
{
rfr->next = f->feed_pending;
f->feed_pending = rfr;
}
else
{
rfr->next = NULL;
f->feeding = rfr;
}
}
void rt_export_refeed_request(struct rt_export_request *rer, struct rt_feeding_request *rfr)
{
if (!rfr)
{
rfr = mb_allocz(rer->pool, sizeof *rfr);
rfr->done = rt_feeding_request_default_done;
}
bmap_reset(&rer->feed_map, 4);
rt_export_refeed_feeder(&rer->feeder, rfr);
rt_export_change_state(rer, BIT32_ALL(TES_FEEDING, TES_PARTIAL, TES_READY), TES_PARTIAL);
if (rer->r.event)
ev_send(rer->r.target, rer->r.event);
}
void
rtex_export_subscribe(struct rt_exporter *e, struct rt_export_request *r)
{
rt_export_change_state(r, BIT32_ALL(TES_DOWN), TES_FEEDING);
ASSERT_DIE(r->pool);
rt_feeder_subscribe(e, &r->feeder);
lfjour_register(&e->journal, &r->r);
r->stats = (struct rt_export_stats) {};
r->last_state_change = current_time();
bmap_init(&r->seq_map, r->pool, 4);
bmap_init(&r->feed_map, r->pool, 4);
rt_export_refeed_request(r, NULL);
}
void
rtex_export_unsubscribe(struct rt_export_request *r)
{
rt_feeder_unsubscribe(&r->feeder);
if (r->cur)
rt_export_release(r->cur);
switch (rt_export_change_state(r, BIT32_ALL(TES_FEEDING, TES_PARTIAL, TES_READY, TES_STOP), TES_DOWN))
{
case TES_FEEDING:
case TES_PARTIAL:
case TES_READY:
case TES_STOP:
lfjour_unregister(&r->r);
break;
default:
bug("not implemented");
}
bmap_free(&r->feed_map);
bmap_free(&r->seq_map);
}
static void
rt_exporter_cleanup_done(struct lfjour *j, u64 begin_seq UNUSED, u64 end_seq)
{
SKIP_BACK_DECLARE(struct rt_exporter, e, journal, j);
/* TODO: log the begin_seq / end_seq values */
CALL(e->cleanup_done, e, end_seq);
if (e->stopped && (lfjour_count_recipients(j) == 0))
{
settle_cancel(&j->announce_timer);
ev_postpone(&j->cleanup_event);
e->stopped(e);
}
}
void
rt_exporter_init(struct rt_exporter *e, struct settle_config *scf)
{
rtex_trace(e, D_STATES, "Exporter init");
e->journal.cleanup_done = rt_exporter_cleanup_done;
lfjour_init(&e->journal, scf);
ASSERT_DIE(e->feed_net);
ASSERT_DIE(e->netindex);
}
struct rt_export_item *
rt_exporter_push(struct rt_exporter *e, const struct rt_export_item *uit)
{
/* Get the object */
struct lfjour_item *lit = lfjour_push_prepare(&e->journal);
if (!lit)
return NULL;
SKIP_BACK_DECLARE(struct rt_export_item, it, li, lit);
/* Copy the data, keeping the header */
memcpy(&it->data, &uit->data, e->journal.item_size - OFFSETOF(struct rt_export_item, data));
/* Commit the update */
rtex_trace(e, D_ROUTES, "Announcing change %lu at %N: %p (%u) -> %p (%u)",
lit->seq, (uit->new ?: uit->old)->net,
uit->old, uit->old ? uit->old->id : 0,
uit->new, uit->new ? uit->new->id : 0);
lfjour_push_commit(&e->journal);
/* Return the update pointer */
return it;
}
#define RTEX_FEEDERS_LOCK(e) \
while (atomic_exchange_explicit(&e->feeders_lock, 1, memory_order_acq_rel)) \
birdloop_yield(); \
CLEANUP(_rtex_feeders_unlock_) UNUSED struct rt_exporter *_rtex_feeders_locked_ = e;
static inline void _rtex_feeders_unlock_(struct rt_exporter **e)
{
ASSERT_DIE(atomic_exchange_explicit(&(*e)->feeders_lock, 0, memory_order_acq_rel));
}
void
rt_feeder_subscribe(struct rt_exporter *e, struct rt_export_feeder *f)
{
f->feed_index = 0;
atomic_store_explicit(&f->exporter, e, memory_order_relaxed);
f->domain = e->domain;
RTEX_FEEDERS_LOCK(e);
rt_export_feeder_add_tail(&e->feeders, f);
rtex_trace(f, D_STATES, "Subscribed to exporter %s", e->name);
}
static void
rt_feeder_do_unsubscribe(struct rt_export_feeder *f)
{
struct rt_exporter *e = atomic_exchange_explicit(&f->exporter, NULL, memory_order_acquire);
if (e)
{
RTEX_FEEDERS_LOCK(e);
rt_export_feeder_rem_node(&e->feeders, f);
rtex_trace(f, D_STATES, "Unsubscribed from exporter %s", e->name);
}
else
rtex_trace(f, D_STATES, "Already unsubscribed");
}
void
rt_feeder_unsubscribe(struct rt_export_feeder *f)
{
if (f->domain.rtable)
{
LOCK_DOMAIN(rtable, f->domain);
rt_feeder_do_unsubscribe(f);
UNLOCK_DOMAIN(rtable, f->domain);
}
else
{
RCU_ANCHOR(u);
rt_feeder_do_unsubscribe(f);
}
}
void
rt_exporter_shutdown(struct rt_exporter *e, void (*stopped)(struct rt_exporter *))
{
rtex_trace(e, D_STATES, "Exporter shutdown");
/* Last lock check before dropping the domain reference */
if (e->journal.domain)
ASSERT_DIE(DG_IS_LOCKED(e->journal.domain));
e->journal.domain = NULL;
/* We have to tell every receiver to stop */
bool done = 1;
WALK_TLIST(lfjour_recipient, r, &e->journal.recipients)
{
done = 0;
rt_export_change_state(
SKIP_BACK(struct rt_export_request, r, r),
BIT32_ALL(TES_FEEDING, TES_PARTIAL, TES_READY, TES_STOP),
TES_STOP);
}
/* We can drop feeders synchronously */
{
RTEX_FEEDERS_LOCK(e);
WALK_TLIST_DELSAFE(rt_export_feeder, f, &e->feeders)
{
ASSERT_DIE(atomic_exchange_explicit(&f->exporter, NULL, memory_order_acq_rel) == e);
rt_export_feeder_rem_node(&e->feeders, f);
}
}
/* Wait for feeders to finish */
synchronize_rcu();
/* The rest is done via the cleanup routine */
lfjour_do_cleanup_now(&e->journal);
if (done)
{
ev_postpone(&e->journal.cleanup_event);
settle_cancel(&e->journal.announce_timer);
CALL(stopped, e);
}
else
// e->stopped = stopped;
bug("not implemented yet");
}
void
rt_exporter_dump(struct dump_request *dreq, struct rt_exporter *e)
{
RDUMP("Feeders (max_feed_index=%u):\n",
atomic_load_explicit(&e->max_feed_index, memory_order_relaxed));
dreq->indent += 3;
WALK_TLIST(rt_export_feeder, f, &e->feeders)
RDUMP("%p (%s), index %u\n",
f, f->name, f->feed_index);
dreq->indent -= 3;
RDUMP("Journal:\n");
dreq->indent += 3;
lfjour_dump(dreq, &e->journal);
dreq->indent -= 3;
}