From 0ebaf2ec187241b545269c8d81e5cbd772cd6802 Mon Sep 17 00:00:00 2001 From: Igor Putovny Date: Tue, 11 Jun 2024 13:47:07 +0200 Subject: [PATCH] Run aggregation on feed end from src channel and request feeding after receiving update --- proto/aggregator/aggregator.c | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/proto/aggregator/aggregator.c b/proto/aggregator/aggregator.c index 8cba48b9..a46c7827 100644 --- a/proto/aggregator/aggregator.c +++ b/proto/aggregator/aggregator.c @@ -921,6 +921,22 @@ aggregate_on_settle_timer(struct settle *s) log("Aggregation is already finished"); } +static void +aggregate_on_feed_end(struct channel *C) +{ + struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, C->proto); + + if (C == p->src) + { + run_aggregation(p); + flush_trie(p); + p->root = NULL; + + if (p->first_run) + p->first_run = 0; + } +} + /* * Set static attribute in @rta from static attribute in @old according to @sa. */ @@ -1281,6 +1297,8 @@ HASH_DEFINE_REHASH_FN(AGGR_BUCK, struct aggregator_bucket); #define AGGR_DATA_MEMSIZE (sizeof(struct f_val) * p->aggr_on_count) +static void trie_init(struct aggregator_proto *p); + static void aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new, rte *old) { @@ -1296,6 +1314,8 @@ aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new if (!p->root) trie_init(p); + channel_request_feeding(p->src); + /* Find the objects for the old route */ if (old) old_route = HASH_FIND(p->routes, AGGR_RTE, old); @@ -1539,6 +1559,7 @@ aggregator_init(struct proto_config *CF) P->rt_notify = aggregator_rt_notify; P->preexport = aggregator_preexport; + P->feed_end = aggregate_on_feed_end; return P; } @@ -1628,6 +1649,7 @@ aggregator_start(struct proto *P) }; p->trie_pool = lp_new(P->pool); + p->first_run = 1; p->aggr_done = 0; settle_init(&p->aggr_timer, &p->aggr_timer_cf, aggregate_on_settle_timer, p);