0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2025-01-13 20:41:53 +00:00

BMP: simplified update queuing

This commit drops an unnecessary layer of indirection, allocates the
output blocks by 1M chunks and drops most of the intermediary
allocations from the heap altogether.
This commit is contained in:
Maria Matejka 2024-09-17 16:27:54 +02:00 committed by Maria Matejka
parent 0f9f6424cd
commit 3f371ddff2
2 changed files with 111 additions and 87 deletions

View File

@ -225,6 +225,14 @@ bmp_send_peer_up_notif_msg(struct bmp_proto *p, const struct bgp_proto *bgp,
static void bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs); static void bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs);
// Stores TX data
struct bmp_tx_buffer {
resource r;
node n;
byte *end;
byte buf[0xfff00];
};
// Stores necessary any data in list // Stores necessary any data in list
struct bmp_data_node { struct bmp_data_node {
node n; node n;
@ -274,22 +282,65 @@ bmp_init_msg_serialize(buffer *stream, const char *sys_descr, const char *sys_na
bmp_info_tlv_hdr_serialize(stream, BMP_INFO_TLV_TYPE_SYS_NAME, sys_name); bmp_info_tlv_hdr_serialize(stream, BMP_INFO_TLV_TYPE_SYS_NAME, sys_name);
} }
static void btb_free(resource *r UNUSED) {
UNUSED struct bmp_tx_buffer *btb = SKIP_BACK(struct bmp_tx_buffer, r, r);
}
static void
btb_dump(resource *r)
{
struct bmp_tx_buffer *btb = SKIP_BACK(struct bmp_tx_buffer, r, r);
debug("used %u bytes\n", btb->end - btb->buf);
}
static struct resmem
btb_memsize(resource *r)
{
struct bmp_tx_buffer *btb = SKIP_BACK(struct bmp_tx_buffer, r, r);
return (struct resmem) {
.effective = btb->end - btb->buf,
.overhead = (sizeof *btb) - (btb->end - btb->buf),
};
}
static struct resclass bmp_tx_buffer_class = {
.name = "BMP TX buffer",
.size = sizeof(struct bmp_tx_buffer),
.free = btb_free,
.dump = btb_dump,
.memsize = btb_memsize,
};
static void static void
bmp_schedule_tx_packet(struct bmp_proto *p, const byte *payload, const size_t size) bmp_schedule_tx_packet(struct bmp_proto *p, const byte *payload, const size_t size)
{ {
ASSERT(p->started); ASSERT(p->started);
struct bmp_data_node *tx_data = mb_alloc(p->tx_mem_pool, sizeof (struct bmp_data_node)); // log(L_INFO "schedule tx packet %p %u", payload, size);
tx_data->data = mb_alloc(p->tx_mem_pool, size);
memcpy(tx_data->data, payload, size);
tx_data->data_size = size;
add_tail(&p->tx_queue, &tx_data->n);
if (sk_tx_buffer_empty(p->sk) struct bmp_tx_buffer *btb = EMPTY_LIST(p->tx_queue) ? NULL :
&& !ev_active(p->tx_ev)) SKIP_BACK(struct bmp_tx_buffer, n, TAIL(p->tx_queue));
if (btb && (btb->end + size > btb->buf + sizeof btb->buf))
btb = NULL;
if (!btb)
{ {
ev_schedule(p->tx_ev); btb = ralloc(p->tx_mem_pool, &bmp_tx_buffer_class);
btb->end = btb->buf;
btb->n = (node) {};
add_tail(&p->tx_queue, &btb->n);
// log(L_INFO "btb alloc buf %p end %p", btb->buf, btb->end);
} }
else
{
// log(L_INFO "btb found buf %p end %p", btb->buf, btb->end);
}
memcpy(btb->end, payload, size);
btb->end += size;
if (!p->sk->tbuf && !ev_active(p->tx_ev))
ev_schedule(p->tx_ev);
} }
static void static void
@ -305,35 +356,33 @@ bmp_fire_tx(void *p_)
"Called BMP TX event handler when there is not any data to send" "Called BMP TX event handler when there is not any data to send"
); );
size_t cnt = 0; // Counts max packets which we want to send per TX slot int cnt = 0;
struct bmp_data_node *tx_data; struct bmp_tx_buffer *btb;
struct bmp_data_node *tx_data_next; WALK_LIST_FIRST2(btb, n, p->tx_queue)
WALK_LIST_DELSAFE(tx_data, tx_data_next, p->tx_queue)
{ {
if (tx_data->data_size > p->sk->tbsize) if (p->tx_pending)
{
sk_set_tbsize(p->sk, tx_data->data_size);
}
size_t data_size = tx_data->data_size;
memcpy(p->sk->tbuf, tx_data->data, data_size);
mb_free(tx_data->data);
rem_node((node *) tx_data);
mb_free(tx_data);
if (sk_send(p->sk, data_size) <= 0)
return; return;
// BMP packets should be treat with lowest priority when scheduling sending rem_node(&btb->n);
// packets to target. That's why we want to send max. 32 packets per event
// call p->sk->tbuf = btb->buf;
if (++cnt > 32) p->tx_pending = btb;
// log(L_INFO "btb send buf %p end %p", btb->buf, btb->end);
if (sk_send(p->sk, btb->end - btb->buf) <= 0)
return;
// log(L_INFO "btb free buf %p", btb->buf);
p->sk->tbuf = NULL;
p->tx_pending = NULL;
rfree(&btb->r);
if (cnt++ > 32)
{ {
if (!ev_active(p->tx_ev)) if (!ev_active(p->tx_ev))
{ ev_schedule(p->tx_ev);
ev_schedule(p->tx_ev);
}
return; return;
} }
} }
@ -342,6 +391,14 @@ bmp_fire_tx(void *p_)
static void static void
bmp_tx(struct birdsock *sk) bmp_tx(struct birdsock *sk)
{ {
struct bmp_proto *p = sk->data;
// log(L_INFO "btb free buf %p", p->tx_pending);
rfree(&p->tx_pending->r);
p->tx_pending = NULL;
sk->tbuf = NULL;
bmp_fire_tx(sk->data); bmp_fire_tx(sk->data);
} }
@ -783,25 +840,27 @@ bmp_send_peer_up_notif_msg(struct bmp_proto *p, const struct bgp_proto *bgp,
static void static void
bmp_route_monitor_put_update(struct bmp_proto *p, struct bmp_stream *bs, const byte *data, size_t length, btime timestamp) bmp_route_monitor_put_update(struct bmp_proto *p, struct bmp_stream *bs, const byte *data, size_t length, btime timestamp)
{ {
struct bmp_data_node *upd_msg = mb_alloc(p->update_msg_mem_pool, static byte locbuf[0xff000];
sizeof (struct bmp_data_node)); buffer payload = {
upd_msg->data = mb_alloc(p->update_msg_mem_pool, length); .start = locbuf,
memcpy(upd_msg->data, data, length); .pos = locbuf,
upd_msg->data_size = length; .end = locbuf + sizeof locbuf,
add_tail(&p->update_msg_queue, &upd_msg->n); };
/* Save some metadata */
struct bgp_proto *bgp = bs->bgp; struct bgp_proto *bgp = bs->bgp;
upd_msg->remote_as = bgp->remote_as; bmp_route_monitor_msg_serialize(&payload,
upd_msg->remote_id = bgp->remote_id; bmp_is_peer_global_instance(bgp),
upd_msg->remote_ip = bgp->remote_ip; bmp_stream_policy(bs),
upd_msg->timestamp = timestamp; bgp->remote_as,
upd_msg->global_peer = bmp_is_peer_global_instance(bgp); bgp->remote_id,
upd_msg->policy = bmp_stream_policy(bs); true,
bgp->remote_ip,
data,
length,
timestamp
);
/* Kick the commit */ bmp_schedule_tx_packet(p, locbuf, payload.pos - locbuf);
if (!ev_active(p->update_ev))
ev_schedule(p->update_ev);
} }
static void static void
@ -820,38 +879,6 @@ bmp_route_monitor_notify(struct bmp_proto *p, struct bmp_stream *bs,
log(L_WARN "%s: Cannot encode update for %N", p->p.name, n); log(L_WARN "%s: Cannot encode update for %N", p->p.name, n);
} }
static void
bmp_route_monitor_commit(void *p_)
{
struct bmp_proto *p = p_;
if (!p->started)
return;
buffer payload
= bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
struct bmp_data_node *data, *data_next;
WALK_LIST_DELSAFE(data, data_next, p->update_msg_queue)
{
bmp_route_monitor_msg_serialize(&payload,
data->global_peer, data->policy,
data->remote_as, data->remote_id, true,
data->remote_ip, data->data, data->data_size,
data->timestamp);
bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
bmp_buffer_flush(&payload);
mb_free(data->data);
rem_node(&data->n);
mb_free(data);
}
bmp_buffer_free(&payload);
}
static void static void
bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs) bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs)
{ {
@ -967,12 +994,13 @@ bmp_send_termination_msg(struct bmp_proto *p,
bmp_put_u16(&stream, BMP_TERM_INFO_REASON); bmp_put_u16(&stream, BMP_TERM_INFO_REASON);
bmp_put_u16(&stream, BMP_TERM_REASON_CODE_SIZE); // 2-byte code indication the reason bmp_put_u16(&stream, BMP_TERM_REASON_CODE_SIZE); // 2-byte code indication the reason
bmp_put_u16(&stream, reason); bmp_put_u16(&stream, reason);
memcpy(p->sk->tbuf, bmp_buffer_data(&stream), bmp_buffer_pos(&stream)); p->sk->tbuf = bmp_buffer_data(&stream);
IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL( IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(
sk_send(p->sk, bmp_buffer_pos(&stream)) < 0, sk_send(p->sk, bmp_buffer_pos(&stream)) < 0,
"Failed to send BMP termination message" "Failed to send BMP termination message"
); );
p->sk->tbuf = NULL;
bmp_buffer_free(&stream); bmp_buffer_free(&stream);
} }
@ -1119,7 +1147,6 @@ bmp_connect(struct bmp_proto *p)
sk->dport = p->station_port; sk->dport = p->station_port;
sk->ttl = IP4_MAX_TTL; sk->ttl = IP4_MAX_TTL;
sk->tos = IP_PREC_INTERNET_CONTROL; sk->tos = IP_PREC_INTERNET_CONTROL;
sk->tbsize = BGP_TX_BUFFER_EXT_SIZE;
sk->tx_hook = bmp_connected; sk->tx_hook = bmp_connected;
sk->err_hook = bmp_sock_err; sk->err_hook = bmp_sock_err;
@ -1245,9 +1272,8 @@ bmp_start(struct proto *P)
p->buffer_mpool = rp_new(P->pool, "BMP Buffer"); p->buffer_mpool = rp_new(P->pool, "BMP Buffer");
p->tx_mem_pool = rp_new(P->pool, "BMP Tx"); p->tx_mem_pool = rp_new(P->pool, "BMP Tx");
p->update_msg_mem_pool = rp_new(P->pool, "BMP Update");
p->tx_ev = ev_new_init(p->p.pool, bmp_fire_tx, p); p->tx_ev = ev_new_init(p->p.pool, bmp_fire_tx, p);
p->update_ev = ev_new_init(p->p.pool, bmp_route_monitor_commit, p); p->tx_pending = NULL;
p->connect_retry_timer = tm_new_init(p->p.pool, bmp_connection_retry, p, 0, 0); p->connect_retry_timer = tm_new_init(p->p.pool, bmp_connection_retry, p, 0, 0);
p->sk = NULL; p->sk = NULL;
@ -1256,7 +1282,6 @@ bmp_start(struct proto *P)
HASH_INIT(p->table_map, P->pool, 4); HASH_INIT(p->table_map, P->pool, 4);
init_list(&p->tx_queue); init_list(&p->tx_queue);
init_list(&p->update_msg_queue);
p->started = false; p->started = false;
p->sock_err = 0; p->sock_err = 0;
add_tail(&bmp_proto_list, &p->bmp_node); add_tail(&bmp_proto_list, &p->bmp_node);

View File

@ -66,10 +66,9 @@ struct bmp_proto {
// struct bmp_peer_map bgp_peers; // Stores 'bgp_proto' structure per BGP peer // struct bmp_peer_map bgp_peers; // Stores 'bgp_proto' structure per BGP peer
pool *buffer_mpool; // Memory pool used for BMP buffer allocations pool *buffer_mpool; // Memory pool used for BMP buffer allocations
pool *tx_mem_pool; // Memory pool used for packet allocations designated to BMP collector pool *tx_mem_pool; // Memory pool used for packet allocations designated to BMP collector
pool *update_msg_mem_pool; // Memory pool used for BPG UPDATE MSG allocations
list tx_queue; // Stores queued packets going to be sent list tx_queue; // Stores queued packets going to be sent
struct bmp_tx_buffer *tx_pending;// This buffer waits for socket to flush
timer *connect_retry_timer; // Timer for retrying connection to the BMP collector timer *connect_retry_timer; // Timer for retrying connection to the BMP collector
list update_msg_queue; // Stores all composed BGP UPDATE MSGs
bool started; // Flag that stores running status of BMP instance bool started; // Flag that stores running status of BMP instance
int sock_err; // Last socket error code int sock_err; // Last socket error code
}; };