summaryrefslogtreecommitdiffstats
path: root/src/internal/poll/fd_mutex.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/internal/poll/fd_mutex.go')
-rw-r--r--src/internal/poll/fd_mutex.go252
1 files changed, 252 insertions, 0 deletions
diff --git a/src/internal/poll/fd_mutex.go b/src/internal/poll/fd_mutex.go
new file mode 100644
index 0000000..0a8ee6f
--- /dev/null
+++ b/src/internal/poll/fd_mutex.go
@@ -0,0 +1,252 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package poll
+
+import "sync/atomic"
+
+// fdMutex is a specialized synchronization primitive that manages
+// lifetime of an fd and serializes access to Read, Write and Close
+// methods on FD.
+type fdMutex struct {
+ state uint64
+ rsema uint32
+ wsema uint32
+}
+
+// fdMutex.state is organized as follows:
+// 1 bit - whether FD is closed, if set all subsequent lock operations will fail.
+// 1 bit - lock for read operations.
+// 1 bit - lock for write operations.
+// 20 bits - total number of references (read+write+misc).
+// 20 bits - number of outstanding read waiters.
+// 20 bits - number of outstanding write waiters.
+const (
+ mutexClosed = 1 << 0
+ mutexRLock = 1 << 1
+ mutexWLock = 1 << 2
+ mutexRef = 1 << 3
+ mutexRefMask = (1<<20 - 1) << 3
+ mutexRWait = 1 << 23
+ mutexRMask = (1<<20 - 1) << 23
+ mutexWWait = 1 << 43
+ mutexWMask = (1<<20 - 1) << 43
+)
+
+const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)"
+
+// Read operations must do rwlock(true)/rwunlock(true).
+//
+// Write operations must do rwlock(false)/rwunlock(false).
+//
+// Misc operations must do incref/decref.
+// Misc operations include functions like setsockopt and setDeadline.
+// They need to use incref/decref to ensure that they operate on the
+// correct fd in presence of a concurrent close call (otherwise fd can
+// be closed under their feet).
+//
+// Close operations must do increfAndClose/decref.
+
+// incref adds a reference to mu.
+// It reports whether mu is available for reading or writing.
+func (mu *fdMutex) incref() bool {
+ for {
+ old := atomic.LoadUint64(&mu.state)
+ if old&mutexClosed != 0 {
+ return false
+ }
+ new := old + mutexRef
+ if new&mutexRefMask == 0 {
+ panic(overflowMsg)
+ }
+ if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+ return true
+ }
+ }
+}
+
+// increfAndClose sets the state of mu to closed.
+// It returns false if the file was already closed.
+func (mu *fdMutex) increfAndClose() bool {
+ for {
+ old := atomic.LoadUint64(&mu.state)
+ if old&mutexClosed != 0 {
+ return false
+ }
+ // Mark as closed and acquire a reference.
+ new := (old | mutexClosed) + mutexRef
+ if new&mutexRefMask == 0 {
+ panic(overflowMsg)
+ }
+ // Remove all read and write waiters.
+ new &^= mutexRMask | mutexWMask
+ if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+ // Wake all read and write waiters,
+ // they will observe closed flag after wakeup.
+ for old&mutexRMask != 0 {
+ old -= mutexRWait
+ runtime_Semrelease(&mu.rsema)
+ }
+ for old&mutexWMask != 0 {
+ old -= mutexWWait
+ runtime_Semrelease(&mu.wsema)
+ }
+ return true
+ }
+ }
+}
+
+// decref removes a reference from mu.
+// It reports whether there is no remaining reference.
+func (mu *fdMutex) decref() bool {
+ for {
+ old := atomic.LoadUint64(&mu.state)
+ if old&mutexRefMask == 0 {
+ panic("inconsistent poll.fdMutex")
+ }
+ new := old - mutexRef
+ if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+ return new&(mutexClosed|mutexRefMask) == mutexClosed
+ }
+ }
+}
+
+// lock adds a reference to mu and locks mu.
+// It reports whether mu is available for reading or writing.
+func (mu *fdMutex) rwlock(read bool) bool {
+ var mutexBit, mutexWait, mutexMask uint64
+ var mutexSema *uint32
+ if read {
+ mutexBit = mutexRLock
+ mutexWait = mutexRWait
+ mutexMask = mutexRMask
+ mutexSema = &mu.rsema
+ } else {
+ mutexBit = mutexWLock
+ mutexWait = mutexWWait
+ mutexMask = mutexWMask
+ mutexSema = &mu.wsema
+ }
+ for {
+ old := atomic.LoadUint64(&mu.state)
+ if old&mutexClosed != 0 {
+ return false
+ }
+ var new uint64
+ if old&mutexBit == 0 {
+ // Lock is free, acquire it.
+ new = (old | mutexBit) + mutexRef
+ if new&mutexRefMask == 0 {
+ panic(overflowMsg)
+ }
+ } else {
+ // Wait for lock.
+ new = old + mutexWait
+ if new&mutexMask == 0 {
+ panic(overflowMsg)
+ }
+ }
+ if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+ if old&mutexBit == 0 {
+ return true
+ }
+ runtime_Semacquire(mutexSema)
+ // The signaller has subtracted mutexWait.
+ }
+ }
+}
+
+// unlock removes a reference from mu and unlocks mu.
+// It reports whether there is no remaining reference.
+func (mu *fdMutex) rwunlock(read bool) bool {
+ var mutexBit, mutexWait, mutexMask uint64
+ var mutexSema *uint32
+ if read {
+ mutexBit = mutexRLock
+ mutexWait = mutexRWait
+ mutexMask = mutexRMask
+ mutexSema = &mu.rsema
+ } else {
+ mutexBit = mutexWLock
+ mutexWait = mutexWWait
+ mutexMask = mutexWMask
+ mutexSema = &mu.wsema
+ }
+ for {
+ old := atomic.LoadUint64(&mu.state)
+ if old&mutexBit == 0 || old&mutexRefMask == 0 {
+ panic("inconsistent poll.fdMutex")
+ }
+ // Drop lock, drop reference and wake read waiter if present.
+ new := (old &^ mutexBit) - mutexRef
+ if old&mutexMask != 0 {
+ new -= mutexWait
+ }
+ if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+ if old&mutexMask != 0 {
+ runtime_Semrelease(mutexSema)
+ }
+ return new&(mutexClosed|mutexRefMask) == mutexClosed
+ }
+ }
+}
+
+// Implemented in runtime package.
+func runtime_Semacquire(sema *uint32)
+func runtime_Semrelease(sema *uint32)
+
+// incref adds a reference to fd.
+// It returns an error when fd cannot be used.
+func (fd *FD) incref() error {
+ if !fd.fdmu.incref() {
+ return errClosing(fd.isFile)
+ }
+ return nil
+}
+
+// decref removes a reference from fd.
+// It also closes fd when the state of fd is set to closed and there
+// is no remaining reference.
+func (fd *FD) decref() error {
+ if fd.fdmu.decref() {
+ return fd.destroy()
+ }
+ return nil
+}
+
+// readLock adds a reference to fd and locks fd for reading.
+// It returns an error when fd cannot be used for reading.
+func (fd *FD) readLock() error {
+ if !fd.fdmu.rwlock(true) {
+ return errClosing(fd.isFile)
+ }
+ return nil
+}
+
+// readUnlock removes a reference from fd and unlocks fd for reading.
+// It also closes fd when the state of fd is set to closed and there
+// is no remaining reference.
+func (fd *FD) readUnlock() {
+ if fd.fdmu.rwunlock(true) {
+ fd.destroy()
+ }
+}
+
+// writeLock adds a reference to fd and locks fd for writing.
+// It returns an error when fd cannot be used for writing.
+func (fd *FD) writeLock() error {
+ if !fd.fdmu.rwlock(false) {
+ return errClosing(fd.isFile)
+ }
+ return nil
+}
+
+// writeUnlock removes a reference from fd and unlocks fd for writing.
+// It also closes fd when the state of fd is set to closed and there
+// is no remaining reference.
+func (fd *FD) writeUnlock() {
+ if fd.fdmu.rwunlock(false) {
+ fd.destroy()
+ }
+}