diff --git a/proto/bmp/bmp.c b/proto/bmp/bmp.c index c233a4b4..4826d368 100644 --- a/proto/bmp/bmp.c +++ b/proto/bmp/bmp.c @@ -311,6 +311,12 @@ static struct resclass bmp_tx_buffer_class = { .memsize = btb_memsize, }; +static bool +bmp_check_tx_overflow(u64 cnt) +{ + return cnt * bmp_tx_buffer_class.size > (1ULL << 30); +} + static void bmp_schedule_tx_packet(struct bmp_proto *p, const byte *payload, const size_t size) { @@ -325,6 +331,9 @@ bmp_schedule_tx_packet(struct bmp_proto *p, const byte *payload, const size_t si if (!btb) { + if (bmp_check_tx_overflow(++p->tx_pending_count)) + ev_schedule(p->tx_overflow_event); + btb = ralloc(p->tx_mem_pool, &bmp_tx_buffer_class); btb->end = btb->buf; btb->n = (node) {}; @@ -377,6 +386,7 @@ bmp_fire_tx(void *p_) p->sk->tbuf = NULL; p->tx_pending = NULL; + --p->tx_pending_count; rfree(&btb->r); if (cnt++ > 32) @@ -397,6 +407,7 @@ bmp_tx(struct birdsock *sk) rfree(&p->tx_pending->r); p->tx_pending = NULL; + --p->tx_pending_count; sk->tbuf = NULL; bmp_fire_tx(sk->data); @@ -1200,6 +1211,26 @@ bmp_sock_err(sock *sk, int err) proto_notify_state(&p->p, PS_START); } +static void +bmp_tx_overflow(void *_p) +{ + struct bmp_proto *p = _p; + if (!bmp_check_tx_overflow(p->tx_pending_count)) + return; + + p->sock_err = 0; + + log(L_ERR "%s: Connection stalled", p->p.name); + + if (p->started) + bmp_down(p); + + bmp_close_socket(p); + tm_start(p->connect_retry_timer, CONNECT_RETRY_TIME); + + proto_notify_state(&p->p, PS_START); +} + /* BMP connect timeout event - switch from Idle/Connect state to Connect state */ static void bmp_connection_retry(timer *t) @@ -1216,6 +1247,24 @@ bmp_connection_retry(timer *t) static void bmp_close_socket(struct bmp_proto *p) { + if (p->tx_pending) + { + --p->tx_pending_count; + rfree(&p->tx_pending->r); + p->tx_pending = NULL; + } + + struct bmp_tx_buffer *btb; + WALK_LIST_FIRST2(btb, n, p->tx_queue) + { + --p->tx_pending_count; + + rem_node(&btb->n); + rfree(&btb->r); + } + + ASSERT_DIE(p->tx_pending_count == 0); + rfree(p->sk); p->sk = NULL; } @@ -1274,6 +1323,8 @@ bmp_start(struct proto *P) p->tx_mem_pool = rp_new(P->pool, "BMP Tx"); p->tx_ev = ev_new_init(p->p.pool, bmp_fire_tx, p); p->tx_pending = NULL; + p->tx_pending_count = 0; + p->tx_overflow_event = ev_new_init(p->p.pool, bmp_tx_overflow, p); p->connect_retry_timer = tm_new_init(p->p.pool, bmp_connection_retry, p, 0, 0); p->sk = NULL; diff --git a/proto/bmp/bmp.h b/proto/bmp/bmp.h index 9cb3d465..2406d905 100644 --- a/proto/bmp/bmp.h +++ b/proto/bmp/bmp.h @@ -68,6 +68,8 @@ struct bmp_proto { pool *tx_mem_pool; // Memory pool used for packet allocations designated to BMP collector list tx_queue; // Stores queued packets going to be sent struct bmp_tx_buffer *tx_pending;// This buffer waits for socket to flush + uint tx_pending_count; // How many buffers waiting for flush + event *tx_overflow_event; // Too many buffers waiting for flush timer *connect_retry_timer; // Timer for retrying connection to the BMP collector bool started; // Flag that stores running status of BMP instance int sock_err; // Last socket error code