/* * BIRD -- Route Export Mechanisms * * (c) 2024 Maria Matejka * * Can be freely distributed and used under the terms of the GNU GPL. */ #include "nest/bird.h" #include "nest/route.h" #include "nest/protocol.h" #define rtex_trace(_req, _cat, msg, args...) do { \ if ((_req)->trace_routes & _cat) \ log(L_TRACE "%s: " msg, (_req)->name, ##args); \ } while (0) static inline enum rt_export_state rt_export_change_state(struct rt_export_request *r, u32 expected_mask, enum rt_export_state state) { r->last_state_change = current_time(); enum rt_export_state old = atomic_exchange_explicit(&r->export_state, state, memory_order_acq_rel); if (!((1 << old) & expected_mask)) bug("Unexpected export state change from %s to %s, expected mask %02x", rt_export_state_name(old), rt_export_state_name(state), expected_mask ); rtex_trace(r, D_STATES, "Export state changed from %s to %s", rt_export_state_name(old), rt_export_state_name(state)); return old; } const struct rt_export_union * rt_export_get(struct rt_export_request *r) { ASSERT_DIE(!r->cur); #define EXPORT_FOUND(_kind) do { \ struct rt_export_union *reu = tmp_alloc(sizeof *reu); \ *reu = (struct rt_export_union) { \ .kind = _kind, \ .req = r, \ .update = update, \ .feed = feed, \ }; \ return (r->cur = reu); \ } while (0) \ #define NOT_THIS_UPDATE do { \ lfjour_release(&r->r); \ return rt_export_get(r); \ } while (0) \ enum rt_export_state es = rt_export_get_state(r); switch (es) { case TES_DOWN: rtex_trace(r, (D_ROUTES|D_STATES), "Export is down"); return NULL; case TES_STOP: rtex_trace(r, (D_ROUTES|D_STATES), "Received stop event"); struct rt_export_union *reu = tmp_alloc(sizeof *reu); *reu = (struct rt_export_union) { .kind = RT_EXPORT_STOP, .req = r, }; return (r->cur = reu); case TES_PARTIAL: case TES_FEEDING: case TES_READY: break; case TES_MAX: bug("invalid export state"); } /* Process sequence number reset event */ if (lfjour_reset_seqno(&r->r)) bmap_reset(&r->seq_map, 4); /* Get a new update */ SKIP_BACK_DECLARE(struct rt_export_item, update, li, lfjour_get(&r->r)); SKIP_BACK_DECLARE(struct rt_exporter, e, journal, lfjour_of_recipient(&r->r)); struct rt_export_feed *feed = NULL; /* No update, try feed */ if (!update) if (es == TES_READY) { /* Fed up of feeding */ rtex_trace(r, D_ROUTES, "Export drained"); return NULL; } else if (feed = rt_export_next_feed(&r->feeder)) { /* Feeding more */ bmap_set(&r->feed_map, feed->ni->index); rtex_trace(r, D_ROUTES, "Feeding %N", feed->ni->addr); EXPORT_FOUND(RT_EXPORT_FEED); } else if (rt_export_get_state(r) == TES_DOWN) { /* Torn down inbetween */ rtex_trace(r, D_STATES, "Export ended itself"); return NULL; } else { /* No more food */ rt_export_change_state(r, BIT32_ALL(TES_FEEDING, TES_PARTIAL), TES_READY); rtex_trace(r, D_STATES, "Fed up"); CALL(r->fed, r); return NULL; } /* There actually is an update */ if (bmap_test(&r->seq_map, update->seq)) { /* But this update has been already processed, let's try another one */ rtex_trace(r, D_ROUTES, "Skipping an already processed update %lu", update->seq); NOT_THIS_UPDATE; } /* Is this update allowed by prefilter? */ const net_addr *n = (update->new ?: update->old)->net; struct netindex *ni = NET_TO_INDEX(n); if (!rt_prefilter_net(&r->feeder.prefilter, n)) { rtex_trace(r, D_ROUTES, "Not exporting %N due to prefilter", n); NOT_THIS_UPDATE; } if ((es != TES_READY) && rt_net_is_feeding(r, n)) { /* But this net shall get a feed first! */ rtex_trace(r, D_ROUTES, "Expediting %N feed due to pending update %lu", n, update->seq); RCU_ANCHOR(u); feed = e->feed_net(e, u, ni, update); bmap_set(&r->feed_map, ni->index); ASSERT_DIE(feed); EXPORT_FOUND(RT_EXPORT_FEED); } /* OK, now this actually is an update, thank you for your patience */ rtex_trace(r, D_ROUTES, "Updating %N, seq %lu", n, update->seq); EXPORT_FOUND(RT_EXPORT_UPDATE); #undef NOT_THIS_UPDATE #undef EXPORT_FOUND } void rt_export_release(const struct rt_export_union *u) { /* May be already released */ if (!u->req) return; struct rt_export_request *r = u->req; /* Must be crosslinked */ ASSERT_DIE(r->cur == u); r->cur = NULL; switch (u->kind) { case RT_EXPORT_FEED: for (uint i = 0; i < u->feed->count_exports; i++) bmap_set(&r->seq_map, u->feed->exports[i]); if (!u->update) break; /* fall through */ case RT_EXPORT_UPDATE: rtex_trace(r, D_ROUTES, "Export %lu released", u->update->seq); lfjour_release(&r->r); break; case RT_EXPORT_STOP: /* Checking that we have indeed stopped the exporter */ ASSERT_DIE(rt_export_get_state(r) == TES_DOWN); rtex_trace(r, D_ROUTES, "Export stopped"); break; default: bug("strange export kind"); } } void rt_export_processed(struct rt_export_request *r, u64 seq) { rtex_trace(r, D_ROUTES, "Marking export %lu as processed", seq); /* Check sequence number reset event */ if (lfjour_reset_seqno(&r->r)) bmap_reset(&r->seq_map, 4); ASSERT_DIE(!bmap_test(&r->seq_map, seq)); bmap_set(&r->seq_map, seq); } struct rt_export_feed * rt_export_next_feed(struct rt_export_feeder *f) { ASSERT_DIE(f); while (1) { RCU_ANCHOR(u); struct rt_exporter *e = atomic_load_explicit(&f->exporter, memory_order_acquire); if (!e) { rtex_trace(f, (D_ROUTES|D_STATES), "Exporter kicked us away"); return NULL; } struct netindex *ni = NULL; u32 mfi = atomic_load_explicit(&e->max_feed_index, memory_order_acquire); for (; !ni && f->feed_index < mfi; f->feed_index++) ni = net_resolve_index(e->netindex, e->net_type, f->feed_index); if (!ni) { f->feed_index = ~0; break; } if (!rt_prefilter_net(&f->prefilter, ni->addr)) { rtex_trace(f, D_ROUTES, "Not feeding %N due to prefilter", ni->addr); continue; } if (f->feeding && !rt_net_is_feeding_feeder(f, ni->addr)) { rtex_trace(f, D_ROUTES, "Not feeding %N, not requested", ni->addr); continue; } struct rt_export_feed *feed = e->feed_net(e, u, ni, NULL); if (feed) { rtex_trace(f, D_ROUTES, "Feeding %u routes for %N", feed->count_routes, ni->addr); return feed; } } /* Feeding done */ while (f->feeding) { struct rt_feeding_request *rfr = f->feeding; f->feeding = rfr->next; CALL(rfr->done, rfr); } f->feed_index = 0; if (f->feed_pending) { rtex_trace(f, D_STATES, "Feeding done, refeed request pending"); f->feeding = f->feed_pending; f->feed_pending = NULL; return rt_export_next_feed(f); } else { rtex_trace(f, D_STATES, "Feeding done (%u)", f->feed_index); return NULL; } } static void rt_feeding_request_default_done(struct rt_feeding_request *rfr) { mb_free(rfr); } void rt_export_refeed_feeder(struct rt_export_feeder *f, struct rt_feeding_request *rfr) { if (!rfr) return; rfr->next = f->feed_pending; f->feed_pending = rfr; } void rt_export_refeed_request(struct rt_export_request *rer, struct rt_feeding_request *rfr) { if (!rfr) { rfr = mb_allocz(rer->pool, sizeof *rfr); rfr->done = rt_feeding_request_default_done; } bmap_reset(&rer->feed_map, 4); rt_export_refeed_feeder(&rer->feeder, rfr); rt_export_change_state(rer, BIT32_ALL(TES_FEEDING, TES_PARTIAL, TES_READY), TES_PARTIAL); if (rer->r.event) ev_send(rer->r.target, rer->r.event); } void rtex_export_subscribe(struct rt_exporter *e, struct rt_export_request *r) { rt_export_change_state(r, BIT32_ALL(TES_DOWN), TES_FEEDING); ASSERT_DIE(r->pool); rt_feeder_subscribe(e, &r->feeder); lfjour_register(&e->journal, &r->r); r->stats = (struct rt_export_stats) {}; r->last_state_change = current_time(); bmap_init(&r->seq_map, r->pool, 4); bmap_init(&r->feed_map, r->pool, 4); rt_export_refeed_request(r, NULL); } void rtex_export_unsubscribe(struct rt_export_request *r) { rt_feeder_unsubscribe(&r->feeder); if (r->cur) rt_export_release(r->cur); switch (rt_export_change_state(r, BIT32_ALL(TES_FEEDING, TES_PARTIAL, TES_READY, TES_STOP), TES_DOWN)) { case TES_FEEDING: case TES_PARTIAL: case TES_READY: case TES_STOP: lfjour_unregister(&r->r); break; default: bug("not implemented"); } bmap_free(&r->feed_map); bmap_free(&r->seq_map); } static void rt_exporter_cleanup_done(struct lfjour *j, u64 begin_seq UNUSED, u64 end_seq) { SKIP_BACK_DECLARE(struct rt_exporter, e, journal, j); /* TODO: log the begin_seq / end_seq values */ CALL(e->cleanup_done, e, end_seq); if (e->stopped && (lfjour_count_recipients(j) == 0)) { settle_cancel(&j->announce_timer); ev_postpone(&j->cleanup_event); e->stopped(e); } } void rt_exporter_init(struct rt_exporter *e, struct settle_config *scf) { rtex_trace(e, D_STATES, "Exporter init"); e->journal.cleanup_done = rt_exporter_cleanup_done; lfjour_init(&e->journal, scf); ASSERT_DIE(e->feed_net); ASSERT_DIE(e->netindex); } struct rt_export_item * rt_exporter_push(struct rt_exporter *e, const struct rt_export_item *uit) { /* Get the object */ struct lfjour_item *lit = lfjour_push_prepare(&e->journal); if (!lit) return NULL; SKIP_BACK_DECLARE(struct rt_export_item, it, li, lit); /* Copy the data, keeping the header */ memcpy(&it->data, &uit->data, e->journal.item_size - OFFSETOF(struct rt_export_item, data)); /* Commit the update */ rtex_trace(e, D_ROUTES, "Announcing change %lu at %N: %p (%u) -> %p (%u)", lit->seq, (uit->new ?: uit->old)->net, uit->old, uit->old ? uit->old->id : 0, uit->new, uit->new ? uit->new->id : 0); lfjour_push_commit(&e->journal); /* Return the update pointer */ return it; } #define RTEX_FEEDERS_LOCK(e) \ while (atomic_exchange_explicit(&e->feeders_lock, 1, memory_order_acq_rel)) \ birdloop_yield(); \ CLEANUP(_rtex_feeders_unlock_) UNUSED struct rt_exporter *_rtex_feeders_locked_ = e; static inline void _rtex_feeders_unlock_(struct rt_exporter **e) { ASSERT_DIE(atomic_exchange_explicit(&(*e)->feeders_lock, 0, memory_order_acq_rel)); } void rt_feeder_subscribe(struct rt_exporter *e, struct rt_export_feeder *f) { f->feed_index = 0; atomic_store_explicit(&f->exporter, e, memory_order_relaxed); RTEX_FEEDERS_LOCK(e); rt_export_feeder_add_tail(&e->feeders, f); rtex_trace(f, D_STATES, "Subscribed to exporter %s", e->name); } void rt_feeder_unsubscribe(struct rt_export_feeder *f) { RCU_ANCHOR(a); struct rt_exporter *e = atomic_exchange_explicit(&f->exporter, NULL, memory_order_acquire); if (e) { RTEX_FEEDERS_LOCK(e); rt_export_feeder_rem_node(&e->feeders, f); rtex_trace(f, D_STATES, "Unsubscribed from exporter %s", e->name); } else rtex_trace(f, D_STATES, "Already unsubscribed"); } void rt_exporter_shutdown(struct rt_exporter *e, void (*stopped)(struct rt_exporter *)) { rtex_trace(e, D_STATES, "Exporter shutdown"); /* Last lock check before dropping the domain reference */ if (e->journal.domain) ASSERT_DIE(DG_IS_LOCKED(e->journal.domain)); e->journal.domain = NULL; /* We have to tell every receiver to stop */ _Bool done = 1; WALK_TLIST(lfjour_recipient, r, &e->journal.recipients) { done = 0; rt_export_change_state( SKIP_BACK(struct rt_export_request, r, r), BIT32_ALL(TES_FEEDING, TES_PARTIAL, TES_READY, TES_STOP), TES_STOP); } /* We can drop feeders synchronously */ { RTEX_FEEDERS_LOCK(e); WALK_TLIST_DELSAFE(rt_export_feeder, f, &e->feeders) { ASSERT_DIE(atomic_exchange_explicit(&f->exporter, NULL, memory_order_acq_rel) == e); rt_export_feeder_rem_node(&e->feeders, f); } } /* Wait for feeders to finish */ synchronize_rcu(); /* The rest is done via the cleanup routine */ lfjour_do_cleanup_now(&e->journal); if (done) { ev_postpone(&e->journal.cleanup_event); settle_cancel(&e->journal.announce_timer); CALL(stopped, e); } else // e->stopped = stopped; bug("not implemented yet"); }