From 43a123c1ae6613b3efeed291fa552ecd909d3acf Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Tue, 16 Apr 2024 21:23:18 +0200 Subject: Adding upstream version 1.20.14. Signed-off-by: Daniel Baumann --- src/internal/poll/splice_linux.go | 231 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 231 insertions(+) create mode 100644 src/internal/poll/splice_linux.go (limited to 'src/internal/poll/splice_linux.go') diff --git a/src/internal/poll/splice_linux.go b/src/internal/poll/splice_linux.go new file mode 100644 index 0000000..96cbe4a --- /dev/null +++ b/src/internal/poll/splice_linux.go @@ -0,0 +1,231 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import ( + "runtime" + "sync" + "syscall" + "unsafe" +) + +const ( + // spliceNonblock makes calls to splice(2) non-blocking. + spliceNonblock = 0x2 + + // maxSpliceSize is the maximum amount of data Splice asks + // the kernel to move in a single call to splice(2). + // We use 1MB as Splice writes data through a pipe, and 1MB is the default maximum pipe buffer size, + // which is determined by /proc/sys/fs/pipe-max-size. + maxSpliceSize = 1 << 20 +) + +// Splice transfers at most remain bytes of data from src to dst, using the +// splice system call to minimize copies of data from and to userspace. +// +// Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer. +// src and dst must both be stream-oriented sockets. +// +// If err != nil, sc is the system call which caused the error. +func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) { + p, sc, err := getPipe() + if err != nil { + return 0, false, sc, err + } + defer putPipe(p) + var inPipe, n int + for err == nil && remain > 0 { + max := maxSpliceSize + if int64(max) > remain { + max = int(remain) + } + inPipe, err = spliceDrain(p.wfd, src, max) + // The operation is considered handled if splice returns no + // error, or an error other than EINVAL. An EINVAL means the + // kernel does not support splice for the socket type of src. + // The failed syscall does not consume any data so it is safe + // to fall back to a generic copy. + // + // spliceDrain should never return EAGAIN, so if err != nil, + // Splice cannot continue. + // + // If inPipe == 0 && err == nil, src is at EOF, and the + // transfer is complete. + handled = handled || (err != syscall.EINVAL) + if err != nil || inPipe == 0 { + break + } + p.data += inPipe + + n, err = splicePump(dst, p.rfd, inPipe) + if n > 0 { + written += int64(n) + remain -= int64(n) + p.data -= n + } + } + if err != nil { + return written, handled, "splice", err + } + return written, true, "", nil +} + +// spliceDrain moves data from a socket to a pipe. +// +// Invariant: when entering spliceDrain, the pipe is empty. It is either in its +// initial state, or splicePump has emptied it previously. +// +// Given this, spliceDrain can reasonably assume that the pipe is ready for +// writing, so if splice returns EAGAIN, it must be because the socket is not +// ready for reading. +// +// If spliceDrain returns (0, nil), src is at EOF. +func spliceDrain(pipefd int, sock *FD, max int) (int, error) { + if err := sock.readLock(); err != nil { + return 0, err + } + defer sock.readUnlock() + if err := sock.pd.prepareRead(sock.isFile); err != nil { + return 0, err + } + for { + n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock) + if err == syscall.EINTR { + continue + } + if err != syscall.EAGAIN { + return n, err + } + if err := sock.pd.waitRead(sock.isFile); err != nil { + return n, err + } + } +} + +// splicePump moves all the buffered data from a pipe to a socket. +// +// Invariant: when entering splicePump, there are exactly inPipe +// bytes of data in the pipe, from a previous call to spliceDrain. +// +// By analogy to the condition from spliceDrain, splicePump +// only needs to poll the socket for readiness, if splice returns +// EAGAIN. +// +// If splicePump cannot move all the data in a single call to +// splice(2), it loops over the buffered data until it has written +// all of it to the socket. This behavior is similar to the Write +// step of an io.Copy in userspace. +func splicePump(sock *FD, pipefd int, inPipe int) (int, error) { + if err := sock.writeLock(); err != nil { + return 0, err + } + defer sock.writeUnlock() + if err := sock.pd.prepareWrite(sock.isFile); err != nil { + return 0, err + } + written := 0 + for inPipe > 0 { + n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock) + // Here, the condition n == 0 && err == nil should never be + // observed, since Splice controls the write side of the pipe. + if n > 0 { + inPipe -= n + written += n + continue + } + if err != syscall.EAGAIN { + return written, err + } + if err := sock.pd.waitWrite(sock.isFile); err != nil { + return written, err + } + } + return written, nil +} + +// splice wraps the splice system call. Since the current implementation +// only uses splice on sockets and pipes, the offset arguments are unused. +// splice returns int instead of int64, because callers never ask it to +// move more data in a single call than can fit in an int32. +func splice(out int, in int, max int, flags int) (int, error) { + n, err := syscall.Splice(in, nil, out, nil, max, flags) + return int(n), err +} + +type splicePipeFields struct { + rfd int + wfd int + data int +} + +type splicePipe struct { + splicePipeFields + + // We want to use a finalizer, so ensure that the size is + // large enough to not use the tiny allocator. + _ [24 - unsafe.Sizeof(splicePipeFields{})%24]byte +} + +// splicePipePool caches pipes to avoid high-frequency construction and destruction of pipe buffers. +// The garbage collector will free all pipes in the sync.Pool periodically, thus we need to set up +// a finalizer for each pipe to close its file descriptors before the actual GC. +var splicePipePool = sync.Pool{New: newPoolPipe} + +func newPoolPipe() any { + // Discard the error which occurred during the creation of pipe buffer, + // redirecting the data transmission to the conventional way utilizing read() + write() as a fallback. + p := newPipe() + if p == nil { + return nil + } + runtime.SetFinalizer(p, destroyPipe) + return p +} + +// getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache. +// +// Note that it may fail to create a new pipe buffer by newPipe(), in which case getPipe() will return a generic error +// and system call name splice in a string as the indication. +func getPipe() (*splicePipe, string, error) { + v := splicePipePool.Get() + if v == nil { + return nil, "splice", syscall.EINVAL + } + return v.(*splicePipe), "", nil +} + +func putPipe(p *splicePipe) { + // If there is still data left in the pipe, + // then close and discard it instead of putting it back into the pool. + if p.data != 0 { + runtime.SetFinalizer(p, nil) + destroyPipe(p) + return + } + splicePipePool.Put(p) +} + +// newPipe sets up a pipe for a splice operation. +func newPipe() *splicePipe { + var fds [2]int + if err := syscall.Pipe2(fds[:], syscall.O_CLOEXEC|syscall.O_NONBLOCK); err != nil { + return nil + } + + // Splice will loop writing maxSpliceSize bytes from the source to the pipe, + // and then write those bytes from the pipe to the destination. + // Set the pipe buffer size to maxSpliceSize to optimize that. + // Ignore errors here, as a smaller buffer size will work, + // although it will require more system calls. + fcntl(fds[0], syscall.F_SETPIPE_SZ, maxSpliceSize) + + return &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}} +} + +// destroyPipe destroys a pipe. +func destroyPipe(p *splicePipe) { + CloseFunc(p.rfd) + CloseFunc(p.wfd) +} -- cgit v1.2.3