0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2025-01-03 07:31:54 +00:00

Aggregator: Expressed most of the attribute logic in filter language

This commit is contained in:
Maria Matejka 2023-10-31 11:37:54 +01:00
parent f42c118aa7
commit c103b51fca
4 changed files with 101 additions and 453 deletions

View File

@ -46,85 +46,6 @@
#include "aggregator.h"
#include <stdlib.h>
/*
#include "nest/route.h"
#include "nest/iface.h"
#include "lib/resource.h"
#include "lib/event.h"
#include "lib/timer.h"
#include "lib/string.h"
#include "conf/conf.h"
#include "filter/filter.h"
#include "filter/data.h"
#include "lib/hash.h"
#include "lib/string.h"
#include "lib/alloca.h"
#include "lib/flowspec.h"
*/
/*
* Set static attribute in @rta from static attribute in @old according to @sa.
*/
static void
rta_set_static_attr(struct rta *rta, const struct rta *old, struct f_static_attr sa)
{
switch (sa.sa_code)
{
case SA_NET:
break;
case SA_FROM:
rta->from = old->from;
break;
case SA_GW:
rta->dest = RTD_UNICAST;
rta->nh.gw = old->nh.gw;
rta->nh.iface = old->nh.iface;
rta->nh.next = NULL;
rta->hostentry = NULL;
rta->nh.labels = 0;
break;
case SA_SCOPE:
rta->scope = old->scope;
break;
case SA_DEST:
rta->dest = old->dest;
rta->nh.gw = IPA_NONE;
rta->nh.iface = NULL;
rta->nh.next = NULL;
rta->hostentry = NULL;
rta->nh.labels = 0;
break;
case SA_IFNAME:
rta->dest = RTD_UNICAST;
rta->nh.gw = IPA_NONE;
rta->nh.iface = old->nh.iface;
rta->nh.next = NULL;
rta->hostentry = NULL;
rta->nh.labels = 0;
break;
case SA_GW_MPLS:
rta->nh.labels = old->nh.labels;
memcpy(&rta->nh.label, &old->nh.label, sizeof(u32) * old->nh.labels);
break;
case SA_WEIGHT:
rta->nh.weight = old->nh.weight;
break;
case SA_PREF:
rta->pref = old->pref;
break;
default:
bug("Invalid static attribute access (%u/%u)", sa.f_type, sa.sa_code);
}
}
/*
* Compare list of &f_val entries.
@ -158,35 +79,23 @@ aggregator_bucket_update(struct aggregator_proto *p, struct aggregator_bucket *b
return;
}
/* Allocate RTA and EA list */
/* Store TMP linpool state */
struct lp_state tmp_state;
lp_save(tmp_linpool, &tmp_state);
/* Allocate RTA */
struct rta *rta = allocz(rta_size(bucket->rte->attrs));
rta->dest = RTD_UNREACHABLE;
rta->source = RTS_AGGREGATED;
rta->scope = SCOPE_UNIVERSE;
struct ea_list *eal = allocz(sizeof(struct ea_list) + sizeof(struct eattr) * p->aggr_on_da_count);
eal->next = NULL;
eal->count = 0;
rta->eattrs = eal;
/* Seed the attributes from aggregator rule */
for (uint i = 0; i < p->aggr_on_count; i++)
{
if (p->aggr_on[i].type == AGGR_ITEM_DYNAMIC_ATTR)
{
u32 ea_code = p->aggr_on[i].da.ea_code;
const struct eattr *e = ea_find(bucket->rte->attrs->eattrs, ea_code);
if (e)
eal->attrs[eal->count++] = *e;
}
else if (p->aggr_on[i].type == AGGR_ITEM_STATIC_ATTR)
rta_set_static_attr(rta, bucket->rte->attrs, p->aggr_on[i].sa);
}
/* Allocate route */
struct rte *new = rte_get_temp(rta, bucket->rte->src);
new->net = net;
/* Seed the attributes from aggregator rule */
f_eval_rte(p->premerge, &new, tmp_linpool, p->aggr_on_count, bucket->aggr_data, 0, NULL);
/*
log("=============== CREATE MERGED ROUTE ===============");
log("New route created: id = %d, protocol: %s", new->src->global_id, new->src->proto->name);
@ -199,11 +108,8 @@ aggregator_bucket_update(struct aggregator_proto *p, struct aggregator_bucket *b
.val.rte = bucket->rte,
};
struct lp_state tmp_state;
lp_save(tmp_linpool, &tmp_state);
/* Actually run the filter */
enum filter_return fret = f_eval_rte(p->merge_by, &new, tmp_linpool, 1, &val, 0);
/* Actually run the merge rule */
enum filter_return fret = f_eval_rte(p->merge_by, &new, tmp_linpool, 1, &val, 0, NULL);
/* Src must be stored now, rte_update2() may return new */
struct rte_src *new_src = new ? new->src : NULL;
@ -257,136 +163,6 @@ aggregator_reload_buckets(void *data)
HASH_WALK_END;
}
/*
* Evaluate static attribute of @rt1 according to @sa
* and store result in @pos.
*/
static void
eval_static_attr(const struct rte *rt1, struct f_static_attr sa, struct f_val *pos)
{
const struct rta *rta = rt1->attrs;
#define RESULT(_type, value, result) \
do { \
pos->type = _type; \
pos->val.value = result; \
} while (0)
switch (sa.sa_code)
{
case SA_NET: RESULT(sa.f_type, net, rt1->net->n.addr); break;
case SA_FROM: RESULT(sa.f_type, ip, rta->from); break;
case SA_GW: RESULT(sa.f_type, ip, rta->nh.gw); break;
case SA_PROTO: RESULT(sa.f_type, s, rt1->src->proto->name); break;
case SA_SOURCE: RESULT(sa.f_type, i, rta->source); break;
case SA_SCOPE: RESULT(sa.f_type, i, rta->scope); break;
case SA_DEST: RESULT(sa.f_type, i, rta->dest); break;
case SA_IFNAME: RESULT(sa.f_type, s, rta->nh.iface ? rta->nh.iface->name : ""); break;
case SA_IFINDEX: RESULT(sa.f_type, i, rta->nh.iface ? rta->nh.iface->index : 0); break;
case SA_WEIGHT: RESULT(sa.f_type, i, rta->nh.weight + 1); break;
case SA_PREF: RESULT(sa.f_type, i, rta->pref); break;
case SA_GW_MPLS: RESULT(sa.f_type, i, rta->nh.labels ? rta->nh.label[0] : MPLS_NULL); break;
default:
bug("Invalid static attribute access (%u/%u)", sa.f_type, sa.sa_code);
}
#undef RESULT
}
/*
* Evaluate dynamic attribute of @rt1 according to @da
* and store result in @pos.
*/
static void
eval_dynamic_attr(const struct rte *rt1, struct f_dynamic_attr da, struct f_val *pos)
{
const struct rta *rta = rt1->attrs;
const struct eattr *e = ea_find(rta->eattrs, da.ea_code);
#define RESULT(_type, value, result) \
do { \
pos->type = _type; \
pos->val.value = result; \
} while (0)
#define RESULT_VOID \
do { \
pos->type = T_VOID; \
} while (0)
if (!e)
{
/* A special case: undefined as_path looks like empty as_path */
if (da.type == EAF_TYPE_AS_PATH)
{
RESULT(T_PATH, ad, &null_adata);
return;
}
/* The same special case for int_set */
if (da.type == EAF_TYPE_INT_SET)
{
RESULT(T_CLIST, ad, &null_adata);
return;
}
/* The same special case for ec_set */
if (da.type == EAF_TYPE_EC_SET)
{
RESULT(T_ECLIST, ad, &null_adata);
return;
}
/* The same special case for lc_set */
if (da.type == EAF_TYPE_LC_SET)
{
RESULT(T_LCLIST, ad, &null_adata);
return;
}
/* Undefined value */
RESULT_VOID;
return;
}
switch (e->type & EAF_TYPE_MASK)
{
case EAF_TYPE_INT:
RESULT(da.f_type, i, e->u.data);
break;
case EAF_TYPE_ROUTER_ID:
RESULT(T_QUAD, i, e->u.data);
break;
case EAF_TYPE_OPAQUE:
RESULT(T_ENUM_EMPTY, i, 0);
break;
case EAF_TYPE_IP_ADDRESS:
RESULT(T_IP, ip, *((ip_addr *) e->u.ptr->data));
break;
case EAF_TYPE_AS_PATH:
RESULT(T_PATH, ad, e->u.ptr);
break;
case EAF_TYPE_BITFIELD:
RESULT(T_BOOL, i, !!(e->u.data & (1u << da.bit)));
break;
case EAF_TYPE_INT_SET:
RESULT(T_CLIST, ad, e->u.ptr);
break;
case EAF_TYPE_EC_SET:
RESULT(T_ECLIST, ad, e->u.ptr);
break;
case EAF_TYPE_LC_SET:
RESULT(T_LCLIST, ad, e->u.ptr);
break;
default:
bug("Unknown dynamic attribute type");
}
#undef RESULT
#undef RESULT_VOID
}
static inline u32 aggr_route_hash(const rte *e)
{
struct {
@ -453,17 +229,8 @@ aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new
struct lp_state tmp_state;
lp_save(tmp_linpool, &tmp_state);
for (uint val_idx = 0; val_idx < p->aggr_on_count; val_idx++)
{
int type = p->aggr_on[val_idx].type;
struct f_val *pos = &tmp_bucket->aggr_data[val_idx];
switch (type)
{
case AGGR_ITEM_TERM: {
const struct f_line *line = p->aggr_on[val_idx].line;
struct rte *rt1 = new;
enum filter_return fret = f_eval_rte(line, &new, tmp_linpool, 0, NULL, pos);
enum filter_return fret = f_eval_rte(p->aggr_on, &new, tmp_linpool, 0, NULL, p->aggr_on_count, tmp_bucket->aggr_data);
if (rt1 != new)
{
@ -471,107 +238,20 @@ aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new
log(L_WARN "Aggregator rule modifies the route, reverting");
}
/* Check filter return value */
if (fret > F_RETURN)
log(L_WARN "%s.%s: Wrong number of items left on stack after evaluation of aggregation list", rt1->src->proto->name, rt1->sender->name);
{
sl_free(tmp_bucket);
lp_restore(tmp_linpool, &tmp_state);
switch (pos->type) {
case T_VOID:
case T_INT:
case T_BOOL:
case T_PAIR:
case T_QUAD:
case T_ENUM:
case T_IP:
case T_EC:
case T_LC:
case T_RD:
/* Fits, OK */
break;
default:
log(L_WARN "%s.%s: Expression evaluated to type %s unsupported by aggregator. Store this value as a custom attribute instead", new->src->proto->name, new->sender->name, f_type_name(pos->type));
*pos = (struct f_val) { .type = T_INT, .val.i = 0 };
}
break;
}
case AGGR_ITEM_STATIC_ATTR: {
eval_static_attr(new, p->aggr_on[val_idx].sa, pos);
break;
}
case AGGR_ITEM_DYNAMIC_ATTR: {
eval_dynamic_attr(new, p->aggr_on[val_idx].da, pos);
break;
}
default:
break;
}
return;
}
/* Compute the hash */
u64 haux;
mem_hash_init(&haux);
for (uint i = 0; i < p->aggr_on_count; i++)
{
mem_hash_mix_num(&haux, tmp_bucket->aggr_data[i].type);
#define MX(k) mem_hash_mix(&haux, &IT(k), sizeof IT(k));
#define IT(k) tmp_bucket->aggr_data[i].val.k
switch (tmp_bucket->aggr_data[i].type)
{
case T_VOID:
break;
case T_INT:
case T_BOOL:
case T_PAIR:
case T_QUAD:
case T_ENUM:
MX(i);
break;
case T_EC:
case T_RD:
MX(ec);
break;
case T_LC:
MX(lc);
break;
case T_IP:
MX(ip);
break;
case T_NET:
mem_hash_mix_num(&haux, net_hash(IT(net)));
break;
case T_STRING:
mem_hash_mix_str(&haux, IT(s));
break;
case T_PATH_MASK:
mem_hash_mix(&haux, IT(path_mask), sizeof(*IT(path_mask)) + IT(path_mask)->len * sizeof (IT(path_mask)->item));
break;
case T_PATH:
case T_CLIST:
case T_ECLIST:
case T_LCLIST:
case T_BYTESTRING:
mem_hash_mix(&haux, IT(ad)->data, IT(ad)->length);
break;
case T_NONE:
case T_PATH_MASK_ITEM:
case T_ROUTE:
case T_ROUTES_BLOCK:
bug("Invalid type %s in hashing", f_type_name(tmp_bucket->aggr_data[i].type));
case T_SET:
MX(t);
break;
case T_PREFIX_SET:
MX(ti);
break;
}
}
mem_hash_mix_f_val(&haux, &tmp_bucket->aggr_data[i]);
tmp_bucket->hash = mem_hash_value(&haux);
/* Find the existing bucket */
@ -686,8 +366,8 @@ aggregator_init(struct proto_config *CF)
proto_configure_channel(P, &p->dst, cf->dst);
p->aggr_on_count = cf->aggr_on_count;
p->aggr_on_da_count = cf->aggr_on_da_count;
p->aggr_on = cf->aggr_on;
p->premerge = cf->premerge;
p->merge_by = cf->merge_by;
P->rt_notify = aggregator_rt_notify;
@ -753,34 +433,16 @@ aggregator_reconfigure(struct proto *P, struct proto_config *CF)
if (cf->aggr_on_count != p->aggr_on_count)
return 0;
if (cf->aggr_on_da_count != p->aggr_on_da_count)
return 0;
/* Compare aggregator rule */
for (uint i = 0; i < p->aggr_on_count; i++)
switch (cf->aggr_on[i].type)
{
case AGGR_ITEM_TERM:
if (!f_same(cf->aggr_on[i].line, p->aggr_on[i].line))
if (!f_same(cf->aggr_on, p->aggr_on) || !f_same(cf->premerge, p->premerge))
return 0;
break;
case AGGR_ITEM_STATIC_ATTR:
if (memcmp(&cf->aggr_on[i].sa, &p->aggr_on[i].sa, sizeof(struct f_static_attr)) != 0)
return 0;
break;
case AGGR_ITEM_DYNAMIC_ATTR:
if (memcmp(&cf->aggr_on[i].da, &p->aggr_on[i].da, sizeof(struct f_dynamic_attr)) != 0)
return 0;
break;
default:
bug("Broken aggregator rule");
}
/* Compare merge filter */
if (!f_same(cf->merge_by, p->merge_by))
ev_schedule(&p->reload_buckets);
p->aggr_on = cf->aggr_on;
p->premerge = cf->premerge;
p->merge_by = cf->merge_by;
return 1;

View File

@ -20,10 +20,11 @@
struct aggregator_config {
struct proto_config c;
struct channel_config *src, *dst;
uint aggr_on_count;
uint aggr_on_da_count;
struct aggr_item *aggr_on;
const struct f_line *aggr_on;
const struct f_line *premerge;
const struct f_line *merge_by;
uint aggr_on_count;
u8 aggr_on_net;
};
struct aggregator_route {
@ -54,33 +55,14 @@ struct aggregator_proto {
slab *route_slab;
/* Aggregator rule */
const struct f_line *aggr_on;
uint aggr_on_count;
uint aggr_on_da_count;
struct aggr_item *aggr_on;
u8 aggr_on_net;
/* Merge filter */
const struct f_line *premerge;
const struct f_line *merge_by;
event reload_buckets;
};
enum aggr_item_type {
AGGR_ITEM_TERM,
AGGR_ITEM_STATIC_ATTR,
AGGR_ITEM_DYNAMIC_ATTR,
};
struct aggr_item {
enum aggr_item_type type;
union {
struct f_static_attr sa;
struct f_dynamic_attr da;
const struct f_line *line;
};
};
struct aggr_item_node {
const struct aggr_item_node *next;
struct aggr_item i;
};
#endif

View File

@ -17,12 +17,11 @@ CF_DEFINES
#define AGGREGATOR_CFG ((struct aggregator_config *) this_proto)
#define AGGR_ITEM_ALLOC ((struct aggr_item_node *) cfg_allocz(sizeof(struct aggr_item_node)))
CF_DECLS
CF_KEYWORDS(AGGREGATOR, AGGREGATE, ON, MERGE, BY)
%type <ai> aggr_item aggr_list
%type <xp> aggr_item aggr_list
CF_GRAMMAR
@ -41,42 +40,31 @@ aggregator_proto_item:
proto_item
| channel_item_
| PEER TABLE rtable { AGGREGATOR_CFG->dst->table = $3; }
| AGGREGATE ON aggr_list {
| AGGREGATE ON {
if (AGGREGATOR_CFG->aggr_on)
cf_error("Only one aggregate on clause allowed");
_Bool net_present = 0;
int count = 0;
cf_push_block_scope(new_config);
} aggr_list {
int count = new_config->current_scope->slots;
cf_pop_block_scope(new_config);
for (const struct aggr_item_node *item = $3; item; item = item->next) {
// log(L_WARN "type %d sacode %d", item->i.type, item->i.sa.sa_code);
if (item->i.type == AGGR_ITEM_STATIC_ATTR && item->i.sa.sa_code == SA_NET)
net_present = 1;
if (!AGGREGATOR_CFG->aggr_on_net)
cf_error("aggregate on must be always include 'net'.");
count++;
}
AGGREGATOR_CFG->aggr_on_count = count;
AGGREGATOR_CFG->aggr_on = f_linearize($4.begin, count);
if (!net_present)
cf_error("'NET' must be present");
AGGREGATOR_CFG->aggr_on = cfg_alloc(sizeof(struct aggr_item) * count);
int pos = 0;
for (const struct aggr_item_node *item = $3; item; item = item->next) {
if (item->i.type == AGGR_ITEM_DYNAMIC_ATTR)
AGGREGATOR_CFG->aggr_on_da_count++;
AGGREGATOR_CFG->aggr_on[pos++] = item->i;
}
AGGREGATOR_CFG->aggr_on_count = pos;
struct f_line *premerge = f_linearize($4.end, 0);
premerge->args = count;
AGGREGATOR_CFG->premerge = premerge;
}
| MERGE BY {
cf_push_block_scope(new_config);
f_predefined_variable(new_config, "routes", T_ROUTES_BLOCK);
} function_body {
cf_pop_block_scope(new_config);
$4->args++; /* The predefined "routes" variable */
$4->args++;
AGGREGATOR_CFG->merge_by = $4;
}
;
@ -88,44 +76,60 @@ aggregator_proto: aggregator_proto_start proto_name '{' aggregator_proto_opts '}
aggr_list:
aggr_item
| aggr_list ',' aggr_item {
if ($3 == NULL) {
$$ = $1;
} else {
$$ = $3;
$$->next = $1;
}
if ($$.begin = $3.begin)
$$.begin->next = $1.begin;
else
$$.begin = $1.begin;
if ($$.end = $3.end)
$$.end->next = $1.end;
else
$$.end = $1.end;
}
;
aggr_item:
'(' term ')' {
$$ = AGGR_ITEM_ALLOC;
$$->i.type = AGGR_ITEM_TERM;
$$->i.line = f_linearize($2, 1);
}
| CF_SYM_KNOWN {
switch ($1->class) {
case SYM_ATTRIBUTE:
$$ = AGGR_ITEM_ALLOC;
$$->i.type = AGGR_ITEM_DYNAMIC_ATTR;
$$->i.da = *$1->attribute;
break;
case SYM_CONSTANT_RANGE:
$$ = NULL;
switch ($2->type) {
case T_INT:
case T_BOOL:
case T_PAIR:
case T_QUAD:
case T_ENUM:
case T_IP:
case T_EC:
case T_LC:
case T_RD:
/* Fits, OK */
break;
default:
cf_error("Can't aggregate on symbol type %s.", cf_symbol_class_name($1));
cf_error("Expression evaluated to type %s unsupported by aggregator. Store this value as a custom attribute instead", f_type_name($2->type));
}
$$.begin = $2;
$$.end = NULL;
f_new_var(new_config->current_scope);
}
| dynamic_attr {
$$ = AGGR_ITEM_ALLOC;
$$->i.type = AGGR_ITEM_DYNAMIC_ATTR;
$$->i.da = $1;
| lvalue {
$$.begin = f_lval_getter(&$1);
int vari = f_new_var(new_config->current_scope);
if ($1.type == F_LVAL_SA && $1.sa.sa_code == SA_NET)
AGGREGATOR_CFG->aggr_on_net = 1;
if (($1.type == F_LVAL_CONSTANT) ||
($1.type == F_LVAL_SA && $1.sa.readonly))
$$.end = NULL;
else
{
char varname[12];
bsnprintf(varname, 12, "!aggr%d", vari);
$$.end = f_lval_setter(&$1,
f_new_inst(FI_VAR_GET, cf_define_symbol(
new_config, cf_get_symbol(new_config, varname),
SYM_VARIABLE | $$.begin->type, offset, vari
)));
}
| static_attr {
$$ = AGGR_ITEM_ALLOC;
$$->i.type = AGGR_ITEM_STATIC_ATTR;
$$->i.sa = $1;
}
;

View File

@ -100,7 +100,7 @@ protocol aggregator {
table master6;
peer table agr_result;
export all;
aggregate on net,(defined(bgp_med));
aggregate on net,(defined(bgp_med)), (1 + 3 + 5 + 7), preference, dest;
merge by {
print "Merging all these: ", routes;
bgp_med = 0;