diff --git a/lib/event.c b/lib/event.c index 766ffa15..10f83c28 100644 --- a/lib/event.c +++ b/lib/event.c @@ -283,7 +283,13 @@ void ev_uncork(struct event_cork *ec) birdloop_ping(el->loop); } - UNLOCK_DOMAIN(cork, ec->lock); + struct birdsock *sk; + WALK_LIST_FIRST2(sk, cork_node, ec->sockets) + { +// log(L_TRACE "Socket %p uncorked", sk); + rem_node(&sk->cork_node); + sk_ping(sk); + } - birdloop_ping(&main_birdloop); + UNLOCK_DOMAIN(cork, ec->lock); } diff --git a/lib/event.h b/lib/event.h index cd85bf78..3af33a7f 100644 --- a/lib/event.h +++ b/lib/event.h @@ -38,6 +38,7 @@ struct event_cork { DOMAIN(cork) lock; u32 count; list events; + list sockets; }; extern event_list global_event_list; @@ -56,6 +57,7 @@ static inline void ev_init_list(event_list *el, struct birdloop *loop, const cha static inline void ev_init_cork(struct event_cork *ec, const char *name) { init_list(&ec->events); + init_list(&ec->sockets); ec->lock = DOMAIN_NEW(cork, name); ec->count = 0; }; diff --git a/lib/io-loop.h b/lib/io-loop.h index 386a31d5..d60fb1ae 100644 --- a/lib/io-loop.h +++ b/lib/io-loop.h @@ -17,6 +17,7 @@ void sk_start(sock *s); void sk_stop(sock *s); void sk_reloop(sock *s, struct birdloop *loop); +void sk_ping(sock *s); extern struct birdloop main_birdloop; diff --git a/lib/socket.h b/lib/socket.h index 17d647f3..89398edf 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -59,6 +59,7 @@ typedef struct birdsock { uint rbsize; int (*rx_hook)(struct birdsock *, uint size); /* NULL=receiving turned off, returns 1 to clear rx buffer */ struct event_cork *cork; /* Cork to temporarily stop receiving data */ + node cork_node; /* Node in cork list */ byte *tbuf, *tpos; /* NULL=allocate automatically */ byte *ttx; /* Internal */ diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c index 1d3a555f..5ab93b31 100644 --- a/sysdep/unix/io-loop.c +++ b/sysdep/unix/io-loop.c @@ -218,7 +218,36 @@ sk_stop(sock *s) } static inline uint sk_want_events(sock *s) -{ return ((s->rx_hook && !ev_corked(s->cork)) ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); } +{ + uint out = ((s->ttx != s->tpos) ? POLLOUT : 0); + if (s->rx_hook) + if (s->cork) + { + LOCK_DOMAIN(cork, s->cork->lock); + if (!enlisted(&s->cork_node)) + if (s->cork->count) + { +// log(L_TRACE "Socket %p corked", s); + add_tail(&s->cork->sockets, &s->cork_node); + } + else + out |= POLLIN; + UNLOCK_DOMAIN(cork, s->cork->lock); + } + else + out |= POLLIN; + +// log(L_TRACE "sk_want_events(%p) = %x", s, out); + return out; +} + + +void +sk_ping(sock *s) +{ + s->loop->poll_changed = 1; + birdloop_ping(s->loop); +} /* FIXME: this should be called from sock code @@ -284,7 +313,10 @@ sockets_fire(struct birdloop *loop) /* Last fd is internal wakeup fd */ if (pfd[poll_num].revents & POLLIN) + { wakeup_drain(loop); + loop->poll_changed = 1; + } int i; for (i = 0; i < poll_num; pfd++, psk++, i++) diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c index b57e5894..fa836f28 100644 --- a/sysdep/unix/io.c +++ b/sysdep/unix/io.c @@ -802,8 +802,13 @@ sk_free(resource *r) sk_ssh_free(s); #endif - if ((s->fd < 0) || (s->flags & SKF_THREAD)) - return; + if (s->cork) + { + LOCK_DOMAIN(cork, s->cork->lock); + if (enlisted(&s->cork_node)) + rem_node(&s->cork_node); + UNLOCK_DOMAIN(cork, s->cork->lock); + } if (!s->loop) ; @@ -820,7 +825,7 @@ sk_free(resource *r) rem_node(&s->n); } - if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE) + if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE && s->fd != -1) close(s->fd); s->fd = -1; @@ -1710,6 +1715,7 @@ sk_maybe_write(sock *s) s->err_hook(s, (errno != EPIPE) ? errno : 0); return -1; } + sk_ping(s); return 0; } s->ttx += e; @@ -1917,6 +1923,25 @@ sk_read(sock *s, int revents) case SK_TCP: case SK_UNIX: { + if (s->cork) + { + int cont = 0; + LOCK_DOMAIN(cork, s->cork->lock); + if (!enlisted(&s->cork_node)) + if (s->cork->count) + { +// log(L_TRACE "Socket %p corked", s); + add_tail(&s->cork->sockets, &s->cork_node); + sk_ping(s); + } + else + cont = 1; + UNLOCK_DOMAIN(cork, s->cork->lock); + + if (!cont) + return 0; + } + int c = read(s->fd, s->rpos, s->rbuf + s->rbsize - s->rpos); if (c < 0)