summaryrefslogtreecommitdiffstats
path: root/src/net/pipe.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/pipe.go')
-rw-r--r--src/net/pipe.go238
1 files changed, 238 insertions, 0 deletions
diff --git a/src/net/pipe.go b/src/net/pipe.go
new file mode 100644
index 0000000..f174193
--- /dev/null
+++ b/src/net/pipe.go
@@ -0,0 +1,238 @@
+// Copyright 2010 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import (
+ "io"
+ "os"
+ "sync"
+ "time"
+)
+
+// pipeDeadline is an abstraction for handling timeouts.
+type pipeDeadline struct {
+ mu sync.Mutex // Guards timer and cancel
+ timer *time.Timer
+ cancel chan struct{} // Must be non-nil
+}
+
+func makePipeDeadline() pipeDeadline {
+ return pipeDeadline{cancel: make(chan struct{})}
+}
+
+// set sets the point in time when the deadline will time out.
+// A timeout event is signaled by closing the channel returned by waiter.
+// Once a timeout has occurred, the deadline can be refreshed by specifying a
+// t value in the future.
+//
+// A zero value for t prevents timeout.
+func (d *pipeDeadline) set(t time.Time) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
+ if d.timer != nil && !d.timer.Stop() {
+ <-d.cancel // Wait for the timer callback to finish and close cancel
+ }
+ d.timer = nil
+
+ // Time is zero, then there is no deadline.
+ closed := isClosedChan(d.cancel)
+ if t.IsZero() {
+ if closed {
+ d.cancel = make(chan struct{})
+ }
+ return
+ }
+
+ // Time in the future, setup a timer to cancel in the future.
+ if dur := time.Until(t); dur > 0 {
+ if closed {
+ d.cancel = make(chan struct{})
+ }
+ d.timer = time.AfterFunc(dur, func() {
+ close(d.cancel)
+ })
+ return
+ }
+
+ // Time in the past, so close immediately.
+ if !closed {
+ close(d.cancel)
+ }
+}
+
+// wait returns a channel that is closed when the deadline is exceeded.
+func (d *pipeDeadline) wait() chan struct{} {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ return d.cancel
+}
+
+func isClosedChan(c <-chan struct{}) bool {
+ select {
+ case <-c:
+ return true
+ default:
+ return false
+ }
+}
+
+type pipeAddr struct{}
+
+func (pipeAddr) Network() string { return "pipe" }
+func (pipeAddr) String() string { return "pipe" }
+
+type pipe struct {
+ wrMu sync.Mutex // Serialize Write operations
+
+ // Used by local Read to interact with remote Write.
+ // Successful receive on rdRx is always followed by send on rdTx.
+ rdRx <-chan []byte
+ rdTx chan<- int
+
+ // Used by local Write to interact with remote Read.
+ // Successful send on wrTx is always followed by receive on wrRx.
+ wrTx chan<- []byte
+ wrRx <-chan int
+
+ once sync.Once // Protects closing localDone
+ localDone chan struct{}
+ remoteDone <-chan struct{}
+
+ readDeadline pipeDeadline
+ writeDeadline pipeDeadline
+}
+
+// Pipe creates a synchronous, in-memory, full duplex
+// network connection; both ends implement the Conn interface.
+// Reads on one end are matched with writes on the other,
+// copying data directly between the two; there is no internal
+// buffering.
+func Pipe() (Conn, Conn) {
+ cb1 := make(chan []byte)
+ cb2 := make(chan []byte)
+ cn1 := make(chan int)
+ cn2 := make(chan int)
+ done1 := make(chan struct{})
+ done2 := make(chan struct{})
+
+ p1 := &pipe{
+ rdRx: cb1, rdTx: cn1,
+ wrTx: cb2, wrRx: cn2,
+ localDone: done1, remoteDone: done2,
+ readDeadline: makePipeDeadline(),
+ writeDeadline: makePipeDeadline(),
+ }
+ p2 := &pipe{
+ rdRx: cb2, rdTx: cn2,
+ wrTx: cb1, wrRx: cn1,
+ localDone: done2, remoteDone: done1,
+ readDeadline: makePipeDeadline(),
+ writeDeadline: makePipeDeadline(),
+ }
+ return p1, p2
+}
+
+func (*pipe) LocalAddr() Addr { return pipeAddr{} }
+func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
+
+func (p *pipe) Read(b []byte) (int, error) {
+ n, err := p.read(b)
+ if err != nil && err != io.EOF && err != io.ErrClosedPipe {
+ err = &OpError{Op: "read", Net: "pipe", Err: err}
+ }
+ return n, err
+}
+
+func (p *pipe) read(b []byte) (n int, err error) {
+ switch {
+ case isClosedChan(p.localDone):
+ return 0, io.ErrClosedPipe
+ case isClosedChan(p.remoteDone):
+ return 0, io.EOF
+ case isClosedChan(p.readDeadline.wait()):
+ return 0, os.ErrDeadlineExceeded
+ }
+
+ select {
+ case bw := <-p.rdRx:
+ nr := copy(b, bw)
+ p.rdTx <- nr
+ return nr, nil
+ case <-p.localDone:
+ return 0, io.ErrClosedPipe
+ case <-p.remoteDone:
+ return 0, io.EOF
+ case <-p.readDeadline.wait():
+ return 0, os.ErrDeadlineExceeded
+ }
+}
+
+func (p *pipe) Write(b []byte) (int, error) {
+ n, err := p.write(b)
+ if err != nil && err != io.ErrClosedPipe {
+ err = &OpError{Op: "write", Net: "pipe", Err: err}
+ }
+ return n, err
+}
+
+func (p *pipe) write(b []byte) (n int, err error) {
+ switch {
+ case isClosedChan(p.localDone):
+ return 0, io.ErrClosedPipe
+ case isClosedChan(p.remoteDone):
+ return 0, io.ErrClosedPipe
+ case isClosedChan(p.writeDeadline.wait()):
+ return 0, os.ErrDeadlineExceeded
+ }
+
+ p.wrMu.Lock() // Ensure entirety of b is written together
+ defer p.wrMu.Unlock()
+ for once := true; once || len(b) > 0; once = false {
+ select {
+ case p.wrTx <- b:
+ nw := <-p.wrRx
+ b = b[nw:]
+ n += nw
+ case <-p.localDone:
+ return n, io.ErrClosedPipe
+ case <-p.remoteDone:
+ return n, io.ErrClosedPipe
+ case <-p.writeDeadline.wait():
+ return n, os.ErrDeadlineExceeded
+ }
+ }
+ return n, nil
+}
+
+func (p *pipe) SetDeadline(t time.Time) error {
+ if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
+ return io.ErrClosedPipe
+ }
+ p.readDeadline.set(t)
+ p.writeDeadline.set(t)
+ return nil
+}
+
+func (p *pipe) SetReadDeadline(t time.Time) error {
+ if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
+ return io.ErrClosedPipe
+ }
+ p.readDeadline.set(t)
+ return nil
+}
+
+func (p *pipe) SetWriteDeadline(t time.Time) error {
+ if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
+ return io.ErrClosedPipe
+ }
+ p.writeDeadline.set(t)
+ return nil
+}
+
+func (p *pipe) Close() error {
+ p.once.Do(func() { close(p.localDone) })
+ return nil
+}