summaryrefslogtreecommitdiffstats
path: root/src/io/pipe.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/io/pipe.go')
-rw-r--r--src/io/pipe.go202
1 files changed, 202 insertions, 0 deletions
diff --git a/src/io/pipe.go b/src/io/pipe.go
new file mode 100644
index 0000000..f34cf25
--- /dev/null
+++ b/src/io/pipe.go
@@ -0,0 +1,202 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Pipe adapter to connect code expecting an io.Reader
+// with code expecting an io.Writer.
+
+package io
+
+import (
+ "errors"
+ "sync"
+)
+
+// onceError is an object that will only store an error once.
+type onceError struct {
+ sync.Mutex // guards following
+ err error
+}
+
+func (a *onceError) Store(err error) {
+ a.Lock()
+ defer a.Unlock()
+ if a.err != nil {
+ return
+ }
+ a.err = err
+}
+func (a *onceError) Load() error {
+ a.Lock()
+ defer a.Unlock()
+ return a.err
+}
+
+// ErrClosedPipe is the error used for read or write operations on a closed pipe.
+var ErrClosedPipe = errors.New("io: read/write on closed pipe")
+
+// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
+type pipe struct {
+ wrMu sync.Mutex // Serializes Write operations
+ wrCh chan []byte
+ rdCh chan int
+
+ once sync.Once // Protects closing done
+ done chan struct{}
+ rerr onceError
+ werr onceError
+}
+
+func (p *pipe) read(b []byte) (n int, err error) {
+ select {
+ case <-p.done:
+ return 0, p.readCloseError()
+ default:
+ }
+
+ select {
+ case bw := <-p.wrCh:
+ nr := copy(b, bw)
+ p.rdCh <- nr
+ return nr, nil
+ case <-p.done:
+ return 0, p.readCloseError()
+ }
+}
+
+func (p *pipe) closeRead(err error) error {
+ if err == nil {
+ err = ErrClosedPipe
+ }
+ p.rerr.Store(err)
+ p.once.Do(func() { close(p.done) })
+ return nil
+}
+
+func (p *pipe) write(b []byte) (n int, err error) {
+ select {
+ case <-p.done:
+ return 0, p.writeCloseError()
+ default:
+ p.wrMu.Lock()
+ defer p.wrMu.Unlock()
+ }
+
+ for once := true; once || len(b) > 0; once = false {
+ select {
+ case p.wrCh <- b:
+ nw := <-p.rdCh
+ b = b[nw:]
+ n += nw
+ case <-p.done:
+ return n, p.writeCloseError()
+ }
+ }
+ return n, nil
+}
+
+func (p *pipe) closeWrite(err error) error {
+ if err == nil {
+ err = EOF
+ }
+ p.werr.Store(err)
+ p.once.Do(func() { close(p.done) })
+ return nil
+}
+
+// readCloseError is considered internal to the pipe type.
+func (p *pipe) readCloseError() error {
+ rerr := p.rerr.Load()
+ if werr := p.werr.Load(); rerr == nil && werr != nil {
+ return werr
+ }
+ return ErrClosedPipe
+}
+
+// writeCloseError is considered internal to the pipe type.
+func (p *pipe) writeCloseError() error {
+ werr := p.werr.Load()
+ if rerr := p.rerr.Load(); werr == nil && rerr != nil {
+ return rerr
+ }
+ return ErrClosedPipe
+}
+
+// A PipeReader is the read half of a pipe.
+type PipeReader struct{ pipe }
+
+// Read implements the standard Read interface:
+// it reads data from the pipe, blocking until a writer
+// arrives or the write end is closed.
+// If the write end is closed with an error, that error is
+// returned as err; otherwise err is EOF.
+func (r *PipeReader) Read(data []byte) (n int, err error) {
+ return r.pipe.read(data)
+}
+
+// Close closes the reader; subsequent writes to the
+// write half of the pipe will return the error [ErrClosedPipe].
+func (r *PipeReader) Close() error {
+ return r.CloseWithError(nil)
+}
+
+// CloseWithError closes the reader; subsequent writes
+// to the write half of the pipe will return the error err.
+//
+// CloseWithError never overwrites the previous error if it exists
+// and always returns nil.
+func (r *PipeReader) CloseWithError(err error) error {
+ return r.pipe.closeRead(err)
+}
+
+// A PipeWriter is the write half of a pipe.
+type PipeWriter struct{ r PipeReader }
+
+// Write implements the standard Write interface:
+// it writes data to the pipe, blocking until one or more readers
+// have consumed all the data or the read end is closed.
+// If the read end is closed with an error, that err is
+// returned as err; otherwise err is [ErrClosedPipe].
+func (w *PipeWriter) Write(data []byte) (n int, err error) {
+ return w.r.pipe.write(data)
+}
+
+// Close closes the writer; subsequent reads from the
+// read half of the pipe will return no bytes and EOF.
+func (w *PipeWriter) Close() error {
+ return w.CloseWithError(nil)
+}
+
+// CloseWithError closes the writer; subsequent reads from the
+// read half of the pipe will return no bytes and the error err,
+// or EOF if err is nil.
+//
+// CloseWithError never overwrites the previous error if it exists
+// and always returns nil.
+func (w *PipeWriter) CloseWithError(err error) error {
+ return w.r.pipe.closeWrite(err)
+}
+
+// Pipe creates a synchronous in-memory pipe.
+// It can be used to connect code expecting an [io.Reader]
+// with code expecting an [io.Writer].
+//
+// Reads and Writes on the pipe are matched one to one
+// except when multiple Reads are needed to consume a single Write.
+// That is, each Write to the [PipeWriter] blocks until it has satisfied
+// one or more Reads from the [PipeReader] that fully consume
+// the written data.
+// The data is copied directly from the Write to the corresponding
+// Read (or Reads); there is no internal buffering.
+//
+// It is safe to call Read and Write in parallel with each other or with Close.
+// Parallel calls to Read and parallel calls to Write are also safe:
+// the individual calls will be gated sequentially.
+func Pipe() (*PipeReader, *PipeWriter) {
+ pw := &PipeWriter{r: PipeReader{pipe: pipe{
+ wrCh: make(chan []byte),
+ rdCh: make(chan int),
+ done: make(chan struct{}),
+ }}}
+ return &pw.r, pw
+}