0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2024-12-22 17:51:53 +00:00

Merge branch 'mq-aggregator-for-v3' into HEAD

This commit is contained in:
Maria Matejka 2023-11-01 10:57:57 +01:00
commit 8917f16e4b
20 changed files with 907 additions and 11 deletions

View File

@ -76,7 +76,7 @@ HASH_DEFINE_REHASH_FN(SYM, struct symbol)
/* Global symbol scopes */ /* Global symbol scopes */
pool *global_root_scope_pool; pool *global_root_scope_pool;
linpool *global_root_scope_linpool; linpool *global_root_scope_linpool;
static struct sym_scope struct sym_scope
global_root_scope = { global_root_scope = {
}, },
global_filter_scope = { global_filter_scope = {

View File

@ -97,6 +97,7 @@ CF_DECLS
struct settle_config settle; struct settle_config settle;
struct adata *ad; struct adata *ad;
const struct adata *bs; const struct adata *bs;
struct aggr_item_node *ai;
} }
%token END CLI_MARKER INVALID_TOKEN ELSECOL DDOT %token END CLI_MARKER INVALID_TOKEN ELSECOL DDOT

View File

@ -307,13 +307,14 @@ if test "$enable_mpls_kernel" != no ; then
fi fi
# temporarily removed "mrt" from all_protocols to speed up 3.0-alpha1 release # temporarily removed "mrt" from all_protocols to speed up 3.0-alpha1 release
all_protocols="bfd babel bgp ospf perf pipe radv rip rpki static" all_protocols="aggregator bfd babel bgp ospf perf pipe radv rip rpki static"
all_protocols=`echo $all_protocols | sed 's/ /,/g'` all_protocols=`echo $all_protocols | sed 's/ /,/g'`
if test "$with_protocols" = all ; then if test "$with_protocols" = all ; then
with_protocols="$all_protocols" with_protocols="$all_protocols"
fi fi
AH_TEMPLATE([CONFIG_AGGREGATOR],[Aggregator protocol])
AH_TEMPLATE([CONFIG_BABEL], [Babel protocol]) AH_TEMPLATE([CONFIG_BABEL], [Babel protocol])
AH_TEMPLATE([CONFIG_BFD], [BFD protocol]) AH_TEMPLATE([CONFIG_BFD], [BFD protocol])
AH_TEMPLATE([CONFIG_BGP], [BGP protocol]) AH_TEMPLATE([CONFIG_BGP], [BGP protocol])

View File

@ -1938,6 +1938,70 @@ protocol sections.
<chapt>Protocols <chapt>Protocols
<label id="protocols"> <label id="protocols">
<sect>Aggregator
<label id="aggregator">
<sect1>Introduction
<label id="aggregator-intro">
<p>The Aggregator protocol explicitly merges routes by the given rules. There
are four phases of aggregation. First routes are filtered, then sorted into buckets,
then buckets are merged and finally the results are filtered once again.
Aggregating an already aggregated route is forbidden.
<p>This is an experimental protocol, use with caution.
<sect1>Configuration
<label id="aggregator-config">
<p><descrip>
<tag><label id="aggregator-table">table <m/table/</tag>
The table from which routes are exported to get aggregated.
<tag><label id="aggregator-export">export <m/.../</tag>
A standard channel's <cf/export/ clause, defining which routes are accepted into aggregation.
<tag><label id="aggregator-rule">aggregate on <m/expr/ | <m/attribute/ [<m/, .../]</tag>
All the given filter expressions and route attributes are evaluated for each route. Then routes
are sorted into buckets where <em/all/ values are the same. Note: due to performance reasons,
all filter expressions must return a compact type, e.g. integer, a BGP
(standard, extended, large) community or an IP address. If you need to compare e.g. modified
AS Paths in the aggregation rule, you can define a custom route attribute and set this attribute
in the export filter. For now, it's mandatory to say <cf/net/ here, we can't merge prefixes yet.
<tag><label id="aggregation-merge">merge by { <m/filter code/ }</tag>
The given filter code has an extra symbol defined: <cf/routes/. By iterating over <cf/routes/,
you get all the routes in the bucket and you can construct your new route. All attributes
selected in <cf/aggregate on/ are already set to the common values. For now, it's not possible
to use a named filter here. You have to finalize the route by calling <cf/accept/.
<tag><label id="aggregator-import">import <m/.../</tag>
Filter applied to the route after <cf/merge by/. Here you can use a named filter.
<tag><label id="aggregator-peer-table">peer table <m/table/</tag>
The table to which aggregated routes are imported. It may be the same table
as <cf/table/.
</descrip>
<sect1>Example
<label id="aggregator-example">
<p><code>
protocol aggregator {
table master6;
export where defined(bgp_path);
/* Merge all routes with the same AS Path length */
aggregate on net, bgp_path.len;
merge by {
for route r in routes do {
if ! defined(bgp_path) then { bgp_path = r.bgp_path }
bgp_community = bgp_community.add(r.bgp_community);
}
accept;
};
import all;
peer table agr_result;
}
</code>
<sect>Babel <sect>Babel
<label id="babel"> <label id="babel">

View File

@ -40,12 +40,9 @@ static inline void f_method_call_start(struct f_inst *object)
cf_error("Too many nested method calls"); cf_error("Too many nested method calls");
struct sym_scope *scope = f_type_method_scope(object->type); struct sym_scope *scope = f_type_method_scope(object->type);
if (!scope && object->type != T_ROUTE) if (!scope->hash.count && !scope->next)
cf_error("No methods defined for type %s", f_type_name(object->type)); cf_error("No methods defined for type %s", f_type_name(object->type));
if (!scope)
scope = config->root_scope->next;
/* Replacing the current symbol scope with the appropriate method scope /* Replacing the current symbol scope with the appropriate method scope
for the given type. */ for the given type. */
FM = (struct f_method_scope) { FM = (struct f_method_scope) {

View File

@ -205,6 +205,8 @@ val_compare(const struct f_val *v1, const struct f_val *v2)
return net_compare(v1->val.net, v2->val.net); return net_compare(v1->val.net, v2->val.net);
case T_STRING: case T_STRING:
return strcmp(v1->val.s, v2->val.s); return strcmp(v1->val.s, v2->val.s);
case T_PATH:
return as_path_compare(v1->val.ad, v2->val.ad);
case T_ROUTE: case T_ROUTE:
case T_ROUTES_BLOCK: case T_ROUTES_BLOCK:
default: default:
@ -679,7 +681,6 @@ rte_block_format(const struct rte_block *block, buffer *buf)
} }
} }
/* /*
* val_format - format filter value * val_format - format filter value
*/ */

View File

@ -652,6 +652,8 @@ f_register_method(enum btype t, const byte *name, struct f_method *dsc)
sym->method = dsc; sym->method = dsc;
} }
extern struct sym_scope global_filter_scope;
void f_type_methods_register(void) void f_type_methods_register(void)
{ {
struct f_method *method; struct f_method *method;
@ -660,6 +662,8 @@ FID_WR_PUT(13)
for (uint i = 0; i < ARRAY_SIZE(f_type_method_scopes); i++) for (uint i = 0; i < ARRAY_SIZE(f_type_method_scopes); i++)
f_type_method_scopes[i].readonly = 1; f_type_method_scopes[i].readonly = 1;
f_type_method_scopes[T_ROUTE].next = &global_filter_scope;
} }
/* Line dumpers */ /* Line dumpers */

View File

@ -669,6 +669,29 @@ as_path_filter(struct linpool *pool, const struct adata *path, const struct f_va
return res; return res;
} }
int
as_path_compare(const struct adata *path1, const struct adata *path2)
{
uint pos1 = 0;
uint pos2 = 0;
uint val1 = 0;
uint val2 = 0;
while (1)
{
int res1 = as_path_walk(path1, &pos1, &val1);
int res2 = as_path_walk(path2, &pos2, &val2);
if (res1 == 0 && res2 == 0)
return 0;
if (val1 == val2)
continue;
return val1 < val2 ? -1 : 1;
}
}
int int
as_path_walk(const struct adata *path, uint *pos, uint *val) as_path_walk(const struct adata *path, uint *pos, uint *val)
{ {

View File

@ -83,6 +83,7 @@ u32 as_path_get_last_nonaggregated(const struct adata *path);
int as_path_contains(const struct adata *path, u32 as, int min); int as_path_contains(const struct adata *path, u32 as, int min);
int as_path_match_set(const struct adata *path, const struct f_tree *set); int as_path_match_set(const struct adata *path, const struct f_tree *set);
const struct adata *as_path_filter(struct linpool *pool, const struct adata *path, const struct f_val *set, int pos); const struct adata *as_path_filter(struct linpool *pool, const struct adata *path, const struct f_val *set, int pos);
int as_path_compare(const struct adata *path1, const struct adata *path2);
int as_path_walk(const struct adata *path, uint *pos, uint *val); int as_path_walk(const struct adata *path, uint *pos, uint *val);
static inline struct adata *as_path_prepend(struct linpool *pool, const struct adata *path, u32 as) static inline struct adata *as_path_prepend(struct linpool *pool, const struct adata *path, u32 as)

View File

@ -223,7 +223,8 @@ struct nexthop_adata {
#define RTS_BABEL 13 /* Babel route */ #define RTS_BABEL 13 /* Babel route */
#define RTS_RPKI 14 /* Route Origin Authorization */ #define RTS_RPKI 14 /* Route Origin Authorization */
#define RTS_PERF 15 /* Perf checker */ #define RTS_PERF 15 /* Perf checker */
#define RTS_MAX 16 #define RTS_AGGREGATED 16 /* Aggregated route */
#define RTS_MAX 17
#define RTD_NONE 0 /* Undefined next hop */ #define RTD_NONE 0 /* Undefined next hop */
#define RTD_UNICAST 1 /* A standard next hop */ #define RTD_UNICAST 1 /* A standard next hop */

View File

@ -180,7 +180,6 @@ proto_log_state_change(struct proto *p)
p->last_state_name_announced = NULL; p->last_state_name_announced = NULL;
} }
struct channel_config * struct channel_config *
proto_cf_find_channel(struct proto_config *pc, uint net_type) proto_cf_find_channel(struct proto_config *pc, uint net_type)
{ {

View File

@ -93,7 +93,7 @@ void protos_dump_all(void);
extern struct protocol extern struct protocol
proto_device, proto_radv, proto_rip, proto_static, proto_mrt, proto_device, proto_radv, proto_rip, proto_static, proto_mrt,
proto_ospf, proto_perf, proto_ospf, proto_perf, proto_aggregator,
proto_pipe, proto_bgp, proto_bmp, proto_bfd, proto_babel, proto_rpki; proto_pipe, proto_bgp, proto_bmp, proto_bfd, proto_babel, proto_rpki;
/* /*

View File

@ -91,6 +91,8 @@ const char * const rta_src_names[RTS_MAX] = {
[RTS_PIPE] = "pipe", [RTS_PIPE] = "pipe",
[RTS_BABEL] = "Babel", [RTS_BABEL] = "Babel",
[RTS_RPKI] = "RPKI", [RTS_RPKI] = "RPKI",
[RTS_PERF] = "Perf",
[RTS_AGGREGATED] = "aggregated",
}; };
static void static void

View File

@ -1089,7 +1089,6 @@ rt_notify_merged(struct rt_export_request *req, const net_addr *n,
const rte **feed, uint count) const rte **feed, uint count)
{ {
struct channel *c = channel_from_export_request(req); struct channel *c = channel_from_export_request(req);
// struct proto *p = c->proto; // struct proto *p = c->proto;
#if 0 /* TODO: Find whether this check is possible when processing multiple changes at once. */ #if 0 /* TODO: Find whether this check is possible when processing multiple changes at once. */

1
proto/aggregator/Doc Normal file
View File

@ -0,0 +1 @@
S aggregator.c

View File

@ -0,0 +1,6 @@
src := aggregator.c
obj := $(src-o-files)
$(all-daemon)
$(cf-local)
tests_objs := $(tests_objs) $(src-o-files)

View File

@ -0,0 +1,469 @@
/*
* BIRD Internet Routing Daemon -- Route aggregation
*
* (c) 2023--2023 Igor Putovny <igor.putovny@nic.cz>
* (c) 2023 CZ.NIC, z.s.p.o.
*
* Can be freely distributed and used under the terms of the GNU GPL.
*/
/**
* DOC: Route aggregation
*
* This is an implementation of route aggregation functionality.
* It enables user to specify a set of route attributes in the configuarion file
* and then, for a given destination (net), aggregate routes with the same
* values of these attributes into a single multi-path route.
*
* Structure &channel contains pointer to aggregation list which is represented
* by &aggr_list_linearized. In rt_notify_aggregated(), attributes from this
* list are evaluated for every route of a given net and results are stored
* in &rte_val_list which contains pointer to this route and array of &f_val.
* Array of pointers to &rte_val_list entries is sorted using
* sort_rte_val_list(). For comparison of &f_val structures, val_compare()
* is used. Comparator function is written so that sorting is stable. If all
* attributes have the same values, routes are compared by their global IDs.
*
* After sorting, &rte_val_list entries containing equivalent routes will be
* adjacent to each other. Function process_rte_list() iterates through these
* entries to identify sequences of equivalent routes. New route will be
* created for each such sequence, even if only from a single route.
* Only attributes from the aggreagation list will be set for the new route.
* New &rta is created and prepare_rta() is used to copy static and dynamic
* attributes to new &rta from &rta of the original route. New route is created
* by create_merged_rte() from new &rta and exported to the routing table.
*/
#undef LOCAL_DEBUG
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include "nest/bird.h"
#include "nest/iface.h"
#include "filter/filter.h"
#include "aggregator.h"
#include <stdlib.h>
/*
* Compare list of &f_val entries.
* @count: number of &f_val entries
*/
static int
same_val_list(const struct f_val *v1, const struct f_val *v2, uint len)
{
for (uint i = 0; i < len; i++)
if (!val_same(&v1[i], &v2[i]))
return 0;
return 1;
}
/*
* Create and export new merged route.
* @old: first route in a sequence of equivalent routes that are to be merged
* @rte_val: first element in a sequence of equivalent rte_val_list entries
* @length: number of equivalent routes that are to be merged (at least 1)
* @ail: aggregation list
*/
static void
aggregator_bucket_update(struct aggregator_proto *p, struct aggregator_bucket *bucket, const net_addr *net)
{
/* Empty bucket */
if (!bucket->rte)
{
rte_update(p->dst, net, NULL, bucket->last_src);
bucket->last_src = NULL;
return;
}
/* Store TMP linpool state */
struct lp_state *tmp_state = lp_save(tmp_linpool);
/* Allocate route */
struct rte new = { .net = net, .src = bucket->rte->rte.src };
ea_set_attr(&new.attrs, EA_LITERAL_EMBEDDED(&ea_gen_source, 0, RTS_AGGREGATED));
if (net_type_match(net, NB_DEST))
ea_set_dest(&new.attrs, 0, RTD_UNREACHABLE);
/* Seed the attributes from aggregator rule */
f_eval_rte(p->premerge, &new, p->aggr_on_count, bucket->aggr_data, 0, NULL);
/*
log("=============== CREATE MERGED ROUTE ===============");
log("New route created: id = %d, protocol: %s", new->src->global_id, new->src->proto->name);
log("===================================================");
*/
/* merge filter needs one argument called "routes" */
struct f_val val = {
.type = T_ROUTES_BLOCK,
.val.rte_block = {},
};
for (struct aggregator_route *rte = bucket->rte; rte; rte = rte->next_rte)
val.val.rte_block.len++;
val.val.rte_block.rte = tmp_alloc(sizeof(struct rte *) * val.val.rte_block.len);
{
uint i = 0;
for (struct aggregator_route *rte = bucket->rte; rte; rte = rte->next_rte)
val.val.rte_block.rte[i++] = &rte->rte;
ASSERT_DIE(i == val.val.rte_block.len);
}
/* Actually run the merge rule */
enum filter_return fret = f_eval_rte(p->merge_by, &new, 1, &val, 0, NULL);
/* Finally import the route */
switch (fret)
{
/* Pass the route to the protocol */
case F_ACCEPT:
rte_update(p->dst, net, &new, bucket->last_src ?: new.src);
break;
/* Something bad happened */
default:
ASSERT_DIE(fret == F_ERROR);
/* fall through */
/* We actually don't want this route */
case F_REJECT:
if (bucket->last_src)
rte_update(p->dst, net, NULL, bucket->last_src);
break;
}
/* Switch source lock for bucket->last_src */
if (bucket->last_src != new.src)
{
if (new.src)
rt_lock_source(new.src);
if (bucket->last_src)
rt_unlock_source(bucket->last_src);
bucket->last_src = new.src;
}
lp_restore(tmp_linpool, tmp_state);
}
/*
* Reload all the buckets on reconfiguration if merge filter has changed.
* TODO: make this splitted
*/
static void
aggregator_reload_buckets(void *data)
{
struct aggregator_proto *p = data;
HASH_WALK(p->buckets, next_hash, b)
if (b->rte)
aggregator_bucket_update(p, b, b->rte->rte.net);
HASH_WALK_END;
}
static inline u32 aggr_route_hash(const rte *e)
{
struct {
const net_addr *net; /* the net_addr pointer is stable as long as any route exists for it in the source table */
struct rte_src *src;
} obj = {
.net = e->net,
.src = e->src,
};
return mem_hash(&obj, sizeof obj);
}
#define AGGR_RTE_KEY(n) (&(n)->rte)
#define AGGR_RTE_NEXT(n) ((n)->next_hash)
#define AGGR_RTE_EQ(a,b) (((a)->src == (b)->src) && ((a)->net == (b)->net))
#define AGGR_RTE_FN(_n) aggr_route_hash(_n)
#define AGGR_RTE_ORDER 4 /* Initial */
#define AGGR_RTE_REHASH aggr_rte_rehash
#define AGGR_RTE_PARAMS /8, *2, 2, 2, 4, 24
HASH_DEFINE_REHASH_FN(AGGR_RTE, struct aggregator_route);
#define AGGR_BUCK_KEY(n) (n)
#define AGGR_BUCK_NEXT(n) ((n)->next_hash)
#define AGGR_BUCK_EQ(a,b) (((a)->hash == (b)->hash) && (same_val_list((a)->aggr_data, (b)->aggr_data, p->aggr_on_count)))
#define AGGR_BUCK_FN(n) ((n)->hash)
#define AGGR_BUCK_ORDER 4 /* Initial */
#define AGGR_BUCK_REHASH aggr_buck_rehash
#define AGGR_BUCK_PARAMS /8, *2, 2, 2, 4, 24
HASH_DEFINE_REHASH_FN(AGGR_BUCK, struct aggregator_bucket);
#define AGGR_DATA_MEMSIZE (sizeof(struct f_val) * p->aggr_on_count)
static void
aggregator_rt_notify(struct proto *P, struct channel *src_ch, const net_addr *net, rte *new, const rte *old)
{
struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P);
ASSERT_DIE(src_ch == p->src);
struct aggregator_bucket *new_bucket = NULL, *old_bucket = NULL;
struct aggregator_route *old_route = NULL;
/* Find the objects for the old route */
if (old)
old_route = HASH_FIND(p->routes, AGGR_RTE, old);
if (old_route)
old_bucket = old_route->bucket;
/* Find the bucket for the new route */
if (new)
{
/* Routes are identical, do nothing */
if (old_route && rte_same(&old_route->rte, new))
return;
/* Evaluate route attributes. */
struct aggregator_bucket *tmp_bucket = sl_allocz(p->bucket_slab);
struct lp_state *tmp_state = lp_save(tmp_linpool);
struct ea_list *oa = new->attrs;
enum filter_return fret = f_eval_rte(p->aggr_on, new, 0, NULL, p->aggr_on_count, tmp_bucket->aggr_data);
if (new->attrs != oa)
log(L_WARN "Aggregator rule modifies the route");
/* Check filter return value */
if (fret > F_RETURN)
{
sl_free(tmp_bucket);
lp_restore(tmp_linpool, tmp_state);
return;
}
/* Compute the hash */
u64 haux;
mem_hash_init(&haux);
for (uint i = 0; i < p->aggr_on_count; i++)
mem_hash_mix_f_val(&haux, &tmp_bucket->aggr_data[i]);
tmp_bucket->hash = mem_hash_value(&haux);
/* Find the existing bucket */
if (new_bucket = HASH_FIND(p->buckets, AGGR_BUCK, tmp_bucket))
sl_free(tmp_bucket);
else
{
new_bucket = tmp_bucket;
HASH_INSERT2(p->buckets, AGGR_BUCK, p->p.pool, new_bucket);
}
/* Store the route attributes */
if (rta_is_cached(new->attrs))
rta_clone(new->attrs);
else
new->attrs = rta_lookup(new->attrs, 0);
/* Insert the new route into the bucket */
struct aggregator_route *arte = sl_alloc(p->route_slab);
*arte = (struct aggregator_route) {
.bucket = new_bucket,
.rte = *new,
.next_rte = new_bucket->rte,
};
new_bucket->rte = arte;
new_bucket->count++;
HASH_INSERT2(p->routes, AGGR_RTE, p->p.pool, arte);
lp_restore(tmp_linpool, tmp_state);
}
/* Remove the old route from its bucket */
if (old_bucket)
{
for (struct aggregator_route **k = &old_bucket->rte; *k; k = &(*k)->next_rte)
if (*k == old_route)
{
*k = (*k)->next_rte;
break;
}
old_bucket->count--;
HASH_REMOVE2(p->routes, AGGR_RTE, p->p.pool, old_route);
rta_free(old_route->rte.attrs);
sl_free(old_route);
}
/* Announce changes */
if (old_bucket)
aggregator_bucket_update(p, old_bucket, net);
if (new_bucket && (new_bucket != old_bucket))
aggregator_bucket_update(p, new_bucket, net);
/* Cleanup the old bucket if empty */
if (old_bucket && (!old_bucket->rte || !old_bucket->count))
{
ASSERT_DIE(!old_bucket->rte && !old_bucket->count);
HASH_REMOVE2(p->buckets, AGGR_BUCK, p->p.pool, old_bucket);
sl_free(old_bucket);
}
}
static int
aggregator_preexport(struct channel *C, struct rte *new)
{
struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, C->proto);
/* Reject our own routes */
if (new->sender == p->dst->in_req.hook)
return -1;
/* Disallow aggregating already aggregated routes */
if (ea_get_int(new->attrs, &ea_gen_source, 0) == RTS_AGGREGATED)
{
log(L_ERR "Multiple aggregations of the same route not supported.");
return -1;
}
return 0;
}
static void
aggregator_postconfig(struct proto_config *CF)
{
struct aggregator_config *cf = SKIP_BACK(struct aggregator_config, c, CF);
if (!cf->dst->table)
cf_error("Source table not specified");
if (!cf->src->table)
cf_error("Destination table not specified");
if (cf->dst->table->addr_type != cf->src->table->addr_type)
cf_error("Both tables must be of the same type");
cf->dst->in_filter = cf->src->in_filter;
cf->src->in_filter = FILTER_REJECT;
cf->dst->out_filter = FILTER_REJECT;
cf->dst->debug = cf->src->debug;
}
static struct proto *
aggregator_init(struct proto_config *CF)
{
struct proto *P = proto_new(CF);
struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P);
struct aggregator_config *cf = SKIP_BACK(struct aggregator_config, c, CF);
proto_configure_channel(P, &p->src, cf->src);
proto_configure_channel(P, &p->dst, cf->dst);
p->aggr_on_count = cf->aggr_on_count;
p->aggr_on = cf->aggr_on;
p->premerge = cf->premerge;
p->merge_by = cf->merge_by;
P->rt_notify = aggregator_rt_notify;
P->preexport = aggregator_preexport;
return P;
}
static int
aggregator_start(struct proto *P)
{
struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P);
p->bucket_slab = sl_new(P->pool, sizeof(struct aggregator_bucket) + AGGR_DATA_MEMSIZE);
HASH_INIT(p->buckets, P->pool, AGGR_BUCK_ORDER);
p->route_slab = sl_new(P->pool, sizeof(struct aggregator_route));
HASH_INIT(p->routes, P->pool, AGGR_RTE_ORDER);
p->reload_buckets = (event) {
.hook = aggregator_reload_buckets,
.data = p,
};
return PS_UP;
}
static int
aggregator_shutdown(struct proto *P)
{
struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P);
HASH_WALK_DELSAFE(p->buckets, next_hash, b)
{
for (struct aggregator_route *arte; arte = b->rte; )
{
b->rte = arte->next_rte;
b->count--;
HASH_REMOVE(p->routes, AGGR_RTE, arte);
rta_free(arte->rte.attrs);
sl_free(arte);
}
ASSERT_DIE(b->count == 0);
HASH_REMOVE(p->buckets, AGGR_BUCK, b);
sl_free(b);
}
HASH_WALK_END;
return PS_DOWN;
}
static int
aggregator_reconfigure(struct proto *P, struct proto_config *CF)
{
struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P);
struct aggregator_config *cf = SKIP_BACK(struct aggregator_config, c, CF);
TRACE(D_EVENTS, "Reconfiguring");
/* Compare numeric values (shortcut) */
if (cf->aggr_on_count != p->aggr_on_count)
return 0;
/* Compare aggregator rule */
if (!f_same(cf->aggr_on, p->aggr_on) || !f_same(cf->premerge, p->premerge))
return 0;
/* Compare merge filter */
if (!f_same(cf->merge_by, p->merge_by))
ev_schedule(&p->reload_buckets);
p->aggr_on = cf->aggr_on;
p->premerge = cf->premerge;
p->merge_by = cf->merge_by;
return 1;
}
struct protocol proto_aggregator = {
.name = "Aggregator",
.template = "aggregator%d",
.preference = 1,
.channel_mask = NB_ANY,
.proto_size = sizeof(struct aggregator_proto),
.config_size = sizeof(struct aggregator_config),
.startup = PROTOCOL_STARTUP_CONNECTOR,
.postconfig = aggregator_postconfig,
.init = aggregator_init,
.start = aggregator_start,
.shutdown = aggregator_shutdown,
.reconfigure = aggregator_reconfigure,
};
void
aggregator_build(void)
{
proto_build(&proto_aggregator);
}

View File

@ -0,0 +1,69 @@
/*
* BIRD -- Aggregator Pseudoprotocol
*
* (c) 2023 Igor Putovny <igor.putovny@nic.cz>
* (c) 2023 Maria Matejka <mq@ucw.cz>
* (c) 2023 CZ.NIC z.s.p.o.
*
* Can be freely distributed and used under the terms of the GNU GPL.
*
* This file contains the data structures used by Babel.
*/
#ifndef _BIRD_AGGREGATOR_H_
#define _BIRD_AGGREGATOR_H_
#include "nest/bird.h"
#include "nest/protocol.h"
#include "lib/hash.h"
struct aggregator_config {
struct proto_config c;
struct channel_config *src, *dst;
const struct f_line *aggr_on;
const struct f_line *premerge;
const struct f_line *merge_by;
uint aggr_on_count;
u8 aggr_on_net;
};
struct aggregator_route {
struct aggregator_route *next_hash;
struct aggregator_route *next_rte;
struct aggregator_bucket *bucket;
struct rte rte;
};
struct aggregator_bucket {
struct aggregator_bucket *next_hash;
struct aggregator_route *rte; /* Pointer to struct aggregator_route.rte */
struct rte_src *last_src; /* Which src we announced the bucket last with */
u32 count;
u32 hash;
struct f_val aggr_data[0];
};
struct aggregator_proto {
struct proto p;
struct channel *src, *dst;
/* Buckets by aggregator rule */
HASH(struct aggregator_bucket) buckets;
slab *bucket_slab;
/* Routes by net and src */
HASH(struct aggregator_route) routes;
slab *route_slab;
/* Aggregator rule */
const struct f_line *aggr_on;
uint aggr_on_count;
u8 aggr_on_net;
/* Merge filter */
const struct f_line *premerge;
const struct f_line *merge_by;
event reload_buckets;
};
#endif

142
proto/aggregator/config.Y Normal file
View File

@ -0,0 +1,142 @@
/*
* BIRD -- Aggregator configuration
*
* (c) 2023 Igor Putovny <igor.putovny@nic.cz>
* (c) 2023 Maria Matejka <mq@ucw.cz>
* (c) 2023 CZ.NIC z.s.p.o.
*
* Can be freely distributed and used under the terms of the GNU GPL.
*/
CF_HDR
#include "proto/aggregator/aggregator.h"
CF_DEFINES
#define AGGREGATOR_CFG ((struct aggregator_config *) this_proto)
#define AGGR_ITEM_ALLOC ((struct aggr_item_node *) cfg_allocz(sizeof(struct aggr_item_node)))
CF_DECLS
CF_KEYWORDS(AGGREGATOR, AGGREGATE, ON, MERGE, BY)
%type <xp> aggr_item aggr_list
CF_GRAMMAR
proto: aggregator_proto ;
aggregator_proto_start: proto_start AGGREGATOR
{
this_proto = proto_config_new(&proto_aggregator, $1);
this_channel = AGGREGATOR_CFG->src = channel_config_new(NULL, "source", 0, this_proto);
AGGREGATOR_CFG->dst = channel_config_new(NULL, "destination", 0, this_proto);
AGGREGATOR_CFG->src->ra_mode = AGGREGATOR_CFG->dst->ra_mode = RA_ANY;
};
aggregator_proto_item:
proto_item
| channel_item_
| PEER TABLE rtable { AGGREGATOR_CFG->dst->table = $3; }
| AGGREGATE ON {
if (AGGREGATOR_CFG->aggr_on)
cf_error("Only one aggregate on clause allowed");
cf_enter_filters();
cf_push_block_scope(new_config);
} aggr_list {
int count = new_config->current_scope->slots;
cf_pop_block_scope(new_config);
cf_exit_filters();
if (!AGGREGATOR_CFG->aggr_on_net)
cf_error("aggregate on must be always include 'net'.");
AGGREGATOR_CFG->aggr_on_count = count;
AGGREGATOR_CFG->aggr_on = f_linearize($4.begin, count);
struct f_line *premerge = f_linearize($4.end, 0);
premerge->args = count;
AGGREGATOR_CFG->premerge = premerge;
}
| MERGE BY {
cf_enter_filters();
cf_push_block_scope(new_config);
f_predefined_variable(new_config, "routes", T_ROUTES_BLOCK);
} function_body {
cf_pop_block_scope(new_config);
cf_exit_filters();
$4->args++;
AGGREGATOR_CFG->merge_by = $4;
}
;
aggregator_proto_opts: /* empty */ | aggregator_proto_opts aggregator_proto_item ';' ;
aggregator_proto: aggregator_proto_start proto_name '{' aggregator_proto_opts '}' ;
aggr_list:
aggr_item
| aggr_list ',' aggr_item {
if ($$.begin = $3.begin)
$$.begin->next = $1.begin;
else
$$.begin = $1.begin;
if ($$.end = $3.end)
$$.end->next = $1.end;
else
$$.end = $1.end;
}
;
aggr_item:
'(' term ')' {
switch ($2->type) {
case T_INT:
case T_BOOL:
case T_PAIR:
case T_QUAD:
case T_ENUM:
case T_IP:
case T_EC:
case T_LC:
case T_RD:
/* Fits, OK */
break;
default:
cf_error("Expression evaluated to type %s unsupported by aggregator. Store this value as a custom attribute instead", f_type_name($2->type));
}
$$.begin = $2;
$$.end = NULL;
f_new_var(new_config->current_scope);
}
| lvalue {
$$.begin = f_lval_getter(&$1);
int vari = f_new_var(new_config->current_scope);
if ($1.type == F_LVAL_SA && $1.sa.sa_code == SA_NET)
AGGREGATOR_CFG->aggr_on_net = 1;
if (($1.type == F_LVAL_CONSTANT) ||
($1.type == F_LVAL_SA && $1.sa.readonly))
$$.end = NULL;
else
{
char varname[12];
bsnprintf(varname, 12, "!aggr%d", vari);
$$.end = f_lval_setter(&$1,
f_new_inst(FI_VAR_GET, cf_define_symbol(
new_config, cf_get_symbol(new_config, varname),
SYM_VARIABLE | $$.begin->type, offset, vari
)));
}
}
;
CF_CODE
CF_END

116
proto/aggregator/test.conf Normal file
View File

@ -0,0 +1,116 @@
log "bird.log" all;
protocol device {}
protocol static {
ipv6;
route 2001:db8:0::/48 unreachable { bgp_path.prepend(65432); bgp_path.prepend(4200000000); };
route 2001:db8:1::/48 unreachable;
route 2001:db8:2::/48 unreachable;
route 2001:db8:3::/48 unreachable;
route 2001:db8:4::/48 unreachable;
route 2001:db8:5::/48 unreachable;
route 2001:db8:6::/48 unreachable;
route 2001:db8:7::/48 unreachable;
route 2001:db8:8::/48 unreachable;
route 2001:db8:9::/48 unreachable;
route 2001:db8:a::/48 unreachable;
route 2001:db8:b::/48 unreachable;
route 2001:db8:c::/48 unreachable;
route 2001:db8:d::/48 unreachable;
route 2001:db8:e::/48 unreachable;
route 2001:db8:f::/48 unreachable;
}
protocol static {
ipv6 {
import filter {
bgp_med = 1;
bgp_community = -empty-.add((65533,1)).add((65500,0xe));
accept;
};
};
route 2001:db8:1::/48 unreachable;
route 2001:db8:3::/48 unreachable;
route 2001:db8:5::/48 unreachable;
route 2001:db8:7::/48 unreachable;
route 2001:db8:9::/48 unreachable;
route 2001:db8:b::/48 unreachable;
route 2001:db8:d::/48 unreachable;
route 2001:db8:f::/48 unreachable;
}
protocol static {
ipv6 {
import filter {
bgp_med = 2;
bgp_community = -empty-.add((65533,2)).add((65500,0xd));
accept;
};
};
route 2001:db8:2::/48 unreachable;
route 2001:db8:3::/48 unreachable;
route 2001:db8:6::/48 unreachable;
route 2001:db8:7::/48 unreachable;
route 2001:db8:a::/48 unreachable;
route 2001:db8:b::/48 unreachable;
route 2001:db8:e::/48 unreachable;
route 2001:db8:f::/48 unreachable;
}
protocol static {
ipv6 {
import filter {
bgp_med = 4;
bgp_community = -empty-.add((65533,4)).add((65500,0xb));
accept;
};
};
route 2001:db8:4::/48 unreachable;
route 2001:db8:5::/48 unreachable;
route 2001:db8:6::/48 unreachable;
route 2001:db8:7::/48 unreachable;
route 2001:db8:c::/48 unreachable;
route 2001:db8:d::/48 unreachable;
route 2001:db8:e::/48 unreachable;
route 2001:db8:f::/48 unreachable;
}
protocol static {
ipv6 {
import filter {
bgp_med = 8;
bgp_community = -empty-.add((65533,8)).add((65500,0x7));
accept;
};
};
route 2001:db8:8::/48 unreachable;
route 2001:db8:9::/48 unreachable;
route 2001:db8:a::/48 unreachable;
route 2001:db8:b::/48 unreachable;
route 2001:db8:c::/48 unreachable;
route 2001:db8:d::/48 unreachable;
route 2001:db8:e::/48 unreachable;
route 2001:db8:f::/48 unreachable;
}
ipv6 table agr_result;
protocol aggregator {
table master6;
peer table agr_result;
export all;
aggregate on net,(defined(bgp_med)), (1 + 3 + 5 + 7), preference, dest;
merge by {
print "Merging all these: ", routes;
bgp_med = 0;
for route r in routes do {
if ! defined(r.bgp_med) then { unset(bgp_med); accept; }
print r, " bgp_med: ", r.bgp_med;
bgp_med = bgp_med + r.bgp_med;
bgp_community = bgp_community.add(r.bgp_community);
}
accept;
};
}