diff options
Diffstat (limited to 'src/sync')
31 files changed, 6257 insertions, 0 deletions
diff --git a/src/sync/atomic/asm.s b/src/sync/atomic/asm.s new file mode 100644 index 0000000..f86726f --- /dev/null +++ b/src/sync/atomic/asm.s @@ -0,0 +1,85 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !race + +#include "textflag.h" + +TEXT ·SwapInt32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xchg(SB) + +TEXT ·SwapUint32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xchg(SB) + +TEXT ·SwapInt64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xchg64(SB) + +TEXT ·SwapUint64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xchg64(SB) + +TEXT ·SwapUintptr(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xchguintptr(SB) + +TEXT ·CompareAndSwapInt32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Cas(SB) + +TEXT ·CompareAndSwapUint32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Cas(SB) + +TEXT ·CompareAndSwapUintptr(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Casuintptr(SB) + +TEXT ·CompareAndSwapInt64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Cas64(SB) + +TEXT ·CompareAndSwapUint64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Cas64(SB) + +TEXT ·AddInt32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xadd(SB) + +TEXT ·AddUint32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xadd(SB) + +TEXT ·AddUintptr(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xadduintptr(SB) + +TEXT ·AddInt64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xadd64(SB) + +TEXT ·AddUint64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Xadd64(SB) + +TEXT ·LoadInt32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Load(SB) + +TEXT ·LoadUint32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Load(SB) + +TEXT ·LoadInt64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Load64(SB) + +TEXT ·LoadUint64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Load64(SB) + +TEXT ·LoadUintptr(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Loaduintptr(SB) + +TEXT ·LoadPointer(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Loadp(SB) + +TEXT ·StoreInt32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Store(SB) + +TEXT ·StoreUint32(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Store(SB) + +TEXT ·StoreInt64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Store64(SB) + +TEXT ·StoreUint64(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Store64(SB) + +TEXT ·StoreUintptr(SB),NOSPLIT,$0 + JMP runtime∕internal∕atomic·Storeuintptr(SB) diff --git a/src/sync/atomic/atomic_test.go b/src/sync/atomic/atomic_test.go new file mode 100644 index 0000000..eadc962 --- /dev/null +++ b/src/sync/atomic/atomic_test.go @@ -0,0 +1,1472 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package atomic_test + +import ( + "fmt" + "runtime" + "strings" + . "sync/atomic" + "testing" + "unsafe" +) + +// Tests of correct behavior, without contention. +// (Does the function work as advertised?) +// +// Test that the Add functions add correctly. +// Test that the CompareAndSwap functions actually +// do the comparison and the swap correctly. +// +// The loop over power-of-two values is meant to +// ensure that the operations apply to the full word size. +// The struct fields x.before and x.after check that the +// operations do not extend past the full word size. + +const ( + magic32 = 0xdedbeef + magic64 = 0xdeddeadbeefbeef +) + +// Do the 64-bit functions panic? If so, don't bother testing. +var test64err = func() (err interface{}) { + defer func() { + err = recover() + }() + var x int64 + AddInt64(&x, 1) + return nil +}() + +func TestSwapInt32(t *testing.T) { + var x struct { + before int32 + i int32 + after int32 + } + x.before = magic32 + x.after = magic32 + var j int32 + for delta := int32(1); delta+delta > delta; delta += delta { + k := SwapInt32(&x.i, delta) + if x.i != delta || k != j { + t.Fatalf("delta=%d i=%d j=%d k=%d", delta, x.i, j, k) + } + j = delta + } + if x.before != magic32 || x.after != magic32 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32) + } +} + +func TestSwapUint32(t *testing.T) { + var x struct { + before uint32 + i uint32 + after uint32 + } + x.before = magic32 + x.after = magic32 + var j uint32 + for delta := uint32(1); delta+delta > delta; delta += delta { + k := SwapUint32(&x.i, delta) + if x.i != delta || k != j { + t.Fatalf("delta=%d i=%d j=%d k=%d", delta, x.i, j, k) + } + j = delta + } + if x.before != magic32 || x.after != magic32 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32) + } +} + +func TestSwapInt64(t *testing.T) { + if test64err != nil { + t.Skipf("Skipping 64-bit tests: %v", test64err) + } + var x struct { + before int64 + i int64 + after int64 + } + x.before = magic64 + x.after = magic64 + var j int64 + for delta := int64(1); delta+delta > delta; delta += delta { + k := SwapInt64(&x.i, delta) + if x.i != delta || k != j { + t.Fatalf("delta=%d i=%d j=%d k=%d", delta, x.i, j, k) + } + j = delta + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, uint64(magic64), uint64(magic64)) + } +} + +func TestSwapUint64(t *testing.T) { + if test64err != nil { + t.Skipf("Skipping 64-bit tests: %v", test64err) + } + var x struct { + before uint64 + i uint64 + after uint64 + } + x.before = magic64 + x.after = magic64 + var j uint64 + for delta := uint64(1); delta+delta > delta; delta += delta { + k := SwapUint64(&x.i, delta) + if x.i != delta || k != j { + t.Fatalf("delta=%d i=%d j=%d k=%d", delta, x.i, j, k) + } + j = delta + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, uint64(magic64), uint64(magic64)) + } +} + +func TestSwapUintptr(t *testing.T) { + var x struct { + before uintptr + i uintptr + after uintptr + } + var m uint64 = magic64 + magicptr := uintptr(m) + x.before = magicptr + x.after = magicptr + var j uintptr + for delta := uintptr(1); delta+delta > delta; delta += delta { + k := SwapUintptr(&x.i, delta) + if x.i != delta || k != j { + t.Fatalf("delta=%d i=%d j=%d k=%d", delta, x.i, j, k) + } + j = delta + } + if x.before != magicptr || x.after != magicptr { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magicptr, magicptr) + } +} + +var global [1024]byte + +func testPointers() []unsafe.Pointer { + var pointers []unsafe.Pointer + // globals + for i := 0; i < 10; i++ { + pointers = append(pointers, unsafe.Pointer(&global[1<<i-1])) + } + // heap + pointers = append(pointers, unsafe.Pointer(new(byte))) + // nil + pointers = append(pointers, nil) + return pointers +} + +func TestSwapPointer(t *testing.T) { + var x struct { + before uintptr + i unsafe.Pointer + after uintptr + } + var m uint64 = magic64 + magicptr := uintptr(m) + x.before = magicptr + x.after = magicptr + var j unsafe.Pointer + + for _, p := range testPointers() { + k := SwapPointer(&x.i, p) + if x.i != p || k != j { + t.Fatalf("p=%p i=%p j=%p k=%p", p, x.i, j, k) + } + j = p + } + if x.before != magicptr || x.after != magicptr { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magicptr, magicptr) + } +} + +func TestAddInt32(t *testing.T) { + var x struct { + before int32 + i int32 + after int32 + } + x.before = magic32 + x.after = magic32 + var j int32 + for delta := int32(1); delta+delta > delta; delta += delta { + k := AddInt32(&x.i, delta) + j += delta + if x.i != j || k != j { + t.Fatalf("delta=%d i=%d j=%d k=%d", delta, x.i, j, k) + } + } + if x.before != magic32 || x.after != magic32 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32) + } +} + +func TestAddUint32(t *testing.T) { + var x struct { + before uint32 + i uint32 + after uint32 + } + x.before = magic32 + x.after = magic32 + var j uint32 + for delta := uint32(1); delta+delta > delta; delta += delta { + k := AddUint32(&x.i, delta) + j += delta + if x.i != j || k != j { + t.Fatalf("delta=%d i=%d j=%d k=%d", delta, x.i, j, k) + } + } + if x.before != magic32 || x.after != magic32 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32) + } +} + +func TestAddInt64(t *testing.T) { + if test64err != nil { + t.Skipf("Skipping 64-bit tests: %v", test64err) + } + var x struct { + before int64 + i int64 + after int64 + } + x.before = magic64 + x.after = magic64 + var j int64 + for delta := int64(1); delta+delta > delta; delta += delta { + k := AddInt64(&x.i, delta) + j += delta + if x.i != j || k != j { + t.Fatalf("delta=%d i=%d j=%d k=%d", delta, x.i, j, k) + } + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, int64(magic64), int64(magic64)) + } +} + +func TestAddUint64(t *testing.T) { + if test64err != nil { + t.Skipf("Skipping 64-bit tests: %v", test64err) + } + var x struct { + before uint64 + i uint64 + after uint64 + } + x.before = magic64 + x.after = magic64 + var j uint64 + for delta := uint64(1); delta+delta > delta; delta += delta { + k := AddUint64(&x.i, delta) + j += delta + if x.i != j || k != j { + t.Fatalf("delta=%d i=%d j=%d k=%d", delta, x.i, j, k) + } + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, uint64(magic64), uint64(magic64)) + } +} + +func TestAddUintptr(t *testing.T) { + var x struct { + before uintptr + i uintptr + after uintptr + } + var m uint64 = magic64 + magicptr := uintptr(m) + x.before = magicptr + x.after = magicptr + var j uintptr + for delta := uintptr(1); delta+delta > delta; delta += delta { + k := AddUintptr(&x.i, delta) + j += delta + if x.i != j || k != j { + t.Fatalf("delta=%d i=%d j=%d k=%d", delta, x.i, j, k) + } + } + if x.before != magicptr || x.after != magicptr { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magicptr, magicptr) + } +} + +func TestCompareAndSwapInt32(t *testing.T) { + var x struct { + before int32 + i int32 + after int32 + } + x.before = magic32 + x.after = magic32 + for val := int32(1); val+val > val; val += val { + x.i = val + if !CompareAndSwapInt32(&x.i, val, val+1) { + t.Fatalf("should have swapped %#x %#x", val, val+1) + } + if x.i != val+1 { + t.Fatalf("wrong x.i after swap: x.i=%#x val+1=%#x", x.i, val+1) + } + x.i = val + 1 + if CompareAndSwapInt32(&x.i, val, val+2) { + t.Fatalf("should not have swapped %#x %#x", val, val+2) + } + if x.i != val+1 { + t.Fatalf("wrong x.i after swap: x.i=%#x val+1=%#x", x.i, val+1) + } + } + if x.before != magic32 || x.after != magic32 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32) + } +} + +func TestCompareAndSwapUint32(t *testing.T) { + var x struct { + before uint32 + i uint32 + after uint32 + } + x.before = magic32 + x.after = magic32 + for val := uint32(1); val+val > val; val += val { + x.i = val + if !CompareAndSwapUint32(&x.i, val, val+1) { + t.Fatalf("should have swapped %#x %#x", val, val+1) + } + if x.i != val+1 { + t.Fatalf("wrong x.i after swap: x.i=%#x val+1=%#x", x.i, val+1) + } + x.i = val + 1 + if CompareAndSwapUint32(&x.i, val, val+2) { + t.Fatalf("should not have swapped %#x %#x", val, val+2) + } + if x.i != val+1 { + t.Fatalf("wrong x.i after swap: x.i=%#x val+1=%#x", x.i, val+1) + } + } + if x.before != magic32 || x.after != magic32 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32) + } +} + +func TestCompareAndSwapInt64(t *testing.T) { + if test64err != nil { + t.Skipf("Skipping 64-bit tests: %v", test64err) + } + var x struct { + before int64 + i int64 + after int64 + } + x.before = magic64 + x.after = magic64 + for val := int64(1); val+val > val; val += val { + x.i = val + if !CompareAndSwapInt64(&x.i, val, val+1) { + t.Fatalf("should have swapped %#x %#x", val, val+1) + } + if x.i != val+1 { + t.Fatalf("wrong x.i after swap: x.i=%#x val+1=%#x", x.i, val+1) + } + x.i = val + 1 + if CompareAndSwapInt64(&x.i, val, val+2) { + t.Fatalf("should not have swapped %#x %#x", val, val+2) + } + if x.i != val+1 { + t.Fatalf("wrong x.i after swap: x.i=%#x val+1=%#x", x.i, val+1) + } + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, uint64(magic64), uint64(magic64)) + } +} + +func testCompareAndSwapUint64(t *testing.T, cas func(*uint64, uint64, uint64) bool) { + if test64err != nil { + t.Skipf("Skipping 64-bit tests: %v", test64err) + } + var x struct { + before uint64 + i uint64 + after uint64 + } + x.before = magic64 + x.after = magic64 + for val := uint64(1); val+val > val; val += val { + x.i = val + if !cas(&x.i, val, val+1) { + t.Fatalf("should have swapped %#x %#x", val, val+1) + } + if x.i != val+1 { + t.Fatalf("wrong x.i after swap: x.i=%#x val+1=%#x", x.i, val+1) + } + x.i = val + 1 + if cas(&x.i, val, val+2) { + t.Fatalf("should not have swapped %#x %#x", val, val+2) + } + if x.i != val+1 { + t.Fatalf("wrong x.i after swap: x.i=%#x val+1=%#x", x.i, val+1) + } + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, uint64(magic64), uint64(magic64)) + } +} + +func TestCompareAndSwapUint64(t *testing.T) { + testCompareAndSwapUint64(t, CompareAndSwapUint64) +} + +func TestCompareAndSwapUintptr(t *testing.T) { + var x struct { + before uintptr + i uintptr + after uintptr + } + var m uint64 = magic64 + magicptr := uintptr(m) + x.before = magicptr + x.after = magicptr + for val := uintptr(1); val+val > val; val += val { + x.i = val + if !CompareAndSwapUintptr(&x.i, val, val+1) { + t.Fatalf("should have swapped %#x %#x", val, val+1) + } + if x.i != val+1 { + t.Fatalf("wrong x.i after swap: x.i=%#x val+1=%#x", x.i, val+1) + } + x.i = val + 1 + if CompareAndSwapUintptr(&x.i, val, val+2) { + t.Fatalf("should not have swapped %#x %#x", val, val+2) + } + if x.i != val+1 { + t.Fatalf("wrong x.i after swap: x.i=%#x val+1=%#x", x.i, val+1) + } + } + if x.before != magicptr || x.after != magicptr { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magicptr, magicptr) + } +} + +func TestCompareAndSwapPointer(t *testing.T) { + var x struct { + before uintptr + i unsafe.Pointer + after uintptr + } + var m uint64 = magic64 + magicptr := uintptr(m) + x.before = magicptr + x.after = magicptr + q := unsafe.Pointer(new(byte)) + for _, p := range testPointers() { + x.i = p + if !CompareAndSwapPointer(&x.i, p, q) { + t.Fatalf("should have swapped %p %p", p, q) + } + if x.i != q { + t.Fatalf("wrong x.i after swap: x.i=%p want %p", x.i, q) + } + if CompareAndSwapPointer(&x.i, p, nil) { + t.Fatalf("should not have swapped %p nil", p) + } + if x.i != q { + t.Fatalf("wrong x.i after swap: x.i=%p want %p", x.i, q) + } + } + if x.before != magicptr || x.after != magicptr { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magicptr, magicptr) + } +} + +func TestLoadInt32(t *testing.T) { + var x struct { + before int32 + i int32 + after int32 + } + x.before = magic32 + x.after = magic32 + for delta := int32(1); delta+delta > delta; delta += delta { + k := LoadInt32(&x.i) + if k != x.i { + t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k) + } + x.i += delta + } + if x.before != magic32 || x.after != magic32 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32) + } +} + +func TestLoadUint32(t *testing.T) { + var x struct { + before uint32 + i uint32 + after uint32 + } + x.before = magic32 + x.after = magic32 + for delta := uint32(1); delta+delta > delta; delta += delta { + k := LoadUint32(&x.i) + if k != x.i { + t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k) + } + x.i += delta + } + if x.before != magic32 || x.after != magic32 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32) + } +} + +func TestLoadInt64(t *testing.T) { + if test64err != nil { + t.Skipf("Skipping 64-bit tests: %v", test64err) + } + var x struct { + before int64 + i int64 + after int64 + } + x.before = magic64 + x.after = magic64 + for delta := int64(1); delta+delta > delta; delta += delta { + k := LoadInt64(&x.i) + if k != x.i { + t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k) + } + x.i += delta + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, uint64(magic64), uint64(magic64)) + } +} + +func TestLoadUint64(t *testing.T) { + if test64err != nil { + t.Skipf("Skipping 64-bit tests: %v", test64err) + } + var x struct { + before uint64 + i uint64 + after uint64 + } + x.before = magic64 + x.after = magic64 + for delta := uint64(1); delta+delta > delta; delta += delta { + k := LoadUint64(&x.i) + if k != x.i { + t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k) + } + x.i += delta + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, uint64(magic64), uint64(magic64)) + } +} + +func TestLoadUintptr(t *testing.T) { + var x struct { + before uintptr + i uintptr + after uintptr + } + var m uint64 = magic64 + magicptr := uintptr(m) + x.before = magicptr + x.after = magicptr + for delta := uintptr(1); delta+delta > delta; delta += delta { + k := LoadUintptr(&x.i) + if k != x.i { + t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k) + } + x.i += delta + } + if x.before != magicptr || x.after != magicptr { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magicptr, magicptr) + } +} + +func TestLoadPointer(t *testing.T) { + var x struct { + before uintptr + i unsafe.Pointer + after uintptr + } + var m uint64 = magic64 + magicptr := uintptr(m) + x.before = magicptr + x.after = magicptr + for _, p := range testPointers() { + x.i = p + k := LoadPointer(&x.i) + if k != p { + t.Fatalf("p=%x k=%x", p, k) + } + } + if x.before != magicptr || x.after != magicptr { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magicptr, magicptr) + } +} + +func TestStoreInt32(t *testing.T) { + var x struct { + before int32 + i int32 + after int32 + } + x.before = magic32 + x.after = magic32 + v := int32(0) + for delta := int32(1); delta+delta > delta; delta += delta { + StoreInt32(&x.i, v) + if x.i != v { + t.Fatalf("delta=%d i=%d v=%d", delta, x.i, v) + } + v += delta + } + if x.before != magic32 || x.after != magic32 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32) + } +} + +func TestStoreUint32(t *testing.T) { + var x struct { + before uint32 + i uint32 + after uint32 + } + x.before = magic32 + x.after = magic32 + v := uint32(0) + for delta := uint32(1); delta+delta > delta; delta += delta { + StoreUint32(&x.i, v) + if x.i != v { + t.Fatalf("delta=%d i=%d v=%d", delta, x.i, v) + } + v += delta + } + if x.before != magic32 || x.after != magic32 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32) + } +} + +func TestStoreInt64(t *testing.T) { + if test64err != nil { + t.Skipf("Skipping 64-bit tests: %v", test64err) + } + var x struct { + before int64 + i int64 + after int64 + } + x.before = magic64 + x.after = magic64 + v := int64(0) + for delta := int64(1); delta+delta > delta; delta += delta { + StoreInt64(&x.i, v) + if x.i != v { + t.Fatalf("delta=%d i=%d v=%d", delta, x.i, v) + } + v += delta + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, uint64(magic64), uint64(magic64)) + } +} + +func TestStoreUint64(t *testing.T) { + if test64err != nil { + t.Skipf("Skipping 64-bit tests: %v", test64err) + } + var x struct { + before uint64 + i uint64 + after uint64 + } + x.before = magic64 + x.after = magic64 + v := uint64(0) + for delta := uint64(1); delta+delta > delta; delta += delta { + StoreUint64(&x.i, v) + if x.i != v { + t.Fatalf("delta=%d i=%d v=%d", delta, x.i, v) + } + v += delta + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, uint64(magic64), uint64(magic64)) + } +} + +func TestStoreUintptr(t *testing.T) { + var x struct { + before uintptr + i uintptr + after uintptr + } + var m uint64 = magic64 + magicptr := uintptr(m) + x.before = magicptr + x.after = magicptr + v := uintptr(0) + for delta := uintptr(1); delta+delta > delta; delta += delta { + StoreUintptr(&x.i, v) + if x.i != v { + t.Fatalf("delta=%d i=%d v=%d", delta, x.i, v) + } + v += delta + } + if x.before != magicptr || x.after != magicptr { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magicptr, magicptr) + } +} + +func TestStorePointer(t *testing.T) { + var x struct { + before uintptr + i unsafe.Pointer + after uintptr + } + var m uint64 = magic64 + magicptr := uintptr(m) + x.before = magicptr + x.after = magicptr + for _, p := range testPointers() { + StorePointer(&x.i, p) + if x.i != p { + t.Fatalf("x.i=%p p=%p", x.i, p) + } + } + if x.before != magicptr || x.after != magicptr { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magicptr, magicptr) + } +} + +// Tests of correct behavior, with contention. +// (Is the function atomic?) +// +// For each function, we write a "hammer" function that repeatedly +// uses the atomic operation to add 1 to a value. After running +// multiple hammers in parallel, check that we end with the correct +// total. +// Swap can't add 1, so it uses a different scheme. +// The functions repeatedly generate a pseudo-random number such that +// low bits are equal to high bits, swap, check that the old value +// has low and high bits equal. + +var hammer32 = map[string]func(*uint32, int){ + "SwapInt32": hammerSwapInt32, + "SwapUint32": hammerSwapUint32, + "SwapUintptr": hammerSwapUintptr32, + "AddInt32": hammerAddInt32, + "AddUint32": hammerAddUint32, + "AddUintptr": hammerAddUintptr32, + "CompareAndSwapInt32": hammerCompareAndSwapInt32, + "CompareAndSwapUint32": hammerCompareAndSwapUint32, + "CompareAndSwapUintptr": hammerCompareAndSwapUintptr32, +} + +func init() { + var v uint64 = 1 << 50 + if uintptr(v) != 0 { + // 64-bit system; clear uintptr tests + delete(hammer32, "SwapUintptr") + delete(hammer32, "AddUintptr") + delete(hammer32, "CompareAndSwapUintptr") + } +} + +func hammerSwapInt32(uaddr *uint32, count int) { + addr := (*int32)(unsafe.Pointer(uaddr)) + seed := int(uintptr(unsafe.Pointer(&count))) + for i := 0; i < count; i++ { + new := uint32(seed+i)<<16 | uint32(seed+i)<<16>>16 + old := uint32(SwapInt32(addr, int32(new))) + if old>>16 != old<<16>>16 { + panic(fmt.Sprintf("SwapInt32 is not atomic: %v", old)) + } + } +} + +func hammerSwapUint32(addr *uint32, count int) { + seed := int(uintptr(unsafe.Pointer(&count))) + for i := 0; i < count; i++ { + new := uint32(seed+i)<<16 | uint32(seed+i)<<16>>16 + old := SwapUint32(addr, new) + if old>>16 != old<<16>>16 { + panic(fmt.Sprintf("SwapUint32 is not atomic: %v", old)) + } + } +} + +func hammerSwapUintptr32(uaddr *uint32, count int) { + // only safe when uintptr is 32-bit. + // not called on 64-bit systems. + addr := (*uintptr)(unsafe.Pointer(uaddr)) + seed := int(uintptr(unsafe.Pointer(&count))) + for i := 0; i < count; i++ { + new := uintptr(seed+i)<<16 | uintptr(seed+i)<<16>>16 + old := SwapUintptr(addr, new) + if old>>16 != old<<16>>16 { + panic(fmt.Sprintf("SwapUintptr is not atomic: %#08x", old)) + } + } +} + +func hammerAddInt32(uaddr *uint32, count int) { + addr := (*int32)(unsafe.Pointer(uaddr)) + for i := 0; i < count; i++ { + AddInt32(addr, 1) + } +} + +func hammerAddUint32(addr *uint32, count int) { + for i := 0; i < count; i++ { + AddUint32(addr, 1) + } +} + +func hammerAddUintptr32(uaddr *uint32, count int) { + // only safe when uintptr is 32-bit. + // not called on 64-bit systems. + addr := (*uintptr)(unsafe.Pointer(uaddr)) + for i := 0; i < count; i++ { + AddUintptr(addr, 1) + } +} + +func hammerCompareAndSwapInt32(uaddr *uint32, count int) { + addr := (*int32)(unsafe.Pointer(uaddr)) + for i := 0; i < count; i++ { + for { + v := LoadInt32(addr) + if CompareAndSwapInt32(addr, v, v+1) { + break + } + } + } +} + +func hammerCompareAndSwapUint32(addr *uint32, count int) { + for i := 0; i < count; i++ { + for { + v := LoadUint32(addr) + if CompareAndSwapUint32(addr, v, v+1) { + break + } + } + } +} + +func hammerCompareAndSwapUintptr32(uaddr *uint32, count int) { + // only safe when uintptr is 32-bit. + // not called on 64-bit systems. + addr := (*uintptr)(unsafe.Pointer(uaddr)) + for i := 0; i < count; i++ { + for { + v := LoadUintptr(addr) + if CompareAndSwapUintptr(addr, v, v+1) { + break + } + } + } +} + +func TestHammer32(t *testing.T) { + const p = 4 + n := 100000 + if testing.Short() { + n = 1000 + } + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(p)) + + for name, testf := range hammer32 { + c := make(chan int) + var val uint32 + for i := 0; i < p; i++ { + go func() { + defer func() { + if err := recover(); err != nil { + t.Error(err.(string)) + } + c <- 1 + }() + testf(&val, n) + }() + } + for i := 0; i < p; i++ { + <-c + } + if !strings.HasPrefix(name, "Swap") && val != uint32(n)*p { + t.Fatalf("%s: val=%d want %d", name, val, n*p) + } + } +} + +var hammer64 = map[string]func(*uint64, int){ + "SwapInt64": hammerSwapInt64, + "SwapUint64": hammerSwapUint64, + "SwapUintptr": hammerSwapUintptr64, + "AddInt64": hammerAddInt64, + "AddUint64": hammerAddUint64, + "AddUintptr": hammerAddUintptr64, + "CompareAndSwapInt64": hammerCompareAndSwapInt64, + "CompareAndSwapUint64": hammerCompareAndSwapUint64, + "CompareAndSwapUintptr": hammerCompareAndSwapUintptr64, +} + +func init() { + var v uint64 = 1 << 50 + if uintptr(v) == 0 { + // 32-bit system; clear uintptr tests + delete(hammer64, "SwapUintptr") + delete(hammer64, "AddUintptr") + delete(hammer64, "CompareAndSwapUintptr") + } +} + +func hammerSwapInt64(uaddr *uint64, count int) { + addr := (*int64)(unsafe.Pointer(uaddr)) + seed := int(uintptr(unsafe.Pointer(&count))) + for i := 0; i < count; i++ { + new := uint64(seed+i)<<32 | uint64(seed+i)<<32>>32 + old := uint64(SwapInt64(addr, int64(new))) + if old>>32 != old<<32>>32 { + panic(fmt.Sprintf("SwapInt64 is not atomic: %v", old)) + } + } +} + +func hammerSwapUint64(addr *uint64, count int) { + seed := int(uintptr(unsafe.Pointer(&count))) + for i := 0; i < count; i++ { + new := uint64(seed+i)<<32 | uint64(seed+i)<<32>>32 + old := SwapUint64(addr, new) + if old>>32 != old<<32>>32 { + panic(fmt.Sprintf("SwapUint64 is not atomic: %v", old)) + } + } +} + +const arch32 = unsafe.Sizeof(uintptr(0)) == 4 + +func hammerSwapUintptr64(uaddr *uint64, count int) { + // only safe when uintptr is 64-bit. + // not called on 32-bit systems. + if !arch32 { + addr := (*uintptr)(unsafe.Pointer(uaddr)) + seed := int(uintptr(unsafe.Pointer(&count))) + for i := 0; i < count; i++ { + new := uintptr(seed+i)<<32 | uintptr(seed+i)<<32>>32 + old := SwapUintptr(addr, new) + if old>>32 != old<<32>>32 { + panic(fmt.Sprintf("SwapUintptr is not atomic: %v", old)) + } + } + } +} + +func hammerAddInt64(uaddr *uint64, count int) { + addr := (*int64)(unsafe.Pointer(uaddr)) + for i := 0; i < count; i++ { + AddInt64(addr, 1) + } +} + +func hammerAddUint64(addr *uint64, count int) { + for i := 0; i < count; i++ { + AddUint64(addr, 1) + } +} + +func hammerAddUintptr64(uaddr *uint64, count int) { + // only safe when uintptr is 64-bit. + // not called on 32-bit systems. + addr := (*uintptr)(unsafe.Pointer(uaddr)) + for i := 0; i < count; i++ { + AddUintptr(addr, 1) + } +} + +func hammerCompareAndSwapInt64(uaddr *uint64, count int) { + addr := (*int64)(unsafe.Pointer(uaddr)) + for i := 0; i < count; i++ { + for { + v := LoadInt64(addr) + if CompareAndSwapInt64(addr, v, v+1) { + break + } + } + } +} + +func hammerCompareAndSwapUint64(addr *uint64, count int) { + for i := 0; i < count; i++ { + for { + v := LoadUint64(addr) + if CompareAndSwapUint64(addr, v, v+1) { + break + } + } + } +} + +func hammerCompareAndSwapUintptr64(uaddr *uint64, count int) { + // only safe when uintptr is 64-bit. + // not called on 32-bit systems. + addr := (*uintptr)(unsafe.Pointer(uaddr)) + for i := 0; i < count; i++ { + for { + v := LoadUintptr(addr) + if CompareAndSwapUintptr(addr, v, v+1) { + break + } + } + } +} + +func TestHammer64(t *testing.T) { + if test64err != nil { + t.Skipf("Skipping 64-bit tests: %v", test64err) + } + const p = 4 + n := 100000 + if testing.Short() { + n = 1000 + } + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(p)) + + for name, testf := range hammer64 { + c := make(chan int) + var val uint64 + for i := 0; i < p; i++ { + go func() { + defer func() { + if err := recover(); err != nil { + t.Error(err.(string)) + } + c <- 1 + }() + testf(&val, n) + }() + } + for i := 0; i < p; i++ { + <-c + } + if !strings.HasPrefix(name, "Swap") && val != uint64(n)*p { + t.Fatalf("%s: val=%d want %d", name, val, n*p) + } + } +} + +func hammerStoreLoadInt32(t *testing.T, paddr unsafe.Pointer) { + addr := (*int32)(paddr) + v := LoadInt32(addr) + vlo := v & ((1 << 16) - 1) + vhi := v >> 16 + if vlo != vhi { + t.Fatalf("Int32: %#x != %#x", vlo, vhi) + } + new := v + 1 + 1<<16 + if vlo == 1e4 { + new = 0 + } + StoreInt32(addr, new) +} + +func hammerStoreLoadUint32(t *testing.T, paddr unsafe.Pointer) { + addr := (*uint32)(paddr) + v := LoadUint32(addr) + vlo := v & ((1 << 16) - 1) + vhi := v >> 16 + if vlo != vhi { + t.Fatalf("Uint32: %#x != %#x", vlo, vhi) + } + new := v + 1 + 1<<16 + if vlo == 1e4 { + new = 0 + } + StoreUint32(addr, new) +} + +func hammerStoreLoadInt64(t *testing.T, paddr unsafe.Pointer) { + addr := (*int64)(paddr) + v := LoadInt64(addr) + vlo := v & ((1 << 32) - 1) + vhi := v >> 32 + if vlo != vhi { + t.Fatalf("Int64: %#x != %#x", vlo, vhi) + } + new := v + 1 + 1<<32 + StoreInt64(addr, new) +} + +func hammerStoreLoadUint64(t *testing.T, paddr unsafe.Pointer) { + addr := (*uint64)(paddr) + v := LoadUint64(addr) + vlo := v & ((1 << 32) - 1) + vhi := v >> 32 + if vlo != vhi { + t.Fatalf("Uint64: %#x != %#x", vlo, vhi) + } + new := v + 1 + 1<<32 + StoreUint64(addr, new) +} + +func hammerStoreLoadUintptr(t *testing.T, paddr unsafe.Pointer) { + addr := (*uintptr)(paddr) + v := LoadUintptr(addr) + new := v + if arch32 { + vlo := v & ((1 << 16) - 1) + vhi := v >> 16 + if vlo != vhi { + t.Fatalf("Uintptr: %#x != %#x", vlo, vhi) + } + new = v + 1 + 1<<16 + if vlo == 1e4 { + new = 0 + } + } else { + vlo := v & ((1 << 32) - 1) + vhi := v >> 32 + if vlo != vhi { + t.Fatalf("Uintptr: %#x != %#x", vlo, vhi) + } + inc := uint64(1 + 1<<32) + new = v + uintptr(inc) + } + StoreUintptr(addr, new) +} + +//go:nocheckptr +// This code is just testing that LoadPointer/StorePointer operate +// atomically; it's not actually calculating pointers. +func hammerStoreLoadPointer(t *testing.T, paddr unsafe.Pointer) { + addr := (*unsafe.Pointer)(paddr) + v := uintptr(LoadPointer(addr)) + new := v + if arch32 { + vlo := v & ((1 << 16) - 1) + vhi := v >> 16 + if vlo != vhi { + t.Fatalf("Pointer: %#x != %#x", vlo, vhi) + } + new = v + 1 + 1<<16 + if vlo == 1e4 { + new = 0 + } + } else { + vlo := v & ((1 << 32) - 1) + vhi := v >> 32 + if vlo != vhi { + t.Fatalf("Pointer: %#x != %#x", vlo, vhi) + } + inc := uint64(1 + 1<<32) + new = v + uintptr(inc) + } + StorePointer(addr, unsafe.Pointer(new)) +} + +func TestHammerStoreLoad(t *testing.T) { + var tests []func(*testing.T, unsafe.Pointer) + tests = append(tests, hammerStoreLoadInt32, hammerStoreLoadUint32, + hammerStoreLoadUintptr, hammerStoreLoadPointer) + if test64err == nil { + tests = append(tests, hammerStoreLoadInt64, hammerStoreLoadUint64) + } + n := int(1e6) + if testing.Short() { + n = int(1e4) + } + const procs = 8 + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(procs)) + for _, tt := range tests { + c := make(chan int) + var val uint64 + for p := 0; p < procs; p++ { + go func() { + for i := 0; i < n; i++ { + tt(t, unsafe.Pointer(&val)) + } + c <- 1 + }() + } + for p := 0; p < procs; p++ { + <-c + } + } +} + +func TestStoreLoadSeqCst32(t *testing.T) { + if runtime.NumCPU() == 1 { + t.Skipf("Skipping test on %v processor machine", runtime.NumCPU()) + } + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4)) + N := int32(1e3) + if testing.Short() { + N = int32(1e2) + } + c := make(chan bool, 2) + X := [2]int32{} + ack := [2][3]int32{{-1, -1, -1}, {-1, -1, -1}} + for p := 0; p < 2; p++ { + go func(me int) { + he := 1 - me + for i := int32(1); i < N; i++ { + StoreInt32(&X[me], i) + my := LoadInt32(&X[he]) + StoreInt32(&ack[me][i%3], my) + for w := 1; LoadInt32(&ack[he][i%3]) == -1; w++ { + if w%1000 == 0 { + runtime.Gosched() + } + } + his := LoadInt32(&ack[he][i%3]) + if (my != i && my != i-1) || (his != i && his != i-1) { + t.Errorf("invalid values: %d/%d (%d)", my, his, i) + break + } + if my != i && his != i { + t.Errorf("store/load are not sequentially consistent: %d/%d (%d)", my, his, i) + break + } + StoreInt32(&ack[me][(i-1)%3], -1) + } + c <- true + }(p) + } + <-c + <-c +} + +func TestStoreLoadSeqCst64(t *testing.T) { + if runtime.NumCPU() == 1 { + t.Skipf("Skipping test on %v processor machine", runtime.NumCPU()) + } + if test64err != nil { + t.Skipf("Skipping 64-bit tests: %v", test64err) + } + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4)) + N := int64(1e3) + if testing.Short() { + N = int64(1e2) + } + c := make(chan bool, 2) + X := [2]int64{} + ack := [2][3]int64{{-1, -1, -1}, {-1, -1, -1}} + for p := 0; p < 2; p++ { + go func(me int) { + he := 1 - me + for i := int64(1); i < N; i++ { + StoreInt64(&X[me], i) + my := LoadInt64(&X[he]) + StoreInt64(&ack[me][i%3], my) + for w := 1; LoadInt64(&ack[he][i%3]) == -1; w++ { + if w%1000 == 0 { + runtime.Gosched() + } + } + his := LoadInt64(&ack[he][i%3]) + if (my != i && my != i-1) || (his != i && his != i-1) { + t.Errorf("invalid values: %d/%d (%d)", my, his, i) + break + } + if my != i && his != i { + t.Errorf("store/load are not sequentially consistent: %d/%d (%d)", my, his, i) + break + } + StoreInt64(&ack[me][(i-1)%3], -1) + } + c <- true + }(p) + } + <-c + <-c +} + +func TestStoreLoadRelAcq32(t *testing.T) { + if runtime.NumCPU() == 1 { + t.Skipf("Skipping test on %v processor machine", runtime.NumCPU()) + } + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4)) + N := int32(1e3) + if testing.Short() { + N = int32(1e2) + } + c := make(chan bool, 2) + type Data struct { + signal int32 + pad1 [128]int8 + data1 int32 + pad2 [128]int8 + data2 float32 + } + var X Data + for p := int32(0); p < 2; p++ { + go func(p int32) { + for i := int32(1); i < N; i++ { + if (i+p)%2 == 0 { + X.data1 = i + X.data2 = float32(i) + StoreInt32(&X.signal, i) + } else { + for w := 1; LoadInt32(&X.signal) != i; w++ { + if w%1000 == 0 { + runtime.Gosched() + } + } + d1 := X.data1 + d2 := X.data2 + if d1 != i || d2 != float32(i) { + t.Errorf("incorrect data: %d/%g (%d)", d1, d2, i) + break + } + } + } + c <- true + }(p) + } + <-c + <-c +} + +func TestStoreLoadRelAcq64(t *testing.T) { + if runtime.NumCPU() == 1 { + t.Skipf("Skipping test on %v processor machine", runtime.NumCPU()) + } + if test64err != nil { + t.Skipf("Skipping 64-bit tests: %v", test64err) + } + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4)) + N := int64(1e3) + if testing.Short() { + N = int64(1e2) + } + c := make(chan bool, 2) + type Data struct { + signal int64 + pad1 [128]int8 + data1 int64 + pad2 [128]int8 + data2 float64 + } + var X Data + for p := int64(0); p < 2; p++ { + go func(p int64) { + for i := int64(1); i < N; i++ { + if (i+p)%2 == 0 { + X.data1 = i + X.data2 = float64(i) + StoreInt64(&X.signal, i) + } else { + for w := 1; LoadInt64(&X.signal) != i; w++ { + if w%1000 == 0 { + runtime.Gosched() + } + } + d1 := X.data1 + d2 := X.data2 + if d1 != i || d2 != float64(i) { + t.Errorf("incorrect data: %d/%g (%d)", d1, d2, i) + break + } + } + } + c <- true + }(p) + } + <-c + <-c +} + +func shouldPanic(t *testing.T, name string, f func()) { + defer func() { + // Check that all GC maps are sane. + runtime.GC() + + err := recover() + want := "unaligned 64-bit atomic operation" + if err == nil { + t.Errorf("%s did not panic", name) + } else if s, _ := err.(string); s != want { + t.Errorf("%s: wanted panic %q, got %q", name, want, err) + } + }() + f() +} + +func TestUnaligned64(t *testing.T) { + // Unaligned 64-bit atomics on 32-bit systems are + // a continual source of pain. Test that on 32-bit systems they crash + // instead of failing silently. + if !arch32 { + t.Skip("test only runs on 32-bit systems") + } + + x := make([]uint32, 4) + p := (*uint64)(unsafe.Pointer(&x[1])) // misaligned + + shouldPanic(t, "LoadUint64", func() { LoadUint64(p) }) + shouldPanic(t, "StoreUint64", func() { StoreUint64(p, 1) }) + shouldPanic(t, "CompareAndSwapUint64", func() { CompareAndSwapUint64(p, 1, 2) }) + shouldPanic(t, "AddUint64", func() { AddUint64(p, 3) }) +} + +func TestNilDeref(t *testing.T) { + funcs := [...]func(){ + func() { CompareAndSwapInt32(nil, 0, 0) }, + func() { CompareAndSwapInt64(nil, 0, 0) }, + func() { CompareAndSwapUint32(nil, 0, 0) }, + func() { CompareAndSwapUint64(nil, 0, 0) }, + func() { CompareAndSwapUintptr(nil, 0, 0) }, + func() { CompareAndSwapPointer(nil, nil, nil) }, + func() { SwapInt32(nil, 0) }, + func() { SwapUint32(nil, 0) }, + func() { SwapInt64(nil, 0) }, + func() { SwapUint64(nil, 0) }, + func() { SwapUintptr(nil, 0) }, + func() { SwapPointer(nil, nil) }, + func() { AddInt32(nil, 0) }, + func() { AddUint32(nil, 0) }, + func() { AddInt64(nil, 0) }, + func() { AddUint64(nil, 0) }, + func() { AddUintptr(nil, 0) }, + func() { LoadInt32(nil) }, + func() { LoadInt64(nil) }, + func() { LoadUint32(nil) }, + func() { LoadUint64(nil) }, + func() { LoadUintptr(nil) }, + func() { LoadPointer(nil) }, + func() { StoreInt32(nil, 0) }, + func() { StoreInt64(nil, 0) }, + func() { StoreUint32(nil, 0) }, + func() { StoreUint64(nil, 0) }, + func() { StoreUintptr(nil, 0) }, + func() { StorePointer(nil, nil) }, + } + for _, f := range funcs { + func() { + defer func() { + runtime.GC() + recover() + }() + f() + }() + } +} diff --git a/src/sync/atomic/doc.go b/src/sync/atomic/doc.go new file mode 100644 index 0000000..805ef95 --- /dev/null +++ b/src/sync/atomic/doc.go @@ -0,0 +1,144 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package atomic provides low-level atomic memory primitives +// useful for implementing synchronization algorithms. +// +// These functions require great care to be used correctly. +// Except for special, low-level applications, synchronization is better +// done with channels or the facilities of the sync package. +// Share memory by communicating; +// don't communicate by sharing memory. +// +// The swap operation, implemented by the SwapT functions, is the atomic +// equivalent of: +// +// old = *addr +// *addr = new +// return old +// +// The compare-and-swap operation, implemented by the CompareAndSwapT +// functions, is the atomic equivalent of: +// +// if *addr == old { +// *addr = new +// return true +// } +// return false +// +// The add operation, implemented by the AddT functions, is the atomic +// equivalent of: +// +// *addr += delta +// return *addr +// +// The load and store operations, implemented by the LoadT and StoreT +// functions, are the atomic equivalents of "return *addr" and +// "*addr = val". +// +package atomic + +import ( + "unsafe" +) + +// BUG(rsc): On 386, the 64-bit functions use instructions unavailable before the Pentium MMX. +// +// On non-Linux ARM, the 64-bit functions use instructions unavailable before the ARMv6k core. +// +// On ARM, 386, and 32-bit MIPS, it is the caller's responsibility +// to arrange for 64-bit alignment of 64-bit words accessed atomically. +// The first word in a variable or in an allocated struct, array, or slice can +// be relied upon to be 64-bit aligned. + +// SwapInt32 atomically stores new into *addr and returns the previous *addr value. +func SwapInt32(addr *int32, new int32) (old int32) + +// SwapInt64 atomically stores new into *addr and returns the previous *addr value. +func SwapInt64(addr *int64, new int64) (old int64) + +// SwapUint32 atomically stores new into *addr and returns the previous *addr value. +func SwapUint32(addr *uint32, new uint32) (old uint32) + +// SwapUint64 atomically stores new into *addr and returns the previous *addr value. +func SwapUint64(addr *uint64, new uint64) (old uint64) + +// SwapUintptr atomically stores new into *addr and returns the previous *addr value. +func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) + +// SwapPointer atomically stores new into *addr and returns the previous *addr value. +func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) + +// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value. +func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) + +// CompareAndSwapInt64 executes the compare-and-swap operation for an int64 value. +func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) + +// CompareAndSwapUint32 executes the compare-and-swap operation for a uint32 value. +func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) + +// CompareAndSwapUint64 executes the compare-and-swap operation for a uint64 value. +func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) + +// CompareAndSwapUintptr executes the compare-and-swap operation for a uintptr value. +func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) + +// CompareAndSwapPointer executes the compare-and-swap operation for a unsafe.Pointer value. +func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) + +// AddInt32 atomically adds delta to *addr and returns the new value. +func AddInt32(addr *int32, delta int32) (new int32) + +// AddUint32 atomically adds delta to *addr and returns the new value. +// To subtract a signed positive constant value c from x, do AddUint32(&x, ^uint32(c-1)). +// In particular, to decrement x, do AddUint32(&x, ^uint32(0)). +func AddUint32(addr *uint32, delta uint32) (new uint32) + +// AddInt64 atomically adds delta to *addr and returns the new value. +func AddInt64(addr *int64, delta int64) (new int64) + +// AddUint64 atomically adds delta to *addr and returns the new value. +// To subtract a signed positive constant value c from x, do AddUint64(&x, ^uint64(c-1)). +// In particular, to decrement x, do AddUint64(&x, ^uint64(0)). +func AddUint64(addr *uint64, delta uint64) (new uint64) + +// AddUintptr atomically adds delta to *addr and returns the new value. +func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) + +// LoadInt32 atomically loads *addr. +func LoadInt32(addr *int32) (val int32) + +// LoadInt64 atomically loads *addr. +func LoadInt64(addr *int64) (val int64) + +// LoadUint32 atomically loads *addr. +func LoadUint32(addr *uint32) (val uint32) + +// LoadUint64 atomically loads *addr. +func LoadUint64(addr *uint64) (val uint64) + +// LoadUintptr atomically loads *addr. +func LoadUintptr(addr *uintptr) (val uintptr) + +// LoadPointer atomically loads *addr. +func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) + +// StoreInt32 atomically stores val into *addr. +func StoreInt32(addr *int32, val int32) + +// StoreInt64 atomically stores val into *addr. +func StoreInt64(addr *int64, val int64) + +// StoreUint32 atomically stores val into *addr. +func StoreUint32(addr *uint32, val uint32) + +// StoreUint64 atomically stores val into *addr. +func StoreUint64(addr *uint64, val uint64) + +// StoreUintptr atomically stores val into *addr. +func StoreUintptr(addr *uintptr, val uintptr) + +// StorePointer atomically stores val into *addr. +func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) diff --git a/src/sync/atomic/example_test.go b/src/sync/atomic/example_test.go new file mode 100644 index 0000000..09ae0aa --- /dev/null +++ b/src/sync/atomic/example_test.go @@ -0,0 +1,76 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package atomic_test + +import ( + "sync" + "sync/atomic" + "time" +) + +func loadConfig() map[string]string { + return make(map[string]string) +} + +func requests() chan int { + return make(chan int) +} + +// The following example shows how to use Value for periodic program config updates +// and propagation of the changes to worker goroutines. +func ExampleValue_config() { + var config atomic.Value // holds current server configuration + // Create initial config value and store into config. + config.Store(loadConfig()) + go func() { + // Reload config every 10 seconds + // and update config value with the new version. + for { + time.Sleep(10 * time.Second) + config.Store(loadConfig()) + } + }() + // Create worker goroutines that handle incoming requests + // using the latest config value. + for i := 0; i < 10; i++ { + go func() { + for r := range requests() { + c := config.Load() + // Handle request r using config c. + _, _ = r, c + } + }() + } +} + +// The following example shows how to maintain a scalable frequently read, +// but infrequently updated data structure using copy-on-write idiom. +func ExampleValue_readMostly() { + type Map map[string]string + var m atomic.Value + m.Store(make(Map)) + var mu sync.Mutex // used only by writers + // read function can be used to read the data without further synchronization + read := func(key string) (val string) { + m1 := m.Load().(Map) + return m1[key] + } + // insert function can be used to update the data without further synchronization + insert := func(key, val string) { + mu.Lock() // synchronize with other potential writers + defer mu.Unlock() + m1 := m.Load().(Map) // load current value of the data structure + m2 := make(Map) // create a new value + for k, v := range m1 { + m2[k] = v // copy all data from the current object to the new one + } + m2[key] = val // do the update that we need + m.Store(m2) // atomically replace the current object with the new one + // At this point all new readers start working with the new version. + // The old version will be garbage collected once the existing readers + // (if any) are done with it. + } + _, _ = read, insert +} diff --git a/src/sync/atomic/race.s b/src/sync/atomic/race.s new file mode 100644 index 0000000..fd6ca22 --- /dev/null +++ b/src/sync/atomic/race.s @@ -0,0 +1,8 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build race + +// This file is here only to allow external functions. +// The operations are implemented in src/runtime/race_amd64.s diff --git a/src/sync/atomic/value.go b/src/sync/atomic/value.go new file mode 100644 index 0000000..eab7e70 --- /dev/null +++ b/src/sync/atomic/value.go @@ -0,0 +1,86 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package atomic + +import ( + "unsafe" +) + +// A Value provides an atomic load and store of a consistently typed value. +// The zero value for a Value returns nil from Load. +// Once Store has been called, a Value must not be copied. +// +// A Value must not be copied after first use. +type Value struct { + v interface{} +} + +// ifaceWords is interface{} internal representation. +type ifaceWords struct { + typ unsafe.Pointer + data unsafe.Pointer +} + +// Load returns the value set by the most recent Store. +// It returns nil if there has been no call to Store for this Value. +func (v *Value) Load() (x interface{}) { + vp := (*ifaceWords)(unsafe.Pointer(v)) + typ := LoadPointer(&vp.typ) + if typ == nil || uintptr(typ) == ^uintptr(0) { + // First store not yet completed. + return nil + } + data := LoadPointer(&vp.data) + xp := (*ifaceWords)(unsafe.Pointer(&x)) + xp.typ = typ + xp.data = data + return +} + +// Store sets the value of the Value to x. +// All calls to Store for a given Value must use values of the same concrete type. +// Store of an inconsistent type panics, as does Store(nil). +func (v *Value) Store(x interface{}) { + if x == nil { + panic("sync/atomic: store of nil value into Value") + } + vp := (*ifaceWords)(unsafe.Pointer(v)) + xp := (*ifaceWords)(unsafe.Pointer(&x)) + for { + typ := LoadPointer(&vp.typ) + if typ == nil { + // Attempt to start first store. + // Disable preemption so that other goroutines can use + // active spin wait to wait for completion; and so that + // GC does not see the fake type accidentally. + runtime_procPin() + if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) { + runtime_procUnpin() + continue + } + // Complete first store. + StorePointer(&vp.data, xp.data) + StorePointer(&vp.typ, xp.typ) + runtime_procUnpin() + return + } + if uintptr(typ) == ^uintptr(0) { + // First store in progress. Wait. + // Since we disable preemption around the first store, + // we can wait with active spinning. + continue + } + // First store completed. Check type and overwrite data. + if typ != xp.typ { + panic("sync/atomic: store of inconsistently typed value into Value") + } + StorePointer(&vp.data, xp.data) + return + } +} + +// Disable/enable preemption, implemented in runtime. +func runtime_procPin() +func runtime_procUnpin() diff --git a/src/sync/atomic/value_test.go b/src/sync/atomic/value_test.go new file mode 100644 index 0000000..f289766 --- /dev/null +++ b/src/sync/atomic/value_test.go @@ -0,0 +1,135 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package atomic_test + +import ( + "math/rand" + "runtime" + . "sync/atomic" + "testing" +) + +func TestValue(t *testing.T) { + var v Value + if v.Load() != nil { + t.Fatal("initial Value is not nil") + } + v.Store(42) + x := v.Load() + if xx, ok := x.(int); !ok || xx != 42 { + t.Fatalf("wrong value: got %+v, want 42", x) + } + v.Store(84) + x = v.Load() + if xx, ok := x.(int); !ok || xx != 84 { + t.Fatalf("wrong value: got %+v, want 84", x) + } +} + +func TestValueLarge(t *testing.T) { + var v Value + v.Store("foo") + x := v.Load() + if xx, ok := x.(string); !ok || xx != "foo" { + t.Fatalf("wrong value: got %+v, want foo", x) + } + v.Store("barbaz") + x = v.Load() + if xx, ok := x.(string); !ok || xx != "barbaz" { + t.Fatalf("wrong value: got %+v, want barbaz", x) + } +} + +func TestValuePanic(t *testing.T) { + const nilErr = "sync/atomic: store of nil value into Value" + const badErr = "sync/atomic: store of inconsistently typed value into Value" + var v Value + func() { + defer func() { + err := recover() + if err != nilErr { + t.Fatalf("inconsistent store panic: got '%v', want '%v'", err, nilErr) + } + }() + v.Store(nil) + }() + v.Store(42) + func() { + defer func() { + err := recover() + if err != badErr { + t.Fatalf("inconsistent store panic: got '%v', want '%v'", err, badErr) + } + }() + v.Store("foo") + }() + func() { + defer func() { + err := recover() + if err != nilErr { + t.Fatalf("inconsistent store panic: got '%v', want '%v'", err, nilErr) + } + }() + v.Store(nil) + }() +} + +func TestValueConcurrent(t *testing.T) { + tests := [][]interface{}{ + {uint16(0), ^uint16(0), uint16(1 + 2<<8), uint16(3 + 4<<8)}, + {uint32(0), ^uint32(0), uint32(1 + 2<<16), uint32(3 + 4<<16)}, + {uint64(0), ^uint64(0), uint64(1 + 2<<32), uint64(3 + 4<<32)}, + {complex(0, 0), complex(1, 2), complex(3, 4), complex(5, 6)}, + } + p := 4 * runtime.GOMAXPROCS(0) + N := int(1e5) + if testing.Short() { + p /= 2 + N = 1e3 + } + for _, test := range tests { + var v Value + done := make(chan bool, p) + for i := 0; i < p; i++ { + go func() { + r := rand.New(rand.NewSource(rand.Int63())) + expected := true + loop: + for j := 0; j < N; j++ { + x := test[r.Intn(len(test))] + v.Store(x) + x = v.Load() + for _, x1 := range test { + if x == x1 { + continue loop + } + } + t.Logf("loaded unexpected value %+v, want %+v", x, test) + expected = false + break + } + done <- expected + }() + } + for i := 0; i < p; i++ { + if !<-done { + t.FailNow() + } + } + } +} + +func BenchmarkValueRead(b *testing.B) { + var v Value + v.Store(new(int)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + x := v.Load().(*int) + if *x != 0 { + b.Fatalf("wrong value: got %v, want 0", *x) + } + } + }) +} diff --git a/src/sync/cond.go b/src/sync/cond.go new file mode 100644 index 0000000..b254c93 --- /dev/null +++ b/src/sync/cond.go @@ -0,0 +1,98 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" + "unsafe" +) + +// Cond implements a condition variable, a rendezvous point +// for goroutines waiting for or announcing the occurrence +// of an event. +// +// Each Cond has an associated Locker L (often a *Mutex or *RWMutex), +// which must be held when changing the condition and +// when calling the Wait method. +// +// A Cond must not be copied after first use. +type Cond struct { + noCopy noCopy + + // L is held while observing or changing the condition + L Locker + + notify notifyList + checker copyChecker +} + +// NewCond returns a new Cond with Locker l. +func NewCond(l Locker) *Cond { + return &Cond{L: l} +} + +// Wait atomically unlocks c.L and suspends execution +// of the calling goroutine. After later resuming execution, +// Wait locks c.L before returning. Unlike in other systems, +// Wait cannot return unless awoken by Broadcast or Signal. +// +// Because c.L is not locked when Wait first resumes, the caller +// typically cannot assume that the condition is true when +// Wait returns. Instead, the caller should Wait in a loop: +// +// c.L.Lock() +// for !condition() { +// c.Wait() +// } +// ... make use of condition ... +// c.L.Unlock() +// +func (c *Cond) Wait() { + c.checker.check() + t := runtime_notifyListAdd(&c.notify) + c.L.Unlock() + runtime_notifyListWait(&c.notify, t) + c.L.Lock() +} + +// Signal wakes one goroutine waiting on c, if there is any. +// +// It is allowed but not required for the caller to hold c.L +// during the call. +func (c *Cond) Signal() { + c.checker.check() + runtime_notifyListNotifyOne(&c.notify) +} + +// Broadcast wakes all goroutines waiting on c. +// +// It is allowed but not required for the caller to hold c.L +// during the call. +func (c *Cond) Broadcast() { + c.checker.check() + runtime_notifyListNotifyAll(&c.notify) +} + +// copyChecker holds back pointer to itself to detect object copying. +type copyChecker uintptr + +func (c *copyChecker) check() { + if uintptr(*c) != uintptr(unsafe.Pointer(c)) && + !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) && + uintptr(*c) != uintptr(unsafe.Pointer(c)) { + panic("sync.Cond is copied") + } +} + +// noCopy may be embedded into structs which must not be copied +// after the first use. +// +// See https://golang.org/issues/8005#issuecomment-190753527 +// for details. +type noCopy struct{} + +// Lock is a no-op used by -copylocks checker from `go vet`. +func (*noCopy) Lock() {} +func (*noCopy) Unlock() {} diff --git a/src/sync/cond_test.go b/src/sync/cond_test.go new file mode 100644 index 0000000..859cae5 --- /dev/null +++ b/src/sync/cond_test.go @@ -0,0 +1,316 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync_test + +import ( + "reflect" + "runtime" + . "sync" + "testing" + "time" +) + +func TestCondSignal(t *testing.T) { + var m Mutex + c := NewCond(&m) + n := 2 + running := make(chan bool, n) + awake := make(chan bool, n) + for i := 0; i < n; i++ { + go func() { + m.Lock() + running <- true + c.Wait() + awake <- true + m.Unlock() + }() + } + for i := 0; i < n; i++ { + <-running // Wait for everyone to run. + } + for n > 0 { + select { + case <-awake: + t.Fatal("goroutine not asleep") + default: + } + m.Lock() + c.Signal() + m.Unlock() + <-awake // Will deadlock if no goroutine wakes up + select { + case <-awake: + t.Fatal("too many goroutines awake") + default: + } + n-- + } + c.Signal() +} + +func TestCondSignalGenerations(t *testing.T) { + var m Mutex + c := NewCond(&m) + n := 100 + running := make(chan bool, n) + awake := make(chan int, n) + for i := 0; i < n; i++ { + go func(i int) { + m.Lock() + running <- true + c.Wait() + awake <- i + m.Unlock() + }(i) + if i > 0 { + a := <-awake + if a != i-1 { + t.Fatalf("wrong goroutine woke up: want %d, got %d", i-1, a) + } + } + <-running + m.Lock() + c.Signal() + m.Unlock() + } +} + +func TestCondBroadcast(t *testing.T) { + var m Mutex + c := NewCond(&m) + n := 200 + running := make(chan int, n) + awake := make(chan int, n) + exit := false + for i := 0; i < n; i++ { + go func(g int) { + m.Lock() + for !exit { + running <- g + c.Wait() + awake <- g + } + m.Unlock() + }(i) + } + for i := 0; i < n; i++ { + for i := 0; i < n; i++ { + <-running // Will deadlock unless n are running. + } + if i == n-1 { + m.Lock() + exit = true + m.Unlock() + } + select { + case <-awake: + t.Fatal("goroutine not asleep") + default: + } + m.Lock() + c.Broadcast() + m.Unlock() + seen := make([]bool, n) + for i := 0; i < n; i++ { + g := <-awake + if seen[g] { + t.Fatal("goroutine woke up twice") + } + seen[g] = true + } + } + select { + case <-running: + t.Fatal("goroutine did not exit") + default: + } + c.Broadcast() +} + +func TestRace(t *testing.T) { + x := 0 + c := NewCond(&Mutex{}) + done := make(chan bool) + go func() { + c.L.Lock() + x = 1 + c.Wait() + if x != 2 { + t.Error("want 2") + } + x = 3 + c.Signal() + c.L.Unlock() + done <- true + }() + go func() { + c.L.Lock() + for { + if x == 1 { + x = 2 + c.Signal() + break + } + c.L.Unlock() + runtime.Gosched() + c.L.Lock() + } + c.L.Unlock() + done <- true + }() + go func() { + c.L.Lock() + for { + if x == 2 { + c.Wait() + if x != 3 { + t.Error("want 3") + } + break + } + if x == 3 { + break + } + c.L.Unlock() + runtime.Gosched() + c.L.Lock() + } + c.L.Unlock() + done <- true + }() + <-done + <-done + <-done +} + +func TestCondSignalStealing(t *testing.T) { + for iters := 0; iters < 1000; iters++ { + var m Mutex + cond := NewCond(&m) + + // Start a waiter. + ch := make(chan struct{}) + go func() { + m.Lock() + ch <- struct{}{} + cond.Wait() + m.Unlock() + + ch <- struct{}{} + }() + + <-ch + m.Lock() + m.Unlock() + + // We know that the waiter is in the cond.Wait() call because we + // synchronized with it, then acquired/released the mutex it was + // holding when we synchronized. + // + // Start two goroutines that will race: one will broadcast on + // the cond var, the other will wait on it. + // + // The new waiter may or may not get notified, but the first one + // has to be notified. + done := false + go func() { + cond.Broadcast() + }() + + go func() { + m.Lock() + for !done { + cond.Wait() + } + m.Unlock() + }() + + // Check that the first waiter does get signaled. + select { + case <-ch: + case <-time.After(2 * time.Second): + t.Fatalf("First waiter didn't get broadcast.") + } + + // Release the second waiter in case it didn't get the + // broadcast. + m.Lock() + done = true + m.Unlock() + cond.Broadcast() + } +} + +func TestCondCopy(t *testing.T) { + defer func() { + err := recover() + if err == nil || err.(string) != "sync.Cond is copied" { + t.Fatalf("got %v, expect sync.Cond is copied", err) + } + }() + c := Cond{L: &Mutex{}} + c.Signal() + var c2 Cond + reflect.ValueOf(&c2).Elem().Set(reflect.ValueOf(&c).Elem()) // c2 := c, hidden from vet + c2.Signal() +} + +func BenchmarkCond1(b *testing.B) { + benchmarkCond(b, 1) +} + +func BenchmarkCond2(b *testing.B) { + benchmarkCond(b, 2) +} + +func BenchmarkCond4(b *testing.B) { + benchmarkCond(b, 4) +} + +func BenchmarkCond8(b *testing.B) { + benchmarkCond(b, 8) +} + +func BenchmarkCond16(b *testing.B) { + benchmarkCond(b, 16) +} + +func BenchmarkCond32(b *testing.B) { + benchmarkCond(b, 32) +} + +func benchmarkCond(b *testing.B, waiters int) { + c := NewCond(&Mutex{}) + done := make(chan bool) + id := 0 + + for routine := 0; routine < waiters+1; routine++ { + go func() { + for i := 0; i < b.N; i++ { + c.L.Lock() + if id == -1 { + c.L.Unlock() + break + } + id++ + if id == waiters+1 { + id = 0 + c.Broadcast() + } else { + c.Wait() + } + c.L.Unlock() + } + c.L.Lock() + id = -1 + c.Broadcast() + c.L.Unlock() + done <- true + }() + } + for routine := 0; routine < waiters+1; routine++ { + <-done + } +} diff --git a/src/sync/example_pool_test.go b/src/sync/example_pool_test.go new file mode 100644 index 0000000..8288d41 --- /dev/null +++ b/src/sync/example_pool_test.go @@ -0,0 +1,45 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync_test + +import ( + "bytes" + "io" + "os" + "sync" + "time" +) + +var bufPool = sync.Pool{ + New: func() interface{} { + // The Pool's New function should generally only return pointer + // types, since a pointer can be put into the return interface + // value without an allocation: + return new(bytes.Buffer) + }, +} + +// timeNow is a fake version of time.Now for tests. +func timeNow() time.Time { + return time.Unix(1136214245, 0) +} + +func Log(w io.Writer, key, val string) { + b := bufPool.Get().(*bytes.Buffer) + b.Reset() + // Replace this with time.Now() in a real logger. + b.WriteString(timeNow().UTC().Format(time.RFC3339)) + b.WriteByte(' ') + b.WriteString(key) + b.WriteByte('=') + b.WriteString(val) + w.Write(b.Bytes()) + bufPool.Put(b) +} + +func ExamplePool() { + Log(os.Stdout, "path", "/search?q=flowers") + // Output: 2006-01-02T15:04:05Z path=/search?q=flowers +} diff --git a/src/sync/example_test.go b/src/sync/example_test.go new file mode 100644 index 0000000..bdd3af6 --- /dev/null +++ b/src/sync/example_test.go @@ -0,0 +1,59 @@ +// Copyright 2012 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync_test + +import ( + "fmt" + "sync" +) + +type httpPkg struct{} + +func (httpPkg) Get(url string) {} + +var http httpPkg + +// This example fetches several URLs concurrently, +// using a WaitGroup to block until all the fetches are complete. +func ExampleWaitGroup() { + var wg sync.WaitGroup + var urls = []string{ + "http://www.golang.org/", + "http://www.google.com/", + "http://www.somestupidname.com/", + } + for _, url := range urls { + // Increment the WaitGroup counter. + wg.Add(1) + // Launch a goroutine to fetch the URL. + go func(url string) { + // Decrement the counter when the goroutine completes. + defer wg.Done() + // Fetch the URL. + http.Get(url) + }(url) + } + // Wait for all HTTP fetches to complete. + wg.Wait() +} + +func ExampleOnce() { + var once sync.Once + onceBody := func() { + fmt.Println("Only once") + } + done := make(chan bool) + for i := 0; i < 10; i++ { + go func() { + once.Do(onceBody) + done <- true + }() + } + for i := 0; i < 10; i++ { + <-done + } + // Output: + // Only once +} diff --git a/src/sync/export_test.go b/src/sync/export_test.go new file mode 100644 index 0000000..ffbe567 --- /dev/null +++ b/src/sync/export_test.go @@ -0,0 +1,57 @@ +// Copyright 2012 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +// Export for testing. +var Runtime_Semacquire = runtime_Semacquire +var Runtime_Semrelease = runtime_Semrelease +var Runtime_procPin = runtime_procPin +var Runtime_procUnpin = runtime_procUnpin + +// poolDequeue testing. +type PoolDequeue interface { + PushHead(val interface{}) bool + PopHead() (interface{}, bool) + PopTail() (interface{}, bool) +} + +func NewPoolDequeue(n int) PoolDequeue { + d := &poolDequeue{ + vals: make([]eface, n), + } + // For testing purposes, set the head and tail indexes close + // to wrapping around. + d.headTail = d.pack(1<<dequeueBits-500, 1<<dequeueBits-500) + return d +} + +func (d *poolDequeue) PushHead(val interface{}) bool { + return d.pushHead(val) +} + +func (d *poolDequeue) PopHead() (interface{}, bool) { + return d.popHead() +} + +func (d *poolDequeue) PopTail() (interface{}, bool) { + return d.popTail() +} + +func NewPoolChain() PoolDequeue { + return new(poolChain) +} + +func (c *poolChain) PushHead(val interface{}) bool { + c.pushHead(val) + return true +} + +func (c *poolChain) PopHead() (interface{}, bool) { + return c.popHead() +} + +func (c *poolChain) PopTail() (interface{}, bool) { + return c.popTail() +} diff --git a/src/sync/map.go b/src/sync/map.go new file mode 100644 index 0000000..9ad2535 --- /dev/null +++ b/src/sync/map.go @@ -0,0 +1,384 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" + "unsafe" +) + +// Map is like a Go map[interface{}]interface{} but is safe for concurrent use +// by multiple goroutines without additional locking or coordination. +// Loads, stores, and deletes run in amortized constant time. +// +// The Map type is specialized. Most code should use a plain Go map instead, +// with separate locking or coordination, for better type safety and to make it +// easier to maintain other invariants along with the map content. +// +// The Map type is optimized for two common use cases: (1) when the entry for a given +// key is only ever written once but read many times, as in caches that only grow, +// or (2) when multiple goroutines read, write, and overwrite entries for disjoint +// sets of keys. In these two cases, use of a Map may significantly reduce lock +// contention compared to a Go map paired with a separate Mutex or RWMutex. +// +// The zero Map is empty and ready for use. A Map must not be copied after first use. +type Map struct { + mu Mutex + + // read contains the portion of the map's contents that are safe for + // concurrent access (with or without mu held). + // + // The read field itself is always safe to load, but must only be stored with + // mu held. + // + // Entries stored in read may be updated concurrently without mu, but updating + // a previously-expunged entry requires that the entry be copied to the dirty + // map and unexpunged with mu held. + read atomic.Value // readOnly + + // dirty contains the portion of the map's contents that require mu to be + // held. To ensure that the dirty map can be promoted to the read map quickly, + // it also includes all of the non-expunged entries in the read map. + // + // Expunged entries are not stored in the dirty map. An expunged entry in the + // clean map must be unexpunged and added to the dirty map before a new value + // can be stored to it. + // + // If the dirty map is nil, the next write to the map will initialize it by + // making a shallow copy of the clean map, omitting stale entries. + dirty map[interface{}]*entry + + // misses counts the number of loads since the read map was last updated that + // needed to lock mu to determine whether the key was present. + // + // Once enough misses have occurred to cover the cost of copying the dirty + // map, the dirty map will be promoted to the read map (in the unamended + // state) and the next store to the map will make a new dirty copy. + misses int +} + +// readOnly is an immutable struct stored atomically in the Map.read field. +type readOnly struct { + m map[interface{}]*entry + amended bool // true if the dirty map contains some key not in m. +} + +// expunged is an arbitrary pointer that marks entries which have been deleted +// from the dirty map. +var expunged = unsafe.Pointer(new(interface{})) + +// An entry is a slot in the map corresponding to a particular key. +type entry struct { + // p points to the interface{} value stored for the entry. + // + // If p == nil, the entry has been deleted and m.dirty == nil. + // + // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry + // is missing from m.dirty. + // + // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty + // != nil, in m.dirty[key]. + // + // An entry can be deleted by atomic replacement with nil: when m.dirty is + // next created, it will atomically replace nil with expunged and leave + // m.dirty[key] unset. + // + // An entry's associated value can be updated by atomic replacement, provided + // p != expunged. If p == expunged, an entry's associated value can be updated + // only after first setting m.dirty[key] = e so that lookups using the dirty + // map find the entry. + p unsafe.Pointer // *interface{} +} + +func newEntry(i interface{}) *entry { + return &entry{p: unsafe.Pointer(&i)} +} + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *Map) Load(key interface{}) (value interface{}, ok bool) { + read, _ := m.read.Load().(readOnly) + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + // Avoid reporting a spurious miss if m.dirty got promoted while we were + // blocked on m.mu. (If further loads of the same key will not miss, it's + // not worth copying the dirty map for this key.) + read, _ = m.read.Load().(readOnly) + e, ok = read.m[key] + if !ok && read.amended { + e, ok = m.dirty[key] + // Regardless of whether the entry was present, record a miss: this key + // will take the slow path until the dirty map is promoted to the read + // map. + m.missLocked() + } + m.mu.Unlock() + } + if !ok { + return nil, false + } + return e.load() +} + +func (e *entry) load() (value interface{}, ok bool) { + p := atomic.LoadPointer(&e.p) + if p == nil || p == expunged { + return nil, false + } + return *(*interface{})(p), true +} + +// Store sets the value for a key. +func (m *Map) Store(key, value interface{}) { + read, _ := m.read.Load().(readOnly) + if e, ok := read.m[key]; ok && e.tryStore(&value) { + return + } + + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + if e, ok := read.m[key]; ok { + if e.unexpungeLocked() { + // The entry was previously expunged, which implies that there is a + // non-nil dirty map and this entry is not in it. + m.dirty[key] = e + } + e.storeLocked(&value) + } else if e, ok := m.dirty[key]; ok { + e.storeLocked(&value) + } else { + if !read.amended { + // We're adding the first new key to the dirty map. + // Make sure it is allocated and mark the read-only map as incomplete. + m.dirtyLocked() + m.read.Store(readOnly{m: read.m, amended: true}) + } + m.dirty[key] = newEntry(value) + } + m.mu.Unlock() +} + +// tryStore stores a value if the entry has not been expunged. +// +// If the entry is expunged, tryStore returns false and leaves the entry +// unchanged. +func (e *entry) tryStore(i *interface{}) bool { + for { + p := atomic.LoadPointer(&e.p) + if p == expunged { + return false + } + if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { + return true + } + } +} + +// unexpungeLocked ensures that the entry is not marked as expunged. +// +// If the entry was previously expunged, it must be added to the dirty map +// before m.mu is unlocked. +func (e *entry) unexpungeLocked() (wasExpunged bool) { + return atomic.CompareAndSwapPointer(&e.p, expunged, nil) +} + +// storeLocked unconditionally stores a value to the entry. +// +// The entry must be known not to be expunged. +func (e *entry) storeLocked(i *interface{}) { + atomic.StorePointer(&e.p, unsafe.Pointer(i)) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { + // Avoid locking if it's a clean hit. + read, _ := m.read.Load().(readOnly) + if e, ok := read.m[key]; ok { + actual, loaded, ok := e.tryLoadOrStore(value) + if ok { + return actual, loaded + } + } + + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + if e, ok := read.m[key]; ok { + if e.unexpungeLocked() { + m.dirty[key] = e + } + actual, loaded, _ = e.tryLoadOrStore(value) + } else if e, ok := m.dirty[key]; ok { + actual, loaded, _ = e.tryLoadOrStore(value) + m.missLocked() + } else { + if !read.amended { + // We're adding the first new key to the dirty map. + // Make sure it is allocated and mark the read-only map as incomplete. + m.dirtyLocked() + m.read.Store(readOnly{m: read.m, amended: true}) + } + m.dirty[key] = newEntry(value) + actual, loaded = value, false + } + m.mu.Unlock() + + return actual, loaded +} + +// tryLoadOrStore atomically loads or stores a value if the entry is not +// expunged. +// +// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and +// returns with ok==false. +func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) { + p := atomic.LoadPointer(&e.p) + if p == expunged { + return nil, false, false + } + if p != nil { + return *(*interface{})(p), true, true + } + + // Copy the interface after the first load to make this method more amenable + // to escape analysis: if we hit the "load" path or the entry is expunged, we + // shouldn't bother heap-allocating. + ic := i + for { + if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) { + return i, false, true + } + p = atomic.LoadPointer(&e.p) + if p == expunged { + return nil, false, false + } + if p != nil { + return *(*interface{})(p), true, true + } + } +} + +// LoadAndDelete deletes the value for a key, returning the previous value if any. +// The loaded result reports whether the key was present. +func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) { + read, _ := m.read.Load().(readOnly) + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + e, ok = read.m[key] + if !ok && read.amended { + e, ok = m.dirty[key] + delete(m.dirty, key) + // Regardless of whether the entry was present, record a miss: this key + // will take the slow path until the dirty map is promoted to the read + // map. + m.missLocked() + } + m.mu.Unlock() + } + if ok { + return e.delete() + } + return nil, false +} + +// Delete deletes the value for a key. +func (m *Map) Delete(key interface{}) { + m.LoadAndDelete(key) +} + +func (e *entry) delete() (value interface{}, ok bool) { + for { + p := atomic.LoadPointer(&e.p) + if p == nil || p == expunged { + return nil, false + } + if atomic.CompareAndSwapPointer(&e.p, p, nil) { + return *(*interface{})(p), true + } + } +} + +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot of the Map's +// contents: no key will be visited more than once, but if the value for any key +// is stored or deleted concurrently, Range may reflect any mapping for that key +// from any point during the Range call. +// +// Range may be O(N) with the number of elements in the map even if f returns +// false after a constant number of calls. +func (m *Map) Range(f func(key, value interface{}) bool) { + // We need to be able to iterate over all of the keys that were already + // present at the start of the call to Range. + // If read.amended is false, then read.m satisfies that property without + // requiring us to hold m.mu for a long time. + read, _ := m.read.Load().(readOnly) + if read.amended { + // m.dirty contains keys not in read.m. Fortunately, Range is already O(N) + // (assuming the caller does not break out early), so a call to Range + // amortizes an entire copy of the map: we can promote the dirty copy + // immediately! + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + if read.amended { + read = readOnly{m: m.dirty} + m.read.Store(read) + m.dirty = nil + m.misses = 0 + } + m.mu.Unlock() + } + + for k, e := range read.m { + v, ok := e.load() + if !ok { + continue + } + if !f(k, v) { + break + } + } +} + +func (m *Map) missLocked() { + m.misses++ + if m.misses < len(m.dirty) { + return + } + m.read.Store(readOnly{m: m.dirty}) + m.dirty = nil + m.misses = 0 +} + +func (m *Map) dirtyLocked() { + if m.dirty != nil { + return + } + + read, _ := m.read.Load().(readOnly) + m.dirty = make(map[interface{}]*entry, len(read.m)) + for k, e := range read.m { + if !e.tryExpungeLocked() { + m.dirty[k] = e + } + } +} + +func (e *entry) tryExpungeLocked() (isExpunged bool) { + p := atomic.LoadPointer(&e.p) + for p == nil { + if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { + return true + } + p = atomic.LoadPointer(&e.p) + } + return p == expunged +} diff --git a/src/sync/map_bench_test.go b/src/sync/map_bench_test.go new file mode 100644 index 0000000..cf0a3d7 --- /dev/null +++ b/src/sync/map_bench_test.go @@ -0,0 +1,289 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync_test + +import ( + "fmt" + "reflect" + "sync" + "sync/atomic" + "testing" +) + +type bench struct { + setup func(*testing.B, mapInterface) + perG func(b *testing.B, pb *testing.PB, i int, m mapInterface) +} + +func benchMap(b *testing.B, bench bench) { + for _, m := range [...]mapInterface{&DeepCopyMap{}, &RWMutexMap{}, &sync.Map{}} { + b.Run(fmt.Sprintf("%T", m), func(b *testing.B) { + m = reflect.New(reflect.TypeOf(m).Elem()).Interface().(mapInterface) + if bench.setup != nil { + bench.setup(b, m) + } + + b.ResetTimer() + + var i int64 + b.RunParallel(func(pb *testing.PB) { + id := int(atomic.AddInt64(&i, 1) - 1) + bench.perG(b, pb, id*b.N, m) + }) + }) + } +} + +func BenchmarkLoadMostlyHits(b *testing.B) { + const hits, misses = 1023, 1 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i % (hits + misses)) + } + }, + }) +} + +func BenchmarkLoadMostlyMisses(b *testing.B) { + const hits, misses = 1, 1023 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i % (hits + misses)) + } + }, + }) +} + +func BenchmarkLoadOrStoreBalanced(b *testing.B) { + const hits, misses = 128, 128 + + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + j := i % (hits + misses) + if j < hits { + if _, ok := m.LoadOrStore(j, i); !ok { + b.Fatalf("unexpected miss for %v", j) + } + } else { + if v, loaded := m.LoadOrStore(i, i); loaded { + b.Fatalf("failed to store %v: existing value %v", i, v) + } + } + } + }, + }) +} + +func BenchmarkLoadOrStoreUnique(b *testing.B) { + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.LoadOrStore(i, i) + } + }, + }) +} + +func BenchmarkLoadOrStoreCollision(b *testing.B) { + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + m.LoadOrStore(0, 0) + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.LoadOrStore(0, 0) + } + }, + }) +} + +func BenchmarkLoadAndDeleteBalanced(b *testing.B) { + const hits, misses = 128, 128 + + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + j := i % (hits + misses) + if j < hits { + m.LoadAndDelete(j) + } else { + m.LoadAndDelete(i) + } + } + }, + }) +} + +func BenchmarkLoadAndDeleteUnique(b *testing.B) { + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.LoadAndDelete(i) + } + }, + }) +} + +func BenchmarkLoadAndDeleteCollision(b *testing.B) { + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + m.LoadOrStore(0, 0) + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.LoadAndDelete(0) + } + }, + }) +} + +func BenchmarkRange(b *testing.B) { + const mapSize = 1 << 10 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < mapSize; i++ { + m.Store(i, i) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Range(func(_, _ interface{}) bool { return true }) + } + }, + }) +} + +// BenchmarkAdversarialAlloc tests performance when we store a new value +// immediately whenever the map is promoted to clean and otherwise load a +// unique, missing key. +// +// This forces the Load calls to always acquire the map's mutex. +func BenchmarkAdversarialAlloc(b *testing.B) { + benchMap(b, bench{ + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + var stores, loadsSinceStore int64 + for ; pb.Next(); i++ { + m.Load(i) + if loadsSinceStore++; loadsSinceStore > stores { + m.LoadOrStore(i, stores) + loadsSinceStore = 0 + stores++ + } + } + }, + }) +} + +// BenchmarkAdversarialDelete tests performance when we periodically delete +// one key and add a different one in a large map. +// +// This forces the Load calls to always acquire the map's mutex and periodically +// makes a full copy of the map despite changing only one entry. +func BenchmarkAdversarialDelete(b *testing.B) { + const mapSize = 1 << 10 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < mapSize; i++ { + m.Store(i, i) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i) + + if i%mapSize == 0 { + m.Range(func(k, _ interface{}) bool { + m.Delete(k) + return false + }) + m.Store(i, i) + } + } + }, + }) +} + +func BenchmarkDeleteCollision(b *testing.B) { + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + m.LoadOrStore(0, 0) + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Delete(0) + } + }, + }) +} diff --git a/src/sync/map_reference_test.go b/src/sync/map_reference_test.go new file mode 100644 index 0000000..d105a24 --- /dev/null +++ b/src/sync/map_reference_test.go @@ -0,0 +1,174 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync_test + +import ( + "sync" + "sync/atomic" +) + +// This file contains reference map implementations for unit-tests. + +// mapInterface is the interface Map implements. +type mapInterface interface { + Load(interface{}) (interface{}, bool) + Store(key, value interface{}) + LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) + LoadAndDelete(key interface{}) (value interface{}, loaded bool) + Delete(interface{}) + Range(func(key, value interface{}) (shouldContinue bool)) +} + +// RWMutexMap is an implementation of mapInterface using a sync.RWMutex. +type RWMutexMap struct { + mu sync.RWMutex + dirty map[interface{}]interface{} +} + +func (m *RWMutexMap) Load(key interface{}) (value interface{}, ok bool) { + m.mu.RLock() + value, ok = m.dirty[key] + m.mu.RUnlock() + return +} + +func (m *RWMutexMap) Store(key, value interface{}) { + m.mu.Lock() + if m.dirty == nil { + m.dirty = make(map[interface{}]interface{}) + } + m.dirty[key] = value + m.mu.Unlock() +} + +func (m *RWMutexMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { + m.mu.Lock() + actual, loaded = m.dirty[key] + if !loaded { + actual = value + if m.dirty == nil { + m.dirty = make(map[interface{}]interface{}) + } + m.dirty[key] = value + } + m.mu.Unlock() + return actual, loaded +} + +func (m *RWMutexMap) LoadAndDelete(key interface{}) (value interface{}, loaded bool) { + m.mu.Lock() + value, loaded = m.dirty[key] + if !loaded { + m.mu.Unlock() + return nil, false + } + delete(m.dirty, key) + m.mu.Unlock() + return value, loaded +} + +func (m *RWMutexMap) Delete(key interface{}) { + m.mu.Lock() + delete(m.dirty, key) + m.mu.Unlock() +} + +func (m *RWMutexMap) Range(f func(key, value interface{}) (shouldContinue bool)) { + m.mu.RLock() + keys := make([]interface{}, 0, len(m.dirty)) + for k := range m.dirty { + keys = append(keys, k) + } + m.mu.RUnlock() + + for _, k := range keys { + v, ok := m.Load(k) + if !ok { + continue + } + if !f(k, v) { + break + } + } +} + +// DeepCopyMap is an implementation of mapInterface using a Mutex and +// atomic.Value. It makes deep copies of the map on every write to avoid +// acquiring the Mutex in Load. +type DeepCopyMap struct { + mu sync.Mutex + clean atomic.Value +} + +func (m *DeepCopyMap) Load(key interface{}) (value interface{}, ok bool) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + value, ok = clean[key] + return value, ok +} + +func (m *DeepCopyMap) Store(key, value interface{}) { + m.mu.Lock() + dirty := m.dirty() + dirty[key] = value + m.clean.Store(dirty) + m.mu.Unlock() +} + +func (m *DeepCopyMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + actual, loaded = clean[key] + if loaded { + return actual, loaded + } + + m.mu.Lock() + // Reload clean in case it changed while we were waiting on m.mu. + clean, _ = m.clean.Load().(map[interface{}]interface{}) + actual, loaded = clean[key] + if !loaded { + dirty := m.dirty() + dirty[key] = value + actual = value + m.clean.Store(dirty) + } + m.mu.Unlock() + return actual, loaded +} + +func (m *DeepCopyMap) LoadAndDelete(key interface{}) (value interface{}, loaded bool) { + m.mu.Lock() + dirty := m.dirty() + value, loaded = dirty[key] + delete(dirty, key) + m.clean.Store(dirty) + m.mu.Unlock() + return +} + +func (m *DeepCopyMap) Delete(key interface{}) { + m.mu.Lock() + dirty := m.dirty() + delete(dirty, key) + m.clean.Store(dirty) + m.mu.Unlock() +} + +func (m *DeepCopyMap) Range(f func(key, value interface{}) (shouldContinue bool)) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + for k, v := range clean { + if !f(k, v) { + break + } + } +} + +func (m *DeepCopyMap) dirty() map[interface{}]interface{} { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + dirty := make(map[interface{}]interface{}, len(clean)+1) + for k, v := range clean { + dirty[k] = v + } + return dirty +} diff --git a/src/sync/map_test.go b/src/sync/map_test.go new file mode 100644 index 0000000..7f163ca --- /dev/null +++ b/src/sync/map_test.go @@ -0,0 +1,197 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync_test + +import ( + "math/rand" + "reflect" + "runtime" + "sync" + "sync/atomic" + "testing" + "testing/quick" +) + +type mapOp string + +const ( + opLoad = mapOp("Load") + opStore = mapOp("Store") + opLoadOrStore = mapOp("LoadOrStore") + opLoadAndDelete = mapOp("LoadAndDelete") + opDelete = mapOp("Delete") +) + +var mapOps = [...]mapOp{opLoad, opStore, opLoadOrStore, opLoadAndDelete, opDelete} + +// mapCall is a quick.Generator for calls on mapInterface. +type mapCall struct { + op mapOp + k, v interface{} +} + +func (c mapCall) apply(m mapInterface) (interface{}, bool) { + switch c.op { + case opLoad: + return m.Load(c.k) + case opStore: + m.Store(c.k, c.v) + return nil, false + case opLoadOrStore: + return m.LoadOrStore(c.k, c.v) + case opLoadAndDelete: + return m.LoadAndDelete(c.k) + case opDelete: + m.Delete(c.k) + return nil, false + default: + panic("invalid mapOp") + } +} + +type mapResult struct { + value interface{} + ok bool +} + +func randValue(r *rand.Rand) interface{} { + b := make([]byte, r.Intn(4)) + for i := range b { + b[i] = 'a' + byte(rand.Intn(26)) + } + return string(b) +} + +func (mapCall) Generate(r *rand.Rand, size int) reflect.Value { + c := mapCall{op: mapOps[rand.Intn(len(mapOps))], k: randValue(r)} + switch c.op { + case opStore, opLoadOrStore: + c.v = randValue(r) + } + return reflect.ValueOf(c) +} + +func applyCalls(m mapInterface, calls []mapCall) (results []mapResult, final map[interface{}]interface{}) { + for _, c := range calls { + v, ok := c.apply(m) + results = append(results, mapResult{v, ok}) + } + + final = make(map[interface{}]interface{}) + m.Range(func(k, v interface{}) bool { + final[k] = v + return true + }) + + return results, final +} + +func applyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { + return applyCalls(new(sync.Map), calls) +} + +func applyRWMutexMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { + return applyCalls(new(RWMutexMap), calls) +} + +func applyDeepCopyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { + return applyCalls(new(DeepCopyMap), calls) +} + +func TestMapMatchesRWMutex(t *testing.T) { + if err := quick.CheckEqual(applyMap, applyRWMutexMap, nil); err != nil { + t.Error(err) + } +} + +func TestMapMatchesDeepCopy(t *testing.T) { + if err := quick.CheckEqual(applyMap, applyDeepCopyMap, nil); err != nil { + t.Error(err) + } +} + +func TestConcurrentRange(t *testing.T) { + const mapSize = 1 << 10 + + m := new(sync.Map) + for n := int64(1); n <= mapSize; n++ { + m.Store(n, int64(n)) + } + + done := make(chan struct{}) + var wg sync.WaitGroup + defer func() { + close(done) + wg.Wait() + }() + for g := int64(runtime.GOMAXPROCS(0)); g > 0; g-- { + r := rand.New(rand.NewSource(g)) + wg.Add(1) + go func(g int64) { + defer wg.Done() + for i := int64(0); ; i++ { + select { + case <-done: + return + default: + } + for n := int64(1); n < mapSize; n++ { + if r.Int63n(mapSize) == 0 { + m.Store(n, n*i*g) + } else { + m.Load(n) + } + } + } + }(g) + } + + iters := 1 << 10 + if testing.Short() { + iters = 16 + } + for n := iters; n > 0; n-- { + seen := make(map[int64]bool, mapSize) + + m.Range(func(ki, vi interface{}) bool { + k, v := ki.(int64), vi.(int64) + if v%k != 0 { + t.Fatalf("while Storing multiples of %v, Range saw value %v", k, v) + } + if seen[k] { + t.Fatalf("Range visited key %v twice", k) + } + seen[k] = true + return true + }) + + if len(seen) != mapSize { + t.Fatalf("Range visited %v elements of %v-element Map", len(seen), mapSize) + } + } +} + +func TestIssue40999(t *testing.T) { + var m sync.Map + + // Since the miss-counting in missLocked (via Delete) + // compares the miss count with len(m.dirty), + // add an initial entry to bias len(m.dirty) above the miss count. + m.Store(nil, struct{}{}) + + var finalized uint32 + + // Set finalizers that count for collected keys. A non-zero count + // indicates that keys have not been leaked. + for atomic.LoadUint32(&finalized) == 0 { + p := new(int) + runtime.SetFinalizer(p, func(*int) { + atomic.AddUint32(&finalized, 1) + }) + m.Store(p, struct{}{}) + m.Delete(p) + runtime.GC() + } +} diff --git a/src/sync/mutex.go b/src/sync/mutex.go new file mode 100644 index 0000000..3028552 --- /dev/null +++ b/src/sync/mutex.go @@ -0,0 +1,226 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package sync provides basic synchronization primitives such as mutual +// exclusion locks. Other than the Once and WaitGroup types, most are intended +// for use by low-level library routines. Higher-level synchronization is +// better done via channels and communication. +// +// Values containing the types defined in this package should not be copied. +package sync + +import ( + "internal/race" + "sync/atomic" + "unsafe" +) + +func throw(string) // provided by runtime + +// A Mutex is a mutual exclusion lock. +// The zero value for a Mutex is an unlocked mutex. +// +// A Mutex must not be copied after first use. +type Mutex struct { + state int32 + sema uint32 +} + +// A Locker represents an object that can be locked and unlocked. +type Locker interface { + Lock() + Unlock() +} + +const ( + mutexLocked = 1 << iota // mutex is locked + mutexWoken + mutexStarving + mutexWaiterShift = iota + + // Mutex fairness. + // + // Mutex can be in 2 modes of operations: normal and starvation. + // In normal mode waiters are queued in FIFO order, but a woken up waiter + // does not own the mutex and competes with new arriving goroutines over + // the ownership. New arriving goroutines have an advantage -- they are + // already running on CPU and there can be lots of them, so a woken up + // waiter has good chances of losing. In such case it is queued at front + // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, + // it switches mutex to the starvation mode. + // + // In starvation mode ownership of the mutex is directly handed off from + // the unlocking goroutine to the waiter at the front of the queue. + // New arriving goroutines don't try to acquire the mutex even if it appears + // to be unlocked, and don't try to spin. Instead they queue themselves at + // the tail of the wait queue. + // + // If a waiter receives ownership of the mutex and sees that either + // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, + // it switches mutex back to normal operation mode. + // + // Normal mode has considerably better performance as a goroutine can acquire + // a mutex several times in a row even if there are blocked waiters. + // Starvation mode is important to prevent pathological cases of tail latency. + starvationThresholdNs = 1e6 +) + +// Lock locks m. +// If the lock is already in use, the calling goroutine +// blocks until the mutex is available. +func (m *Mutex) Lock() { + // Fast path: grab unlocked mutex. + if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { + if race.Enabled { + race.Acquire(unsafe.Pointer(m)) + } + return + } + // Slow path (outlined so that the fast path can be inlined) + m.lockSlow() +} + +func (m *Mutex) lockSlow() { + var waitStartTime int64 + starving := false + awoke := false + iter := 0 + old := m.state + for { + // Don't spin in starvation mode, ownership is handed off to waiters + // so we won't be able to acquire the mutex anyway. + if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { + // Active spinning makes sense. + // Try to set mutexWoken flag to inform Unlock + // to not wake other blocked goroutines. + if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && + atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { + awoke = true + } + runtime_doSpin() + iter++ + old = m.state + continue + } + new := old + // Don't try to acquire starving mutex, new arriving goroutines must queue. + if old&mutexStarving == 0 { + new |= mutexLocked + } + if old&(mutexLocked|mutexStarving) != 0 { + new += 1 << mutexWaiterShift + } + // The current goroutine switches mutex to starvation mode. + // But if the mutex is currently unlocked, don't do the switch. + // Unlock expects that starving mutex has waiters, which will not + // be true in this case. + if starving && old&mutexLocked != 0 { + new |= mutexStarving + } + if awoke { + // The goroutine has been woken from sleep, + // so we need to reset the flag in either case. + if new&mutexWoken == 0 { + throw("sync: inconsistent mutex state") + } + new &^= mutexWoken + } + if atomic.CompareAndSwapInt32(&m.state, old, new) { + if old&(mutexLocked|mutexStarving) == 0 { + break // locked the mutex with CAS + } + // If we were already waiting before, queue at the front of the queue. + queueLifo := waitStartTime != 0 + if waitStartTime == 0 { + waitStartTime = runtime_nanotime() + } + runtime_SemacquireMutex(&m.sema, queueLifo, 1) + starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs + old = m.state + if old&mutexStarving != 0 { + // If this goroutine was woken and mutex is in starvation mode, + // ownership was handed off to us but mutex is in somewhat + // inconsistent state: mutexLocked is not set and we are still + // accounted as waiter. Fix that. + if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { + throw("sync: inconsistent mutex state") + } + delta := int32(mutexLocked - 1<<mutexWaiterShift) + if !starving || old>>mutexWaiterShift == 1 { + // Exit starvation mode. + // Critical to do it here and consider wait time. + // Starvation mode is so inefficient, that two goroutines + // can go lock-step infinitely once they switch mutex + // to starvation mode. + delta -= mutexStarving + } + atomic.AddInt32(&m.state, delta) + break + } + awoke = true + iter = 0 + } else { + old = m.state + } + } + + if race.Enabled { + race.Acquire(unsafe.Pointer(m)) + } +} + +// Unlock unlocks m. +// It is a run-time error if m is not locked on entry to Unlock. +// +// A locked Mutex is not associated with a particular goroutine. +// It is allowed for one goroutine to lock a Mutex and then +// arrange for another goroutine to unlock it. +func (m *Mutex) Unlock() { + if race.Enabled { + _ = m.state + race.Release(unsafe.Pointer(m)) + } + + // Fast path: drop lock bit. + new := atomic.AddInt32(&m.state, -mutexLocked) + if new != 0 { + // Outlined slow path to allow inlining the fast path. + // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. + m.unlockSlow(new) + } +} + +func (m *Mutex) unlockSlow(new int32) { + if (new+mutexLocked)&mutexLocked == 0 { + throw("sync: unlock of unlocked mutex") + } + if new&mutexStarving == 0 { + old := new + for { + // If there are no waiters or a goroutine has already + // been woken or grabbed the lock, no need to wake anyone. + // In starvation mode ownership is directly handed off from unlocking + // goroutine to the next waiter. We are not part of this chain, + // since we did not observe mutexStarving when we unlocked the mutex above. + // So get off the way. + if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { + return + } + // Grab the right to wake someone. + new = (old - 1<<mutexWaiterShift) | mutexWoken + if atomic.CompareAndSwapInt32(&m.state, old, new) { + runtime_Semrelease(&m.sema, false, 1) + return + } + old = m.state + } + } else { + // Starving mode: handoff mutex ownership to the next waiter, and yield + // our time slice so that the next waiter can start to run immediately. + // Note: mutexLocked is not set, the waiter will set it after wakeup. + // But mutex is still considered locked if mutexStarving is set, + // so new coming goroutines won't acquire it. + runtime_Semrelease(&m.sema, true, 1) + } +} diff --git a/src/sync/mutex_test.go b/src/sync/mutex_test.go new file mode 100644 index 0000000..98c1bf2 --- /dev/null +++ b/src/sync/mutex_test.go @@ -0,0 +1,317 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// GOMAXPROCS=10 go test + +package sync_test + +import ( + "fmt" + "internal/testenv" + "os" + "os/exec" + "runtime" + "strings" + . "sync" + "testing" + "time" +) + +func HammerSemaphore(s *uint32, loops int, cdone chan bool) { + for i := 0; i < loops; i++ { + Runtime_Semacquire(s) + Runtime_Semrelease(s, false, 0) + } + cdone <- true +} + +func TestSemaphore(t *testing.T) { + s := new(uint32) + *s = 1 + c := make(chan bool) + for i := 0; i < 10; i++ { + go HammerSemaphore(s, 1000, c) + } + for i := 0; i < 10; i++ { + <-c + } +} + +func BenchmarkUncontendedSemaphore(b *testing.B) { + s := new(uint32) + *s = 1 + HammerSemaphore(s, b.N, make(chan bool, 2)) +} + +func BenchmarkContendedSemaphore(b *testing.B) { + b.StopTimer() + s := new(uint32) + *s = 1 + c := make(chan bool) + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(2)) + b.StartTimer() + + go HammerSemaphore(s, b.N/2, c) + go HammerSemaphore(s, b.N/2, c) + <-c + <-c +} + +func HammerMutex(m *Mutex, loops int, cdone chan bool) { + for i := 0; i < loops; i++ { + m.Lock() + m.Unlock() + } + cdone <- true +} + +func TestMutex(t *testing.T) { + if n := runtime.SetMutexProfileFraction(1); n != 0 { + t.Logf("got mutexrate %d expected 0", n) + } + defer runtime.SetMutexProfileFraction(0) + m := new(Mutex) + c := make(chan bool) + for i := 0; i < 10; i++ { + go HammerMutex(m, 1000, c) + } + for i := 0; i < 10; i++ { + <-c + } +} + +var misuseTests = []struct { + name string + f func() +}{ + { + "Mutex.Unlock", + func() { + var mu Mutex + mu.Unlock() + }, + }, + { + "Mutex.Unlock2", + func() { + var mu Mutex + mu.Lock() + mu.Unlock() + mu.Unlock() + }, + }, + { + "RWMutex.Unlock", + func() { + var mu RWMutex + mu.Unlock() + }, + }, + { + "RWMutex.Unlock2", + func() { + var mu RWMutex + mu.RLock() + mu.Unlock() + }, + }, + { + "RWMutex.Unlock3", + func() { + var mu RWMutex + mu.Lock() + mu.Unlock() + mu.Unlock() + }, + }, + { + "RWMutex.RUnlock", + func() { + var mu RWMutex + mu.RUnlock() + }, + }, + { + "RWMutex.RUnlock2", + func() { + var mu RWMutex + mu.Lock() + mu.RUnlock() + }, + }, + { + "RWMutex.RUnlock3", + func() { + var mu RWMutex + mu.RLock() + mu.RUnlock() + mu.RUnlock() + }, + }, +} + +func init() { + if len(os.Args) == 3 && os.Args[1] == "TESTMISUSE" { + for _, test := range misuseTests { + if test.name == os.Args[2] { + func() { + defer func() { recover() }() + test.f() + }() + fmt.Printf("test completed\n") + os.Exit(0) + } + } + fmt.Printf("unknown test\n") + os.Exit(0) + } +} + +func TestMutexMisuse(t *testing.T) { + testenv.MustHaveExec(t) + for _, test := range misuseTests { + out, err := exec.Command(os.Args[0], "TESTMISUSE", test.name).CombinedOutput() + if err == nil || !strings.Contains(string(out), "unlocked") { + t.Errorf("%s: did not find failure with message about unlocked lock: %s\n%s\n", test.name, err, out) + } + } +} + +func TestMutexFairness(t *testing.T) { + var mu Mutex + stop := make(chan bool) + defer close(stop) + go func() { + for { + mu.Lock() + time.Sleep(100 * time.Microsecond) + mu.Unlock() + select { + case <-stop: + return + default: + } + } + }() + done := make(chan bool, 1) + go func() { + for i := 0; i < 10; i++ { + time.Sleep(100 * time.Microsecond) + mu.Lock() + mu.Unlock() + } + done <- true + }() + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatalf("can't acquire Mutex in 10 seconds") + } +} + +func BenchmarkMutexUncontended(b *testing.B) { + type PaddedMutex struct { + Mutex + pad [128]uint8 + } + b.RunParallel(func(pb *testing.PB) { + var mu PaddedMutex + for pb.Next() { + mu.Lock() + mu.Unlock() + } + }) +} + +func benchmarkMutex(b *testing.B, slack, work bool) { + var mu Mutex + if slack { + b.SetParallelism(10) + } + b.RunParallel(func(pb *testing.PB) { + foo := 0 + for pb.Next() { + mu.Lock() + mu.Unlock() + if work { + for i := 0; i < 100; i++ { + foo *= 2 + foo /= 2 + } + } + } + _ = foo + }) +} + +func BenchmarkMutex(b *testing.B) { + benchmarkMutex(b, false, false) +} + +func BenchmarkMutexSlack(b *testing.B) { + benchmarkMutex(b, true, false) +} + +func BenchmarkMutexWork(b *testing.B) { + benchmarkMutex(b, false, true) +} + +func BenchmarkMutexWorkSlack(b *testing.B) { + benchmarkMutex(b, true, true) +} + +func BenchmarkMutexNoSpin(b *testing.B) { + // This benchmark models a situation where spinning in the mutex should be + // non-profitable and allows to confirm that spinning does not do harm. + // To achieve this we create excess of goroutines most of which do local work. + // These goroutines yield during local work, so that switching from + // a blocked goroutine to other goroutines is profitable. + // As a matter of fact, this benchmark still triggers some spinning in the mutex. + var m Mutex + var acc0, acc1 uint64 + b.SetParallelism(4) + b.RunParallel(func(pb *testing.PB) { + c := make(chan bool) + var data [4 << 10]uint64 + for i := 0; pb.Next(); i++ { + if i%4 == 0 { + m.Lock() + acc0 -= 100 + acc1 += 100 + m.Unlock() + } else { + for i := 0; i < len(data); i += 4 { + data[i]++ + } + // Elaborate way to say runtime.Gosched + // that does not put the goroutine onto global runq. + go func() { + c <- true + }() + <-c + } + } + }) +} + +func BenchmarkMutexSpin(b *testing.B) { + // This benchmark models a situation where spinning in the mutex should be + // profitable. To achieve this we create a goroutine per-proc. + // These goroutines access considerable amount of local data so that + // unnecessary rescheduling is penalized by cache misses. + var m Mutex + var acc0, acc1 uint64 + b.RunParallel(func(pb *testing.PB) { + var data [16 << 10]uint64 + for i := 0; pb.Next(); i++ { + m.Lock() + acc0 -= 100 + acc1 += 100 + m.Unlock() + for i := 0; i < len(data); i += 4 { + data[i]++ + } + } + }) +} diff --git a/src/sync/once.go b/src/sync/once.go new file mode 100644 index 0000000..8844314 --- /dev/null +++ b/src/sync/once.go @@ -0,0 +1,70 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" +) + +// Once is an object that will perform exactly one action. +// +// A Once must not be copied after first use. +type Once struct { + // done indicates whether the action has been performed. + // It is first in the struct because it is used in the hot path. + // The hot path is inlined at every call site. + // Placing done first allows more compact instructions on some architectures (amd64/386), + // and fewer instructions (to calculate offset) on other architectures. + done uint32 + m Mutex +} + +// Do calls the function f if and only if Do is being called for the +// first time for this instance of Once. In other words, given +// var once Once +// if once.Do(f) is called multiple times, only the first call will invoke f, +// even if f has a different value in each invocation. A new instance of +// Once is required for each function to execute. +// +// Do is intended for initialization that must be run exactly once. Since f +// is niladic, it may be necessary to use a function literal to capture the +// arguments to a function to be invoked by Do: +// config.once.Do(func() { config.init(filename) }) +// +// Because no call to Do returns until the one call to f returns, if f causes +// Do to be called, it will deadlock. +// +// If f panics, Do considers it to have returned; future calls of Do return +// without calling f. +// +func (o *Once) Do(f func()) { + // Note: Here is an incorrect implementation of Do: + // + // if atomic.CompareAndSwapUint32(&o.done, 0, 1) { + // f() + // } + // + // Do guarantees that when it returns, f has finished. + // This implementation would not implement that guarantee: + // given two simultaneous calls, the winner of the cas would + // call f, and the second would return immediately, without + // waiting for the first's call to f to complete. + // This is why the slow path falls back to a mutex, and why + // the atomic.StoreUint32 must be delayed until after f returns. + + if atomic.LoadUint32(&o.done) == 0 { + // Outlined slow-path to allow inlining of the fast-path. + o.doSlow(f) + } +} + +func (o *Once) doSlow(f func()) { + o.m.Lock() + defer o.m.Unlock() + if o.done == 0 { + defer atomic.StoreUint32(&o.done, 1) + f() + } +} diff --git a/src/sync/once_test.go b/src/sync/once_test.go new file mode 100644 index 0000000..1eec8d1 --- /dev/null +++ b/src/sync/once_test.go @@ -0,0 +1,68 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync_test + +import ( + . "sync" + "testing" +) + +type one int + +func (o *one) Increment() { + *o++ +} + +func run(t *testing.T, once *Once, o *one, c chan bool) { + once.Do(func() { o.Increment() }) + if v := *o; v != 1 { + t.Errorf("once failed inside run: %d is not 1", v) + } + c <- true +} + +func TestOnce(t *testing.T) { + o := new(one) + once := new(Once) + c := make(chan bool) + const N = 10 + for i := 0; i < N; i++ { + go run(t, once, o, c) + } + for i := 0; i < N; i++ { + <-c + } + if *o != 1 { + t.Errorf("once failed outside run: %d is not 1", *o) + } +} + +func TestOncePanic(t *testing.T) { + var once Once + func() { + defer func() { + if r := recover(); r == nil { + t.Fatalf("Once.Do did not panic") + } + }() + once.Do(func() { + panic("failed") + }) + }() + + once.Do(func() { + t.Fatalf("Once.Do called twice") + }) +} + +func BenchmarkOnce(b *testing.B) { + var once Once + f := func() {} + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + once.Do(f) + } + }) +} diff --git a/src/sync/pool.go b/src/sync/pool.go new file mode 100644 index 0000000..1ae7012 --- /dev/null +++ b/src/sync/pool.go @@ -0,0 +1,294 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "internal/race" + "runtime" + "sync/atomic" + "unsafe" +) + +// A Pool is a set of temporary objects that may be individually saved and +// retrieved. +// +// Any item stored in the Pool may be removed automatically at any time without +// notification. If the Pool holds the only reference when this happens, the +// item might be deallocated. +// +// A Pool is safe for use by multiple goroutines simultaneously. +// +// Pool's purpose is to cache allocated but unused items for later reuse, +// relieving pressure on the garbage collector. That is, it makes it easy to +// build efficient, thread-safe free lists. However, it is not suitable for all +// free lists. +// +// An appropriate use of a Pool is to manage a group of temporary items +// silently shared among and potentially reused by concurrent independent +// clients of a package. Pool provides a way to amortize allocation overhead +// across many clients. +// +// An example of good use of a Pool is in the fmt package, which maintains a +// dynamically-sized store of temporary output buffers. The store scales under +// load (when many goroutines are actively printing) and shrinks when +// quiescent. +// +// On the other hand, a free list maintained as part of a short-lived object is +// not a suitable use for a Pool, since the overhead does not amortize well in +// that scenario. It is more efficient to have such objects implement their own +// free list. +// +// A Pool must not be copied after first use. +type Pool struct { + noCopy noCopy + + local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal + localSize uintptr // size of the local array + + victim unsafe.Pointer // local from previous cycle + victimSize uintptr // size of victims array + + // New optionally specifies a function to generate + // a value when Get would otherwise return nil. + // It may not be changed concurrently with calls to Get. + New func() interface{} +} + +// Local per-P Pool appendix. +type poolLocalInternal struct { + private interface{} // Can be used only by the respective P. + shared poolChain // Local P can pushHead/popHead; any P can popTail. +} + +type poolLocal struct { + poolLocalInternal + + // Prevents false sharing on widespread platforms with + // 128 mod (cache line size) = 0 . + pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte +} + +// from runtime +func fastrand() uint32 + +var poolRaceHash [128]uint64 + +// poolRaceAddr returns an address to use as the synchronization point +// for race detector logic. We don't use the actual pointer stored in x +// directly, for fear of conflicting with other synchronization on that address. +// Instead, we hash the pointer to get an index into poolRaceHash. +// See discussion on golang.org/cl/31589. +func poolRaceAddr(x interface{}) unsafe.Pointer { + ptr := uintptr((*[2]unsafe.Pointer)(unsafe.Pointer(&x))[1]) + h := uint32((uint64(uint32(ptr)) * 0x85ebca6b) >> 16) + return unsafe.Pointer(&poolRaceHash[h%uint32(len(poolRaceHash))]) +} + +// Put adds x to the pool. +func (p *Pool) Put(x interface{}) { + if x == nil { + return + } + if race.Enabled { + if fastrand()%4 == 0 { + // Randomly drop x on floor. + return + } + race.ReleaseMerge(poolRaceAddr(x)) + race.Disable() + } + l, _ := p.pin() + if l.private == nil { + l.private = x + x = nil + } + if x != nil { + l.shared.pushHead(x) + } + runtime_procUnpin() + if race.Enabled { + race.Enable() + } +} + +// Get selects an arbitrary item from the Pool, removes it from the +// Pool, and returns it to the caller. +// Get may choose to ignore the pool and treat it as empty. +// Callers should not assume any relation between values passed to Put and +// the values returned by Get. +// +// If Get would otherwise return nil and p.New is non-nil, Get returns +// the result of calling p.New. +func (p *Pool) Get() interface{} { + if race.Enabled { + race.Disable() + } + l, pid := p.pin() + x := l.private + l.private = nil + if x == nil { + // Try to pop the head of the local shard. We prefer + // the head over the tail for temporal locality of + // reuse. + x, _ = l.shared.popHead() + if x == nil { + x = p.getSlow(pid) + } + } + runtime_procUnpin() + if race.Enabled { + race.Enable() + if x != nil { + race.Acquire(poolRaceAddr(x)) + } + } + if x == nil && p.New != nil { + x = p.New() + } + return x +} + +func (p *Pool) getSlow(pid int) interface{} { + // See the comment in pin regarding ordering of the loads. + size := runtime_LoadAcquintptr(&p.localSize) // load-acquire + locals := p.local // load-consume + // Try to steal one element from other procs. + for i := 0; i < int(size); i++ { + l := indexLocal(locals, (pid+i+1)%int(size)) + if x, _ := l.shared.popTail(); x != nil { + return x + } + } + + // Try the victim cache. We do this after attempting to steal + // from all primary caches because we want objects in the + // victim cache to age out if at all possible. + size = atomic.LoadUintptr(&p.victimSize) + if uintptr(pid) >= size { + return nil + } + locals = p.victim + l := indexLocal(locals, pid) + if x := l.private; x != nil { + l.private = nil + return x + } + for i := 0; i < int(size); i++ { + l := indexLocal(locals, (pid+i)%int(size)) + if x, _ := l.shared.popTail(); x != nil { + return x + } + } + + // Mark the victim cache as empty for future gets don't bother + // with it. + atomic.StoreUintptr(&p.victimSize, 0) + + return nil +} + +// pin pins the current goroutine to P, disables preemption and +// returns poolLocal pool for the P and the P's id. +// Caller must call runtime_procUnpin() when done with the pool. +func (p *Pool) pin() (*poolLocal, int) { + pid := runtime_procPin() + // In pinSlow we store to local and then to localSize, here we load in opposite order. + // Since we've disabled preemption, GC cannot happen in between. + // Thus here we must observe local at least as large localSize. + // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness). + s := runtime_LoadAcquintptr(&p.localSize) // load-acquire + l := p.local // load-consume + if uintptr(pid) < s { + return indexLocal(l, pid), pid + } + return p.pinSlow() +} + +func (p *Pool) pinSlow() (*poolLocal, int) { + // Retry under the mutex. + // Can not lock the mutex while pinned. + runtime_procUnpin() + allPoolsMu.Lock() + defer allPoolsMu.Unlock() + pid := runtime_procPin() + // poolCleanup won't be called while we are pinned. + s := p.localSize + l := p.local + if uintptr(pid) < s { + return indexLocal(l, pid), pid + } + if p.local == nil { + allPools = append(allPools, p) + } + // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. + size := runtime.GOMAXPROCS(0) + local := make([]poolLocal, size) + atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release + runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release + return &local[pid], pid +} + +func poolCleanup() { + // This function is called with the world stopped, at the beginning of a garbage collection. + // It must not allocate and probably should not call any runtime functions. + + // Because the world is stopped, no pool user can be in a + // pinned section (in effect, this has all Ps pinned). + + // Drop victim caches from all pools. + for _, p := range oldPools { + p.victim = nil + p.victimSize = 0 + } + + // Move primary cache to victim cache. + for _, p := range allPools { + p.victim = p.local + p.victimSize = p.localSize + p.local = nil + p.localSize = 0 + } + + // The pools with non-empty primary caches now have non-empty + // victim caches and no pools have primary caches. + oldPools, allPools = allPools, nil +} + +var ( + allPoolsMu Mutex + + // allPools is the set of pools that have non-empty primary + // caches. Protected by either 1) allPoolsMu and pinning or 2) + // STW. + allPools []*Pool + + // oldPools is the set of pools that may have non-empty victim + // caches. Protected by STW. + oldPools []*Pool +) + +func init() { + runtime_registerPoolCleanup(poolCleanup) +} + +func indexLocal(l unsafe.Pointer, i int) *poolLocal { + lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{})) + return (*poolLocal)(lp) +} + +// Implemented in runtime. +func runtime_registerPoolCleanup(cleanup func()) +func runtime_procPin() int +func runtime_procUnpin() + +// The below are implemented in runtime/internal/atomic and the +// compiler also knows to intrinsify the symbol we linkname into this +// package. + +//go:linkname runtime_LoadAcquintptr runtime/internal/atomic.LoadAcquintptr +func runtime_LoadAcquintptr(ptr *uintptr) uintptr + +//go:linkname runtime_StoreReluintptr runtime/internal/atomic.StoreReluintptr +func runtime_StoreReluintptr(ptr *uintptr, val uintptr) uintptr diff --git a/src/sync/pool_test.go b/src/sync/pool_test.go new file mode 100644 index 0000000..ad98350 --- /dev/null +++ b/src/sync/pool_test.go @@ -0,0 +1,352 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Pool is no-op under race detector, so all these tests do not work. +// +build !race + +package sync_test + +import ( + "runtime" + "runtime/debug" + "sort" + . "sync" + "sync/atomic" + "testing" + "time" +) + +func TestPool(t *testing.T) { + // disable GC so we can control when it happens. + defer debug.SetGCPercent(debug.SetGCPercent(-1)) + var p Pool + if p.Get() != nil { + t.Fatal("expected empty") + } + + // Make sure that the goroutine doesn't migrate to another P + // between Put and Get calls. + Runtime_procPin() + p.Put("a") + p.Put("b") + if g := p.Get(); g != "a" { + t.Fatalf("got %#v; want a", g) + } + if g := p.Get(); g != "b" { + t.Fatalf("got %#v; want b", g) + } + if g := p.Get(); g != nil { + t.Fatalf("got %#v; want nil", g) + } + Runtime_procUnpin() + + // Put in a large number of objects so they spill into + // stealable space. + for i := 0; i < 100; i++ { + p.Put("c") + } + // After one GC, the victim cache should keep them alive. + runtime.GC() + if g := p.Get(); g != "c" { + t.Fatalf("got %#v; want c after GC", g) + } + // A second GC should drop the victim cache. + runtime.GC() + if g := p.Get(); g != nil { + t.Fatalf("got %#v; want nil after second GC", g) + } +} + +func TestPoolNew(t *testing.T) { + // disable GC so we can control when it happens. + defer debug.SetGCPercent(debug.SetGCPercent(-1)) + + i := 0 + p := Pool{ + New: func() interface{} { + i++ + return i + }, + } + if v := p.Get(); v != 1 { + t.Fatalf("got %v; want 1", v) + } + if v := p.Get(); v != 2 { + t.Fatalf("got %v; want 2", v) + } + + // Make sure that the goroutine doesn't migrate to another P + // between Put and Get calls. + Runtime_procPin() + p.Put(42) + if v := p.Get(); v != 42 { + t.Fatalf("got %v; want 42", v) + } + Runtime_procUnpin() + + if v := p.Get(); v != 3 { + t.Fatalf("got %v; want 3", v) + } +} + +// Test that Pool does not hold pointers to previously cached resources. +func TestPoolGC(t *testing.T) { + testPool(t, true) +} + +// Test that Pool releases resources on GC. +func TestPoolRelease(t *testing.T) { + testPool(t, false) +} + +func testPool(t *testing.T, drain bool) { + var p Pool + const N = 100 +loop: + for try := 0; try < 3; try++ { + if try == 1 && testing.Short() { + break + } + var fin, fin1 uint32 + for i := 0; i < N; i++ { + v := new(string) + runtime.SetFinalizer(v, func(vv *string) { + atomic.AddUint32(&fin, 1) + }) + p.Put(v) + } + if drain { + for i := 0; i < N; i++ { + p.Get() + } + } + for i := 0; i < 5; i++ { + runtime.GC() + time.Sleep(time.Duration(i*100+10) * time.Millisecond) + // 1 pointer can remain on stack or elsewhere + if fin1 = atomic.LoadUint32(&fin); fin1 >= N-1 { + continue loop + } + } + t.Fatalf("only %v out of %v resources are finalized on try %v", fin1, N, try) + } +} + +func TestPoolStress(t *testing.T) { + const P = 10 + N := int(1e6) + if testing.Short() { + N /= 100 + } + var p Pool + done := make(chan bool) + for i := 0; i < P; i++ { + go func() { + var v interface{} = 0 + for j := 0; j < N; j++ { + if v == nil { + v = 0 + } + p.Put(v) + v = p.Get() + if v != nil && v.(int) != 0 { + t.Errorf("expect 0, got %v", v) + break + } + } + done <- true + }() + } + for i := 0; i < P; i++ { + <-done + } +} + +func TestPoolDequeue(t *testing.T) { + testPoolDequeue(t, NewPoolDequeue(16)) +} + +func TestPoolChain(t *testing.T) { + testPoolDequeue(t, NewPoolChain()) +} + +func testPoolDequeue(t *testing.T, d PoolDequeue) { + const P = 10 + var N int = 2e6 + if testing.Short() { + N = 1e3 + } + have := make([]int32, N) + var stop int32 + var wg WaitGroup + record := func(val int) { + atomic.AddInt32(&have[val], 1) + if val == N-1 { + atomic.StoreInt32(&stop, 1) + } + } + + // Start P-1 consumers. + for i := 1; i < P; i++ { + wg.Add(1) + go func() { + fail := 0 + for atomic.LoadInt32(&stop) == 0 { + val, ok := d.PopTail() + if ok { + fail = 0 + record(val.(int)) + } else { + // Speed up the test by + // allowing the pusher to run. + if fail++; fail%100 == 0 { + runtime.Gosched() + } + } + } + wg.Done() + }() + } + + // Start 1 producer. + nPopHead := 0 + wg.Add(1) + go func() { + for j := 0; j < N; j++ { + for !d.PushHead(j) { + // Allow a popper to run. + runtime.Gosched() + } + if j%10 == 0 { + val, ok := d.PopHead() + if ok { + nPopHead++ + record(val.(int)) + } + } + } + wg.Done() + }() + wg.Wait() + + // Check results. + for i, count := range have { + if count != 1 { + t.Errorf("expected have[%d] = 1, got %d", i, count) + } + } + // Check that at least some PopHeads succeeded. We skip this + // check in short mode because it's common enough that the + // queue will stay nearly empty all the time and a PopTail + // will happen during the window between every PushHead and + // PopHead. + if !testing.Short() && nPopHead == 0 { + t.Errorf("popHead never succeeded") + } +} + +func BenchmarkPool(b *testing.B) { + var p Pool + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p.Put(1) + p.Get() + } + }) +} + +func BenchmarkPoolOverflow(b *testing.B) { + var p Pool + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for b := 0; b < 100; b++ { + p.Put(1) + } + for b := 0; b < 100; b++ { + p.Get() + } + } + }) +} + +var globalSink interface{} + +func BenchmarkPoolSTW(b *testing.B) { + // Take control of GC. + defer debug.SetGCPercent(debug.SetGCPercent(-1)) + + var mstats runtime.MemStats + var pauses []uint64 + + var p Pool + for i := 0; i < b.N; i++ { + // Put a large number of items into a pool. + const N = 100000 + var item interface{} = 42 + for i := 0; i < N; i++ { + p.Put(item) + } + // Do a GC. + runtime.GC() + // Record pause time. + runtime.ReadMemStats(&mstats) + pauses = append(pauses, mstats.PauseNs[(mstats.NumGC+255)%256]) + } + + // Get pause time stats. + sort.Slice(pauses, func(i, j int) bool { return pauses[i] < pauses[j] }) + var total uint64 + for _, ns := range pauses { + total += ns + } + // ns/op for this benchmark is average STW time. + b.ReportMetric(float64(total)/float64(b.N), "ns/op") + b.ReportMetric(float64(pauses[len(pauses)*95/100]), "p95-ns/STW") + b.ReportMetric(float64(pauses[len(pauses)*50/100]), "p50-ns/STW") +} + +func BenchmarkPoolExpensiveNew(b *testing.B) { + // Populate a pool with items that are expensive to construct + // to stress pool cleanup and subsequent reconstruction. + + // Create a ballast so the GC has a non-zero heap size and + // runs at reasonable times. + globalSink = make([]byte, 8<<20) + defer func() { globalSink = nil }() + + // Create a pool that's "expensive" to fill. + var p Pool + var nNew uint64 + p.New = func() interface{} { + atomic.AddUint64(&nNew, 1) + time.Sleep(time.Millisecond) + return 42 + } + var mstats1, mstats2 runtime.MemStats + runtime.ReadMemStats(&mstats1) + b.RunParallel(func(pb *testing.PB) { + // Simulate 100X the number of goroutines having items + // checked out from the Pool simultaneously. + items := make([]interface{}, 100) + var sink []byte + for pb.Next() { + // Stress the pool. + for i := range items { + items[i] = p.Get() + // Simulate doing some work with this + // item checked out. + sink = make([]byte, 32<<10) + } + for i, v := range items { + p.Put(v) + items[i] = nil + } + } + _ = sink + }) + runtime.ReadMemStats(&mstats2) + + b.ReportMetric(float64(mstats2.NumGC-mstats1.NumGC)/float64(b.N), "GCs/op") + b.ReportMetric(float64(nNew)/float64(b.N), "New/op") +} diff --git a/src/sync/poolqueue.go b/src/sync/poolqueue.go new file mode 100644 index 0000000..9be83e9 --- /dev/null +++ b/src/sync/poolqueue.go @@ -0,0 +1,309 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" + "unsafe" +) + +// poolDequeue is a lock-free fixed-size single-producer, +// multi-consumer queue. The single producer can both push and pop +// from the head, and consumers can pop from the tail. +// +// It has the added feature that it nils out unused slots to avoid +// unnecessary retention of objects. This is important for sync.Pool, +// but not typically a property considered in the literature. +type poolDequeue struct { + // headTail packs together a 32-bit head index and a 32-bit + // tail index. Both are indexes into vals modulo len(vals)-1. + // + // tail = index of oldest data in queue + // head = index of next slot to fill + // + // Slots in the range [tail, head) are owned by consumers. + // A consumer continues to own a slot outside this range until + // it nils the slot, at which point ownership passes to the + // producer. + // + // The head index is stored in the most-significant bits so + // that we can atomically add to it and the overflow is + // harmless. + headTail uint64 + + // vals is a ring buffer of interface{} values stored in this + // dequeue. The size of this must be a power of 2. + // + // vals[i].typ is nil if the slot is empty and non-nil + // otherwise. A slot is still in use until *both* the tail + // index has moved beyond it and typ has been set to nil. This + // is set to nil atomically by the consumer and read + // atomically by the producer. + vals []eface +} + +type eface struct { + typ, val unsafe.Pointer +} + +const dequeueBits = 32 + +// dequeueLimit is the maximum size of a poolDequeue. +// +// This must be at most (1<<dequeueBits)/2 because detecting fullness +// depends on wrapping around the ring buffer without wrapping around +// the index. We divide by 4 so this fits in an int on 32-bit. +const dequeueLimit = (1 << dequeueBits) / 4 + +// dequeueNil is used in poolDequeue to represent interface{}(nil). +// Since we use nil to represent empty slots, we need a sentinel value +// to represent nil. +type dequeueNil *struct{} + +func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) { + const mask = 1<<dequeueBits - 1 + head = uint32((ptrs >> dequeueBits) & mask) + tail = uint32(ptrs & mask) + return +} + +func (d *poolDequeue) pack(head, tail uint32) uint64 { + const mask = 1<<dequeueBits - 1 + return (uint64(head) << dequeueBits) | + uint64(tail&mask) +} + +// pushHead adds val at the head of the queue. It returns false if the +// queue is full. It must only be called by a single producer. +func (d *poolDequeue) pushHead(val interface{}) bool { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { + // Queue is full. + return false + } + slot := &d.vals[head&uint32(len(d.vals)-1)] + + // Check if the head slot has been released by popTail. + typ := atomic.LoadPointer(&slot.typ) + if typ != nil { + // Another goroutine is still cleaning up the tail, so + // the queue is actually still full. + return false + } + + // The head slot is free, so we own it. + if val == nil { + val = dequeueNil(nil) + } + *(*interface{})(unsafe.Pointer(slot)) = val + + // Increment head. This passes ownership of slot to popTail + // and acts as a store barrier for writing the slot. + atomic.AddUint64(&d.headTail, 1<<dequeueBits) + return true +} + +// popHead removes and returns the element at the head of the queue. +// It returns false if the queue is empty. It must only be called by a +// single producer. +func (d *poolDequeue) popHead() (interface{}, bool) { + var slot *eface + for { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if tail == head { + // Queue is empty. + return nil, false + } + + // Confirm tail and decrement head. We do this before + // reading the value to take back ownership of this + // slot. + head-- + ptrs2 := d.pack(head, tail) + if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { + // We successfully took back slot. + slot = &d.vals[head&uint32(len(d.vals)-1)] + break + } + } + + val := *(*interface{})(unsafe.Pointer(slot)) + if val == dequeueNil(nil) { + val = nil + } + // Zero the slot. Unlike popTail, this isn't racing with + // pushHead, so we don't need to be careful here. + *slot = eface{} + return val, true +} + +// popTail removes and returns the element at the tail of the queue. +// It returns false if the queue is empty. It may be called by any +// number of consumers. +func (d *poolDequeue) popTail() (interface{}, bool) { + var slot *eface + for { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if tail == head { + // Queue is empty. + return nil, false + } + + // Confirm head and tail (for our speculative check + // above) and increment tail. If this succeeds, then + // we own the slot at tail. + ptrs2 := d.pack(head, tail+1) + if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { + // Success. + slot = &d.vals[tail&uint32(len(d.vals)-1)] + break + } + } + + // We now own slot. + val := *(*interface{})(unsafe.Pointer(slot)) + if val == dequeueNil(nil) { + val = nil + } + + // Tell pushHead that we're done with this slot. Zeroing the + // slot is also important so we don't leave behind references + // that could keep this object live longer than necessary. + // + // We write to val first and then publish that we're done with + // this slot by atomically writing to typ. + slot.val = nil + atomic.StorePointer(&slot.typ, nil) + // At this point pushHead owns the slot. + + return val, true +} + +// poolChain is a dynamically-sized version of poolDequeue. +// +// This is implemented as a doubly-linked list queue of poolDequeues +// where each dequeue is double the size of the previous one. Once a +// dequeue fills up, this allocates a new one and only ever pushes to +// the latest dequeue. Pops happen from the other end of the list and +// once a dequeue is exhausted, it gets removed from the list. +type poolChain struct { + // head is the poolDequeue to push to. This is only accessed + // by the producer, so doesn't need to be synchronized. + head *poolChainElt + + // tail is the poolDequeue to popTail from. This is accessed + // by consumers, so reads and writes must be atomic. + tail *poolChainElt +} + +type poolChainElt struct { + poolDequeue + + // next and prev link to the adjacent poolChainElts in this + // poolChain. + // + // next is written atomically by the producer and read + // atomically by the consumer. It only transitions from nil to + // non-nil. + // + // prev is written atomically by the consumer and read + // atomically by the producer. It only transitions from + // non-nil to nil. + next, prev *poolChainElt +} + +func storePoolChainElt(pp **poolChainElt, v *poolChainElt) { + atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v)) +} + +func loadPoolChainElt(pp **poolChainElt) *poolChainElt { + return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp)))) +} + +func (c *poolChain) pushHead(val interface{}) { + d := c.head + if d == nil { + // Initialize the chain. + const initSize = 8 // Must be a power of 2 + d = new(poolChainElt) + d.vals = make([]eface, initSize) + c.head = d + storePoolChainElt(&c.tail, d) + } + + if d.pushHead(val) { + return + } + + // The current dequeue is full. Allocate a new one of twice + // the size. + newSize := len(d.vals) * 2 + if newSize >= dequeueLimit { + // Can't make it any bigger. + newSize = dequeueLimit + } + + d2 := &poolChainElt{prev: d} + d2.vals = make([]eface, newSize) + c.head = d2 + storePoolChainElt(&d.next, d2) + d2.pushHead(val) +} + +func (c *poolChain) popHead() (interface{}, bool) { + d := c.head + for d != nil { + if val, ok := d.popHead(); ok { + return val, ok + } + // There may still be unconsumed elements in the + // previous dequeue, so try backing up. + d = loadPoolChainElt(&d.prev) + } + return nil, false +} + +func (c *poolChain) popTail() (interface{}, bool) { + d := loadPoolChainElt(&c.tail) + if d == nil { + return nil, false + } + + for { + // It's important that we load the next pointer + // *before* popping the tail. In general, d may be + // transiently empty, but if next is non-nil before + // the pop and the pop fails, then d is permanently + // empty, which is the only condition under which it's + // safe to drop d from the chain. + d2 := loadPoolChainElt(&d.next) + + if val, ok := d.popTail(); ok { + return val, ok + } + + if d2 == nil { + // This is the only dequeue. It's empty right + // now, but could be pushed to in the future. + return nil, false + } + + // The tail of the chain has been drained, so move on + // to the next dequeue. Try to drop it from the chain + // so the next pop doesn't have to look at the empty + // dequeue again. + if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { + // We won the race. Clear the prev pointer so + // the garbage collector can collect the empty + // dequeue and so popHead doesn't back up + // further than necessary. + storePoolChainElt(&d2.prev, nil) + } + d = d2 + } +} diff --git a/src/sync/runtime.go b/src/sync/runtime.go new file mode 100644 index 0000000..de2b0a3 --- /dev/null +++ b/src/sync/runtime.go @@ -0,0 +1,57 @@ +// Copyright 2012 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import "unsafe" + +// defined in package runtime + +// Semacquire waits until *s > 0 and then atomically decrements it. +// It is intended as a simple sleep primitive for use by the synchronization +// library and should not be used directly. +func runtime_Semacquire(s *uint32) + +// SemacquireMutex is like Semacquire, but for profiling contended Mutexes. +// If lifo is true, queue waiter at the head of wait queue. +// skipframes is the number of frames to omit during tracing, counting from +// runtime_SemacquireMutex's caller. +func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int) + +// Semrelease atomically increments *s and notifies a waiting goroutine +// if one is blocked in Semacquire. +// It is intended as a simple wakeup primitive for use by the synchronization +// library and should not be used directly. +// If handoff is true, pass count directly to the first waiter. +// skipframes is the number of frames to omit during tracing, counting from +// runtime_Semrelease's caller. +func runtime_Semrelease(s *uint32, handoff bool, skipframes int) + +// See runtime/sema.go for documentation. +func runtime_notifyListAdd(l *notifyList) uint32 + +// See runtime/sema.go for documentation. +func runtime_notifyListWait(l *notifyList, t uint32) + +// See runtime/sema.go for documentation. +func runtime_notifyListNotifyAll(l *notifyList) + +// See runtime/sema.go for documentation. +func runtime_notifyListNotifyOne(l *notifyList) + +// Ensure that sync and runtime agree on size of notifyList. +func runtime_notifyListCheck(size uintptr) +func init() { + var n notifyList + runtime_notifyListCheck(unsafe.Sizeof(n)) +} + +// Active spinning runtime support. +// runtime_canSpin reports whether spinning makes sense at the moment. +func runtime_canSpin(i int) bool + +// runtime_doSpin does active spinning. +func runtime_doSpin() + +func runtime_nanotime() int64 diff --git a/src/sync/runtime2.go b/src/sync/runtime2.go new file mode 100644 index 0000000..f10c4e8 --- /dev/null +++ b/src/sync/runtime2.go @@ -0,0 +1,19 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !goexperiment.staticlockranking + +package sync + +import "unsafe" + +// Approximation of notifyList in runtime/sema.go. Size and alignment must +// agree. +type notifyList struct { + wait uint32 + notify uint32 + lock uintptr // key field of the mutex + head unsafe.Pointer + tail unsafe.Pointer +} diff --git a/src/sync/runtime2_lockrank.go b/src/sync/runtime2_lockrank.go new file mode 100644 index 0000000..aaa1c27 --- /dev/null +++ b/src/sync/runtime2_lockrank.go @@ -0,0 +1,22 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build goexperiment.staticlockranking + +package sync + +import "unsafe" + +// Approximation of notifyList in runtime/sema.go. Size and alignment must +// agree. +type notifyList struct { + wait uint32 + notify uint32 + rank int // rank field of the mutex + pad int // pad field of the mutex + lock uintptr // key field of the mutex + + head unsafe.Pointer + tail unsafe.Pointer +} diff --git a/src/sync/runtime_sema_test.go b/src/sync/runtime_sema_test.go new file mode 100644 index 0000000..152cf0e --- /dev/null +++ b/src/sync/runtime_sema_test.go @@ -0,0 +1,75 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync_test + +import ( + "runtime" + . "sync" + "testing" +) + +func BenchmarkSemaUncontended(b *testing.B) { + type PaddedSem struct { + sem uint32 + pad [32]uint32 + } + b.RunParallel(func(pb *testing.PB) { + sem := new(PaddedSem) + for pb.Next() { + Runtime_Semrelease(&sem.sem, false, 0) + Runtime_Semacquire(&sem.sem) + } + }) +} + +func benchmarkSema(b *testing.B, block, work bool) { + if b.N == 0 { + return + } + sem := uint32(0) + if block { + done := make(chan bool) + go func() { + for p := 0; p < runtime.GOMAXPROCS(0)/2; p++ { + Runtime_Semacquire(&sem) + } + done <- true + }() + defer func() { + <-done + }() + } + b.RunParallel(func(pb *testing.PB) { + foo := 0 + for pb.Next() { + Runtime_Semrelease(&sem, false, 0) + if work { + for i := 0; i < 100; i++ { + foo *= 2 + foo /= 2 + } + } + Runtime_Semacquire(&sem) + } + _ = foo + Runtime_Semrelease(&sem, false, 0) + }) +} + +func BenchmarkSemaSyntNonblock(b *testing.B) { + benchmarkSema(b, false, false) +} + +func BenchmarkSemaSyntBlock(b *testing.B) { + benchmarkSema(b, true, false) +} + +func BenchmarkSemaWorkNonblock(b *testing.B) { + benchmarkSema(b, false, true) +} + +func BenchmarkSemaWorkBlock(b *testing.B) { + benchmarkSema(b, true, true) +} diff --git a/src/sync/rwmutex.go b/src/sync/rwmutex.go new file mode 100644 index 0000000..3012b55 --- /dev/null +++ b/src/sync/rwmutex.go @@ -0,0 +1,164 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "internal/race" + "sync/atomic" + "unsafe" +) + +// There is a modified copy of this file in runtime/rwmutex.go. +// If you make any changes here, see if you should make them there. + +// A RWMutex is a reader/writer mutual exclusion lock. +// The lock can be held by an arbitrary number of readers or a single writer. +// The zero value for a RWMutex is an unlocked mutex. +// +// A RWMutex must not be copied after first use. +// +// If a goroutine holds a RWMutex for reading and another goroutine might +// call Lock, no goroutine should expect to be able to acquire a read lock +// until the initial read lock is released. In particular, this prohibits +// recursive read locking. This is to ensure that the lock eventually becomes +// available; a blocked Lock call excludes new readers from acquiring the +// lock. +type RWMutex struct { + w Mutex // held if there are pending writers + writerSem uint32 // semaphore for writers to wait for completing readers + readerSem uint32 // semaphore for readers to wait for completing writers + readerCount int32 // number of pending readers + readerWait int32 // number of departing readers +} + +const rwmutexMaxReaders = 1 << 30 + +// Happens-before relationships are indicated to the race detector via: +// - Unlock -> Lock: readerSem +// - Unlock -> RLock: readerSem +// - RUnlock -> Lock: writerSem +// +// The methods below temporarily disable handling of race synchronization +// events in order to provide the more precise model above to the race +// detector. +// +// For example, atomic.AddInt32 in RLock should not appear to provide +// acquire-release semantics, which would incorrectly synchronize racing +// readers, thus potentially missing races. + +// RLock locks rw for reading. +// +// It should not be used for recursive read locking; a blocked Lock +// call excludes new readers from acquiring the lock. See the +// documentation on the RWMutex type. +func (rw *RWMutex) RLock() { + if race.Enabled { + _ = rw.w.state + race.Disable() + } + if atomic.AddInt32(&rw.readerCount, 1) < 0 { + // A writer is pending, wait for it. + runtime_SemacquireMutex(&rw.readerSem, false, 0) + } + if race.Enabled { + race.Enable() + race.Acquire(unsafe.Pointer(&rw.readerSem)) + } +} + +// RUnlock undoes a single RLock call; +// it does not affect other simultaneous readers. +// It is a run-time error if rw is not locked for reading +// on entry to RUnlock. +func (rw *RWMutex) RUnlock() { + if race.Enabled { + _ = rw.w.state + race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) + race.Disable() + } + if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { + // Outlined slow-path to allow the fast-path to be inlined + rw.rUnlockSlow(r) + } + if race.Enabled { + race.Enable() + } +} + +func (rw *RWMutex) rUnlockSlow(r int32) { + if r+1 == 0 || r+1 == -rwmutexMaxReaders { + race.Enable() + throw("sync: RUnlock of unlocked RWMutex") + } + // A writer is pending. + if atomic.AddInt32(&rw.readerWait, -1) == 0 { + // The last reader unblocks the writer. + runtime_Semrelease(&rw.writerSem, false, 1) + } +} + +// Lock locks rw for writing. +// If the lock is already locked for reading or writing, +// Lock blocks until the lock is available. +func (rw *RWMutex) Lock() { + if race.Enabled { + _ = rw.w.state + race.Disable() + } + // First, resolve competition with other writers. + rw.w.Lock() + // Announce to readers there is a pending writer. + r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders + // Wait for active readers. + if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { + runtime_SemacquireMutex(&rw.writerSem, false, 0) + } + if race.Enabled { + race.Enable() + race.Acquire(unsafe.Pointer(&rw.readerSem)) + race.Acquire(unsafe.Pointer(&rw.writerSem)) + } +} + +// Unlock unlocks rw for writing. It is a run-time error if rw is +// not locked for writing on entry to Unlock. +// +// As with Mutexes, a locked RWMutex is not associated with a particular +// goroutine. One goroutine may RLock (Lock) a RWMutex and then +// arrange for another goroutine to RUnlock (Unlock) it. +func (rw *RWMutex) Unlock() { + if race.Enabled { + _ = rw.w.state + race.Release(unsafe.Pointer(&rw.readerSem)) + race.Disable() + } + + // Announce to readers there is no active writer. + r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) + if r >= rwmutexMaxReaders { + race.Enable() + throw("sync: Unlock of unlocked RWMutex") + } + // Unblock blocked readers, if any. + for i := 0; i < int(r); i++ { + runtime_Semrelease(&rw.readerSem, false, 0) + } + // Allow other writers to proceed. + rw.w.Unlock() + if race.Enabled { + race.Enable() + } +} + +// RLocker returns a Locker interface that implements +// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock. +func (rw *RWMutex) RLocker() Locker { + return (*rlocker)(rw) +} + +type rlocker RWMutex + +func (r *rlocker) Lock() { (*RWMutex)(r).RLock() } +func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() } diff --git a/src/sync/rwmutex_test.go b/src/sync/rwmutex_test.go new file mode 100644 index 0000000..c98e69f --- /dev/null +++ b/src/sync/rwmutex_test.go @@ -0,0 +1,217 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// GOMAXPROCS=10 go test + +package sync_test + +import ( + "fmt" + "runtime" + . "sync" + "sync/atomic" + "testing" +) + +// There is a modified copy of this file in runtime/rwmutex_test.go. +// If you make any changes here, see if you should make them there. + +func parallelReader(m *RWMutex, clocked, cunlock, cdone chan bool) { + m.RLock() + clocked <- true + <-cunlock + m.RUnlock() + cdone <- true +} + +func doTestParallelReaders(numReaders, gomaxprocs int) { + runtime.GOMAXPROCS(gomaxprocs) + var m RWMutex + clocked := make(chan bool) + cunlock := make(chan bool) + cdone := make(chan bool) + for i := 0; i < numReaders; i++ { + go parallelReader(&m, clocked, cunlock, cdone) + } + // Wait for all parallel RLock()s to succeed. + for i := 0; i < numReaders; i++ { + <-clocked + } + for i := 0; i < numReaders; i++ { + cunlock <- true + } + // Wait for the goroutines to finish. + for i := 0; i < numReaders; i++ { + <-cdone + } +} + +func TestParallelReaders(t *testing.T) { + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1)) + doTestParallelReaders(1, 4) + doTestParallelReaders(3, 4) + doTestParallelReaders(4, 2) +} + +func reader(rwm *RWMutex, num_iterations int, activity *int32, cdone chan bool) { + for i := 0; i < num_iterations; i++ { + rwm.RLock() + n := atomic.AddInt32(activity, 1) + if n < 1 || n >= 10000 { + rwm.RUnlock() + panic(fmt.Sprintf("wlock(%d)\n", n)) + } + for i := 0; i < 100; i++ { + } + atomic.AddInt32(activity, -1) + rwm.RUnlock() + } + cdone <- true +} + +func writer(rwm *RWMutex, num_iterations int, activity *int32, cdone chan bool) { + for i := 0; i < num_iterations; i++ { + rwm.Lock() + n := atomic.AddInt32(activity, 10000) + if n != 10000 { + rwm.Unlock() + panic(fmt.Sprintf("wlock(%d)\n", n)) + } + for i := 0; i < 100; i++ { + } + atomic.AddInt32(activity, -10000) + rwm.Unlock() + } + cdone <- true +} + +func HammerRWMutex(gomaxprocs, numReaders, num_iterations int) { + runtime.GOMAXPROCS(gomaxprocs) + // Number of active readers + 10000 * number of active writers. + var activity int32 + var rwm RWMutex + cdone := make(chan bool) + go writer(&rwm, num_iterations, &activity, cdone) + var i int + for i = 0; i < numReaders/2; i++ { + go reader(&rwm, num_iterations, &activity, cdone) + } + go writer(&rwm, num_iterations, &activity, cdone) + for ; i < numReaders; i++ { + go reader(&rwm, num_iterations, &activity, cdone) + } + // Wait for the 2 writers and all readers to finish. + for i := 0; i < 2+numReaders; i++ { + <-cdone + } +} + +func TestRWMutex(t *testing.T) { + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1)) + n := 1000 + if testing.Short() { + n = 5 + } + HammerRWMutex(1, 1, n) + HammerRWMutex(1, 3, n) + HammerRWMutex(1, 10, n) + HammerRWMutex(4, 1, n) + HammerRWMutex(4, 3, n) + HammerRWMutex(4, 10, n) + HammerRWMutex(10, 1, n) + HammerRWMutex(10, 3, n) + HammerRWMutex(10, 10, n) + HammerRWMutex(10, 5, n) +} + +func TestRLocker(t *testing.T) { + var wl RWMutex + var rl Locker + wlocked := make(chan bool, 1) + rlocked := make(chan bool, 1) + rl = wl.RLocker() + n := 10 + go func() { + for i := 0; i < n; i++ { + rl.Lock() + rl.Lock() + rlocked <- true + wl.Lock() + wlocked <- true + } + }() + for i := 0; i < n; i++ { + <-rlocked + rl.Unlock() + select { + case <-wlocked: + t.Fatal("RLocker() didn't read-lock it") + default: + } + rl.Unlock() + <-wlocked + select { + case <-rlocked: + t.Fatal("RLocker() didn't respect the write lock") + default: + } + wl.Unlock() + } +} + +func BenchmarkRWMutexUncontended(b *testing.B) { + type PaddedRWMutex struct { + RWMutex + pad [32]uint32 + } + b.RunParallel(func(pb *testing.PB) { + var rwm PaddedRWMutex + for pb.Next() { + rwm.RLock() + rwm.RLock() + rwm.RUnlock() + rwm.RUnlock() + rwm.Lock() + rwm.Unlock() + } + }) +} + +func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) { + var rwm RWMutex + b.RunParallel(func(pb *testing.PB) { + foo := 0 + for pb.Next() { + foo++ + if foo%writeRatio == 0 { + rwm.Lock() + rwm.Unlock() + } else { + rwm.RLock() + for i := 0; i != localWork; i += 1 { + foo *= 2 + foo /= 2 + } + rwm.RUnlock() + } + } + _ = foo + }) +} + +func BenchmarkRWMutexWrite100(b *testing.B) { + benchmarkRWMutex(b, 0, 100) +} + +func BenchmarkRWMutexWrite10(b *testing.B) { + benchmarkRWMutex(b, 0, 10) +} + +func BenchmarkRWMutexWorkWrite100(b *testing.B) { + benchmarkRWMutex(b, 100, 100) +} + +func BenchmarkRWMutexWorkWrite10(b *testing.B) { + benchmarkRWMutex(b, 100, 10) +} diff --git a/src/sync/waitgroup.go b/src/sync/waitgroup.go new file mode 100644 index 0000000..e81a493 --- /dev/null +++ b/src/sync/waitgroup.go @@ -0,0 +1,141 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "internal/race" + "sync/atomic" + "unsafe" +) + +// A WaitGroup waits for a collection of goroutines to finish. +// The main goroutine calls Add to set the number of +// goroutines to wait for. Then each of the goroutines +// runs and calls Done when finished. At the same time, +// Wait can be used to block until all goroutines have finished. +// +// A WaitGroup must not be copied after first use. +type WaitGroup struct { + noCopy noCopy + + // 64-bit value: high 32 bits are counter, low 32 bits are waiter count. + // 64-bit atomic operations require 64-bit alignment, but 32-bit + // compilers do not ensure it. So we allocate 12 bytes and then use + // the aligned 8 bytes in them as state, and the other 4 as storage + // for the sema. + state1 [3]uint32 +} + +// state returns pointers to the state and sema fields stored within wg.state1. +func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { + if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { + return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] + } else { + return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] + } +} + +// Add adds delta, which may be negative, to the WaitGroup counter. +// If the counter becomes zero, all goroutines blocked on Wait are released. +// If the counter goes negative, Add panics. +// +// Note that calls with a positive delta that occur when the counter is zero +// must happen before a Wait. Calls with a negative delta, or calls with a +// positive delta that start when the counter is greater than zero, may happen +// at any time. +// Typically this means the calls to Add should execute before the statement +// creating the goroutine or other event to be waited for. +// If a WaitGroup is reused to wait for several independent sets of events, +// new Add calls must happen after all previous Wait calls have returned. +// See the WaitGroup example. +func (wg *WaitGroup) Add(delta int) { + statep, semap := wg.state() + if race.Enabled { + _ = *statep // trigger nil deref early + if delta < 0 { + // Synchronize decrements with Wait. + race.ReleaseMerge(unsafe.Pointer(wg)) + } + race.Disable() + defer race.Enable() + } + state := atomic.AddUint64(statep, uint64(delta)<<32) + v := int32(state >> 32) + w := uint32(state) + if race.Enabled && delta > 0 && v == int32(delta) { + // The first increment must be synchronized with Wait. + // Need to model this as a read, because there can be + // several concurrent wg.counter transitions from 0. + race.Read(unsafe.Pointer(semap)) + } + if v < 0 { + panic("sync: negative WaitGroup counter") + } + if w != 0 && delta > 0 && v == int32(delta) { + panic("sync: WaitGroup misuse: Add called concurrently with Wait") + } + if v > 0 || w == 0 { + return + } + // This goroutine has set counter to 0 when waiters > 0. + // Now there can't be concurrent mutations of state: + // - Adds must not happen concurrently with Wait, + // - Wait does not increment waiters if it sees counter == 0. + // Still do a cheap sanity check to detect WaitGroup misuse. + if *statep != state { + panic("sync: WaitGroup misuse: Add called concurrently with Wait") + } + // Reset waiters count to 0. + *statep = 0 + for ; w != 0; w-- { + runtime_Semrelease(semap, false, 0) + } +} + +// Done decrements the WaitGroup counter by one. +func (wg *WaitGroup) Done() { + wg.Add(-1) +} + +// Wait blocks until the WaitGroup counter is zero. +func (wg *WaitGroup) Wait() { + statep, semap := wg.state() + if race.Enabled { + _ = *statep // trigger nil deref early + race.Disable() + } + for { + state := atomic.LoadUint64(statep) + v := int32(state >> 32) + w := uint32(state) + if v == 0 { + // Counter is 0, no need to wait. + if race.Enabled { + race.Enable() + race.Acquire(unsafe.Pointer(wg)) + } + return + } + // Increment waiters count. + if atomic.CompareAndSwapUint64(statep, state, state+1) { + if race.Enabled && w == 0 { + // Wait must be synchronized with the first Add. + // Need to model this is as a write to race with the read in Add. + // As a consequence, can do the write only for the first waiter, + // otherwise concurrent Waits will race with each other. + race.Write(unsafe.Pointer(semap)) + } + runtime_Semacquire(semap) + if *statep != 0 { + panic("sync: WaitGroup is reused before previous Wait has returned") + } + if race.Enabled { + race.Enable() + race.Acquire(unsafe.Pointer(wg)) + } + return + } + } +} diff --git a/src/sync/waitgroup_test.go b/src/sync/waitgroup_test.go new file mode 100644 index 0000000..c569e0f --- /dev/null +++ b/src/sync/waitgroup_test.go @@ -0,0 +1,301 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync_test + +import ( + "internal/race" + "runtime" + . "sync" + "sync/atomic" + "testing" +) + +func testWaitGroup(t *testing.T, wg1 *WaitGroup, wg2 *WaitGroup) { + n := 16 + wg1.Add(n) + wg2.Add(n) + exited := make(chan bool, n) + for i := 0; i != n; i++ { + go func() { + wg1.Done() + wg2.Wait() + exited <- true + }() + } + wg1.Wait() + for i := 0; i != n; i++ { + select { + case <-exited: + t.Fatal("WaitGroup released group too soon") + default: + } + wg2.Done() + } + for i := 0; i != n; i++ { + <-exited // Will block if barrier fails to unlock someone. + } +} + +func TestWaitGroup(t *testing.T) { + wg1 := &WaitGroup{} + wg2 := &WaitGroup{} + + // Run the same test a few times to ensure barrier is in a proper state. + for i := 0; i != 8; i++ { + testWaitGroup(t, wg1, wg2) + } +} + +func knownRacy(t *testing.T) { + if race.Enabled { + t.Skip("skipping known-racy test under the race detector") + } +} + +func TestWaitGroupMisuse(t *testing.T) { + defer func() { + err := recover() + if err != "sync: negative WaitGroup counter" { + t.Fatalf("Unexpected panic: %#v", err) + } + }() + wg := &WaitGroup{} + wg.Add(1) + wg.Done() + wg.Done() + t.Fatal("Should panic") +} + +// pollUntilEqual blocks until v, loaded atomically, is +// equal to the target. +func pollUntilEqual(v *uint32, target uint32) { + for { + for i := 0; i < 1e3; i++ { + if atomic.LoadUint32(v) == target { + return + } + } + // yield to avoid deadlock with the garbage collector + // see issue #20072 + runtime.Gosched() + } +} + +func TestWaitGroupMisuse2(t *testing.T) { + knownRacy(t) + if runtime.NumCPU() <= 4 { + t.Skip("NumCPU<=4, skipping: this test requires parallelism") + } + defer func() { + err := recover() + if err != "sync: negative WaitGroup counter" && + err != "sync: WaitGroup misuse: Add called concurrently with Wait" && + err != "sync: WaitGroup is reused before previous Wait has returned" { + t.Fatalf("Unexpected panic: %#v", err) + } + }() + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4)) + done := make(chan interface{}, 2) + // The detection is opportunistic, so we want it to panic + // at least in one run out of a million. + for i := 0; i < 1e6; i++ { + var wg WaitGroup + var here uint32 + wg.Add(1) + go func() { + defer func() { + done <- recover() + }() + atomic.AddUint32(&here, 1) + pollUntilEqual(&here, 3) + wg.Wait() + }() + go func() { + defer func() { + done <- recover() + }() + atomic.AddUint32(&here, 1) + pollUntilEqual(&here, 3) + wg.Add(1) // This is the bad guy. + wg.Done() + }() + atomic.AddUint32(&here, 1) + pollUntilEqual(&here, 3) + wg.Done() + for j := 0; j < 2; j++ { + if err := <-done; err != nil { + panic(err) + } + } + } + t.Fatal("Should panic") +} + +func TestWaitGroupMisuse3(t *testing.T) { + knownRacy(t) + if runtime.NumCPU() <= 1 { + t.Skip("NumCPU==1, skipping: this test requires parallelism") + } + defer func() { + err := recover() + if err != "sync: negative WaitGroup counter" && + err != "sync: WaitGroup misuse: Add called concurrently with Wait" && + err != "sync: WaitGroup is reused before previous Wait has returned" { + t.Fatalf("Unexpected panic: %#v", err) + } + }() + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4)) + done := make(chan interface{}, 3) + // The detection is opportunistically, so we want it to panic + // at least in one run out of a million. + for i := 0; i < 1e6; i++ { + var wg WaitGroup + wg.Add(1) + go func() { + defer func() { + done <- recover() + }() + wg.Done() + }() + go func() { + defer func() { + done <- recover() + }() + wg.Wait() + // Start reusing the wg before waiting for the Wait below to return. + wg.Add(1) + go func() { + wg.Done() + }() + wg.Wait() + }() + go func() { + defer func() { + done <- recover() + }() + wg.Wait() + }() + for j := 0; j < 3; j++ { + if err := <-done; err != nil { + panic(err) + } + } + } + t.Fatal("Should panic") +} + +func TestWaitGroupRace(t *testing.T) { + // Run this test for about 1ms. + for i := 0; i < 1000; i++ { + wg := &WaitGroup{} + n := new(int32) + // spawn goroutine 1 + wg.Add(1) + go func() { + atomic.AddInt32(n, 1) + wg.Done() + }() + // spawn goroutine 2 + wg.Add(1) + go func() { + atomic.AddInt32(n, 1) + wg.Done() + }() + // Wait for goroutine 1 and 2 + wg.Wait() + if atomic.LoadInt32(n) != 2 { + t.Fatal("Spurious wakeup from Wait") + } + } +} + +func TestWaitGroupAlign(t *testing.T) { + type X struct { + x byte + wg WaitGroup + } + var x X + x.wg.Add(1) + go func(x *X) { + x.wg.Done() + }(&x) + x.wg.Wait() +} + +func BenchmarkWaitGroupUncontended(b *testing.B) { + type PaddedWaitGroup struct { + WaitGroup + pad [128]uint8 + } + b.RunParallel(func(pb *testing.PB) { + var wg PaddedWaitGroup + for pb.Next() { + wg.Add(1) + wg.Done() + wg.Wait() + } + }) +} + +func benchmarkWaitGroupAddDone(b *testing.B, localWork int) { + var wg WaitGroup + b.RunParallel(func(pb *testing.PB) { + foo := 0 + for pb.Next() { + wg.Add(1) + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + wg.Done() + } + _ = foo + }) +} + +func BenchmarkWaitGroupAddDone(b *testing.B) { + benchmarkWaitGroupAddDone(b, 0) +} + +func BenchmarkWaitGroupAddDoneWork(b *testing.B) { + benchmarkWaitGroupAddDone(b, 100) +} + +func benchmarkWaitGroupWait(b *testing.B, localWork int) { + var wg WaitGroup + b.RunParallel(func(pb *testing.PB) { + foo := 0 + for pb.Next() { + wg.Wait() + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + } + _ = foo + }) +} + +func BenchmarkWaitGroupWait(b *testing.B) { + benchmarkWaitGroupWait(b, 0) +} + +func BenchmarkWaitGroupWaitWork(b *testing.B) { + benchmarkWaitGroupWait(b, 100) +} + +func BenchmarkWaitGroupActuallyWait(b *testing.B) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var wg WaitGroup + wg.Add(1) + go func() { + wg.Done() + }() + wg.Wait() + } + }) +} |