diff options
Diffstat (limited to 'src/cmd/go/internal/par')
-rw-r--r-- | src/cmd/go/internal/par/queue.go | 88 | ||||
-rw-r--r-- | src/cmd/go/internal/par/queue_test.go | 79 | ||||
-rw-r--r-- | src/cmd/go/internal/par/work.go | 190 | ||||
-rw-r--r-- | src/cmd/go/internal/par/work_test.go | 77 |
4 files changed, 434 insertions, 0 deletions
diff --git a/src/cmd/go/internal/par/queue.go b/src/cmd/go/internal/par/queue.go new file mode 100644 index 0000000..180bc75 --- /dev/null +++ b/src/cmd/go/internal/par/queue.go @@ -0,0 +1,88 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package par + +import "fmt" + +// Queue manages a set of work items to be executed in parallel. The number of +// active work items is limited, and excess items are queued sequentially. +type Queue struct { + maxActive int + st chan queueState +} + +type queueState struct { + active int // number of goroutines processing work; always nonzero when len(backlog) > 0 + backlog []func() + idle chan struct{} // if non-nil, closed when active becomes 0 +} + +// NewQueue returns a Queue that executes up to maxActive items in parallel. +// +// maxActive must be positive. +func NewQueue(maxActive int) *Queue { + if maxActive < 1 { + panic(fmt.Sprintf("par.NewQueue called with nonpositive limit (%d)", maxActive)) + } + + q := &Queue{ + maxActive: maxActive, + st: make(chan queueState, 1), + } + q.st <- queueState{} + return q +} + +// Add adds f as a work item in the queue. +// +// Add returns immediately, but the queue will be marked as non-idle until after +// f (and any subsequently-added work) has completed. +func (q *Queue) Add(f func()) { + st := <-q.st + if st.active == q.maxActive { + st.backlog = append(st.backlog, f) + q.st <- st + return + } + if st.active == 0 { + // Mark q as non-idle. + st.idle = nil + } + st.active++ + q.st <- st + + go func() { + for { + f() + + st := <-q.st + if len(st.backlog) == 0 { + if st.active--; st.active == 0 && st.idle != nil { + close(st.idle) + } + q.st <- st + return + } + f, st.backlog = st.backlog[0], st.backlog[1:] + q.st <- st + } + }() +} + +// Idle returns a channel that will be closed when q has no (active or enqueued) +// work outstanding. +func (q *Queue) Idle() <-chan struct{} { + st := <-q.st + defer func() { q.st <- st }() + + if st.idle == nil { + st.idle = make(chan struct{}) + if st.active == 0 { + close(st.idle) + } + } + + return st.idle +} diff --git a/src/cmd/go/internal/par/queue_test.go b/src/cmd/go/internal/par/queue_test.go new file mode 100644 index 0000000..1331e65 --- /dev/null +++ b/src/cmd/go/internal/par/queue_test.go @@ -0,0 +1,79 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package par + +import ( + "sync" + "testing" +) + +func TestQueueIdle(t *testing.T) { + q := NewQueue(1) + select { + case <-q.Idle(): + default: + t.Errorf("NewQueue(1) is not initially idle.") + } + + started := make(chan struct{}) + unblock := make(chan struct{}) + q.Add(func() { + close(started) + <-unblock + }) + + <-started + idle := q.Idle() + select { + case <-idle: + t.Errorf("NewQueue(1) is marked idle while processing work.") + default: + } + + close(unblock) + <-idle // Should be closed as soon as the Add callback returns. +} + +func TestQueueBacklog(t *testing.T) { + const ( + maxActive = 2 + totalWork = 3 * maxActive + ) + + q := NewQueue(maxActive) + t.Logf("q = NewQueue(%d)", maxActive) + + var wg sync.WaitGroup + wg.Add(totalWork) + started := make([]chan struct{}, totalWork) + unblock := make(chan struct{}) + for i := range started { + started[i] = make(chan struct{}) + i := i + q.Add(func() { + close(started[i]) + <-unblock + wg.Done() + }) + } + + for i, c := range started { + if i < maxActive { + <-c // Work item i should be started immediately. + } else { + select { + case <-c: + t.Errorf("Work item %d started before previous items finished.", i) + default: + } + } + } + + close(unblock) + for _, c := range started[maxActive:] { + <-c + } + wg.Wait() +} diff --git a/src/cmd/go/internal/par/work.go b/src/cmd/go/internal/par/work.go new file mode 100644 index 0000000..960cec6 --- /dev/null +++ b/src/cmd/go/internal/par/work.go @@ -0,0 +1,190 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package par implements parallel execution helpers. +package par + +import ( + "math/rand" + "sync" + "sync/atomic" +) + +// Work manages a set of work items to be executed in parallel, at most once each. +// The items in the set must all be valid map keys. +type Work struct { + f func(interface{}) // function to run for each item + running int // total number of runners + + mu sync.Mutex + added map[interface{}]bool // items added to set + todo []interface{} // items yet to be run + wait sync.Cond // wait when todo is empty + waiting int // number of runners waiting for todo +} + +func (w *Work) init() { + if w.added == nil { + w.added = make(map[interface{}]bool) + } +} + +// Add adds item to the work set, if it hasn't already been added. +func (w *Work) Add(item interface{}) { + w.mu.Lock() + w.init() + if !w.added[item] { + w.added[item] = true + w.todo = append(w.todo, item) + if w.waiting > 0 { + w.wait.Signal() + } + } + w.mu.Unlock() +} + +// Do runs f in parallel on items from the work set, +// with at most n invocations of f running at a time. +// It returns when everything added to the work set has been processed. +// At least one item should have been added to the work set +// before calling Do (or else Do returns immediately), +// but it is allowed for f(item) to add new items to the set. +// Do should only be used once on a given Work. +func (w *Work) Do(n int, f func(item interface{})) { + if n < 1 { + panic("par.Work.Do: n < 1") + } + if w.running >= 1 { + panic("par.Work.Do: already called Do") + } + + w.running = n + w.f = f + w.wait.L = &w.mu + + for i := 0; i < n-1; i++ { + go w.runner() + } + w.runner() +} + +// runner executes work in w until both nothing is left to do +// and all the runners are waiting for work. +// (Then all the runners return.) +func (w *Work) runner() { + for { + // Wait for something to do. + w.mu.Lock() + for len(w.todo) == 0 { + w.waiting++ + if w.waiting == w.running { + // All done. + w.wait.Broadcast() + w.mu.Unlock() + return + } + w.wait.Wait() + w.waiting-- + } + + // Pick something to do at random, + // to eliminate pathological contention + // in case items added at about the same time + // are most likely to contend. + i := rand.Intn(len(w.todo)) + item := w.todo[i] + w.todo[i] = w.todo[len(w.todo)-1] + w.todo = w.todo[:len(w.todo)-1] + w.mu.Unlock() + + w.f(item) + } +} + +// Cache runs an action once per key and caches the result. +type Cache struct { + m sync.Map +} + +type cacheEntry struct { + done uint32 + mu sync.Mutex + result interface{} +} + +// Do calls the function f if and only if Do is being called for the first time with this key. +// No call to Do with a given key returns until the one call to f returns. +// Do returns the value returned by the one call to f. +func (c *Cache) Do(key interface{}, f func() interface{}) interface{} { + entryIface, ok := c.m.Load(key) + if !ok { + entryIface, _ = c.m.LoadOrStore(key, new(cacheEntry)) + } + e := entryIface.(*cacheEntry) + if atomic.LoadUint32(&e.done) == 0 { + e.mu.Lock() + if atomic.LoadUint32(&e.done) == 0 { + e.result = f() + atomic.StoreUint32(&e.done, 1) + } + e.mu.Unlock() + } + return e.result +} + +// Get returns the cached result associated with key. +// It returns nil if there is no such result. +// If the result for key is being computed, Get does not wait for the computation to finish. +func (c *Cache) Get(key interface{}) interface{} { + entryIface, ok := c.m.Load(key) + if !ok { + return nil + } + e := entryIface.(*cacheEntry) + if atomic.LoadUint32(&e.done) == 0 { + return nil + } + return e.result +} + +// Clear removes all entries in the cache. +// +// Concurrent calls to Get may return old values. Concurrent calls to Do +// may return old values or store results in entries that have been deleted. +// +// TODO(jayconrod): Delete this after the package cache clearing functions +// in internal/load have been removed. +func (c *Cache) Clear() { + c.m.Range(func(key, value interface{}) bool { + c.m.Delete(key) + return true + }) +} + +// Delete removes an entry from the map. It is safe to call Delete for an +// entry that does not exist. Delete will return quickly, even if the result +// for a key is still being computed; the computation will finish, but the +// result won't be accessible through the cache. +// +// TODO(jayconrod): Delete this after the package cache clearing functions +// in internal/load have been removed. +func (c *Cache) Delete(key interface{}) { + c.m.Delete(key) +} + +// DeleteIf calls pred for each key in the map. If pred returns true for a key, +// DeleteIf removes the corresponding entry. If the result for a key is +// still being computed, DeleteIf will remove the entry without waiting for +// the computation to finish. The result won't be accessible through the cache. +// +// TODO(jayconrod): Delete this after the package cache clearing functions +// in internal/load have been removed. +func (c *Cache) DeleteIf(pred func(key interface{}) bool) { + c.m.Range(func(key, _ interface{}) bool { + if pred(key) { + c.Delete(key) + } + return true + }) +} diff --git a/src/cmd/go/internal/par/work_test.go b/src/cmd/go/internal/par/work_test.go new file mode 100644 index 0000000..f104bc4 --- /dev/null +++ b/src/cmd/go/internal/par/work_test.go @@ -0,0 +1,77 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package par + +import ( + "sync/atomic" + "testing" + "time" +) + +func TestWork(t *testing.T) { + var w Work + + const N = 10000 + n := int32(0) + w.Add(N) + w.Do(100, func(x interface{}) { + atomic.AddInt32(&n, 1) + i := x.(int) + if i >= 2 { + w.Add(i - 1) + w.Add(i - 2) + } + w.Add(i >> 1) + w.Add((i >> 1) ^ 1) + }) + if n != N+1 { + t.Fatalf("ran %d items, expected %d", n, N+1) + } +} + +func TestWorkParallel(t *testing.T) { + for tries := 0; tries < 10; tries++ { + var w Work + const N = 100 + for i := 0; i < N; i++ { + w.Add(i) + } + start := time.Now() + var n int32 + w.Do(N, func(x interface{}) { + time.Sleep(1 * time.Millisecond) + atomic.AddInt32(&n, +1) + }) + if n != N { + t.Fatalf("par.Work.Do did not do all the work") + } + if time.Since(start) < N/2*time.Millisecond { + return + } + } + t.Fatalf("par.Work.Do does not seem to be parallel") +} + +func TestCache(t *testing.T) { + var cache Cache + + n := 1 + v := cache.Do(1, func() interface{} { n++; return n }) + if v != 2 { + t.Fatalf("cache.Do(1) did not run f") + } + v = cache.Do(1, func() interface{} { n++; return n }) + if v != 2 { + t.Fatalf("cache.Do(1) ran f again!") + } + v = cache.Do(2, func() interface{} { n++; return n }) + if v != 3 { + t.Fatalf("cache.Do(2) did not run f") + } + v = cache.Do(1, func() interface{} { n++; return n }) + if v != 2 { + t.Fatalf("cache.Do(1) did not returned saved value from original cache.Do(1)") + } +} |