// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runtime

import (
	"runtime/internal/atomic"
	"unsafe"
)

// Solaris runtime-integrated network poller.
//
// Solaris uses event ports for scalable network I/O. Event
// ports are level-triggered, unlike epoll and kqueue which
// can be configured in both level-triggered and edge-triggered
// mode. Level triggering means we have to keep track of a few things
// ourselves. After we receive an event for a file descriptor,
// it's our responsibility to ask again to be notified for future
// events for that descriptor. When doing this we must keep track of
// what kind of events the goroutines are currently interested in,
// for example, an fd may be open for both reading and writing.
//
// A description of the high-level operation of this code
// follows. Networking code will get a file descriptor by some means
// and will register it with the netpolling mechanism by a code path
// that eventually calls runtime·netpollopen. runtime·netpollopen
// calls port_associate with an empty event set. That means that we
// will not receive any events at this point. The association needs
// to be done at this early point because we need to process the I/O
// readiness notification at some point in the future. If I/O becomes
// ready while nobody is listening, then by the time we finally care
// about it, nobody will tell us anymore.
//
// Besides calling runtime·netpollopen, the networking code paths
// will call runtime·netpollarm each time goroutines are interested
// in doing network I/O. Because now we know what kind of I/O we
// are interested in (reading/writing), we can call port_associate
// passing the correct type of event set (POLLIN/POLLOUT). As we made
// sure to have already associated the file descriptor with the port,
// when we now call port_associate, we will unblock the main poller
// loop (in runtime·netpoll) right away if the socket is actually
// ready for I/O.
//
// The main poller loop runs in its own thread waiting for events
// using port_getn. When an event happens, it will tell the scheduler
// about it using runtime·netpollready. Besides doing this, it must
// also re-associate the events that were not part of this current
// notification with the file descriptor. Failing to do this would
// mean that each notification prevents concurrent code from using
// the same file descriptor in parallel.
//
// The logic dealing with re-associations is encapsulated in
// runtime·netpollupdate. This function takes care to associate the
// descriptor only with the subset of events that were previously
// part of the association, except the one that just happened. We
// can't re-associate with that right away, because event ports
// are level triggered so it would cause a busy loop. Instead, that
// association is effected only by the runtime·netpollarm code path,
// when Go code actually asks for I/O.
//
// The open and arming mechanisms are serialized using the lock
// inside pollDesc. This is required because the netpoll loop runs
// asynchronously with respect to other Go code, and by the time we
// get to call port_associate to update the association in the loop,
// the file descriptor might have been closed and reopened already.
// The lock allows runtime·netpollupdate to be called synchronously
// from the loop thread while preventing other threads from operating
// on the same pollDesc, so once we unblock in the main loop, until
// we loop again we know for sure we are always talking about the
// same file descriptor and can safely access the data we want (the
// event set).
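
// In outline, the lifecycle of a polled descriptor is (illustrative
// summary of the code below):
//
//	netpollopen(fd, pd)    // port_associate(fd, 0): register, no events yet
//	netpollarm(pd, 'r')    // port_associate(fd, POLLIN): declare interest
//	netpoll(delay)         // port_getn delivers the event, wakes goroutines
//	netpollupdate(pd, ...) // re-associate the remaining events with fd
//
// Each delivered notification consumes the association, so every event
// must be followed by a fresh port_associate before further events can
// be delivered for that descriptor.
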
//go:cgo_import_dynamic libc_port_create port_create "libc.so"
//go:cgo_import_dynamic libc_port_associate port_associate "libc.so"
//go:cgo_import_dynamic libc_port_dissociate port_dissociate "libc.so"
//go:cgo_import_dynamic libc_port_getn port_getn "libc.so"
//go:cgo_import_dynamic libc_port_alert port_alert "libc.so"
//go:linkname libc_port_create libc_port_create
//go:linkname libc_port_associate libc_port_associate
//go:linkname libc_port_dissociate libc_port_dissociate
//go:linkname libc_port_getn libc_port_getn
//go:linkname libc_port_alert libc_port_alert

var (
	libc_port_create,
	libc_port_associate,
	libc_port_dissociate,
	libc_port_getn,
	libc_port_alert libcFunc

	netpollWakeSig uint32 // used to avoid duplicate calls of netpollBreak
)

func errno() int32 {
	return *getg().m.perrno
}

func fcntl(fd, cmd, arg int32) int32 {
	return int32(sysvicall3(&libc_fcntl, uintptr(fd), uintptr(cmd), uintptr(arg)))
}
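
// The functions below are thin wrappers over the event port entry points
// imported above; each forwards its arguments to the corresponding libc
// function through a sysvicallN trampoline.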

func port_create() int32 {
	return int32(sysvicall0(&libc_port_create))
}

func port_associate(port, source int32, object uintptr, events uint32, user uintptr) int32 {
	return int32(sysvicall5(&libc_port_associate, uintptr(port), uintptr(source), object, uintptr(events), user))
}

func port_dissociate(port, source int32, object uintptr) int32 {
	return int32(sysvicall3(&libc_port_dissociate, uintptr(port), uintptr(source), object))
}

func port_getn(port int32, evs *portevent, max uint32, nget *uint32, timeout *timespec) int32 {
	return int32(sysvicall5(&libc_port_getn, uintptr(port), uintptr(unsafe.Pointer(evs)), uintptr(max), uintptr(unsafe.Pointer(nget)), uintptr(unsafe.Pointer(timeout))))
}

func port_alert(port int32, flags, events uint32, user uintptr) int32 {
	return int32(sysvicall4(&libc_port_alert, uintptr(port), uintptr(flags), uintptr(events), user))
}

var portfd int32 = -1

func netpollinit() {
	portfd = port_create()
	if portfd >= 0 {
		fcntl(portfd, _F_SETFD, _FD_CLOEXEC)
		return
	}
	print("runtime: port_create failed (errno=", errno(), ")\n")
	throw("runtime: netpollinit failed")
}
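
// netpollIsPollDescriptor reports whether fd is the file descriptor
// used by the poller itself.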
func netpollIsPollDescriptor(fd uintptr) bool {
	return fd == uintptr(portfd)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
	lock(&pd.lock)
	// We don't register for any specific type of events yet; that's
	// netpollarm's job. We merely ensure we call port_associate before
	// an asynchronous connect/accept completes, so when we actually want
	// to do any I/O, the call to port_associate (from netpollarm,
	// with the event set we are interested in) will unblock port_getn
	// right away because of the I/O readiness notification.
	pd.user = 0
	r := port_associate(portfd, _PORT_SOURCE_FD, fd, 0, uintptr(unsafe.Pointer(pd)))
	unlock(&pd.lock)
	return r
}
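
// netpollclose dissociates fd from the event port, so no further
// notifications are delivered for it.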
func netpollclose(fd uintptr) int32 {
	return port_dissociate(portfd, _PORT_SOURCE_FD, fd)
}

// netpollupdate updates the association with a new set of events of
// interest. After this call, port_getn will return one and only one
// event for that particular descriptor, so this function needs to be
// called again.
func netpollupdate(pd *pollDesc, set, clear uint32) {
	if pd.info().closing() {
		return
	}
	old := pd.user
	events := (old & ^clear) | set
	if old == events {
		return
	}
	if events != 0 && port_associate(portfd, _PORT_SOURCE_FD, pd.fd, events, uintptr(unsafe.Pointer(pd))) != 0 {
		print("runtime: port_associate failed (errno=", errno(), ")\n")
		throw("runtime: netpollupdate failed")
	}
	pd.user = events
}
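
// For example, if pd.user is _POLLIN|_POLLOUT and a _POLLIN notification
// has just been delivered, netpoll calls netpollupdate(pd, 0, _POLLIN);
// events becomes (old &^ _POLLIN) | 0 == _POLLOUT, so the descriptor stays
// armed for writes, while read interest is re-armed only by the next
// netpollarm.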

// netpollarm subscribes the fd to the port such that port_getn will
// return one event.
func netpollarm(pd *pollDesc, mode int) {
	lock(&pd.lock)
	switch mode {
	case 'r':
		netpollupdate(pd, _POLLIN, 0)
	case 'w':
		netpollupdate(pd, _POLLOUT, 0)
	default:
		throw("runtime: bad mode")
	}
	unlock(&pd.lock)
}

// netpollBreak interrupts a port_getn wait.
func netpollBreak() {
	if atomic.Cas(&netpollWakeSig, 0, 1) {
		// Use port_alert to put portfd into alert mode.
		// This will wake up all threads sleeping in port_getn on portfd,
		// and cause their calls to port_getn to return immediately.
		// Further, until portfd is taken out of alert mode,
		// all calls to port_getn will return immediately.
		if port_alert(portfd, _PORT_ALERT_UPDATE, _POLLHUP, uintptr(unsafe.Pointer(&portfd))) < 0 {
			if e := errno(); e != _EBUSY {
				println("runtime: port_alert failed with", e)
				throw("runtime: netpoll: port_alert failed")
			}
		}
	}
}

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
	if portfd == -1 {
		return gList{}
	}

	var wait *timespec
	var ts timespec
	if delay < 0 {
		wait = nil
	} else if delay == 0 {
		wait = &ts
	} else {
		ts.setNsec(delay)
		if ts.tv_sec > 1e6 {
			// An arbitrary cap on how long to wait for a timer.
			// 1e6 s == ~11.5 days.
			ts.tv_sec = 1e6
		}
		wait = &ts
	}

	var events [128]portevent
retry:
	var n uint32 = 1
	r := port_getn(portfd, &events[0], uint32(len(events)), &n, wait)
	e := errno()
	if r < 0 && e == _ETIME && n > 0 {
		// As per port_getn(3C), an ETIME failure does not preclude the
		// delivery of some number of events. Treat a timeout failure
		// with delivered events as a success.
		r = 0
	}
	if r < 0 {
		if e != _EINTR && e != _ETIME {
			print("runtime: port_getn on fd ", portfd, " failed (errno=", e, ")\n")
			throw("runtime: netpoll failed")
		}
		// If a timed sleep was interrupted and there are no events,
		// just return to recalculate how long we should sleep now.
		if delay > 0 {
			return gList{}
		}
		goto retry
	}

	var toRun gList
	for i := 0; i < int(n); i++ {
		ev := &events[i]

		if ev.portev_source == _PORT_SOURCE_ALERT {
			if ev.portev_events != _POLLHUP || unsafe.Pointer(ev.portev_user) != unsafe.Pointer(&portfd) {
				throw("runtime: netpoll: bad port_alert wakeup")
			}
			if delay != 0 {
				// Now that a blocking call to netpoll
				// has seen the alert, take portfd
				// back out of alert mode.
				// See the comment in netpollBreak.
				if port_alert(portfd, 0, 0, 0) < 0 {
					e := errno()
					println("runtime: port_alert failed with", e)
					throw("runtime: netpoll: port_alert failed")
				}
				atomic.Store(&netpollWakeSig, 0)
			}
			continue
		}

		if ev.portev_events == 0 {
			continue
		}

		pd := (*pollDesc)(unsafe.Pointer(ev.portev_user))

		var mode, clear int32
		if (ev.portev_events & (_POLLIN | _POLLHUP | _POLLERR)) != 0 {
			mode += 'r'
			clear |= _POLLIN
		}
		if (ev.portev_events & (_POLLOUT | _POLLHUP | _POLLERR)) != 0 {
			mode += 'w'
			clear |= _POLLOUT
		}
		// To effect edge-triggered events, we need to be sure to
		// update our association with whatever events were not
		// set with the event. For example, if we are registered
		// for POLLIN|POLLOUT, and we get POLLIN, besides waking
		// the goroutine interested in POLLIN we must not forget
		// about the one interested in POLLOUT.
		if clear != 0 {
			lock(&pd.lock)
			netpollupdate(pd, 0, uint32(clear))
			unlock(&pd.lock)
		}

		if mode != 0 {
			// TODO(mikio): Consider implementing event
			// scanning error reporting once we are sure
			// about the event port on SmartOS.
			//
			// See golang.org/issue/30840.
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}