diff --git a/conf/conf.h b/conf/conf.h index 8b2e2dea..50e01bf5 100644 --- a/conf/conf.h +++ b/conf/conf.h @@ -44,7 +44,7 @@ struct config { int cli_debug; /* Tracing of CLI connections and commands */ int latency_debug; /* I/O loop tracks duration of each event */ - int pipe_debug; /* Track route propagation through pipes */ + int table_debug; /* Track route propagation through tables */ u32 latency_limit; /* Events with longer duration are logged (us) */ u32 watchdog_warning; /* I/O loop watchdog limit for warning (us) */ u32 watchdog_timeout; /* Watchdog timeout (in seconds, 0 = disabled) */ diff --git a/lib/birdlib.h b/lib/birdlib.h index 9b6e4a16..25545fc3 100644 --- a/lib/birdlib.h +++ b/lib/birdlib.h @@ -178,8 +178,13 @@ void debug(const char *msg, ...); /* Printf to debug output */ #if defined(LOCAL_DEBUG) || defined(GLOBAL_DEBUG) #define DBG(x, y...) debug(x, ##y) +#define DBGL(x, y...) debug(x "\n", ##y) +#elif defined(DEBUG_TO_LOG) +#define DBG(...) do { } while (0) +#define DBGL(...) log(L_DEBUG __VA_ARGS__) #else -#define DBG(x, y...) do { } while(0) +#define DBG(...) do { } while(0) +#define DBGL(...) do { } while (0) #endif #define ASSERT_DIE(x) do { if (!(x)) bug("Assertion '%s' failed at %s:%d", #x, __FILE__, __LINE__); } while(0) diff --git a/lib/resource.h b/lib/resource.h index 4cedbf00..8bb264b1 100644 --- a/lib/resource.h +++ b/lib/resource.h @@ -115,6 +115,7 @@ void sl_free(void *); void buffer_realloc(void **buf, unsigned *size, unsigned need, unsigned item_size); /* Allocator of whole pages; for use in slabs and other high-level allocators. */ +#define PAGE_HEAD(x) ((void *) (((uintptr_t) (x)) & ~(page_size-1))) extern long page_size; void *alloc_page(void); void free_page(void *); diff --git a/lib/slab.c b/lib/slab.c index 38d10626..054daea1 100644 --- a/lib/slab.c +++ b/lib/slab.c @@ -197,7 +197,7 @@ static struct resclass sl_class = { slab_memsize }; -#define SL_GET_HEAD(x) ((struct sl_head *) (((uintptr_t) (x)) & ~(page_size-1))) +#define SL_GET_HEAD(x) PAGE_HEAD(x) #define SL_HEAD_CHANGE_STATE(_s, _h, _from, _to) ({ \ ASSERT_DIE(_h->state == slh_##_from); \ diff --git a/nest/config.Y b/nest/config.Y index 525829b3..e894f7c4 100644 --- a/nest/config.Y +++ b/nest/config.Y @@ -386,7 +386,7 @@ debug_default: DEBUG PROTOCOLS debug_mask { new_config->proto_default_debug = $3; } | DEBUG CHANNELS debug_mask { new_config->channel_default_debug = $3; } | DEBUG COMMANDS expr { new_config->cli_debug = $3; } - | DEBUG PIPE bool { new_config->pipe_debug = $3; } + | DEBUG TABLES bool { new_config->table_debug = $3; } ; /* MRTDUMP PROTOCOLS is in systep/unix/config.Y */ diff --git a/nest/proto.c b/nest/proto.c index 72e479d7..3a80ab0e 100644 --- a/nest/proto.c +++ b/nest/proto.c @@ -697,8 +697,6 @@ channel_do_stop(struct channel *c) CALL(c->channel->shutdown, c); - /* This have to be done in here, as channel pool is freed before channel_do_down() */ - c->out_table = NULL; } static void diff --git a/nest/rt-show.c b/nest/rt-show.c index 1dbfaec2..6dfb85f6 100644 --- a/nest/rt-show.c +++ b/nest/rt-show.c @@ -79,8 +79,8 @@ rt_show_rte(struct cli *c, byte *ia, rte *e, struct rt_show_data *d, int primary if (d->verbose) { ea_show_list(c, a); - cli_printf(c, -1008, "\tInternal route handling values: %uL %uG %uS", - e->src->private_id, e->src->global_id, e->stale_cycle); + cli_printf(c, -1008, "\tInternal route handling values: %uL %uG %uS id %u", + e->src->private_id, e->src->global_id, e->stale_cycle, e->id); } else if (dest == RTD_UNICAST) ea_show_nexthop_list(c, nhad); diff --git a/nest/rt-table.c b/nest/rt-table.c index 5e07c129..85a6faf7 100644 --- a/nest/rt-table.c +++ b/nest/rt-table.c @@ -110,6 +110,8 @@ #include "proto/bgp/bgp.h" #endif +#include + pool *rt_table_pool; static linpool *rte_update_pool; @@ -117,6 +119,16 @@ static linpool *rte_update_pool; list routing_tables; list deleted_routing_tables; +/* Data structures for export journal */ +#define RT_PENDING_EXPORT_ITEMS (page_size - sizeof(struct rt_export_block)) / sizeof(struct rt_pending_export) + +struct rt_export_block { + node n; + _Atomic u32 end; + _Atomic _Bool not_last; + struct rt_pending_export export[]; +}; + static void rt_free_hostcache(rtable *tab); static void rt_notify_hostcache(rtable *tab, net *net); static void rt_update_hostcache(rtable *tab); @@ -133,6 +145,14 @@ static void rt_feed_equal(void *); static void rt_feed_for(void *); static uint rt_feed_net(struct rt_export_hook *c, net *n); +static inline void rt_export_used(struct rt_exporter *); +static void rt_export_cleanup(rtable *tab); + +static inline void rte_update_lock(void); +static inline void rte_update_unlock(void); + +static int rte_same(rte *x, rte *y); + const char *rt_import_state_name_array[TIS_MAX] = { [TIS_DOWN] = "DOWN", [TIS_UP] = "UP", @@ -685,8 +705,9 @@ rte_mergable(rte *pri, rte *sec) static void rte_trace(const char *name, const rte *e, int dir, const char *msg) { - log(L_TRACE "%s %c %s %N %uL %uG %s", - name, dir, msg, e->net, e->src->private_id, e->src->global_id, + log(L_TRACE "%s %c %s %N src %uL %uG %uS id %u %s", + name, dir, msg, e->net, + e->src->private_id, e->src->global_id, e->stale_cycle, e->id, rta_dest_name(rte_dest(e))); } @@ -848,6 +869,16 @@ do_rt_notify(struct channel *c, const net_addr *net, rte *new, const rte *old) static void rt_notify_basic(struct channel *c, const net_addr *net, rte *new, rte *old) { + if (new && old && rte_same(new, old)) + { + if ((new->id != old->id) && bmap_test(&c->export_map, old->id)) + { + bmap_set(&c->export_map, new->id); + bmap_clear(&c->export_map, old->id); + } + return; + } + if (new) new = export_filter(c, new, 0); @@ -860,6 +891,16 @@ rt_notify_basic(struct channel *c, const net_addr *net, rte *new, rte *old) do_rt_notify(c, net, new, old); } +static void +channel_rpe_mark_seen(struct rt_export_request *req, struct rt_pending_export *rpe) +{ + struct channel *c = SKIP_BACK(struct channel, out_req, req); + + rpe_mark_seen(req->hook, rpe); + if (rpe->old) + bmap_clear(&c->export_reject_map, rpe->old->rte.id); +} + void rt_notify_accepted(struct rt_export_request *req, const net_addr *n, struct rt_pending_export *rpe, struct rte **feed, uint count) @@ -903,38 +944,30 @@ rt_notify_accepted(struct rt_export_request *req, const net_addr *n, struct rt_p } } +done: /* Check obsolete routes for previously exported */ - if (!old_best) - if (rpe && rpe->old && bmap_test(&c->export_map, rpe->old->rte.id)) - old_best = &rpe->old->rte; - -/* for (; rpe; rpe = atomic_load_explicit(&rpe->next, memory_order_relaxed)) + while (rpe) + { + channel_rpe_mark_seen(req, rpe); + if (rpe->old) { - if (rpe->old && bmap_test(&hook->accept_map, rpe->old->id)) + if (bmap_test(&c->export_map, rpe->old->rte.id)) { - old_best = &rpe->old.rte; - break; + ASSERT_DIE(old_best == NULL); + old_best = &rpe->old->rte; } - - if (rpe == rpe_last) - break; } - */ + rpe = rpe_next(rpe, NULL); + } /* Nothing to export */ if (!new_best && !old_best) { DBG("rt_notify_accepted: nothing to export\n"); - goto done; + return; } do_rt_notify(c, n, new_best, old_best); - -done: - /* Drop the old stored rejection if applicable. - * new->id == old->id happens when updating hostentries. */ - if (rpe && rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id))) - bmap_clear(&c->export_reject_map, rpe->old->rte.id); } rte * @@ -1026,33 +1059,25 @@ rt_notify_merged(struct rt_export_request *req, const net_addr *n, struct rt_pen } /* Check obsolete routes for previously exported */ - if (!old_best) - if (rpe && rpe->old && bmap_test(&c->export_map, rpe->old->rte.id)) - old_best = &rpe->old->rte; - -/* for (; rpe; rpe = atomic_load_explicit(&rpe->next, memory_order_relaxed)) + while (rpe) + { + channel_rpe_mark_seen(req, rpe); + if (rpe->old) { - if (rpe->old && bmap_test(&hook->accept_map, rpe->old->id)) + if (bmap_test(&c->export_map, rpe->old->rte.id)) { - old_best = &rpe->old.rte; - break; + ASSERT_DIE(old_best == NULL); + old_best = &rpe->old->rte; } - - if (rpe == rpe_last) - break; } - */ + rpe = rpe_next(rpe, NULL); + } /* Prepare new merged route */ rte *new_merged = count ? rt_export_merged(c, feed, count, rte_update_pool, 0) : NULL; if (new_merged || old_best) do_rt_notify(c, n, new_merged, old_best); - - /* Drop the old stored rejection if applicable. - * new->id == old->id happens when updating hostentries. */ - if (rpe && rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id))) - bmap_clear(&c->export_reject_map, rpe->old->rte.id); } void @@ -1060,19 +1085,19 @@ rt_notify_optimal(struct rt_export_request *req, const net_addr *net, struct rt_ { struct channel *c = SKIP_BACK(struct channel, out_req, req); - if (rpe->new_best != rpe->old_best) - { - rte n0 = RTE_COPY_VALID(rpe->new_best); - rte *o = RTE_VALID_OR_NULL(rpe->old_best); + rte *o = RTE_VALID_OR_NULL(rpe->old_best); + struct rte_storage *new_best = rpe->new_best; - if (n0.src || o) - rt_notify_basic(c, net, n0.src ? &n0 : NULL, o); + while (rpe) + { + channel_rpe_mark_seen(req, rpe); + new_best = rpe->new_best; + rpe = rpe_next(rpe, NULL); } - /* Drop the old stored rejection if applicable. - * new->id == old->id happens when updating hostentries. */ - if (rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id))) - bmap_clear(&c->export_reject_map, rpe->old->rte.id); + rte n0 = RTE_COPY_VALID(new_best); + if (n0.src || o) + rt_notify_basic(c, net, n0.src ? &n0 : NULL, o); } void @@ -1080,18 +1105,28 @@ rt_notify_any(struct rt_export_request *req, const net_addr *net, struct rt_pend { struct channel *c = SKIP_BACK(struct channel, out_req, req); - if (rpe->new != rpe->old) + rte *n = RTE_VALID_OR_NULL(rpe->new); + rte *o = RTE_VALID_OR_NULL(rpe->old); + + if (!n && !o) { - rte n0 = RTE_COPY_VALID(rpe->new); - rte *o = RTE_VALID_OR_NULL(rpe->old); - if (n0.src || o) - rt_notify_basic(c, net, n0.src ? &n0 : NULL, o); + channel_rpe_mark_seen(req, rpe); + return; } - /* Drop the old stored rejection if applicable. - * new->id == old->id happens when updating hostentries. */ - if (rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id))) - bmap_clear(&c->export_reject_map, rpe->old->rte.id); + struct rte_src *src = n ? n->src : o->src; + struct rte_storage *new_latest = rpe->new; + + while (rpe) + { + channel_rpe_mark_seen(req, rpe); + new_latest = rpe->new; + rpe = rpe_next(rpe, src); + } + + rte n0 = RTE_COPY_VALID(new_latest); + if (n0.src || o) + rt_notify_basic(c, net, n0.src ? &n0 : NULL, o); } void @@ -1107,6 +1142,98 @@ rt_feed_any(struct rt_export_request *req, const net_addr *net, struct rt_pendin } } +void +rpe_mark_seen(struct rt_export_hook *hook, struct rt_pending_export *rpe) +{ + bmap_set(&hook->seq_map, rpe->seq); +} + +struct rt_pending_export * +rpe_next(struct rt_pending_export *rpe, struct rte_src *src) +{ + struct rt_pending_export *next = atomic_load_explicit(&rpe->next, memory_order_acquire); + + if (!next) + return NULL; + + if (!src) + return next; + + while (rpe = next) + if (src == (rpe->new ? rpe->new->rte.src : rpe->old->rte.src)) + return rpe; + else + next = atomic_load_explicit(&rpe->next, memory_order_acquire); + + return NULL; +} + +static struct rt_pending_export * rt_next_export_fast(struct rt_pending_export *last); +static void +rte_export(struct rt_export_hook *hook, struct rt_pending_export *rpe) +{ + if (bmap_test(&hook->seq_map, rpe->seq)) + goto ignore; /* Seen already */ + + const net_addr *n = rpe->new_best ? rpe->new_best->rte.net : rpe->old_best->rte.net; + + switch (hook->req->addr_mode) + { + case TE_ADDR_NONE: + break; + + case TE_ADDR_IN: + if (!net_in_netX(n, hook->req->addr)) + goto ignore; + break; + + case TE_ADDR_EQUAL: + if (!net_equal(n, hook->req->addr)) + goto ignore; + break; + + case TE_ADDR_FOR: + bug("Continuos export of best prefix match not implemented yet."); + + default: + bug("Strange table export address mode: %d", hook->req->addr_mode); + } + + if (rpe->new) + hook->stats.updates_received++; + else + hook->stats.withdraws_received++; + + if (hook->req->export_one) + hook->req->export_one(hook->req, n, rpe); + else if (hook->req->export_bulk) + { + net *net = SKIP_BACK(struct network, n.addr, (net_addr (*)[0]) n); + uint count = rte_feed_count(net); + rte **feed = NULL; + if (count) + { + feed = alloca(count * sizeof(rte *)); + rte_feed_obtain(net, feed, count); + } + hook->req->export_bulk(hook->req, n, rpe, feed, count); + } + else + bug("Export request must always provide an export method"); + +ignore: + /* Get the next export if exists */ + hook->rpe_next = rt_next_export_fast(rpe); + + /* The last block may be available to free */ + if (PAGE_HEAD(hook->rpe_next) != PAGE_HEAD(rpe)) + CALL(hook->table->used, hook->table); + + /* Releasing this export for cleanup routine */ + DBG("store hook=%p last_export=%p seq=%lu\n", hook, rpe, rpe->seq); + atomic_store_explicit(&hook->last_export, rpe, memory_order_release); +} + /** * rte_announce - announce a routing table change * @tab: table the route has been added to @@ -1145,7 +1272,7 @@ rte_announce(rtable *tab, net *net, struct rte_storage *new, struct rte_storage int new_best_valid = rte_is_valid(RTE_OR_NULL(new_best)); int old_best_valid = rte_is_valid(RTE_OR_NULL(old_best)); - if (!new && !old && !new_best && !old_best) + if ((new == old) && (new_best == old_best)) return; if (new_best_valid || old_best_valid) @@ -1164,57 +1291,198 @@ rte_announce(rtable *tab, net *net, struct rte_storage *new, struct rte_storage rt_schedule_notify(tab); - struct rt_pending_export rpe = { .new = new, .old = old, .new_best = new_best, .old_best = old_best }; - uint count = rte_feed_count(net); - rte **feed = NULL; - if (count) + if (EMPTY_LIST(tab->exporter.hooks) && EMPTY_LIST(tab->exporter.pending)) { - feed = alloca(count * sizeof(rte *)); - rte_feed_obtain(net, feed, count); + /* No export hook and no pending exports to cleanup. We may free the route immediately. */ + if (!old) + return; + + hmap_clear(&tab->id_map, old->rte.id); + rte_free(old); + return; } - struct rt_export_hook *eh; - WALK_LIST(eh, tab->exporter.hooks) + /* Get the pending export structure */ + struct rt_export_block *rpeb = NULL, *rpebsnl = NULL; + u32 end = 0; + + if (!EMPTY_LIST(tab->exporter.pending)) { - if (eh->export_state == TES_STOP) + rpeb = TAIL(tab->exporter.pending); + end = atomic_load_explicit(&rpeb->end, memory_order_relaxed); + if (end >= RT_PENDING_EXPORT_ITEMS) + { + ASSERT_DIE(end == RT_PENDING_EXPORT_ITEMS); + rpebsnl = rpeb; + + rpeb = NULL; + end = 0; + } + } + + if (!rpeb) + { + rpeb = alloc_page(); + *rpeb = (struct rt_export_block) {}; + add_tail(&tab->exporter.pending, &rpeb->n); + } + + /* Fill the pending export */ + struct rt_pending_export *rpe = &rpeb->export[rpeb->end]; + *rpe = (struct rt_pending_export) { + .new = new, + .new_best = new_best, + .old = old, + .old_best = old_best, + .seq = tab->exporter.next_seq++, + }; + + DBGL("rte_announce: table=%s net=%N new=%p id %u from %s old=%p id %u from %s new_best=%p id %u old_best=%p id %u seq=%lu", + tab->name, net->n.addr, + new, new ? new->rte.id : 0, new ? new->rte.sender->req->name : NULL, + old, old ? old->rte.id : 0, old ? old->rte.sender->req->name : NULL, + new_best, old_best, rpe->seq); + + ASSERT_DIE(atomic_fetch_add_explicit(&rpeb->end, 1, memory_order_release) == end); + + if (rpebsnl) + { + _Bool f = 0; + ASSERT_DIE(atomic_compare_exchange_strong_explicit(&rpebsnl->not_last, &f, 1, + memory_order_release, memory_order_relaxed)); + } + + /* Append to the same-network squasher list */ + if (net->last) + { + struct rt_pending_export *rpenull = NULL; + ASSERT_DIE(atomic_compare_exchange_strong_explicit( + &net->last->next, &rpenull, rpe, + memory_order_relaxed, + memory_order_relaxed)); + + } + + net->last = rpe; + + if (!net->first) + net->first = rpe; + + if (tab->exporter.first == NULL) + tab->exporter.first = rpe; + + if (!tm_active(tab->exporter.export_timer)) + tm_start(tab->exporter.export_timer, tab->config->export_settle_time); +} + +static struct rt_pending_export * +rt_next_export_fast(struct rt_pending_export *last) +{ + /* Get the whole export block and find our position in there. */ + struct rt_export_block *rpeb = PAGE_HEAD(last); + u32 pos = (last - &rpeb->export[0]); + u32 end = atomic_load_explicit(&rpeb->end, memory_order_acquire); + ASSERT_DIE(pos < end); + + /* Next is in the same block. */ + if (++pos < end) + return &rpeb->export[pos]; + + /* There is another block. */ + if (atomic_load_explicit(&rpeb->not_last, memory_order_acquire)) + { + /* This is OK to do non-atomically because of the not_last flag. */ + rpeb = NODE_NEXT(rpeb); + return &rpeb->export[0]; + } + + /* There is nothing more. */ + return NULL; +} + +static struct rt_pending_export * +rt_next_export(struct rt_export_hook *hook, struct rt_exporter *tab) +{ + /* As the table is locked, it is safe to reload the last export pointer */ + struct rt_pending_export *last = atomic_load_explicit(&hook->last_export, memory_order_acquire); + + /* It is still valid, let's reuse it */ + if (last) + return rt_next_export_fast(last); + + /* No, therefore we must process the table's first pending export */ + else + return tab->first; +} + +static void +rt_announce_exports(timer *tm) +{ + rtable *tab = tm->data; + + struct rt_export_hook *c; node *n; + WALK_LIST2(c, n, tab->exporter.hooks, n) + { + if (atomic_load_explicit(&c->export_state, memory_order_acquire) != TES_READY) continue; - switch (eh->req->addr_mode) - { - case TE_ADDR_NONE: - break; - - case TE_ADDR_IN: - if (!net_in_netX(net->n.addr, eh->req->addr)) - continue; - break; - - case TE_ADDR_EQUAL: - if (!net_equal(net->n.addr, eh->req->addr)) - continue; - break; - - case TE_ADDR_FOR: - bug("Continuos export of best prefix match not implemented yet."); - - default: - bug("Strange table export address mode: %d", eh->req->addr_mode); - } - - if (new) - eh->stats.updates_received++; - else - eh->stats.withdraws_received++; - - if (eh->req->export_one) - eh->req->export_one(eh->req, net->n.addr, &rpe); - else if (eh->req->export_bulk) - eh->req->export_bulk(eh->req, net->n.addr, &rpe, feed, count); - else - bug("Export request must always provide an export method"); + ev_schedule_work(c->event); } } +static struct rt_pending_export * +rt_last_export(struct rt_exporter *tab) +{ + struct rt_pending_export *rpe = NULL; + + if (!EMPTY_LIST(tab->pending)) + { + /* We'll continue processing exports from this export on */ + struct rt_export_block *reb = TAIL(tab->pending); + ASSERT_DIE(reb->end); + rpe = &reb->export[reb->end - 1]; + } + + return rpe; +} + +#define RT_EXPORT_BULK 1024 + +static void +rt_export_hook(void *_data) +{ + struct rt_export_hook *c = _data; + + ASSERT_DIE(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_READY); + + if (!c->rpe_next) + { + c->rpe_next = rt_next_export(c, c->table); + + if (!c->rpe_next) + { + CALL(c->table->used, c->table); + return; + } + } + + /* Process the export */ + for (uint i=0; irpe_next); + + if (!c->rpe_next) + break; + + rte_update_unlock(); + } + + ev_schedule_work(c->event); +} + + static inline int rte_validate(struct channel *ch, rte *e) { @@ -1459,14 +1727,8 @@ rte_recalculate(struct rt_import_hook *c, net *net, rte *new, struct rte_src *sr if (new_stored) { new_stored->rte.lastmod = current_time(); - - if (!old) - { - new_stored->rte.id = hmap_first_zero(&table->id_map); - hmap_set(&table->id_map, new_stored->rte.id); - } - else - new_stored->rte.id = old->id; + new_stored->rte.id = hmap_first_zero(&table->id_map); + hmap_set(&table->id_map, new_stored->rte.id); } /* Log the route change */ @@ -1498,13 +1760,6 @@ rte_recalculate(struct rt_import_hook *c, net *net, rte *new, struct rte_src *sr p->rte_insert(net, &new_stored->rte); #endif - if (old) - { - if (!new_stored) - hmap_clear(&table->id_map, old->id); - - rte_free(old_stored); - } } static int rte_update_nest_cnt; /* Nesting counter to allow recursive updates */ @@ -1711,16 +1966,19 @@ rt_export_stopped(void *data) struct rt_export_hook *hook = data; struct rt_exporter *tab = hook->table; + /* Drop pending exports */ + CALL(tab->used, tab); + /* Unlist */ rem_node(&hook->n); - /* Reporting the channel as stopped. */ + /* Report the channel as stopped. */ hook->stopped(hook->req); /* Reporting the hook as finished. */ CALL(tab->done, hook); - /* Freeing the hook together with its coroutine. */ + /* Free the hook together with its coroutine. */ rfree(hook->pool); } @@ -1738,7 +1996,7 @@ void rt_set_export_state(struct rt_export_hook *hook, u8 state) { hook->last_state_change = current_time(); - hook->export_state = state; + atomic_store_explicit(&hook->export_state, state, memory_order_release); if (hook->req->log_state_change) hook->req->log_state_change(hook->req, state); @@ -1784,7 +2042,6 @@ rt_table_export_start(struct rt_exporter *re, struct rt_export_request *req) pool *p = rp_new(tab->rp, "Export hook"); struct rt_export_hook *hook = mb_allocz(p, sizeof(struct rt_export_hook)); hook->pool = p; - hook->lp = lp_new_default(p); /* stats zeroed by mb_allocz */ switch (req->addr_mode) @@ -1829,6 +2086,12 @@ rt_request_export(struct rt_exporter *re, struct rt_export_request *req) hook->req = req; hook->table = re; + bmap_init(&hook->seq_map, hook->pool, 1024); + + struct rt_pending_export *rpe = rt_last_export(hook->table); + DBG("store hook=%p last_export=%p seq=%lu\n", hook, rpe, rpe ? rpe->seq : 0); + atomic_store_explicit(&hook->last_export, rpe, memory_order_relaxed); + hook->n = (node) {}; add_tail(&re->hooks, &hook->n); @@ -1842,7 +2105,7 @@ rt_table_export_stop(struct rt_export_hook *hook) { rtable *tab = SKIP_BACK(rtable, exporter, hook->table); - if (hook->export_state != TES_FEEDING) + if (atomic_load_explicit(&hook->export_state, memory_order_relaxed) != TES_FEEDING) return; switch (hook->req->addr_mode) @@ -2037,8 +2300,8 @@ rt_dump_hooks(rtable *tab) { eh->req->dump_req(eh->req); debug(" Export hook %p requested by %p:" - " refeed_pending=%u last_state_change=%t export_state=%u stopped=%p\n", - eh, eh->req, eh->refeed_pending, eh->last_state_change, eh->export_state, eh->stopped); + " refeed_pending=%u last_state_change=%t export_state=%u\n", + eh, eh->req, eh->refeed_pending, eh->last_state_change, atomic_load_explicit(&eh->export_state, memory_order_relaxed)); } debug("\n"); } @@ -2091,6 +2354,20 @@ rt_schedule_prune(rtable *tab) tab->prune_state |= 1; } +static void +rt_export_used(struct rt_exporter *e) +{ + rtable *tab = SKIP_BACK(rtable, exporter, e); + + if (config->table_debug) + log(L_TRACE "%s: Export cleanup requested", tab->name); + + if (tab->export_used) + return; + + tab->export_used = 1; + ev_schedule(tab->rt_event); +} static void rt_event(void *ptr) @@ -2099,6 +2376,9 @@ rt_event(void *ptr) rt_lock_table(tab); + if (tab->export_used) + rt_export_cleanup(tab); + if (tab->hcu_scheduled) rt_update_hostcache(tab); @@ -2362,8 +2642,11 @@ rt_setup(pool *pp, struct rtable_config *cf) .start = rt_table_export_start, .stop = rt_table_export_stop, .done = rt_table_export_done, + .used = rt_export_used, }; + init_list(&t->exporter.hooks); + init_list(&t->exporter.pending); init_list(&t->imports); @@ -2374,7 +2657,9 @@ rt_setup(pool *pp, struct rtable_config *cf) t->rt_event = ev_new_init(p, rt_event, t); t->prune_timer = tm_new_init(p, rt_prune_timer, t, 0, 0); + t->exporter.export_timer = tm_new_init(p, rt_announce_exports, t, 0, 0); t->last_rt_change = t->gc_time = current_time(); + t->exporter.next_seq = 1; t->rl_pipe = (struct tbf) TBF_DEFAULT_LOG_LIMITS; @@ -2488,7 +2773,7 @@ again: } } - if (!n->routes) /* Orphaned FIB entry */ + if (!n->routes && !n->first) /* Orphaned FIB entry */ { FIB_ITERATE_PUT(fit); fib_delete(&tab->fib, n); @@ -2543,15 +2828,15 @@ again: rt_prune_sources(); + uint flushed_channels = 0; + /* Close flushed channels */ WALK_LIST2_DELSAFE(ih, n, x, tab->imports, n) if (ih->import_state == TIS_FLUSHING) { - rt_set_import_state(ih, TIS_CLEARED); - ih->stopped(ih->req); - rem_node(&ih->n); - mb_free(ih); - rt_unlock_table(tab); + ih->flush_seq = tab->exporter.next_seq; + rt_set_import_state(ih, TIS_WAITING); + flushed_channels++; } else if (ih->stale_pruning != ih->stale_pruned) { @@ -2559,6 +2844,184 @@ again: if (ih->req->trace_routes & D_STATES) log(L_TRACE "%s: table prune after refresh end [%u]", ih->req->name, ih->stale_pruned); } + + /* In some cases, we may want to directly proceed to export cleanup */ + if (EMPTY_LIST(tab->exporter.hooks) && flushed_channels) + rt_export_cleanup(tab); +} + +static void +rt_export_cleanup(rtable *tab) +{ + tab->export_used = 0; + + u64 min_seq = ~((u64) 0); + struct rt_pending_export *last_export_to_free = NULL; + struct rt_pending_export *first = tab->exporter.first; + + struct rt_export_hook *eh; + node *n; + WALK_LIST2(eh, n, tab->exporter.hooks, n) + { + switch (atomic_load_explicit(&eh->export_state, memory_order_acquire)) + { + case TES_DOWN: + continue; + + case TES_READY: + { + struct rt_pending_export *last = atomic_load_explicit(&eh->last_export, memory_order_acquire); + if (!last) + /* No last export means that the channel has exported nothing since last cleanup */ + goto done; + + else if (min_seq > last->seq) + { + min_seq = last->seq; + last_export_to_free = last; + } + continue; + } + + default: + /* It's only safe to cleanup when the export state is idle or regular. No feeding or stopping allowed. */ + goto done; + } + } + + tab->exporter.first = last_export_to_free ? rt_next_export_fast(last_export_to_free) : NULL; + + if (config->table_debug) + log(L_TRACE "%s: Export cleanup, old exporter.first seq %lu, new %lu, min_seq %ld", + tab->name, + first ? first->seq : 0, + tab->exporter.first ? tab->exporter.first->seq : 0, + min_seq); + + WALK_LIST2(eh, n, tab->exporter.hooks, n) + { + if (atomic_load_explicit(&eh->export_state, memory_order_acquire) != TES_READY) + continue; + + struct rt_pending_export *last = atomic_load_explicit(&eh->last_export, memory_order_acquire); + if (last == last_export_to_free) + { + /* This may fail when the channel managed to export more inbetween. This is OK. */ + atomic_compare_exchange_strong_explicit( + &eh->last_export, &last, NULL, + memory_order_release, + memory_order_relaxed); + + DBG("store hook=%p last_export=NULL\n", eh); + } + } + + while (first && (first->seq <= min_seq)) + { + ASSERT_DIE(first->new || first->old); + + const net_addr *n = first->new ? + first->new->rte.net : + first->old->rte.net; + net *net = SKIP_BACK(struct network, n.addr, (net_addr (*)[0]) n); + + ASSERT_DIE(net->first == first); + + if (first == net->last) + /* The only export here */ + net->last = net->first = NULL; + else + /* First is now the next one */ + net->first = atomic_load_explicit(&first->next, memory_order_relaxed); + + /* For now, the old route may be finally freed */ + if (first->old) + { + rt_rte_trace_in(D_ROUTES, first->old->rte.sender->req, &first->old->rte, "freed"); + hmap_clear(&tab->id_map, first->old->rte.id); + rte_free(first->old); + } + +#ifdef LOCAL_DEBUG + memset(first, 0xbd, sizeof(struct rt_pending_export)); +#endif + + struct rt_export_block *reb = HEAD(tab->exporter.pending); + ASSERT_DIE(reb == PAGE_HEAD(first)); + + u32 pos = (first - &reb->export[0]); + u32 end = atomic_load_explicit(&reb->end, memory_order_relaxed); + ASSERT_DIE(pos < end); + + struct rt_pending_export *next = NULL; + + if (++pos < end) + next = &reb->export[pos]; + else + { + rem_node(&reb->n); + +#ifdef LOCAL_DEBUG + memset(reb, 0xbe, page_size); +#endif + + free_page(reb); + + if (EMPTY_LIST(tab->exporter.pending)) + { + if (config->table_debug) + log(L_TRACE "%s: Resetting export seq", tab->name); + + node *n; + WALK_LIST2(eh, n, tab->exporter.hooks, n) + { + if (atomic_load_explicit(&eh->export_state, memory_order_acquire) != TES_READY) + continue; + + ASSERT_DIE(atomic_load_explicit(&eh->last_export, memory_order_acquire) == NULL); + bmap_reset(&eh->seq_map, 1024); + } + + tab->exporter.next_seq = 1; + } + else + { + reb = HEAD(tab->exporter.pending); + next = &reb->export[0]; + } + } + + first = next; + } + +done:; + struct rt_import_hook *ih; node *x; + _Bool imports_stopped = 0; + WALK_LIST2_DELSAFE(ih, n, x, tab->imports, n) + if (ih->import_state == TIS_WAITING) + if (!first || (first->seq >= ih->flush_seq)) + { + ih->import_state = TIS_CLEARED; + ih->stopped(ih->req); + rem_node(&ih->n); + mb_free(ih); + rt_unlock_table(tab); + imports_stopped = 1; + } + + if (tab->export_used) + ev_schedule(tab->rt_event); + + if (imports_stopped) + { + if (config->table_debug) + log(L_TRACE "%s: Sources pruning routine requested", tab->name); + + rt_prune_sources(); + } + + if (EMPTY_LIST(tab->exporter.pending) && tm_active(tab->exporter.export_timer)) + tm_stop(tab->exporter.export_timer); } /** @@ -3040,6 +3503,11 @@ rt_next_hop_update_net(rtable *tab, net *n) /* Replace the route in the list */ new->next = e->next; *k = e = new; + + /* Get a new ID for the route */ + new->rte.lastmod = current_time(); + new->rte.id = hmap_first_zero(&tab->id_map); + hmap_set(&tab->id_map, new->rte.id); } ASSERT_DIE(pos <= count); @@ -3066,14 +3534,14 @@ rt_next_hop_update_net(rtable *tab, net *n) for (int i=0; irte.sender->req, &updates[i].new->rte, best_indicator[nb][ob]); rte_announce_i(tab, n, updates[i].new, updates[i].old, new, old_best); } - for (int i=0; ievent->hook = rt_export_hook; + + rt_set_export_state(c, TES_READY); + + ev_schedule_work(c->event); +} + /** * rt_feed_by_fib - advertise all routes to a channel by walking a fib * @c: channel to be fed @@ -3269,7 +3747,7 @@ rt_feed_by_fib(void *data) struct fib_iterator *fit = &c->feed_fit; int max_feed = 256; - ASSERT(c->export_state == TES_FEEDING); + ASSERT(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_FEEDING); rtable *tab = SKIP_BACK(rtable, exporter, c->table); @@ -3282,14 +3760,15 @@ rt_feed_by_fib(void *data) return; } - ASSERT(c->export_state == TES_FEEDING); + if (atomic_load_explicit(&c->export_state, memory_order_acquire) != TES_FEEDING) + return; if ((c->req->addr_mode == TE_ADDR_NONE) || net_in_netX(n->n.addr, c->req->addr)) max_feed -= rt_feed_net(c, n); } FIB_ITERATE_END; - rt_set_export_state(c, TES_READY); + rt_feed_done(c); } static void @@ -3303,7 +3782,7 @@ rt_feed_by_trie(void *data) int max_feed = 256; - ASSERT_DIE(c->export_state == TES_FEEDING); + ASSERT(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_FEEDING); net_addr addr; while (trie_walk_next(ws, &addr)) @@ -3315,7 +3794,8 @@ rt_feed_by_trie(void *data) if ((max_feed -= rt_feed_net(c, n)) <= 0) return; - ASSERT_DIE(c->export_state == TES_FEEDING); + if (atomic_load_explicit(&c->export_state, memory_order_acquire) != TES_FEEDING) + return; } rt_unlock_trie(tab, c->walk_lock); @@ -3324,7 +3804,7 @@ rt_feed_by_trie(void *data) mb_free(c->walk_state); c->walk_state = NULL; - rt_set_export_state(c, TES_READY); + rt_feed_done(c); } static void @@ -3333,14 +3813,14 @@ rt_feed_equal(void *data) struct rt_export_hook *c = data; rtable *tab = SKIP_BACK(rtable, exporter, c->table); - ASSERT_DIE(c->export_state == TES_FEEDING); + ASSERT_DIE(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_FEEDING); ASSERT_DIE(c->req->addr_mode == TE_ADDR_EQUAL); net *n = net_find(tab, c->req->addr); if (n) rt_feed_net(c, n); - rt_set_export_state(c, TES_READY); + rt_feed_done(c); } static void @@ -3349,52 +3829,53 @@ rt_feed_for(void *data) struct rt_export_hook *c = data; rtable *tab = SKIP_BACK(rtable, exporter, c->table); - ASSERT_DIE(c->export_state == TES_FEEDING); + ASSERT_DIE(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_FEEDING); ASSERT_DIE(c->req->addr_mode == TE_ADDR_FOR); net *n = net_route(tab, c->req->addr); if (n) rt_feed_net(c, n); - rt_set_export_state(c, TES_READY); + rt_feed_done(c); } static uint rt_feed_net(struct rt_export_hook *c, net *n) { - if (c->req->export_bulk) - { - uint count = rte_feed_count(n); - if (count) - { - rte_update_lock(); - rte **feed = alloca(count * sizeof(rte *)); - rte_feed_obtain(n, feed, count); - struct rt_pending_export rpe = { .new_best = n->routes }; - c->req->export_bulk(c->req, n->n.addr, &rpe, feed, count); - rte_update_unlock(); - } - return count; - } + uint count = 0; - if (n->routes && rte_is_valid(&n->routes->rte)) - { - rte_update_lock(); - struct rt_pending_export rpe = { .new = n->routes, .new_best = n->routes }; - c->req->export_one(c->req, n->n.addr, &rpe); - rte_update_unlock(); - return 1; - } + if (c->req->export_bulk) + { + count = rte_feed_count(n); + if (count) + { + rte_update_lock(); + rte **feed = alloca(count * sizeof(rte *)); + rte_feed_obtain(n, feed, count); + c->req->export_bulk(c->req, n->n.addr, NULL, feed, count); + rte_update_unlock(); + } + } - return 0; + else if (n->routes) + { + rte_update_lock(); + struct rt_pending_export rpe = { .new = n->routes, .new_best = n->routes }; + c->req->export_one(c->req, n->n.addr, &rpe); + rte_update_unlock(); + count = 1; + } + + for (struct rt_pending_export *rpe = n->first; rpe; rpe = rpe_next(rpe, NULL)) + rpe_mark_seen(c, rpe); + + return count; } - /* * Import table */ - void channel_reload_export_bulk(struct rt_export_request *req, const net_addr *net, struct rt_pending_export *rpe UNUSED, rte **feed, uint count) { struct channel *c = SKIP_BACK(struct channel, reload_req, req); diff --git a/nest/rt.h b/nest/rt.h index bdbea05b..20ed0ad0 100644 --- a/nest/rt.h +++ b/nest/rt.h @@ -18,6 +18,8 @@ #include "lib/fib.h" #include "lib/route.h" +#include + struct ea_list; struct protocol; struct proto; @@ -53,6 +55,7 @@ struct rtable_config { byte trie_used; /* Rtable has attached trie */ btime min_settle_time; /* Minimum settle time for notifications */ btime max_settle_time; /* Maximum settle time for notifications */ + btime export_settle_time; /* Delay before exports are announced */ }; struct rt_export_hook; @@ -61,9 +64,17 @@ struct rt_export_request; struct rt_exporter { list hooks; /* Registered route export hooks */ uint addr_type; /* Type of address data exported (NET_*) */ + struct rt_export_hook *(*start)(struct rt_exporter *, struct rt_export_request *); void (*stop)(struct rt_export_hook *); void (*done)(struct rt_export_hook *); + void (*used)(struct rt_exporter *); + + list pending; /* List of packed struct rt_pending_export */ + struct timer *export_timer; + + struct rt_pending_export *first; /* First export to announce */ + u64 next_seq; /* The next export will have this ID */ }; typedef struct rtable { @@ -98,6 +109,7 @@ typedef struct rtable { byte prune_trie; /* Prune prefix trie during next table prune */ byte hcu_scheduled; /* Hostcache update is scheduled */ byte nhu_state; /* Next Hop Update state */ + byte export_used; /* Pending Export pruning is scheduled */ struct fib_iterator prune_fit; /* Rtable prune FIB iterator */ struct fib_iterator nhu_fit; /* Next Hop Update FIB iterator */ struct f_trie *trie_new; /* New prefix trie defined during pruning */ @@ -132,7 +144,8 @@ struct rt_flowspec_link { #define NHU_DIRTY 3 typedef struct network { - struct rte_storage *routes; /* Available routes for this network */ + struct rte_storage *routes; /* Available routes for this network */ + struct rt_pending_export *first, *last; struct fib_node n; /* FIB flags reserved for kernel syncer */ } net; @@ -200,6 +213,7 @@ struct rt_import_hook { u32 withdraws_accepted; /* Number of route withdraws accepted and processed */ } stats; + u64 flush_seq; /* Table export seq when the channel announced flushing */ btime last_state_change; /* Time of last state transition */ u8 import_state; /* IS_* */ @@ -212,7 +226,9 @@ struct rt_import_hook { }; struct rt_pending_export { + struct rt_pending_export * _Atomic next; /* Next export for the same destination */ struct rte_storage *new, *new_best, *old, *old_best; + u64 seq; /* Sequential ID (table-local) of the pending export */ }; struct rt_export_request { @@ -241,7 +257,6 @@ struct rt_export_hook { struct rt_exporter *table; /* The connected table */ pool *pool; - linpool *lp; struct rt_export_request *req; /* The requestor */ @@ -260,10 +275,15 @@ struct rt_export_hook { u32 hash_iter; /* Iterator over hash */ }; + struct bmap seq_map; /* Keep track which exports were already procesed */ + + struct rt_pending_export * _Atomic last_export;/* Last export processed */ + struct rt_pending_export *rpe_next; /* Next pending export to process */ + btime last_state_change; /* Time of last state transition */ u8 refeed_pending; /* Refeeding and another refeed is scheduled */ - u8 export_state; /* Route export state (TES_*, see below) */ + _Atomic u8 export_state; /* Route export state (TES_*, see below) */ u8 feed_type; /* Which feeding method is used (TFT_*, see below) */ struct event *event; /* Event running all the export operations */ @@ -314,6 +334,15 @@ void rt_set_export_state(struct rt_export_hook *hook, u8 state); void rte_import(struct rt_import_request *req, const net_addr *net, rte *new, struct rte_src *src); +/* Get next rpe. If src is given, it must match. */ +struct rt_pending_export *rpe_next(struct rt_pending_export *rpe, struct rte_src *src); + +/* Mark the pending export processed */ +void rpe_mark_seen(struct rt_export_hook *hook, struct rt_pending_export *rpe); + +/* Get pending export seen status */ +int rpe_get_seen(struct rt_export_hook *hook, struct rt_pending_export *rpe); + /* Types of route announcement, also used as flags */ #define RA_UNDEF 0 /* Undefined RA type */ #define RA_OPTIMAL 1 /* Announcement of optimal route change */ diff --git a/proto/bgp/attrs.c b/proto/bgp/attrs.c index 883a9746..a7b1a7ed 100644 --- a/proto/bgp/attrs.c +++ b/proto/bgp/attrs.c @@ -1901,7 +1901,6 @@ bgp_out_table_export_start(struct rt_exporter *re, struct rt_export_request *req pool *p = rp_new(c->c.proto->pool, "Export hook"); struct rt_export_hook *hook = mb_allocz(p, sizeof(struct rt_export_hook)); hook->pool = p; - hook->lp = lp_new_default(p); hook->event = ev_new_init(p, bgp_out_table_feed, hook); hook->feed_type = TFT_HASH; @@ -1919,6 +1918,7 @@ bgp_setup_out_table(struct bgp_channel *c) }; init_list(&c->prefix_exporter.hooks); + init_list(&c->prefix_exporter.pending); c->c.out_table = &c->prefix_exporter; }