From b97d924e52816b26660c4ab942b32381781e97fc Mon Sep 17 00:00:00 2001 From: Katerina Kubecova Date: Tue, 15 Oct 2024 11:31:32 +0200 Subject: [PATCH] Proto: adding lockless state table and journal To allow reading of protocol states from other protocols or completely different routines, we have to export these states to data structures not requiring to lock the protocol loops. On one hand, this doesn't give the reader the actual state "right now", on the other hand, getting that is impossible in a properly multithreaded environment and you will always get the information with some (little but noteworthy) delay. This implementation handles only the basic state information of the protocols, common for all the protocols. Adding protocol-specific state information should be done by implementing the protocol hook init_state(). Channel information is stored but not announced, as we don't need the announcements for now. --- nest/proto.c | 333 ++++++++++++++++++++++++++++++++++++++++++++++++ nest/protocol.h | 57 +++++++++ 2 files changed, 390 insertions(+) diff --git a/nest/proto.c b/nest/proto.c index 2bb0766e..19593533 100644 --- a/nest/proto.c +++ b/nest/proto.c @@ -44,10 +44,13 @@ static u32 graceful_restart_locks; static char *p_states[] = { "DOWN", "START", "UP", "STOP" }; static char *c_states[] = { "DOWN", "START", "UP", "STOP", "RESTART" }; +proto_state_table proto_state_table_pub; + extern struct protocol proto_unix_iface; static void proto_rethink_goal(struct proto *p); static char *proto_state_name(struct proto *p); +void proto_journal_item_cleanup(struct lfjour * journal UNUSED, struct lfjour_item *i); static void channel_init_limit(struct channel *c, struct limit *l, int dir, struct channel_limit *cf); static void channel_update_limit(struct channel *c, struct limit *l, int dir, struct channel_limit *cf); static void channel_reset_limit(struct channel *c, struct limit *l, int dir); @@ -239,6 +242,48 @@ proto_add_channel(struct proto *p, struct channel_config *cf) init_list(&c->roa_subscriptions); + /* Announcing existence of the channel */ + PST_LOCKED(ts) + { + /* Allocating channel ID */ + c->id = hmap_first_zero(&ts->channel_id_map); + hmap_set(&ts->channel_id_map, c->id); + + /* The current channel state table may be too small */ + if (c->id >= ts->length_channels) + { + ea_list **l = mb_allocz(ts->pool, sizeof(ea_list*) * ts->length_channels * 2); + memcpy(l, ts->channels, sizeof(ea_list*) * ts->length_channels); + mb_free(ts->channels); + + ts->channels = l; + ts->length_channels = ts->length_channels * 2; + } + + /* Create the actual channel information */ + struct ea_list *ca = NULL; + + ea_set_attr(&ca, EA_LITERAL_STORE_STRING(&ea_name, 0, c->name)); + ea_set_attr(&ca, EA_LITERAL_EMBEDDED(&ea_proto_id, 0, c->proto->id)); + ea_set_attr(&ca, EA_LITERAL_EMBEDDED(&ea_channel_id, 0, c->id)); + ea_set_attr(&ca, EA_LITERAL_EMBEDDED(&ea_in_keep, 0, c->in_keep)); + ea_set_attr(&ca, EA_LITERAL_STORE_PTR(&ea_rtable, 0, c->table)); + + ASSERT_DIE(c->id < ts->length_channels); + ASSERT_DIE(ts->channels[c->id] == NULL); + ts->channels[c->id] = ea_lookup_slow(ca, 0, EALS_IN_TABLE); + + /* Update channel list in protocol state */ + ASSERT_DIE(c->proto->id < ts->length_states); + + ea_set_attr(&p->ea_state, + EA_LITERAL_DIRECT_ADATA(&ea_proto_channel_list, 0, int_set_add( + tmp_linpool, ea_get_adata(p->ea_state, &ea_proto_channel_list), c->id))); + + ea_lookup(p->ea_state, 0, EALS_CUSTOM); + proto_announce_state_locked(ts, c->proto, p->ea_state); + } + CALL(c->class->init, c, cf); add_tail(&p->channels, &c->n); @@ -255,6 +300,21 @@ proto_remove_channel(struct proto *p UNUSED, struct channel *c) CD(c, "Removed", c->name); + ea_set_attr(&p->ea_state, + EA_LITERAL_DIRECT_ADATA(&ea_proto_channel_list, 0, int_set_del( + tmp_linpool, ea_get_adata(p->ea_state, &ea_proto_channel_list), c->id))); + + ea_lookup(p->ea_state, 0, EALS_CUSTOM); + proto_announce_state(c->proto, p->ea_state); + + PST_LOCKED(ts) + { + ASSERT_DIE(c->id < ts->length_channels); + ea_free_later(ts->channels[c->id]); + ts->channels[c->id] = NULL; + hmap_clear(&ts->channel_id_map, c->id); + } + rt_unlock_table(c->table); rem_node(&c->n); mb_free(c); @@ -1275,8 +1335,39 @@ proto_new(struct proto_config *cf) p->hash_key = random_u32(); cf->proto = p; + PST_LOCKED(tp) + { + p->id = hmap_first_zero(&tp->proto_id_map); + hmap_set(&tp->proto_id_map, p->id); + + if (p->id >= tp->length_states) + { + /* Grow the states array */ + ea_list **new_states = mb_allocz(tp->pool, sizeof *new_states * tp->length_states * 2); + memcpy(new_states, tp->states, tp->length_states * sizeof *new_states); + + mb_free(tp->states); + tp->states = new_states; + tp->length_states *= 2; + } + } + init_list(&p->channels); + /* + Making first version of proto eatters. + */ + struct ea_list *state = NULL; + + ea_set_attr(&state, EA_LITERAL_STORE_STRING(&ea_name, 0, p->name)); + ea_set_attr(&state, EA_LITERAL_STORE_PTR(&ea_protocol_type, 0, &p->proto)); + ea_set_attr(&state, EA_LITERAL_EMBEDDED(&ea_state, 0, p->proto_state)); + ea_set_attr(&state, EA_LITERAL_STORE_ADATA(&ea_last_modified, 0, &p->last_state_change, sizeof(btime))); + ea_set_attr(&state, EA_LITERAL_EMBEDDED(&ea_proto_id, 0, p->id)); + ea_set_attr(&state, EA_LITERAL_STORE_ADATA(&ea_proto_channel_list, 0, NULL, 0)); + + proto_announce_state(p, state); + return p; } @@ -1718,6 +1809,8 @@ proto_rethink_goal(struct proto *p) struct proto_config *nc = p->cf_new; struct proto *after = p->n.prev; + proto_announce_state(p, NULL); + DBG("%s has shut down for reconfiguration\n", p->name); p->cf->proto = NULL; OBSREF_CLEAR(p->global_config); @@ -2011,6 +2104,46 @@ protos_build(void) { proto_pool = rp_new(&root_pool, the_bird_domain.the_bird, "Protocols"); + /* Protocol attributes */ + ea_register_init(&ea_name); + ea_register_init(&ea_protocol_name); + ea_register_init(&ea_protocol_type); + ea_register_init(&ea_state); + ea_register_init(&ea_last_modified); + ea_register_init(&ea_info); + ea_register_init(&ea_proto_id); + ea_register_init(&ea_channel_id); + ea_register_init(&ea_in_keep); + ea_register_init(&ea_proto_channel_list); + ea_register_init(&ea_rtable); + + proto_state_table_pub.lock = DOMAIN_NEW(rtable); + + /* Init proto_state_table */ + pool *p = rp_new(&root_pool, the_bird_domain.the_bird, "Proto state table"); + + PST_LOCKED(ts) + { + ts->length_channels = 64; + ts->length_states = 32; + + hmap_init(&ts->proto_id_map, p, ts->length_states); /* for proto ids. Value of proto id is the same as index of that proto in ptoto_state_table->attrs */ + hmap_init(&ts->channel_id_map, p, ts->length_channels); + + ts->pool = p; + ts->states = mb_allocz(p, sizeof(ea_list *) * ts->length_states); + ts->channels = mb_allocz(p, sizeof(ea_list *) * ts->length_channels * 2); + } + + /* Init proto state journal */ + struct settle_config cf = {.min = 0, .max = 0}; + proto_state_table_pub.journal.item_done = proto_journal_item_cleanup; + proto_state_table_pub.journal.item_size = sizeof(struct proto_pending_update); + proto_state_table_pub.journal.loop = birdloop_new(&root_pool, DOMAIN_ORDER(service), 1, "proto journal loop"); + proto_state_table_pub.journal.domain = proto_state_table_pub.lock.rtable; + + lfjour_init(&proto_state_table_pub.journal, &cf); + protos_build_gen(); } @@ -2308,6 +2441,10 @@ proto_notify_state(struct proto *p, uint state) p->proto_state = state; p->last_state_change = current_time(); + ea_set_attr(&p->ea_state, EA_LITERAL_EMBEDDED(&ea_state, 0, p->proto_state)); + ea_lookup(p->ea_state, 0, EALS_CUSTOM); + proto_announce_state(p, p->ea_state); + switch (state) { case PS_START: @@ -2779,3 +2916,199 @@ proto_iterate_named(struct symbol *sym, struct protocol *proto, struct proto *ol return NULL; } } + +static void +proto_journal_item_cleanup_(ea_list *proto_attr, ea_list *old_attr) +{ + ea_free_later(old_attr); + + if (!proto_attr) + { + PST_LOCKED(tp) + { + int p_id = ea_get_int(old_attr, &ea_proto_id, 0); + hmap_clear(&tp->proto_id_map, p_id); + tp->states[p_id] = NULL; + } + } +} + +void +proto_journal_item_cleanup(struct lfjour * journal UNUSED, struct lfjour_item *i) +{ + /* Called after a journal update was has been read. */ + struct proto_pending_update *pupdate = SKIP_BACK(struct proto_pending_update, li, i); + proto_journal_item_cleanup_(pupdate->proto_attr, pupdate->old_proto_attr); +} + +void +proto_announce_state_locked(struct proto_state_table_private* ts, struct proto *p, ea_list *attr) +{ + /* + Should be called each time one (or more) variables tracked in proto eattrs changes. + Changes proto eattrs and activates journal. + */ + ea_set_attr(&attr, EA_LITERAL_STORE_ADATA(&ea_last_modified, 0, &p->last_state_change, sizeof(btime))); + + attr = ea_lookup(attr, 0, EALS_CUSTOM); + + ASSERT_DIE(p->id < ts->length_states); + ea_list *old_attr = ts->states[p->id]; + + if (attr == old_attr) + { + /* Nothing has changed */ + ea_free_later(attr); + return; + } + + ts->states[p->id] = attr; + + if (p->ea_state && p->ea_state->stored) + ea_free_later(p->ea_state); + p->ea_state = attr ? ea_ref(attr) : NULL; + + struct proto_pending_update *pupdate = SKIP_BACK(struct proto_pending_update, li, lfjour_push_prepare(&proto_state_table_pub.journal)); + + if (!pupdate) + { + proto_journal_item_cleanup_(attr, old_attr); + return; + } + + *pupdate = (struct proto_pending_update) { + .li = pupdate->li, /* Keep the item's internal state */ + .proto_attr = attr, + .old_proto_attr = old_attr, + .protocol = p + }; + + lfjour_push_commit(&proto_state_table_pub.journal); +} + +void +proto_announce_state(struct proto *p, ea_list *attr) +{ + PST_LOCKED(ts) + proto_announce_state_locked(ts, p, attr); +} + +struct proto_announce_state_deferred { + struct deferred_call dc; + struct proto *p; +}; + +static void proto_announce_state_deferred(struct deferred_call *dc) +{ + SKIP_BACK_DECLARE(struct proto_announce_state_deferred, pasd, dc, dc); + proto_announce_state(pasd->p, pasd->p->ea_state); +} + +void +proto_announce_state_later(struct proto *p, ea_list *attr) +{ + ea_free_later(p->ea_state); + p->ea_state = ea_lookup(attr, 0, EALS_CUSTOM); + + struct proto_announce_state_deferred pasd = { + .dc.hook = proto_announce_state_deferred, + .p = p, + }; + + defer_call(&pasd.dc, sizeof pasd); +} + +ea_list * +channel_get_state(int id) +{ + PST_LOCKED(ts) + { + ASSERT_DIE((u32) id < ts->length_channels); + if (ts->channels[id]) + return ea_ref_tmp(ts->channels[id]); + } + return NULL; +} + +ea_list * +proto_get_state(int id) +{ + ea_list *eal; + PST_LOCKED(ts) + { + ASSERT_DIE((u32)id < ts->length_states); + eal = ts->states[id]; + } + if (eal) + return ea_ref_tmp(eal); + return NULL; +} + +void +proto_states_subscribe(struct lfjour_recipient *r) +{ + PST_LOCKED(ts) + lfjour_register(&proto_state_table_pub.journal, r); +} + +/* State attribute declarations */ +struct ea_class ea_name = { + .name = "proto_name", + .type = T_STRING, +}; + +struct ea_class ea_protocol_name = { + .name = "proto_protocol_name", + .type = T_STRING, +}; + +struct ea_class ea_protocol_type = { + .name = "proto_protocol_type", + .type = T_PTR, +}; + +struct ea_class ea_main_table_id = { + .name = "proto_main_table_id", + .type = T_INT, +}; + +struct ea_class ea_state = { + .name = "proto_state", + .type = T_ENUM_STATE, +}; + +struct ea_class ea_last_modified = { + .name = "proto_last_modified", + .type = T_BTIME, +}; + +struct ea_class ea_info = { + .name = "proto_info", + .type = T_STRING, +}; + +struct ea_class ea_proto_id = { + .name = "proto_proto_id", + .type = T_INT, +}; + +struct ea_class ea_proto_channel_list = { + .name = "ea_proto_channel_list", + .type = T_CLIST, +}; + +struct ea_class ea_channel_id = { + .name = "proto_channel_id", + .type = T_INT, +}; + +struct ea_class ea_in_keep = { + .name = "channel_in_keep", + .type = T_INT, +}; + + +struct ea_class ea_rtable = { + .name = "rtable", + .type = T_PTR, +}; diff --git a/nest/protocol.h b/nest/protocol.h index bbb76a8a..84dd5570 100644 --- a/nest/protocol.h +++ b/nest/protocol.h @@ -173,6 +173,8 @@ struct proto { btime last_state_change; /* Time of last state transition */ char *last_state_name_announced; /* Last state name we've announced to the user */ char *message; /* State-change message, allocated from proto_pool */ + u32 id; /* Sequential ID used as index in proto_state_table */ + ea_list *ea_state; /* Last stored protocol state in proto_state_table (cached) */ /* * General protocol hooks: @@ -392,6 +394,60 @@ static inline int proto_is_inactive(struct proto *p) ; } +#define PROTO_STATE_TABLE_PUBLIC \ + DOMAIN(rtable) lock; /* Lock needed to access global protocol state table */ \ + struct lfjour journal; /* Subscribe here to get new content! */ \ + + +struct proto_state_table_private { + struct { + PROTO_STATE_TABLE_PUBLIC; + }; + struct proto_state_table_private **locked_at; + ea_list ** states; + ea_list ** channels; + u32 length_states; + u32 length_channels; + struct hmap proto_id_map; + struct hmap channel_id_map; + pool *pool; +}; + +typedef union proto_state_table { + struct { + PROTO_STATE_TABLE_PUBLIC; + }; + struct proto_state_table_private priv; +} proto_state_table; + +extern proto_state_table proto_state_table_pub; + +/* Define the lock cleanup function */ +LOBJ_UNLOCK_CLEANUP(proto_state_table, rtable); + +#define PST_IS_LOCKED LOBJ_IS_LOCKED(&proto_state_table_pub, rtable) +#define PST_LOCKED(tp) LOBJ_LOCKED(&proto_state_table_pub, tp, proto_state_table, rtable) +#define PST_LOCK(tp) LOBJ_LOCK(&proto_state_table_pub, tp, proto_state_table, rtable) + +struct proto_pending_update { + LFJOUR_ITEM_INHERIT(li); + ea_list *proto_attr; + ea_list *old_proto_attr; + struct proto *protocol; +}; + +void proto_announce_state_locked(struct proto_state_table_private *ts, struct proto *p, ea_list *attr); +void proto_announce_state(struct proto *p, ea_list *attr); +void proto_announce_state_later(struct proto *p, ea_list *attr); +ea_list *channel_get_state(int id); +ea_list *proto_get_state(int id); +void proto_states_subscribe(struct lfjour_recipient *r); + +/* Protocol journal attributes */ +extern struct ea_class ea_name, ea_protocol_name, ea_protocol_type, + ea_state, ea_last_modified, ea_info, ea_proto_id, ea_channel_id, ea_rtable, + ea_in_keep, ea_proto_channel_list; + /* * Debugging flags @@ -526,6 +582,7 @@ struct channel { node n; /* Node in proto->channels */ const char *name; /* Channel name (may be NULL) */ + u32 id; const struct channel_class *class; struct proto *proto;