0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2024-12-22 17:51:53 +00:00

Sockets: Unified API for main and other loops

sk_open() now requires an explicit IO loop in which to open the socket.
In addition, dedicated functions for pausing and resuming socket RX are
added to allow for BGP corking.

Last but not least, socket reloop is now synchronous. This resolves the
weird cases where the target loop stopped before actually picking up the
relooped socket. The caller must now ensure that both loops are locked
while relooping; this way every socket always has its respective
loop.
This commit is contained in:
Maria Matejka 2023-04-02 19:15:22 +02:00
parent 571c4f69bf
commit 836e857b30
20 changed files with 243 additions and 227 deletions

View File

@ -16,10 +16,6 @@
extern struct birdloop main_birdloop; extern struct birdloop main_birdloop;
void sk_start(sock *s);
void sk_stop(sock *s);
void sk_reloop(sock *s, struct birdloop *loop);
/* Start a new birdloop owned by given pool and domain */ /* Start a new birdloop owned by given pool and domain */
struct birdloop *birdloop_new(pool *p, uint order, const char *name); struct birdloop *birdloop_new(pool *p, uint order, const char *name);
@ -58,6 +54,10 @@ struct birdloop_flag_handler {
void birdloop_flag(struct birdloop *loop, u32 flag); void birdloop_flag(struct birdloop *loop, u32 flag);
void birdloop_flag_set_handler(struct birdloop *, struct birdloop_flag_handler *); void birdloop_flag_set_handler(struct birdloop *, struct birdloop_flag_handler *);
/* Setup sockets */
void birdloop_add_socket(struct birdloop *, struct birdsock *);
void birdloop_remove_socket(struct birdloop *, struct birdsock *);
void birdloop_init(void); void birdloop_init(void);
/* Yield for a little while. Use only in special cases. */ /* Yield for a little while. Use only in special cases. */

View File

@ -80,17 +80,22 @@ typedef struct birdsock {
const char *password; /* Password for MD5 authentication */ const char *password; /* Password for MD5 authentication */
const char *err; /* Error message */ const char *err; /* Error message */
struct ssh_sock *ssh; /* Used in SK_SSH */ struct ssh_sock *ssh; /* Used in SK_SSH */
struct event reloop; /* Reloop event */ struct birdloop *loop; /* BIRDLoop owning this socket */
} sock; } sock;
sock *sock_new(pool *); /* Allocate new socket */ sock *sock_new(pool *); /* Allocate new socket */
#define sk_new(X) sock_new(X) /* Wrapper to avoid name collision with OpenSSL */ #define sk_new(X) sock_new(X) /* Wrapper to avoid name collision with OpenSSL */
int sk_open(sock *); /* Open socket */ int sk_open(sock *, struct birdloop *); /* Open socket */
void sk_reloop(sock *, struct birdloop *); /* Move socket to another loop. Both loops must be locked. */
int sk_rx_ready(sock *s); int sk_rx_ready(sock *s);
_Bool sk_tx_pending(sock *s);
int sk_send(sock *, uint len); /* Send data, <0=err, >0=ok, 0=sleep */ int sk_send(sock *, uint len); /* Send data, <0=err, >0=ok, 0=sleep */
int sk_send_to(sock *, uint len, ip_addr to, uint port); /* sk_send to given destination */ int sk_send_to(sock *, uint len, ip_addr to, uint port); /* sk_send to given destination */
void sk_reallocate(sock *); /* Free and allocate tbuf & rbuf */ void sk_reallocate(sock *); /* Free and allocate tbuf & rbuf */
void sk_pause_rx(struct birdloop *loop, sock *s);
void sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint));
void sk_set_rbsize(sock *s, uint val); /* Resize RX buffer */ void sk_set_rbsize(sock *s, uint val); /* Resize RX buffer */
void sk_set_tbsize(sock *s, uint val); /* Resize TX buffer, keeping content */ void sk_set_tbsize(sock *s, uint val); /* Resize TX buffer, keeping content */
void sk_set_tbuf(sock *s, void *tbuf); /* Switch TX buffer, NULL-> return to internal */ void sk_set_tbuf(sock *s, void *tbuf); /* Switch TX buffer, NULL-> return to internal */
@ -114,6 +119,7 @@ int sk_set_icmp6_filter(sock *s, int p1, int p2);
void sk_log_error(sock *s, const char *p); void sk_log_error(sock *s, const char *p);
byte * sk_rx_buffer(sock *s, int *len); /* Temporary */ byte * sk_rx_buffer(sock *s, int *len); /* Temporary */
sock *sk_next(sock *s);
extern int sk_priority_control; /* Suggested priority for control traffic, should be sysdep define */ extern int sk_priority_control; /* Suggested priority for control traffic, should be sysdep define */
@ -127,11 +133,9 @@ extern int sk_priority_control; /* Suggested priority for control traffic, shou
#define SKF_HIGH_PORT 0x20 /* Choose port from high range if possible */ #define SKF_HIGH_PORT 0x20 /* Choose port from high range if possible */
#define SKF_FREEBIND 0x40 /* Allow socket to bind to a nonlocal address */ #define SKF_FREEBIND 0x40 /* Allow socket to bind to a nonlocal address */
#define SKF_THREAD 0x100 /* Socked used in thread, Do not add to main loop */
#define SKF_TRUNCATED 0x200 /* Received packet was truncated, set by IO layer */ #define SKF_TRUNCATED 0x200 /* Received packet was truncated, set by IO layer */
#define SKF_HDRINCL 0x400 /* Used internally */ #define SKF_HDRINCL 0x400 /* Used internally */
#define SKF_PKTINFO 0x800 /* Used internally */ #define SKF_PKTINFO 0x800 /* Used internally */
#define SKF_PASSIVE_THREAD 0x1000 /* Child sockets used in thread, do not add to main loop */
/* /*
* Socket types SA SP DA DP IF TTL SendTo (?=may, -=must not, *=must) * Socket types SA SP DA DP IF TTL SendTo (?=may, -=must not, *=must)

View File

@ -1617,7 +1617,7 @@ babel_open_socket(struct babel_iface *ifa)
sk->ttl = 1; sk->ttl = 1;
sk->flags = SKF_LADDR_RX; sk->flags = SKF_LADDR_RX;
if (sk_open(sk) < 0) if (sk_open(sk, p->p.loop) < 0)
goto err; goto err;
if (sk_setup_multicast(sk) < 0) if (sk_setup_multicast(sk) < 0)

View File

@ -603,16 +603,10 @@ bfd_free_iface(struct bfd_iface *ifa)
return; return;
if (ifa->sk) if (ifa->sk)
{
sk_stop(ifa->sk);
rfree(ifa->sk); rfree(ifa->sk);
}
if (ifa->rx) if (ifa->rx)
{
sk_stop(ifa->rx);
rfree(ifa->rx); rfree(ifa->rx);
}
rem_node(&ifa->n); rem_node(&ifa->n);
mb_free(ifa); mb_free(ifa);
@ -1100,11 +1094,6 @@ bfd_shutdown(struct proto *P)
bfd_drop_requests(p); bfd_drop_requests(p);
if (p->rx4_1) sk_stop(p->rx4_1);
if (p->rx4_m) sk_stop(p->rx4_m);
if (p->rx6_1) sk_stop(p->rx6_1);
if (p->rx6_m) sk_stop(p->rx6_m);
return PS_DOWN; return PS_DOWN;
} }

View File

@ -430,12 +430,11 @@ bfd_open_rx_sk(struct bfd_proto *p, int multihop, int af)
/* TODO: configurable ToS and priority */ /* TODO: configurable ToS and priority */
sk->tos = IP_PREC_INTERNET_CONTROL; sk->tos = IP_PREC_INTERNET_CONTROL;
sk->priority = sk_priority_control; sk->priority = sk_priority_control;
sk->flags = SKF_THREAD | SKF_LADDR_RX | (!multihop ? SKF_TTL_RX : 0); sk->flags = SKF_LADDR_RX | (!multihop ? SKF_TTL_RX : 0);
if (sk_open(sk) < 0) if (sk_open(sk, p->p.loop) < 0)
goto err; goto err;
sk_start(sk);
return sk; return sk;
err: err:
@ -462,12 +461,11 @@ bfd_open_rx_sk_bound(struct bfd_proto *p, ip_addr local, struct iface *ifa)
/* TODO: configurable ToS and priority */ /* TODO: configurable ToS and priority */
sk->tos = IP_PREC_INTERNET_CONTROL; sk->tos = IP_PREC_INTERNET_CONTROL;
sk->priority = sk_priority_control; sk->priority = sk_priority_control;
sk->flags = SKF_THREAD | SKF_BIND | (ifa ? SKF_TTL_RX : 0); sk->flags = SKF_BIND | (ifa ? SKF_TTL_RX : 0);
if (sk_open(sk) < 0) if (sk_open(sk, p->p.loop) < 0)
goto err; goto err;
sk_start(sk);
return sk; return sk;
err: err:
@ -494,12 +492,11 @@ bfd_open_tx_sk(struct bfd_proto *p, ip_addr local, struct iface *ifa)
sk->tos = IP_PREC_INTERNET_CONTROL; sk->tos = IP_PREC_INTERNET_CONTROL;
sk->priority = sk_priority_control; sk->priority = sk_priority_control;
sk->ttl = ifa ? 255 : -1; sk->ttl = ifa ? 255 : -1;
sk->flags = SKF_THREAD | SKF_BIND | SKF_HIGH_PORT; sk->flags = SKF_BIND | SKF_HIGH_PORT;
if (sk_open(sk) < 0) if (sk_open(sk, p->p.loop) < 0)
goto err; goto err;
sk_start(sk);
return sk; return sk;
err: err:

View File

@ -263,7 +263,7 @@ bgp_listen_create(void *_ UNUSED)
sk->rx_hook = bgp_incoming_connection; sk->rx_hook = bgp_incoming_connection;
sk->err_hook = bgp_listen_sock_err; sk->err_hook = bgp_listen_sock_err;
if (sk_open(sk) < 0) if (sk_open(sk, &main_birdloop) < 0)
{ {
sk_log_error(sk, p->p.name); sk_log_error(sk, p->p.name);
log(L_ERR "%s: Cannot open listening socket", p->p.name); log(L_ERR "%s: Cannot open listening socket", p->p.name);
@ -1203,7 +1203,7 @@ bgp_connect(struct bgp_proto *p) /* Enter Connect state and start establishing c
bgp_setup_sk(conn, s); bgp_setup_sk(conn, s);
bgp_conn_set_state(conn, BS_CONNECT); bgp_conn_set_state(conn, BS_CONNECT);
if (sk_open(s) < 0) if (sk_open(s, p->p.loop) < 0)
goto err; goto err;
/* Set minimal receive TTL if needed */ /* Set minimal receive TTL if needed */

View File

@ -3015,7 +3015,7 @@ bgp_kick_tx(void *vconn)
; ;
if (!max && !ev_active(conn->tx_ev)) if (!max && !ev_active(conn->tx_ev))
ev_schedule(conn->tx_ev); proto_send_event(&conn->bgp->p, conn->tx_ev);
} }
void void
@ -3023,13 +3023,14 @@ bgp_tx(sock *sk)
{ {
struct bgp_conn *conn = sk->data; struct bgp_conn *conn = sk->data;
ASSERT_DIE(birdloop_inside(conn->bgp->p.loop));
DBG("BGP: TX hook\n"); DBG("BGP: TX hook\n");
uint max = 1024; uint max = 1024;
while (--max && (bgp_fire_tx(conn) > 0)) while (--max && (bgp_fire_tx(conn) > 0))
; ;
if (!max && !ev_active(conn->tx_ev)) if (!max && !ev_active(conn->tx_ev))
ev_schedule(conn->tx_ev); proto_send_event(&conn->bgp->p, conn->tx_ev);
} }

View File

@ -136,7 +136,7 @@ ospf_sk_open(struct ospf_iface *ifa)
sk->flags = SKF_LADDR_RX | (ifa->check_ttl ? SKF_TTL_RX : 0); sk->flags = SKF_LADDR_RX | (ifa->check_ttl ? SKF_TTL_RX : 0);
sk->ttl = ifa->cf->ttl_security ? 255 : 1; sk->ttl = ifa->cf->ttl_security ? 255 : 1;
if (sk_open(sk) < 0) if (sk_open(sk, p->p.loop) < 0)
goto err; goto err;
/* 12 is an offset of the checksum in an OSPFv3 packet */ /* 12 is an offset of the checksum in an OSPFv3 packet */
@ -220,7 +220,7 @@ ospf_open_vlink_sk(struct ospf_proto *p)
sk->data = (void *) p; sk->data = (void *) p;
sk->flags = 0; sk->flags = 0;
if (sk_open(sk) < 0) if (sk_open(sk, p->p.loop) < 0)
goto err; goto err;
/* 12 is an offset of the checksum in an OSPFv3 packet */ /* 12 is an offset of the checksum in an OSPFv3 packet */

View File

@ -493,7 +493,7 @@ radv_sk_open(struct radv_iface *ifa)
sk->data = ifa; sk->data = ifa;
sk->flags = SKF_LADDR_RX; sk->flags = SKF_LADDR_RX;
if (sk_open(sk) < 0) if (sk_open(sk, ifa->ra->p.loop) < 0)
goto err; goto err;
/* We want listen just to ICMPv6 messages of type RS and RA */ /* We want listen just to ICMPv6 messages of type RS and RA */

View File

@ -1012,7 +1012,7 @@ rip_open_socket(struct rip_iface *ifa)
/* sk->rbsize and sk->tbsize are handled in rip_iface_update_buffers() */ /* sk->rbsize and sk->tbsize are handled in rip_iface_update_buffers() */
if (sk_open(sk) < 0) if (sk_open(sk, p->p.loop) < 0)
goto err; goto err;
if (ifa->cf->mode == RIP_IM_MULTICAST) if (ifa->cf->mode == RIP_IM_MULTICAST)

View File

@ -35,11 +35,9 @@ rpki_tr_ssh_open(struct rpki_tr_sock *tr)
sk->ssh->subsystem = "rpki-rtr"; sk->ssh->subsystem = "rpki-rtr";
sk->ssh->state = SK_SSH_CONNECT; sk->ssh->state = SK_SSH_CONNECT;
if (sk_open(sk) != 0) if (sk_open(sk, cache->p->p.loop) != 0)
return RPKI_TR_ERROR; return RPKI_TR_ERROR;
sk_start(sk);
return RPKI_TR_SUCCESS; return RPKI_TR_SUCCESS;
} }

View File

@ -28,11 +28,9 @@ rpki_tr_tcp_open(struct rpki_tr_sock *tr)
sk->type = SK_TCP_ACTIVE; sk->type = SK_TCP_ACTIVE;
if (sk_open(sk) != 0) if (sk_open(sk, tr->cache->p->p.loop) != 0)
return RPKI_TR_ERROR; return RPKI_TR_ERROR;
sk_start(sk);
return RPKI_TR_SUCCESS; return RPKI_TR_SUCCESS;
} }

View File

@ -85,7 +85,6 @@ rpki_tr_open(struct rpki_tr_sock *tr)
sk->rbsize = RPKI_RX_BUFFER_SIZE; sk->rbsize = RPKI_RX_BUFFER_SIZE;
sk->tbsize = RPKI_TX_BUFFER_SIZE; sk->tbsize = RPKI_TX_BUFFER_SIZE;
sk->tos = IP_PREC_INTERNET_CONTROL; sk->tos = IP_PREC_INTERNET_CONTROL;
sk->flags |= SKF_THREAD;
sk->vrf = cache->p->p.vrf; sk->vrf = cache->p->p.vrf;
if (ipa_zero(sk->daddr) && sk->host) if (ipa_zero(sk->daddr) && sk->host)
@ -121,7 +120,6 @@ rpki_tr_close(struct rpki_tr_sock *tr)
if (tr->sk) if (tr->sk)
{ {
sk_stop(tr->sk);
rfree(tr->sk); rfree(tr->sk);
tr->sk = NULL; tr->sk = NULL;
} }

View File

@ -1088,7 +1088,7 @@ krt_sock_open(pool *pool, void *data, int table_id UNUSED)
sk->fd = fd; sk->fd = fd;
sk->data = data; sk->data = data;
if (sk_open(sk) < 0) if (sk_open(sk, &main_birdloop) < 0)
bug("krt-sock: sk_open failed"); bug("krt-sock: sk_open failed");
return sk; return sk;

View File

@ -2043,7 +2043,7 @@ nl_open_async(void)
sk->rx_hook = nl_async_hook; sk->rx_hook = nl_async_hook;
sk->err_hook = nl_async_err_hook; sk->err_hook = nl_async_err_hook;
sk->fd = fd; sk->fd = fd;
if (sk_open(sk) < 0) if (sk_open(sk, &main_birdloop) < 0)
bug("Netlink: sk_open failed"); bug("Netlink: sk_open failed");
} }

View File

@ -310,59 +310,95 @@ sockets_init(struct birdloop *loop)
loop->sock_num = 0; loop->sock_num = 0;
} }
static void void
sockets_add(struct birdloop *loop, sock *s) socket_changed(sock *s)
{ {
LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num); struct birdloop *loop = s->loop;
add_tail(&loop->sock_list, &s->n); ASSERT_DIE(birdloop_inside(loop));
loop->sock_num++;
s->index = -1;
if (loop->thread)
atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
loop->sock_changed++;
birdloop_ping(loop); birdloop_ping(loop);
} }
void void
sk_start(sock *s) birdloop_add_socket(struct birdloop *loop, sock *s)
{ {
ASSERT_DIE(birdloop_current != &main_birdloop); ASSERT_DIE(birdloop_inside(loop));
sockets_add(birdloop_current, s); ASSERT_DIE(!s->loop);
LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num);
add_tail(&loop->sock_list, &s->n);
loop->sock_num++;
s->loop = loop;
s->index = -1;
socket_changed(s);
} }
static void extern sock *stored_sock; /* mainloop hack */
sockets_remove(struct birdloop *loop, sock *s)
void
birdloop_remove_socket(struct birdloop *loop, sock *s)
{ {
if (!enlisted(&s->n)) ASSERT_DIE(!enlisted(&s->n) == !s->loop);
if (!s->loop)
return; return;
ASSERT_DIE(birdloop_inside(loop));
ASSERT_DIE(s->loop == loop);
/* Decouple the socket from the loop at all. */ /* Decouple the socket from the loop at all. */
LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num); LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num);
if (loop->sock_active == s)
loop->sock_active = sk_next(s);
if ((loop == &main_birdloop) && (s == stored_sock))
stored_sock = sk_next(s);
rem_node(&s->n); rem_node(&s->n);
loop->sock_num--; loop->sock_num--;
if (loop->thread)
atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
socket_changed(s);
s->loop = NULL;
s->index = -1; s->index = -1;
/* Close the filedescriptor. If it ever gets into the poll(), it just returns
* POLLNVAL for this fd which then is ignored because nobody checks for
* that result. Or some other routine opens another fd, getting this number,
* yet also in this case poll() at worst spuriously returns and nobody checks
* for the result in this fd. No further precaution is needed. */
close(s->fd);
} }
void void
sk_stop(sock *s) sk_reloop(sock *s, struct birdloop *loop)
{ {
sockets_remove(birdloop_current, s); ASSERT_DIE(birdloop_inside(loop));
ASSERT_DIE(birdloop_inside(s->loop));
if (loop == s->loop)
return;
birdloop_remove_socket(s->loop, s);
birdloop_add_socket(loop, s);
}
/**
 * sk_pause_rx - stop receiving on a socket
 * @loop: loop the caller is running in; it must be entered by the caller
 * @s: socket
 *
 * Clears the RX hook of @s. As sk_want_events() requests POLLIN only for
 * sockets with an RX hook set, the socket is no longer polled for input
 * once the poll set is rebuilt. The previous hook is not remembered here;
 * the caller passes it again to sk_resume_rx() when resuming.
 *
 * NOTE(review): @loop is presumably the loop owning @s -- socket_changed()
 * checks and notifies @s->loop, not @loop; confirm against the callers.
 */
void
sk_pause_rx(struct birdloop *loop, sock *s)
{
/* The caller has to be inside the given loop. */
ASSERT_DIE(birdloop_inside(loop));
/* No RX hook means no POLLIN requested for this socket. */
s->rx_hook = NULL;
/* Mark the socket set of the owning loop as changed and ping that loop. */
socket_changed(s);
}
void
sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint))
{
ASSERT_DIE(birdloop_inside(loop));
ASSERT_DIE(hook);
s->rx_hook = hook;
socket_changed(s);
} }
static inline uint sk_want_events(sock *s) static inline uint sk_want_events(sock *s)
{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); } { return (s->rx_hook ? POLLIN : 0) | (sk_tx_pending(s) ? POLLOUT : 0); }
void void
sockets_prepare(struct birdloop *loop, struct pfd *pfd) sockets_prepare(struct birdloop *loop, struct pfd *pfd)
@ -392,40 +428,60 @@ sockets_prepare(struct birdloop *loop, struct pfd *pfd)
int sk_read(sock *s, int revents); int sk_read(sock *s, int revents);
int sk_write(sock *s); int sk_write(sock *s);
void sk_err(sock *s, int revents);
static void static int
sockets_fire(struct birdloop *loop) sockets_fire(struct birdloop *loop)
{ {
if (EMPTY_LIST(loop->sock_list))
return 0;
int sch = 0;
times_update(); times_update();
struct pollfd *pfd = loop->thread->pfd->pfd.data; struct pollfd *pfd = loop->thread->pfd->pfd.data;
sock *s; node *n, *nxt; loop->sock_active = SKIP_BACK(sock, n, HEAD(loop->sock_list));
WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n)
while (loop->sock_active)
{ {
if (s->index < 0) sock *s = loop->sock_active;
continue;
int rev = pfd[s->index].revents;
if (!rev)
continue;
if (rev & POLLNVAL)
bug("poll: invalid fd %d", s->fd);
int rev;
if ((s->index >= 0) && (rev = pfd[s->index].revents) && !(rev & POLLNVAL))
{
int e = 1; int e = 1;
if (rev & POLLIN)
while (e && s->rx_hook)
e = sk_read(s, rev);
if (rev & POLLOUT) if (rev & POLLOUT)
{ {
atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release); while ((s == loop->sock_active) && (e = sk_write(s)))
while (e = sk_write(s))
; ;
if (s != loop->sock_active)
continue;
if (!sk_tx_pending(s))
sch++;
} }
if (rev & POLLIN)
while (e && (s == loop->sock_active) && s->rx_hook)
e = sk_read(s, rev);
if (s != loop->sock_active)
continue;
if (!(rev & (POLLOUT | POLLIN)) && (rev & POLLERR))
sk_err(s, rev);
if (s != loop->sock_active)
continue;
} }
loop->sock_active = sk_next(s);
}
return sch;
} }
/* /*
@ -547,7 +603,8 @@ bird_thread_main(void *arg)
thr->meta->thread = thr; thr->meta->thread = thr;
birdloop_enter(thr->meta); birdloop_enter(thr->meta);
u32 refresh_sockets = 1; thr->sock_changed = 1;
struct pfd pfd; struct pfd pfd;
BUFFER_INIT(pfd.pfd, thr->pool, 16); BUFFER_INIT(pfd.pfd, thr->pool, 16);
BUFFER_INIT(pfd.loop, thr->pool, 16); BUFFER_INIT(pfd.loop, thr->pool, 16);
@ -563,7 +620,7 @@ bird_thread_main(void *arg)
{ {
birdloop_enter(loop); birdloop_enter(loop);
if (!EMPTY_LIST(loop->sock_list)) if (!EMPTY_LIST(loop->sock_list))
refresh_sockets = 1; thr->sock_changed = 1;
birdloop_leave(loop); birdloop_leave(loop);
} }
@ -590,10 +647,10 @@ bird_thread_main(void *arg)
ev_run_list(&thr->priority_events); ev_run_list(&thr->priority_events);
/* Do we have to refresh sockets? */ /* Do we have to refresh sockets? */
refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel); if (thr->sock_changed)
if (refresh_sockets)
{ {
thr->sock_changed = 0;
BUFFER_FLUSH(pfd.pfd); BUFFER_FLUSH(pfd.pfd);
BUFFER_FLUSH(pfd.loop); BUFFER_FLUSH(pfd.loop);
@ -608,7 +665,6 @@ bird_thread_main(void *arg)
} }
ASSERT_DIE(pfd.loop.used == pfd.pfd.used); ASSERT_DIE(pfd.loop.used == pfd.pfd.used);
refresh_sockets = 0;
} }
/* Nothing to do in at least 5 seconds, flush local hot page cache */ /* Nothing to do in at least 5 seconds, flush local hot page cache */
else if (timeout > 5000) else if (timeout > 5000)
@ -957,6 +1013,15 @@ birdloop_init(void)
static void static void
birdloop_stop_internal(struct birdloop *loop) birdloop_stop_internal(struct birdloop *loop)
{ {
LOOP_TRACE(loop, "Stopping");
/* Block incoming pings */
u32 ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire);
while (!atomic_compare_exchange_strong_explicit(
&loop->thread_transition, &ltt, LTT_PING,
memory_order_acq_rel, memory_order_acquire))
;
/* Flush remaining events */ /* Flush remaining events */
ASSERT_DIE(!ev_run_list(&loop->event_list)); ASSERT_DIE(!ev_run_list(&loop->event_list));
@ -965,17 +1030,27 @@ birdloop_stop_internal(struct birdloop *loop)
while (t = timers_first(&loop->time)) while (t = timers_first(&loop->time))
tm_stop(t); tm_stop(t);
/* No sockets allowed */ /* Drop sockets */
ASSERT_DIE(EMPTY_LIST(loop->sock_list)); sock *s;
WALK_LIST_FIRST2(s, n, loop->sock_list)
birdloop_remove_socket(loop, s);
/* Unschedule from Meta */ /* Unschedule from Meta */
ev_postpone(&loop->event); ev_postpone(&loop->event);
tm_stop(&loop->timer); tm_stop(&loop->timer);
/* Declare loop stopped */ /* Remove from thread loop list */
rem_node(&loop->n); rem_node(&loop->n);
loop->thread = NULL;
/* Leave the loop context without causing any other fuss */
ASSERT_DIE(!ev_active(&loop->event));
loop->ping_pending = 0;
birdloop_leave(loop); birdloop_leave(loop);
/* Request local socket reload */
this_thread->sock_changed++;
/* Tail-call the stopped hook */ /* Tail-call the stopped hook */
loop->stopped(loop->stop_data); loop->stopped(loop->stop_data);
} }
@ -989,12 +1064,14 @@ birdloop_run(void *_loop)
struct birdloop *loop = _loop; struct birdloop *loop = _loop;
birdloop_enter(loop); birdloop_enter(loop);
LOOP_TRACE(loop, "Regular run");
if (loop->stopped) if (loop->stopped)
/* Birdloop left inside the helper function */ /* Birdloop left inside the helper function */
return birdloop_stop_internal(loop); return birdloop_stop_internal(loop);
/* Process sockets */ /* Process sockets */
sockets_fire(loop); this_thread->sock_changed += sockets_fire(loop);
/* Run timers */ /* Run timers */
timers_fire(&loop->time, 0); timers_fire(&loop->time, 0);
@ -1016,6 +1093,10 @@ birdloop_run(void *_loop)
else else
tm_stop(&loop->timer); tm_stop(&loop->timer);
/* Collect socket change requests */
this_thread->sock_changed += loop->sock_changed;
loop->sock_changed = 0;
birdloop_leave(loop); birdloop_leave(loop);
} }
@ -1074,6 +1155,8 @@ birdloop_new(pool *pp, uint order, const char *name)
static void static void
birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data) birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data)
{ {
LOOP_TRACE(loop, "Stop requested");
loop->stopped = stopped; loop->stopped = stopped;
loop->stop_data = data; loop->stop_data = data;
@ -1100,8 +1183,7 @@ birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *dat
void void
birdloop_free(struct birdloop *loop) birdloop_free(struct birdloop *loop)
{ {
ASSERT_DIE(loop->links == 0); ASSERT_DIE(loop->thread == NULL);
ASSERT_DIE(birdloop_in_this_thread(loop));
domain_free(loop->time.domain); domain_free(loop->time.domain);
rfree(loop->pool); rfree(loop->pool);
@ -1170,20 +1252,6 @@ birdloop_unmask_wakeups(struct birdloop *loop)
birdloop_wakeup_masked_count = 0; birdloop_wakeup_masked_count = 0;
} }
void
birdloop_link(struct birdloop *loop)
{
ASSERT_DIE(birdloop_inside(loop));
loop->links++;
}
void
birdloop_unlink(struct birdloop *loop)
{
ASSERT_DIE(birdloop_inside(loop));
loop->links--;
}
void void
birdloop_yield(void) birdloop_yield(void)
{ {

View File

@ -22,6 +22,7 @@ struct pfd {
}; };
void sockets_prepare(struct birdloop *, struct pfd *); void sockets_prepare(struct birdloop *, struct pfd *);
void socket_changed(struct birdsock *);
void pipe_new(struct pipe *); void pipe_new(struct pipe *);
void pipe_pollin(struct pipe *, struct pfd *); void pipe_pollin(struct pipe *, struct pfd *);
@ -40,12 +41,12 @@ struct birdloop
struct timeloop time; struct timeloop time;
event_list event_list; event_list event_list;
list sock_list; list sock_list;
struct birdsock *sock_active;
int sock_num; int sock_num;
uint sock_changed;
uint ping_pending; uint ping_pending;
uint links;
_Atomic u32 thread_transition; _Atomic u32 thread_transition;
#define LTT_PING 1 #define LTT_PING 1
#define LTT_MOVE 2 #define LTT_MOVE 2
@ -66,8 +67,6 @@ struct bird_thread
{ {
node n; node n;
_Atomic u32 poll_changed;
struct pipe wakeup; struct pipe wakeup;
event_list priority_events; event_list priority_events;
@ -83,6 +82,8 @@ struct bird_thread
struct pfd *pfd; struct pfd *pfd;
event cleanup_event; event cleanup_event;
int sock_changed;
}; };
#endif #endif

View File

@ -722,10 +722,7 @@ sk_log_error(sock *s, const char *p)
* Actual struct birdsock code * Actual struct birdsock code
*/ */
static struct birdsock *current_sock; sock *
static struct birdsock *stored_sock;
static inline sock *
sk_next(sock *s) sk_next(sock *s)
{ {
if (!s->n.next->next) if (!s->n.next->next)
@ -787,6 +784,7 @@ sk_ssh_free(sock *s)
} }
#endif #endif
static void static void
sk_free(resource *r) sk_free(resource *r)
{ {
@ -799,18 +797,10 @@ sk_free(resource *r)
sk_ssh_free(s); sk_ssh_free(s);
#endif #endif
if ((s->fd < 0) || (s->flags & SKF_THREAD)) if (s->loop)
return; birdloop_remove_socket(s->loop, s);
if (s == current_sock) if (s->fd >= 0 && s->type != SK_SSH && s->type != SK_SSH_ACTIVE)
current_sock = sk_next(s);
if (s == stored_sock)
stored_sock = sk_next(s);
if (enlisted(&s->n))
rem_node(&s->n);
if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE)
close(s->fd); close(s->fd);
s->fd = -1; s->fd = -1;
@ -1022,12 +1012,6 @@ sk_setup(sock *s)
return 0; return 0;
} }
static void
sk_insert(sock *s)
{
add_tail(&main_birdloop.sock_list, &s->n);
}
static void static void
sk_tcp_connected(sock *s) sk_tcp_connected(sock *s)
{ {
@ -1101,10 +1085,7 @@ sk_passive_connected(sock *s, int type)
return 1; return 1;
} }
if (s->flags & SKF_PASSIVE_THREAD) birdloop_add_socket(s->loop, t);
t->flags |= SKF_THREAD;
else
sk_insert(t);
sk_alloc_bufs(t); sk_alloc_bufs(t);
s->rx_hook(t, 0); s->rx_hook(t, 0);
@ -1319,6 +1300,7 @@ sk_open_ssh(sock *s)
/** /**
* sk_open - open a socket * sk_open - open a socket
* @loop: loop
* @s: socket * @s: socket
* *
* This function takes a socket resource created by sk_new() and * This function takes a socket resource created by sk_new() and
@ -1328,7 +1310,7 @@ sk_open_ssh(sock *s)
* Result: 0 for success, -1 for an error. * Result: 0 for success, -1 for an error.
*/ */
int int
sk_open(sock *s) sk_open(sock *s, struct birdloop *loop)
{ {
int af = AF_UNSPEC; int af = AF_UNSPEC;
int fd = -1; int fd = -1;
@ -1481,9 +1463,7 @@ sk_open(sock *s)
sk_alloc_bufs(s); sk_alloc_bufs(s);
} }
if (!(s->flags & SKF_THREAD)) birdloop_add_socket(loop, s);
sk_insert(s);
return 0; return 0;
err: err:
@ -1493,7 +1473,7 @@ err:
} }
int int
sk_open_unix(sock *s, char *name) sk_open_unix(sock *s, struct birdloop *loop, char *name)
{ {
struct sockaddr_un sa; struct sockaddr_un sa;
int fd; int fd;
@ -1520,40 +1500,10 @@ sk_open_unix(sock *s, char *name)
return -1; return -1;
s->fd = fd; s->fd = fd;
sk_insert(s); birdloop_add_socket(loop, s);
return 0; return 0;
} }
static void
sk_reloop_hook(void *_vs)
{
sock *s = _vs;
if (birdloop_inside(&main_birdloop))
{
s->flags &= ~SKF_THREAD;
sk_insert(s);
}
else
{
s->flags |= SKF_THREAD;
sk_start(s);
}
}
void
sk_reloop(sock *s, struct birdloop *loop)
{
if (enlisted(&s->n))
rem_node(&s->n);
s->reloop = (event) {
.hook = sk_reloop_hook,
.data = s,
};
ev_send_loop(loop, &s->reloop);
}
#define CMSG_RX_SPACE MAX(CMSG4_SPACE_PKTINFO+CMSG4_SPACE_TTL, \ #define CMSG_RX_SPACE MAX(CMSG4_SPACE_PKTINFO+CMSG4_SPACE_TTL, \
CMSG6_SPACE_PKTINFO+CMSG6_SPACE_TTL) CMSG6_SPACE_PKTINFO+CMSG6_SPACE_TTL)
@ -1676,6 +1626,13 @@ sk_recvmsg(sock *s)
static inline void reset_tx_buffer(sock *s) { s->ttx = s->tpos = s->tbuf; } static inline void reset_tx_buffer(sock *s) { s->ttx = s->tpos = s->tbuf; }
/**
 * sk_tx_pending - check whether a socket has unsent TX data
 * @s: socket
 *
 * The not-yet-written part of the TX buffer lies between @s->ttx and
 * @s->tpos; both pointers are equal when there is nothing left to write.
 *
 * Result: true if some TX data is still waiting to be written, false otherwise.
 */
_Bool
sk_tx_pending(sock *s)
{
  return !(s->tpos == s->ttx);
}
static int static int
sk_maybe_write(sock *s) sk_maybe_write(sock *s)
{ {
@ -1686,7 +1643,7 @@ sk_maybe_write(sock *s)
case SK_TCP: case SK_TCP:
case SK_MAGIC: case SK_MAGIC:
case SK_UNIX: case SK_UNIX:
while (s->ttx != s->tpos) while (sk_tx_pending(s))
{ {
e = write(s->fd, s->ttx, s->tpos - s->ttx); e = write(s->fd, s->ttx, s->tpos - s->ttx);
@ -1708,7 +1665,7 @@ sk_maybe_write(sock *s)
#ifdef HAVE_LIBSSH #ifdef HAVE_LIBSSH
case SK_SSH: case SK_SSH:
while (s->ttx != s->tpos) while (sk_tx_pending(s))
{ {
e = ssh_channel_write(s->ssh->channel, s->ttx, s->tpos - s->ttx); e = ssh_channel_write(s->ssh->channel, s->ttx, s->tpos - s->ttx);
@ -1791,7 +1748,12 @@ sk_send(sock *s, unsigned len)
{ {
s->ttx = s->tbuf; s->ttx = s->tbuf;
s->tpos = s->tbuf + len; s->tpos = s->tbuf + len;
return sk_maybe_write(s);
int e = sk_maybe_write(s);
if (e == 0) /* Trigger thread poll reload to poll this socket's write. */
socket_changed(s);
return e;
} }
/** /**
@ -1838,7 +1800,7 @@ call_rx_hook(sock *s, int size)
if (s->rx_hook(s, size)) if (s->rx_hook(s, size))
{ {
/* We need to be careful since the socket could have been deleted by the hook */ /* We need to be careful since the socket could have been deleted by the hook */
if (current_sock == s) if (s->loop->sock_active == s)
s->rpos = s->rbuf; s->rpos = s->rbuf;
} }
} }
@ -2002,7 +1964,7 @@ sk_write_noflush(sock *s)
#endif #endif
default: default:
if (s->ttx != s->tpos && sk_maybe_write(s) > 0) if (sk_tx_pending(s) && sk_maybe_write(s) > 0)
{ {
if (s->tx_hook) if (s->tx_hook)
s->tx_hook(s); s->tx_hook(s);
@ -2224,6 +2186,8 @@ static int short_loops = 0;
#define SHORT_LOOP_MAX 10 #define SHORT_LOOP_MAX 10
#define WORK_EVENTS_MAX 10 #define WORK_EVENTS_MAX 10
sock *stored_sock;
void void
io_loop(void) io_loop(void)
{ {
@ -2312,17 +2276,13 @@ io_loop(void)
times_update(); times_update();
/* guaranteed to be non-empty */ /* guaranteed to be non-empty */
current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
while (current_sock) while (main_birdloop.sock_active)
{ {
sock *s = current_sock; sock *s = main_birdloop.sock_active;
if (s->index == -1) if (s->index != -1)
{ {
current_sock = sk_next(s);
goto next;
}
int e; int e;
int steps; int steps;
@ -2333,10 +2293,11 @@ io_loop(void)
steps--; steps--;
io_log_event(s->rx_hook, s->data); io_log_event(s->rx_hook, s->data);
e = sk_read(s, pfd.pfd.data[s->index].revents); e = sk_read(s, pfd.pfd.data[s->index].revents);
if (s != current_sock)
goto next;
} }
while (e && s->rx_hook && steps); while (e && (main_birdloop.sock_active == s) && s->rx_hook && steps);
if (s != main_birdloop.sock_active)
continue;
steps = MAX_STEPS; steps = MAX_STEPS;
if (pfd.pfd.data[s->index].revents & POLLOUT) if (pfd.pfd.data[s->index].revents & POLLOUT)
@ -2345,13 +2306,14 @@ io_loop(void)
steps--; steps--;
io_log_event(s->tx_hook, s->data); io_log_event(s->tx_hook, s->data);
e = sk_write(s); e = sk_write(s);
if (s != current_sock)
goto next;
} }
while (e && steps); while (e && (main_birdloop.sock_active == s) && steps);
current_sock = sk_next(s); if (s != main_birdloop.sock_active)
next: ; continue;
}
main_birdloop.sock_active = sk_next(s);
} }
short_loops++; short_loops++;
@ -2360,41 +2322,38 @@ io_loop(void)
short_loops = 0; short_loops = 0;
int count = 0; int count = 0;
current_sock = stored_sock; main_birdloop.sock_active = stored_sock;
if (current_sock == NULL) if (main_birdloop.sock_active == NULL)
current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
while (current_sock && count < MAX_RX_STEPS) while (main_birdloop.sock_active && count < MAX_RX_STEPS)
{ {
sock *s = current_sock; sock *s = main_birdloop.sock_active;
if (s->index == -1) if (s->index == -1)
{
current_sock = sk_next(s);
goto next2; goto next2;
}
if (!s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook) if (!s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook)
{ {
count++; count++;
io_log_event(s->rx_hook, s->data); io_log_event(s->rx_hook, s->data);
sk_read(s, pfd.pfd.data[s->index].revents); sk_read(s, pfd.pfd.data[s->index].revents);
if (s != current_sock) if (s != main_birdloop.sock_active)
goto next2; continue;
} }
if (pfd.pfd.data[s->index].revents & (POLLHUP | POLLERR)) if (pfd.pfd.data[s->index].revents & (POLLHUP | POLLERR))
{ {
sk_err(s, pfd.pfd.data[s->index].revents); sk_err(s, pfd.pfd.data[s->index].revents);
if (s != current_sock) if (s != main_birdloop.sock_active)
goto next2; continue;
} }
current_sock = sk_next(s);
next2: ; next2: ;
main_birdloop.sock_active = sk_next(s);
} }
stored_sock = current_sock; stored_sock = main_birdloop.sock_active;
} }
} }
} }

View File

@ -542,7 +542,7 @@ cli_init_unix(uid_t use_uid, gid_t use_gid)
/* Return value intentionally ignored */ /* Return value intentionally ignored */
unlink(path_control_socket); unlink(path_control_socket);
if (sk_open_unix(s, path_control_socket) < 0) if (sk_open_unix(s, &main_birdloop, path_control_socket) < 0)
die("Cannot create control socket %s: %m", path_control_socket); die("Cannot create control socket %s: %m", path_control_socket);
if (use_uid || use_gid) if (use_uid || use_gid)

View File

@ -9,6 +9,9 @@
#ifndef _BIRD_UNIX_H_ #ifndef _BIRD_UNIX_H_
#define _BIRD_UNIX_H_ #define _BIRD_UNIX_H_
#include "nest/bird.h"
#include "lib/io-loop.h"
#include <sys/socket.h> #include <sys/socket.h>
#include <signal.h> #include <signal.h>
@ -110,7 +113,7 @@ extern volatile sig_atomic_t async_shutdown_flag;
void io_init(void); void io_init(void);
void io_loop(void); void io_loop(void);
void io_log_dump(void); void io_log_dump(void);
int sk_open_unix(struct birdsock *s, char *name); int sk_open_unix(struct birdsock *s, struct birdloop *, char *name);
struct rfile *rf_open(struct pool *, const char *name, const char *mode); struct rfile *rf_open(struct pool *, const char *name, const char *mode);
void *rf_file(struct rfile *f); void *rf_file(struct rfile *f);
int rf_fileno(struct rfile *f); int rf_fileno(struct rfile *f);