diff options
Diffstat (limited to 'src/internal/poll/splice_linux.go')
-rw-r--r-- | src/internal/poll/splice_linux.go | 194 |
1 files changed, 194 insertions, 0 deletions
diff --git a/src/internal/poll/splice_linux.go b/src/internal/poll/splice_linux.go new file mode 100644 index 0000000..01baf14 --- /dev/null +++ b/src/internal/poll/splice_linux.go @@ -0,0 +1,194 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import ( + "internal/syscall/unix" + "sync/atomic" + "syscall" + "unsafe" +) + +const ( + // spliceNonblock makes calls to splice(2) non-blocking. + spliceNonblock = 0x2 + + // maxSpliceSize is the maximum amount of data Splice asks + // the kernel to move in a single call to splice(2). + maxSpliceSize = 4 << 20 +) + +// Splice transfers at most remain bytes of data from src to dst, using the +// splice system call to minimize copies of data from and to userspace. +// +// Splice creates a temporary pipe, to serve as a buffer for the data transfer. +// src and dst must both be stream-oriented sockets. +// +// If err != nil, sc is the system call which caused the error. +func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) { + prfd, pwfd, sc, err := newTempPipe() + if err != nil { + return 0, false, sc, err + } + defer destroyTempPipe(prfd, pwfd) + var inPipe, n int + for err == nil && remain > 0 { + max := maxSpliceSize + if int64(max) > remain { + max = int(remain) + } + inPipe, err = spliceDrain(pwfd, src, max) + // The operation is considered handled if splice returns no + // error, or an error other than EINVAL. An EINVAL means the + // kernel does not support splice for the socket type of src. + // The failed syscall does not consume any data so it is safe + // to fall back to a generic copy. + // + // spliceDrain should never return EAGAIN, so if err != nil, + // Splice cannot continue. + // + // If inPipe == 0 && err == nil, src is at EOF, and the + // transfer is complete. + handled = handled || (err != syscall.EINVAL) + if err != nil || (inPipe == 0 && err == nil) { + break + } + n, err = splicePump(dst, prfd, inPipe) + if n > 0 { + written += int64(n) + remain -= int64(n) + } + } + if err != nil { + return written, handled, "splice", err + } + return written, true, "", nil +} + +// spliceDrain moves data from a socket to a pipe. +// +// Invariant: when entering spliceDrain, the pipe is empty. It is either in its +// initial state, or splicePump has emptied it previously. +// +// Given this, spliceDrain can reasonably assume that the pipe is ready for +// writing, so if splice returns EAGAIN, it must be because the socket is not +// ready for reading. +// +// If spliceDrain returns (0, nil), src is at EOF. +func spliceDrain(pipefd int, sock *FD, max int) (int, error) { + if err := sock.readLock(); err != nil { + return 0, err + } + defer sock.readUnlock() + if err := sock.pd.prepareRead(sock.isFile); err != nil { + return 0, err + } + for { + n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock) + if err == syscall.EINTR { + continue + } + if err != syscall.EAGAIN { + return n, err + } + if err := sock.pd.waitRead(sock.isFile); err != nil { + return n, err + } + } +} + +// splicePump moves all the buffered data from a pipe to a socket. +// +// Invariant: when entering splicePump, there are exactly inPipe +// bytes of data in the pipe, from a previous call to spliceDrain. +// +// By analogy to the condition from spliceDrain, splicePump +// only needs to poll the socket for readiness, if splice returns +// EAGAIN. +// +// If splicePump cannot move all the data in a single call to +// splice(2), it loops over the buffered data until it has written +// all of it to the socket. This behavior is similar to the Write +// step of an io.Copy in userspace. +func splicePump(sock *FD, pipefd int, inPipe int) (int, error) { + if err := sock.writeLock(); err != nil { + return 0, err + } + defer sock.writeUnlock() + if err := sock.pd.prepareWrite(sock.isFile); err != nil { + return 0, err + } + written := 0 + for inPipe > 0 { + n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock) + // Here, the condition n == 0 && err == nil should never be + // observed, since Splice controls the write side of the pipe. + if n > 0 { + inPipe -= n + written += n + continue + } + if err != syscall.EAGAIN { + return written, err + } + if err := sock.pd.waitWrite(sock.isFile); err != nil { + return written, err + } + } + return written, nil +} + +// splice wraps the splice system call. Since the current implementation +// only uses splice on sockets and pipes, the offset arguments are unused. +// splice returns int instead of int64, because callers never ask it to +// move more data in a single call than can fit in an int32. +func splice(out int, in int, max int, flags int) (int, error) { + n, err := syscall.Splice(in, nil, out, nil, max, flags) + return int(n), err +} + +var disableSplice unsafe.Pointer + +// newTempPipe sets up a temporary pipe for a splice operation. +func newTempPipe() (prfd, pwfd int, sc string, err error) { + p := (*bool)(atomic.LoadPointer(&disableSplice)) + if p != nil && *p { + return -1, -1, "splice", syscall.EINVAL + } + + var fds [2]int + // pipe2 was added in 2.6.27 and our minimum requirement is 2.6.23, so it + // might not be implemented. Falling back to pipe is possible, but prior to + // 2.6.29 splice returns -EAGAIN instead of 0 when the connection is + // closed. + const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK + if err := syscall.Pipe2(fds[:], flags); err != nil { + return -1, -1, "pipe2", err + } + + if p == nil { + p = new(bool) + defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p)) + + // F_GETPIPE_SZ was added in 2.6.35, which does not have the -EAGAIN bug. + if _, _, errno := syscall.Syscall(unix.FcntlSyscall, uintptr(fds[0]), syscall.F_GETPIPE_SZ, 0); errno != 0 { + *p = true + destroyTempPipe(fds[0], fds[1]) + return -1, -1, "fcntl", errno + } + } + + return fds[0], fds[1], "", nil +} + +// destroyTempPipe destroys a temporary pipe. +func destroyTempPipe(prfd, pwfd int) error { + err := CloseFunc(prfd) + err1 := CloseFunc(pwfd) + if err == nil { + return err1 + } + return err +} |