diff --git a/proto/bmp/bmp.c b/proto/bmp/bmp.c index 599024f5..2646a1f4 100644 --- a/proto/bmp/bmp.c +++ b/proto/bmp/bmp.c @@ -225,6 +225,14 @@ bmp_send_peer_up_notif_msg(struct bmp_proto *p, const struct bgp_proto *bgp, static void bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs); +// Stores TX data +struct bmp_tx_buffer { + resource r; + node n; + byte *end; + byte buf[0xfff00]; +}; + // Stores necessary any data in list struct bmp_data_node { node n; @@ -274,22 +282,63 @@ bmp_init_msg_serialize(buffer *stream, const char *sys_descr, const char *sys_na bmp_info_tlv_hdr_serialize(stream, BMP_INFO_TLV_TYPE_SYS_NAME, sys_name); } +static void btb_free(resource *r UNUSED) { + UNUSED struct bmp_tx_buffer *btb = SKIP_BACK(struct bmp_tx_buffer, r, r); +} + +static void +btb_dump(resource *r) +{ + struct bmp_tx_buffer *btb = SKIP_BACK(struct bmp_tx_buffer, r, r); + debug("used %u bytes\n", btb->end - btb->buf); +} + +static struct resmem +btb_memsize(resource *r) +{ + struct bmp_tx_buffer *btb = SKIP_BACK(struct bmp_tx_buffer, r, r); + return (struct resmem) { + .effective = btb->end - btb->buf, + .overhead = (sizeof *btb) - (btb->end - btb->buf), + }; +} + +static struct resclass bmp_tx_buffer_class = { + .name = "BMP TX buffer", + .size = sizeof(struct bmp_tx_buffer), + .free = btb_free, + .dump = btb_dump, + .memsize = btb_memsize, +}; + static void bmp_schedule_tx_packet(struct bmp_proto *p, const byte *payload, const size_t size) { ASSERT(p->started); - struct bmp_data_node *tx_data = mb_alloc(p->tx_mem_pool, sizeof (struct bmp_data_node)); - tx_data->data = mb_alloc(p->tx_mem_pool, size); - memcpy(tx_data->data, payload, size); - tx_data->data_size = size; - add_tail(&p->tx_queue, &tx_data->n); +// log(L_INFO "schedule tx packet %p %u", payload, size); - if (sk_tx_buffer_empty(p->sk) - && !ev_active(p->tx_ev)) + struct bmp_tx_buffer *btb = EMPTY_LIST(p->tx_queue) ? NULL : + SKIP_BACK(struct bmp_tx_buffer, n, TAIL(p->tx_queue)); + if (btb && (btb->end + size > btb->buf + sizeof btb->buf)) + btb = NULL; + + if (!btb) { - ev_schedule(p->tx_ev); + btb = ralloc(p->tx_mem_pool, &bmp_tx_buffer_class); + btb->end = btb->buf; + btb->n = (node) {}; + add_tail(&p->tx_queue, &btb->n); +// log(L_INFO "btb alloc buf %p end %p", btb->buf, btb->end); } + else +// log(L_INFO "btb found buf %p end %p", btb->buf, btb->end); + + memcpy(btb->end, payload, size); + btb->end += size; + + if (!p->sk->tbuf && !ev_active(p->tx_ev)) + ev_schedule(p->tx_ev); } static void @@ -305,35 +354,33 @@ bmp_fire_tx(void *p_) "Called BMP TX event handler when there is not any data to send" ); - size_t cnt = 0; // Counts max packets which we want to send per TX slot - struct bmp_data_node *tx_data; - struct bmp_data_node *tx_data_next; - WALK_LIST_DELSAFE(tx_data, tx_data_next, p->tx_queue) + int cnt = 0; + struct bmp_tx_buffer *btb; + WALK_LIST_FIRST2(btb, n, p->tx_queue) { - if (tx_data->data_size > p->sk->tbsize) - { - sk_set_tbsize(p->sk, tx_data->data_size); - } - - size_t data_size = tx_data->data_size; - memcpy(p->sk->tbuf, tx_data->data, data_size); - mb_free(tx_data->data); - rem_node((node *) tx_data); - mb_free(tx_data); - - if (sk_send(p->sk, data_size) <= 0) + if (p->tx_pending) return; - // BMP packets should be treat with lowest priority when scheduling sending - // packets to target. That's why we want to send max. 32 packets per event - // call - if (++cnt > 32) + rem_node(&btb->n); + + p->sk->tbuf = btb->buf; + p->tx_pending = btb; + +// log(L_INFO "btb send buf %p end %p", btb->buf, btb->end); + + if (sk_send(p->sk, btb->end - btb->buf) <= 0) + return; + +// log(L_INFO "btb free buf %p", btb->buf); + + p->sk->tbuf = NULL; + p->tx_pending = NULL; + rfree(&btb->r); + + if (cnt++ > 32) { if (!ev_active(p->tx_ev)) - { - ev_schedule(p->tx_ev); - } - + ev_schedule(p->tx_ev); return; } } @@ -342,6 +389,14 @@ bmp_fire_tx(void *p_) static void bmp_tx(struct birdsock *sk) { + struct bmp_proto *p = sk->data; + +// log(L_INFO "btb free buf %p", p->tx_pending); + + rfree(&p->tx_pending->r); + p->tx_pending = NULL; + sk->tbuf = NULL; + bmp_fire_tx(sk->data); } @@ -783,25 +838,27 @@ bmp_send_peer_up_notif_msg(struct bmp_proto *p, const struct bgp_proto *bgp, static void bmp_route_monitor_put_update(struct bmp_proto *p, struct bmp_stream *bs, const byte *data, size_t length, btime timestamp) { - struct bmp_data_node *upd_msg = mb_alloc(p->update_msg_mem_pool, - sizeof (struct bmp_data_node)); - upd_msg->data = mb_alloc(p->update_msg_mem_pool, length); - memcpy(upd_msg->data, data, length); - upd_msg->data_size = length; - add_tail(&p->update_msg_queue, &upd_msg->n); + static byte locbuf[0xff000]; + buffer payload = { + .start = locbuf, + .pos = locbuf, + .end = locbuf + sizeof locbuf, + }; - /* Save some metadata */ struct bgp_proto *bgp = bs->bgp; - upd_msg->remote_as = bgp->remote_as; - upd_msg->remote_id = bgp->remote_id; - upd_msg->remote_ip = bgp->remote_ip; - upd_msg->timestamp = timestamp; - upd_msg->global_peer = bmp_is_peer_global_instance(bgp); - upd_msg->policy = bmp_stream_policy(bs); + bmp_route_monitor_msg_serialize(&payload, + bmp_is_peer_global_instance(bgp), + bmp_stream_policy(bs), + bgp->remote_as, + bgp->remote_id, + true, + bgp->remote_ip, + data, + length, + timestamp + ); - /* Kick the commit */ - if (!ev_active(p->update_ev)) - ev_schedule(p->update_ev); + bmp_schedule_tx_packet(p, locbuf, payload.pos - locbuf); } static void @@ -820,38 +877,6 @@ bmp_route_monitor_notify(struct bmp_proto *p, struct bmp_stream *bs, log(L_WARN "%s: Cannot encode update for %N", p->p.name, n); } -static void -bmp_route_monitor_commit(void *p_) -{ - struct bmp_proto *p = p_; - - if (!p->started) - return; - - buffer payload - = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE); - - struct bmp_data_node *data, *data_next; - WALK_LIST_DELSAFE(data, data_next, p->update_msg_queue) - { - bmp_route_monitor_msg_serialize(&payload, - data->global_peer, data->policy, - data->remote_as, data->remote_id, true, - data->remote_ip, data->data, data->data_size, - data->timestamp); - - bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload)); - - bmp_buffer_flush(&payload); - - mb_free(data->data); - rem_node(&data->n); - mb_free(data); - } - - bmp_buffer_free(&payload); -} - static void bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs) { @@ -967,12 +992,13 @@ bmp_send_termination_msg(struct bmp_proto *p, bmp_put_u16(&stream, BMP_TERM_INFO_REASON); bmp_put_u16(&stream, BMP_TERM_REASON_CODE_SIZE); // 2-byte code indication the reason bmp_put_u16(&stream, reason); - memcpy(p->sk->tbuf, bmp_buffer_data(&stream), bmp_buffer_pos(&stream)); + p->sk->tbuf = bmp_buffer_data(&stream); IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL( sk_send(p->sk, bmp_buffer_pos(&stream)) < 0, "Failed to send BMP termination message" ); + p->sk->tbuf = NULL; bmp_buffer_free(&stream); } @@ -1119,7 +1145,6 @@ bmp_connect(struct bmp_proto *p) sk->dport = p->station_port; sk->ttl = IP4_MAX_TTL; sk->tos = IP_PREC_INTERNET_CONTROL; - sk->tbsize = BGP_TX_BUFFER_EXT_SIZE; sk->tx_hook = bmp_connected; sk->err_hook = bmp_sock_err; @@ -1245,9 +1270,8 @@ bmp_start(struct proto *P) p->buffer_mpool = rp_new(P->pool, "BMP Buffer"); p->tx_mem_pool = rp_new(P->pool, "BMP Tx"); - p->update_msg_mem_pool = rp_new(P->pool, "BMP Update"); p->tx_ev = ev_new_init(p->p.pool, bmp_fire_tx, p); - p->update_ev = ev_new_init(p->p.pool, bmp_route_monitor_commit, p); + p->tx_pending = NULL; p->connect_retry_timer = tm_new_init(p->p.pool, bmp_connection_retry, p, 0, 0); p->sk = NULL; @@ -1256,7 +1280,6 @@ bmp_start(struct proto *P) HASH_INIT(p->table_map, P->pool, 4); init_list(&p->tx_queue); - init_list(&p->update_msg_queue); p->started = false; p->sock_err = 0; add_tail(&bmp_proto_list, &p->bmp_node); diff --git a/proto/bmp/bmp.h b/proto/bmp/bmp.h index 45844836..9cb3d465 100644 --- a/proto/bmp/bmp.h +++ b/proto/bmp/bmp.h @@ -66,10 +66,9 @@ struct bmp_proto { // struct bmp_peer_map bgp_peers; // Stores 'bgp_proto' structure per BGP peer pool *buffer_mpool; // Memory pool used for BMP buffer allocations pool *tx_mem_pool; // Memory pool used for packet allocations designated to BMP collector - pool *update_msg_mem_pool; // Memory pool used for BPG UPDATE MSG allocations list tx_queue; // Stores queued packets going to be sent + struct bmp_tx_buffer *tx_pending;// This buffer waits for socket to flush timer *connect_retry_timer; // Timer for retrying connection to the BMP collector - list update_msg_queue; // Stores all composed BGP UPDATE MSGs bool started; // Flag that stores running status of BMP instance int sock_err; // Last socket error code };