0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2024-12-31 22:21:54 +00:00

Route export is now asynchronous.

To allow for multithreaded execution, we need to break the import-export
chain and buffer the exports before actually processing them.
This commit is contained in:
Maria Matejka 2021-09-27 13:04:16 +02:00
parent f18968f52f
commit c70b3198dc
5 changed files with 667 additions and 141 deletions

View File

@ -45,7 +45,7 @@ struct config {
int cli_debug; /* Tracing of CLI connections and commands */
int latency_debug; /* I/O loop tracks duration of each event */
int pipe_debug; /* Track route propagation through pipes */
int table_debug; /* Track route propagation through tables */
u32 latency_limit; /* Events with longer duration are logged (us) */
u32 watchdog_warning; /* I/O loop watchdog limit for warning (us) */
u32 watchdog_timeout; /* Watchdog timeout (in seconds, 0 = disabled) */

View File

@ -348,7 +348,7 @@ debug_default:
DEBUG PROTOCOLS debug_mask { new_config->proto_default_debug = $3; }
| DEBUG CHANNELS debug_mask { new_config->channel_default_debug = $3; }
| DEBUG COMMANDS expr { new_config->cli_debug = $3; }
| DEBUG PIPE bool { new_config->pipe_debug = $3; }
| DEBUG TABLES bool { new_config->table_debug = $3; }
;
/* MRTDUMP PROTOCOLS is in systep/unix/config.Y */

View File

@ -649,7 +649,7 @@ channel_aux_import_stopped(struct rt_import_request *req)
static void
channel_aux_export_stopped(struct rt_export_request *req)
{
struct channel_aux_table *cat = SKIP_BACK(struct channel_aux_table, push, req);
struct channel_aux_table *cat = SKIP_BACK(struct channel_aux_table, get, req);
req->hook = NULL;
if (cat->refeed_pending && !cat->stop)
@ -669,7 +669,6 @@ channel_aux_stop(struct channel_aux_table *cat)
rt_stop_import(&cat->push, channel_aux_import_stopped);
rt_stop_export(&cat->get, channel_aux_export_stopped);
rt_lock_table(cat->tab);
cat->tab->deleted = channel_aux_stopped;
cat->tab->del_data = cat;
rt_unlock_table(cat->tab);
@ -714,17 +713,51 @@ channel_get_log_state_change(struct rt_export_request *req, u8 state)
void rte_update_direct(struct channel *c, const net_addr *n, rte *new, struct rte_src *src);
static int
channel_aux_export_one_any(struct rt_export_request *req, struct rt_pending_export *rpe, rte **new, rte **old)
{
struct rte_src *src = rpe->new ? rpe->new->rte.src : rpe->old->rte.src;
*old = RTES_OR_NULL(rpe->old);
struct rte_storage *new_stored;
while (rpe)
{
new_stored = rpe->new;
rpe_mark_seen(req->hook, rpe);
rpe = rpe_next(rpe, src);
}
*new = RTES_CLONE(new_stored, *new);
return (*new || *old) && (&new_stored->rte != *old);
}
static int
channel_aux_export_one_best(struct rt_export_request *req, struct rt_pending_export *rpe, rte **new, rte **old)
{
*old = RTES_OR_NULL(rpe->old_best);
struct rte_storage *new_stored;
while (rpe)
{
new_stored = rpe->new_best;
rpe_mark_seen(req->hook, rpe);
rpe = rpe_next(rpe, NULL);
}
*new = RTES_CLONE(new_stored, *new);
return (*new || *old) && (&new_stored->rte != *old);
}
static void
channel_in_export_one_any(struct rt_export_request *req, const net_addr *net, struct rt_pending_export *rpe)
{
struct channel_aux_table *cat = SKIP_BACK(struct channel_aux_table, get, req);
if (!rpe->new && !rpe->old)
return;
rte n0;
struct rte_src *src = rpe->new ? rpe->new->rte.src : rpe->old->rte.src;
rte_update_direct(cat->c, net, RTES_CLONE(rpe->new, &n0), src);
rte n0, *new = &n0, *old;
if (channel_aux_export_one_any(req, rpe, &new, &old))
rte_update_direct(cat->c, net, new, old ? old->src : new->src);
}
static void
@ -732,12 +765,9 @@ channel_in_export_one_best(struct rt_export_request *req, const net_addr *net, s
{
struct channel_aux_table *cat = SKIP_BACK(struct channel_aux_table, get, req);
if (!rpe->new && !rpe->old)
return;
rte n0;
struct rte_src *src = rpe->old_best ? rpe->old_best->rte.src : rpe->new_best->rte.src;
rte_update_direct(cat->c, net, RTES_CLONE(rpe->new_best, &n0), src);
rte n0, *new = &n0, *old;
if (channel_aux_export_one_best(req, rpe, &new, &old))
rte_update_direct(cat->c, net, new, old ? old->src : new->src);
}
static void
@ -768,16 +798,18 @@ static void
channel_out_export_one_any(struct rt_export_request *req, const net_addr *net, struct rt_pending_export *rpe)
{
struct channel_aux_table *cat = SKIP_BACK(struct channel_aux_table, get, req);
rte n0;
do_rt_notify_direct(cat->c, net, RTES_CLONE(rpe->new, &n0), RTES_OR_NULL(rpe->old));
rte n0, *new = &n0, *old;
if (channel_aux_export_one_any(req, rpe, &new, &old))
do_rt_notify_direct(cat->c, net, new, old);
}
static void
channel_out_export_one_best(struct rt_export_request *req, const net_addr *net, struct rt_pending_export *rpe)
{
struct channel_aux_table *cat = SKIP_BACK(struct channel_aux_table, get, req);
rte n0;
do_rt_notify_direct(cat->c, net, RTES_CLONE(rpe->new_best, &n0), RTES_OR_NULL(rpe->old_best));
rte n0, *new = &n0, *old;
if (channel_aux_export_one_best(req, rpe, &new, &old))
do_rt_notify_direct(cat->c, net, new, old);
}
static void
@ -831,6 +863,7 @@ channel_setup_in_table(struct channel *c, int best)
c->in_table->c = c;
c->in_table->tab = rt_setup(c->proto->pool, &cat->tab_cf);
self_link(&c->in_table->tab->n);
rt_lock_table(c->in_table->tab);
rt_request_import(c->in_table->tab, &c->in_table->push);
rt_request_export(c->in_table->tab, &c->in_table->get);
@ -872,6 +905,7 @@ channel_setup_out_table(struct channel *c)
c->out_table->c = c;
c->out_table->tab = rt_setup(c->proto->pool, &cat->tab_cf);
self_link(&c->out_table->tab->n);
rt_lock_table(c->out_table->tab);
rt_request_import(c->out_table->tab, &c->out_table->push);
rt_request_export(c->out_table->tab, &c->out_table->get);
@ -1051,6 +1085,8 @@ channel_request_table_feeding(struct channel *c)
void
channel_request_feeding(struct channel *c)
{
CD(c, "Refeed requested");
ASSERT(c->out_req.hook);
if (c->out_table)

View File

@ -15,6 +15,8 @@
#include "lib/resource.h"
#include "lib/net.h"
#include <stdatomic.h>
struct ea_list;
struct protocol;
struct proto;
@ -152,6 +154,7 @@ struct rtable_config {
byte sorted; /* Routes of network are sorted according to rte_better() */
btime min_settle_time; /* Minimum settle time for notifications */
btime max_settle_time; /* Maximum settle time for notifications */
btime export_settle_time; /* Delay before exports are announced */
};
typedef struct rtable {
@ -181,12 +184,20 @@ typedef struct rtable {
byte prune_state; /* Table prune state, 1 -> scheduled, 2-> running */
byte hcu_scheduled; /* Hostcache update is scheduled */
byte nhu_state; /* Next Hop Update state */
byte export_used; /* Export journal setup scheduled */
struct fib_iterator prune_fit; /* Rtable prune FIB iterator */
struct fib_iterator nhu_fit; /* Next Hop Update FIB iterator */
struct tbf rl_pipe; /* Rate limiting token buffer for pipe collisions */
list subscribers; /* Subscribers for notifications */
struct timer *settle_timer; /* Settle time for notifications */
list pending_exports; /* List of packed struct rt_pending_export */
btime base_export_time; /* When first pending export was announced */
struct timer *export_timer;
struct rt_pending_export *first_export; /* First export to announce */
u64 next_export_seq; /* The next export will have this ID */
} rtable;
struct rt_subscription {
@ -203,6 +214,7 @@ struct rt_subscription {
typedef struct network {
struct rte_storage *routes; /* Available routes for this network */
struct rt_pending_export *last, *first; /* Routes with unfinished exports */
struct fib_node n; /* FIB flags reserved for kernel syncer */
} net;
@ -294,6 +306,7 @@ struct rt_import_hook {
u32 withdraws_accepted; /* Number of route withdraws accepted and processed */
} stats;
u64 flush_seq; /* Table export seq when the channel announced flushing */
btime last_state_change; /* Time of last state transition */
u8 import_state; /* IS_* */
@ -306,7 +319,9 @@ struct rt_import_hook {
};
struct rt_pending_export {
struct rt_pending_export * _Atomic next; /* Next export for the same destination */
struct rte_storage *new, *new_best, *old, *old_best;
u64 seq; /* Sequential ID (table-local) of the pending export */
};
struct rt_export_request {
@ -333,7 +348,6 @@ struct rt_export_hook {
rtable *table; /* The connected table */
pool *pool;
linpool *lp;
struct rt_export_request *req; /* The requestor */
@ -345,10 +359,15 @@ struct rt_export_hook {
struct fib_iterator feed_fit; /* Routing table iterator used during feeding */
struct bmap seq_map; /* Keep track which exports were already procesed */
struct rt_pending_export * _Atomic last_export;/* Last export processed */
struct rt_pending_export *rpe_next; /* Next pending export to process */
btime last_state_change; /* Time of last state transition */
u8 refeed_pending; /* Refeeding and another refeed is scheduled */
u8 export_state; /* Route export state (TES_*, see below) */
_Atomic u8 export_state; /* Route export state (TES_*, see below) */
struct event *event; /* Event running all the export operations */
@ -384,6 +403,16 @@ static inline u8 rt_export_get_state(struct rt_export_hook *eh) { return eh ? eh
void rte_import(struct rt_import_request *req, const net_addr *net, rte *new, struct rte_src *src);
/* Get next rpe. If src is given, it must match. */
struct rt_pending_export *rpe_next(struct rt_pending_export *rpe, struct rte_src *src);
/* Mark the pending export processed */
void rpe_mark_seen(struct rt_export_hook *hook, struct rt_pending_export *rpe);
/* Get pending export seen status */
int rpe_get_seen(struct rt_export_hook *hook, struct rt_pending_export *rpe);
/* Types of route announcement, also used as flags */
#define RA_UNDEF 0 /* Undefined RA type */
#define RA_OPTIMAL 1 /* Announcement of optimal route change */

View File

@ -45,12 +45,24 @@
#include "lib/string.h"
#include "lib/alloca.h"
#include <stdatomic.h>
pool *rt_table_pool;
static linpool *rte_update_pool;
list routing_tables;
/* Data structures for export journal */
#define RT_PENDING_EXPORT_ITEMS (page_size - sizeof(struct rt_export_block)) / sizeof(struct rt_pending_export)
struct rt_export_block {
node n;
_Atomic u32 end;
_Atomic _Bool not_last;
struct rt_pending_export export[];
};
static void rt_free_hostcache(rtable *tab);
static void rt_notify_hostcache(rtable *tab, net *net);
static void rt_update_hostcache(rtable *tab);
@ -59,6 +71,12 @@ static inline void rt_prune_table(rtable *tab);
static inline void rt_schedule_notify(rtable *tab);
static void rt_feed_channel(void *);
static inline void rt_export_used(rtable *tab);
static void rt_export_cleanup(rtable *tab);
static inline void rte_update_lock(void);
static inline void rte_update_unlock(void);
const char *rt_import_state_name_array[TIS_MAX] = {
[TIS_DOWN] = "DOWN",
[TIS_UP] = "UP",
@ -385,9 +403,9 @@ rte_mergable(rte *pri, rte *sec)
static void
rte_trace(const char *name, const rte *e, int dir, const char *msg)
{
log(L_TRACE "%s %c %s %N src %uL %uG %uS %s%s",
log(L_TRACE "%s %c %s %N src %uL %uG %uS id %u %s%s",
name, dir, msg, e->net,
e->src->private_id, e->src->global_id, e->stale_cycle,
e->src->private_id, e->src->global_id, e->stale_cycle, e->id,
rta_dest_name(e->attrs->dest),
rte_is_filtered(e) ? " (filtered)" : "");
}
@ -583,6 +601,16 @@ rt_notify_basic(struct channel *c, const net_addr *net, rte *new, rte *old)
do_rt_notify(c, net, new, old);
}
static void
channel_rpe_mark_seen(struct rt_export_request *req, struct rt_pending_export *rpe)
{
struct channel *c = SKIP_BACK(struct channel, out_req, req);
rpe_mark_seen(req->hook, rpe);
if (rpe->old)
bmap_clear(&c->export_reject_map, rpe->old->rte.id);
}
void
rt_notify_accepted(struct rt_export_request *req, const net_addr *n, struct rt_pending_export *rpe,
struct rte **feed, uint count)
@ -626,38 +654,30 @@ rt_notify_accepted(struct rt_export_request *req, const net_addr *n, struct rt_p
}
}
done:
/* Check obsolete routes for previously exported */
if (!old_best)
if (rpe && rpe->old && bmap_test(&c->export_map, rpe->old->rte.id))
old_best = &rpe->old->rte;
/* for (; rpe; rpe = atomic_load_explicit(&rpe->next, memory_order_relaxed))
while (rpe)
{
channel_rpe_mark_seen(req, rpe);
if (rpe->old)
{
if (rpe->old && bmap_test(&hook->accept_map, rpe->old->id))
if (bmap_test(&c->export_map, rpe->old->rte.id))
{
old_best = &rpe->old.rte;
break;
ASSERT_DIE(old_best == NULL);
old_best = &rpe->old->rte;
}
if (rpe == rpe_last)
break;
}
*/
rpe = rpe_next(rpe, NULL);
}
/* Nothing to export */
if (!new_best && !old_best)
{
DBG("rt_notify_accepted: nothing to export\n");
goto done;
return;
}
do_rt_notify(c, n, new_best, old_best);
done:
/* Drop the old stored rejection if applicable.
* new->id == old->id happens when updating hostentries. */
if (rpe && rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id)))
bmap_clear(&c->export_reject_map, rpe->old->rte.id);
}
@ -750,63 +770,70 @@ rt_notify_merged(struct rt_export_request *req, const net_addr *n, struct rt_pen
}
/* Check obsolete routes for previously exported */
if (!old_best)
if (rpe && rpe->old && bmap_test(&c->export_map, rpe->old->rte.id))
old_best = &rpe->old->rte;
/* for (; rpe; rpe = atomic_load_explicit(&rpe->next, memory_order_relaxed))
while (rpe)
{
channel_rpe_mark_seen(req, rpe);
if (rpe->old)
{
if (rpe->old && bmap_test(&hook->accept_map, rpe->old->id))
if (bmap_test(&c->export_map, rpe->old->rte.id))
{
old_best = &rpe->old.rte;
break;
ASSERT_DIE(old_best == NULL);
old_best = &rpe->old->rte;
}
if (rpe == rpe_last)
break;
}
*/
rpe = rpe_next(rpe, NULL);
}
/* Prepare new merged route */
rte *new_merged = count ? rt_export_merged(c, feed, count, rte_update_pool, 0) : NULL;
if (new_merged || old_best)
do_rt_notify(c, n, new_merged, old_best);
/* Drop the old stored rejection if applicable.
* new->id == old->id happens when updating hostentries. */
if (rpe && rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id)))
bmap_clear(&c->export_reject_map, rpe->old->rte.id);
}
void
rt_notify_optimal(struct rt_export_request *req, const net_addr *net, struct rt_pending_export *rpe)
{
struct channel *c = SKIP_BACK(struct channel, out_req, req);
rte n0;
if (rpe->new_best != rpe->old_best)
rt_notify_basic(c, net, RTES_CLONE(rpe->new_best, &n0), RTES_OR_NULL(rpe->old_best));
rte *old = RTES_OR_NULL(rpe->old_best);
struct rte_storage *new_best = rpe->new_best;
/* Drop the old stored rejection if applicable.
* new->id == old->id happens when updating hostentries. */
if (rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id)))
bmap_clear(&c->export_reject_map, rpe->old->rte.id);
while (rpe)
{
channel_rpe_mark_seen(req, rpe);
new_best = rpe->new_best;
rpe = rpe_next(rpe, NULL);
}
if (&new_best->rte != old)
{
rte n0, *new = RTES_CLONE(new_best, &n0);
rt_notify_basic(c, net, new, old);
}
}
void
rt_notify_any(struct rt_export_request *req, const net_addr *net, struct rt_pending_export *rpe)
{
struct channel *c = SKIP_BACK(struct channel, out_req, req);
rte n0;
if (rpe->new != rpe->old)
rt_notify_basic(c, net, RTES_CLONE(rpe->new, &n0), RTES_OR_NULL(rpe->old));
struct rte_src *src = rpe->new ? rpe->new->rte.src : rpe->old->rte.src;
rte *old = RTES_OR_NULL(rpe->old);
struct rte_storage *new_any = rpe->new;
/* Drop the old stored rejection if applicable.
* new->id == old->id happens when updating hostentries. */
if (rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id)))
bmap_clear(&c->export_reject_map, rpe->old->rte.id);
while (rpe)
{
channel_rpe_mark_seen(req, rpe);
new_any = rpe->new;
rpe = rpe_next(rpe, src);
}
if (&new_any->rte != old)
{
rte n0, *new = RTES_CLONE(new_any, &n0);
rt_notify_basic(c, net, new, old);
}
}
void
@ -821,6 +848,76 @@ rt_feed_any(struct rt_export_request *req, const net_addr *net, struct rt_pendin
}
}
void
rpe_mark_seen(struct rt_export_hook *hook, struct rt_pending_export *rpe)
{
bmap_set(&hook->seq_map, rpe->seq);
}
struct rt_pending_export *
rpe_next(struct rt_pending_export *rpe, struct rte_src *src)
{
struct rt_pending_export *next = atomic_load_explicit(&rpe->next, memory_order_acquire);
if (!next)
return NULL;
if (!src)
return next;
while (rpe = next)
if (src == (rpe->new ? rpe->new->rte.src : rpe->old->rte.src))
return rpe;
else
next = atomic_load_explicit(&rpe->next, memory_order_acquire);
return NULL;
}
static struct rt_pending_export * rt_next_export_fast(struct rt_pending_export *last);
static void
rte_export(struct rt_export_hook *hook, struct rt_pending_export *rpe)
{
if (bmap_test(&hook->seq_map, rpe->seq))
goto seen;
const net_addr *n = rpe->new_best ? rpe->new_best->rte.net : rpe->old_best->rte.net;
if (rpe->new)
hook->stats.updates_received++;
else
hook->stats.withdraws_received++;
if (hook->req->export_one)
hook->req->export_one(hook->req, n, rpe);
else if (hook->req->export_bulk)
{
net *net = SKIP_BACK(struct network, n.addr, (net_addr (*)[0]) n);
uint count = rte_feed_count(net);
rte **feed = NULL;
if (count)
{
feed = alloca(count * sizeof(rte *));
rte_feed_obtain(net, feed, count);
}
hook->req->export_bulk(hook->req, n, rpe, feed, count);
}
else
bug("Export request must always provide an export method");
seen:
/* Get the next export if exists */
hook->rpe_next = rt_next_export_fast(rpe);
/* The last block may be available to free */
if (PAGE_HEAD(hook->rpe_next) != PAGE_HEAD(rpe))
rt_export_used(hook->table);
/* Releasing this export for cleanup routine */
DBG("store hook=%p last_export=%p seq=%lu\n", hook, rpe, rpe->seq);
atomic_store_explicit(&hook->last_export, rpe, memory_order_release);
}
/**
* rte_announce - announce a routing table change
* @tab: table the route has been added to
@ -856,19 +953,23 @@ static void
rte_announce(rtable *tab, net *net, struct rte_storage *new, struct rte_storage *old,
struct rte_storage *new_best, struct rte_storage *old_best)
{
if (!new || !rte_is_valid(&new->rte))
new = NULL;
if (!old || !rte_is_valid(&old->rte))
old = NULL;
if (!new_best || !rte_is_valid(&new_best->rte))
new_best = NULL;
if (!old_best || !rte_is_valid(&old_best->rte))
old_best = NULL;
if (!new && !old && !new_best && !old_best)
if (!new || !rte_is_valid(&new->rte))
new = NULL;
if (old && !rte_is_valid(&old->rte))
{
/* Filtered old route isn't announced, should be freed immediately. */
rte_free(old, tab);
old = NULL;
}
if ((new == old) && (new_best == old_best))
return;
if (new_best != old_best)
@ -884,35 +985,194 @@ rte_announce(rtable *tab, net *net, struct rte_storage *new, struct rte_storage
rt_schedule_notify(tab);
struct rt_pending_export rpe = { .new = new, .old = old, .new_best = new_best, .old_best = old_best };
uint count = rte_feed_count(net);
rte **feed = NULL;
if (count)
if (EMPTY_LIST(tab->exports) && EMPTY_LIST(tab->pending_exports))
{
feed = alloca(count * sizeof(rte *));
rte_feed_obtain(net, feed, count);
/* No export hook and no pending exports to cleanup. We may free the route immediately. */
if (!old)
return;
hmap_clear(&tab->id_map, old->rte.id);
rte_free(old, tab);
return;
}
struct rt_export_hook *eh;
WALK_LIST(eh, tab->exports)
/* Get the pending export structure */
struct rt_export_block *rpeb = NULL, *rpebsnl = NULL;
u32 end = 0;
if (!EMPTY_LIST(tab->pending_exports))
{
if (eh->export_state == TES_STOP)
rpeb = TAIL(tab->pending_exports);
end = atomic_load_explicit(&rpeb->end, memory_order_relaxed);
if (end >= RT_PENDING_EXPORT_ITEMS)
{
ASSERT_DIE(end == RT_PENDING_EXPORT_ITEMS);
rpebsnl = rpeb;
rpeb = NULL;
end = 0;
}
}
if (!rpeb)
{
rpeb = alloc_page(tab->rp);
*rpeb = (struct rt_export_block) {};
add_tail(&tab->pending_exports, &rpeb->n);
}
/* Fill the pending export */
struct rt_pending_export *rpe = &rpeb->export[rpeb->end];
*rpe = (struct rt_pending_export) {
.new = new,
.new_best = new_best,
.old = old,
.old_best = old_best,
.seq = tab->next_export_seq++,
};
DBG("rte_announce: table=%s net=%N new=%p from %p old=%p from %p new_best=%p old_best=%p seq=%lu\n", tab->name, net->n.addr, new, new ? new->sender : NULL, old, old ? old->sender : NULL, new_best, old_best, rpe->seq);
ASSERT_DIE(atomic_fetch_add_explicit(&rpeb->end, 1, memory_order_release) == end);
if (rpebsnl)
{
_Bool f = 0;
ASSERT_DIE(atomic_compare_exchange_strong_explicit(&rpebsnl->not_last, &f, 1,
memory_order_release, memory_order_relaxed));
}
/* Append to the same-network squasher list */
if (net->last)
{
struct rt_pending_export *rpenull = NULL;
ASSERT_DIE(atomic_compare_exchange_strong_explicit(
&net->last->next, &rpenull, rpe,
memory_order_relaxed,
memory_order_relaxed));
}
net->last = rpe;
if (!net->first)
net->first = rpe;
if (tab->first_export == NULL)
tab->first_export = rpe;
if (!tm_active(tab->export_timer))
tm_start(tab->export_timer, tab->config->export_settle_time);
}
static struct rt_pending_export *
rt_next_export_fast(struct rt_pending_export *last)
{
/* Get the whole export block and find our position in there. */
struct rt_export_block *rpeb = PAGE_HEAD(last);
u32 pos = (last - &rpeb->export[0]);
u32 end = atomic_load_explicit(&rpeb->end, memory_order_acquire);
ASSERT_DIE(pos < end);
/* Next is in the same block. */
if (++pos < end)
return &rpeb->export[pos];
/* There is another block. */
if (atomic_load_explicit(&rpeb->not_last, memory_order_acquire))
{
/* This is OK to do non-atomically because of the not_last flag. */
rpeb = NODE_NEXT(rpeb);
return &rpeb->export[0];
}
/* There is nothing more. */
return NULL;
}
static struct rt_pending_export *
rt_next_export(struct rt_export_hook *hook, rtable *tab)
{
/* As the table is locked, it is safe to reload the last export pointer */
struct rt_pending_export *last = atomic_load_explicit(&hook->last_export, memory_order_acquire);
/* It is still valid, let's reuse it */
if (last)
return rt_next_export_fast(last);
/* No, therefore we must process the table's first pending export */
else
return tab->first_export;
}
static void
rt_announce_exports(timer *tm)
{
rtable *tab = tm->data;
struct rt_export_hook *c; node *n;
WALK_LIST2(c, n, tab->exports, n)
{
if (atomic_load_explicit(&c->export_state, memory_order_acquire) != TES_READY)
continue;
if (new)
eh->stats.updates_received++;
else
eh->stats.withdraws_received++;
if (eh->req->export_one)
eh->req->export_one(eh->req, net->n.addr, &rpe);
else if (eh->req->export_bulk)
eh->req->export_bulk(eh->req, net->n.addr, &rpe, feed, count);
else
bug("Export request must always provide an export method");
ev_schedule_work(c->event);
}
}
static struct rt_pending_export *
rt_last_export(rtable *tab)
{
struct rt_pending_export *rpe = NULL;
if (!EMPTY_LIST(tab->pending_exports))
{
/* We'll continue processing exports from this export on */
struct rt_export_block *reb = TAIL(tab->pending_exports);
ASSERT_DIE(reb->end);
rpe = &reb->export[reb->end - 1];
}
return rpe;
}
#define RT_EXPORT_BULK 1024
static void
rt_export_hook(void *_data)
{
struct rt_export_hook *c = _data;
ASSERT_DIE(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_READY);
if (!c->rpe_next)
{
c->rpe_next = rt_next_export(c, c->table);
if (!c->rpe_next)
{
rt_export_used(c->table);
return;
}
}
/* Process the export */
for (uint i=0; i<RT_EXPORT_BULK; i++)
{
rte_update_lock();
rte_export(c, c->rpe_next);
if (!c->rpe_next)
break;
rte_update_unlock();
}
ev_schedule_work(c->event);
}
static inline int
rte_validate(struct channel *ch, rte *e)
{
@ -1133,14 +1393,8 @@ rte_recalculate(struct rt_import_hook *c, net *net, rte *new, struct rte_src *sr
if (new_stored)
{
new_stored->rte.lastmod = current_time();
if (!old)
{
new_stored->rte.id = hmap_first_zero(&table->id_map);
hmap_set(&table->id_map, new_stored->rte.id);
}
else
new_stored->rte.id = old->id;
new_stored->rte.id = hmap_first_zero(&table->id_map);
hmap_set(&table->id_map, new_stored->rte.id);
}
_Bool nb = (new_stored == net->routes);
@ -1178,13 +1432,6 @@ rte_recalculate(struct rt_import_hook *c, net *net, rte *new, struct rte_src *sr
p->rte_insert(net, &new_stored->rte);
#endif
if (old)
{
if (!new_stored)
hmap_clear(&table->id_map, old->id);
rte_free(old_stored, table);
}
}
static int rte_update_nest_cnt; /* Nesting counter to allow recursive updates */
@ -1384,13 +1631,16 @@ rt_export_stopped(void *data)
struct rt_export_hook *hook = data;
rtable *tab = hook->table;
/* Drop pending exports */
rt_export_used(tab);
/* Unlist */
rem_node(&hook->n);
/* Reporting the channel as stopped. */
/* Report the channel as stopped. */
hook->stopped(hook->req);
/* Freeing the hook together with its coroutine. */
/* Free the hook together with its coroutine. */
rfree(hook->pool);
rt_unlock_table(tab);
@ -1412,7 +1662,7 @@ static inline void
rt_set_export_state(struct rt_export_hook *hook, u8 state)
{
hook->last_state_change = current_time();
hook->export_state = state;
atomic_store_explicit(&hook->export_state, state, memory_order_release);
if (hook->req->log_state_change)
hook->req->log_state_change(hook->req, state);
@ -1460,15 +1710,20 @@ rt_request_export(rtable *tab, struct rt_export_request *req)
pool *p = rp_new(tab->rp, "Export hook");
struct rt_export_hook *hook = req->hook = mb_allocz(p, sizeof(struct rt_export_hook));
hook->pool = p;
hook->lp = lp_new_default(p);
hook->req = req;
hook->table = tab;
/* stats zeroed by mb_allocz */
bmap_init(&hook->seq_map, p, 1024);
rt_set_export_state(hook, TES_HUNGRY);
struct rt_pending_export *rpe = rt_last_export(hook->table);
DBG("store hook=%p last_export=%p seq=%lu\n", hook, rpe, rpe ? rpe->seq : 0);
atomic_store_explicit(&hook->last_export, rpe, memory_order_relaxed);
hook->n = (node) {};
add_tail(&tab->exports, &hook->n);
@ -1476,10 +1731,10 @@ rt_request_export(rtable *tab, struct rt_export_request *req)
DBG("New export hook %p req %p in table %s uc=%u\n", hook, req, tab->name, tab->use_count);
rt_set_export_state(hook, TES_FEEDING);
hook->event = ev_new_init(p, rt_feed_channel, hook);
ev_schedule_work(hook->event);
rt_set_export_state(hook, TES_FEEDING);
}
void
@ -1493,16 +1748,18 @@ rt_stop_export(struct rt_export_request *req, void (*stopped)(struct rt_export_r
/* Stop feeding */
ev_postpone(hook->event);
if (hook->export_state == TES_FEEDING)
if (atomic_load_explicit(&hook->export_state, memory_order_relaxed) == TES_FEEDING)
fit_get(&tab->fib, &hook->feed_fit);
hook->event->hook = rt_export_stopped;
hook->stopped = stopped;
rt_set_export_state(hook, TES_STOP);
ev_schedule(hook->event);
rt_set_export_state(hook, TES_STOP);
}
/**
* rt_refresh_begin - start a refresh cycle
* @t: related routing table
@ -1649,8 +1906,8 @@ rt_dump_hooks(rtable *tab)
{
eh->req->dump_req(eh->req);
debug(" Export hook %p requested by %p:"
" refeed_pending=%u last_state_change=%t export_state=%u stopped=%p\n",
eh, eh->req, eh->refeed_pending, eh->last_state_change, eh->export_state, eh->stopped);
" refeed_pending=%u last_state_change=%t export_state=%u\n",
eh, eh->req, eh->refeed_pending, eh->last_state_change, atomic_load_explicit(&eh->export_state, memory_order_relaxed));
}
debug("\n");
}
@ -1700,6 +1957,18 @@ rt_schedule_prune(rtable *tab)
tab->prune_state |= 1;
}
void
rt_export_used(rtable *tab)
{
if (config->table_debug)
log(L_TRACE "%s: Export cleanup requested", tab->name);
if (tab->export_used)
return;
tab->export_used = 1;
ev_schedule(tab->rt_event);
}
static void
rt_event(void *ptr)
@ -1708,6 +1977,9 @@ rt_event(void *ptr)
rt_lock_table(tab);
if (tab->export_used)
rt_export_cleanup(tab);
if (tab->hcu_scheduled)
rt_update_hostcache(tab);
@ -1857,13 +2129,17 @@ rt_setup(pool *pp, struct rtable_config *cf)
init_list(&t->imports);
init_list(&t->exports);
hmap_init(&t->id_map, p, 1024);
hmap_set(&t->id_map, 0);
init_list(&t->pending_exports);
init_list(&t->subscribers);
t->rt_event = ev_new_init(p, rt_event, t);
t->export_timer = tm_new_init(p, rt_announce_exports, t, 0, 0);
t->last_rt_change = t->gc_time = current_time();
t->next_export_seq = 1;
t->rl_pipe = (struct tbf) TBF_DEFAULT_LOG_LIMITS;
@ -1961,7 +2237,7 @@ again:
}
}
if (!n->routes) /* Orphaned FIB entry */
if (!n->routes && !n->first) /* Orphaned FIB entry */
{
FIB_ITERATE_PUT(fit);
fib_delete(&tab->fib, n);
@ -1982,15 +2258,15 @@ again:
rt_prune_sources();
uint flushed_channels = 0;
/* Close flushed channels */
WALK_LIST2_DELSAFE(ih, n, x, tab->imports, n)
if (ih->import_state == TIS_FLUSHING)
{
rt_set_import_state(ih, TIS_CLEARED);
ih->stopped(ih->req);
rem_node(&ih->n);
mb_free(ih);
rt_unlock_table(tab);
ih->flush_seq = tab->next_export_seq;
rt_set_import_state(ih, TIS_WAITING);
flushed_channels++;
}
else if (ih->stale_pruning != ih->stale_pruned)
{
@ -1999,6 +2275,185 @@ again:
if (ih->req->trace_routes & D_STATES)
log(L_TRACE "%s: table prune after refresh end [%u]", ih->req->name, ih->stale_pruned);
}
/* In some cases, we may want to directly proceed to export cleanup */
if (EMPTY_LIST(tab->exports) && flushed_channels)
rt_export_cleanup(tab);
}
static void
rt_export_cleanup(rtable *tab)
{
tab->export_used = 0;
u64 min_seq = ~((u64) 0);
struct rt_pending_export *last_export_to_free = NULL;
struct rt_pending_export *first_export = tab->first_export;
struct rt_export_hook *eh;
node *n;
WALK_LIST2(eh, n, tab->exports, n)
{
switch (atomic_load_explicit(&eh->export_state, memory_order_acquire))
{
case TES_DOWN:
case TES_HUNGRY:
continue;
case TES_READY:
{
struct rt_pending_export *last = atomic_load_explicit(&eh->last_export, memory_order_acquire);
if (!last)
/* No last export means that the channel has exported nothing since last cleanup */
goto done;
else if (min_seq > last->seq)
{
min_seq = last->seq;
last_export_to_free = last;
}
continue;
}
default:
/* It's only safe to cleanup when the export state is idle or regular. No feeding or stopping allowed. */
goto done;
}
}
tab->first_export = last_export_to_free ? rt_next_export_fast(last_export_to_free) : NULL;
if (config->table_debug)
log(L_TRACE "%s: Export cleanup, old first_export seq %lu, new %lu, min_seq %ld",
tab->name,
first_export ? first_export->seq : 0,
tab->first_export ? tab->first_export->seq : 0,
min_seq);
WALK_LIST2(eh, n, tab->exports, n)
{
if (atomic_load_explicit(&eh->export_state, memory_order_acquire) != TES_READY)
continue;
struct rt_pending_export *last = atomic_load_explicit(&eh->last_export, memory_order_acquire);
if (last == last_export_to_free)
{
/* This may fail when the channel managed to export more inbetween. This is OK. */
atomic_compare_exchange_strong_explicit(
&eh->last_export, &last, NULL,
memory_order_release,
memory_order_relaxed);
DBG("store hook=%p last_export=NULL\n", eh);
}
}
while (first_export && (first_export->seq <= min_seq))
{
ASSERT_DIE(first_export->new || first_export->old);
const net_addr *n = first_export->new ?
first_export->new->rte.net :
first_export->old->rte.net;
net *net = SKIP_BACK(struct network, n.addr, (net_addr (*)[0]) n);
ASSERT_DIE(net->first == first_export);
if (first_export == net->last)
/* The only export here */
net->last = net->first = NULL;
else
/* First is now the next one */
net->first = atomic_load_explicit(&first_export->next, memory_order_relaxed);
/* For now, the old route may be finally freed */
if (first_export->old)
{
rt_rte_trace_in(D_ROUTES, first_export->old->rte.sender->req, &first_export->old->rte, "freed");
hmap_clear(&tab->id_map, first_export->old->rte.id);
rte_free(first_export->old, tab);
}
#ifdef LOCAL_DEBUG
memset(first_export, 0xbd, sizeof(struct rt_pending_export));
#endif
struct rt_export_block *reb = HEAD(tab->pending_exports);
ASSERT_DIE(reb == PAGE_HEAD(first_export));
u32 pos = (first_export - &reb->export[0]);
u32 end = atomic_load_explicit(&reb->end, memory_order_relaxed);
ASSERT_DIE(pos < end);
struct rt_pending_export *next = NULL;
if (++pos < end)
next = &reb->export[pos];
else
{
rem_node(&reb->n);
#ifdef LOCAL_DEBUG
memset(reb, 0xbe, page_size);
#endif
free_page(tab->rp, reb);
if (EMPTY_LIST(tab->pending_exports))
{
if (config->table_debug)
log(L_TRACE "%s: Resetting export seq", tab->name);
node *n;
WALK_LIST2(eh, n, tab->exports, n)
{
if (atomic_load_explicit(&eh->export_state, memory_order_acquire) != TES_READY)
continue;
ASSERT_DIE(atomic_load_explicit(&eh->last_export, memory_order_acquire) == NULL);
bmap_reset(&eh->seq_map, 1024);
}
tab->next_export_seq = 1;
}
else
{
reb = HEAD(tab->pending_exports);
next = &reb->export[0];
}
}
first_export = next;
}
done:;
struct rt_import_hook *ih; node *x;
_Bool imports_stopped = 0;
WALK_LIST2_DELSAFE(ih, n, x, tab->imports, n)
if (ih->import_state == TIS_WAITING)
if (!first_export || (first_export->seq >= ih->flush_seq))
{
ih->import_state = TIS_CLEARED;
ih->stopped(ih->req);
rem_node(&ih->n);
mb_free(ih);
rt_unlock_table(tab);
imports_stopped = 1;
}
if (tab->export_used)
ev_schedule(tab->rt_event);
if (imports_stopped)
{
if (config->table_debug)
log(L_TRACE "%s: Sources pruning routine requested", tab->name);
rt_prune_sources();
}
if (EMPTY_LIST(tab->pending_exports) && tm_active(tab->export_timer))
tm_stop(tab->export_timer);
}
void
@ -2180,6 +2635,11 @@ rt_next_hop_update_net(rtable *tab, net *n)
/* Replace the route in the list */
new->next = e->next;
*k = e = new;
/* Get a new ID for the route */
new->rte.lastmod = current_time();
new->rte.id = hmap_first_zero(&tab->id_map);
hmap_set(&tab->id_map, new->rte.id);
}
ASSERT_DIE(pos == count);
@ -2213,9 +2673,6 @@ rt_next_hop_update_net(rtable *tab, net *n)
rte_announce_i(tab, n, updates[i].new, updates[i].old, new, old_best);
}
for (int i=0; i<count; i++)
rte_free(updates[i].old, tab);
return count;
}
@ -2405,7 +2862,7 @@ rt_feed_channel(void *data)
struct fib_iterator *fit = &c->feed_fit;
int max_feed = 256;
ASSERT(c->export_state == TES_FEEDING);
ASSERT(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_FEEDING);
FIB_ITERATE_START(&c->table->fib, fit, net, n)
{
@ -2416,8 +2873,8 @@ rt_feed_channel(void *data)
return;
}
if (c->export_state != TES_FEEDING)
goto done;
if (atomic_load_explicit(&c->export_state, memory_order_acquire) != TES_FEEDING)
return;
if (c->req->export_bulk)
{
@ -2427,8 +2884,7 @@ rt_feed_channel(void *data)
rte_update_lock();
rte **feed = alloca(count * sizeof(rte *));
rte_feed_obtain(n, feed, count);
struct rt_pending_export rpe = { .new_best = n->routes };
c->req->export_bulk(c->req, n->n.addr, &rpe, feed, count);
c->req->export_bulk(c->req, n->n.addr, NULL, feed, count);
max_feed -= count;
rte_update_unlock();
}
@ -2441,10 +2897,15 @@ rt_feed_channel(void *data)
max_feed--;
rte_update_unlock();
}
for (struct rt_pending_export *rpe = n->first; rpe; rpe = rpe_next(rpe, NULL))
rpe_mark_seen(c, rpe);
}
FIB_ITERATE_END;
done:
c->event->hook = rt_export_hook;
ev_schedule_work(c->event);
rt_set_export_state(c, TES_READY);
}