diff options
Diffstat (limited to 'src/runtime/select.go')
-rw-r--r-- | src/runtime/select.go | 632 |
1 files changed, 632 insertions, 0 deletions
diff --git a/src/runtime/select.go b/src/runtime/select.go new file mode 100644 index 0000000..b3a3085 --- /dev/null +++ b/src/runtime/select.go @@ -0,0 +1,632 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package runtime + +// This file contains the implementation of Go select statements. + +import ( + "internal/abi" + "unsafe" +) + +const debugSelect = false + +// Select case descriptor. +// Known to compiler. +// Changes here must also be made in src/cmd/compile/internal/walk/select.go's scasetype. +type scase struct { + c *hchan // chan + elem unsafe.Pointer // data element +} + +var ( + chansendpc = abi.FuncPCABIInternal(chansend) + chanrecvpc = abi.FuncPCABIInternal(chanrecv) +) + +func selectsetpc(pc *uintptr) { + *pc = getcallerpc() +} + +func sellock(scases []scase, lockorder []uint16) { + var c *hchan + for _, o := range lockorder { + c0 := scases[o].c + if c0 != c { + c = c0 + lock(&c.lock) + } + } +} + +func selunlock(scases []scase, lockorder []uint16) { + // We must be very careful here to not touch sel after we have unlocked + // the last lock, because sel can be freed right after the last unlock. + // Consider the following situation. + // First M calls runtime·park() in runtime·selectgo() passing the sel. + // Once runtime·park() has unlocked the last lock, another M makes + // the G that calls select runnable again and schedules it for execution. + // When the G runs on another M, it locks all the locks and frees sel. + // Now if the first M touches sel, it will access freed memory. + for i := len(lockorder) - 1; i >= 0; i-- { + c := scases[lockorder[i]].c + if i > 0 && c == scases[lockorder[i-1]].c { + continue // will unlock it on the next iteration + } + unlock(&c.lock) + } +} + +func selparkcommit(gp *g, _ unsafe.Pointer) bool { + // There are unlocked sudogs that point into gp's stack. Stack + // copying must lock the channels of those sudogs. + // Set activeStackChans here instead of before we try parking + // because we could self-deadlock in stack growth on a + // channel lock. + gp.activeStackChans = true + // Mark that it's safe for stack shrinking to occur now, + // because any thread acquiring this G's stack for shrinking + // is guaranteed to observe activeStackChans after this store. + gp.parkingOnChan.Store(false) + // Make sure we unlock after setting activeStackChans and + // unsetting parkingOnChan. The moment we unlock any of the + // channel locks we risk gp getting readied by a channel operation + // and so gp could continue running before everything before the + // unlock is visible (even to gp itself). + + // This must not access gp's stack (see gopark). In + // particular, it must not access the *hselect. That's okay, + // because by the time this is called, gp.waiting has all + // channels in lock order. + var lastc *hchan + for sg := gp.waiting; sg != nil; sg = sg.waitlink { + if sg.c != lastc && lastc != nil { + // As soon as we unlock the channel, fields in + // any sudog with that channel may change, + // including c and waitlink. Since multiple + // sudogs may have the same channel, we unlock + // only after we've passed the last instance + // of a channel. + unlock(&lastc.lock) + } + lastc = sg.c + } + if lastc != nil { + unlock(&lastc.lock) + } + return true +} + +func block() { + gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever +} + +// selectgo implements the select statement. +// +// cas0 points to an array of type [ncases]scase, and order0 points to +// an array of type [2*ncases]uint16 where ncases must be <= 65536. +// Both reside on the goroutine's stack (regardless of any escaping in +// selectgo). +// +// For race detector builds, pc0 points to an array of type +// [ncases]uintptr (also on the stack); for other builds, it's set to +// nil. +// +// selectgo returns the index of the chosen scase, which matches the +// ordinal position of its respective select{recv,send,default} call. +// Also, if the chosen scase was a receive operation, it reports whether +// a value was received. +func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { + if debugSelect { + print("select: cas0=", cas0, "\n") + } + + // NOTE: In order to maintain a lean stack size, the number of scases + // is capped at 65536. + cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0)) + order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0)) + + ncases := nsends + nrecvs + scases := cas1[:ncases:ncases] + pollorder := order1[:ncases:ncases] + lockorder := order1[ncases:][:ncases:ncases] + // NOTE: pollorder/lockorder's underlying array was not zero-initialized by compiler. + + // Even when raceenabled is true, there might be select + // statements in packages compiled without -race (e.g., + // ensureSigM in runtime/signal_unix.go). + var pcs []uintptr + if raceenabled && pc0 != nil { + pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0)) + pcs = pc1[:ncases:ncases] + } + casePC := func(casi int) uintptr { + if pcs == nil { + return 0 + } + return pcs[casi] + } + + var t0 int64 + if blockprofilerate > 0 { + t0 = cputicks() + } + + // The compiler rewrites selects that statically have + // only 0 or 1 cases plus default into simpler constructs. + // The only way we can end up with such small sel.ncase + // values here is for a larger select in which most channels + // have been nilled out. The general code handles those + // cases correctly, and they are rare enough not to bother + // optimizing (and needing to test). + + // generate permuted order + norder := 0 + for i := range scases { + cas := &scases[i] + + // Omit cases without channels from the poll and lock orders. + if cas.c == nil { + cas.elem = nil // allow GC + continue + } + + j := cheaprandn(uint32(norder + 1)) + pollorder[norder] = pollorder[j] + pollorder[j] = uint16(i) + norder++ + } + pollorder = pollorder[:norder] + lockorder = lockorder[:norder] + + // sort the cases by Hchan address to get the locking order. + // simple heap sort, to guarantee n log n time and constant stack footprint. + for i := range lockorder { + j := i + // Start with the pollorder to permute cases on the same channel. + c := scases[pollorder[i]].c + for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() { + k := (j - 1) / 2 + lockorder[j] = lockorder[k] + j = k + } + lockorder[j] = pollorder[i] + } + for i := len(lockorder) - 1; i >= 0; i-- { + o := lockorder[i] + c := scases[o].c + lockorder[i] = lockorder[0] + j := 0 + for { + k := j*2 + 1 + if k >= i { + break + } + if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() { + k++ + } + if c.sortkey() < scases[lockorder[k]].c.sortkey() { + lockorder[j] = lockorder[k] + j = k + continue + } + break + } + lockorder[j] = o + } + + if debugSelect { + for i := 0; i+1 < len(lockorder); i++ { + if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() { + print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n") + throw("select: broken sort") + } + } + } + + // lock all the channels involved in the select + sellock(scases, lockorder) + + var ( + gp *g + sg *sudog + c *hchan + k *scase + sglist *sudog + sgnext *sudog + qp unsafe.Pointer + nextp **sudog + ) + + // pass 1 - look for something already waiting + var casi int + var cas *scase + var caseSuccess bool + var caseReleaseTime int64 = -1 + var recvOK bool + for _, casei := range pollorder { + casi = int(casei) + cas = &scases[casi] + c = cas.c + + if casi >= nsends { + sg = c.sendq.dequeue() + if sg != nil { + goto recv + } + if c.qcount > 0 { + goto bufrecv + } + if c.closed != 0 { + goto rclose + } + } else { + if raceenabled { + racereadpc(c.raceaddr(), casePC(casi), chansendpc) + } + if c.closed != 0 { + goto sclose + } + sg = c.recvq.dequeue() + if sg != nil { + goto send + } + if c.qcount < c.dataqsiz { + goto bufsend + } + } + } + + if !block { + selunlock(scases, lockorder) + casi = -1 + goto retc + } + + // pass 2 - enqueue on all chans + gp = getg() + if gp.waiting != nil { + throw("gp.waiting != nil") + } + nextp = &gp.waiting + for _, casei := range lockorder { + casi = int(casei) + cas = &scases[casi] + c = cas.c + sg := acquireSudog() + sg.g = gp + sg.isSelect = true + // No stack splits between assigning elem and enqueuing + // sg on gp.waiting where copystack can find it. + sg.elem = cas.elem + sg.releasetime = 0 + if t0 != 0 { + sg.releasetime = -1 + } + sg.c = c + // Construct waiting list in lock order. + *nextp = sg + nextp = &sg.waitlink + + if casi < nsends { + c.sendq.enqueue(sg) + } else { + c.recvq.enqueue(sg) + } + } + + // wait for someone to wake us up + gp.param = nil + // Signal to anyone trying to shrink our stack that we're about + // to park on a channel. The window between when this G's status + // changes and when we set gp.activeStackChans is not safe for + // stack shrinking. + gp.parkingOnChan.Store(true) + gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1) + gp.activeStackChans = false + + sellock(scases, lockorder) + + gp.selectDone.Store(0) + sg = (*sudog)(gp.param) + gp.param = nil + + // pass 3 - dequeue from unsuccessful chans + // otherwise they stack up on quiet channels + // record the successful case, if any. + // We singly-linked up the SudoGs in lock order. + casi = -1 + cas = nil + caseSuccess = false + sglist = gp.waiting + // Clear all elem before unlinking from gp.waiting. + for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink { + sg1.isSelect = false + sg1.elem = nil + sg1.c = nil + } + gp.waiting = nil + + for _, casei := range lockorder { + k = &scases[casei] + if sg == sglist { + // sg has already been dequeued by the G that woke us up. + casi = int(casei) + cas = k + caseSuccess = sglist.success + if sglist.releasetime > 0 { + caseReleaseTime = sglist.releasetime + } + } else { + c = k.c + if int(casei) < nsends { + c.sendq.dequeueSudoG(sglist) + } else { + c.recvq.dequeueSudoG(sglist) + } + } + sgnext = sglist.waitlink + sglist.waitlink = nil + releaseSudog(sglist) + sglist = sgnext + } + + if cas == nil { + throw("selectgo: bad wakeup") + } + + c = cas.c + + if debugSelect { + print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n") + } + + if casi < nsends { + if !caseSuccess { + goto sclose + } + } else { + recvOK = caseSuccess + } + + if raceenabled { + if casi < nsends { + raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) + } else if cas.elem != nil { + raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc) + } + } + if msanenabled { + if casi < nsends { + msanread(cas.elem, c.elemtype.Size_) + } else if cas.elem != nil { + msanwrite(cas.elem, c.elemtype.Size_) + } + } + if asanenabled { + if casi < nsends { + asanread(cas.elem, c.elemtype.Size_) + } else if cas.elem != nil { + asanwrite(cas.elem, c.elemtype.Size_) + } + } + + selunlock(scases, lockorder) + goto retc + +bufrecv: + // can receive from buffer + if raceenabled { + if cas.elem != nil { + raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc) + } + racenotify(c, c.recvx, nil) + } + if msanenabled && cas.elem != nil { + msanwrite(cas.elem, c.elemtype.Size_) + } + if asanenabled && cas.elem != nil { + asanwrite(cas.elem, c.elemtype.Size_) + } + recvOK = true + qp = chanbuf(c, c.recvx) + if cas.elem != nil { + typedmemmove(c.elemtype, cas.elem, qp) + } + typedmemclr(c.elemtype, qp) + c.recvx++ + if c.recvx == c.dataqsiz { + c.recvx = 0 + } + c.qcount-- + selunlock(scases, lockorder) + goto retc + +bufsend: + // can send to buffer + if raceenabled { + racenotify(c, c.sendx, nil) + raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) + } + if msanenabled { + msanread(cas.elem, c.elemtype.Size_) + } + if asanenabled { + asanread(cas.elem, c.elemtype.Size_) + } + typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem) + c.sendx++ + if c.sendx == c.dataqsiz { + c.sendx = 0 + } + c.qcount++ + selunlock(scases, lockorder) + goto retc + +recv: + // can receive from sleeping sender (sg) + recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) + if debugSelect { + print("syncrecv: cas0=", cas0, " c=", c, "\n") + } + recvOK = true + goto retc + +rclose: + // read at end of closed channel + selunlock(scases, lockorder) + recvOK = false + if cas.elem != nil { + typedmemclr(c.elemtype, cas.elem) + } + if raceenabled { + raceacquire(c.raceaddr()) + } + goto retc + +send: + // can send to a sleeping receiver (sg) + if raceenabled { + raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) + } + if msanenabled { + msanread(cas.elem, c.elemtype.Size_) + } + if asanenabled { + asanread(cas.elem, c.elemtype.Size_) + } + send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) + if debugSelect { + print("syncsend: cas0=", cas0, " c=", c, "\n") + } + goto retc + +retc: + if caseReleaseTime > 0 { + blockevent(caseReleaseTime-t0, 1) + } + return casi, recvOK + +sclose: + // send on closed channel + selunlock(scases, lockorder) + panic(plainError("send on closed channel")) +} + +func (c *hchan) sortkey() uintptr { + return uintptr(unsafe.Pointer(c)) +} + +// A runtimeSelect is a single case passed to rselect. +// This must match ../reflect/value.go:/runtimeSelect +type runtimeSelect struct { + dir selectDir + typ unsafe.Pointer // channel type (not used here) + ch *hchan // channel + val unsafe.Pointer // ptr to data (SendDir) or ptr to receive buffer (RecvDir) +} + +// These values must match ../reflect/value.go:/SelectDir. +type selectDir int + +const ( + _ selectDir = iota + selectSend // case Chan <- Send + selectRecv // case <-Chan: + selectDefault // default +) + +//go:linkname reflect_rselect reflect.rselect +func reflect_rselect(cases []runtimeSelect) (int, bool) { + if len(cases) == 0 { + block() + } + sel := make([]scase, len(cases)) + orig := make([]int, len(cases)) + nsends, nrecvs := 0, 0 + dflt := -1 + for i, rc := range cases { + var j int + switch rc.dir { + case selectDefault: + dflt = i + continue + case selectSend: + j = nsends + nsends++ + case selectRecv: + nrecvs++ + j = len(cases) - nrecvs + } + + sel[j] = scase{c: rc.ch, elem: rc.val} + orig[j] = i + } + + // Only a default case. + if nsends+nrecvs == 0 { + return dflt, false + } + + // Compact sel and orig if necessary. + if nsends+nrecvs < len(cases) { + copy(sel[nsends:], sel[len(cases)-nrecvs:]) + copy(orig[nsends:], orig[len(cases)-nrecvs:]) + } + + order := make([]uint16, 2*(nsends+nrecvs)) + var pc0 *uintptr + if raceenabled { + pcs := make([]uintptr, nsends+nrecvs) + for i := range pcs { + selectsetpc(&pcs[i]) + } + pc0 = &pcs[0] + } + + chosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1) + + // Translate chosen back to caller's ordering. + if chosen < 0 { + chosen = dflt + } else { + chosen = orig[chosen] + } + return chosen, recvOK +} + +func (q *waitq) dequeueSudoG(sgp *sudog) { + x := sgp.prev + y := sgp.next + if x != nil { + if y != nil { + // middle of queue + x.next = y + y.prev = x + sgp.next = nil + sgp.prev = nil + return + } + // end of queue + x.next = nil + q.last = x + sgp.prev = nil + return + } + if y != nil { + // start of queue + y.prev = nil + q.first = y + sgp.next = nil + return + } + + // x==y==nil. Either sgp is the only element in the queue, + // or it has already been removed. Use q.first to disambiguate. + if q.first == sgp { + q.first = nil + q.last = nil + } +} |