Diffstat (limited to 'pkg/v1/stream/layer.go')
-rw-r--r--  pkg/v1/stream/layer.go  273
1 file changed, 273 insertions(+), 0 deletions(-)
diff --git a/pkg/v1/stream/layer.go b/pkg/v1/stream/layer.go
new file mode 100644
index 0000000..d6f2df8
--- /dev/null
+++ b/pkg/v1/stream/layer.go
@@ -0,0 +1,273 @@
+// Copyright 2018 Google LLC All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package stream implements a single-pass streaming v1.Layer.
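+//
+// A stream.Layer can be consumed only once: Digest, DiffID and Size
+// return ErrNotComputed until the stream returned by Compressed has
+// been read to EOF. A rough sketch of the intended call pattern, where
+// rc is any io.ReadCloser over an uncompressed tarball:
+//
+//	layer := stream.NewLayer(rc)
+//	cr, _ := layer.Compressed()
+//	// ... read cr to EOF, e.g. while uploading it to a registry ...
+//	digest, _ := layer.Digest() // now computed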
+package stream
+
+import (
+ "bufio"
+ "compress/gzip"
+ "crypto"
+ "encoding/hex"
+ "errors"
+ "hash"
+ "io"
+ "os"
+ "sync"
+
+ v1 "github.com/google/go-containerregistry/pkg/v1"
+ "github.com/google/go-containerregistry/pkg/v1/types"
+)
+
+var (
+ // ErrNotComputed is returned when a requested value has not been
+ // computed yet because the stream has not been consumed.
+ ErrNotComputed = errors.New("value not computed until stream is consumed")
+
+ // ErrConsumed is returned by Compressed when the underlying stream has
+ // already been consumed and closed.
+ ErrConsumed = errors.New("stream was already consumed")
+)
+
+// Layer is a streaming implementation of v1.Layer.
+type Layer struct {
+ blob io.ReadCloser // the underlying stream; it can be consumed only once
+ consumed bool
+ compression int // gzip compression level
+
+ mu sync.Mutex // guards digest, diffID and size
+ digest, diffID *v1.Hash
+ size int64
+ mediaType types.MediaType
+}
+
+var _ v1.Layer = (*Layer)(nil)
+
+// LayerOption is a functional option for configuring a Layer.
+type LayerOption func(*Layer)
+
+// WithCompressionLevel sets the gzip compression level. See `gzip.NewWriterLevel` for possible values.
+func WithCompressionLevel(level int) LayerOption {
+ return func(l *Layer) {
+ l.compression = level
+ }
+}
+
+// WithMediaType is a functional option for overriding the layer's media type.
+func WithMediaType(mt types.MediaType) LayerOption {
+ return func(l *Layer) {
+ l.mediaType = mt
+ }
+}
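+
+// Options compose; a small illustrative sketch (rc is any io.ReadCloser):
+//
+//	layer := stream.NewLayer(rc,
+//		stream.WithCompressionLevel(gzip.BestCompression),
+//		stream.WithMediaType(types.OCILayer))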
+
+// NewLayer creates a Layer from an io.ReadCloser.
+func NewLayer(rc io.ReadCloser, opts ...LayerOption) *Layer {
+ layer := &Layer{
+ blob: rc,
+ compression: gzip.BestSpeed,
+ // We default to DockerLayer for now, since serving layers
+ // uncompressed (see Uncompressed) is unimplemented.
+ mediaType: types.DockerLayer,
+ }
+
+ for _, opt := range opts {
+ opt(layer)
+ }
+
+ return layer
+}
+
+// Digest implements v1.Layer.
+func (l *Layer) Digest() (v1.Hash, error) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ if l.digest == nil {
+ return v1.Hash{}, ErrNotComputed
+ }
+ return *l.digest, nil
+}
+
+// DiffID implements v1.Layer.
+func (l *Layer) DiffID() (v1.Hash, error) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ if l.diffID == nil {
+ return v1.Hash{}, ErrNotComputed
+ }
+ return *l.diffID, nil
+}
+
+// Size implements v1.Layer.
+func (l *Layer) Size() (int64, error) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ // A gzipped stream is never zero bytes long, so a zero size means
+ // the stream has not been consumed yet.
+ if l.size == 0 {
+ return 0, ErrNotComputed
+ }
+ return l.size, nil
+}
+
+// MediaType implements v1.Layer.
+func (l *Layer) MediaType() (types.MediaType, error) {
+ return l.mediaType, nil
+}
+
+// Uncompressed implements v1.Layer.
+func (l *Layer) Uncompressed() (io.ReadCloser, error) {
+ return nil, errors.New("NYI: stream.Layer.Uncompressed is not implemented")
+}
+
+// Compressed implements v1.Layer.
+func (l *Layer) Compressed() (io.ReadCloser, error) {
+ if l.consumed {
+ return nil, ErrConsumed
+ }
+ return newCompressedReader(l)
+}
+
+// finalize marks the layer as consumed and records its digest, diffID
+// and size values.
+func (l *Layer) finalize(uncompressed, compressed hash.Hash, size int64) error {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
+ diffID, err := v1.NewHash("sha256:" + hex.EncodeToString(uncompressed.Sum(nil)))
+ if err != nil {
+ return err
+ }
+ l.diffID = &diffID
+
+ digest, err := v1.NewHash("sha256:" + hex.EncodeToString(compressed.Sum(nil)))
+ if err != nil {
+ return err
+ }
+ l.digest = &digest
+
+ l.size = size
+ l.consumed = true
+ return nil
+}
+
+type compressedReader struct {
+ pr io.Reader
+ closer func() error
+}
+
+func newCompressedReader(l *Layer) (*compressedReader, error) {
+ // Collect the digests of the compressed and uncompressed streams,
+ // and the size of the compressed stream.
+ h := crypto.SHA256.New()
+ zh := crypto.SHA256.New()
+ count := &countWriter{}
+
+ // The gzip.Writer writes to the output stream via a pipe, to a
+ // hasher that captures the compressed digest, and to a countWriter
+ // that captures the compressed size.
+ pr, pw := io.Pipe()
+
+ // Write compressed bytes to be read by the pipe.Reader, hashed by zh, and counted by count.
+ mw := io.MultiWriter(pw, zh, count)
+
+ // Buffer the output of the gzip writer so we don't have to wait on pr to keep writing.
+ // 128K (2<<16) ought to be small enough for anybody.
+ bw := bufio.NewWriterSize(mw, 2<<16)
+ zw, err := gzip.NewWriterLevel(bw, l.compression)
+ if err != nil {
+ return nil, err
+ }
+
+ doneDigesting := make(chan struct{})
+
+ cr := &compressedReader{
+ pr: pr,
+ closer: func() error {
+ // Immediately close pw without error. There are three ways to get
+ // here.
+ //
+ // 1. There was a copy error due to the underlying reader, in which
+ // case the error will not be overwritten.
+ // 2. Copying from the underlying reader completed successfully.
+ // 3. Close has been called before the underlying reader has been
+ // fully consumed. In this case pw must be closed in order to
+ // keep the flush of bw from blocking indefinitely.
+ //
+ // NOTE: pw.Close never returns an error. The signature is only to
+ // implement io.Closer.
+ _ = pw.Close()
+
+ // Close the inner ReadCloser.
+ //
+ // NOTE: net/http will call close on success, so if we've already
+ // closed the inner rc, it's not an error.
+ if err := l.blob.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
+ return err
+ }
+
+ // Finalize layer with its digest and size values.
+ <-doneDigesting
+ return l.finalize(h, zh, count.n)
+ },
+ }
+ go func() {
+ // Copy the blob into the gzip writer, hashing the raw contents with
+ // h along the way; the compressed output is hashed and counted via mw.
+ _, copyErr := io.Copy(io.MultiWriter(h, zw), l.blob)
+
+ // Close the gzip writer once copying is done. If this is done in the
+ // Close method of compressedReader instead, then it can cause a panic
+ // when the compressedReader is closed before the blob is fully
+ // consumed and io.Copy in this goroutine is still blocking.
+ closeErr := zw.Close()
+
+ // Check errors from writing and closing streams.
+ if copyErr != nil {
+ close(doneDigesting)
+ pw.CloseWithError(copyErr)
+ return
+ }
+ if closeErr != nil {
+ close(doneDigesting)
+ pw.CloseWithError(closeErr)
+ return
+ }
+
+ // Flush the buffer once the gzip writer has finished writing to it.
+ if err := bw.Flush(); err != nil {
+ close(doneDigesting)
+ pw.CloseWithError(err)
+ return
+ }
+
+ // Notify closer that digests are done being written.
+ close(doneDigesting)
+
+ // Close the compressed reader to finalize the digest/diffID/size
+ // values, then close pw so that pr returns EOF and readers of the
+ // Compressed stream finish reading.
+ pw.CloseWithError(cr.Close())
+ }()
+
+ return cr, nil
+}
+
+func (cr *compressedReader) Read(b []byte) (int, error) { return cr.pr.Read(b) }
+
+func (cr *compressedReader) Close() error { return cr.closer() }
+
+// countWriter counts bytes written to it.
+type countWriter struct{ n int64 }
+
+func (c *countWriter) Write(p []byte) (int, error) {
+ c.n += int64(len(p))
+ return len(p), nil
+}
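
The core of newCompressedReader is a one-pass fan-out: the raw blob is
hashed while it is gzipped, and the gzip output is simultaneously handed
to the consumer, hashed, and counted. Below is a minimal, self-contained
sketch of that pattern with the buffering, countWriter, and close/error
handling above stripped away; the names are illustrative, not part of
the package (io.Discard requires Go 1.16+).

    package main

    import (
        "compress/gzip"
        "crypto/sha256"
        "encoding/hex"
        "fmt"
        "io"
        "strings"
    )

    func main() {
        blob := strings.NewReader("hello, layer")

        h := sha256.New()  // hashes the uncompressed bytes (the diffID)
        zh := sha256.New() // hashes the compressed bytes (the digest)

        pr, pw := io.Pipe()
        go func() {
            // gzip output fans out to the pipe and the compressed hasher.
            zw := gzip.NewWriter(io.MultiWriter(pw, zh))
            // The raw bytes are teed into h while being compressed by zw.
            if _, err := io.Copy(io.MultiWriter(h, zw), blob); err != nil {
                pw.CloseWithError(err)
                return
            }
            pw.CloseWithError(zw.Close())
        }()

        // The consumer drives the pipeline; nothing is hashed until pr is read.
        n, err := io.Copy(io.Discard, pr)
        if err != nil {
            panic(err)
        }
        fmt.Println("diffID:", hex.EncodeToString(h.Sum(nil)))
        fmt.Println("digest:", hex.EncodeToString(zh.Sum(nil)))
        fmt.Println("size:", n)
    }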