path: root/pkg/v1/remote/write.go
Diffstat (limited to 'pkg/v1/remote/write.go')
-rw-r--r--  pkg/v1/remote/write.go  1003
1 file changed, 1003 insertions, 0 deletions
diff --git a/pkg/v1/remote/write.go b/pkg/v1/remote/write.go
new file mode 100644
index 0000000..5dbaa7c
--- /dev/null
+++ b/pkg/v1/remote/write.go
@@ -0,0 +1,1003 @@
+// Copyright 2018 Google LLC All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "sort"
+ "strings"
+
+ "github.com/google/go-containerregistry/internal/redact"
+ "github.com/google/go-containerregistry/internal/retry"
+ "github.com/google/go-containerregistry/pkg/logs"
+ "github.com/google/go-containerregistry/pkg/name"
+ v1 "github.com/google/go-containerregistry/pkg/v1"
+ "github.com/google/go-containerregistry/pkg/v1/partial"
+ "github.com/google/go-containerregistry/pkg/v1/remote/transport"
+ "github.com/google/go-containerregistry/pkg/v1/stream"
+ "github.com/google/go-containerregistry/pkg/v1/types"
+ "golang.org/x/sync/errgroup"
+)
+
+// Taggable is an interface that enables a manifest PUT (e.g. for tagging).
+type Taggable interface {
+ RawManifest() ([]byte, error)
+}
+
+// Write pushes the provided img to the specified image reference.
+func Write(ref name.Reference, img v1.Image, options ...Option) (rerr error) {
+ o, err := makeOptions(ref.Context(), options...)
+ if err != nil {
+ return err
+ }
+
+ var p *progress
+ if o.updates != nil {
+ p = &progress{updates: o.updates}
+ p.lastUpdate = &v1.Update{}
+ p.lastUpdate.Total, err = countImage(img, o.allowNondistributableArtifacts)
+ if err != nil {
+ return err
+ }
+ defer close(o.updates)
+ defer func() { _ = p.err(rerr) }()
+ }
+ return writeImage(o.context, ref, img, o, p)
+}
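+
+// A minimal usage sketch for Write, assuming a tarball-backed image; the
+// tarball path and reference below are illustrative, and tarball, name, and
+// authn are sibling packages in this module:
+//
+//    img, err := tarball.ImageFromPath("app.tar", nil)
+//    if err != nil {
+//        return err
+//    }
+//    ref, err := name.ParseReference("registry.example.com/team/app:latest")
+//    if err != nil {
+//        return err
+//    }
+//    return remote.Write(ref, img, remote.WithAuthFromKeychain(authn.DefaultKeychain))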
+
+func writeImage(ctx context.Context, ref name.Reference, img v1.Image, o *options, progress *progress) error {
+ ls, err := img.Layers()
+ if err != nil {
+ return err
+ }
+ scopes := scopesForUploadingImage(ref.Context(), ls)
+ tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, scopes)
+ if err != nil {
+ return err
+ }
+ w := writer{
+ repo: ref.Context(),
+ client: &http.Client{Transport: tr},
+ progress: progress,
+ backoff: o.retryBackoff,
+ predicate: o.retryPredicate,
+ }
+
+ // Upload individual blobs and collect any errors.
+ blobChan := make(chan v1.Layer, 2*o.jobs)
+ g, gctx := errgroup.WithContext(ctx)
+ for i := 0; i < o.jobs; i++ {
+ // Start N workers consuming blobs to upload.
+ g.Go(func() error {
+ for b := range blobChan {
+ if err := w.uploadOne(gctx, b); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ }
+
+ // Feed the layers to the upload workers and collect any errors.
+ // If we can dedupe by the layer digest, try to do so. If we can't determine
+ // the digest for whatever reason, we can't dedupe and might re-upload.
+ g.Go(func() error {
+ defer close(blobChan)
+ uploaded := map[v1.Hash]bool{}
+ for _, l := range ls {
+ l := l
+
+ // Handle foreign layers.
+ mt, err := l.MediaType()
+ if err != nil {
+ return err
+ }
+ if !mt.IsDistributable() && !o.allowNondistributableArtifacts {
+ continue
+ }
+
+ // Streaming layers calculate their digests while uploading them. Assume
+ // an error here indicates we need to upload the layer.
+ h, err := l.Digest()
+ if err == nil {
+ // If we can determine the layer's digest ahead of
+ // time, use it to dedupe uploads.
+ if uploaded[h] {
+ continue // Already uploading.
+ }
+ uploaded[h] = true
+ }
+ select {
+ case blobChan <- l:
+ case <-gctx.Done():
+ return gctx.Err()
+ }
+ }
+ return nil
+ })
+
+ if l, err := partial.ConfigLayer(img); err != nil {
+ // We can't read the ConfigLayer, possibly because of streaming layers,
+ // since the layer DiffIDs haven't been calculated yet. Attempt to wait
+ // for the other layers to be uploaded, then try the config again.
+ if err := g.Wait(); err != nil {
+ return err
+ }
+
+ // Now that all the layers are uploaded, try to upload the config file blob.
+ l, err := partial.ConfigLayer(img)
+ if err != nil {
+ return err
+ }
+ if err := w.uploadOne(ctx, l); err != nil {
+ return err
+ }
+ } else {
+ // We *can* read the ConfigLayer, so upload it concurrently with the layers.
+ g.Go(func() error {
+ return w.uploadOne(gctx, l)
+ })
+
+ // Wait for the layers + config.
+ if err := g.Wait(); err != nil {
+ return err
+ }
+ }
+
+ // With all of the constituent elements uploaded, upload the manifest
+ // to commit the image.
+ return w.commitManifest(ctx, img, ref)
+}
+
+// writer writes the elements of an image to a remote image reference.
+type writer struct {
+ repo name.Repository
+ client *http.Client
+
+ progress *progress
+ backoff Backoff
+ predicate retry.Predicate
+}
+
+// url returns a url.URL for the specified path in the context of this remote image reference.
+func (w *writer) url(path string) url.URL {
+ return url.URL{
+ Scheme: w.repo.Registry.Scheme(),
+ Host: w.repo.RegistryStr(),
+ Path: path,
+ }
+}
+
+// nextLocation extracts the fully-qualified URL to which we should send the next request in an upload sequence.
+func (w *writer) nextLocation(resp *http.Response) (string, error) {
+ loc := resp.Header.Get("Location")
+ if len(loc) == 0 {
+ return "", errors.New("missing Location header")
+ }
+ u, err := url.Parse(loc)
+ if err != nil {
+ return "", err
+ }
+
+ // If the location header returned is just a url path, then fully qualify it.
+ // We cannot simply call w.url, since there might be an embedded query string.
+ return resp.Request.URL.ResolveReference(u).String(), nil
+}
+
+// checkExistingBlob checks if a blob already exists in the repository by making a
+// HEAD request to the blob store API. GCR performs an existence check during upload
+// initiation if "mount" is specified, even if no "from" sources are specified.
+// However, this is not broadly applicable to all registries, e.g. ECR.
+func (w *writer) checkExistingBlob(ctx context.Context, h v1.Hash) (bool, error) {
+ u := w.url(fmt.Sprintf("/v2/%s/blobs/%s", w.repo.RepositoryStr(), h.String()))
+
+ req, err := http.NewRequest(http.MethodHead, u.String(), nil)
+ if err != nil {
+ return false, err
+ }
+
+ resp, err := w.client.Do(req.WithContext(ctx))
+ if err != nil {
+ return false, err
+ }
+ defer resp.Body.Close()
+
+ if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil {
+ return false, err
+ }
+
+ return resp.StatusCode == http.StatusOK, nil
+}
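+
+// For illustration, the existence check issues a request shaped like the
+// following (repository and digest are illustrative):
+//
+//    HEAD /v2/team/app/blobs/sha256:deadbeef...
+//
+// A 200 response means the blob already exists and the upload can be skipped;
+// a 404 means it still needs to be uploaded.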
+
+// checkExistingManifest checks if a manifest exists already in the repository
+// by making a HEAD request to the manifest API.
+func (w *writer) checkExistingManifest(ctx context.Context, h v1.Hash, mt types.MediaType) (bool, error) {
+ u := w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), h.String()))
+
+ req, err := http.NewRequest(http.MethodHead, u.String(), nil)
+ if err != nil {
+ return false, err
+ }
+ req.Header.Set("Accept", string(mt))
+
+ resp, err := w.client.Do(req.WithContext(ctx))
+ if err != nil {
+ return false, err
+ }
+ defer resp.Body.Close()
+
+ if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil {
+ return false, err
+ }
+
+ return resp.StatusCode == http.StatusOK, nil
+}
+
+// initiateUpload initiates the blob upload, which starts with a POST that can
+// optionally include the hash of the layer and a list of repositories from
+// which that layer might be read. On failure, an error is returned.
+// On success, the layer was either mounted (nothing more to do) or a blob
+// upload was initiated and the body of that blob should be sent to the returned
+// location.
+func (w *writer) initiateUpload(ctx context.Context, from, mount, origin string) (location string, mounted bool, err error) {
+ u := w.url(fmt.Sprintf("/v2/%s/blobs/uploads/", w.repo.RepositoryStr()))
+ uv := url.Values{}
+ if mount != "" && from != "" {
+ // Quay will fail if we specify a "mount" without a "from".
+ uv.Set("mount", mount)
+ uv.Set("from", from)
+ if origin != "" {
+ uv.Set("origin", origin)
+ }
+ }
+ u.RawQuery = uv.Encode()
+
+ // Make the request to initiate the blob upload.
+ req, err := http.NewRequest(http.MethodPost, u.String(), nil)
+ if err != nil {
+ return "", false, err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ resp, err := w.client.Do(req.WithContext(ctx))
+ if err != nil {
+ return "", false, err
+ }
+ defer resp.Body.Close()
+
+ if err := transport.CheckError(resp, http.StatusCreated, http.StatusAccepted); err != nil {
+ if origin != "" && origin != w.repo.RegistryStr() {
+ // https://github.com/google/go-containerregistry/issues/1404
+ logs.Warn.Printf("retrying without mount: %v", err)
+ return w.initiateUpload(ctx, "", "", "")
+ }
+ return "", false, err
+ }
+
+ // Check the response code to determine the result.
+ switch resp.StatusCode {
+ case http.StatusCreated:
+ // We're done, we were able to fast-path.
+ return "", true, nil
+ case http.StatusAccepted:
+ // Proceed to PATCH, upload has begun.
+ loc, err := w.nextLocation(resp)
+ return loc, false, err
+ default:
+ panic("Unreachable: initiateUpload")
+ }
+}
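+
+// For illustration, a cross-repository mount attempt issues a request shaped
+// like the following (repository names and digest are illustrative):
+//
+//    POST /v2/team/app/blobs/uploads/?mount=sha256:deadbeef...&from=team/base
+//
+// A 201 response means the blob was mounted and nothing needs to be uploaded;
+// a 202 response means an upload session was opened and the blob should be
+// PATCHed to the returned Location.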
+
+// streamBlob streams the contents of the blob to the specified location.
+// On failure, this will return an error. On success, this will return the location
+// header indicating how to commit the streamed blob.
+func (w *writer) streamBlob(ctx context.Context, layer v1.Layer, streamLocation string) (commitLocation string, rerr error) {
+ reset := func() {}
+ defer func() {
+ if rerr != nil {
+ reset()
+ }
+ }()
+ blob, err := layer.Compressed()
+ if err != nil {
+ return "", err
+ }
+
+ getBody := layer.Compressed
+ if w.progress != nil {
+ var count int64
+ blob = &progressReader{rc: blob, progress: w.progress, count: &count}
+ getBody = func() (io.ReadCloser, error) {
+ blob, err := layer.Compressed()
+ if err != nil {
+ return nil, err
+ }
+ return &progressReader{rc: blob, progress: w.progress, count: &count}, nil
+ }
+ reset = func() {
+ w.progress.complete(-count)
+ }
+ }
+
+ req, err := http.NewRequest(http.MethodPatch, streamLocation, blob)
+ if err != nil {
+ return "", err
+ }
+ if _, ok := layer.(*stream.Layer); !ok {
+ // Only set GetBody for non-streaming layers; a stream.Layer can be read
+ // only once, so its upload can't be retried.
+ req.GetBody = getBody
+ }
+ req.Header.Set("Content-Type", "application/octet-stream")
+
+ resp, err := w.client.Do(req.WithContext(ctx))
+ if err != nil {
+ return "", err
+ }
+ defer resp.Body.Close()
+
+ if err := transport.CheckError(resp, http.StatusNoContent, http.StatusAccepted, http.StatusCreated); err != nil {
+ return "", err
+ }
+
+ // The blob has been uploaded, return the location header indicating
+ // how to commit this layer.
+ return w.nextLocation(resp)
+}
+
+// commitBlob commits this blob by sending a PUT to the location returned from
+// streaming the blob.
+func (w *writer) commitBlob(ctx context.Context, location, digest string) error {
+ u, err := url.Parse(location)
+ if err != nil {
+ return err
+ }
+ v := u.Query()
+ v.Set("digest", digest)
+ u.RawQuery = v.Encode()
+
+ req, err := http.NewRequest(http.MethodPut, u.String(), nil)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", "application/octet-stream")
+
+ resp, err := w.client.Do(req.WithContext(ctx))
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+ return transport.CheckError(resp, http.StatusCreated)
+}
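+
+// For illustration, if streamBlob returned a commit location of
+// "/v2/team/app/blobs/uploads/abc123" (an illustrative value), commitBlob
+// appends the digest and issues:
+//
+//    PUT /v2/team/app/blobs/uploads/abc123?digest=sha256:deadbeef...
+//
+// A 201 response means the registry has committed the blob.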
+
+// incrProgress increments and sends a progress update, if WithProgress is used.
+func (w *writer) incrProgress(written int64) {
+ if w.progress == nil {
+ return
+ }
+ w.progress.complete(written)
+}
+
+// uploadOne performs a complete upload of a single layer.
+func (w *writer) uploadOne(ctx context.Context, l v1.Layer) error {
+ tryUpload := func() error {
+ ctx := retry.Never(ctx)
+ var from, mount, origin string
+ if h, err := l.Digest(); err == nil {
+ // If we know the digest, this isn't a streaming layer. Do an existence
+ // check so we can skip uploading the layer if possible.
+ existing, err := w.checkExistingBlob(ctx, h)
+ if err != nil {
+ return err
+ }
+ if existing {
+ size, err := l.Size()
+ if err != nil {
+ return err
+ }
+ w.incrProgress(size)
+ logs.Progress.Printf("existing blob: %v", h)
+ return nil
+ }
+
+ mount = h.String()
+ }
+ if ml, ok := l.(*MountableLayer); ok {
+ from = ml.Reference.Context().RepositoryStr()
+ origin = ml.Reference.Context().RegistryStr()
+ }
+
+ location, mounted, err := w.initiateUpload(ctx, from, mount, origin)
+ if err != nil {
+ return err
+ } else if mounted {
+ size, err := l.Size()
+ if err != nil {
+ return err
+ }
+ w.incrProgress(size)
+ h, err := l.Digest()
+ if err != nil {
+ return err
+ }
+ logs.Progress.Printf("mounted blob: %s", h.String())
+ return nil
+ }
+
+ // Only log layers with +json or +yaml. We can let through other stuff if it becomes popular.
+ // TODO(opencontainers/image-spec#791): Would be great to have an actual parser.
+ mt, err := l.MediaType()
+ if err != nil {
+ return err
+ }
+ smt := string(mt)
+ if !(strings.HasSuffix(smt, "+json") || strings.HasSuffix(smt, "+yaml")) {
+ ctx = redact.NewContext(ctx, "omitting binary blobs from logs")
+ }
+
+ location, err = w.streamBlob(ctx, l, location)
+ if err != nil {
+ return err
+ }
+
+ h, err := l.Digest()
+ if err != nil {
+ return err
+ }
+ digest := h.String()
+
+ if err := w.commitBlob(ctx, location, digest); err != nil {
+ return err
+ }
+ logs.Progress.Printf("pushed blob: %s", digest)
+ return nil
+ }
+
+ return retry.Retry(tryUpload, w.predicate, w.backoff)
+}
+
+type withLayer interface {
+ Layer(v1.Hash) (v1.Layer, error)
+}
+
+func (w *writer) writeIndex(ctx context.Context, ref name.Reference, ii v1.ImageIndex, options ...Option) error {
+ index, err := ii.IndexManifest()
+ if err != nil {
+ return err
+ }
+
+ o, err := makeOptions(ref.Context(), options...)
+ if err != nil {
+ return err
+ }
+
+ // TODO(#803): Pipe through remote.WithJobs and upload these in parallel.
+ for _, desc := range index.Manifests {
+ ref := ref.Context().Digest(desc.Digest.String())
+ exists, err := w.checkExistingManifest(ctx, desc.Digest, desc.MediaType)
+ if err != nil {
+ return err
+ }
+ if exists {
+ logs.Progress.Print("existing manifest: ", desc.Digest)
+ continue
+ }
+
+ switch desc.MediaType {
+ case types.OCIImageIndex, types.DockerManifestList:
+ ii, err := ii.ImageIndex(desc.Digest)
+ if err != nil {
+ return err
+ }
+ if err := w.writeIndex(ctx, ref, ii, options...); err != nil {
+ return err
+ }
+ case types.OCIManifestSchema1, types.DockerManifestSchema2:
+ img, err := ii.Image(desc.Digest)
+ if err != nil {
+ return err
+ }
+ if err := writeImage(ctx, ref, img, o, w.progress); err != nil {
+ return err
+ }
+ default:
+ // Workaround for #819.
+ if wl, ok := ii.(withLayer); ok {
+ layer, err := wl.Layer(desc.Digest)
+ if err != nil {
+ return err
+ }
+ if err := w.uploadOne(ctx, layer); err != nil {
+ return err
+ }
+ }
+ }
+ }
+
+ // With all of the constituent elements uploaded, upload the manifest
+ // to commit the image.
+ return w.commitManifest(ctx, ii, ref)
+}
+
+type withMediaType interface {
+ MediaType() (types.MediaType, error)
+}
+
+// This is really silly, but Go interfaces don't let remote.Descriptor satisfy
+// remote.Taggable directly because of name collisions between method names and
+// struct fields.
+//
+// Use a type assertion to either pull the v1.Descriptor out of remote.Descriptor
+// or create a descriptor based on the RawManifest and (optionally) MediaType.
+func unpackTaggable(t Taggable) ([]byte, *v1.Descriptor, error) {
+ if d, ok := t.(*Descriptor); ok {
+ return d.Manifest, &d.Descriptor, nil
+ }
+ b, err := t.RawManifest()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // A reasonable default if Taggable doesn't implement MediaType.
+ mt := types.DockerManifestSchema2
+
+ if wmt, ok := t.(withMediaType); ok {
+ m, err := wmt.MediaType()
+ if err != nil {
+ return nil, nil, err
+ }
+ mt = m
+ }
+
+ h, sz, err := v1.SHA256(bytes.NewReader(b))
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return b, &v1.Descriptor{
+ MediaType: mt,
+ Size: sz,
+ Digest: h,
+ }, nil
+}
+
+// commitSubjectReferrers is responsible for updating the fallback tag manifest to
+// track descriptors referring to a subject, for registries that don't yet support
+// the Referrers API.
+// TODO: use conditional requests to avoid race conditions
+func (w *writer) commitSubjectReferrers(ctx context.Context, sub name.Digest, add v1.Descriptor) error {
+ // Check if the registry supports the Referrers API.
+ // TODO: This should be done once per registry, not once per subject.
+ u := w.url(fmt.Sprintf("/v2/%s/referrers/%s", w.repo.RepositoryStr(), sub.DigestStr()))
+ req, err := http.NewRequest(http.MethodGet, u.String(), nil)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Accept", string(types.OCIImageIndex))
+ resp, err := w.client.Do(req.WithContext(ctx))
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+ if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound, http.StatusBadRequest); err != nil {
+ return err
+ }
+ if resp.StatusCode == http.StatusOK {
+ // The registry supports the Referrers API and is responsible for updating the
+ // referrers list, so there's nothing more to do.
+ return nil
+ }
+
+ // The registry doesn't support the Referrers API, so we need to update the
+ // manifest tagged with the fallback tag.
+ // Make the request to GET the current manifest.
+ t := fallbackTag(sub)
+ u = w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), t.Identifier()))
+ req, err = http.NewRequest(http.MethodGet, u.String(), nil)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Accept", string(types.OCIImageIndex))
+ resp, err = w.client.Do(req.WithContext(ctx))
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+ var im v1.IndexManifest
+ if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil {
+ return err
+ } else if resp.StatusCode == http.StatusNotFound {
+ // Not found just means there are no attachments. Start with an empty index.
+ im = v1.IndexManifest{
+ SchemaVersion: 2,
+ MediaType: types.OCIImageIndex,
+ Manifests: []v1.Descriptor{add},
+ }
+ } else {
+ if err := json.NewDecoder(resp.Body).Decode(&im); err != nil {
+ return err
+ }
+ if im.SchemaVersion != 2 {
+ return fmt.Errorf("fallback tag manifest is not a schema version 2: %d", im.SchemaVersion)
+ }
+ if im.MediaType != types.OCIImageIndex {
+ return fmt.Errorf("fallback tag manifest is not an OCI image index: %s", im.MediaType)
+ }
+ for _, desc := range im.Manifests {
+ if desc.Digest == add.Digest {
+ // The digest is already attached, nothing to do.
+ logs.Progress.Printf("fallback tag %s already had referrer", t.Identifier())
+ return nil
+ }
+ }
+ // Append the new descriptor to the index.
+ im.Manifests = append(im.Manifests, add)
+ }
+
+ // Sort the manifests for reproducibility.
+ sort.Slice(im.Manifests, func(i, j int) bool {
+ return im.Manifests[i].Digest.String() < im.Manifests[j].Digest.String()
+ })
+ logs.Progress.Printf("updating fallback tag %s with new referrer", t.Identifier())
+ if err := w.commitManifest(ctx, fallbackTaggable{im}, t); err != nil {
+ return err
+ }
+ return nil
+}
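+
+// For illustration, after a single referrer has been attached, the index
+// pushed to the fallback tag looks roughly like the following (digest, size,
+// and artifact type are illustrative, and the fallback tag itself is assumed
+// to follow the "sha256-<hex>" referrers tag scheme):
+//
+//    v1.IndexManifest{
+//        SchemaVersion: 2,
+//        MediaType:     types.OCIImageIndex,
+//        Manifests: []v1.Descriptor{{
+//            MediaType:    types.OCIManifestSchema1,
+//            ArtifactType: "application/vnd.example.signature+json",
+//            Digest:       v1.Hash{Algorithm: "sha256", Hex: "deadbeef..."},
+//            Size:         1024,
+//        }},
+//    }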
+
+type fallbackTaggable struct {
+ im v1.IndexManifest
+}
+
+func (f fallbackTaggable) RawManifest() ([]byte, error) { return json.Marshal(f.im) }
+func (f fallbackTaggable) MediaType() (types.MediaType, error) { return types.OCIImageIndex, nil }
+
+// commitManifest does a PUT of the image's manifest.
+func (w *writer) commitManifest(ctx context.Context, t Taggable, ref name.Reference) error {
+ // If the manifest refers to a subject, we may need to update the fallback
+ // tag manifest after the PUT, so parse out the relevant fields up front.
+ raw, err := t.RawManifest()
+ if err != nil {
+ return err
+ }
+ var mf struct {
+ MediaType types.MediaType `json:"mediaType"`
+ Subject *v1.Descriptor `json:"subject,omitempty"`
+ Config struct {
+ MediaType types.MediaType `json:"mediaType"`
+ } `json:"config"`
+ }
+ if err := json.Unmarshal(raw, &mf); err != nil {
+ return err
+ }
+
+ tryUpload := func() error {
+ ctx := retry.Never(ctx)
+ raw, desc, err := unpackTaggable(t)
+ if err != nil {
+ return err
+ }
+
+ u := w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), ref.Identifier()))
+
+ // Make the request to PUT the serialized manifest
+ req, err := http.NewRequest(http.MethodPut, u.String(), bytes.NewBuffer(raw))
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", string(desc.MediaType))
+
+ resp, err := w.client.Do(req.WithContext(ctx))
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+ if err := transport.CheckError(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted); err != nil {
+ return err
+ }
+
+ // If the manifest referred to a subject, we may need to update the fallback tag manifest.
+ // TODO: If this fails, we'll retry the whole upload. We should retry just this part.
+ if mf.Subject != nil {
+ h, size, err := v1.SHA256(bytes.NewReader(raw))
+ if err != nil {
+ return err
+ }
+ desc := v1.Descriptor{
+ ArtifactType: string(mf.Config.MediaType),
+ MediaType: mf.MediaType,
+ Digest: h,
+ Size: size,
+ }
+ if err := w.commitSubjectReferrers(ctx,
+ ref.Context().Digest(mf.Subject.Digest.String()),
+ desc); err != nil {
+ return err
+ }
+ }
+
+ // The image was successfully pushed!
+ logs.Progress.Printf("%v: digest: %v size: %d", ref, desc.Digest, desc.Size)
+ w.incrProgress(int64(len(raw)))
+ return nil
+ }
+
+ return retry.Retry(tryUpload, w.predicate, w.backoff)
+}
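+
+// For illustration, committing a manifest issues a request shaped like the
+// following (repository, tag, and media type are illustrative); the body is
+// the raw manifest bytes returned by the Taggable:
+//
+//    PUT /v2/team/app/manifests/latest
+//    Content-Type: application/vnd.oci.image.manifest.v1+json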
+
+func scopesForUploadingImage(repo name.Repository, layers []v1.Layer) []string {
+ // Use a map as a set to deduplicate scope strings.
+ scopeSet := map[string]struct{}{}
+
+ for _, l := range layers {
+ if ml, ok := l.(*MountableLayer); ok {
+ // We will add push scope for repo after the loop.
+ // For now, request pull scope for mountable references on the same registry.
+ if ml.Reference.Context().String() != repo.String() && ml.Reference.Context().Registry.String() == repo.Registry.String() {
+ scopeSet[ml.Reference.Scope(transport.PullScope)] = struct{}{}
+ }
+ }
+ }
+
+ scopes := make([]string, 0)
+ // Push scope should be the first element because a few registries just look at the first scope to determine access.
+ scopes = append(scopes, repo.Scope(transport.PushScope))
+
+ for scope := range scopeSet {
+ scopes = append(scopes, scope)
+ }
+
+ return scopes
+}
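+
+// For illustration, pushing to "example.com/team/app" with one layer mountable
+// from "example.com/team/base" yields scopes roughly like the following, using
+// the "repository:<name>:<actions>" token scope syntax:
+//
+//    repository:team/app:push,pull
+//    repository:team/base:pull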
+
+// WriteIndex pushes the provided ImageIndex to the specified image reference.
+// WriteIndex will attempt to push all of the referenced manifests before
+// attempting to push the ImageIndex, to retain referential integrity.
+func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) (rerr error) {
+ o, err := makeOptions(ref.Context(), options...)
+ if err != nil {
+ return err
+ }
+
+ scopes := []string{ref.Scope(transport.PushScope)}
+ tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, scopes)
+ if err != nil {
+ return err
+ }
+ w := writer{
+ repo: ref.Context(),
+ client: &http.Client{Transport: tr},
+ backoff: o.retryBackoff,
+ predicate: o.retryPredicate,
+ }
+
+ if o.updates != nil {
+ w.progress = &progress{updates: o.updates}
+ w.progress.lastUpdate = &v1.Update{}
+
+ defer close(o.updates)
+ defer func() { w.progress.err(rerr) }()
+
+ w.progress.lastUpdate.Total, err = countIndex(ii, o.allowNondistributableArtifacts)
+ if err != nil {
+ return err
+ }
+ }
+
+ return w.writeIndex(o.context, ref, ii, options...)
+}
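+
+// A minimal usage sketch for WriteIndex, assuming a multi-platform index is
+// assembled with the mutate and empty packages from this module; the images
+// and reference below are illustrative:
+//
+//    idx := mutate.AppendManifests(empty.Index,
+//        mutate.IndexAddendum{Add: amd64Img},
+//        mutate.IndexAddendum{Add: arm64Img},
+//    )
+//    ref, err := name.ParseReference("registry.example.com/team/app:latest")
+//    if err != nil {
+//        return err
+//    }
+//    return remote.WriteIndex(ref, idx, remote.WithAuthFromKeychain(authn.DefaultKeychain))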
+
+// countImage counts the total size of all layers + config blob + manifest for
+// an image. It de-dupes duplicate layers.
+func countImage(img v1.Image, allowNondistributableArtifacts bool) (int64, error) {
+ var total int64
+ ls, err := img.Layers()
+ if err != nil {
+ return 0, err
+ }
+ seen := map[v1.Hash]bool{}
+ for _, l := range ls {
+ // Handle foreign layers.
+ mt, err := l.MediaType()
+ if err != nil {
+ return 0, err
+ }
+ if !mt.IsDistributable() && !allowNondistributableArtifacts {
+ continue
+ }
+
+ // TODO: support streaming layers which update the total count as they write.
+ if _, ok := l.(*stream.Layer); ok {
+ return 0, errors.New("cannot use stream.Layer and WithProgress")
+ }
+
+ // Dedupe layers.
+ d, err := l.Digest()
+ if err != nil {
+ return 0, err
+ }
+ if seen[d] {
+ continue
+ }
+ seen[d] = true
+
+ size, err := l.Size()
+ if err != nil {
+ return 0, err
+ }
+ total += size
+ }
+ b, err := img.RawConfigFile()
+ if err != nil {
+ return 0, err
+ }
+ total += int64(len(b))
+ size, err := img.Size()
+ if err != nil {
+ return 0, err
+ }
+ total += size
+ return total, nil
+}
+
+// countIndex counts the total size of all images + sub-indexes for an index.
+// It does not attempt to de-dupe duplicate images, etc.
+func countIndex(idx v1.ImageIndex, allowNondistributableArtifacts bool) (int64, error) {
+ var total int64
+ mf, err := idx.IndexManifest()
+ if err != nil {
+ return 0, err
+ }
+
+ for _, desc := range mf.Manifests {
+ switch desc.MediaType {
+ case types.OCIImageIndex, types.DockerManifestList:
+ sidx, err := idx.ImageIndex(desc.Digest)
+ if err != nil {
+ return 0, err
+ }
+ size, err := countIndex(sidx, allowNondistributableArtifacts)
+ if err != nil {
+ return 0, err
+ }
+ total += size
+ case types.OCIManifestSchema1, types.DockerManifestSchema2:
+ simg, err := idx.Image(desc.Digest)
+ if err != nil {
+ return 0, err
+ }
+ size, err := countImage(simg, allowNondistributableArtifacts)
+ if err != nil {
+ return 0, err
+ }
+ total += size
+ default:
+ // Workaround for #819.
+ if wl, ok := idx.(withLayer); ok {
+ layer, err := wl.Layer(desc.Digest)
+ if err != nil {
+ return 0, err
+ }
+ size, err := layer.Size()
+ if err != nil {
+ return 0, err
+ }
+ total += size
+ }
+ }
+ }
+
+ size, err := idx.Size()
+ if err != nil {
+ return 0, err
+ }
+ total += size
+ return total, nil
+}
+
+// WriteLayer uploads the provided Layer to the specified repo.
+func WriteLayer(repo name.Repository, layer v1.Layer, options ...Option) (rerr error) {
+ o, err := makeOptions(repo, options...)
+ if err != nil {
+ return err
+ }
+ scopes := scopesForUploadingImage(repo, []v1.Layer{layer})
+ tr, err := transport.NewWithContext(o.context, repo.Registry, o.auth, o.transport, scopes)
+ if err != nil {
+ return err
+ }
+ w := writer{
+ repo: repo,
+ client: &http.Client{Transport: tr},
+ backoff: o.retryBackoff,
+ predicate: o.retryPredicate,
+ }
+
+ if o.updates != nil {
+ w.progress = &progress{updates: o.updates}
+ w.progress.lastUpdate = &v1.Update{}
+
+ defer close(o.updates)
+ defer func() { w.progress.err(rerr) }()
+
+ // TODO: support streaming layers which update the total count as they write.
+ if _, ok := layer.(*stream.Layer); ok {
+ return errors.New("cannot use stream.Layer and WithProgress")
+ }
+ size, err := layer.Size()
+ if err != nil {
+ return err
+ }
+ w.progress.total(size)
+ }
+ return w.uploadOne(o.context, layer)
+}
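+
+// A minimal usage sketch for WriteLayer, assuming a small in-memory blob
+// wrapped with the static package from this module; the repository and
+// contents below are illustrative:
+//
+//    layer := static.NewLayer([]byte("hello"), types.OCILayer)
+//    repo, err := name.NewRepository("registry.example.com/team/app")
+//    if err != nil {
+//        return err
+//    }
+//    return remote.WriteLayer(repo, layer, remote.WithAuthFromKeychain(authn.DefaultKeychain))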
+
+// Tag adds a tag to the given Taggable via PUT /v2/.../manifests/<tag>
+//
+// Notable implementations of Taggable are v1.Image, v1.ImageIndex, and
+// remote.Descriptor.
+//
+// If t implements MediaType, we will use that for the Content-Type, otherwise
+// we will default to types.DockerManifestSchema2.
+//
+// Tag does not attempt to write anything other than the manifest, so callers
+// should ensure that all blobs or manifests that are referenced by t exist
+// in the target registry.
+func Tag(tag name.Tag, t Taggable, options ...Option) error {
+ return Put(tag, t, options...)
+}
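+
+// A minimal usage sketch for Tag, assuming an existing manifest is fetched
+// with remote.Get and re-tagged in the same repository; the reference strings
+// below are illustrative:
+//
+//    desc, err := remote.Get(ref, remote.WithAuthFromKeychain(authn.DefaultKeychain))
+//    if err != nil {
+//        return err
+//    }
+//    tag, err := name.NewTag("registry.example.com/team/app:v1.2.3")
+//    if err != nil {
+//        return err
+//    }
+//    return remote.Tag(tag, desc, remote.WithAuthFromKeychain(authn.DefaultKeychain))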
+
+// Put adds a manifest from the given Taggable via PUT /v2/.../manifests/<ref>
+//
+// Notable implementations of Taggable are v1.Image, v1.ImageIndex, and
+// remote.Descriptor.
+//
+// If t implements MediaType, we will use that for the Content-Type, otherwise
+// we will default to types.DockerManifestSchema2.
+//
+// Put does not attempt to write anything other than the manifest, so callers
+// should ensure that all blobs or manifests that are referenced by t exist
+// in the target registry.
+func Put(ref name.Reference, t Taggable, options ...Option) error {
+ o, err := makeOptions(ref.Context(), options...)
+ if err != nil {
+ return err
+ }
+ scopes := []string{ref.Scope(transport.PushScope)}
+
+ // TODO: This *always* does a token exchange. For some registries,
+ // that's pretty slow. Some ideas:
+ // * Tag could take a list of tags.
+ // * Allow callers to pass in a transport.Transport, typecheck
+ // it to allow them to reuse the transport across multiple calls.
+ // * WithTag option to do multiple manifest PUTs in commitManifest.
+ tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, scopes)
+ if err != nil {
+ return err
+ }
+ w := writer{
+ repo: ref.Context(),
+ client: &http.Client{Transport: tr},
+ backoff: o.retryBackoff,
+ predicate: o.retryPredicate,
+ }
+
+ return w.commitManifest(o.context, t, ref)
+}