summaryrefslogtreecommitdiffstats
path: root/src/internal/poll/splice_linux.go
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-16 19:23:18 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-16 19:23:18 +0000
commit43a123c1ae6613b3efeed291fa552ecd909d3acf (patch)
treefd92518b7024bc74031f78a1cf9e454b65e73665 /src/internal/poll/splice_linux.go
parentInitial commit. (diff)
downloadgolang-1.20-43a123c1ae6613b3efeed291fa552ecd909d3acf.tar.xz
golang-1.20-43a123c1ae6613b3efeed291fa552ecd909d3acf.zip
Adding upstream version 1.20.14.upstream/1.20.14upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/internal/poll/splice_linux.go')
-rw-r--r--src/internal/poll/splice_linux.go231
1 files changed, 231 insertions, 0 deletions
diff --git a/src/internal/poll/splice_linux.go b/src/internal/poll/splice_linux.go
new file mode 100644
index 0000000..96cbe4a
--- /dev/null
+++ b/src/internal/poll/splice_linux.go
@@ -0,0 +1,231 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package poll
+
+import (
+ "runtime"
+ "sync"
+ "syscall"
+ "unsafe"
+)
+
+const (
+ // spliceNonblock makes calls to splice(2) non-blocking.
+ spliceNonblock = 0x2
+
+ // maxSpliceSize is the maximum amount of data Splice asks
+ // the kernel to move in a single call to splice(2).
+ // We use 1MB as Splice writes data through a pipe, and 1MB is the default maximum pipe buffer size,
+ // which is determined by /proc/sys/fs/pipe-max-size.
+ maxSpliceSize = 1 << 20
+)
+
+// Splice transfers at most remain bytes of data from src to dst, using the
+// splice system call to minimize copies of data from and to userspace.
+//
+// Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer.
+// src and dst must both be stream-oriented sockets.
+//
+// If err != nil, sc is the system call which caused the error.
+func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) {
+ p, sc, err := getPipe()
+ if err != nil {
+ return 0, false, sc, err
+ }
+ defer putPipe(p)
+ var inPipe, n int
+ for err == nil && remain > 0 {
+ max := maxSpliceSize
+ if int64(max) > remain {
+ max = int(remain)
+ }
+ inPipe, err = spliceDrain(p.wfd, src, max)
+ // The operation is considered handled if splice returns no
+ // error, or an error other than EINVAL. An EINVAL means the
+ // kernel does not support splice for the socket type of src.
+ // The failed syscall does not consume any data so it is safe
+ // to fall back to a generic copy.
+ //
+ // spliceDrain should never return EAGAIN, so if err != nil,
+ // Splice cannot continue.
+ //
+ // If inPipe == 0 && err == nil, src is at EOF, and the
+ // transfer is complete.
+ handled = handled || (err != syscall.EINVAL)
+ if err != nil || inPipe == 0 {
+ break
+ }
+ p.data += inPipe
+
+ n, err = splicePump(dst, p.rfd, inPipe)
+ if n > 0 {
+ written += int64(n)
+ remain -= int64(n)
+ p.data -= n
+ }
+ }
+ if err != nil {
+ return written, handled, "splice", err
+ }
+ return written, true, "", nil
+}
+
+// spliceDrain moves data from a socket to a pipe.
+//
+// Invariant: when entering spliceDrain, the pipe is empty. It is either in its
+// initial state, or splicePump has emptied it previously.
+//
+// Given this, spliceDrain can reasonably assume that the pipe is ready for
+// writing, so if splice returns EAGAIN, it must be because the socket is not
+// ready for reading.
+//
+// If spliceDrain returns (0, nil), src is at EOF.
+func spliceDrain(pipefd int, sock *FD, max int) (int, error) {
+ if err := sock.readLock(); err != nil {
+ return 0, err
+ }
+ defer sock.readUnlock()
+ if err := sock.pd.prepareRead(sock.isFile); err != nil {
+ return 0, err
+ }
+ for {
+ n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock)
+ if err == syscall.EINTR {
+ continue
+ }
+ if err != syscall.EAGAIN {
+ return n, err
+ }
+ if err := sock.pd.waitRead(sock.isFile); err != nil {
+ return n, err
+ }
+ }
+}
+
+// splicePump moves all the buffered data from a pipe to a socket.
+//
+// Invariant: when entering splicePump, there are exactly inPipe
+// bytes of data in the pipe, from a previous call to spliceDrain.
+//
+// By analogy to the condition from spliceDrain, splicePump
+// only needs to poll the socket for readiness, if splice returns
+// EAGAIN.
+//
+// If splicePump cannot move all the data in a single call to
+// splice(2), it loops over the buffered data until it has written
+// all of it to the socket. This behavior is similar to the Write
+// step of an io.Copy in userspace.
+func splicePump(sock *FD, pipefd int, inPipe int) (int, error) {
+ if err := sock.writeLock(); err != nil {
+ return 0, err
+ }
+ defer sock.writeUnlock()
+ if err := sock.pd.prepareWrite(sock.isFile); err != nil {
+ return 0, err
+ }
+ written := 0
+ for inPipe > 0 {
+ n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock)
+ // Here, the condition n == 0 && err == nil should never be
+ // observed, since Splice controls the write side of the pipe.
+ if n > 0 {
+ inPipe -= n
+ written += n
+ continue
+ }
+ if err != syscall.EAGAIN {
+ return written, err
+ }
+ if err := sock.pd.waitWrite(sock.isFile); err != nil {
+ return written, err
+ }
+ }
+ return written, nil
+}
+
+// splice wraps the splice system call. Since the current implementation
+// only uses splice on sockets and pipes, the offset arguments are unused.
+// splice returns int instead of int64, because callers never ask it to
+// move more data in a single call than can fit in an int32.
+func splice(out int, in int, max int, flags int) (int, error) {
+ n, err := syscall.Splice(in, nil, out, nil, max, flags)
+ return int(n), err
+}
+
+type splicePipeFields struct {
+ rfd int
+ wfd int
+ data int
+}
+
+type splicePipe struct {
+ splicePipeFields
+
+ // We want to use a finalizer, so ensure that the size is
+ // large enough to not use the tiny allocator.
+ _ [24 - unsafe.Sizeof(splicePipeFields{})%24]byte
+}
+
+// splicePipePool caches pipes to avoid high-frequency construction and destruction of pipe buffers.
+// The garbage collector will free all pipes in the sync.Pool periodically, thus we need to set up
+// a finalizer for each pipe to close its file descriptors before the actual GC.
+var splicePipePool = sync.Pool{New: newPoolPipe}
+
+func newPoolPipe() any {
+ // Discard the error which occurred during the creation of pipe buffer,
+ // redirecting the data transmission to the conventional way utilizing read() + write() as a fallback.
+ p := newPipe()
+ if p == nil {
+ return nil
+ }
+ runtime.SetFinalizer(p, destroyPipe)
+ return p
+}
+
+// getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache.
+//
+// Note that it may fail to create a new pipe buffer by newPipe(), in which case getPipe() will return a generic error
+// and system call name splice in a string as the indication.
+func getPipe() (*splicePipe, string, error) {
+ v := splicePipePool.Get()
+ if v == nil {
+ return nil, "splice", syscall.EINVAL
+ }
+ return v.(*splicePipe), "", nil
+}
+
+func putPipe(p *splicePipe) {
+ // If there is still data left in the pipe,
+ // then close and discard it instead of putting it back into the pool.
+ if p.data != 0 {
+ runtime.SetFinalizer(p, nil)
+ destroyPipe(p)
+ return
+ }
+ splicePipePool.Put(p)
+}
+
+// newPipe sets up a pipe for a splice operation.
+func newPipe() *splicePipe {
+ var fds [2]int
+ if err := syscall.Pipe2(fds[:], syscall.O_CLOEXEC|syscall.O_NONBLOCK); err != nil {
+ return nil
+ }
+
+ // Splice will loop writing maxSpliceSize bytes from the source to the pipe,
+ // and then write those bytes from the pipe to the destination.
+ // Set the pipe buffer size to maxSpliceSize to optimize that.
+ // Ignore errors here, as a smaller buffer size will work,
+ // although it will require more system calls.
+ fcntl(fds[0], syscall.F_SETPIPE_SZ, maxSpliceSize)
+
+ return &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}}
+}
+
+// destroyPipe destroys a pipe.
+func destroyPipe(p *splicePipe) {
+ CloseFunc(p.rfd)
+ CloseFunc(p.wfd)
+}