diff --git a/proto/aggregator/aggregator.c b/proto/aggregator/aggregator.c index 66c60166..0e5c7283 100644 --- a/proto/aggregator/aggregator.c +++ b/proto/aggregator/aggregator.c @@ -46,6 +46,7 @@ #include "proto/aggregator/aggregator.h" #include +#include /* #include "nest/route.h" #include "nest/iface.h" @@ -64,6 +65,430 @@ extern linpool *rte_update_pool; +static inline int +is_leaf(const struct trie_node *node) +{ + assert(node != NULL); + return !node->child[0] && !node->child[1]; +} + +static struct trie_node * +new_node(slab *trie_slab) +{ + struct trie_node *new = sl_alloc(trie_slab); + assert(new != NULL); + + *new = (struct trie_node) { + .parent = NULL, + .child = { NULL, NULL }, + .potential_buckets_count = 0, + }; + + return new; +} + +/* + * Mark appropriate child of parent node as NULL and free @node + */ +static void +remove_node(struct trie_node *node) +{ + assert(node != NULL); + assert(node->parent != NULL); + assert(node->child[0] == NULL && node->child[1] == NULL); + + if (node->parent->child[0] == node) + node->parent->child[0] = NULL; + else if (node->parent->child[1] == node) + node->parent->child[1] = NULL; + else + bug("Invalid child pointer"); + + sl_free(node); +} + +static void +trie_init(struct aggregator_proto *p) +{ + static int inits = 0; + p->trie_slab = sl_new(p->p.pool, sizeof(struct trie_node)); + p->root = new_node(p->trie_slab); + inits++; + log("Trie inits: %d", inits); +} + +/* + * Insert prefix in @addr to prefix trie with root at @node + */ +static void +trie_insert_prefix(const union net_addr_union *addr, struct aggregator_bucket *bucket, struct trie_node *node, slab *trie_slab) +{ + assert(addr != NULL); + assert(node != NULL); + + if (addr->n.type != NET_IP4) + return; + + const struct net_addr_ip4 * const ip4 = &addr->ip4; + + for (u32 i = 0; i < ip4->pxlen; i++) + { + u32 bit = (ip4->prefix.addr >> (31 - i)) & 1; + + if (!node->child[bit]) + { + struct trie_node *new = new_node(trie_slab); + new->parent = node; + node->child[bit] = new; + } + + node = node->child[bit]; + + if ((int)i == ip4->pxlen - 1) + node->bucket = bucket; + } +} + +static void +first_pass(struct trie_node *node, slab *trie_slab) +{ + assert(node != NULL); + + if (is_leaf(node)) + return; + + for (int i = 0; i < 2; i++) + { + if (!node->child[i]) + { + node->child[i] = new_node(trie_slab); + + *node->child[i] = (struct trie_node) { + .parent = node, + .child[0] = NULL, + .child[1] = NULL, + .bucket = node->parent ? node->parent->bucket : NULL, + .potential_buckets_count = 0, + }; + } + } + + first_pass(node->child[0], trie_slab); + first_pass(node->child[1], trie_slab); +} + +static int +aggregator_bucket_compare(const void *a, const void *b) +{ + if (a == NULL && b == NULL) + return 0; + + if (a == NULL) + return -1; + + if (b == NULL) + return 1; + + assert(a != NULL); + assert(b != NULL); + + const struct aggregator_bucket *fst = *(struct aggregator_bucket **)a; + const struct aggregator_bucket *snd = *(struct aggregator_bucket **)b; + + if (fst < snd) + return -1; + + if (fst > snd) + return 1; + + return 0; +} + +static void +aggregator_bucket_intersection(struct trie_node *node, const struct trie_node *left, const struct trie_node *right) +{ + assert(node != NULL); + assert(left != NULL); + assert(right != NULL); + + int i = 0; + int j = 0; + + while (i < left->potential_buckets_count && j < right->potential_buckets_count) + { + if (node->potential_buckets_count >= MAX_POTENTIAL_NEXTHOP_COUNT) + return; + + int res = aggregator_bucket_compare(left->potential_buckets[i], right->potential_buckets[j]); + + if (res == 0) + { + node->potential_buckets[node->potential_buckets_count++] = left->potential_buckets[i]; + i++; + j++; + } + else if (res == -1) + node->potential_buckets[node->potential_buckets_count++] = left->potential_buckets[i++]; + else if (res == 1) + node->potential_buckets[node->potential_buckets_count++] = right->potential_buckets[j++]; + } +} + +static void +aggregator_bucket_union(struct trie_node *node, const struct trie_node *left, const struct trie_node *right) +{ + assert(node != NULL); + assert(left != NULL); + assert(right != NULL); + + int i = 0; + int j = 0; + + while (i < left->potential_buckets_count && j < right->potential_buckets_count) + { + if (node->potential_buckets_count >= MAX_POTENTIAL_NEXTHOP_COUNT) + return; + + int res = aggregator_bucket_compare(left->potential_buckets[i], right->potential_buckets[j]); + + if (res == 0) + { + node->potential_buckets[node->potential_buckets_count++] = left->potential_buckets[i]; + i++; + j++; + } + else if (res == -1) + node->potential_buckets[node->potential_buckets_count++] = left->potential_buckets[i++]; + else if (res == 1) + node->potential_buckets[node->potential_buckets_count++] = right->potential_buckets[j++]; + } + + while (i < left->potential_buckets_count) + { + if (node->potential_buckets_count >= MAX_POTENTIAL_NEXTHOP_COUNT) + return; + + node->potential_buckets[node->potential_buckets_count++] = left->potential_buckets[i++]; + } + + while (j < right->potential_buckets_count) + { + if (node->potential_buckets_count >= MAX_POTENTIAL_NEXTHOP_COUNT) + return; + + node->potential_buckets[node->potential_buckets_count++] = right->potential_buckets[j++]; + } +} + +/* + * Check if sets of potential buckets of two nodes are disjoint + */ +static int +bucket_sets_are_disjoint(const struct trie_node *left, const struct trie_node *right) +{ + assert(left != NULL); + assert(right != NULL); + + if (left->potential_buckets_count == 0 || right->potential_buckets_count == 0) + return 1; + + int i = 0; + int j = 0; + + while (i < left->potential_buckets_count && j < right->potential_buckets_count) + { + int res = aggregator_bucket_compare(left->potential_buckets[i], right->potential_buckets[j]); + + if (res == 0) + return 0; + else if (res == -1) + i++; + else if (res == 1) + j++; + } + + return 1; +} + +static void +second_pass(struct trie_node *node) +{ + assert(node != NULL); + + /* Potential nexthop is assigned to nexthop assigned during first pass */ + if (is_leaf(node)) + { + node->potential_buckets[node->potential_buckets_count++] = node->bucket; + return; + } + + struct trie_node * const left = node->child[0]; + struct trie_node * const right = node->child[1]; + + assert(left != NULL); + assert(right != NULL); + //assert(left->potential_buckets_count > 0); + //assert(right->potential_buckets_count > 0); + + second_pass(left); + second_pass(right); + + qsort(left->potential_buckets, left->potential_buckets_count, sizeof(struct aggregator_bucket *), aggregator_bucket_compare); + qsort(right->potential_buckets, right->potential_buckets_count, sizeof(struct aggregator_bucket *), aggregator_bucket_compare); + + if (bucket_sets_are_disjoint(left, right)) + aggregator_bucket_union(node, left, right); + else + aggregator_bucket_intersection(node, left, right); +} + +/* + * Check if @bucket is one of potential nexthop buckets in @node + */ +static int +bucket_is_present(const struct aggregator_bucket *bucket, const struct trie_node *node) +{ + for (int i = 0; i < node->potential_buckets_count; i++) + if (node->potential_buckets[i] == bucket) + return 1; + + return 0; +} + +static void +third_pass_helper(struct trie_node *node) +{ + if (!node) + return; + + assert(node->parent != NULL); + + if (node->parent->bucket == NULL || bucket_is_present(node->parent->bucket, node)) + node->bucket = NULL; + else + { + assert(node->potential_buckets_count > 0); + node->bucket = node->potential_buckets[0]; + } + + third_pass_helper(node->child[0]); + third_pass_helper(node->child[1]); + + /* Leaf node with unassigned nexthop is deleted */ + if (is_leaf(node) && node->bucket == NULL) + remove_node(node); +} + +static void +third_pass(struct trie_node *node) +{ + assert(node != NULL); + + if (!node) + return; + + /* Node is a root */ + if (!node->parent) + { + assert(node->child[0] != NULL); + assert(node->child[1] != NULL); + assert(node->potential_buckets_count > 0); + + if (node->potential_buckets_count > 0) + { + node->bucket = node->potential_buckets[0]; + third_pass_helper(node->child[0]); + third_pass_helper(node->child[1]); + } + } +} + +static void +get_trie_prefix_count_helper(const struct trie_node *node, int *count) +{ + if (is_leaf(node)) + { + *count += 1; + return; + } + + if (node->child[0]) + get_trie_prefix_count_helper(node->child[0], count); + + if (node->child[1]) + get_trie_prefix_count_helper(node->child[1], count); +} + +static int +get_trie_prefix_count(const struct trie_node *node) +{ + int count = 0; + get_trie_prefix_count_helper(node, &count); + + return count; +} + +static void +get_trie_depth_helper(const struct trie_node *node, int *result, int depth) +{ + if (is_leaf(node)) + { + if (depth > *result) + *result = depth; + + return; + } + + if (node->child[0]) + get_trie_depth_helper(node->child[0], result, depth + 1); + + if (node->child[1]) + get_trie_depth_helper(node->child[1], result, depth + 1); +} + +static int +get_trie_depth(const struct trie_node *node) +{ + int result = 0; + get_trie_depth_helper(node, &result, 0); + + return result; +} + +static void +extract_prefixes_helper(const struct trie_node *node, struct aggregated_prefixes * const prefixes, ip4_addr prefix, int depth) +{ + assert(node != NULL); + assert(prefixes != NULL); + + log("extracting: %I4", _I(prefix)); + + if (is_leaf(node)) + { + // assert(node->bucket != NULL); + assert(prefixes->count < prefixes->capacity); + + prefixes->prefix_buckets[prefixes->count++] = (struct prefix_bucket) { + .trie_prefix = NET_ADDR_IP4(_I(prefix), depth), + .bucket = node->bucket ? node->bucket : NULL, + }; + + return; + } + + if (node->child[0]) + extract_prefixes_helper(node->child[0], prefixes, _MI4(_I(prefix) | (0 << (31 - depth))), depth + 1); + + if (node->child[1]) + extract_prefixes_helper(node->child[1], prefixes, _MI4(_I(prefix) | (1 << (31 - depth))), depth + 1); +} + +static void +extract_prefixes(const struct trie_node *node, struct aggregated_prefixes *prefixes) +{ + extract_prefixes_helper(node, prefixes, _MI4(0), 0); +} + /* * Set static attribute in @rta from static attribute in @old according to @sa. */ @@ -594,6 +1019,59 @@ aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new sl_free(old_route); } + trie_init(p); + + HASH_WALK(p->buckets, next_hash, bucket) + { + for (const struct rte *rte = bucket->rte; rte; rte = rte->next) + { + union net_addr_union *uptr = (net_addr_union *)rte->net->n.addr; + trie_insert_prefix(uptr, bucket, p->root, p->trie_slab); + } + } + HASH_WALK_END; + + assert(p->root != NULL); + assert(p->trie_slab != NULL); + + log("protocol: %p, root: %p, slab: %p", p, p->root, p->trie_slab); + log("Number of prefixes before aggregation: %d", get_trie_prefix_count(p->root)); + log("Trie depth before aggregation: %d", get_trie_depth(p->root)); + + first_pass(p->root, p->trie_slab); + log("Trie depth after first pass: %d", get_trie_depth(p->root)); + second_pass(p->root); + log("Trie depth after second pass: %d", get_trie_depth(p->root)); + third_pass(p->root); + log("Trie depth after third pass: %d", get_trie_depth(p->root)); + + if (is_leaf(p->root)) + log("WARNING: root is leaf!"); + + const int prefix_count = get_trie_prefix_count(p->root); + + struct aggregated_prefixes *prefixes = allocz(sizeof(struct aggregated_prefixes) + sizeof(struct prefix_bucket) * prefix_count); + prefixes->capacity = prefix_count; + prefixes->count = 0; + + log("Number of prefixes after aggregation: %d", prefix_count); + extract_prefixes(p->root, prefixes); + log("Aggregated prefixes count: %d", prefixes->count); + log("Trie depth: %d", get_trie_depth(p->root)); + + assert(prefixes->count == prefix_count); + + struct buffer buf; + LOG_BUFFER_INIT(buf); + + for (int i = 0; i < prefixes->count; i++) + { + int res = buffer_print(&buf, "%I4", prefixes->prefix_buckets[i].trie_prefix.prefix); + assert(res != -1); + } + + log("%s", buf.start); + /* Announce changes */ if (old_bucket) aggregator_bucket_update(p, old_bucket, net); diff --git a/proto/aggregator/aggregator.h b/proto/aggregator/aggregator.h index 19459b1d..63540199 100644 --- a/proto/aggregator/aggregator.h +++ b/proto/aggregator/aggregator.h @@ -17,6 +17,8 @@ #include "nest/protocol.h" #include "lib/hash.h" +#define MAX_POTENTIAL_NEXTHOP_COUNT 16 + struct aggregator_config { struct proto_config c; struct channel_config *src, *dst; @@ -61,6 +63,10 @@ struct aggregator_proto { /* Merge filter */ const struct f_line *merge_by; event reload_buckets; + + /* Aggregation trie */ + slab *trie_slab; + struct trie_node *root; }; enum aggr_item_type { @@ -83,4 +89,23 @@ struct aggr_item_node { struct aggr_item i; }; +struct trie_node { + struct trie_node *parent; + struct trie_node *child[2]; + struct aggregator_bucket *bucket; + struct aggregator_bucket *potential_buckets[MAX_POTENTIAL_NEXTHOP_COUNT]; + int potential_buckets_count; +}; + +struct prefix_bucket { + net_addr_ip4 trie_prefix; + struct aggregator_bucket *bucket; +}; + +struct aggregated_prefixes { + int count; + int capacity; + struct prefix_bucket prefix_buckets[]; +}; + #endif