1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package poll_test
import (
"internal/poll"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)
var closeHook atomic.Value // func(fd int)
func init() {
closeFunc := poll.CloseFunc
poll.CloseFunc = func(fd int) (err error) {
if v := closeHook.Load(); v != nil {
if hook := v.(func(int)); hook != nil {
hook(fd)
}
}
return closeFunc(fd)
}
}
func TestSplicePipePool(t *testing.T) {
const N = 64
var (
p *poll.SplicePipe
ps []*poll.SplicePipe
allFDs []int
pendingFDs sync.Map // fd → struct{}{}
err error
)
closeHook.Store(func(fd int) { pendingFDs.Delete(fd) })
t.Cleanup(func() { closeHook.Store((func(int))(nil)) })
for i := 0; i < N; i++ {
p, _, err = poll.GetPipe()
if err != nil {
t.Skipf("failed to create pipe due to error(%v), skip this test", err)
}
_, pwfd := poll.GetPipeFds(p)
allFDs = append(allFDs, pwfd)
pendingFDs.Store(pwfd, struct{}{})
ps = append(ps, p)
}
for _, p = range ps {
poll.PutPipe(p)
}
ps = nil
p = nil
// Exploit the timeout of "go test" as a timer for the subsequent verification.
timeout := 5 * time.Minute
if deadline, ok := t.Deadline(); ok {
timeout = deadline.Sub(time.Now())
timeout -= timeout / 10 // Leave 10% headroom for cleanup.
}
expiredTime := time.NewTimer(timeout)
defer expiredTime.Stop()
// Trigger garbage collection repeatedly, waiting for all pipes in sync.Pool
// to either be deallocated and closed, or to time out.
for {
runtime.GC()
time.Sleep(10 * time.Millisecond)
// Detect whether all pipes are closed properly.
var leakedFDs []int
pendingFDs.Range(func(k, v any) bool {
leakedFDs = append(leakedFDs, k.(int))
return true
})
if len(leakedFDs) == 0 {
break
}
select {
case <-expiredTime.C:
t.Logf("all descriptors: %v", allFDs)
t.Fatalf("leaked descriptors: %v", leakedFDs)
default:
}
}
}
func BenchmarkSplicePipe(b *testing.B) {
b.Run("SplicePipeWithPool", func(b *testing.B) {
for i := 0; i < b.N; i++ {
p, _, err := poll.GetPipe()
if err != nil {
continue
}
poll.PutPipe(p)
}
})
b.Run("SplicePipeWithoutPool", func(b *testing.B) {
for i := 0; i < b.N; i++ {
p := poll.NewPipe()
if p == nil {
b.Skip("newPipe returned nil")
}
poll.DestroyPipe(p)
}
})
}
func BenchmarkSplicePipePoolParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
p, _, err := poll.GetPipe()
if err != nil {
continue
}
poll.PutPipe(p)
}
})
}
func BenchmarkSplicePipeNativeParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
p := poll.NewPipe()
if p == nil {
b.Skip("newPipe returned nil")
}
poll.DestroyPipe(p)
}
})
}
|