diff options
Diffstat (limited to 'pkg/v1/remote/write.go')
-rw-r--r-- | pkg/v1/remote/write.go | 1003 |
1 files changed, 1003 insertions, 0 deletions
diff --git a/pkg/v1/remote/write.go b/pkg/v1/remote/write.go new file mode 100644 index 0000000..5dbaa7c --- /dev/null +++ b/pkg/v1/remote/write.go @@ -0,0 +1,1003 @@ +// Copyright 2018 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "sort" + "strings" + + "github.com/google/go-containerregistry/internal/redact" + "github.com/google/go-containerregistry/internal/retry" + "github.com/google/go-containerregistry/pkg/logs" + "github.com/google/go-containerregistry/pkg/name" + v1 "github.com/google/go-containerregistry/pkg/v1" + "github.com/google/go-containerregistry/pkg/v1/partial" + "github.com/google/go-containerregistry/pkg/v1/remote/transport" + "github.com/google/go-containerregistry/pkg/v1/stream" + "github.com/google/go-containerregistry/pkg/v1/types" + "golang.org/x/sync/errgroup" +) + +// Taggable is an interface that enables a manifest PUT (e.g. for tagging). +type Taggable interface { + RawManifest() ([]byte, error) +} + +// Write pushes the provided img to the specified image reference. +func Write(ref name.Reference, img v1.Image, options ...Option) (rerr error) { + o, err := makeOptions(ref.Context(), options...) + if err != nil { + return err + } + + var p *progress + if o.updates != nil { + p = &progress{updates: o.updates} + p.lastUpdate = &v1.Update{} + p.lastUpdate.Total, err = countImage(img, o.allowNondistributableArtifacts) + if err != nil { + return err + } + defer close(o.updates) + defer func() { _ = p.err(rerr) }() + } + return writeImage(o.context, ref, img, o, p) +} + +func writeImage(ctx context.Context, ref name.Reference, img v1.Image, o *options, progress *progress) error { + ls, err := img.Layers() + if err != nil { + return err + } + scopes := scopesForUploadingImage(ref.Context(), ls) + tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, scopes) + if err != nil { + return err + } + w := writer{ + repo: ref.Context(), + client: &http.Client{Transport: tr}, + progress: progress, + backoff: o.retryBackoff, + predicate: o.retryPredicate, + } + + // Upload individual blobs and collect any errors. + blobChan := make(chan v1.Layer, 2*o.jobs) + g, gctx := errgroup.WithContext(ctx) + for i := 0; i < o.jobs; i++ { + // Start N workers consuming blobs to upload. + g.Go(func() error { + for b := range blobChan { + if err := w.uploadOne(gctx, b); err != nil { + return err + } + } + return nil + }) + } + + // Upload individual layers in goroutines and collect any errors. + // If we can dedupe by the layer digest, try to do so. If we can't determine + // the digest for whatever reason, we can't dedupe and might re-upload. + g.Go(func() error { + defer close(blobChan) + uploaded := map[v1.Hash]bool{} + for _, l := range ls { + l := l + + // Handle foreign layers. + mt, err := l.MediaType() + if err != nil { + return err + } + if !mt.IsDistributable() && !o.allowNondistributableArtifacts { + continue + } + + // Streaming layers calculate their digests while uploading them. Assume + // an error here indicates we need to upload the layer. + h, err := l.Digest() + if err == nil { + // If we can determine the layer's digest ahead of + // time, use it to dedupe uploads. + if uploaded[h] { + continue // Already uploading. + } + uploaded[h] = true + } + select { + case blobChan <- l: + case <-gctx.Done(): + return gctx.Err() + } + } + return nil + }) + + if l, err := partial.ConfigLayer(img); err != nil { + // We can't read the ConfigLayer, possibly because of streaming layers, + // since the layer DiffIDs haven't been calculated yet. Attempt to wait + // for the other layers to be uploaded, then try the config again. + if err := g.Wait(); err != nil { + return err + } + + // Now that all the layers are uploaded, try to upload the config file blob. + l, err := partial.ConfigLayer(img) + if err != nil { + return err + } + if err := w.uploadOne(ctx, l); err != nil { + return err + } + } else { + // We *can* read the ConfigLayer, so upload it concurrently with the layers. + g.Go(func() error { + return w.uploadOne(gctx, l) + }) + + // Wait for the layers + config. + if err := g.Wait(); err != nil { + return err + } + } + + // With all of the constituent elements uploaded, upload the manifest + // to commit the image. + return w.commitManifest(ctx, img, ref) +} + +// writer writes the elements of an image to a remote image reference. +type writer struct { + repo name.Repository + client *http.Client + + progress *progress + backoff Backoff + predicate retry.Predicate +} + +// url returns a url.Url for the specified path in the context of this remote image reference. +func (w *writer) url(path string) url.URL { + return url.URL{ + Scheme: w.repo.Registry.Scheme(), + Host: w.repo.RegistryStr(), + Path: path, + } +} + +// nextLocation extracts the fully-qualified URL to which we should send the next request in an upload sequence. +func (w *writer) nextLocation(resp *http.Response) (string, error) { + loc := resp.Header.Get("Location") + if len(loc) == 0 { + return "", errors.New("missing Location header") + } + u, err := url.Parse(loc) + if err != nil { + return "", err + } + + // If the location header returned is just a url path, then fully qualify it. + // We cannot simply call w.url, since there might be an embedded query string. + return resp.Request.URL.ResolveReference(u).String(), nil +} + +// checkExistingBlob checks if a blob exists already in the repository by making a +// HEAD request to the blob store API. GCR performs an existence check on the +// initiation if "mount" is specified, even if no "from" sources are specified. +// However, this is not broadly applicable to all registries, e.g. ECR. +func (w *writer) checkExistingBlob(ctx context.Context, h v1.Hash) (bool, error) { + u := w.url(fmt.Sprintf("/v2/%s/blobs/%s", w.repo.RepositoryStr(), h.String())) + + req, err := http.NewRequest(http.MethodHead, u.String(), nil) + if err != nil { + return false, err + } + + resp, err := w.client.Do(req.WithContext(ctx)) + if err != nil { + return false, err + } + defer resp.Body.Close() + + if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil { + return false, err + } + + return resp.StatusCode == http.StatusOK, nil +} + +// checkExistingManifest checks if a manifest exists already in the repository +// by making a HEAD request to the manifest API. +func (w *writer) checkExistingManifest(ctx context.Context, h v1.Hash, mt types.MediaType) (bool, error) { + u := w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), h.String())) + + req, err := http.NewRequest(http.MethodHead, u.String(), nil) + if err != nil { + return false, err + } + req.Header.Set("Accept", string(mt)) + + resp, err := w.client.Do(req.WithContext(ctx)) + if err != nil { + return false, err + } + defer resp.Body.Close() + + if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil { + return false, err + } + + return resp.StatusCode == http.StatusOK, nil +} + +// initiateUpload initiates the blob upload, which starts with a POST that can +// optionally include the hash of the layer and a list of repositories from +// which that layer might be read. On failure, an error is returned. +// On success, the layer was either mounted (nothing more to do) or a blob +// upload was initiated and the body of that blob should be sent to the returned +// location. +func (w *writer) initiateUpload(ctx context.Context, from, mount, origin string) (location string, mounted bool, err error) { + u := w.url(fmt.Sprintf("/v2/%s/blobs/uploads/", w.repo.RepositoryStr())) + uv := url.Values{} + if mount != "" && from != "" { + // Quay will fail if we specify a "mount" without a "from". + uv.Set("mount", mount) + uv.Set("from", from) + if origin != "" { + uv.Set("origin", origin) + } + } + u.RawQuery = uv.Encode() + + // Make the request to initiate the blob upload. + req, err := http.NewRequest(http.MethodPost, u.String(), nil) + if err != nil { + return "", false, err + } + req.Header.Set("Content-Type", "application/json") + resp, err := w.client.Do(req.WithContext(ctx)) + if err != nil { + return "", false, err + } + defer resp.Body.Close() + + if err := transport.CheckError(resp, http.StatusCreated, http.StatusAccepted); err != nil { + if origin != "" && origin != w.repo.RegistryStr() { + // https://github.com/google/go-containerregistry/issues/1404 + logs.Warn.Printf("retrying without mount: %v", err) + return w.initiateUpload(ctx, "", "", "") + } + return "", false, err + } + + // Check the response code to determine the result. + switch resp.StatusCode { + case http.StatusCreated: + // We're done, we were able to fast-path. + return "", true, nil + case http.StatusAccepted: + // Proceed to PATCH, upload has begun. + loc, err := w.nextLocation(resp) + return loc, false, err + default: + panic("Unreachable: initiateUpload") + } +} + +// streamBlob streams the contents of the blob to the specified location. +// On failure, this will return an error. On success, this will return the location +// header indicating how to commit the streamed blob. +func (w *writer) streamBlob(ctx context.Context, layer v1.Layer, streamLocation string) (commitLocation string, rerr error) { + reset := func() {} + defer func() { + if rerr != nil { + reset() + } + }() + blob, err := layer.Compressed() + if err != nil { + return "", err + } + + getBody := layer.Compressed + if w.progress != nil { + var count int64 + blob = &progressReader{rc: blob, progress: w.progress, count: &count} + getBody = func() (io.ReadCloser, error) { + blob, err := layer.Compressed() + if err != nil { + return nil, err + } + return &progressReader{rc: blob, progress: w.progress, count: &count}, nil + } + reset = func() { + w.progress.complete(-count) + } + } + + req, err := http.NewRequest(http.MethodPatch, streamLocation, blob) + if err != nil { + return "", err + } + if _, ok := layer.(*stream.Layer); !ok { + // We can't retry streaming layers. + req.GetBody = getBody + } + req.Header.Set("Content-Type", "application/octet-stream") + + resp, err := w.client.Do(req.WithContext(ctx)) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if err := transport.CheckError(resp, http.StatusNoContent, http.StatusAccepted, http.StatusCreated); err != nil { + return "", err + } + + // The blob has been uploaded, return the location header indicating + // how to commit this layer. + return w.nextLocation(resp) +} + +// commitBlob commits this blob by sending a PUT to the location returned from +// streaming the blob. +func (w *writer) commitBlob(ctx context.Context, location, digest string) error { + u, err := url.Parse(location) + if err != nil { + return err + } + v := u.Query() + v.Set("digest", digest) + u.RawQuery = v.Encode() + + req, err := http.NewRequest(http.MethodPut, u.String(), nil) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/octet-stream") + + resp, err := w.client.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + + return transport.CheckError(resp, http.StatusCreated) +} + +// incrProgress increments and sends a progress update, if WithProgress is used. +func (w *writer) incrProgress(written int64) { + if w.progress == nil { + return + } + w.progress.complete(written) +} + +// uploadOne performs a complete upload of a single layer. +func (w *writer) uploadOne(ctx context.Context, l v1.Layer) error { + tryUpload := func() error { + ctx := retry.Never(ctx) + var from, mount, origin string + if h, err := l.Digest(); err == nil { + // If we know the digest, this isn't a streaming layer. Do an existence + // check so we can skip uploading the layer if possible. + existing, err := w.checkExistingBlob(ctx, h) + if err != nil { + return err + } + if existing { + size, err := l.Size() + if err != nil { + return err + } + w.incrProgress(size) + logs.Progress.Printf("existing blob: %v", h) + return nil + } + + mount = h.String() + } + if ml, ok := l.(*MountableLayer); ok { + from = ml.Reference.Context().RepositoryStr() + origin = ml.Reference.Context().RegistryStr() + } + + location, mounted, err := w.initiateUpload(ctx, from, mount, origin) + if err != nil { + return err + } else if mounted { + size, err := l.Size() + if err != nil { + return err + } + w.incrProgress(size) + h, err := l.Digest() + if err != nil { + return err + } + logs.Progress.Printf("mounted blob: %s", h.String()) + return nil + } + + // Only log layers with +json or +yaml. We can let through other stuff if it becomes popular. + // TODO(opencontainers/image-spec#791): Would be great to have an actual parser. + mt, err := l.MediaType() + if err != nil { + return err + } + smt := string(mt) + if !(strings.HasSuffix(smt, "+json") || strings.HasSuffix(smt, "+yaml")) { + ctx = redact.NewContext(ctx, "omitting binary blobs from logs") + } + + location, err = w.streamBlob(ctx, l, location) + if err != nil { + return err + } + + h, err := l.Digest() + if err != nil { + return err + } + digest := h.String() + + if err := w.commitBlob(ctx, location, digest); err != nil { + return err + } + logs.Progress.Printf("pushed blob: %s", digest) + return nil + } + + return retry.Retry(tryUpload, w.predicate, w.backoff) +} + +type withLayer interface { + Layer(v1.Hash) (v1.Layer, error) +} + +func (w *writer) writeIndex(ctx context.Context, ref name.Reference, ii v1.ImageIndex, options ...Option) error { + index, err := ii.IndexManifest() + if err != nil { + return err + } + + o, err := makeOptions(ref.Context(), options...) + if err != nil { + return err + } + + // TODO(#803): Pipe through remote.WithJobs and upload these in parallel. + for _, desc := range index.Manifests { + ref := ref.Context().Digest(desc.Digest.String()) + exists, err := w.checkExistingManifest(ctx, desc.Digest, desc.MediaType) + if err != nil { + return err + } + if exists { + logs.Progress.Print("existing manifest: ", desc.Digest) + continue + } + + switch desc.MediaType { + case types.OCIImageIndex, types.DockerManifestList: + ii, err := ii.ImageIndex(desc.Digest) + if err != nil { + return err + } + if err := w.writeIndex(ctx, ref, ii, options...); err != nil { + return err + } + case types.OCIManifestSchema1, types.DockerManifestSchema2: + img, err := ii.Image(desc.Digest) + if err != nil { + return err + } + if err := writeImage(ctx, ref, img, o, w.progress); err != nil { + return err + } + default: + // Workaround for #819. + if wl, ok := ii.(withLayer); ok { + layer, err := wl.Layer(desc.Digest) + if err != nil { + return err + } + if err := w.uploadOne(ctx, layer); err != nil { + return err + } + } + } + } + + // With all of the constituent elements uploaded, upload the manifest + // to commit the image. + return w.commitManifest(ctx, ii, ref) +} + +type withMediaType interface { + MediaType() (types.MediaType, error) +} + +// This is really silly, but go interfaces don't let me satisfy remote.Taggable +// with remote.Descriptor because of name collisions between method names and +// struct fields. +// +// Use reflection to either pull the v1.Descriptor out of remote.Descriptor or +// create a descriptor based on the RawManifest and (optionally) MediaType. +func unpackTaggable(t Taggable) ([]byte, *v1.Descriptor, error) { + if d, ok := t.(*Descriptor); ok { + return d.Manifest, &d.Descriptor, nil + } + b, err := t.RawManifest() + if err != nil { + return nil, nil, err + } + + // A reasonable default if Taggable doesn't implement MediaType. + mt := types.DockerManifestSchema2 + + if wmt, ok := t.(withMediaType); ok { + m, err := wmt.MediaType() + if err != nil { + return nil, nil, err + } + mt = m + } + + h, sz, err := v1.SHA256(bytes.NewReader(b)) + if err != nil { + return nil, nil, err + } + + return b, &v1.Descriptor{ + MediaType: mt, + Size: sz, + Digest: h, + }, nil +} + +// commitSubjectReferrers is responsible for updating the fallback tag manifest to track descriptors referring to a subject for registries that don't yet support the Referrers API. +// TODO: use conditional requests to avoid race conditions +func (w *writer) commitSubjectReferrers(ctx context.Context, sub name.Digest, add v1.Descriptor) error { + // Check if the registry supports Referrers API. + // TODO: This should be done once per registry, not once per subject. + u := w.url(fmt.Sprintf("/v2/%s/referrers/%s", w.repo.RepositoryStr(), sub.DigestStr())) + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return err + } + req.Header.Set("Accept", string(types.OCIImageIndex)) + resp, err := w.client.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + + if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound, http.StatusBadRequest); err != nil { + return err + } + if resp.StatusCode == http.StatusOK { + // The registry supports Referrers API. The registry is responsible for updating the referrers list. + return nil + } + + // The registry doesn't support Referrers API, we need to update the manifest tagged with the fallback tag. + // Make the request to GET the current manifest. + t := fallbackTag(sub) + u = w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), t.Identifier())) + req, err = http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return err + } + req.Header.Set("Accept", string(types.OCIImageIndex)) + resp, err = w.client.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + + var im v1.IndexManifest + if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil { + return err + } else if resp.StatusCode == http.StatusNotFound { + // Not found just means there are no attachments. Start with an empty index. + im = v1.IndexManifest{ + SchemaVersion: 2, + MediaType: types.OCIImageIndex, + Manifests: []v1.Descriptor{add}, + } + } else { + if err := json.NewDecoder(resp.Body).Decode(&im); err != nil { + return err + } + if im.SchemaVersion != 2 { + return fmt.Errorf("fallback tag manifest is not a schema version 2: %d", im.SchemaVersion) + } + if im.MediaType != types.OCIImageIndex { + return fmt.Errorf("fallback tag manifest is not an OCI image index: %s", im.MediaType) + } + for _, desc := range im.Manifests { + if desc.Digest == add.Digest { + // The digest is already attached, nothing to do. + logs.Progress.Printf("fallback tag %s already had referrer", t.Identifier()) + return nil + } + } + // Append the new descriptor to the index. + im.Manifests = append(im.Manifests, add) + } + + // Sort the manifests for reproducibility. + sort.Slice(im.Manifests, func(i, j int) bool { + return im.Manifests[i].Digest.String() < im.Manifests[j].Digest.String() + }) + logs.Progress.Printf("updating fallback tag %s with new referrer", t.Identifier()) + if err := w.commitManifest(ctx, fallbackTaggable{im}, t); err != nil { + return err + } + return nil +} + +type fallbackTaggable struct { + im v1.IndexManifest +} + +func (f fallbackTaggable) RawManifest() ([]byte, error) { return json.Marshal(f.im) } +func (f fallbackTaggable) MediaType() (types.MediaType, error) { return types.OCIImageIndex, nil } + +// commitManifest does a PUT of the image's manifest. +func (w *writer) commitManifest(ctx context.Context, t Taggable, ref name.Reference) error { + // If the manifest refers to a subject, we need to check whether we need to update the fallback tag manifest. + raw, err := t.RawManifest() + if err != nil { + return err + } + var mf struct { + MediaType types.MediaType `json:"mediaType"` + Subject *v1.Descriptor `json:"subject,omitempty"` + Config struct { + MediaType types.MediaType `json:"mediaType"` + } `json:"config"` + } + if err := json.Unmarshal(raw, &mf); err != nil { + return err + } + + tryUpload := func() error { + ctx := retry.Never(ctx) + raw, desc, err := unpackTaggable(t) + if err != nil { + return err + } + + u := w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), ref.Identifier())) + + // Make the request to PUT the serialized manifest + req, err := http.NewRequest(http.MethodPut, u.String(), bytes.NewBuffer(raw)) + if err != nil { + return err + } + req.Header.Set("Content-Type", string(desc.MediaType)) + + resp, err := w.client.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + + if err := transport.CheckError(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted); err != nil { + return err + } + + // If the manifest referred to a subject, we may need to update the fallback tag manifest. + // TODO: If this fails, we'll retry the whole upload. We should retry just this part. + if mf.Subject != nil { + h, size, err := v1.SHA256(bytes.NewReader(raw)) + if err != nil { + return err + } + desc := v1.Descriptor{ + ArtifactType: string(mf.Config.MediaType), + MediaType: mf.MediaType, + Digest: h, + Size: size, + } + if err := w.commitSubjectReferrers(ctx, + ref.Context().Digest(mf.Subject.Digest.String()), + desc); err != nil { + return err + } + } + + // The image was successfully pushed! + logs.Progress.Printf("%v: digest: %v size: %d", ref, desc.Digest, desc.Size) + w.incrProgress(int64(len(raw))) + return nil + } + + return retry.Retry(tryUpload, w.predicate, w.backoff) +} + +func scopesForUploadingImage(repo name.Repository, layers []v1.Layer) []string { + // use a map as set to remove duplicates scope strings + scopeSet := map[string]struct{}{} + + for _, l := range layers { + if ml, ok := l.(*MountableLayer); ok { + // we will add push scope for ref.Context() after the loop. + // for now we ask pull scope for references of the same registry + if ml.Reference.Context().String() != repo.String() && ml.Reference.Context().Registry.String() == repo.Registry.String() { + scopeSet[ml.Reference.Scope(transport.PullScope)] = struct{}{} + } + } + } + + scopes := make([]string, 0) + // Push scope should be the first element because a few registries just look at the first scope to determine access. + scopes = append(scopes, repo.Scope(transport.PushScope)) + + for scope := range scopeSet { + scopes = append(scopes, scope) + } + + return scopes +} + +// WriteIndex pushes the provided ImageIndex to the specified image reference. +// WriteIndex will attempt to push all of the referenced manifests before +// attempting to push the ImageIndex, to retain referential integrity. +func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) (rerr error) { + o, err := makeOptions(ref.Context(), options...) + if err != nil { + return err + } + + scopes := []string{ref.Scope(transport.PushScope)} + tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, scopes) + if err != nil { + return err + } + w := writer{ + repo: ref.Context(), + client: &http.Client{Transport: tr}, + backoff: o.retryBackoff, + predicate: o.retryPredicate, + } + + if o.updates != nil { + w.progress = &progress{updates: o.updates} + w.progress.lastUpdate = &v1.Update{} + + defer close(o.updates) + defer func() { w.progress.err(rerr) }() + + w.progress.lastUpdate.Total, err = countIndex(ii, o.allowNondistributableArtifacts) + if err != nil { + return err + } + } + + return w.writeIndex(o.context, ref, ii, options...) +} + +// countImage counts the total size of all layers + config blob + manifest for +// an image. It de-dupes duplicate layers. +func countImage(img v1.Image, allowNondistributableArtifacts bool) (int64, error) { + var total int64 + ls, err := img.Layers() + if err != nil { + return 0, err + } + seen := map[v1.Hash]bool{} + for _, l := range ls { + // Handle foreign layers. + mt, err := l.MediaType() + if err != nil { + return 0, err + } + if !mt.IsDistributable() && !allowNondistributableArtifacts { + continue + } + + // TODO: support streaming layers which update the total count as they write. + if _, ok := l.(*stream.Layer); ok { + return 0, errors.New("cannot use stream.Layer and WithProgress") + } + + // Dedupe layers. + d, err := l.Digest() + if err != nil { + return 0, err + } + if seen[d] { + continue + } + seen[d] = true + + size, err := l.Size() + if err != nil { + return 0, err + } + total += size + } + b, err := img.RawConfigFile() + if err != nil { + return 0, err + } + total += int64(len(b)) + size, err := img.Size() + if err != nil { + return 0, err + } + total += size + return total, nil +} + +// countIndex counts the total size of all images + sub-indexes for an index. +// It does not attempt to de-dupe duplicate images, etc. +func countIndex(idx v1.ImageIndex, allowNondistributableArtifacts bool) (int64, error) { + var total int64 + mf, err := idx.IndexManifest() + if err != nil { + return 0, err + } + + for _, desc := range mf.Manifests { + switch desc.MediaType { + case types.OCIImageIndex, types.DockerManifestList: + sidx, err := idx.ImageIndex(desc.Digest) + if err != nil { + return 0, err + } + size, err := countIndex(sidx, allowNondistributableArtifacts) + if err != nil { + return 0, err + } + total += size + case types.OCIManifestSchema1, types.DockerManifestSchema2: + simg, err := idx.Image(desc.Digest) + if err != nil { + return 0, err + } + size, err := countImage(simg, allowNondistributableArtifacts) + if err != nil { + return 0, err + } + total += size + default: + // Workaround for #819. + if wl, ok := idx.(withLayer); ok { + layer, err := wl.Layer(desc.Digest) + if err != nil { + return 0, err + } + size, err := layer.Size() + if err != nil { + return 0, err + } + total += size + } + } + } + + size, err := idx.Size() + if err != nil { + return 0, err + } + total += size + return total, nil +} + +// WriteLayer uploads the provided Layer to the specified repo. +func WriteLayer(repo name.Repository, layer v1.Layer, options ...Option) (rerr error) { + o, err := makeOptions(repo, options...) + if err != nil { + return err + } + scopes := scopesForUploadingImage(repo, []v1.Layer{layer}) + tr, err := transport.NewWithContext(o.context, repo.Registry, o.auth, o.transport, scopes) + if err != nil { + return err + } + w := writer{ + repo: repo, + client: &http.Client{Transport: tr}, + backoff: o.retryBackoff, + predicate: o.retryPredicate, + } + + if o.updates != nil { + w.progress = &progress{updates: o.updates} + w.progress.lastUpdate = &v1.Update{} + + defer close(o.updates) + defer func() { w.progress.err(rerr) }() + + // TODO: support streaming layers which update the total count as they write. + if _, ok := layer.(*stream.Layer); ok { + return errors.New("cannot use stream.Layer and WithProgress") + } + size, err := layer.Size() + if err != nil { + return err + } + w.progress.total(size) + } + return w.uploadOne(o.context, layer) +} + +// Tag adds a tag to the given Taggable via PUT /v2/.../manifests/<tag> +// +// Notable implementations of Taggable are v1.Image, v1.ImageIndex, and +// remote.Descriptor. +// +// If t implements MediaType, we will use that for the Content-Type, otherwise +// we will default to types.DockerManifestSchema2. +// +// Tag does not attempt to write anything other than the manifest, so callers +// should ensure that all blobs or manifests that are referenced by t exist +// in the target registry. +func Tag(tag name.Tag, t Taggable, options ...Option) error { + return Put(tag, t, options...) +} + +// Put adds a manifest from the given Taggable via PUT /v1/.../manifest/<ref> +// +// Notable implementations of Taggable are v1.Image, v1.ImageIndex, and +// remote.Descriptor. +// +// If t implements MediaType, we will use that for the Content-Type, otherwise +// we will default to types.DockerManifestSchema2. +// +// Put does not attempt to write anything other than the manifest, so callers +// should ensure that all blobs or manifests that are referenced by t exist +// in the target registry. +func Put(ref name.Reference, t Taggable, options ...Option) error { + o, err := makeOptions(ref.Context(), options...) + if err != nil { + return err + } + scopes := []string{ref.Scope(transport.PushScope)} + + // TODO: This *always* does a token exchange. For some registries, + // that's pretty slow. Some ideas; + // * Tag could take a list of tags. + // * Allow callers to pass in a transport.Transport, typecheck + // it to allow them to reuse the transport across multiple calls. + // * WithTag option to do multiple manifest PUTs in commitManifest. + tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, scopes) + if err != nil { + return err + } + w := writer{ + repo: ref.Context(), + client: &http.Client{Transport: tr}, + backoff: o.retryBackoff, + predicate: o.retryPredicate, + } + + return w.commitManifest(o.context, t, ref) +} |