diff options
Diffstat (limited to 'src/internal/poll/fd_mutex.go')
-rw-r--r-- | src/internal/poll/fd_mutex.go | 252 |
1 files changed, 252 insertions, 0 deletions
diff --git a/src/internal/poll/fd_mutex.go b/src/internal/poll/fd_mutex.go new file mode 100644 index 0000000..0a8ee6f --- /dev/null +++ b/src/internal/poll/fd_mutex.go @@ -0,0 +1,252 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +import "sync/atomic" + +// fdMutex is a specialized synchronization primitive that manages +// lifetime of an fd and serializes access to Read, Write and Close +// methods on FD. +type fdMutex struct { + state uint64 + rsema uint32 + wsema uint32 +} + +// fdMutex.state is organized as follows: +// 1 bit - whether FD is closed, if set all subsequent lock operations will fail. +// 1 bit - lock for read operations. +// 1 bit - lock for write operations. +// 20 bits - total number of references (read+write+misc). +// 20 bits - number of outstanding read waiters. +// 20 bits - number of outstanding write waiters. +const ( + mutexClosed = 1 << 0 + mutexRLock = 1 << 1 + mutexWLock = 1 << 2 + mutexRef = 1 << 3 + mutexRefMask = (1<<20 - 1) << 3 + mutexRWait = 1 << 23 + mutexRMask = (1<<20 - 1) << 23 + mutexWWait = 1 << 43 + mutexWMask = (1<<20 - 1) << 43 +) + +const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)" + +// Read operations must do rwlock(true)/rwunlock(true). +// +// Write operations must do rwlock(false)/rwunlock(false). +// +// Misc operations must do incref/decref. +// Misc operations include functions like setsockopt and setDeadline. +// They need to use incref/decref to ensure that they operate on the +// correct fd in presence of a concurrent close call (otherwise fd can +// be closed under their feet). +// +// Close operations must do increfAndClose/decref. + +// incref adds a reference to mu. +// It reports whether mu is available for reading or writing. +func (mu *fdMutex) incref() bool { + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexClosed != 0 { + return false + } + new := old + mutexRef + if new&mutexRefMask == 0 { + panic(overflowMsg) + } + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + return true + } + } +} + +// increfAndClose sets the state of mu to closed. +// It returns false if the file was already closed. +func (mu *fdMutex) increfAndClose() bool { + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexClosed != 0 { + return false + } + // Mark as closed and acquire a reference. + new := (old | mutexClosed) + mutexRef + if new&mutexRefMask == 0 { + panic(overflowMsg) + } + // Remove all read and write waiters. + new &^= mutexRMask | mutexWMask + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + // Wake all read and write waiters, + // they will observe closed flag after wakeup. + for old&mutexRMask != 0 { + old -= mutexRWait + runtime_Semrelease(&mu.rsema) + } + for old&mutexWMask != 0 { + old -= mutexWWait + runtime_Semrelease(&mu.wsema) + } + return true + } + } +} + +// decref removes a reference from mu. +// It reports whether there is no remaining reference. +func (mu *fdMutex) decref() bool { + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexRefMask == 0 { + panic("inconsistent poll.fdMutex") + } + new := old - mutexRef + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + return new&(mutexClosed|mutexRefMask) == mutexClosed + } + } +} + +// lock adds a reference to mu and locks mu. +// It reports whether mu is available for reading or writing. +func (mu *fdMutex) rwlock(read bool) bool { + var mutexBit, mutexWait, mutexMask uint64 + var mutexSema *uint32 + if read { + mutexBit = mutexRLock + mutexWait = mutexRWait + mutexMask = mutexRMask + mutexSema = &mu.rsema + } else { + mutexBit = mutexWLock + mutexWait = mutexWWait + mutexMask = mutexWMask + mutexSema = &mu.wsema + } + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexClosed != 0 { + return false + } + var new uint64 + if old&mutexBit == 0 { + // Lock is free, acquire it. + new = (old | mutexBit) + mutexRef + if new&mutexRefMask == 0 { + panic(overflowMsg) + } + } else { + // Wait for lock. + new = old + mutexWait + if new&mutexMask == 0 { + panic(overflowMsg) + } + } + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + if old&mutexBit == 0 { + return true + } + runtime_Semacquire(mutexSema) + // The signaller has subtracted mutexWait. + } + } +} + +// unlock removes a reference from mu and unlocks mu. +// It reports whether there is no remaining reference. +func (mu *fdMutex) rwunlock(read bool) bool { + var mutexBit, mutexWait, mutexMask uint64 + var mutexSema *uint32 + if read { + mutexBit = mutexRLock + mutexWait = mutexRWait + mutexMask = mutexRMask + mutexSema = &mu.rsema + } else { + mutexBit = mutexWLock + mutexWait = mutexWWait + mutexMask = mutexWMask + mutexSema = &mu.wsema + } + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexBit == 0 || old&mutexRefMask == 0 { + panic("inconsistent poll.fdMutex") + } + // Drop lock, drop reference and wake read waiter if present. + new := (old &^ mutexBit) - mutexRef + if old&mutexMask != 0 { + new -= mutexWait + } + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + if old&mutexMask != 0 { + runtime_Semrelease(mutexSema) + } + return new&(mutexClosed|mutexRefMask) == mutexClosed + } + } +} + +// Implemented in runtime package. +func runtime_Semacquire(sema *uint32) +func runtime_Semrelease(sema *uint32) + +// incref adds a reference to fd. +// It returns an error when fd cannot be used. +func (fd *FD) incref() error { + if !fd.fdmu.incref() { + return errClosing(fd.isFile) + } + return nil +} + +// decref removes a reference from fd. +// It also closes fd when the state of fd is set to closed and there +// is no remaining reference. +func (fd *FD) decref() error { + if fd.fdmu.decref() { + return fd.destroy() + } + return nil +} + +// readLock adds a reference to fd and locks fd for reading. +// It returns an error when fd cannot be used for reading. +func (fd *FD) readLock() error { + if !fd.fdmu.rwlock(true) { + return errClosing(fd.isFile) + } + return nil +} + +// readUnlock removes a reference from fd and unlocks fd for reading. +// It also closes fd when the state of fd is set to closed and there +// is no remaining reference. +func (fd *FD) readUnlock() { + if fd.fdmu.rwunlock(true) { + fd.destroy() + } +} + +// writeLock adds a reference to fd and locks fd for writing. +// It returns an error when fd cannot be used for writing. +func (fd *FD) writeLock() error { + if !fd.fdmu.rwlock(false) { + return errClosing(fd.isFile) + } + return nil +} + +// writeUnlock removes a reference from fd and unlocks fd for writing. +// It also closes fd when the state of fd is set to closed and there +// is no remaining reference. +func (fd *FD) writeUnlock() { + if fd.fdmu.rwunlock(false) { + fd.destroy() + } +} |