author    Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-16 19:19:13 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-16 19:19:13 +0000
commit    ccd992355df7192993c666236047820244914598 (patch)
tree      f00fea65147227b7743083c6148396f74cd66935 /src/runtime/netpoll_kqueue.go
parent    Initial commit. (diff)
Adding upstream version 1.21.8. (tag: upstream/1.21.8)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/runtime/netpoll_kqueue.go')
-rw-r--r--  src/runtime/netpoll_kqueue.go  215
1 file changed, 215 insertions(+), 0 deletions(-)
diff --git a/src/runtime/netpoll_kqueue.go b/src/runtime/netpoll_kqueue.go
new file mode 100644
index 0000000..3af45e6
--- /dev/null
+++ b/src/runtime/netpoll_kqueue.go
@@ -0,0 +1,215 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build darwin || dragonfly || freebsd || netbsd || openbsd
+
+package runtime
+
+// Integrated network poller (kqueue-based implementation).
+
+import (
+ "internal/goarch"
+ "runtime/internal/atomic"
+ "unsafe"
+)
+
+var (
+	kq int32 = -1
+
+	netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
+
+	netpollWakeSig atomic.Uint32 // used to avoid duplicate calls of netpollBreak
+)
+
+func netpollinit() {
+	kq = kqueue()
+	if kq < 0 {
+		println("runtime: kqueue failed with", -kq)
+		throw("runtime: netpollinit failed")
+	}
+	closeonexec(kq)
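+	// Create the self-pipe used by netpollBreak to interrupt a
+	// blocking kevent wait.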
+	r, w, errno := nonblockingPipe()
+	if errno != 0 {
+		println("runtime: pipe failed with", -errno)
+		throw("runtime: pipe failed")
+	}
+	ev := keventt{
+		filter: _EVFILT_READ,
+		flags:  _EV_ADD,
+	}
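+	// keventt.ident is declared with different widths on the
+	// supported BSDs, so store the fd through a uintptr view.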
+	*(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(r)
+	n := kevent(kq, &ev, 1, nil, 0, nil)
+	if n < 0 {
+		println("runtime: kevent failed with", -n)
+		throw("runtime: kevent failed")
+	}
+	netpollBreakRd = uintptr(r)
+	netpollBreakWr = uintptr(w)
+}
+
+func netpollIsPollDescriptor(fd uintptr) bool {
+	return fd == uintptr(kq) || fd == netpollBreakRd || fd == netpollBreakWr
+}
+
+func netpollopen(fd uintptr, pd *pollDesc) int32 {
+	// Arm both EVFILT_READ and EVFILT_WRITE in edge-triggered mode (EV_CLEAR)
+	// for the whole fd lifetime. The notifications are automatically unregistered
+	// when fd is closed.
+	var ev [2]keventt
+	*(*uintptr)(unsafe.Pointer(&ev[0].ident)) = fd
+	ev[0].filter = _EVFILT_READ
+	ev[0].flags = _EV_ADD | _EV_CLEAR
+	ev[0].fflags = 0
+	ev[0].data = 0
+
+	if goarch.PtrSize == 4 {
+		// We only have a pointer-sized field to store into,
+		// so on a 32-bit system we get no sequence protection.
+		// TODO(iant): If we notice any problems we could at least
+		// steal the low-order 2 bits for a tiny sequence number.
+		ev[0].udata = (*byte)(unsafe.Pointer(pd))
+	} else {
+		tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
+		ev[0].udata = (*byte)(unsafe.Pointer(uintptr(tp)))
+	}
+	ev[1] = ev[0]
+	ev[1].filter = _EVFILT_WRITE
+	n := kevent(kq, &ev[0], 2, nil, 0, nil)
+	if n < 0 {
+		return -n
+	}
+	return 0
+}
+
+func netpollclose(fd uintptr) int32 {
+	// Don't need to unregister because calling close()
+	// on fd will remove any kevents that reference the descriptor.
+	return 0
+}
+
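+// netpollarm is only called by poll-based netpoll implementations;
+// the kqueue poller registers interest once, in netpollopen.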
+func netpollarm(pd *pollDesc, mode int) {
+	throw("runtime: unused")
+}
+
+// netpollBreak interrupts a kevent.
+func netpollBreak() {
+	// Failing to cas indicates there is an in-flight wakeup, so we're done here.
+	if !netpollWakeSig.CompareAndSwap(0, 1) {
+		return
+	}
+
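+	// A single byte is enough to wake up kevent. If the pipe is
+	// full (-EAGAIN), a wakeup is already pending.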
+	for {
+		var b byte
+		n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
+		if n == 1 || n == -_EAGAIN {
+			break
+		}
+		if n == -_EINTR {
+			continue
+		}
+		println("runtime: netpollBreak write failed with", -n)
+		throw("runtime: netpollBreak write failed")
+	}
+}
+
+// netpoll checks for ready network connections.
+// Returns a list of goroutines that become runnable.
+// delay < 0: blocks indefinitely
+// delay == 0: does not block, just polls
+// delay > 0: blocks for up to that many nanoseconds
+func netpoll(delay int64) gList {
+	if kq == -1 {
+		return gList{}
+	}
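+	// For kevent, a nil timeout blocks indefinitely and a zeroed
+	// timespec polls without blocking.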
+	var tp *timespec
+	var ts timespec
+	if delay < 0 {
+		tp = nil
+	} else if delay == 0 {
+		tp = &ts
+	} else {
+		ts.setNsec(delay)
+		if ts.tv_sec > 1e6 {
+			// Darwin returns EINVAL if the sleep time is too long.
+			ts.tv_sec = 1e6
+		}
+		tp = &ts
+	}
+	var events [64]keventt
+retry:
+	n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
+	if n < 0 {
+		if n != -_EINTR {
+			println("runtime: kevent on fd", kq, "failed with", -n)
+			throw("runtime: netpoll failed")
+		}
+		// If a timed sleep was interrupted, just return to
+		// recalculate how long we should sleep now.
+		if delay > 0 {
+			return gList{}
+		}
+		goto retry
+	}
+	var toRun gList
+	for i := 0; i < int(n); i++ {
+		ev := &events[i]
+
+		if uintptr(ev.ident) == netpollBreakRd {
+			if ev.filter != _EVFILT_READ {
+				println("runtime: netpoll: break fd ready for", ev.filter)
+				throw("runtime: netpoll: break fd ready for something unexpected")
+			}
+			if delay != 0 {
+				// netpollBreak could be picked up by a
+				// nonblocking poll. Only read the byte
+				// if blocking.
+				var tmp [16]byte
+				read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
+				netpollWakeSig.Store(0)
+			}
+			continue
+		}
+
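+		// mode accumulates 'r' and/or 'w' so netpollready knows
+		// which waiting goroutines to make runnable.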
+		var mode int32
+		switch ev.filter {
+		case _EVFILT_READ:
+			mode += 'r'
+
+			// On some systems when the read end of a pipe
+			// is closed the write end will not get a
+			// _EVFILT_WRITE event, but will get a
+			// _EVFILT_READ event with EV_EOF set.
+			// Note that setting 'w' here just means that we
+			// will wake up a goroutine waiting to write;
+			// that goroutine will try the write again,
+			// and the appropriate thing will happen based
+			// on what that write returns (success, EPIPE, EAGAIN).
+			if ev.flags&_EV_EOF != 0 {
+				mode += 'w'
+			}
+		case _EVFILT_WRITE:
+			mode += 'w'
+		}
+		if mode != 0 {
+			var pd *pollDesc
+			var tag uintptr
+			if goarch.PtrSize == 4 {
+				// No sequence protection on 32-bit systems.
+				// See netpollopen for details.
+				pd = (*pollDesc)(unsafe.Pointer(ev.udata))
+				tag = 0
+			} else {
+				tp := taggedPointer(uintptr(unsafe.Pointer(ev.udata)))
+				pd = (*pollDesc)(tp.pointer())
+				tag = tp.tag()
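+				// The pollDesc has been recycled for another fd
+				// since this event was queued; drop the stale event.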
+				if pd.fdseq.Load() != tag {
+					continue
+				}
+			}
+			pd.setEventErr(ev.flags == _EV_ERROR, tag)
+			netpollready(&toRun, pd, mode)
+		}
+	}
+	return toRun
+}