summaryrefslogtreecommitdiffstats
path: root/src/cmd/go/internal/par
diff options
context:
space:
mode:
Diffstat (limited to 'src/cmd/go/internal/par')
-rw-r--r--src/cmd/go/internal/par/queue.go88
-rw-r--r--src/cmd/go/internal/par/queue_test.go79
-rw-r--r--src/cmd/go/internal/par/work.go190
-rw-r--r--src/cmd/go/internal/par/work_test.go77
4 files changed, 434 insertions, 0 deletions
diff --git a/src/cmd/go/internal/par/queue.go b/src/cmd/go/internal/par/queue.go
new file mode 100644
index 0000000..180bc75
--- /dev/null
+++ b/src/cmd/go/internal/par/queue.go
@@ -0,0 +1,88 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package par
+
+import "fmt"
+
+// Queue manages a set of work items to be executed in parallel. The number of
+// active work items is limited, and excess items are queued sequentially.
+type Queue struct {
+ maxActive int
+ st chan queueState
+}
+
+type queueState struct {
+ active int // number of goroutines processing work; always nonzero when len(backlog) > 0
+ backlog []func()
+ idle chan struct{} // if non-nil, closed when active becomes 0
+}
+
+// NewQueue returns a Queue that executes up to maxActive items in parallel.
+//
+// maxActive must be positive.
+func NewQueue(maxActive int) *Queue {
+ if maxActive < 1 {
+ panic(fmt.Sprintf("par.NewQueue called with nonpositive limit (%d)", maxActive))
+ }
+
+ q := &Queue{
+ maxActive: maxActive,
+ st: make(chan queueState, 1),
+ }
+ q.st <- queueState{}
+ return q
+}
+
+// Add adds f as a work item in the queue.
+//
+// Add returns immediately, but the queue will be marked as non-idle until after
+// f (and any subsequently-added work) has completed.
+func (q *Queue) Add(f func()) {
+ st := <-q.st
+ if st.active == q.maxActive {
+ st.backlog = append(st.backlog, f)
+ q.st <- st
+ return
+ }
+ if st.active == 0 {
+ // Mark q as non-idle.
+ st.idle = nil
+ }
+ st.active++
+ q.st <- st
+
+ go func() {
+ for {
+ f()
+
+ st := <-q.st
+ if len(st.backlog) == 0 {
+ if st.active--; st.active == 0 && st.idle != nil {
+ close(st.idle)
+ }
+ q.st <- st
+ return
+ }
+ f, st.backlog = st.backlog[0], st.backlog[1:]
+ q.st <- st
+ }
+ }()
+}
+
+// Idle returns a channel that will be closed when q has no (active or enqueued)
+// work outstanding.
+func (q *Queue) Idle() <-chan struct{} {
+ st := <-q.st
+ defer func() { q.st <- st }()
+
+ if st.idle == nil {
+ st.idle = make(chan struct{})
+ if st.active == 0 {
+ close(st.idle)
+ }
+ }
+
+ return st.idle
+}
diff --git a/src/cmd/go/internal/par/queue_test.go b/src/cmd/go/internal/par/queue_test.go
new file mode 100644
index 0000000..1331e65
--- /dev/null
+++ b/src/cmd/go/internal/par/queue_test.go
@@ -0,0 +1,79 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package par
+
+import (
+ "sync"
+ "testing"
+)
+
+func TestQueueIdle(t *testing.T) {
+ q := NewQueue(1)
+ select {
+ case <-q.Idle():
+ default:
+ t.Errorf("NewQueue(1) is not initially idle.")
+ }
+
+ started := make(chan struct{})
+ unblock := make(chan struct{})
+ q.Add(func() {
+ close(started)
+ <-unblock
+ })
+
+ <-started
+ idle := q.Idle()
+ select {
+ case <-idle:
+ t.Errorf("NewQueue(1) is marked idle while processing work.")
+ default:
+ }
+
+ close(unblock)
+ <-idle // Should be closed as soon as the Add callback returns.
+}
+
+func TestQueueBacklog(t *testing.T) {
+ const (
+ maxActive = 2
+ totalWork = 3 * maxActive
+ )
+
+ q := NewQueue(maxActive)
+ t.Logf("q = NewQueue(%d)", maxActive)
+
+ var wg sync.WaitGroup
+ wg.Add(totalWork)
+ started := make([]chan struct{}, totalWork)
+ unblock := make(chan struct{})
+ for i := range started {
+ started[i] = make(chan struct{})
+ i := i
+ q.Add(func() {
+ close(started[i])
+ <-unblock
+ wg.Done()
+ })
+ }
+
+ for i, c := range started {
+ if i < maxActive {
+ <-c // Work item i should be started immediately.
+ } else {
+ select {
+ case <-c:
+ t.Errorf("Work item %d started before previous items finished.", i)
+ default:
+ }
+ }
+ }
+
+ close(unblock)
+ for _, c := range started[maxActive:] {
+ <-c
+ }
+ wg.Wait()
+}
diff --git a/src/cmd/go/internal/par/work.go b/src/cmd/go/internal/par/work.go
new file mode 100644
index 0000000..960cec6
--- /dev/null
+++ b/src/cmd/go/internal/par/work.go
@@ -0,0 +1,190 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package par implements parallel execution helpers.
+package par
+
+import (
+ "math/rand"
+ "sync"
+ "sync/atomic"
+)
+
+// Work manages a set of work items to be executed in parallel, at most once each.
+// The items in the set must all be valid map keys.
+type Work struct {
+ f func(interface{}) // function to run for each item
+ running int // total number of runners
+
+ mu sync.Mutex
+ added map[interface{}]bool // items added to set
+ todo []interface{} // items yet to be run
+ wait sync.Cond // wait when todo is empty
+ waiting int // number of runners waiting for todo
+}
+
+func (w *Work) init() {
+ if w.added == nil {
+ w.added = make(map[interface{}]bool)
+ }
+}
+
+// Add adds item to the work set, if it hasn't already been added.
+func (w *Work) Add(item interface{}) {
+ w.mu.Lock()
+ w.init()
+ if !w.added[item] {
+ w.added[item] = true
+ w.todo = append(w.todo, item)
+ if w.waiting > 0 {
+ w.wait.Signal()
+ }
+ }
+ w.mu.Unlock()
+}
+
+// Do runs f in parallel on items from the work set,
+// with at most n invocations of f running at a time.
+// It returns when everything added to the work set has been processed.
+// At least one item should have been added to the work set
+// before calling Do (or else Do returns immediately),
+// but it is allowed for f(item) to add new items to the set.
+// Do should only be used once on a given Work.
+func (w *Work) Do(n int, f func(item interface{})) {
+ if n < 1 {
+ panic("par.Work.Do: n < 1")
+ }
+ if w.running >= 1 {
+ panic("par.Work.Do: already called Do")
+ }
+
+ w.running = n
+ w.f = f
+ w.wait.L = &w.mu
+
+ for i := 0; i < n-1; i++ {
+ go w.runner()
+ }
+ w.runner()
+}
+
+// runner executes work in w until both nothing is left to do
+// and all the runners are waiting for work.
+// (Then all the runners return.)
+func (w *Work) runner() {
+ for {
+ // Wait for something to do.
+ w.mu.Lock()
+ for len(w.todo) == 0 {
+ w.waiting++
+ if w.waiting == w.running {
+ // All done.
+ w.wait.Broadcast()
+ w.mu.Unlock()
+ return
+ }
+ w.wait.Wait()
+ w.waiting--
+ }
+
+ // Pick something to do at random,
+ // to eliminate pathological contention
+ // in case items added at about the same time
+ // are most likely to contend.
+ i := rand.Intn(len(w.todo))
+ item := w.todo[i]
+ w.todo[i] = w.todo[len(w.todo)-1]
+ w.todo = w.todo[:len(w.todo)-1]
+ w.mu.Unlock()
+
+ w.f(item)
+ }
+}
+
+// Cache runs an action once per key and caches the result.
+type Cache struct {
+ m sync.Map
+}
+
+type cacheEntry struct {
+ done uint32
+ mu sync.Mutex
+ result interface{}
+}
+
+// Do calls the function f if and only if Do is being called for the first time with this key.
+// No call to Do with a given key returns until the one call to f returns.
+// Do returns the value returned by the one call to f.
+func (c *Cache) Do(key interface{}, f func() interface{}) interface{} {
+ entryIface, ok := c.m.Load(key)
+ if !ok {
+ entryIface, _ = c.m.LoadOrStore(key, new(cacheEntry))
+ }
+ e := entryIface.(*cacheEntry)
+ if atomic.LoadUint32(&e.done) == 0 {
+ e.mu.Lock()
+ if atomic.LoadUint32(&e.done) == 0 {
+ e.result = f()
+ atomic.StoreUint32(&e.done, 1)
+ }
+ e.mu.Unlock()
+ }
+ return e.result
+}
+
+// Get returns the cached result associated with key.
+// It returns nil if there is no such result.
+// If the result for key is being computed, Get does not wait for the computation to finish.
+func (c *Cache) Get(key interface{}) interface{} {
+ entryIface, ok := c.m.Load(key)
+ if !ok {
+ return nil
+ }
+ e := entryIface.(*cacheEntry)
+ if atomic.LoadUint32(&e.done) == 0 {
+ return nil
+ }
+ return e.result
+}
+
+// Clear removes all entries in the cache.
+//
+// Concurrent calls to Get may return old values. Concurrent calls to Do
+// may return old values or store results in entries that have been deleted.
+//
+// TODO(jayconrod): Delete this after the package cache clearing functions
+// in internal/load have been removed.
+func (c *Cache) Clear() {
+ c.m.Range(func(key, value interface{}) bool {
+ c.m.Delete(key)
+ return true
+ })
+}
+
+// Delete removes an entry from the map. It is safe to call Delete for an
+// entry that does not exist. Delete will return quickly, even if the result
+// for a key is still being computed; the computation will finish, but the
+// result won't be accessible through the cache.
+//
+// TODO(jayconrod): Delete this after the package cache clearing functions
+// in internal/load have been removed.
+func (c *Cache) Delete(key interface{}) {
+ c.m.Delete(key)
+}
+
+// DeleteIf calls pred for each key in the map. If pred returns true for a key,
+// DeleteIf removes the corresponding entry. If the result for a key is
+// still being computed, DeleteIf will remove the entry without waiting for
+// the computation to finish. The result won't be accessible through the cache.
+//
+// TODO(jayconrod): Delete this after the package cache clearing functions
+// in internal/load have been removed.
+func (c *Cache) DeleteIf(pred func(key interface{}) bool) {
+ c.m.Range(func(key, _ interface{}) bool {
+ if pred(key) {
+ c.Delete(key)
+ }
+ return true
+ })
+}
diff --git a/src/cmd/go/internal/par/work_test.go b/src/cmd/go/internal/par/work_test.go
new file mode 100644
index 0000000..f104bc4
--- /dev/null
+++ b/src/cmd/go/internal/par/work_test.go
@@ -0,0 +1,77 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package par
+
+import (
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestWork(t *testing.T) {
+ var w Work
+
+ const N = 10000
+ n := int32(0)
+ w.Add(N)
+ w.Do(100, func(x interface{}) {
+ atomic.AddInt32(&n, 1)
+ i := x.(int)
+ if i >= 2 {
+ w.Add(i - 1)
+ w.Add(i - 2)
+ }
+ w.Add(i >> 1)
+ w.Add((i >> 1) ^ 1)
+ })
+ if n != N+1 {
+ t.Fatalf("ran %d items, expected %d", n, N+1)
+ }
+}
+
+func TestWorkParallel(t *testing.T) {
+ for tries := 0; tries < 10; tries++ {
+ var w Work
+ const N = 100
+ for i := 0; i < N; i++ {
+ w.Add(i)
+ }
+ start := time.Now()
+ var n int32
+ w.Do(N, func(x interface{}) {
+ time.Sleep(1 * time.Millisecond)
+ atomic.AddInt32(&n, +1)
+ })
+ if n != N {
+ t.Fatalf("par.Work.Do did not do all the work")
+ }
+ if time.Since(start) < N/2*time.Millisecond {
+ return
+ }
+ }
+ t.Fatalf("par.Work.Do does not seem to be parallel")
+}
+
+func TestCache(t *testing.T) {
+ var cache Cache
+
+ n := 1
+ v := cache.Do(1, func() interface{} { n++; return n })
+ if v != 2 {
+ t.Fatalf("cache.Do(1) did not run f")
+ }
+ v = cache.Do(1, func() interface{} { n++; return n })
+ if v != 2 {
+ t.Fatalf("cache.Do(1) ran f again!")
+ }
+ v = cache.Do(2, func() interface{} { n++; return n })
+ if v != 3 {
+ t.Fatalf("cache.Do(2) did not run f")
+ }
+ v = cache.Do(1, func() interface{} { n++; return n })
+ if v != 2 {
+ t.Fatalf("cache.Do(1) did not returned saved value from original cache.Do(1)")
+ }
+}