diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h index 26319fa98b..fcfd489c6c 100644 --- a/include/io/channel-socket.h +++ b/include/io/channel-socket.h @@ -50,6 +50,11 @@ struct QIOChannelSocket { ssize_t zero_copy_queued; ssize_t zero_copy_sent; bool blocking; + /** + * This flag indicates whether any new data was successfully sent with + * zerocopy since the last qio_channel_socket_flush() call. + */ + bool new_zero_copy_sent_success; }; diff --git a/io/channel-socket.c b/io/channel-socket.c index 8b30d5b7f7..3053b35ad8 100644 --- a/io/channel-socket.c +++ b/io/channel-socket.c @@ -37,6 +37,12 @@ #define SOCKET_MAX_FDS 16 +#ifdef QEMU_MSG_ZEROCOPY +static int qio_channel_socket_flush_internal(QIOChannel *ioc, + bool block, + Error **errp); +#endif + SocketAddress * qio_channel_socket_get_local_address(QIOChannelSocket *ioc, Error **errp) @@ -66,6 +72,7 @@ qio_channel_socket_new(void) sioc->zero_copy_queued = 0; sioc->zero_copy_sent = 0; sioc->blocking = false; + sioc->new_zero_copy_sent_success = false; ioc = QIO_CHANNEL(sioc); qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN); @@ -618,6 +625,10 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc, size_t fdsize = sizeof(int) * nfds; struct cmsghdr *cmsg; int sflags = 0; +#ifdef QEMU_MSG_ZEROCOPY + bool blocking = sioc->blocking; + bool zerocopy_flushed_once = false; +#endif memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)); @@ -662,13 +673,30 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc, return QIO_CHANNEL_ERR_BLOCK; case EINTR: goto retry; +#ifdef QEMU_MSG_ZEROCOPY case ENOBUFS: if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) { - error_setg_errno(errp, errno, - "Process can't lock enough memory for using MSG_ZEROCOPY"); - return -1; + /** + * Socket error queueing may exhaust the OPTMEM limit. Try + * flushing the error queue once. + */ + if (!zerocopy_flushed_once) { + ret = qio_channel_socket_flush_internal(ioc, blocking, + errp); + if (ret < 0) { + return -1; + } + zerocopy_flushed_once = true; + goto retry; + } else { + error_setg_errno(errp, errno, + "Process can't lock enough memory for " + "using MSG_ZEROCOPY"); + return -1; + } } break; +#endif } error_setg_errno(errp, errno, @@ -777,8 +805,9 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc, #ifdef QEMU_MSG_ZEROCOPY -static int qio_channel_socket_flush(QIOChannel *ioc, - Error **errp) +static int qio_channel_socket_flush_internal(QIOChannel *ioc, + bool block, + Error **errp) { QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); struct msghdr msg = {}; @@ -786,7 +815,6 @@ static int qio_channel_socket_flush(QIOChannel *ioc, struct cmsghdr *cm; char control[CMSG_SPACE(sizeof(*serr))]; int received; - int ret; if (sioc->zero_copy_queued == sioc->zero_copy_sent) { return 0; @@ -796,16 +824,25 @@ static int qio_channel_socket_flush(QIOChannel *ioc, msg.msg_controllen = sizeof(control); memset(control, 0, sizeof(control)); - ret = 1; - while (sioc->zero_copy_sent < sioc->zero_copy_queued) { received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE); if (received < 0) { switch (errno) { case EAGAIN: - /* Nothing on errqueue, wait until something is available */ - qio_channel_wait(ioc, G_IO_ERR); - continue; + if (block) { + /* + * Nothing on errqueue, wait until something is + * available. + * + * Use G_IO_ERR instead of G_IO_IN since MSG_ERRQUEUE reads + * are signaled via POLLERR, not POLLIN, as the kernel + * sets POLLERR when zero-copy notificatons appear on the + * socket error queue. + */ + qio_channel_wait(ioc, G_IO_ERR); + continue; + } + return 0; case EINTR: continue; default: @@ -843,13 +880,32 @@ static int qio_channel_socket_flush(QIOChannel *ioc, /* No errors, count successfully finished sendmsg()*/ sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1; - /* If any sendmsg() succeeded using zero copy, return 0 at the end */ + /* If any sendmsg() succeeded using zero copy, mark zerocopy success */ if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) { - ret = 0; + sioc->new_zero_copy_sent_success = true; } } - return ret; + return 0; +} + +static int qio_channel_socket_flush(QIOChannel *ioc, + Error **errp) +{ + QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); + int ret; + + ret = qio_channel_socket_flush_internal(ioc, true, errp); + if (ret < 0) { + return ret; + } + + if (sioc->new_zero_copy_sent_success) { + sioc->new_zero_copy_sent_success = false; + return 0; + } + + return 1; } #endif /* QEMU_MSG_ZEROCOPY */