diff options
Diffstat (limited to 'src/internal/poll')
55 files changed, 5345 insertions, 0 deletions
diff --git a/src/internal/poll/copy_file_range_linux.go b/src/internal/poll/copy_file_range_linux.go new file mode 100644 index 0000000..5b9e5d4 --- /dev/null +++ b/src/internal/poll/copy_file_range_linux.go @@ -0,0 +1,164 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import ( + "internal/syscall/unix" + "sync/atomic" + "syscall" +) + +var copyFileRangeSupported int32 = -1 // accessed atomically + +const maxCopyFileRangeRound = 1 << 30 + +func kernelVersion() (major int, minor int) { + var uname syscall.Utsname + if err := syscall.Uname(&uname); err != nil { + return + } + + rl := uname.Release + var values [2]int + vi := 0 + value := 0 + for _, c := range rl { + if '0' <= c && c <= '9' { + value = (value * 10) + int(c-'0') + } else { + // Note that we're assuming N.N.N here. If we see anything else we are likely to + // mis-parse it. + values[vi] = value + vi++ + if vi >= len(values) { + break + } + value = 0 + } + } + switch vi { + case 0: + return 0, 0 + case 1: + return values[0], 0 + case 2: + return values[0], values[1] + } + return +} + +// CopyFileRange copies at most remain bytes of data from src to dst, using +// the copy_file_range system call. dst and src must refer to regular files. +func CopyFileRange(dst, src *FD, remain int64) (written int64, handled bool, err error) { + if supported := atomic.LoadInt32(©FileRangeSupported); supported == 0 { + return 0, false, nil + } else if supported == -1 { + major, minor := kernelVersion() + if major > 5 || (major == 5 && minor >= 3) { + atomic.StoreInt32(©FileRangeSupported, 1) + } else { + // copy_file_range(2) is broken in various ways on kernels older than 5.3, + // see issue #42400 and + // https://man7.org/linux/man-pages/man2/copy_file_range.2.html#VERSIONS + atomic.StoreInt32(©FileRangeSupported, 0) + return 0, false, nil + } + } + for remain > 0 { + max := remain + if max > maxCopyFileRangeRound { + max = maxCopyFileRangeRound + } + n, err := copyFileRange(dst, src, int(max)) + switch err { + case syscall.ENOSYS: + // copy_file_range(2) was introduced in Linux 4.5. + // Go supports Linux >= 2.6.33, so the system call + // may not be present. + // + // If we see ENOSYS, we have certainly not transferred + // any data, so we can tell the caller that we + // couldn't handle the transfer and let them fall + // back to more generic code. + // + // Seeing ENOSYS also means that we will not try to + // use copy_file_range(2) again. + atomic.StoreInt32(©FileRangeSupported, 0) + return 0, false, nil + case syscall.EXDEV, syscall.EINVAL, syscall.EIO, syscall.EOPNOTSUPP, syscall.EPERM: + // Prior to Linux 5.3, it was not possible to + // copy_file_range across file systems. Similarly to + // the ENOSYS case above, if we see EXDEV, we have + // not transferred any data, and we can let the caller + // fall back to generic code. + // + // As for EINVAL, that is what we see if, for example, + // dst or src refer to a pipe rather than a regular + // file. This is another case where no data has been + // transferred, so we consider it unhandled. + // + // If src and dst are on CIFS, we can see EIO. + // See issue #42334. + // + // If the file is on NFS, we can see EOPNOTSUPP. + // See issue #40731. + // + // If the process is running inside a Docker container, + // we might see EPERM instead of ENOSYS. See issue + // #40893. Since EPERM might also be a legitimate error, + // don't mark copy_file_range(2) as unsupported. + return 0, false, nil + case nil: + if n == 0 { + // If we did not read any bytes at all, + // then this file may be in a file system + // where copy_file_range silently fails. + // https://lore.kernel.org/linux-fsdevel/20210126233840.GG4626@dread.disaster.area/T/#m05753578c7f7882f6e9ffe01f981bc223edef2b0 + if written == 0 { + return 0, false, nil + } + // Otherwise src is at EOF, which means + // we are done. + return written, true, nil + } + remain -= n + written += n + default: + return written, true, err + } + } + return written, true, nil +} + +// copyFileRange performs one round of copy_file_range(2). +func copyFileRange(dst, src *FD, max int) (written int64, err error) { + // The signature of copy_file_range(2) is: + // + // ssize_t copy_file_range(int fd_in, loff_t *off_in, + // int fd_out, loff_t *off_out, + // size_t len, unsigned int flags); + // + // Note that in the call to unix.CopyFileRange below, we use nil + // values for off_in and off_out. For the system call, this means + // "use and update the file offsets". That is why we must acquire + // locks for both file descriptors (and why this whole machinery is + // in the internal/poll package to begin with). + if err := dst.writeLock(); err != nil { + return 0, err + } + defer dst.writeUnlock() + if err := src.readLock(); err != nil { + return 0, err + } + defer src.readUnlock() + var n int + for { + n, err = unix.CopyFileRange(src.Sysfd, nil, dst.Sysfd, nil, max, 0) + if err != syscall.EINTR { + break + } + } + return int64(n), err +} diff --git a/src/internal/poll/errno_unix.go b/src/internal/poll/errno_unix.go new file mode 100644 index 0000000..8eed93a --- /dev/null +++ b/src/internal/poll/errno_unix.go @@ -0,0 +1,33 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build unix + +package poll + +import "syscall" + +// Do the interface allocations only once for common +// Errno values. +var ( + errEAGAIN error = syscall.EAGAIN + errEINVAL error = syscall.EINVAL + errENOENT error = syscall.ENOENT +) + +// errnoErr returns common boxed Errno values, to prevent +// allocations at runtime. +func errnoErr(e syscall.Errno) error { + switch e { + case 0: + return nil + case syscall.EAGAIN: + return errEAGAIN + case syscall.EINVAL: + return errEINVAL + case syscall.ENOENT: + return errENOENT + } + return e +} diff --git a/src/internal/poll/errno_windows.go b/src/internal/poll/errno_windows.go new file mode 100644 index 0000000..3679aa8 --- /dev/null +++ b/src/internal/poll/errno_windows.go @@ -0,0 +1,31 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build windows + +package poll + +import "syscall" + +// Do the interface allocations only once for common +// Errno values. + +var ( + errERROR_IO_PENDING error = syscall.Errno(syscall.ERROR_IO_PENDING) +) + +// ErrnoErr returns common boxed Errno values, to prevent +// allocations at runtime. +func errnoErr(e syscall.Errno) error { + switch e { + case 0: + return nil + case syscall.ERROR_IO_PENDING: + return errERROR_IO_PENDING + } + // TODO: add more here, after collecting data on the common + // error values see on Windows. (perhaps when running + // all.bat?) + return e +} diff --git a/src/internal/poll/error_linux_test.go b/src/internal/poll/error_linux_test.go new file mode 100644 index 0000000..059fb8e --- /dev/null +++ b/src/internal/poll/error_linux_test.go @@ -0,0 +1,31 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll_test + +import ( + "errors" + "internal/poll" + "os" + "syscall" +) + +func badStateFile() (*os.File, error) { + if os.Getuid() != 0 { + return nil, errors.New("must be root") + } + // Using OpenFile for a device file is an easy way to make a + // file attached to the runtime-integrated network poller and + // configured in halfway. + return os.OpenFile("/dev/net/tun", os.O_RDWR, 0) +} + +func isBadStateFileError(err error) (string, bool) { + switch err { + case poll.ErrNotPollable, syscall.EBADFD: + return "", true + default: + return "not pollable or file in bad state error", false + } +} diff --git a/src/internal/poll/error_stub_test.go b/src/internal/poll/error_stub_test.go new file mode 100644 index 0000000..48e0952 --- /dev/null +++ b/src/internal/poll/error_stub_test.go @@ -0,0 +1,21 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !linux + +package poll_test + +import ( + "errors" + "os" + "runtime" +) + +func badStateFile() (*os.File, error) { + return nil, errors.New("not supported on " + runtime.GOOS) +} + +func isBadStateFileError(err error) (string, bool) { + return "", false +} diff --git a/src/internal/poll/error_test.go b/src/internal/poll/error_test.go new file mode 100644 index 0000000..abc8b16 --- /dev/null +++ b/src/internal/poll/error_test.go @@ -0,0 +1,51 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll_test + +import ( + "fmt" + "io/fs" + "net" + "os" + "testing" + "time" +) + +func TestReadError(t *testing.T) { + t.Run("ErrNotPollable", func(t *testing.T) { + f, err := badStateFile() + if err != nil { + t.Skip(err) + } + defer f.Close() + + // Give scheduler a chance to have two separated + // goroutines: an event poller and an event waiter. + time.Sleep(100 * time.Millisecond) + + var b [1]byte + _, err = f.Read(b[:]) + if perr := parseReadError(err, isBadStateFileError); perr != nil { + t.Fatal(perr) + } + }) +} + +func parseReadError(nestedErr error, verify func(error) (string, bool)) error { + err := nestedErr + if nerr, ok := err.(*net.OpError); ok { + err = nerr.Err + } + if nerr, ok := err.(*fs.PathError); ok { + err = nerr.Err + } + if nerr, ok := err.(*os.SyscallError); ok { + err = nerr.Err + } + if s, ok := verify(err); !ok { + return fmt.Errorf("got %v; want %s", nestedErr, s) + } + return nil +} diff --git a/src/internal/poll/export_linux_test.go b/src/internal/poll/export_linux_test.go new file mode 100644 index 0000000..7fba793 --- /dev/null +++ b/src/internal/poll/export_linux_test.go @@ -0,0 +1,22 @@ +// Copyright 2021 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Export guts for testing on linux. +// Since testing imports os and os imports internal/poll, +// the internal/poll tests can not be in package poll. + +package poll + +var ( + GetPipe = getPipe + PutPipe = putPipe + NewPipe = newPipe + DestroyPipe = destroyPipe +) + +func GetPipeFds(p *SplicePipe) (int, int) { + return p.rfd, p.wfd +} + +type SplicePipe = splicePipe diff --git a/src/internal/poll/export_posix_test.go b/src/internal/poll/export_posix_test.go new file mode 100644 index 0000000..3415ab3 --- /dev/null +++ b/src/internal/poll/export_posix_test.go @@ -0,0 +1,15 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build unix || windows + +// Export guts for testing on posix. +// Since testing imports os and os imports internal/poll, +// the internal/poll tests can not be in package poll. + +package poll + +func (fd *FD) EOFError(n int, err error) error { + return fd.eofError(n, err) +} diff --git a/src/internal/poll/export_test.go b/src/internal/poll/export_test.go new file mode 100644 index 0000000..02664d9 --- /dev/null +++ b/src/internal/poll/export_test.go @@ -0,0 +1,35 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Export guts for testing. +// Since testing imports os and os imports internal/poll, +// the internal/poll tests can not be in package poll. + +package poll + +var Consume = consume + +type FDMutex struct { + fdMutex +} + +func (mu *FDMutex) Incref() bool { + return mu.incref() +} + +func (mu *FDMutex) IncrefAndClose() bool { + return mu.increfAndClose() +} + +func (mu *FDMutex) Decref() bool { + return mu.decref() +} + +func (mu *FDMutex) RWLock(read bool) bool { + return mu.rwlock(read) +} + +func (mu *FDMutex) RWUnlock(read bool) bool { + return mu.rwunlock(read) +} diff --git a/src/internal/poll/export_windows_test.go b/src/internal/poll/export_windows_test.go new file mode 100644 index 0000000..88ed71a --- /dev/null +++ b/src/internal/poll/export_windows_test.go @@ -0,0 +1,17 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Export guts for testing on windows. +// Since testing imports os and os imports internal/poll, +// the internal/poll tests can not be in package poll. + +package poll + +var ( + LogInitFD = &logInitFD +) + +func (fd *FD) IsPartOfNetpoll() bool { + return fd.pd.runtimeCtx != 0 +} diff --git a/src/internal/poll/fcntl_js.go b/src/internal/poll/fcntl_js.go new file mode 100644 index 0000000..0f42ef6 --- /dev/null +++ b/src/internal/poll/fcntl_js.go @@ -0,0 +1,14 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build js && wasm + +package poll + +import "syscall" + +// fcntl not supported on js/wasm +func fcntl(fd int, cmd int, arg int) (int, error) { + return 0, syscall.ENOSYS +} diff --git a/src/internal/poll/fcntl_libc.go b/src/internal/poll/fcntl_libc.go new file mode 100644 index 0000000..13614dc --- /dev/null +++ b/src/internal/poll/fcntl_libc.go @@ -0,0 +1,14 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build aix || darwin || solaris + +package poll + +import _ "unsafe" // for go:linkname + +// Implemented in the syscall package. +// +//go:linkname fcntl syscall.fcntl +func fcntl(fd int, cmd int, arg int) (int, error) diff --git a/src/internal/poll/fcntl_syscall.go b/src/internal/poll/fcntl_syscall.go new file mode 100644 index 0000000..accff5e --- /dev/null +++ b/src/internal/poll/fcntl_syscall.go @@ -0,0 +1,20 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build dragonfly || freebsd || linux || netbsd || openbsd + +package poll + +import ( + "internal/syscall/unix" + "syscall" +) + +func fcntl(fd int, cmd int, arg int) (int, error) { + r, _, e := syscall.Syscall(unix.FcntlSyscall, uintptr(fd), uintptr(cmd), uintptr(arg)) + if e != 0 { + return int(r), syscall.Errno(e) + } + return int(r), nil +} diff --git a/src/internal/poll/fd.go b/src/internal/poll/fd.go new file mode 100644 index 0000000..ef61d0c --- /dev/null +++ b/src/internal/poll/fd.go @@ -0,0 +1,83 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package poll supports non-blocking I/O on file descriptors with polling. +// This supports I/O operations that block only a goroutine, not a thread. +// This is used by the net and os packages. +// It uses a poller built into the runtime, with support from the +// runtime scheduler. +package poll + +import ( + "errors" +) + +// errNetClosing is the type of the variable ErrNetClosing. +// This is used to implement the net.Error interface. +type errNetClosing struct{} + +// Error returns the error message for ErrNetClosing. +// Keep this string consistent because of issue #4373: +// since historically programs have not been able to detect +// this error, they look for the string. +func (e errNetClosing) Error() string { return "use of closed network connection" } + +func (e errNetClosing) Timeout() bool { return false } +func (e errNetClosing) Temporary() bool { return false } + +// ErrNetClosing is returned when a network descriptor is used after +// it has been closed. +var ErrNetClosing = errNetClosing{} + +// ErrFileClosing is returned when a file descriptor is used after it +// has been closed. +var ErrFileClosing = errors.New("use of closed file") + +// ErrNoDeadline is returned when a request is made to set a deadline +// on a file type that does not use the poller. +var ErrNoDeadline = errors.New("file type does not support deadline") + +// Return the appropriate closing error based on isFile. +func errClosing(isFile bool) error { + if isFile { + return ErrFileClosing + } + return ErrNetClosing +} + +// ErrDeadlineExceeded is returned for an expired deadline. +// This is exported by the os package as os.ErrDeadlineExceeded. +var ErrDeadlineExceeded error = &DeadlineExceededError{} + +// DeadlineExceededError is returned for an expired deadline. +type DeadlineExceededError struct{} + +// Implement the net.Error interface. +// The string is "i/o timeout" because that is what was returned +// by earlier Go versions. Changing it may break programs that +// match on error strings. +func (e *DeadlineExceededError) Error() string { return "i/o timeout" } +func (e *DeadlineExceededError) Timeout() bool { return true } +func (e *DeadlineExceededError) Temporary() bool { return true } + +// ErrNotPollable is returned when the file or socket is not suitable +// for event notification. +var ErrNotPollable = errors.New("not pollable") + +// consume removes data from a slice of byte slices, for writev. +func consume(v *[][]byte, n int64) { + for len(*v) > 0 { + ln0 := int64(len((*v)[0])) + if ln0 > n { + (*v)[0] = (*v)[0][n:] + return + } + n -= ln0 + (*v)[0] = nil + *v = (*v)[1:] + } +} + +// TestHookDidWritev is a hook for testing writev. +var TestHookDidWritev = func(wrote int) {} diff --git a/src/internal/poll/fd_fsync_darwin.go b/src/internal/poll/fd_fsync_darwin.go new file mode 100644 index 0000000..48e7596 --- /dev/null +++ b/src/internal/poll/fd_fsync_darwin.go @@ -0,0 +1,21 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import "syscall" + +// Fsync invokes SYS_FCNTL with SYS_FULLFSYNC because +// on OS X, SYS_FSYNC doesn't fully flush contents to disk. +// See Issue #26650 as well as the man page for fsync on OS X. +func (fd *FD) Fsync() error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return ignoringEINTR(func() error { + _, err := fcntl(fd.Sysfd, syscall.F_FULLFSYNC, 0) + return err + }) +} diff --git a/src/internal/poll/fd_fsync_posix.go b/src/internal/poll/fd_fsync_posix.go new file mode 100644 index 0000000..6f17019 --- /dev/null +++ b/src/internal/poll/fd_fsync_posix.go @@ -0,0 +1,20 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build aix || dragonfly || freebsd || (js && wasm) || linux || netbsd || openbsd || solaris + +package poll + +import "syscall" + +// Fsync wraps syscall.Fsync. +func (fd *FD) Fsync() error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return ignoringEINTR(func() error { + return syscall.Fsync(fd.Sysfd) + }) +} diff --git a/src/internal/poll/fd_fsync_windows.go b/src/internal/poll/fd_fsync_windows.go new file mode 100644 index 0000000..fb12119 --- /dev/null +++ b/src/internal/poll/fd_fsync_windows.go @@ -0,0 +1,16 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import "syscall" + +// Fsync wraps syscall.Fsync. +func (fd *FD) Fsync() error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.Fsync(fd.Sysfd) +} diff --git a/src/internal/poll/fd_io_plan9.go b/src/internal/poll/fd_io_plan9.go new file mode 100644 index 0000000..3205ac8 --- /dev/null +++ b/src/internal/poll/fd_io_plan9.go @@ -0,0 +1,92 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import ( + "internal/itoa" + "runtime" + "sync" + "syscall" +) + +// asyncIO implements asynchronous cancelable I/O. +// An asyncIO represents a single asynchronous Read or Write +// operation. The result is returned on the result channel. +// The undergoing I/O system call can either complete or be +// interrupted by a note. +type asyncIO struct { + res chan result + + // mu guards the pid field. + mu sync.Mutex + + // pid holds the process id of + // the process running the IO operation. + pid int +} + +// result is the return value of a Read or Write operation. +type result struct { + n int + err error +} + +// newAsyncIO returns a new asyncIO that performs an I/O +// operation by calling fn, which must do one and only one +// interruptible system call. +func newAsyncIO(fn func([]byte) (int, error), b []byte) *asyncIO { + aio := &asyncIO{ + res: make(chan result, 0), + } + aio.mu.Lock() + go func() { + // Lock the current goroutine to its process + // and store the pid in io so that Cancel can + // interrupt it. We ignore the "hangup" signal, + // so the signal does not take down the entire + // Go runtime. + runtime.LockOSThread() + runtime_ignoreHangup() + aio.pid = syscall.Getpid() + aio.mu.Unlock() + + n, err := fn(b) + + aio.mu.Lock() + aio.pid = -1 + runtime_unignoreHangup() + aio.mu.Unlock() + + aio.res <- result{n, err} + }() + return aio +} + +// Cancel interrupts the I/O operation, causing +// the Wait function to return. +func (aio *asyncIO) Cancel() { + aio.mu.Lock() + defer aio.mu.Unlock() + if aio.pid == -1 { + return + } + f, e := syscall.Open("/proc/"+itoa.Itoa(aio.pid)+"/note", syscall.O_WRONLY) + if e != nil { + return + } + syscall.Write(f, []byte("hangup")) + syscall.Close(f) +} + +// Wait for the I/O operation to complete. +func (aio *asyncIO) Wait() (int, error) { + res := <-aio.res + return res.n, res.err +} + +// The following functions, provided by the runtime, are used to +// ignore and unignore the "hangup" signal received by the process. +func runtime_ignoreHangup() +func runtime_unignoreHangup() diff --git a/src/internal/poll/fd_mutex.go b/src/internal/poll/fd_mutex.go new file mode 100644 index 0000000..0a8ee6f --- /dev/null +++ b/src/internal/poll/fd_mutex.go @@ -0,0 +1,252 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import "sync/atomic" + +// fdMutex is a specialized synchronization primitive that manages +// lifetime of an fd and serializes access to Read, Write and Close +// methods on FD. +type fdMutex struct { + state uint64 + rsema uint32 + wsema uint32 +} + +// fdMutex.state is organized as follows: +// 1 bit - whether FD is closed, if set all subsequent lock operations will fail. +// 1 bit - lock for read operations. +// 1 bit - lock for write operations. +// 20 bits - total number of references (read+write+misc). +// 20 bits - number of outstanding read waiters. +// 20 bits - number of outstanding write waiters. +const ( + mutexClosed = 1 << 0 + mutexRLock = 1 << 1 + mutexWLock = 1 << 2 + mutexRef = 1 << 3 + mutexRefMask = (1<<20 - 1) << 3 + mutexRWait = 1 << 23 + mutexRMask = (1<<20 - 1) << 23 + mutexWWait = 1 << 43 + mutexWMask = (1<<20 - 1) << 43 +) + +const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)" + +// Read operations must do rwlock(true)/rwunlock(true). +// +// Write operations must do rwlock(false)/rwunlock(false). +// +// Misc operations must do incref/decref. +// Misc operations include functions like setsockopt and setDeadline. +// They need to use incref/decref to ensure that they operate on the +// correct fd in presence of a concurrent close call (otherwise fd can +// be closed under their feet). +// +// Close operations must do increfAndClose/decref. + +// incref adds a reference to mu. +// It reports whether mu is available for reading or writing. +func (mu *fdMutex) incref() bool { + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexClosed != 0 { + return false + } + new := old + mutexRef + if new&mutexRefMask == 0 { + panic(overflowMsg) + } + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + return true + } + } +} + +// increfAndClose sets the state of mu to closed. +// It returns false if the file was already closed. +func (mu *fdMutex) increfAndClose() bool { + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexClosed != 0 { + return false + } + // Mark as closed and acquire a reference. + new := (old | mutexClosed) + mutexRef + if new&mutexRefMask == 0 { + panic(overflowMsg) + } + // Remove all read and write waiters. + new &^= mutexRMask | mutexWMask + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + // Wake all read and write waiters, + // they will observe closed flag after wakeup. + for old&mutexRMask != 0 { + old -= mutexRWait + runtime_Semrelease(&mu.rsema) + } + for old&mutexWMask != 0 { + old -= mutexWWait + runtime_Semrelease(&mu.wsema) + } + return true + } + } +} + +// decref removes a reference from mu. +// It reports whether there is no remaining reference. +func (mu *fdMutex) decref() bool { + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexRefMask == 0 { + panic("inconsistent poll.fdMutex") + } + new := old - mutexRef + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + return new&(mutexClosed|mutexRefMask) == mutexClosed + } + } +} + +// lock adds a reference to mu and locks mu. +// It reports whether mu is available for reading or writing. +func (mu *fdMutex) rwlock(read bool) bool { + var mutexBit, mutexWait, mutexMask uint64 + var mutexSema *uint32 + if read { + mutexBit = mutexRLock + mutexWait = mutexRWait + mutexMask = mutexRMask + mutexSema = &mu.rsema + } else { + mutexBit = mutexWLock + mutexWait = mutexWWait + mutexMask = mutexWMask + mutexSema = &mu.wsema + } + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexClosed != 0 { + return false + } + var new uint64 + if old&mutexBit == 0 { + // Lock is free, acquire it. + new = (old | mutexBit) + mutexRef + if new&mutexRefMask == 0 { + panic(overflowMsg) + } + } else { + // Wait for lock. + new = old + mutexWait + if new&mutexMask == 0 { + panic(overflowMsg) + } + } + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + if old&mutexBit == 0 { + return true + } + runtime_Semacquire(mutexSema) + // The signaller has subtracted mutexWait. + } + } +} + +// unlock removes a reference from mu and unlocks mu. +// It reports whether there is no remaining reference. +func (mu *fdMutex) rwunlock(read bool) bool { + var mutexBit, mutexWait, mutexMask uint64 + var mutexSema *uint32 + if read { + mutexBit = mutexRLock + mutexWait = mutexRWait + mutexMask = mutexRMask + mutexSema = &mu.rsema + } else { + mutexBit = mutexWLock + mutexWait = mutexWWait + mutexMask = mutexWMask + mutexSema = &mu.wsema + } + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexBit == 0 || old&mutexRefMask == 0 { + panic("inconsistent poll.fdMutex") + } + // Drop lock, drop reference and wake read waiter if present. + new := (old &^ mutexBit) - mutexRef + if old&mutexMask != 0 { + new -= mutexWait + } + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + if old&mutexMask != 0 { + runtime_Semrelease(mutexSema) + } + return new&(mutexClosed|mutexRefMask) == mutexClosed + } + } +} + +// Implemented in runtime package. +func runtime_Semacquire(sema *uint32) +func runtime_Semrelease(sema *uint32) + +// incref adds a reference to fd. +// It returns an error when fd cannot be used. +func (fd *FD) incref() error { + if !fd.fdmu.incref() { + return errClosing(fd.isFile) + } + return nil +} + +// decref removes a reference from fd. +// It also closes fd when the state of fd is set to closed and there +// is no remaining reference. +func (fd *FD) decref() error { + if fd.fdmu.decref() { + return fd.destroy() + } + return nil +} + +// readLock adds a reference to fd and locks fd for reading. +// It returns an error when fd cannot be used for reading. +func (fd *FD) readLock() error { + if !fd.fdmu.rwlock(true) { + return errClosing(fd.isFile) + } + return nil +} + +// readUnlock removes a reference from fd and unlocks fd for reading. +// It also closes fd when the state of fd is set to closed and there +// is no remaining reference. +func (fd *FD) readUnlock() { + if fd.fdmu.rwunlock(true) { + fd.destroy() + } +} + +// writeLock adds a reference to fd and locks fd for writing. +// It returns an error when fd cannot be used for writing. +func (fd *FD) writeLock() error { + if !fd.fdmu.rwlock(false) { + return errClosing(fd.isFile) + } + return nil +} + +// writeUnlock removes a reference from fd and unlocks fd for writing. +// It also closes fd when the state of fd is set to closed and there +// is no remaining reference. +func (fd *FD) writeUnlock() { + if fd.fdmu.rwunlock(false) { + fd.destroy() + } +} diff --git a/src/internal/poll/fd_mutex_test.go b/src/internal/poll/fd_mutex_test.go new file mode 100644 index 0000000..3029b9a --- /dev/null +++ b/src/internal/poll/fd_mutex_test.go @@ -0,0 +1,222 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll_test + +import ( + . "internal/poll" + "math/rand" + "runtime" + "strings" + "testing" + "time" +) + +func TestMutexLock(t *testing.T) { + var mu FDMutex + + if !mu.Incref() { + t.Fatal("broken") + } + if mu.Decref() { + t.Fatal("broken") + } + + if !mu.RWLock(true) { + t.Fatal("broken") + } + if mu.RWUnlock(true) { + t.Fatal("broken") + } + + if !mu.RWLock(false) { + t.Fatal("broken") + } + if mu.RWUnlock(false) { + t.Fatal("broken") + } +} + +func TestMutexClose(t *testing.T) { + var mu FDMutex + if !mu.IncrefAndClose() { + t.Fatal("broken") + } + + if mu.Incref() { + t.Fatal("broken") + } + if mu.RWLock(true) { + t.Fatal("broken") + } + if mu.RWLock(false) { + t.Fatal("broken") + } + if mu.IncrefAndClose() { + t.Fatal("broken") + } +} + +func TestMutexCloseUnblock(t *testing.T) { + c := make(chan bool, 4) + var mu FDMutex + mu.RWLock(true) + for i := 0; i < 4; i++ { + go func() { + if mu.RWLock(true) { + t.Error("broken") + return + } + c <- true + }() + } + // Concurrent goroutines must not be able to read lock the mutex. + time.Sleep(time.Millisecond) + select { + case <-c: + t.Fatal("broken") + default: + } + mu.IncrefAndClose() // Must unblock the readers. + for i := 0; i < 4; i++ { + select { + case <-c: + case <-time.After(10 * time.Second): + t.Fatal("broken") + } + } + if mu.Decref() { + t.Fatal("broken") + } + if !mu.RWUnlock(true) { + t.Fatal("broken") + } +} + +func TestMutexPanic(t *testing.T) { + ensurePanics := func(f func()) { + defer func() { + if recover() == nil { + t.Fatal("does not panic") + } + }() + f() + } + + var mu FDMutex + ensurePanics(func() { mu.Decref() }) + ensurePanics(func() { mu.RWUnlock(true) }) + ensurePanics(func() { mu.RWUnlock(false) }) + + ensurePanics(func() { mu.Incref(); mu.Decref(); mu.Decref() }) + ensurePanics(func() { mu.RWLock(true); mu.RWUnlock(true); mu.RWUnlock(true) }) + ensurePanics(func() { mu.RWLock(false); mu.RWUnlock(false); mu.RWUnlock(false) }) + + // ensure that it's still not broken + mu.Incref() + mu.Decref() + mu.RWLock(true) + mu.RWUnlock(true) + mu.RWLock(false) + mu.RWUnlock(false) +} + +func TestMutexOverflowPanic(t *testing.T) { + defer func() { + r := recover() + if r == nil { + t.Fatal("did not panic") + } + msg, ok := r.(string) + if !ok { + t.Fatalf("unexpected panic type %T", r) + } + if !strings.Contains(msg, "too many") || strings.Contains(msg, "inconsistent") { + t.Fatalf("wrong panic message %q", msg) + } + }() + + var mu1 FDMutex + for i := 0; i < 1<<21; i++ { + mu1.Incref() + } +} + +func TestMutexStress(t *testing.T) { + P := 8 + N := int(1e6) + if testing.Short() { + P = 4 + N = 1e4 + } + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P)) + done := make(chan bool, P) + var mu FDMutex + var readState [2]uint64 + var writeState [2]uint64 + for p := 0; p < P; p++ { + go func() { + defer func() { + done <- !t.Failed() + }() + r := rand.New(rand.NewSource(rand.Int63())) + for i := 0; i < N; i++ { + switch r.Intn(3) { + case 0: + if !mu.Incref() { + t.Error("broken") + return + } + if mu.Decref() { + t.Error("broken") + return + } + case 1: + if !mu.RWLock(true) { + t.Error("broken") + return + } + // Ensure that it provides mutual exclusion for readers. + if readState[0] != readState[1] { + t.Error("broken") + return + } + readState[0]++ + readState[1]++ + if mu.RWUnlock(true) { + t.Error("broken") + return + } + case 2: + if !mu.RWLock(false) { + t.Error("broken") + return + } + // Ensure that it provides mutual exclusion for writers. + if writeState[0] != writeState[1] { + t.Error("broken") + return + } + writeState[0]++ + writeState[1]++ + if mu.RWUnlock(false) { + t.Error("broken") + return + } + } + } + }() + } + for p := 0; p < P; p++ { + if !<-done { + t.FailNow() + } + } + if !mu.IncrefAndClose() { + t.Fatal("broken") + } + if !mu.Decref() { + t.Fatal("broken") + } +} diff --git a/src/internal/poll/fd_opendir_darwin.go b/src/internal/poll/fd_opendir_darwin.go new file mode 100644 index 0000000..3ae2dc8 --- /dev/null +++ b/src/internal/poll/fd_opendir_darwin.go @@ -0,0 +1,39 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import ( + "syscall" + _ "unsafe" // for go:linkname +) + +// OpenDir returns a pointer to a DIR structure suitable for +// ReadDir. In case of an error, the name of the failed +// syscall is returned along with a syscall.Errno. +func (fd *FD) OpenDir() (uintptr, string, error) { + // fdopendir(3) takes control of the file descriptor, + // so use a dup. + fd2, call, err := fd.Dup() + if err != nil { + return 0, call, err + } + var dir uintptr + for { + dir, err = fdopendir(fd2) + if err != syscall.EINTR { + break + } + } + if err != nil { + syscall.Close(fd2) + return 0, "fdopendir", err + } + return dir, "", nil +} + +// Implemented in syscall/syscall_darwin.go. +// +//go:linkname fdopendir syscall.fdopendir +func fdopendir(fd int) (dir uintptr, err error) diff --git a/src/internal/poll/fd_plan9.go b/src/internal/poll/fd_plan9.go new file mode 100644 index 0000000..0b5b937 --- /dev/null +++ b/src/internal/poll/fd_plan9.go @@ -0,0 +1,233 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import ( + "errors" + "io" + "sync" + "sync/atomic" + "time" +) + +type atomicBool int32 + +func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 } +func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) } +func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) } + +type FD struct { + // Lock sysfd and serialize access to Read and Write methods. + fdmu fdMutex + + Destroy func() + + // deadlines + rmu sync.Mutex + wmu sync.Mutex + raio *asyncIO + waio *asyncIO + rtimer *time.Timer + wtimer *time.Timer + rtimedout atomicBool // set true when read deadline has been reached + wtimedout atomicBool // set true when write deadline has been reached + + // Whether this is a normal file. + // On Plan 9 we do not use this package for ordinary files, + // so this is always false, but the field is present because + // shared code in fd_mutex.go checks it. + isFile bool +} + +// We need this to close out a file descriptor when it is unlocked, +// but the real implementation has to live in the net package because +// it uses os.File's. +func (fd *FD) destroy() error { + if fd.Destroy != nil { + fd.Destroy() + } + return nil +} + +// Close handles the locking for closing an FD. The real operation +// is in the net package. +func (fd *FD) Close() error { + if !fd.fdmu.increfAndClose() { + return errClosing(fd.isFile) + } + return nil +} + +// Read implements io.Reader. +func (fd *FD) Read(fn func([]byte) (int, error), b []byte) (int, error) { + if err := fd.readLock(); err != nil { + return 0, err + } + defer fd.readUnlock() + if len(b) == 0 { + return 0, nil + } + fd.rmu.Lock() + if fd.rtimedout.isSet() { + fd.rmu.Unlock() + return 0, ErrDeadlineExceeded + } + fd.raio = newAsyncIO(fn, b) + fd.rmu.Unlock() + n, err := fd.raio.Wait() + fd.raio = nil + if isHangup(err) { + err = io.EOF + } + if isInterrupted(err) { + err = ErrDeadlineExceeded + } + return n, err +} + +// Write implements io.Writer. +func (fd *FD) Write(fn func([]byte) (int, error), b []byte) (int, error) { + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + fd.wmu.Lock() + if fd.wtimedout.isSet() { + fd.wmu.Unlock() + return 0, ErrDeadlineExceeded + } + fd.waio = newAsyncIO(fn, b) + fd.wmu.Unlock() + n, err := fd.waio.Wait() + fd.waio = nil + if isInterrupted(err) { + err = ErrDeadlineExceeded + } + return n, err +} + +// SetDeadline sets the read and write deadlines associated with fd. +func (fd *FD) SetDeadline(t time.Time) error { + return setDeadlineImpl(fd, t, 'r'+'w') +} + +// SetReadDeadline sets the read deadline associated with fd. +func (fd *FD) SetReadDeadline(t time.Time) error { + return setDeadlineImpl(fd, t, 'r') +} + +// SetWriteDeadline sets the write deadline associated with fd. +func (fd *FD) SetWriteDeadline(t time.Time) error { + return setDeadlineImpl(fd, t, 'w') +} + +func setDeadlineImpl(fd *FD, t time.Time, mode int) error { + d := t.Sub(time.Now()) + if mode == 'r' || mode == 'r'+'w' { + fd.rmu.Lock() + defer fd.rmu.Unlock() + fd.rtimedout.setFalse() + } + if mode == 'w' || mode == 'r'+'w' { + fd.wmu.Lock() + defer fd.wmu.Unlock() + fd.wtimedout.setFalse() + } + if t.IsZero() || d < 0 { + // Stop timer + if mode == 'r' || mode == 'r'+'w' { + if fd.rtimer != nil { + fd.rtimer.Stop() + } + fd.rtimer = nil + } + if mode == 'w' || mode == 'r'+'w' { + if fd.wtimer != nil { + fd.wtimer.Stop() + } + fd.wtimer = nil + } + } else { + // Interrupt I/O operation once timer has expired + if mode == 'r' || mode == 'r'+'w' { + fd.rtimer = time.AfterFunc(d, func() { + fd.rmu.Lock() + fd.rtimedout.setTrue() + if fd.raio != nil { + fd.raio.Cancel() + } + fd.rmu.Unlock() + }) + } + if mode == 'w' || mode == 'r'+'w' { + fd.wtimer = time.AfterFunc(d, func() { + fd.wmu.Lock() + fd.wtimedout.setTrue() + if fd.waio != nil { + fd.waio.Cancel() + } + fd.wmu.Unlock() + }) + } + } + if !t.IsZero() && d < 0 { + // Interrupt current I/O operation + if mode == 'r' || mode == 'r'+'w' { + fd.rtimedout.setTrue() + if fd.raio != nil { + fd.raio.Cancel() + } + } + if mode == 'w' || mode == 'r'+'w' { + fd.wtimedout.setTrue() + if fd.waio != nil { + fd.waio.Cancel() + } + } + } + return nil +} + +// On Plan 9 only, expose the locking for the net code. + +// ReadLock wraps FD.readLock. +func (fd *FD) ReadLock() error { + return fd.readLock() +} + +// ReadUnlock wraps FD.readUnlock. +func (fd *FD) ReadUnlock() { + fd.readUnlock() +} + +func isHangup(err error) bool { + return err != nil && stringsHasSuffix(err.Error(), "Hangup") +} + +func isInterrupted(err error) bool { + return err != nil && stringsHasSuffix(err.Error(), "interrupted") +} + +// IsPollDescriptor reports whether fd is the descriptor being used by the poller. +// This is only used for testing. +func IsPollDescriptor(fd uintptr) bool { + return false +} + +// RawControl invokes the user-defined function f for a non-IO +// operation. +func (fd *FD) RawControl(f func(uintptr)) error { + return errors.New("not implemented") +} + +// RawRead invokes the user-defined function f for a read operation. +func (fd *FD) RawRead(f func(uintptr) bool) error { + return errors.New("not implemented") +} + +// RawWrite invokes the user-defined function f for a write operation. +func (fd *FD) RawWrite(f func(uintptr) bool) error { + return errors.New("not implemented") +} diff --git a/src/internal/poll/fd_poll_js.go b/src/internal/poll/fd_poll_js.go new file mode 100644 index 0000000..84bfcae --- /dev/null +++ b/src/internal/poll/fd_poll_js.go @@ -0,0 +1,99 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build js && wasm + +package poll + +import ( + "syscall" + "time" +) + +type pollDesc struct { + fd *FD + closing bool +} + +func (pd *pollDesc) init(fd *FD) error { pd.fd = fd; return nil } + +func (pd *pollDesc) close() {} + +func (pd *pollDesc) evict() { + pd.closing = true + if pd.fd != nil { + syscall.StopIO(pd.fd.Sysfd) + } +} + +func (pd *pollDesc) prepare(mode int, isFile bool) error { + if pd.closing { + return errClosing(isFile) + } + return nil +} + +func (pd *pollDesc) prepareRead(isFile bool) error { return pd.prepare('r', isFile) } + +func (pd *pollDesc) prepareWrite(isFile bool) error { return pd.prepare('w', isFile) } + +func (pd *pollDesc) wait(mode int, isFile bool) error { + if pd.closing { + return errClosing(isFile) + } + if isFile { // TODO(neelance): wasm: Use callbacks from JS to block until the read/write finished. + return nil + } + return ErrDeadlineExceeded +} + +func (pd *pollDesc) waitRead(isFile bool) error { return pd.wait('r', isFile) } + +func (pd *pollDesc) waitWrite(isFile bool) error { return pd.wait('w', isFile) } + +func (pd *pollDesc) waitCanceled(mode int) {} + +func (pd *pollDesc) pollable() bool { return true } + +// SetDeadline sets the read and write deadlines associated with fd. +func (fd *FD) SetDeadline(t time.Time) error { + return setDeadlineImpl(fd, t, 'r'+'w') +} + +// SetReadDeadline sets the read deadline associated with fd. +func (fd *FD) SetReadDeadline(t time.Time) error { + return setDeadlineImpl(fd, t, 'r') +} + +// SetWriteDeadline sets the write deadline associated with fd. +func (fd *FD) SetWriteDeadline(t time.Time) error { + return setDeadlineImpl(fd, t, 'w') +} + +func setDeadlineImpl(fd *FD, t time.Time, mode int) error { + d := t.UnixNano() + if t.IsZero() { + d = 0 + } + if err := fd.incref(); err != nil { + return err + } + switch mode { + case 'r': + syscall.SetReadDeadline(fd.Sysfd, d) + case 'w': + syscall.SetWriteDeadline(fd.Sysfd, d) + case 'r' + 'w': + syscall.SetReadDeadline(fd.Sysfd, d) + syscall.SetWriteDeadline(fd.Sysfd, d) + } + fd.decref() + return nil +} + +// IsPollDescriptor reports whether fd is the descriptor being used by the poller. +// This is only used for testing. +func IsPollDescriptor(fd uintptr) bool { + return false +} diff --git a/src/internal/poll/fd_poll_runtime.go b/src/internal/poll/fd_poll_runtime.go new file mode 100644 index 0000000..0a2e76d --- /dev/null +++ b/src/internal/poll/fd_poll_runtime.go @@ -0,0 +1,169 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build unix || windows + +package poll + +import ( + "errors" + "sync" + "syscall" + "time" + _ "unsafe" // for go:linkname +) + +// runtimeNano returns the current value of the runtime clock in nanoseconds. +// +//go:linkname runtimeNano runtime.nanotime +func runtimeNano() int64 + +func runtime_pollServerInit() +func runtime_pollOpen(fd uintptr) (uintptr, int) +func runtime_pollClose(ctx uintptr) +func runtime_pollWait(ctx uintptr, mode int) int +func runtime_pollWaitCanceled(ctx uintptr, mode int) +func runtime_pollReset(ctx uintptr, mode int) int +func runtime_pollSetDeadline(ctx uintptr, d int64, mode int) +func runtime_pollUnblock(ctx uintptr) +func runtime_isPollServerDescriptor(fd uintptr) bool + +type pollDesc struct { + runtimeCtx uintptr +} + +var serverInit sync.Once + +func (pd *pollDesc) init(fd *FD) error { + serverInit.Do(runtime_pollServerInit) + ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) + if errno != 0 { + return errnoErr(syscall.Errno(errno)) + } + pd.runtimeCtx = ctx + return nil +} + +func (pd *pollDesc) close() { + if pd.runtimeCtx == 0 { + return + } + runtime_pollClose(pd.runtimeCtx) + pd.runtimeCtx = 0 +} + +// Evict evicts fd from the pending list, unblocking any I/O running on fd. +func (pd *pollDesc) evict() { + if pd.runtimeCtx == 0 { + return + } + runtime_pollUnblock(pd.runtimeCtx) +} + +func (pd *pollDesc) prepare(mode int, isFile bool) error { + if pd.runtimeCtx == 0 { + return nil + } + res := runtime_pollReset(pd.runtimeCtx, mode) + return convertErr(res, isFile) +} + +func (pd *pollDesc) prepareRead(isFile bool) error { + return pd.prepare('r', isFile) +} + +func (pd *pollDesc) prepareWrite(isFile bool) error { + return pd.prepare('w', isFile) +} + +func (pd *pollDesc) wait(mode int, isFile bool) error { + if pd.runtimeCtx == 0 { + return errors.New("waiting for unsupported file type") + } + res := runtime_pollWait(pd.runtimeCtx, mode) + return convertErr(res, isFile) +} + +func (pd *pollDesc) waitRead(isFile bool) error { + return pd.wait('r', isFile) +} + +func (pd *pollDesc) waitWrite(isFile bool) error { + return pd.wait('w', isFile) +} + +func (pd *pollDesc) waitCanceled(mode int) { + if pd.runtimeCtx == 0 { + return + } + runtime_pollWaitCanceled(pd.runtimeCtx, mode) +} + +func (pd *pollDesc) pollable() bool { + return pd.runtimeCtx != 0 +} + +// Error values returned by runtime_pollReset and runtime_pollWait. +// These must match the values in runtime/netpoll.go. +const ( + pollNoError = 0 + pollErrClosing = 1 + pollErrTimeout = 2 + pollErrNotPollable = 3 +) + +func convertErr(res int, isFile bool) error { + switch res { + case pollNoError: + return nil + case pollErrClosing: + return errClosing(isFile) + case pollErrTimeout: + return ErrDeadlineExceeded + case pollErrNotPollable: + return ErrNotPollable + } + println("unreachable: ", res) + panic("unreachable") +} + +// SetDeadline sets the read and write deadlines associated with fd. +func (fd *FD) SetDeadline(t time.Time) error { + return setDeadlineImpl(fd, t, 'r'+'w') +} + +// SetReadDeadline sets the read deadline associated with fd. +func (fd *FD) SetReadDeadline(t time.Time) error { + return setDeadlineImpl(fd, t, 'r') +} + +// SetWriteDeadline sets the write deadline associated with fd. +func (fd *FD) SetWriteDeadline(t time.Time) error { + return setDeadlineImpl(fd, t, 'w') +} + +func setDeadlineImpl(fd *FD, t time.Time, mode int) error { + var d int64 + if !t.IsZero() { + d = int64(time.Until(t)) + if d == 0 { + d = -1 // don't confuse deadline right now with no deadline + } + } + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + if fd.pd.runtimeCtx == 0 { + return ErrNoDeadline + } + runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode) + return nil +} + +// IsPollDescriptor reports whether fd is the descriptor being used by the poller. +// This is only used for testing. +func IsPollDescriptor(fd uintptr) bool { + return runtime_isPollServerDescriptor(fd) +} diff --git a/src/internal/poll/fd_posix.go b/src/internal/poll/fd_posix.go new file mode 100644 index 0000000..778fe1e --- /dev/null +++ b/src/internal/poll/fd_posix.go @@ -0,0 +1,79 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build unix || (js && wasm) || windows + +package poll + +import ( + "io" + "syscall" +) + +// eofError returns io.EOF when fd is available for reading end of +// file. +func (fd *FD) eofError(n int, err error) error { + if n == 0 && err == nil && fd.ZeroReadIsEOF { + return io.EOF + } + return err +} + +// Shutdown wraps syscall.Shutdown. +func (fd *FD) Shutdown(how int) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.Shutdown(fd.Sysfd, how) +} + +// Fchown wraps syscall.Fchown. +func (fd *FD) Fchown(uid, gid int) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return ignoringEINTR(func() error { + return syscall.Fchown(fd.Sysfd, uid, gid) + }) +} + +// Ftruncate wraps syscall.Ftruncate. +func (fd *FD) Ftruncate(size int64) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return ignoringEINTR(func() error { + return syscall.Ftruncate(fd.Sysfd, size) + }) +} + +// RawControl invokes the user-defined function f for a non-IO +// operation. +func (fd *FD) RawControl(f func(uintptr)) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + f(uintptr(fd.Sysfd)) + return nil +} + +// ignoringEINTR makes a function call and repeats it if it returns +// an EINTR error. This appears to be required even though we install all +// signal handlers with SA_RESTART: see #22838, #38033, #38836, #40846. +// Also #20400 and #36644 are issues in which a signal handler is +// installed without setting SA_RESTART. None of these are the common case, +// but there are enough of them that it seems that we can't avoid +// an EINTR loop. +func ignoringEINTR(fn func() error) error { + for { + err := fn() + if err != syscall.EINTR { + return err + } + } +} diff --git a/src/internal/poll/fd_posix_test.go b/src/internal/poll/fd_posix_test.go new file mode 100644 index 0000000..b97e465 --- /dev/null +++ b/src/internal/poll/fd_posix_test.go @@ -0,0 +1,43 @@ +// Copyright 2012 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build unix || windows + +package poll_test + +import ( + . "internal/poll" + "io" + "testing" +) + +var eofErrorTests = []struct { + n int + err error + fd *FD + expected error +}{ + {100, nil, &FD{ZeroReadIsEOF: true}, nil}, + {100, io.EOF, &FD{ZeroReadIsEOF: true}, io.EOF}, + {100, ErrNetClosing, &FD{ZeroReadIsEOF: true}, ErrNetClosing}, + {0, nil, &FD{ZeroReadIsEOF: true}, io.EOF}, + {0, io.EOF, &FD{ZeroReadIsEOF: true}, io.EOF}, + {0, ErrNetClosing, &FD{ZeroReadIsEOF: true}, ErrNetClosing}, + + {100, nil, &FD{ZeroReadIsEOF: false}, nil}, + {100, io.EOF, &FD{ZeroReadIsEOF: false}, io.EOF}, + {100, ErrNetClosing, &FD{ZeroReadIsEOF: false}, ErrNetClosing}, + {0, nil, &FD{ZeroReadIsEOF: false}, nil}, + {0, io.EOF, &FD{ZeroReadIsEOF: false}, io.EOF}, + {0, ErrNetClosing, &FD{ZeroReadIsEOF: false}, ErrNetClosing}, +} + +func TestEOFError(t *testing.T) { + for _, tt := range eofErrorTests { + actual := tt.fd.EOFError(tt.n, tt.err) + if actual != tt.expected { + t.Errorf("eofError(%v, %v, %v): expected %v, actual %v", tt.n, tt.err, tt.fd.ZeroReadIsEOF, tt.expected, actual) + } + } +} diff --git a/src/internal/poll/fd_unix.go b/src/internal/poll/fd_unix.go new file mode 100644 index 0000000..2786064 --- /dev/null +++ b/src/internal/poll/fd_unix.go @@ -0,0 +1,799 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build unix || (js && wasm) + +package poll + +import ( + "internal/syscall/unix" + "io" + "sync/atomic" + "syscall" +) + +// FD is a file descriptor. The net and os packages use this type as a +// field of a larger type representing a network connection or OS file. +type FD struct { + // Lock sysfd and serialize access to Read and Write methods. + fdmu fdMutex + + // System file descriptor. Immutable until Close. + Sysfd int + + // I/O poller. + pd pollDesc + + // Writev cache. + iovecs *[]syscall.Iovec + + // Semaphore signaled when file is closed. + csema uint32 + + // Non-zero if this file has been set to blocking mode. + isBlocking uint32 + + // Whether this is a streaming descriptor, as opposed to a + // packet-based descriptor like a UDP socket. Immutable. + IsStream bool + + // Whether a zero byte read indicates EOF. This is false for a + // message based socket connection. + ZeroReadIsEOF bool + + // Whether this is a file rather than a network socket. + isFile bool +} + +// Init initializes the FD. The Sysfd field should already be set. +// This can be called multiple times on a single FD. +// The net argument is a network name from the net package (e.g., "tcp"), +// or "file". +// Set pollable to true if fd should be managed by runtime netpoll. +func (fd *FD) Init(net string, pollable bool) error { + // We don't actually care about the various network types. + if net == "file" { + fd.isFile = true + } + if !pollable { + fd.isBlocking = 1 + return nil + } + err := fd.pd.init(fd) + if err != nil { + // If we could not initialize the runtime poller, + // assume we are using blocking mode. + fd.isBlocking = 1 + } + return err +} + +// Destroy closes the file descriptor. This is called when there are +// no remaining references. +func (fd *FD) destroy() error { + // Poller may want to unregister fd in readiness notification mechanism, + // so this must be executed before CloseFunc. + fd.pd.close() + + // We don't use ignoringEINTR here because POSIX does not define + // whether the descriptor is closed if close returns EINTR. + // If the descriptor is indeed closed, using a loop would race + // with some other goroutine opening a new descriptor. + // (The Linux kernel guarantees that it is closed on an EINTR error.) + err := CloseFunc(fd.Sysfd) + + fd.Sysfd = -1 + runtime_Semrelease(&fd.csema) + return err +} + +// Close closes the FD. The underlying file descriptor is closed by the +// destroy method when there are no remaining references. +func (fd *FD) Close() error { + if !fd.fdmu.increfAndClose() { + return errClosing(fd.isFile) + } + + // Unblock any I/O. Once it all unblocks and returns, + // so that it cannot be referring to fd.sysfd anymore, + // the final decref will close fd.sysfd. This should happen + // fairly quickly, since all the I/O is non-blocking, and any + // attempts to block in the pollDesc will return errClosing(fd.isFile). + fd.pd.evict() + + // The call to decref will call destroy if there are no other + // references. + err := fd.decref() + + // Wait until the descriptor is closed. If this was the only + // reference, it is already closed. Only wait if the file has + // not been set to blocking mode, as otherwise any current I/O + // may be blocking, and that would block the Close. + // No need for an atomic read of isBlocking, increfAndClose means + // we have exclusive access to fd. + if fd.isBlocking == 0 { + runtime_Semacquire(&fd.csema) + } + + return err +} + +// SetBlocking puts the file into blocking mode. +func (fd *FD) SetBlocking() error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + // Atomic store so that concurrent calls to SetBlocking + // do not cause a race condition. isBlocking only ever goes + // from 0 to 1 so there is no real race here. + atomic.StoreUint32(&fd.isBlocking, 1) + return syscall.SetNonblock(fd.Sysfd, false) +} + +// Darwin and FreeBSD can't read or write 2GB+ files at a time, +// even on 64-bit systems. +// The same is true of socket implementations on many systems. +// See golang.org/issue/7812 and golang.org/issue/16266. +// Use 1GB instead of, say, 2GB-1, to keep subsequent reads aligned. +const maxRW = 1 << 30 + +// Read implements io.Reader. +func (fd *FD) Read(p []byte) (int, error) { + if err := fd.readLock(); err != nil { + return 0, err + } + defer fd.readUnlock() + if len(p) == 0 { + // If the caller wanted a zero byte read, return immediately + // without trying (but after acquiring the readLock). + // Otherwise syscall.Read returns 0, nil which looks like + // io.EOF. + // TODO(bradfitz): make it wait for readability? (Issue 15735) + return 0, nil + } + if err := fd.pd.prepareRead(fd.isFile); err != nil { + return 0, err + } + if fd.IsStream && len(p) > maxRW { + p = p[:maxRW] + } + for { + n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p) + if err != nil { + n = 0 + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitRead(fd.isFile); err == nil { + continue + } + } + } + err = fd.eofError(n, err) + return n, err + } +} + +// Pread wraps the pread system call. +func (fd *FD) Pread(p []byte, off int64) (int, error) { + // Call incref, not readLock, because since pread specifies the + // offset it is independent from other reads. + // Similarly, using the poller doesn't make sense for pread. + if err := fd.incref(); err != nil { + return 0, err + } + if fd.IsStream && len(p) > maxRW { + p = p[:maxRW] + } + var ( + n int + err error + ) + for { + n, err = syscall.Pread(fd.Sysfd, p, off) + if err != syscall.EINTR { + break + } + } + if err != nil { + n = 0 + } + fd.decref() + err = fd.eofError(n, err) + return n, err +} + +// ReadFrom wraps the recvfrom network call. +func (fd *FD) ReadFrom(p []byte) (int, syscall.Sockaddr, error) { + if err := fd.readLock(); err != nil { + return 0, nil, err + } + defer fd.readUnlock() + if err := fd.pd.prepareRead(fd.isFile); err != nil { + return 0, nil, err + } + for { + n, sa, err := syscall.Recvfrom(fd.Sysfd, p, 0) + if err != nil { + if err == syscall.EINTR { + continue + } + n = 0 + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitRead(fd.isFile); err == nil { + continue + } + } + } + err = fd.eofError(n, err) + return n, sa, err + } +} + +// ReadFromInet4 wraps the recvfrom network call for IPv4. +func (fd *FD) ReadFromInet4(p []byte, from *syscall.SockaddrInet4) (int, error) { + if err := fd.readLock(); err != nil { + return 0, err + } + defer fd.readUnlock() + if err := fd.pd.prepareRead(fd.isFile); err != nil { + return 0, err + } + for { + n, err := unix.RecvfromInet4(fd.Sysfd, p, 0, from) + if err != nil { + if err == syscall.EINTR { + continue + } + n = 0 + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitRead(fd.isFile); err == nil { + continue + } + } + } + err = fd.eofError(n, err) + return n, err + } +} + +// ReadFromInet6 wraps the recvfrom network call for IPv6. +func (fd *FD) ReadFromInet6(p []byte, from *syscall.SockaddrInet6) (int, error) { + if err := fd.readLock(); err != nil { + return 0, err + } + defer fd.readUnlock() + if err := fd.pd.prepareRead(fd.isFile); err != nil { + return 0, err + } + for { + n, err := unix.RecvfromInet6(fd.Sysfd, p, 0, from) + if err != nil { + if err == syscall.EINTR { + continue + } + n = 0 + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitRead(fd.isFile); err == nil { + continue + } + } + } + err = fd.eofError(n, err) + return n, err + } +} + +// ReadMsg wraps the recvmsg network call. +func (fd *FD) ReadMsg(p []byte, oob []byte, flags int) (int, int, int, syscall.Sockaddr, error) { + if err := fd.readLock(); err != nil { + return 0, 0, 0, nil, err + } + defer fd.readUnlock() + if err := fd.pd.prepareRead(fd.isFile); err != nil { + return 0, 0, 0, nil, err + } + for { + n, oobn, sysflags, sa, err := syscall.Recvmsg(fd.Sysfd, p, oob, flags) + if err != nil { + if err == syscall.EINTR { + continue + } + // TODO(dfc) should n and oobn be set to 0 + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitRead(fd.isFile); err == nil { + continue + } + } + } + err = fd.eofError(n, err) + return n, oobn, sysflags, sa, err + } +} + +// ReadMsgInet4 is ReadMsg, but specialized for syscall.SockaddrInet4. +func (fd *FD) ReadMsgInet4(p []byte, oob []byte, flags int, sa4 *syscall.SockaddrInet4) (int, int, int, error) { + if err := fd.readLock(); err != nil { + return 0, 0, 0, err + } + defer fd.readUnlock() + if err := fd.pd.prepareRead(fd.isFile); err != nil { + return 0, 0, 0, err + } + for { + n, oobn, sysflags, err := unix.RecvmsgInet4(fd.Sysfd, p, oob, flags, sa4) + if err != nil { + if err == syscall.EINTR { + continue + } + // TODO(dfc) should n and oobn be set to 0 + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitRead(fd.isFile); err == nil { + continue + } + } + } + err = fd.eofError(n, err) + return n, oobn, sysflags, err + } +} + +// ReadMsgInet6 is ReadMsg, but specialized for syscall.SockaddrInet6. +func (fd *FD) ReadMsgInet6(p []byte, oob []byte, flags int, sa6 *syscall.SockaddrInet6) (int, int, int, error) { + if err := fd.readLock(); err != nil { + return 0, 0, 0, err + } + defer fd.readUnlock() + if err := fd.pd.prepareRead(fd.isFile); err != nil { + return 0, 0, 0, err + } + for { + n, oobn, sysflags, err := unix.RecvmsgInet6(fd.Sysfd, p, oob, flags, sa6) + if err != nil { + if err == syscall.EINTR { + continue + } + // TODO(dfc) should n and oobn be set to 0 + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitRead(fd.isFile); err == nil { + continue + } + } + } + err = fd.eofError(n, err) + return n, oobn, sysflags, err + } +} + +// Write implements io.Writer. +func (fd *FD) Write(p []byte) (int, error) { + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + if err := fd.pd.prepareWrite(fd.isFile); err != nil { + return 0, err + } + var nn int + for { + max := len(p) + if fd.IsStream && max-nn > maxRW { + max = nn + maxRW + } + n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max]) + if n > 0 { + nn += n + } + if nn == len(p) { + return nn, err + } + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitWrite(fd.isFile); err == nil { + continue + } + } + if err != nil { + return nn, err + } + if n == 0 { + return nn, io.ErrUnexpectedEOF + } + } +} + +// Pwrite wraps the pwrite system call. +func (fd *FD) Pwrite(p []byte, off int64) (int, error) { + // Call incref, not writeLock, because since pwrite specifies the + // offset it is independent from other writes. + // Similarly, using the poller doesn't make sense for pwrite. + if err := fd.incref(); err != nil { + return 0, err + } + defer fd.decref() + var nn int + for { + max := len(p) + if fd.IsStream && max-nn > maxRW { + max = nn + maxRW + } + n, err := syscall.Pwrite(fd.Sysfd, p[nn:max], off+int64(nn)) + if err == syscall.EINTR { + continue + } + if n > 0 { + nn += n + } + if nn == len(p) { + return nn, err + } + if err != nil { + return nn, err + } + if n == 0 { + return nn, io.ErrUnexpectedEOF + } + } +} + +// WriteToInet4 wraps the sendto network call for IPv4 addresses. +func (fd *FD) WriteToInet4(p []byte, sa *syscall.SockaddrInet4) (int, error) { + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + if err := fd.pd.prepareWrite(fd.isFile); err != nil { + return 0, err + } + for { + err := unix.SendtoInet4(fd.Sysfd, p, 0, sa) + if err == syscall.EINTR { + continue + } + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitWrite(fd.isFile); err == nil { + continue + } + } + if err != nil { + return 0, err + } + return len(p), nil + } +} + +// WriteToInet6 wraps the sendto network call for IPv6 addresses. +func (fd *FD) WriteToInet6(p []byte, sa *syscall.SockaddrInet6) (int, error) { + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + if err := fd.pd.prepareWrite(fd.isFile); err != nil { + return 0, err + } + for { + err := unix.SendtoInet6(fd.Sysfd, p, 0, sa) + if err == syscall.EINTR { + continue + } + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitWrite(fd.isFile); err == nil { + continue + } + } + if err != nil { + return 0, err + } + return len(p), nil + } +} + +// WriteTo wraps the sendto network call. +func (fd *FD) WriteTo(p []byte, sa syscall.Sockaddr) (int, error) { + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + if err := fd.pd.prepareWrite(fd.isFile); err != nil { + return 0, err + } + for { + err := syscall.Sendto(fd.Sysfd, p, 0, sa) + if err == syscall.EINTR { + continue + } + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitWrite(fd.isFile); err == nil { + continue + } + } + if err != nil { + return 0, err + } + return len(p), nil + } +} + +// WriteMsg wraps the sendmsg network call. +func (fd *FD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (int, int, error) { + if err := fd.writeLock(); err != nil { + return 0, 0, err + } + defer fd.writeUnlock() + if err := fd.pd.prepareWrite(fd.isFile); err != nil { + return 0, 0, err + } + for { + n, err := syscall.SendmsgN(fd.Sysfd, p, oob, sa, 0) + if err == syscall.EINTR { + continue + } + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitWrite(fd.isFile); err == nil { + continue + } + } + if err != nil { + return n, 0, err + } + return n, len(oob), err + } +} + +// WriteMsgInet4 is WriteMsg specialized for syscall.SockaddrInet4. +func (fd *FD) WriteMsgInet4(p []byte, oob []byte, sa *syscall.SockaddrInet4) (int, int, error) { + if err := fd.writeLock(); err != nil { + return 0, 0, err + } + defer fd.writeUnlock() + if err := fd.pd.prepareWrite(fd.isFile); err != nil { + return 0, 0, err + } + for { + n, err := unix.SendmsgNInet4(fd.Sysfd, p, oob, sa, 0) + if err == syscall.EINTR { + continue + } + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitWrite(fd.isFile); err == nil { + continue + } + } + if err != nil { + return n, 0, err + } + return n, len(oob), err + } +} + +// WriteMsgInet6 is WriteMsg specialized for syscall.SockaddrInet6. +func (fd *FD) WriteMsgInet6(p []byte, oob []byte, sa *syscall.SockaddrInet6) (int, int, error) { + if err := fd.writeLock(); err != nil { + return 0, 0, err + } + defer fd.writeUnlock() + if err := fd.pd.prepareWrite(fd.isFile); err != nil { + return 0, 0, err + } + for { + n, err := unix.SendmsgNInet6(fd.Sysfd, p, oob, sa, 0) + if err == syscall.EINTR { + continue + } + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitWrite(fd.isFile); err == nil { + continue + } + } + if err != nil { + return n, 0, err + } + return n, len(oob), err + } +} + +// Accept wraps the accept network call. +func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { + if err := fd.readLock(); err != nil { + return -1, nil, "", err + } + defer fd.readUnlock() + + if err := fd.pd.prepareRead(fd.isFile); err != nil { + return -1, nil, "", err + } + for { + s, rsa, errcall, err := accept(fd.Sysfd) + if err == nil { + return s, rsa, "", err + } + switch err { + case syscall.EINTR: + continue + case syscall.EAGAIN: + if fd.pd.pollable() { + if err = fd.pd.waitRead(fd.isFile); err == nil { + continue + } + } + case syscall.ECONNABORTED: + // This means that a socket on the listen + // queue was closed before we Accept()ed it; + // it's a silly error, so try again. + continue + } + return -1, nil, errcall, err + } +} + +// Seek wraps syscall.Seek. +func (fd *FD) Seek(offset int64, whence int) (int64, error) { + if err := fd.incref(); err != nil { + return 0, err + } + defer fd.decref() + return syscall.Seek(fd.Sysfd, offset, whence) +} + +// ReadDirent wraps syscall.ReadDirent. +// We treat this like an ordinary system call rather than a call +// that tries to fill the buffer. +func (fd *FD) ReadDirent(buf []byte) (int, error) { + if err := fd.incref(); err != nil { + return 0, err + } + defer fd.decref() + for { + n, err := ignoringEINTRIO(syscall.ReadDirent, fd.Sysfd, buf) + if err != nil { + n = 0 + if err == syscall.EAGAIN && fd.pd.pollable() { + if err = fd.pd.waitRead(fd.isFile); err == nil { + continue + } + } + } + // Do not call eofError; caller does not expect to see io.EOF. + return n, err + } +} + +// Fchmod wraps syscall.Fchmod. +func (fd *FD) Fchmod(mode uint32) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return ignoringEINTR(func() error { + return syscall.Fchmod(fd.Sysfd, mode) + }) +} + +// Fchdir wraps syscall.Fchdir. +func (fd *FD) Fchdir() error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.Fchdir(fd.Sysfd) +} + +// Fstat wraps syscall.Fstat +func (fd *FD) Fstat(s *syscall.Stat_t) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return ignoringEINTR(func() error { + return syscall.Fstat(fd.Sysfd, s) + }) +} + +// tryDupCloexec indicates whether F_DUPFD_CLOEXEC should be used. +// If the kernel doesn't support it, this is set to 0. +var tryDupCloexec = int32(1) + +// DupCloseOnExec dups fd and marks it close-on-exec. +func DupCloseOnExec(fd int) (int, string, error) { + if syscall.F_DUPFD_CLOEXEC != 0 && atomic.LoadInt32(&tryDupCloexec) == 1 { + r0, e1 := fcntl(fd, syscall.F_DUPFD_CLOEXEC, 0) + if e1 == nil { + return r0, "", nil + } + switch e1.(syscall.Errno) { + case syscall.EINVAL, syscall.ENOSYS: + // Old kernel, or js/wasm (which returns + // ENOSYS). Fall back to the portable way from + // now on. + atomic.StoreInt32(&tryDupCloexec, 0) + default: + return -1, "fcntl", e1 + } + } + return dupCloseOnExecOld(fd) +} + +// dupCloseOnExecOld is the traditional way to dup an fd and +// set its O_CLOEXEC bit, using two system calls. +func dupCloseOnExecOld(fd int) (int, string, error) { + syscall.ForkLock.RLock() + defer syscall.ForkLock.RUnlock() + newfd, err := syscall.Dup(fd) + if err != nil { + return -1, "dup", err + } + syscall.CloseOnExec(newfd) + return newfd, "", nil +} + +// Dup duplicates the file descriptor. +func (fd *FD) Dup() (int, string, error) { + if err := fd.incref(); err != nil { + return -1, "", err + } + defer fd.decref() + return DupCloseOnExec(fd.Sysfd) +} + +// On Unix variants only, expose the IO event for the net code. + +// WaitWrite waits until data can be read from fd. +func (fd *FD) WaitWrite() error { + return fd.pd.waitWrite(fd.isFile) +} + +// WriteOnce is for testing only. It makes a single write call. +func (fd *FD) WriteOnce(p []byte) (int, error) { + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + return ignoringEINTRIO(syscall.Write, fd.Sysfd, p) +} + +// RawRead invokes the user-defined function f for a read operation. +func (fd *FD) RawRead(f func(uintptr) bool) error { + if err := fd.readLock(); err != nil { + return err + } + defer fd.readUnlock() + if err := fd.pd.prepareRead(fd.isFile); err != nil { + return err + } + for { + if f(uintptr(fd.Sysfd)) { + return nil + } + if err := fd.pd.waitRead(fd.isFile); err != nil { + return err + } + } +} + +// RawWrite invokes the user-defined function f for a write operation. +func (fd *FD) RawWrite(f func(uintptr) bool) error { + if err := fd.writeLock(); err != nil { + return err + } + defer fd.writeUnlock() + if err := fd.pd.prepareWrite(fd.isFile); err != nil { + return err + } + for { + if f(uintptr(fd.Sysfd)) { + return nil + } + if err := fd.pd.waitWrite(fd.isFile); err != nil { + return err + } + } +} + +// ignoringEINTRIO is like ignoringEINTR, but just for IO calls. +func ignoringEINTRIO(fn func(fd int, p []byte) (int, error), fd int, p []byte) (int, error) { + for { + n, err := fn(fd, p) + if err != syscall.EINTR { + return n, err + } + } +} diff --git a/src/internal/poll/fd_windows.go b/src/internal/poll/fd_windows.go new file mode 100644 index 0000000..1ca281b --- /dev/null +++ b/src/internal/poll/fd_windows.go @@ -0,0 +1,1336 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import ( + "errors" + "internal/race" + "internal/syscall/windows" + "io" + "sync" + "syscall" + "unicode/utf16" + "unicode/utf8" + "unsafe" +) + +var ( + initErr error + ioSync uint64 +) + +// This package uses the SetFileCompletionNotificationModes Windows +// API to skip calling GetQueuedCompletionStatus if an IO operation +// completes synchronously. There is a known bug where +// SetFileCompletionNotificationModes crashes on some systems (see +// https://support.microsoft.com/kb/2568167 for details). + +var useSetFileCompletionNotificationModes bool // determines is SetFileCompletionNotificationModes is present and safe to use + +// checkSetFileCompletionNotificationModes verifies that +// SetFileCompletionNotificationModes Windows API is present +// on the system and is safe to use. +// See https://support.microsoft.com/kb/2568167 for details. +func checkSetFileCompletionNotificationModes() { + err := syscall.LoadSetFileCompletionNotificationModes() + if err != nil { + return + } + protos := [2]int32{syscall.IPPROTO_TCP, 0} + var buf [32]syscall.WSAProtocolInfo + len := uint32(unsafe.Sizeof(buf)) + n, err := syscall.WSAEnumProtocols(&protos[0], &buf[0], &len) + if err != nil { + return + } + for i := int32(0); i < n; i++ { + if buf[i].ServiceFlags1&syscall.XP1_IFS_HANDLES == 0 { + return + } + } + useSetFileCompletionNotificationModes = true +} + +func init() { + var d syscall.WSAData + e := syscall.WSAStartup(uint32(0x202), &d) + if e != nil { + initErr = e + } + checkSetFileCompletionNotificationModes() +} + +// operation contains superset of data necessary to perform all async IO. +type operation struct { + // Used by IOCP interface, it must be first field + // of the struct, as our code rely on it. + o syscall.Overlapped + + // fields used by runtime.netpoll + runtimeCtx uintptr + mode int32 + errno int32 + qty uint32 + + // fields used only by net package + fd *FD + buf syscall.WSABuf + msg windows.WSAMsg + sa syscall.Sockaddr + rsa *syscall.RawSockaddrAny + rsan int32 + handle syscall.Handle + flags uint32 + bufs []syscall.WSABuf +} + +func (o *operation) InitBuf(buf []byte) { + o.buf.Len = uint32(len(buf)) + o.buf.Buf = nil + if len(buf) != 0 { + o.buf.Buf = &buf[0] + } +} + +func (o *operation) InitBufs(buf *[][]byte) { + if o.bufs == nil { + o.bufs = make([]syscall.WSABuf, 0, len(*buf)) + } else { + o.bufs = o.bufs[:0] + } + for _, b := range *buf { + if len(b) == 0 { + o.bufs = append(o.bufs, syscall.WSABuf{}) + continue + } + for len(b) > maxRW { + o.bufs = append(o.bufs, syscall.WSABuf{Len: maxRW, Buf: &b[0]}) + b = b[maxRW:] + } + if len(b) > 0 { + o.bufs = append(o.bufs, syscall.WSABuf{Len: uint32(len(b)), Buf: &b[0]}) + } + } +} + +// ClearBufs clears all pointers to Buffers parameter captured +// by InitBufs, so it can be released by garbage collector. +func (o *operation) ClearBufs() { + for i := range o.bufs { + o.bufs[i].Buf = nil + } + o.bufs = o.bufs[:0] +} + +func (o *operation) InitMsg(p []byte, oob []byte) { + o.InitBuf(p) + o.msg.Buffers = &o.buf + o.msg.BufferCount = 1 + + o.msg.Name = nil + o.msg.Namelen = 0 + + o.msg.Flags = 0 + o.msg.Control.Len = uint32(len(oob)) + o.msg.Control.Buf = nil + if len(oob) != 0 { + o.msg.Control.Buf = &oob[0] + } +} + +// execIO executes a single IO operation o. It submits and cancels +// IO in the current thread for systems where Windows CancelIoEx API +// is available. Alternatively, it passes the request onto +// runtime netpoll and waits for completion or cancels request. +func execIO(o *operation, submit func(o *operation) error) (int, error) { + if o.fd.pd.runtimeCtx == 0 { + return 0, errors.New("internal error: polling on unsupported descriptor type") + } + + fd := o.fd + // Notify runtime netpoll about starting IO. + err := fd.pd.prepare(int(o.mode), fd.isFile) + if err != nil { + return 0, err + } + // Start IO. + err = submit(o) + switch err { + case nil: + // IO completed immediately + if o.fd.skipSyncNotif { + // No completion message will follow, so return immediately. + return int(o.qty), nil + } + // Need to get our completion message anyway. + case syscall.ERROR_IO_PENDING: + // IO started, and we have to wait for its completion. + err = nil + default: + return 0, err + } + // Wait for our request to complete. + err = fd.pd.wait(int(o.mode), fd.isFile) + if err == nil { + // All is good. Extract our IO results and return. + if o.errno != 0 { + err = syscall.Errno(o.errno) + // More data available. Return back the size of received data. + if err == syscall.ERROR_MORE_DATA || err == windows.WSAEMSGSIZE { + return int(o.qty), err + } + return 0, err + } + return int(o.qty), nil + } + // IO is interrupted by "close" or "timeout" + netpollErr := err + switch netpollErr { + case ErrNetClosing, ErrFileClosing, ErrDeadlineExceeded: + // will deal with those. + default: + panic("unexpected runtime.netpoll error: " + netpollErr.Error()) + } + // Cancel our request. + err = syscall.CancelIoEx(fd.Sysfd, &o.o) + // Assuming ERROR_NOT_FOUND is returned, if IO is completed. + if err != nil && err != syscall.ERROR_NOT_FOUND { + // TODO(brainman): maybe do something else, but panic. + panic(err) + } + // Wait for cancellation to complete. + fd.pd.waitCanceled(int(o.mode)) + if o.errno != 0 { + err = syscall.Errno(o.errno) + if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled + err = netpollErr + } + return 0, err + } + // We issued a cancellation request. But, it seems, IO operation succeeded + // before the cancellation request run. We need to treat the IO operation as + // succeeded (the bytes are actually sent/recv from network). + return int(o.qty), nil +} + +// FD is a file descriptor. The net and os packages embed this type in +// a larger type representing a network connection or OS file. +type FD struct { + // Lock sysfd and serialize access to Read and Write methods. + fdmu fdMutex + + // System file descriptor. Immutable until Close. + Sysfd syscall.Handle + + // Read operation. + rop operation + // Write operation. + wop operation + + // I/O poller. + pd pollDesc + + // Used to implement pread/pwrite. + l sync.Mutex + + // For console I/O. + lastbits []byte // first few bytes of the last incomplete rune in last write + readuint16 []uint16 // buffer to hold uint16s obtained with ReadConsole + readbyte []byte // buffer to hold decoding of readuint16 from utf16 to utf8 + readbyteOffset int // readbyte[readOffset:] is yet to be consumed with file.Read + + // Semaphore signaled when file is closed. + csema uint32 + + skipSyncNotif bool + + // Whether this is a streaming descriptor, as opposed to a + // packet-based descriptor like a UDP socket. + IsStream bool + + // Whether a zero byte read indicates EOF. This is false for a + // message based socket connection. + ZeroReadIsEOF bool + + // Whether this is a file rather than a network socket. + isFile bool + + // The kind of this file. + kind fileKind +} + +// fileKind describes the kind of file. +type fileKind byte + +const ( + kindNet fileKind = iota + kindFile + kindConsole + kindDir + kindPipe +) + +// logInitFD is set by tests to enable file descriptor initialization logging. +var logInitFD func(net string, fd *FD, err error) + +// Init initializes the FD. The Sysfd field should already be set. +// This can be called multiple times on a single FD. +// The net argument is a network name from the net package (e.g., "tcp"), +// or "file" or "console" or "dir". +// Set pollable to true if fd should be managed by runtime netpoll. +func (fd *FD) Init(net string, pollable bool) (string, error) { + if initErr != nil { + return "", initErr + } + + switch net { + case "file": + fd.kind = kindFile + case "console": + fd.kind = kindConsole + case "dir": + fd.kind = kindDir + case "pipe": + fd.kind = kindPipe + case "tcp", "tcp4", "tcp6", + "udp", "udp4", "udp6", + "ip", "ip4", "ip6", + "unix", "unixgram", "unixpacket": + fd.kind = kindNet + default: + return "", errors.New("internal error: unknown network type " + net) + } + fd.isFile = fd.kind != kindNet + + var err error + if pollable { + // Only call init for a network socket. + // This means that we don't add files to the runtime poller. + // Adding files to the runtime poller can confuse matters + // if the user is doing their own overlapped I/O. + // See issue #21172. + // + // In general the code below avoids calling the execIO + // function for non-network sockets. If some method does + // somehow call execIO, then execIO, and therefore the + // calling method, will return an error, because + // fd.pd.runtimeCtx will be 0. + err = fd.pd.init(fd) + } + if logInitFD != nil { + logInitFD(net, fd, err) + } + if err != nil { + return "", err + } + if pollable && useSetFileCompletionNotificationModes { + // We do not use events, so we can skip them always. + flags := uint8(syscall.FILE_SKIP_SET_EVENT_ON_HANDLE) + // It's not safe to skip completion notifications for UDP: + // https://docs.microsoft.com/en-us/archive/blogs/winserverperformance/designing-applications-for-high-performance-part-iii + if net == "tcp" { + flags |= syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS + } + err := syscall.SetFileCompletionNotificationModes(fd.Sysfd, flags) + if err == nil && flags&syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS != 0 { + fd.skipSyncNotif = true + } + } + // Disable SIO_UDP_CONNRESET behavior. + // http://support.microsoft.com/kb/263823 + switch net { + case "udp", "udp4", "udp6": + ret := uint32(0) + flag := uint32(0) + size := uint32(unsafe.Sizeof(flag)) + err := syscall.WSAIoctl(fd.Sysfd, syscall.SIO_UDP_CONNRESET, (*byte)(unsafe.Pointer(&flag)), size, nil, 0, &ret, nil, 0) + if err != nil { + return "wsaioctl", err + } + } + fd.rop.mode = 'r' + fd.wop.mode = 'w' + fd.rop.fd = fd + fd.wop.fd = fd + fd.rop.runtimeCtx = fd.pd.runtimeCtx + fd.wop.runtimeCtx = fd.pd.runtimeCtx + return "", nil +} + +func (fd *FD) destroy() error { + if fd.Sysfd == syscall.InvalidHandle { + return syscall.EINVAL + } + // Poller may want to unregister fd in readiness notification mechanism, + // so this must be executed before fd.CloseFunc. + fd.pd.close() + var err error + switch fd.kind { + case kindNet: + // The net package uses the CloseFunc variable for testing. + err = CloseFunc(fd.Sysfd) + case kindDir: + err = syscall.FindClose(fd.Sysfd) + default: + err = syscall.CloseHandle(fd.Sysfd) + } + fd.Sysfd = syscall.InvalidHandle + runtime_Semrelease(&fd.csema) + return err +} + +// Close closes the FD. The underlying file descriptor is closed by +// the destroy method when there are no remaining references. +func (fd *FD) Close() error { + if !fd.fdmu.increfAndClose() { + return errClosing(fd.isFile) + } + if fd.kind == kindPipe { + syscall.CancelIoEx(fd.Sysfd, nil) + } + // unblock pending reader and writer + fd.pd.evict() + err := fd.decref() + // Wait until the descriptor is closed. If this was the only + // reference, it is already closed. + runtime_Semacquire(&fd.csema) + return err +} + +// Windows ReadFile and WSARecv use DWORD (uint32) parameter to pass buffer length. +// This prevents us reading blocks larger than 4GB. +// See golang.org/issue/26923. +const maxRW = 1 << 30 // 1GB is large enough and keeps subsequent reads aligned + +// Read implements io.Reader. +func (fd *FD) Read(buf []byte) (int, error) { + if err := fd.readLock(); err != nil { + return 0, err + } + defer fd.readUnlock() + + if len(buf) > maxRW { + buf = buf[:maxRW] + } + + var n int + var err error + if fd.isFile { + fd.l.Lock() + defer fd.l.Unlock() + switch fd.kind { + case kindConsole: + n, err = fd.readConsole(buf) + default: + n, err = syscall.Read(fd.Sysfd, buf) + if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED { + // Close uses CancelIoEx to interrupt concurrent I/O for pipes. + // If the fd is a pipe and the Read was interrupted by CancelIoEx, + // we assume it is interrupted by Close. + err = ErrFileClosing + } + } + if err != nil { + n = 0 + } + } else { + o := &fd.rop + o.InitBuf(buf) + n, err = execIO(o, func(o *operation) error { + return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil) + }) + if race.Enabled { + race.Acquire(unsafe.Pointer(&ioSync)) + } + } + if len(buf) != 0 { + err = fd.eofError(n, err) + } + return n, err +} + +var ReadConsole = syscall.ReadConsole // changed for testing + +// readConsole reads utf16 characters from console File, +// encodes them into utf8 and stores them in buffer b. +// It returns the number of utf8 bytes read and an error, if any. +func (fd *FD) readConsole(b []byte) (int, error) { + if len(b) == 0 { + return 0, nil + } + + if fd.readuint16 == nil { + // Note: syscall.ReadConsole fails for very large buffers. + // The limit is somewhere around (but not exactly) 16384. + // Stay well below. + fd.readuint16 = make([]uint16, 0, 10000) + fd.readbyte = make([]byte, 0, 4*cap(fd.readuint16)) + } + + for fd.readbyteOffset >= len(fd.readbyte) { + n := cap(fd.readuint16) - len(fd.readuint16) + if n > len(b) { + n = len(b) + } + var nw uint32 + err := ReadConsole(fd.Sysfd, &fd.readuint16[:len(fd.readuint16)+1][len(fd.readuint16)], uint32(n), &nw, nil) + if err != nil { + return 0, err + } + uint16s := fd.readuint16[:len(fd.readuint16)+int(nw)] + fd.readuint16 = fd.readuint16[:0] + buf := fd.readbyte[:0] + for i := 0; i < len(uint16s); i++ { + r := rune(uint16s[i]) + if utf16.IsSurrogate(r) { + if i+1 == len(uint16s) { + if nw > 0 { + // Save half surrogate pair for next time. + fd.readuint16 = fd.readuint16[:1] + fd.readuint16[0] = uint16(r) + break + } + r = utf8.RuneError + } else { + r = utf16.DecodeRune(r, rune(uint16s[i+1])) + if r != utf8.RuneError { + i++ + } + } + } + n := utf8.EncodeRune(buf[len(buf):cap(buf)], r) + buf = buf[:len(buf)+n] + } + fd.readbyte = buf + fd.readbyteOffset = 0 + if nw == 0 { + break + } + } + + src := fd.readbyte[fd.readbyteOffset:] + var i int + for i = 0; i < len(src) && i < len(b); i++ { + x := src[i] + if x == 0x1A { // Ctrl-Z + if i == 0 { + fd.readbyteOffset++ + } + break + } + b[i] = x + } + fd.readbyteOffset += i + return i, nil +} + +// Pread emulates the Unix pread system call. +func (fd *FD) Pread(b []byte, off int64) (int, error) { + // Call incref, not readLock, because since pread specifies the + // offset it is independent from other reads. + if err := fd.incref(); err != nil { + return 0, err + } + defer fd.decref() + + if len(b) > maxRW { + b = b[:maxRW] + } + + fd.l.Lock() + defer fd.l.Unlock() + curoffset, e := syscall.Seek(fd.Sysfd, 0, io.SeekCurrent) + if e != nil { + return 0, e + } + defer syscall.Seek(fd.Sysfd, curoffset, io.SeekStart) + o := syscall.Overlapped{ + OffsetHigh: uint32(off >> 32), + Offset: uint32(off), + } + var done uint32 + e = syscall.ReadFile(fd.Sysfd, b, &done, &o) + if e != nil { + done = 0 + if e == syscall.ERROR_HANDLE_EOF { + e = io.EOF + } + } + if len(b) != 0 { + e = fd.eofError(int(done), e) + } + return int(done), e +} + +// ReadFrom wraps the recvfrom network call. +func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) { + if len(buf) == 0 { + return 0, nil, nil + } + if len(buf) > maxRW { + buf = buf[:maxRW] + } + if err := fd.readLock(); err != nil { + return 0, nil, err + } + defer fd.readUnlock() + o := &fd.rop + o.InitBuf(buf) + n, err := execIO(o, func(o *operation) error { + if o.rsa == nil { + o.rsa = new(syscall.RawSockaddrAny) + } + o.rsan = int32(unsafe.Sizeof(*o.rsa)) + return syscall.WSARecvFrom(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil) + }) + err = fd.eofError(n, err) + if err != nil { + return n, nil, err + } + sa, _ := o.rsa.Sockaddr() + return n, sa, nil +} + +// ReadFromInet4 wraps the recvfrom network call for IPv4. +func (fd *FD) ReadFromInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error) { + if len(buf) == 0 { + return 0, nil + } + if len(buf) > maxRW { + buf = buf[:maxRW] + } + if err := fd.readLock(); err != nil { + return 0, err + } + defer fd.readUnlock() + o := &fd.rop + o.InitBuf(buf) + n, err := execIO(o, func(o *operation) error { + if o.rsa == nil { + o.rsa = new(syscall.RawSockaddrAny) + } + o.rsan = int32(unsafe.Sizeof(*o.rsa)) + return syscall.WSARecvFrom(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil) + }) + err = fd.eofError(n, err) + if err != nil { + return n, err + } + rawToSockaddrInet4(o.rsa, sa4) + return n, err +} + +// ReadFromInet6 wraps the recvfrom network call for IPv6. +func (fd *FD) ReadFromInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error) { + if len(buf) == 0 { + return 0, nil + } + if len(buf) > maxRW { + buf = buf[:maxRW] + } + if err := fd.readLock(); err != nil { + return 0, err + } + defer fd.readUnlock() + o := &fd.rop + o.InitBuf(buf) + n, err := execIO(o, func(o *operation) error { + if o.rsa == nil { + o.rsa = new(syscall.RawSockaddrAny) + } + o.rsan = int32(unsafe.Sizeof(*o.rsa)) + return syscall.WSARecvFrom(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil) + }) + err = fd.eofError(n, err) + if err != nil { + return n, err + } + rawToSockaddrInet6(o.rsa, sa6) + return n, err +} + +// Write implements io.Writer. +func (fd *FD) Write(buf []byte) (int, error) { + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + if fd.isFile { + fd.l.Lock() + defer fd.l.Unlock() + } + + ntotal := 0 + for len(buf) > 0 { + b := buf + if len(b) > maxRW { + b = b[:maxRW] + } + var n int + var err error + if fd.isFile { + switch fd.kind { + case kindConsole: + n, err = fd.writeConsole(b) + default: + n, err = syscall.Write(fd.Sysfd, b) + if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED { + // Close uses CancelIoEx to interrupt concurrent I/O for pipes. + // If the fd is a pipe and the Write was interrupted by CancelIoEx, + // we assume it is interrupted by Close. + err = ErrFileClosing + } + } + if err != nil { + n = 0 + } + } else { + if race.Enabled { + race.ReleaseMerge(unsafe.Pointer(&ioSync)) + } + o := &fd.wop + o.InitBuf(b) + n, err = execIO(o, func(o *operation) error { + return syscall.WSASend(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil) + }) + } + ntotal += n + if err != nil { + return ntotal, err + } + buf = buf[n:] + } + return ntotal, nil +} + +// writeConsole writes len(b) bytes to the console File. +// It returns the number of bytes written and an error, if any. +func (fd *FD) writeConsole(b []byte) (int, error) { + n := len(b) + runes := make([]rune, 0, 256) + if len(fd.lastbits) > 0 { + b = append(fd.lastbits, b...) + fd.lastbits = nil + + } + for len(b) >= utf8.UTFMax || utf8.FullRune(b) { + r, l := utf8.DecodeRune(b) + runes = append(runes, r) + b = b[l:] + } + if len(b) > 0 { + fd.lastbits = make([]byte, len(b)) + copy(fd.lastbits, b) + } + // syscall.WriteConsole seems to fail, if given large buffer. + // So limit the buffer to 16000 characters. This number was + // discovered by experimenting with syscall.WriteConsole. + const maxWrite = 16000 + for len(runes) > 0 { + m := len(runes) + if m > maxWrite { + m = maxWrite + } + chunk := runes[:m] + runes = runes[m:] + uint16s := utf16.Encode(chunk) + for len(uint16s) > 0 { + var written uint32 + err := syscall.WriteConsole(fd.Sysfd, &uint16s[0], uint32(len(uint16s)), &written, nil) + if err != nil { + return 0, err + } + uint16s = uint16s[written:] + } + } + return n, nil +} + +// Pwrite emulates the Unix pwrite system call. +func (fd *FD) Pwrite(buf []byte, off int64) (int, error) { + // Call incref, not writeLock, because since pwrite specifies the + // offset it is independent from other writes. + if err := fd.incref(); err != nil { + return 0, err + } + defer fd.decref() + + fd.l.Lock() + defer fd.l.Unlock() + curoffset, e := syscall.Seek(fd.Sysfd, 0, io.SeekCurrent) + if e != nil { + return 0, e + } + defer syscall.Seek(fd.Sysfd, curoffset, io.SeekStart) + + ntotal := 0 + for len(buf) > 0 { + b := buf + if len(b) > maxRW { + b = b[:maxRW] + } + var n uint32 + o := syscall.Overlapped{ + OffsetHigh: uint32(off >> 32), + Offset: uint32(off), + } + e = syscall.WriteFile(fd.Sysfd, b, &n, &o) + ntotal += int(n) + if e != nil { + return ntotal, e + } + buf = buf[n:] + off += int64(n) + } + return ntotal, nil +} + +// Writev emulates the Unix writev system call. +func (fd *FD) Writev(buf *[][]byte) (int64, error) { + if len(*buf) == 0 { + return 0, nil + } + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + if race.Enabled { + race.ReleaseMerge(unsafe.Pointer(&ioSync)) + } + o := &fd.wop + o.InitBufs(buf) + n, err := execIO(o, func(o *operation) error { + return syscall.WSASend(o.fd.Sysfd, &o.bufs[0], uint32(len(o.bufs)), &o.qty, 0, &o.o, nil) + }) + o.ClearBufs() + TestHookDidWritev(n) + consume(buf, int64(n)) + return int64(n), err +} + +// WriteTo wraps the sendto network call. +func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + + if len(buf) == 0 { + // handle zero-byte payload + o := &fd.wop + o.InitBuf(buf) + o.sa = sa + n, err := execIO(o, func(o *operation) error { + return syscall.WSASendto(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil) + }) + return n, err + } + + ntotal := 0 + for len(buf) > 0 { + b := buf + if len(b) > maxRW { + b = b[:maxRW] + } + o := &fd.wop + o.InitBuf(b) + o.sa = sa + n, err := execIO(o, func(o *operation) error { + return syscall.WSASendto(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil) + }) + ntotal += int(n) + if err != nil { + return ntotal, err + } + buf = buf[n:] + } + return ntotal, nil +} + +// WriteToInet4 is WriteTo, specialized for syscall.SockaddrInet4. +func (fd *FD) WriteToInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error) { + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + + if len(buf) == 0 { + // handle zero-byte payload + o := &fd.wop + o.InitBuf(buf) + n, err := execIO(o, func(o *operation) error { + return windows.WSASendtoInet4(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, sa4, &o.o, nil) + }) + return n, err + } + + ntotal := 0 + for len(buf) > 0 { + b := buf + if len(b) > maxRW { + b = b[:maxRW] + } + o := &fd.wop + o.InitBuf(b) + n, err := execIO(o, func(o *operation) error { + return windows.WSASendtoInet4(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, sa4, &o.o, nil) + }) + ntotal += int(n) + if err != nil { + return ntotal, err + } + buf = buf[n:] + } + return ntotal, nil +} + +// WriteToInet6 is WriteTo, specialized for syscall.SockaddrInet6. +func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error) { + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + + if len(buf) == 0 { + // handle zero-byte payload + o := &fd.wop + o.InitBuf(buf) + n, err := execIO(o, func(o *operation) error { + return windows.WSASendtoInet6(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, sa6, &o.o, nil) + }) + return n, err + } + + ntotal := 0 + for len(buf) > 0 { + b := buf + if len(b) > maxRW { + b = b[:maxRW] + } + o := &fd.wop + o.InitBuf(b) + n, err := execIO(o, func(o *operation) error { + return windows.WSASendtoInet6(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, sa6, &o.o, nil) + }) + ntotal += int(n) + if err != nil { + return ntotal, err + } + buf = buf[n:] + } + return ntotal, nil +} + +// Call ConnectEx. This doesn't need any locking, since it is only +// called when the descriptor is first created. This is here rather +// than in the net package so that it can use fd.wop. +func (fd *FD) ConnectEx(ra syscall.Sockaddr) error { + o := &fd.wop + o.sa = ra + _, err := execIO(o, func(o *operation) error { + return ConnectExFunc(o.fd.Sysfd, o.sa, nil, 0, nil, &o.o) + }) + return err +} + +func (fd *FD) acceptOne(s syscall.Handle, rawsa []syscall.RawSockaddrAny, o *operation) (string, error) { + // Submit accept request. + o.handle = s + o.rsan = int32(unsafe.Sizeof(rawsa[0])) + _, err := execIO(o, func(o *operation) error { + return AcceptFunc(o.fd.Sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o) + }) + if err != nil { + CloseFunc(s) + return "acceptex", err + } + + // Inherit properties of the listening socket. + err = syscall.Setsockopt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, (*byte)(unsafe.Pointer(&fd.Sysfd)), int32(unsafe.Sizeof(fd.Sysfd))) + if err != nil { + CloseFunc(s) + return "setsockopt", err + } + + return "", nil +} + +// Accept handles accepting a socket. The sysSocket parameter is used +// to allocate the net socket. +func (fd *FD) Accept(sysSocket func() (syscall.Handle, error)) (syscall.Handle, []syscall.RawSockaddrAny, uint32, string, error) { + if err := fd.readLock(); err != nil { + return syscall.InvalidHandle, nil, 0, "", err + } + defer fd.readUnlock() + + o := &fd.rop + var rawsa [2]syscall.RawSockaddrAny + for { + s, err := sysSocket() + if err != nil { + return syscall.InvalidHandle, nil, 0, "", err + } + + errcall, err := fd.acceptOne(s, rawsa[:], o) + if err == nil { + return s, rawsa[:], uint32(o.rsan), "", nil + } + + // Sometimes we see WSAECONNRESET and ERROR_NETNAME_DELETED is + // returned here. These happen if connection reset is received + // before AcceptEx could complete. These errors relate to new + // connection, not to AcceptEx, so ignore broken connection and + // try AcceptEx again for more connections. + errno, ok := err.(syscall.Errno) + if !ok { + return syscall.InvalidHandle, nil, 0, errcall, err + } + switch errno { + case syscall.ERROR_NETNAME_DELETED, syscall.WSAECONNRESET: + // ignore these and try again + default: + return syscall.InvalidHandle, nil, 0, errcall, err + } + } +} + +// Seek wraps syscall.Seek. +func (fd *FD) Seek(offset int64, whence int) (int64, error) { + if err := fd.incref(); err != nil { + return 0, err + } + defer fd.decref() + + fd.l.Lock() + defer fd.l.Unlock() + + return syscall.Seek(fd.Sysfd, offset, whence) +} + +// FindNextFile wraps syscall.FindNextFile. +func (fd *FD) FindNextFile(data *syscall.Win32finddata) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.FindNextFile(fd.Sysfd, data) +} + +// Fchmod updates syscall.ByHandleFileInformation.Fileattributes when needed. +func (fd *FD) Fchmod(mode uint32) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + + var d syscall.ByHandleFileInformation + if err := syscall.GetFileInformationByHandle(fd.Sysfd, &d); err != nil { + return err + } + attrs := d.FileAttributes + if mode&syscall.S_IWRITE != 0 { + attrs &^= syscall.FILE_ATTRIBUTE_READONLY + } else { + attrs |= syscall.FILE_ATTRIBUTE_READONLY + } + if attrs == d.FileAttributes { + return nil + } + + var du windows.FILE_BASIC_INFO + du.FileAttributes = attrs + l := uint32(unsafe.Sizeof(d)) + return windows.SetFileInformationByHandle(fd.Sysfd, windows.FileBasicInfo, uintptr(unsafe.Pointer(&du)), l) +} + +// Fchdir wraps syscall.Fchdir. +func (fd *FD) Fchdir() error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.Fchdir(fd.Sysfd) +} + +// GetFileType wraps syscall.GetFileType. +func (fd *FD) GetFileType() (uint32, error) { + if err := fd.incref(); err != nil { + return 0, err + } + defer fd.decref() + return syscall.GetFileType(fd.Sysfd) +} + +// GetFileInformationByHandle wraps GetFileInformationByHandle. +func (fd *FD) GetFileInformationByHandle(data *syscall.ByHandleFileInformation) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.GetFileInformationByHandle(fd.Sysfd, data) +} + +// RawRead invokes the user-defined function f for a read operation. +func (fd *FD) RawRead(f func(uintptr) bool) error { + if err := fd.readLock(); err != nil { + return err + } + defer fd.readUnlock() + for { + if f(uintptr(fd.Sysfd)) { + return nil + } + + // Use a zero-byte read as a way to get notified when this + // socket is readable. h/t https://stackoverflow.com/a/42019668/332798 + o := &fd.rop + o.InitBuf(nil) + if !fd.IsStream { + o.flags |= windows.MSG_PEEK + } + _, err := execIO(o, func(o *operation) error { + return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil) + }) + if err == windows.WSAEMSGSIZE { + // expected with a 0-byte peek, ignore. + } else if err != nil { + return err + } + } +} + +// RawWrite invokes the user-defined function f for a write operation. +func (fd *FD) RawWrite(f func(uintptr) bool) error { + if err := fd.writeLock(); err != nil { + return err + } + defer fd.writeUnlock() + + if f(uintptr(fd.Sysfd)) { + return nil + } + + // TODO(tmm1): find a way to detect socket writability + return syscall.EWINDOWS +} + +func sockaddrInet4ToRaw(rsa *syscall.RawSockaddrAny, sa *syscall.SockaddrInet4) int32 { + *rsa = syscall.RawSockaddrAny{} + raw := (*syscall.RawSockaddrInet4)(unsafe.Pointer(rsa)) + raw.Family = syscall.AF_INET + p := (*[2]byte)(unsafe.Pointer(&raw.Port)) + p[0] = byte(sa.Port >> 8) + p[1] = byte(sa.Port) + raw.Addr = sa.Addr + return int32(unsafe.Sizeof(*raw)) +} + +func sockaddrInet6ToRaw(rsa *syscall.RawSockaddrAny, sa *syscall.SockaddrInet6) int32 { + *rsa = syscall.RawSockaddrAny{} + raw := (*syscall.RawSockaddrInet6)(unsafe.Pointer(rsa)) + raw.Family = syscall.AF_INET6 + p := (*[2]byte)(unsafe.Pointer(&raw.Port)) + p[0] = byte(sa.Port >> 8) + p[1] = byte(sa.Port) + raw.Scope_id = sa.ZoneId + raw.Addr = sa.Addr + return int32(unsafe.Sizeof(*raw)) +} + +func rawToSockaddrInet4(rsa *syscall.RawSockaddrAny, sa *syscall.SockaddrInet4) { + pp := (*syscall.RawSockaddrInet4)(unsafe.Pointer(rsa)) + p := (*[2]byte)(unsafe.Pointer(&pp.Port)) + sa.Port = int(p[0])<<8 + int(p[1]) + sa.Addr = pp.Addr +} + +func rawToSockaddrInet6(rsa *syscall.RawSockaddrAny, sa *syscall.SockaddrInet6) { + pp := (*syscall.RawSockaddrInet6)(unsafe.Pointer(rsa)) + p := (*[2]byte)(unsafe.Pointer(&pp.Port)) + sa.Port = int(p[0])<<8 + int(p[1]) + sa.ZoneId = pp.Scope_id + sa.Addr = pp.Addr +} + +func sockaddrToRaw(rsa *syscall.RawSockaddrAny, sa syscall.Sockaddr) (int32, error) { + switch sa := sa.(type) { + case *syscall.SockaddrInet4: + sz := sockaddrInet4ToRaw(rsa, sa) + return sz, nil + case *syscall.SockaddrInet6: + sz := sockaddrInet6ToRaw(rsa, sa) + return sz, nil + default: + return 0, syscall.EWINDOWS + } +} + +// ReadMsg wraps the WSARecvMsg network call. +func (fd *FD) ReadMsg(p []byte, oob []byte, flags int) (int, int, int, syscall.Sockaddr, error) { + if err := fd.readLock(); err != nil { + return 0, 0, 0, nil, err + } + defer fd.readUnlock() + + if len(p) > maxRW { + p = p[:maxRW] + } + + o := &fd.rop + o.InitMsg(p, oob) + if o.rsa == nil { + o.rsa = new(syscall.RawSockaddrAny) + } + o.msg.Name = (syscall.Pointer)(unsafe.Pointer(o.rsa)) + o.msg.Namelen = int32(unsafe.Sizeof(*o.rsa)) + o.msg.Flags = uint32(flags) + n, err := execIO(o, func(o *operation) error { + return windows.WSARecvMsg(o.fd.Sysfd, &o.msg, &o.qty, &o.o, nil) + }) + err = fd.eofError(n, err) + var sa syscall.Sockaddr + if err == nil { + sa, err = o.rsa.Sockaddr() + } + return n, int(o.msg.Control.Len), int(o.msg.Flags), sa, err +} + +// ReadMsgInet4 is ReadMsg, but specialized to return a syscall.SockaddrInet4. +func (fd *FD) ReadMsgInet4(p []byte, oob []byte, flags int, sa4 *syscall.SockaddrInet4) (int, int, int, error) { + if err := fd.readLock(); err != nil { + return 0, 0, 0, err + } + defer fd.readUnlock() + + if len(p) > maxRW { + p = p[:maxRW] + } + + o := &fd.rop + o.InitMsg(p, oob) + if o.rsa == nil { + o.rsa = new(syscall.RawSockaddrAny) + } + o.msg.Name = (syscall.Pointer)(unsafe.Pointer(o.rsa)) + o.msg.Namelen = int32(unsafe.Sizeof(*o.rsa)) + o.msg.Flags = uint32(flags) + n, err := execIO(o, func(o *operation) error { + return windows.WSARecvMsg(o.fd.Sysfd, &o.msg, &o.qty, &o.o, nil) + }) + err = fd.eofError(n, err) + if err == nil { + rawToSockaddrInet4(o.rsa, sa4) + } + return n, int(o.msg.Control.Len), int(o.msg.Flags), err +} + +// ReadMsgInet6 is ReadMsg, but specialized to return a syscall.SockaddrInet6. +func (fd *FD) ReadMsgInet6(p []byte, oob []byte, flags int, sa6 *syscall.SockaddrInet6) (int, int, int, error) { + if err := fd.readLock(); err != nil { + return 0, 0, 0, err + } + defer fd.readUnlock() + + if len(p) > maxRW { + p = p[:maxRW] + } + + o := &fd.rop + o.InitMsg(p, oob) + if o.rsa == nil { + o.rsa = new(syscall.RawSockaddrAny) + } + o.msg.Name = (syscall.Pointer)(unsafe.Pointer(o.rsa)) + o.msg.Namelen = int32(unsafe.Sizeof(*o.rsa)) + o.msg.Flags = uint32(flags) + n, err := execIO(o, func(o *operation) error { + return windows.WSARecvMsg(o.fd.Sysfd, &o.msg, &o.qty, &o.o, nil) + }) + err = fd.eofError(n, err) + if err == nil { + rawToSockaddrInet6(o.rsa, sa6) + } + return n, int(o.msg.Control.Len), int(o.msg.Flags), err +} + +// WriteMsg wraps the WSASendMsg network call. +func (fd *FD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (int, int, error) { + if len(p) > maxRW { + return 0, 0, errors.New("packet is too large (only 1GB is allowed)") + } + + if err := fd.writeLock(); err != nil { + return 0, 0, err + } + defer fd.writeUnlock() + + o := &fd.wop + o.InitMsg(p, oob) + if sa != nil { + if o.rsa == nil { + o.rsa = new(syscall.RawSockaddrAny) + } + len, err := sockaddrToRaw(o.rsa, sa) + if err != nil { + return 0, 0, err + } + o.msg.Name = (syscall.Pointer)(unsafe.Pointer(o.rsa)) + o.msg.Namelen = len + } + n, err := execIO(o, func(o *operation) error { + return windows.WSASendMsg(o.fd.Sysfd, &o.msg, 0, &o.qty, &o.o, nil) + }) + return n, int(o.msg.Control.Len), err +} + +// WriteMsgInet4 is WriteMsg specialized for syscall.SockaddrInet4. +func (fd *FD) WriteMsgInet4(p []byte, oob []byte, sa *syscall.SockaddrInet4) (int, int, error) { + if len(p) > maxRW { + return 0, 0, errors.New("packet is too large (only 1GB is allowed)") + } + + if err := fd.writeLock(); err != nil { + return 0, 0, err + } + defer fd.writeUnlock() + + o := &fd.wop + o.InitMsg(p, oob) + if o.rsa == nil { + o.rsa = new(syscall.RawSockaddrAny) + } + len := sockaddrInet4ToRaw(o.rsa, sa) + o.msg.Name = (syscall.Pointer)(unsafe.Pointer(o.rsa)) + o.msg.Namelen = len + n, err := execIO(o, func(o *operation) error { + return windows.WSASendMsg(o.fd.Sysfd, &o.msg, 0, &o.qty, &o.o, nil) + }) + return n, int(o.msg.Control.Len), err +} + +// WriteMsgInet6 is WriteMsg specialized for syscall.SockaddrInet6. +func (fd *FD) WriteMsgInet6(p []byte, oob []byte, sa *syscall.SockaddrInet6) (int, int, error) { + if len(p) > maxRW { + return 0, 0, errors.New("packet is too large (only 1GB is allowed)") + } + + if err := fd.writeLock(); err != nil { + return 0, 0, err + } + defer fd.writeUnlock() + + o := &fd.wop + o.InitMsg(p, oob) + if o.rsa == nil { + o.rsa = new(syscall.RawSockaddrAny) + } + len := sockaddrInet6ToRaw(o.rsa, sa) + o.msg.Name = (syscall.Pointer)(unsafe.Pointer(o.rsa)) + o.msg.Namelen = len + n, err := execIO(o, func(o *operation) error { + return windows.WSASendMsg(o.fd.Sysfd, &o.msg, 0, &o.qty, &o.o, nil) + }) + return n, int(o.msg.Control.Len), err +} diff --git a/src/internal/poll/fd_windows_test.go b/src/internal/poll/fd_windows_test.go new file mode 100644 index 0000000..e3ca0e2 --- /dev/null +++ b/src/internal/poll/fd_windows_test.go @@ -0,0 +1,111 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll_test + +import ( + "fmt" + "internal/poll" + "os" + "sync" + "syscall" + "testing" +) + +type loggedFD struct { + Net string + FD *poll.FD + Err error +} + +var ( + logMu sync.Mutex + loggedFDs map[syscall.Handle]*loggedFD +) + +func logFD(net string, fd *poll.FD, err error) { + logMu.Lock() + defer logMu.Unlock() + + loggedFDs[fd.Sysfd] = &loggedFD{ + Net: net, + FD: fd, + Err: err, + } +} + +func init() { + loggedFDs = make(map[syscall.Handle]*loggedFD) + *poll.LogInitFD = logFD +} + +func findLoggedFD(h syscall.Handle) (lfd *loggedFD, found bool) { + logMu.Lock() + defer logMu.Unlock() + + lfd, found = loggedFDs[h] + return lfd, found +} + +// checkFileIsNotPartOfNetpoll verifies that f is not managed by netpoll. +// It returns error, if check fails. +func checkFileIsNotPartOfNetpoll(f *os.File) error { + lfd, found := findLoggedFD(syscall.Handle(f.Fd())) + if !found { + return fmt.Errorf("%v fd=%v: is not found in the log", f.Name(), f.Fd()) + } + if lfd.FD.IsPartOfNetpoll() { + return fmt.Errorf("%v fd=%v: is part of netpoll, but should not be (logged: net=%v err=%v)", f.Name(), f.Fd(), lfd.Net, lfd.Err) + } + return nil +} + +func TestFileFdsAreInitialised(t *testing.T) { + exe, err := os.Executable() + if err != nil { + t.Fatal(err) + } + f, err := os.Open(exe) + if err != nil { + t.Fatal(err) + } + defer f.Close() + + err = checkFileIsNotPartOfNetpoll(f) + if err != nil { + t.Fatal(err) + } +} + +func TestSerialFdsAreInitialised(t *testing.T) { + for _, name := range []string{"COM1", "COM2", "COM3", "COM4"} { + t.Run(name, func(t *testing.T) { + h, err := syscall.CreateFile(syscall.StringToUTF16Ptr(name), + syscall.GENERIC_READ|syscall.GENERIC_WRITE, + 0, + nil, + syscall.OPEN_EXISTING, + syscall.FILE_ATTRIBUTE_NORMAL|syscall.FILE_FLAG_OVERLAPPED, + 0) + if err != nil { + if errno, ok := err.(syscall.Errno); ok { + switch errno { + case syscall.ERROR_FILE_NOT_FOUND, + syscall.ERROR_ACCESS_DENIED: + t.Log("Skipping: ", err) + return + } + } + t.Fatal(err) + } + f := os.NewFile(uintptr(h), name) + defer f.Close() + + err = checkFileIsNotPartOfNetpoll(f) + if err != nil { + t.Fatal(err) + } + }) + } +} diff --git a/src/internal/poll/fd_writev_darwin.go b/src/internal/poll/fd_writev_darwin.go new file mode 100644 index 0000000..b5b8998 --- /dev/null +++ b/src/internal/poll/fd_writev_darwin.go @@ -0,0 +1,17 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build darwin + +package poll + +import ( + "syscall" + _ "unsafe" // for go:linkname +) + +// Implemented in syscall/syscall_darwin.go. +// +//go:linkname writev syscall.writev +func writev(fd int, iovecs []syscall.Iovec) (uintptr, error) diff --git a/src/internal/poll/fd_writev_illumos.go b/src/internal/poll/fd_writev_illumos.go new file mode 100644 index 0000000..79190c2 --- /dev/null +++ b/src/internal/poll/fd_writev_illumos.go @@ -0,0 +1,16 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build illumos + +package poll + +import ( + "internal/syscall/unix" + "syscall" +) + +func writev(fd int, iovecs []syscall.Iovec) (uintptr, error) { + return unix.Writev(fd, iovecs) +} diff --git a/src/internal/poll/fd_writev_unix.go b/src/internal/poll/fd_writev_unix.go new file mode 100644 index 0000000..aa96d10 --- /dev/null +++ b/src/internal/poll/fd_writev_unix.go @@ -0,0 +1,29 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build dragonfly || freebsd || linux || netbsd || openbsd + +package poll + +import ( + "syscall" + "unsafe" +) + +func writev(fd int, iovecs []syscall.Iovec) (uintptr, error) { + var ( + r uintptr + e syscall.Errno + ) + for { + r, _, e = syscall.Syscall(syscall.SYS_WRITEV, uintptr(fd), uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs))) + if e != syscall.EINTR { + break + } + } + if e != 0 { + return r, e + } + return r, nil +} diff --git a/src/internal/poll/hook_cloexec.go b/src/internal/poll/hook_cloexec.go new file mode 100644 index 0000000..5b3cdce --- /dev/null +++ b/src/internal/poll/hook_cloexec.go @@ -0,0 +1,12 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build dragonfly || freebsd || linux || netbsd || openbsd || solaris + +package poll + +import "syscall" + +// Accept4Func is used to hook the accept4 call. +var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4 diff --git a/src/internal/poll/hook_unix.go b/src/internal/poll/hook_unix.go new file mode 100644 index 0000000..1a50356 --- /dev/null +++ b/src/internal/poll/hook_unix.go @@ -0,0 +1,15 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build unix || (js && wasm) + +package poll + +import "syscall" + +// CloseFunc is used to hook the close call. +var CloseFunc func(int) error = syscall.Close + +// AcceptFunc is used to hook the accept call. +var AcceptFunc func(int) (int, syscall.Sockaddr, error) = syscall.Accept diff --git a/src/internal/poll/hook_windows.go b/src/internal/poll/hook_windows.go new file mode 100644 index 0000000..0bd950e --- /dev/null +++ b/src/internal/poll/hook_windows.go @@ -0,0 +1,16 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import "syscall" + +// CloseFunc is used to hook the close call. +var CloseFunc func(syscall.Handle) error = syscall.Closesocket + +// AcceptFunc is used to hook the accept call. +var AcceptFunc func(syscall.Handle, syscall.Handle, *byte, uint32, uint32, uint32, *uint32, *syscall.Overlapped) error = syscall.AcceptEx + +// ConnectExFunc is used to hook the ConnectEx call. +var ConnectExFunc func(syscall.Handle, syscall.Sockaddr, *byte, uint32, *uint32, *syscall.Overlapped) error = syscall.ConnectEx diff --git a/src/internal/poll/iovec_illumos.go b/src/internal/poll/iovec_illumos.go new file mode 100644 index 0000000..00a65d7 --- /dev/null +++ b/src/internal/poll/iovec_illumos.go @@ -0,0 +1,16 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build illumos + +package poll + +import ( + "syscall" + "unsafe" +) + +func newIovecWithBase(base *byte) syscall.Iovec { + return syscall.Iovec{Base: (*int8)(unsafe.Pointer(base))} +} diff --git a/src/internal/poll/iovec_unix.go b/src/internal/poll/iovec_unix.go new file mode 100644 index 0000000..c150084 --- /dev/null +++ b/src/internal/poll/iovec_unix.go @@ -0,0 +1,13 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd + +package poll + +import "syscall" + +func newIovecWithBase(base *byte) syscall.Iovec { + return syscall.Iovec{Base: base} +} diff --git a/src/internal/poll/read_test.go b/src/internal/poll/read_test.go new file mode 100644 index 0000000..598a52e --- /dev/null +++ b/src/internal/poll/read_test.go @@ -0,0 +1,61 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll_test + +import ( + "os" + "runtime" + "sync" + "testing" + "time" +) + +func TestRead(t *testing.T) { + t.Run("SpecialFile", func(t *testing.T) { + var wg sync.WaitGroup + for _, p := range specialFiles() { + for i := 0; i < 4; i++ { + wg.Add(1) + go func(p string) { + defer wg.Done() + for i := 0; i < 100; i++ { + if _, err := os.ReadFile(p); err != nil { + t.Error(err) + return + } + time.Sleep(time.Nanosecond) + } + }(p) + } + } + wg.Wait() + }) +} + +func specialFiles() []string { + var ps []string + switch runtime.GOOS { + case "darwin", "ios", "dragonfly", "freebsd", "netbsd", "openbsd": + ps = []string{ + "/dev/null", + } + case "linux": + ps = []string{ + "/dev/null", + "/proc/stat", + "/sys/devices/system/cpu/online", + } + } + nps := ps[:0] + for _, p := range ps { + f, err := os.Open(p) + if err != nil { + continue + } + f.Close() + nps = append(nps, p) + } + return nps +} diff --git a/src/internal/poll/sendfile_bsd.go b/src/internal/poll/sendfile_bsd.go new file mode 100644 index 0000000..89315a8 --- /dev/null +++ b/src/internal/poll/sendfile_bsd.go @@ -0,0 +1,59 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build darwin || dragonfly || freebsd + +package poll + +import "syscall" + +// maxSendfileSize is the largest chunk size we ask the kernel to copy +// at a time. +const maxSendfileSize int = 4 << 20 + +// SendFile wraps the sendfile system call. +func SendFile(dstFD *FD, src int, pos, remain int64) (int64, error) { + if err := dstFD.writeLock(); err != nil { + return 0, err + } + defer dstFD.writeUnlock() + if err := dstFD.pd.prepareWrite(dstFD.isFile); err != nil { + return 0, err + } + + dst := dstFD.Sysfd + var written int64 + var err error + for remain > 0 { + n := maxSendfileSize + if int64(n) > remain { + n = int(remain) + } + pos1 := pos + n, err1 := syscall.Sendfile(dst, src, &pos1, n) + if n > 0 { + pos += int64(n) + written += int64(n) + remain -= int64(n) + } else if n == 0 && err1 == nil { + break + } + if err1 == syscall.EINTR { + continue + } + if err1 == syscall.EAGAIN { + if err1 = dstFD.pd.waitWrite(dstFD.isFile); err1 == nil { + continue + } + } + if err1 != nil { + // This includes syscall.ENOSYS (no kernel + // support) and syscall.EINVAL (fd types which + // don't implement sendfile) + err = err1 + break + } + } + return written, err +} diff --git a/src/internal/poll/sendfile_linux.go b/src/internal/poll/sendfile_linux.go new file mode 100644 index 0000000..6e78523 --- /dev/null +++ b/src/internal/poll/sendfile_linux.go @@ -0,0 +1,55 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import "syscall" + +// maxSendfileSize is the largest chunk size we ask the kernel to copy +// at a time. +const maxSendfileSize int = 4 << 20 + +// SendFile wraps the sendfile system call. +func SendFile(dstFD *FD, src int, remain int64) (int64, error) { + if err := dstFD.writeLock(); err != nil { + return 0, err + } + defer dstFD.writeUnlock() + if err := dstFD.pd.prepareWrite(dstFD.isFile); err != nil { + return 0, err + } + + dst := dstFD.Sysfd + var written int64 + var err error + for remain > 0 { + n := maxSendfileSize + if int64(n) > remain { + n = int(remain) + } + n, err1 := syscall.Sendfile(dst, src, nil, n) + if n > 0 { + written += int64(n) + remain -= int64(n) + } else if n == 0 && err1 == nil { + break + } + if err1 == syscall.EINTR { + continue + } + if err1 == syscall.EAGAIN { + if err1 = dstFD.pd.waitWrite(dstFD.isFile); err1 == nil { + continue + } + } + if err1 != nil { + // This includes syscall.ENOSYS (no kernel + // support) and syscall.EINVAL (fd types which + // don't implement sendfile) + err = err1 + break + } + } + return written, err +} diff --git a/src/internal/poll/sendfile_solaris.go b/src/internal/poll/sendfile_solaris.go new file mode 100644 index 0000000..7ae18f4 --- /dev/null +++ b/src/internal/poll/sendfile_solaris.go @@ -0,0 +1,66 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import "syscall" + +// Not strictly needed, but very helpful for debugging, see issue #10221. +// +//go:cgo_import_dynamic _ _ "libsendfile.so" +//go:cgo_import_dynamic _ _ "libsocket.so" + +// maxSendfileSize is the largest chunk size we ask the kernel to copy +// at a time. +const maxSendfileSize int = 4 << 20 + +// SendFile wraps the sendfile system call. +func SendFile(dstFD *FD, src int, pos, remain int64) (int64, error) { + if err := dstFD.writeLock(); err != nil { + return 0, err + } + defer dstFD.writeUnlock() + if err := dstFD.pd.prepareWrite(dstFD.isFile); err != nil { + return 0, err + } + + dst := dstFD.Sysfd + var written int64 + var err error + for remain > 0 { + n := maxSendfileSize + if int64(n) > remain { + n = int(remain) + } + pos1 := pos + n, err1 := syscall.Sendfile(dst, src, &pos1, n) + if err1 == syscall.EAGAIN || err1 == syscall.EINTR { + // partial write may have occurred + n = int(pos1 - pos) + } + if n > 0 { + pos += int64(n) + written += int64(n) + remain -= int64(n) + } else if n == 0 && err1 == nil { + break + } + if err1 == syscall.EAGAIN { + if err1 = dstFD.pd.waitWrite(dstFD.isFile); err1 == nil { + continue + } + } + if err1 == syscall.EINTR { + continue + } + if err1 != nil { + // This includes syscall.ENOSYS (no kernel + // support) and syscall.EINVAL (fd types which + // don't implement sendfile) + err = err1 + break + } + } + return written, err +} diff --git a/src/internal/poll/sendfile_windows.go b/src/internal/poll/sendfile_windows.go new file mode 100644 index 0000000..50c3ee8 --- /dev/null +++ b/src/internal/poll/sendfile_windows.go @@ -0,0 +1,81 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import ( + "io" + "syscall" +) + +// SendFile wraps the TransmitFile call. +func SendFile(fd *FD, src syscall.Handle, n int64) (written int64, err error) { + if fd.kind == kindPipe { + // TransmitFile does not work with pipes + return 0, syscall.ESPIPE + } + + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + + o := &fd.wop + o.handle = src + + // TODO(brainman): skip calling syscall.Seek if OS allows it + curpos, err := syscall.Seek(o.handle, 0, io.SeekCurrent) + if err != nil { + return 0, err + } + + if n <= 0 { // We don't know the size of the file so infer it. + // Find the number of bytes offset from curpos until the end of the file. + n, err = syscall.Seek(o.handle, -curpos, io.SeekEnd) + if err != nil { + return + } + // Now seek back to the original position. + if _, err = syscall.Seek(o.handle, curpos, io.SeekStart); err != nil { + return + } + } + + // TransmitFile can be invoked in one call with at most + // 2,147,483,646 bytes: the maximum value for a 32-bit integer minus 1. + // See https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nf-mswsock-transmitfile + const maxChunkSizePerCall = int64(0x7fffffff - 1) + + for n > 0 { + chunkSize := maxChunkSizePerCall + if chunkSize > n { + chunkSize = n + } + + o.qty = uint32(chunkSize) + o.o.Offset = uint32(curpos) + o.o.OffsetHigh = uint32(curpos >> 32) + + nw, err := execIO(o, func(o *operation) error { + return syscall.TransmitFile(o.fd.Sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND) + }) + if err != nil { + return written, err + } + + curpos += int64(nw) + + // Some versions of Windows (Windows 10 1803) do not set + // file position after TransmitFile completes. + // So just use Seek to set file position. + if _, err = syscall.Seek(o.handle, curpos, io.SeekStart); err != nil { + return written, err + } + + n -= int64(nw) + written += int64(nw) + } + + return +} diff --git a/src/internal/poll/sock_cloexec.go b/src/internal/poll/sock_cloexec.go new file mode 100644 index 0000000..cb40f47 --- /dev/null +++ b/src/internal/poll/sock_cloexec.go @@ -0,0 +1,50 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// This file implements accept for platforms that provide a fast path for +// setting SetNonblock and CloseOnExec. + +//go:build dragonfly || freebsd || (linux && !arm) || netbsd || openbsd || solaris + +package poll + +import "syscall" + +// Wrapper around the accept system call that marks the returned file +// descriptor as nonblocking and close-on-exec. +func accept(s int) (int, syscall.Sockaddr, string, error) { + ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC) + // On Linux the accept4 system call was introduced in 2.6.28 + // kernel and on FreeBSD it was introduced in 10 kernel. If we + // get an ENOSYS error on both Linux and FreeBSD, or EINVAL + // error on Linux, fall back to using accept. + switch err { + case nil: + return ns, sa, "", nil + default: // errors other than the ones listed + return -1, sa, "accept4", err + case syscall.ENOSYS: // syscall missing + case syscall.EINVAL: // some Linux use this instead of ENOSYS + case syscall.EACCES: // some Linux use this instead of ENOSYS + case syscall.EFAULT: // some Linux use this instead of ENOSYS + } + + // See ../syscall/exec_unix.go for description of ForkLock. + // It is probably okay to hold the lock across syscall.Accept + // because we have put fd.sysfd into non-blocking mode. + // However, a call to the File method will put it back into + // blocking mode. We can't take that risk, so no use of ForkLock here. + ns, sa, err = AcceptFunc(s) + if err == nil { + syscall.CloseOnExec(ns) + } + if err != nil { + return -1, nil, "accept", err + } + if err = syscall.SetNonblock(ns, true); err != nil { + CloseFunc(ns) + return -1, nil, "setnonblock", err + } + return ns, sa, "", nil +} diff --git a/src/internal/poll/sock_cloexec_accept.go b/src/internal/poll/sock_cloexec_accept.go new file mode 100644 index 0000000..4b86de5 --- /dev/null +++ b/src/internal/poll/sock_cloexec_accept.go @@ -0,0 +1,51 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// This file implements accept for platforms that provide a fast path for +// setting SetNonblock and CloseOnExec, but don't necessarily have accept4. +// This is the code we used for accept in Go 1.17 and earlier. +// On Linux the accept4 system call was introduced in 2.6.28 kernel, +// and our minimum requirement is 2.6.32, so we simplified the function. +// Unfortunately, on ARM accept4 wasn't added until 2.6.36, so for ARM +// only we continue using the older code. + +//go:build linux && arm + +package poll + +import "syscall" + +// Wrapper around the accept system call that marks the returned file +// descriptor as nonblocking and close-on-exec. +func accept(s int) (int, syscall.Sockaddr, string, error) { + ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC) + switch err { + case nil: + return ns, sa, "", nil + default: // errors other than the ones listed + return -1, sa, "accept4", err + case syscall.ENOSYS: // syscall missing + case syscall.EINVAL: // some Linux use this instead of ENOSYS + case syscall.EACCES: // some Linux use this instead of ENOSYS + case syscall.EFAULT: // some Linux use this instead of ENOSYS + } + + // See ../syscall/exec_unix.go for description of ForkLock. + // It is probably okay to hold the lock across syscall.Accept + // because we have put fd.sysfd into non-blocking mode. + // However, a call to the File method will put it back into + // blocking mode. We can't take that risk, so no use of ForkLock here. + ns, sa, err = AcceptFunc(s) + if err == nil { + syscall.CloseOnExec(ns) + } + if err != nil { + return -1, nil, "accept", err + } + if err = syscall.SetNonblock(ns, true); err != nil { + CloseFunc(ns) + return -1, nil, "setnonblock", err + } + return ns, sa, "", nil +} diff --git a/src/internal/poll/sockopt.go b/src/internal/poll/sockopt.go new file mode 100644 index 0000000..a7c9d11 --- /dev/null +++ b/src/internal/poll/sockopt.go @@ -0,0 +1,36 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build unix || windows + +package poll + +import "syscall" + +// SetsockoptInt wraps the setsockopt network call with an int argument. +func (fd *FD) SetsockoptInt(level, name, arg int) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.SetsockoptInt(fd.Sysfd, level, name, arg) +} + +// SetsockoptInet4Addr wraps the setsockopt network call with an IPv4 address. +func (fd *FD) SetsockoptInet4Addr(level, name int, arg [4]byte) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.SetsockoptInet4Addr(fd.Sysfd, level, name, arg) +} + +// SetsockoptLinger wraps the setsockopt network call with a Linger argument. +func (fd *FD) SetsockoptLinger(level, name int, l *syscall.Linger) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.SetsockoptLinger(fd.Sysfd, level, name, l) +} diff --git a/src/internal/poll/sockopt_linux.go b/src/internal/poll/sockopt_linux.go new file mode 100644 index 0000000..bc79c35 --- /dev/null +++ b/src/internal/poll/sockopt_linux.go @@ -0,0 +1,16 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import "syscall" + +// SetsockoptIPMreqn wraps the setsockopt network call with an IPMreqn argument. +func (fd *FD) SetsockoptIPMreqn(level, name int, mreq *syscall.IPMreqn) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.SetsockoptIPMreqn(fd.Sysfd, level, name, mreq) +} diff --git a/src/internal/poll/sockopt_unix.go b/src/internal/poll/sockopt_unix.go new file mode 100644 index 0000000..9cba44d --- /dev/null +++ b/src/internal/poll/sockopt_unix.go @@ -0,0 +1,18 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build unix + +package poll + +import "syscall" + +// SetsockoptByte wraps the setsockopt network call with a byte argument. +func (fd *FD) SetsockoptByte(level, name int, arg byte) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.SetsockoptByte(fd.Sysfd, level, name, arg) +} diff --git a/src/internal/poll/sockopt_windows.go b/src/internal/poll/sockopt_windows.go new file mode 100644 index 0000000..dd5fb70 --- /dev/null +++ b/src/internal/poll/sockopt_windows.go @@ -0,0 +1,25 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import "syscall" + +// Setsockopt wraps the setsockopt network call. +func (fd *FD) Setsockopt(level, optname int32, optval *byte, optlen int32) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.Setsockopt(fd.Sysfd, level, optname, optval, optlen) +} + +// WSAIoctl wraps the WSAIoctl network call. +func (fd *FD) WSAIoctl(iocc uint32, inbuf *byte, cbif uint32, outbuf *byte, cbob uint32, cbbr *uint32, overlapped *syscall.Overlapped, completionRoutine uintptr) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.WSAIoctl(fd.Sysfd, iocc, inbuf, cbif, outbuf, cbob, cbbr, overlapped, completionRoutine) +} diff --git a/src/internal/poll/sockoptip.go b/src/internal/poll/sockoptip.go new file mode 100644 index 0000000..41955e1 --- /dev/null +++ b/src/internal/poll/sockoptip.go @@ -0,0 +1,27 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build unix || windows + +package poll + +import "syscall" + +// SetsockoptIPMreq wraps the setsockopt network call with an IPMreq argument. +func (fd *FD) SetsockoptIPMreq(level, name int, mreq *syscall.IPMreq) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.SetsockoptIPMreq(fd.Sysfd, level, name, mreq) +} + +// SetsockoptIPv6Mreq wraps the setsockopt network call with an IPv6Mreq argument. +func (fd *FD) SetsockoptIPv6Mreq(level, name int, mreq *syscall.IPv6Mreq) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + return syscall.SetsockoptIPv6Mreq(fd.Sysfd, level, name, mreq) +} diff --git a/src/internal/poll/splice_linux.go b/src/internal/poll/splice_linux.go new file mode 100644 index 0000000..43eec04 --- /dev/null +++ b/src/internal/poll/splice_linux.go @@ -0,0 +1,250 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import ( + "internal/syscall/unix" + "runtime" + "sync" + "sync/atomic" + "syscall" + "unsafe" +) + +const ( + // spliceNonblock makes calls to splice(2) non-blocking. + spliceNonblock = 0x2 + + // maxSpliceSize is the maximum amount of data Splice asks + // the kernel to move in a single call to splice(2). + maxSpliceSize = 4 << 20 +) + +// Splice transfers at most remain bytes of data from src to dst, using the +// splice system call to minimize copies of data from and to userspace. +// +// Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer. +// src and dst must both be stream-oriented sockets. +// +// If err != nil, sc is the system call which caused the error. +func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) { + p, sc, err := getPipe() + if err != nil { + return 0, false, sc, err + } + defer putPipe(p) + var inPipe, n int + for err == nil && remain > 0 { + max := maxSpliceSize + if int64(max) > remain { + max = int(remain) + } + inPipe, err = spliceDrain(p.wfd, src, max) + // The operation is considered handled if splice returns no + // error, or an error other than EINVAL. An EINVAL means the + // kernel does not support splice for the socket type of src. + // The failed syscall does not consume any data so it is safe + // to fall back to a generic copy. + // + // spliceDrain should never return EAGAIN, so if err != nil, + // Splice cannot continue. + // + // If inPipe == 0 && err == nil, src is at EOF, and the + // transfer is complete. + handled = handled || (err != syscall.EINVAL) + if err != nil || inPipe == 0 { + break + } + p.data += inPipe + + n, err = splicePump(dst, p.rfd, inPipe) + if n > 0 { + written += int64(n) + remain -= int64(n) + p.data -= n + } + } + if err != nil { + return written, handled, "splice", err + } + return written, true, "", nil +} + +// spliceDrain moves data from a socket to a pipe. +// +// Invariant: when entering spliceDrain, the pipe is empty. It is either in its +// initial state, or splicePump has emptied it previously. +// +// Given this, spliceDrain can reasonably assume that the pipe is ready for +// writing, so if splice returns EAGAIN, it must be because the socket is not +// ready for reading. +// +// If spliceDrain returns (0, nil), src is at EOF. +func spliceDrain(pipefd int, sock *FD, max int) (int, error) { + if err := sock.readLock(); err != nil { + return 0, err + } + defer sock.readUnlock() + if err := sock.pd.prepareRead(sock.isFile); err != nil { + return 0, err + } + for { + n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock) + if err == syscall.EINTR { + continue + } + if err != syscall.EAGAIN { + return n, err + } + if err := sock.pd.waitRead(sock.isFile); err != nil { + return n, err + } + } +} + +// splicePump moves all the buffered data from a pipe to a socket. +// +// Invariant: when entering splicePump, there are exactly inPipe +// bytes of data in the pipe, from a previous call to spliceDrain. +// +// By analogy to the condition from spliceDrain, splicePump +// only needs to poll the socket for readiness, if splice returns +// EAGAIN. +// +// If splicePump cannot move all the data in a single call to +// splice(2), it loops over the buffered data until it has written +// all of it to the socket. This behavior is similar to the Write +// step of an io.Copy in userspace. +func splicePump(sock *FD, pipefd int, inPipe int) (int, error) { + if err := sock.writeLock(); err != nil { + return 0, err + } + defer sock.writeUnlock() + if err := sock.pd.prepareWrite(sock.isFile); err != nil { + return 0, err + } + written := 0 + for inPipe > 0 { + n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock) + // Here, the condition n == 0 && err == nil should never be + // observed, since Splice controls the write side of the pipe. + if n > 0 { + inPipe -= n + written += n + continue + } + if err != syscall.EAGAIN { + return written, err + } + if err := sock.pd.waitWrite(sock.isFile); err != nil { + return written, err + } + } + return written, nil +} + +// splice wraps the splice system call. Since the current implementation +// only uses splice on sockets and pipes, the offset arguments are unused. +// splice returns int instead of int64, because callers never ask it to +// move more data in a single call than can fit in an int32. +func splice(out int, in int, max int, flags int) (int, error) { + n, err := syscall.Splice(in, nil, out, nil, max, flags) + return int(n), err +} + +type splicePipeFields struct { + rfd int + wfd int + data int +} + +type splicePipe struct { + splicePipeFields + + // We want to use a finalizer, so ensure that the size is + // large enough to not use the tiny allocator. + _ [24 - unsafe.Sizeof(splicePipeFields{})%24]byte +} + +// splicePipePool caches pipes to avoid high-frequency construction and destruction of pipe buffers. +// The garbage collector will free all pipes in the sync.Pool periodically, thus we need to set up +// a finalizer for each pipe to close its file descriptors before the actual GC. +var splicePipePool = sync.Pool{New: newPoolPipe} + +func newPoolPipe() any { + // Discard the error which occurred during the creation of pipe buffer, + // redirecting the data transmission to the conventional way utilizing read() + write() as a fallback. + p := newPipe() + if p == nil { + return nil + } + runtime.SetFinalizer(p, destroyPipe) + return p +} + +// getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache. +// +// Note that it may fail to create a new pipe buffer by newPipe(), in which case getPipe() will return a generic error +// and system call name splice in a string as the indication. +func getPipe() (*splicePipe, string, error) { + v := splicePipePool.Get() + if v == nil { + return nil, "splice", syscall.EINVAL + } + return v.(*splicePipe), "", nil +} + +func putPipe(p *splicePipe) { + // If there is still data left in the pipe, + // then close and discard it instead of putting it back into the pool. + if p.data != 0 { + runtime.SetFinalizer(p, nil) + destroyPipe(p) + return + } + splicePipePool.Put(p) +} + +var disableSplice unsafe.Pointer + +// newPipe sets up a pipe for a splice operation. +func newPipe() (sp *splicePipe) { + p := (*bool)(atomic.LoadPointer(&disableSplice)) + if p != nil && *p { + return nil + } + + var fds [2]int + // pipe2 was added in 2.6.27 and our minimum requirement is 2.6.23, so it + // might not be implemented. Falling back to pipe is possible, but prior to + // 2.6.29 splice returns -EAGAIN instead of 0 when the connection is + // closed. + const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK + if err := syscall.Pipe2(fds[:], flags); err != nil { + return nil + } + + sp = &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}} + + if p == nil { + p = new(bool) + defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p)) + + // F_GETPIPE_SZ was added in 2.6.35, which does not have the -EAGAIN bug. + if _, _, errno := syscall.Syscall(unix.FcntlSyscall, uintptr(fds[0]), syscall.F_GETPIPE_SZ, 0); errno != 0 { + *p = true + destroyPipe(sp) + return nil + } + } + + return +} + +// destroyPipe destroys a pipe. +func destroyPipe(p *splicePipe) { + CloseFunc(p.rfd) + CloseFunc(p.wfd) +} diff --git a/src/internal/poll/splice_linux_test.go b/src/internal/poll/splice_linux_test.go new file mode 100644 index 0000000..29bcaab --- /dev/null +++ b/src/internal/poll/splice_linux_test.go @@ -0,0 +1,136 @@ +// Copyright 2021 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll_test + +import ( + "internal/poll" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" +) + +var closeHook atomic.Value // func(fd int) + +func init() { + closeFunc := poll.CloseFunc + poll.CloseFunc = func(fd int) (err error) { + if v := closeHook.Load(); v != nil { + if hook := v.(func(int)); hook != nil { + hook(fd) + } + } + return closeFunc(fd) + } +} + +func TestSplicePipePool(t *testing.T) { + const N = 64 + var ( + p *poll.SplicePipe + ps []*poll.SplicePipe + allFDs []int + pendingFDs sync.Map // fd → struct{}{} + err error + ) + + closeHook.Store(func(fd int) { pendingFDs.Delete(fd) }) + t.Cleanup(func() { closeHook.Store((func(int))(nil)) }) + + for i := 0; i < N; i++ { + p, _, err = poll.GetPipe() + if err != nil { + t.Skipf("failed to create pipe due to error(%v), skip this test", err) + } + _, pwfd := poll.GetPipeFds(p) + allFDs = append(allFDs, pwfd) + pendingFDs.Store(pwfd, struct{}{}) + ps = append(ps, p) + } + for _, p = range ps { + poll.PutPipe(p) + } + ps = nil + p = nil + + // Exploit the timeout of "go test" as a timer for the subsequent verification. + timeout := 5 * time.Minute + if deadline, ok := t.Deadline(); ok { + timeout = deadline.Sub(time.Now()) + timeout -= timeout / 10 // Leave 10% headroom for cleanup. + } + expiredTime := time.NewTimer(timeout) + defer expiredTime.Stop() + + // Trigger garbage collection repeatedly, waiting for all pipes in sync.Pool + // to either be deallocated and closed, or to time out. + for { + runtime.GC() + time.Sleep(10 * time.Millisecond) + + // Detect whether all pipes are closed properly. + var leakedFDs []int + pendingFDs.Range(func(k, v any) bool { + leakedFDs = append(leakedFDs, k.(int)) + return true + }) + if len(leakedFDs) == 0 { + break + } + + select { + case <-expiredTime.C: + t.Logf("all descriptors: %v", allFDs) + t.Fatalf("leaked descriptors: %v", leakedFDs) + default: + } + } +} + +func BenchmarkSplicePipe(b *testing.B) { + b.Run("SplicePipeWithPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + p, _, err := poll.GetPipe() + if err != nil { + continue + } + poll.PutPipe(p) + } + }) + b.Run("SplicePipeWithoutPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + p := poll.NewPipe() + if p == nil { + b.Skip("newPipe returned nil") + } + poll.DestroyPipe(p) + } + }) +} + +func BenchmarkSplicePipePoolParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p, _, err := poll.GetPipe() + if err != nil { + continue + } + poll.PutPipe(p) + } + }) +} + +func BenchmarkSplicePipeNativeParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p := poll.NewPipe() + if p == nil { + b.Skip("newPipe returned nil") + } + poll.DestroyPipe(p) + } + }) +} diff --git a/src/internal/poll/strconv.go b/src/internal/poll/strconv.go new file mode 100644 index 0000000..2b052fa --- /dev/null +++ b/src/internal/poll/strconv.go @@ -0,0 +1,13 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build plan9 + +package poll + +// stringsHasSuffix is strings.HasSuffix. It reports whether s ends in +// suffix. +func stringsHasSuffix(s, suffix string) bool { + return len(s) >= len(suffix) && s[len(s)-len(suffix):] == suffix +} diff --git a/src/internal/poll/sys_cloexec.go b/src/internal/poll/sys_cloexec.go new file mode 100644 index 0000000..7cd8001 --- /dev/null +++ b/src/internal/poll/sys_cloexec.go @@ -0,0 +1,36 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// This file implements accept for platforms that do not provide a fast path for +// setting SetNonblock and CloseOnExec. + +//go:build aix || darwin || (js && wasm) + +package poll + +import ( + "syscall" +) + +// Wrapper around the accept system call that marks the returned file +// descriptor as nonblocking and close-on-exec. +func accept(s int) (int, syscall.Sockaddr, string, error) { + // See ../syscall/exec_unix.go for description of ForkLock. + // It is probably okay to hold the lock across syscall.Accept + // because we have put fd.sysfd into non-blocking mode. + // However, a call to the File method will put it back into + // blocking mode. We can't take that risk, so no use of ForkLock here. + ns, sa, err := AcceptFunc(s) + if err == nil { + syscall.CloseOnExec(ns) + } + if err != nil { + return -1, nil, "accept", err + } + if err = syscall.SetNonblock(ns, true); err != nil { + CloseFunc(ns) + return -1, nil, "setnonblock", err + } + return ns, sa, "", nil +} diff --git a/src/internal/poll/writev.go b/src/internal/poll/writev.go new file mode 100644 index 0000000..cd600b6 --- /dev/null +++ b/src/internal/poll/writev.go @@ -0,0 +1,87 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build darwin || dragonfly || freebsd || illumos || linux || netbsd || openbsd + +package poll + +import ( + "io" + "syscall" +) + +// Writev wraps the writev system call. +func (fd *FD) Writev(v *[][]byte) (int64, error) { + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + if err := fd.pd.prepareWrite(fd.isFile); err != nil { + return 0, err + } + + var iovecs []syscall.Iovec + if fd.iovecs != nil { + iovecs = *fd.iovecs + } + // TODO: read from sysconf(_SC_IOV_MAX)? The Linux default is + // 1024 and this seems conservative enough for now. Darwin's + // UIO_MAXIOV also seems to be 1024. + maxVec := 1024 + + var n int64 + var err error + for len(*v) > 0 { + iovecs = iovecs[:0] + for _, chunk := range *v { + if len(chunk) == 0 { + continue + } + iovecs = append(iovecs, newIovecWithBase(&chunk[0])) + if fd.IsStream && len(chunk) > 1<<30 { + iovecs[len(iovecs)-1].SetLen(1 << 30) + break // continue chunk on next writev + } + iovecs[len(iovecs)-1].SetLen(len(chunk)) + if len(iovecs) == maxVec { + break + } + } + if len(iovecs) == 0 { + break + } + if fd.iovecs == nil { + fd.iovecs = new([]syscall.Iovec) + } + *fd.iovecs = iovecs // cache + + var wrote uintptr + wrote, err = writev(fd.Sysfd, iovecs) + if wrote == ^uintptr(0) { + wrote = 0 + } + TestHookDidWritev(int(wrote)) + n += int64(wrote) + consume(v, int64(wrote)) + for i := range iovecs { + iovecs[i] = syscall.Iovec{} + } + if err != nil { + if err == syscall.EINTR { + continue + } + if err == syscall.EAGAIN { + if err = fd.pd.waitWrite(fd.isFile); err == nil { + continue + } + } + break + } + if n == 0 { + err = io.ErrUnexpectedEOF + break + } + } + return n, err +} diff --git a/src/internal/poll/writev_test.go b/src/internal/poll/writev_test.go new file mode 100644 index 0000000..b46657c --- /dev/null +++ b/src/internal/poll/writev_test.go @@ -0,0 +1,62 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll_test + +import ( + "internal/poll" + "reflect" + "testing" +) + +func TestConsume(t *testing.T) { + tests := []struct { + in [][]byte + consume int64 + want [][]byte + }{ + { + in: [][]byte{[]byte("foo"), []byte("bar")}, + consume: 0, + want: [][]byte{[]byte("foo"), []byte("bar")}, + }, + { + in: [][]byte{[]byte("foo"), []byte("bar")}, + consume: 2, + want: [][]byte{[]byte("o"), []byte("bar")}, + }, + { + in: [][]byte{[]byte("foo"), []byte("bar")}, + consume: 3, + want: [][]byte{[]byte("bar")}, + }, + { + in: [][]byte{[]byte("foo"), []byte("bar")}, + consume: 4, + want: [][]byte{[]byte("ar")}, + }, + { + in: [][]byte{nil, nil, nil, []byte("bar")}, + consume: 1, + want: [][]byte{[]byte("ar")}, + }, + { + in: [][]byte{nil, nil, nil, []byte("foo")}, + consume: 0, + want: [][]byte{[]byte("foo")}, + }, + { + in: [][]byte{nil, nil, nil}, + consume: 0, + want: [][]byte{}, + }, + } + for i, tt := range tests { + in := tt.in + poll.Consume(&in, tt.consume) + if !reflect.DeepEqual(in, tt.want) { + t.Errorf("%d. after consume(%d) = %+v, want %+v", i, tt.consume, in, tt.want) + } + } +} |