diff options
Diffstat (limited to 'src/io/pipe.go')
-rw-r--r-- | src/io/pipe.go | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/src/io/pipe.go b/src/io/pipe.go new file mode 100644 index 0000000..f34cf25 --- /dev/null +++ b/src/io/pipe.go @@ -0,0 +1,202 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Pipe adapter to connect code expecting an io.Reader +// with code expecting an io.Writer. + +package io + +import ( + "errors" + "sync" +) + +// onceError is an object that will only store an error once. +type onceError struct { + sync.Mutex // guards following + err error +} + +func (a *onceError) Store(err error) { + a.Lock() + defer a.Unlock() + if a.err != nil { + return + } + a.err = err +} +func (a *onceError) Load() error { + a.Lock() + defer a.Unlock() + return a.err +} + +// ErrClosedPipe is the error used for read or write operations on a closed pipe. +var ErrClosedPipe = errors.New("io: read/write on closed pipe") + +// A pipe is the shared pipe structure underlying PipeReader and PipeWriter. +type pipe struct { + wrMu sync.Mutex // Serializes Write operations + wrCh chan []byte + rdCh chan int + + once sync.Once // Protects closing done + done chan struct{} + rerr onceError + werr onceError +} + +func (p *pipe) read(b []byte) (n int, err error) { + select { + case <-p.done: + return 0, p.readCloseError() + default: + } + + select { + case bw := <-p.wrCh: + nr := copy(b, bw) + p.rdCh <- nr + return nr, nil + case <-p.done: + return 0, p.readCloseError() + } +} + +func (p *pipe) closeRead(err error) error { + if err == nil { + err = ErrClosedPipe + } + p.rerr.Store(err) + p.once.Do(func() { close(p.done) }) + return nil +} + +func (p *pipe) write(b []byte) (n int, err error) { + select { + case <-p.done: + return 0, p.writeCloseError() + default: + p.wrMu.Lock() + defer p.wrMu.Unlock() + } + + for once := true; once || len(b) > 0; once = false { + select { + case p.wrCh <- b: + nw := <-p.rdCh + b = b[nw:] + n += nw + case <-p.done: + return n, p.writeCloseError() + } + } + return n, nil +} + +func (p *pipe) closeWrite(err error) error { + if err == nil { + err = EOF + } + p.werr.Store(err) + p.once.Do(func() { close(p.done) }) + return nil +} + +// readCloseError is considered internal to the pipe type. +func (p *pipe) readCloseError() error { + rerr := p.rerr.Load() + if werr := p.werr.Load(); rerr == nil && werr != nil { + return werr + } + return ErrClosedPipe +} + +// writeCloseError is considered internal to the pipe type. +func (p *pipe) writeCloseError() error { + werr := p.werr.Load() + if rerr := p.rerr.Load(); werr == nil && rerr != nil { + return rerr + } + return ErrClosedPipe +} + +// A PipeReader is the read half of a pipe. +type PipeReader struct{ pipe } + +// Read implements the standard Read interface: +// it reads data from the pipe, blocking until a writer +// arrives or the write end is closed. +// If the write end is closed with an error, that error is +// returned as err; otherwise err is EOF. +func (r *PipeReader) Read(data []byte) (n int, err error) { + return r.pipe.read(data) +} + +// Close closes the reader; subsequent writes to the +// write half of the pipe will return the error [ErrClosedPipe]. +func (r *PipeReader) Close() error { + return r.CloseWithError(nil) +} + +// CloseWithError closes the reader; subsequent writes +// to the write half of the pipe will return the error err. +// +// CloseWithError never overwrites the previous error if it exists +// and always returns nil. +func (r *PipeReader) CloseWithError(err error) error { + return r.pipe.closeRead(err) +} + +// A PipeWriter is the write half of a pipe. +type PipeWriter struct{ r PipeReader } + +// Write implements the standard Write interface: +// it writes data to the pipe, blocking until one or more readers +// have consumed all the data or the read end is closed. +// If the read end is closed with an error, that err is +// returned as err; otherwise err is [ErrClosedPipe]. +func (w *PipeWriter) Write(data []byte) (n int, err error) { + return w.r.pipe.write(data) +} + +// Close closes the writer; subsequent reads from the +// read half of the pipe will return no bytes and EOF. +func (w *PipeWriter) Close() error { + return w.CloseWithError(nil) +} + +// CloseWithError closes the writer; subsequent reads from the +// read half of the pipe will return no bytes and the error err, +// or EOF if err is nil. +// +// CloseWithError never overwrites the previous error if it exists +// and always returns nil. +func (w *PipeWriter) CloseWithError(err error) error { + return w.r.pipe.closeWrite(err) +} + +// Pipe creates a synchronous in-memory pipe. +// It can be used to connect code expecting an [io.Reader] +// with code expecting an [io.Writer]. +// +// Reads and Writes on the pipe are matched one to one +// except when multiple Reads are needed to consume a single Write. +// That is, each Write to the [PipeWriter] blocks until it has satisfied +// one or more Reads from the [PipeReader] that fully consume +// the written data. +// The data is copied directly from the Write to the corresponding +// Read (or Reads); there is no internal buffering. +// +// It is safe to call Read and Write in parallel with each other or with Close. +// Parallel calls to Read and parallel calls to Write are also safe: +// the individual calls will be gated sequentially. +func Pipe() (*PipeReader, *PipeWriter) { + pw := &PipeWriter{r: PipeReader{pipe: pipe{ + wrCh: make(chan []byte), + rdCh: make(chan int), + done: make(chan struct{}), + }}} + return &pw.r, pw +} |