0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2025-01-15 05:21:54 +00:00

BMP: simplified update queuing

This commit drops an unnecessary layer of indirection, allocates the
output blocks by 1M chunks and drops most of the intermediary
allocations from the heap altogether.
This commit is contained in:
Maria Matejka 2024-09-17 16:27:54 +02:00
parent 13ddb0be00
commit f054e7df4e
2 changed files with 109 additions and 87 deletions

View File

@ -225,6 +225,14 @@ bmp_send_peer_up_notif_msg(struct bmp_proto *p, const struct bgp_proto *bgp,
static void bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs);
// Stores TX data
struct bmp_tx_buffer {
resource r;
node n;
byte *end;
byte buf[0xfff00];
};
// Stores necessary any data in list
struct bmp_data_node {
node n;
@ -274,22 +282,63 @@ bmp_init_msg_serialize(buffer *stream, const char *sys_descr, const char *sys_na
bmp_info_tlv_hdr_serialize(stream, BMP_INFO_TLV_TYPE_SYS_NAME, sys_name);
}
static void btb_free(resource *r UNUSED) {
UNUSED struct bmp_tx_buffer *btb = SKIP_BACK(struct bmp_tx_buffer, r, r);
}
static void
btb_dump(resource *r)
{
struct bmp_tx_buffer *btb = SKIP_BACK(struct bmp_tx_buffer, r, r);
debug("used %u bytes\n", btb->end - btb->buf);
}
static struct resmem
btb_memsize(resource *r)
{
struct bmp_tx_buffer *btb = SKIP_BACK(struct bmp_tx_buffer, r, r);
return (struct resmem) {
.effective = btb->end - btb->buf,
.overhead = (sizeof *btb) - (btb->end - btb->buf),
};
}
static struct resclass bmp_tx_buffer_class = {
.name = "BMP TX buffer",
.size = sizeof(struct bmp_tx_buffer),
.free = btb_free,
.dump = btb_dump,
.memsize = btb_memsize,
};
static void
bmp_schedule_tx_packet(struct bmp_proto *p, const byte *payload, const size_t size)
{
ASSERT(p->started);
struct bmp_data_node *tx_data = mb_alloc(p->tx_mem_pool, sizeof (struct bmp_data_node));
tx_data->data = mb_alloc(p->tx_mem_pool, size);
memcpy(tx_data->data, payload, size);
tx_data->data_size = size;
add_tail(&p->tx_queue, &tx_data->n);
// log(L_INFO "schedule tx packet %p %u", payload, size);
if (sk_tx_buffer_empty(p->sk)
&& !ev_active(p->tx_ev))
struct bmp_tx_buffer *btb = EMPTY_LIST(p->tx_queue) ? NULL :
SKIP_BACK(struct bmp_tx_buffer, n, TAIL(p->tx_queue));
if (btb && (btb->end + size > btb->buf + sizeof btb->buf))
btb = NULL;
if (!btb)
{
ev_schedule(p->tx_ev);
btb = ralloc(p->tx_mem_pool, &bmp_tx_buffer_class);
btb->end = btb->buf;
btb->n = (node) {};
add_tail(&p->tx_queue, &btb->n);
// log(L_INFO "btb alloc buf %p end %p", btb->buf, btb->end);
}
else
// log(L_INFO "btb found buf %p end %p", btb->buf, btb->end);
memcpy(btb->end, payload, size);
btb->end += size;
if (!p->sk->tbuf && !ev_active(p->tx_ev))
ev_schedule(p->tx_ev);
}
static void
@ -305,35 +354,33 @@ bmp_fire_tx(void *p_)
"Called BMP TX event handler when there is not any data to send"
);
size_t cnt = 0; // Counts max packets which we want to send per TX slot
struct bmp_data_node *tx_data;
struct bmp_data_node *tx_data_next;
WALK_LIST_DELSAFE(tx_data, tx_data_next, p->tx_queue)
int cnt = 0;
struct bmp_tx_buffer *btb;
WALK_LIST_FIRST2(btb, n, p->tx_queue)
{
if (tx_data->data_size > p->sk->tbsize)
{
sk_set_tbsize(p->sk, tx_data->data_size);
}
size_t data_size = tx_data->data_size;
memcpy(p->sk->tbuf, tx_data->data, data_size);
mb_free(tx_data->data);
rem_node((node *) tx_data);
mb_free(tx_data);
if (sk_send(p->sk, data_size) <= 0)
if (p->tx_pending)
return;
// BMP packets should be treat with lowest priority when scheduling sending
// packets to target. That's why we want to send max. 32 packets per event
// call
if (++cnt > 32)
rem_node(&btb->n);
p->sk->tbuf = btb->buf;
p->tx_pending = btb;
// log(L_INFO "btb send buf %p end %p", btb->buf, btb->end);
if (sk_send(p->sk, btb->end - btb->buf) <= 0)
return;
// log(L_INFO "btb free buf %p", btb->buf);
p->sk->tbuf = NULL;
p->tx_pending = NULL;
rfree(&btb->r);
if (cnt++ > 32)
{
if (!ev_active(p->tx_ev))
{
ev_schedule(p->tx_ev);
}
ev_schedule(p->tx_ev);
return;
}
}
@ -342,6 +389,14 @@ bmp_fire_tx(void *p_)
static void
bmp_tx(struct birdsock *sk)
{
struct bmp_proto *p = sk->data;
// log(L_INFO "btb free buf %p", p->tx_pending);
rfree(&p->tx_pending->r);
p->tx_pending = NULL;
sk->tbuf = NULL;
bmp_fire_tx(sk->data);
}
@ -783,25 +838,27 @@ bmp_send_peer_up_notif_msg(struct bmp_proto *p, const struct bgp_proto *bgp,
static void
bmp_route_monitor_put_update(struct bmp_proto *p, struct bmp_stream *bs, const byte *data, size_t length, btime timestamp)
{
struct bmp_data_node *upd_msg = mb_alloc(p->update_msg_mem_pool,
sizeof (struct bmp_data_node));
upd_msg->data = mb_alloc(p->update_msg_mem_pool, length);
memcpy(upd_msg->data, data, length);
upd_msg->data_size = length;
add_tail(&p->update_msg_queue, &upd_msg->n);
static byte locbuf[0xff000];
buffer payload = {
.start = locbuf,
.pos = locbuf,
.end = locbuf + sizeof locbuf,
};
/* Save some metadata */
struct bgp_proto *bgp = bs->bgp;
upd_msg->remote_as = bgp->remote_as;
upd_msg->remote_id = bgp->remote_id;
upd_msg->remote_ip = bgp->remote_ip;
upd_msg->timestamp = timestamp;
upd_msg->global_peer = bmp_is_peer_global_instance(bgp);
upd_msg->policy = bmp_stream_policy(bs);
bmp_route_monitor_msg_serialize(&payload,
bmp_is_peer_global_instance(bgp),
bmp_stream_policy(bs),
bgp->remote_as,
bgp->remote_id,
true,
bgp->remote_ip,
data,
length,
timestamp
);
/* Kick the commit */
if (!ev_active(p->update_ev))
ev_schedule(p->update_ev);
bmp_schedule_tx_packet(p, locbuf, payload.pos - locbuf);
}
static void
@ -820,38 +877,6 @@ bmp_route_monitor_notify(struct bmp_proto *p, struct bmp_stream *bs,
log(L_WARN "%s: Cannot encode update for %N", p->p.name, n);
}
static void
bmp_route_monitor_commit(void *p_)
{
struct bmp_proto *p = p_;
if (!p->started)
return;
buffer payload
= bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
struct bmp_data_node *data, *data_next;
WALK_LIST_DELSAFE(data, data_next, p->update_msg_queue)
{
bmp_route_monitor_msg_serialize(&payload,
data->global_peer, data->policy,
data->remote_as, data->remote_id, true,
data->remote_ip, data->data, data->data_size,
data->timestamp);
bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
bmp_buffer_flush(&payload);
mb_free(data->data);
rem_node(&data->n);
mb_free(data);
}
bmp_buffer_free(&payload);
}
static void
bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs)
{
@ -967,12 +992,13 @@ bmp_send_termination_msg(struct bmp_proto *p,
bmp_put_u16(&stream, BMP_TERM_INFO_REASON);
bmp_put_u16(&stream, BMP_TERM_REASON_CODE_SIZE); // 2-byte code indication the reason
bmp_put_u16(&stream, reason);
memcpy(p->sk->tbuf, bmp_buffer_data(&stream), bmp_buffer_pos(&stream));
p->sk->tbuf = bmp_buffer_data(&stream);
IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(
sk_send(p->sk, bmp_buffer_pos(&stream)) < 0,
"Failed to send BMP termination message"
);
p->sk->tbuf = NULL;
bmp_buffer_free(&stream);
}
@ -1119,7 +1145,6 @@ bmp_connect(struct bmp_proto *p)
sk->dport = p->station_port;
sk->ttl = IP4_MAX_TTL;
sk->tos = IP_PREC_INTERNET_CONTROL;
sk->tbsize = BGP_TX_BUFFER_EXT_SIZE;
sk->tx_hook = bmp_connected;
sk->err_hook = bmp_sock_err;
@ -1245,9 +1270,8 @@ bmp_start(struct proto *P)
p->buffer_mpool = rp_new(P->pool, "BMP Buffer");
p->tx_mem_pool = rp_new(P->pool, "BMP Tx");
p->update_msg_mem_pool = rp_new(P->pool, "BMP Update");
p->tx_ev = ev_new_init(p->p.pool, bmp_fire_tx, p);
p->update_ev = ev_new_init(p->p.pool, bmp_route_monitor_commit, p);
p->tx_pending = NULL;
p->connect_retry_timer = tm_new_init(p->p.pool, bmp_connection_retry, p, 0, 0);
p->sk = NULL;
@ -1256,7 +1280,6 @@ bmp_start(struct proto *P)
HASH_INIT(p->table_map, P->pool, 4);
init_list(&p->tx_queue);
init_list(&p->update_msg_queue);
p->started = false;
p->sock_err = 0;
add_tail(&bmp_proto_list, &p->bmp_node);

View File

@ -66,10 +66,9 @@ struct bmp_proto {
// struct bmp_peer_map bgp_peers; // Stores 'bgp_proto' structure per BGP peer
pool *buffer_mpool; // Memory pool used for BMP buffer allocations
pool *tx_mem_pool; // Memory pool used for packet allocations designated to BMP collector
pool *update_msg_mem_pool; // Memory pool used for BPG UPDATE MSG allocations
list tx_queue; // Stores queued packets going to be sent
struct bmp_tx_buffer *tx_pending;// This buffer waits for socket to flush
timer *connect_retry_timer; // Timer for retrying connection to the BMP collector
list update_msg_queue; // Stores all composed BGP UPDATE MSGs
bool started; // Flag that stores running status of BMP instance
int sock_err; // Last socket error code
};