From 43a123c1ae6613b3efeed291fa552ecd909d3acf Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Tue, 16 Apr 2024 21:23:18 +0200 Subject: Adding upstream version 1.20.14. Signed-off-by: Daniel Baumann --- src/internal/poll/splice_linux_test.go | 136 +++++++++++++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 src/internal/poll/splice_linux_test.go (limited to 'src/internal/poll/splice_linux_test.go') diff --git a/src/internal/poll/splice_linux_test.go b/src/internal/poll/splice_linux_test.go new file mode 100644 index 0000000..29bcaab --- /dev/null +++ b/src/internal/poll/splice_linux_test.go @@ -0,0 +1,136 @@ +// Copyright 2021 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll_test + +import ( + "internal/poll" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" +) + +var closeHook atomic.Value // func(fd int) + +func init() { + closeFunc := poll.CloseFunc + poll.CloseFunc = func(fd int) (err error) { + if v := closeHook.Load(); v != nil { + if hook := v.(func(int)); hook != nil { + hook(fd) + } + } + return closeFunc(fd) + } +} + +func TestSplicePipePool(t *testing.T) { + const N = 64 + var ( + p *poll.SplicePipe + ps []*poll.SplicePipe + allFDs []int + pendingFDs sync.Map // fd → struct{}{} + err error + ) + + closeHook.Store(func(fd int) { pendingFDs.Delete(fd) }) + t.Cleanup(func() { closeHook.Store((func(int))(nil)) }) + + for i := 0; i < N; i++ { + p, _, err = poll.GetPipe() + if err != nil { + t.Skipf("failed to create pipe due to error(%v), skip this test", err) + } + _, pwfd := poll.GetPipeFds(p) + allFDs = append(allFDs, pwfd) + pendingFDs.Store(pwfd, struct{}{}) + ps = append(ps, p) + } + for _, p = range ps { + poll.PutPipe(p) + } + ps = nil + p = nil + + // Exploit the timeout of "go test" as a timer for the subsequent verification. + timeout := 5 * time.Minute + if deadline, ok := t.Deadline(); ok { + timeout = deadline.Sub(time.Now()) + timeout -= timeout / 10 // Leave 10% headroom for cleanup. + } + expiredTime := time.NewTimer(timeout) + defer expiredTime.Stop() + + // Trigger garbage collection repeatedly, waiting for all pipes in sync.Pool + // to either be deallocated and closed, or to time out. + for { + runtime.GC() + time.Sleep(10 * time.Millisecond) + + // Detect whether all pipes are closed properly. + var leakedFDs []int + pendingFDs.Range(func(k, v any) bool { + leakedFDs = append(leakedFDs, k.(int)) + return true + }) + if len(leakedFDs) == 0 { + break + } + + select { + case <-expiredTime.C: + t.Logf("all descriptors: %v", allFDs) + t.Fatalf("leaked descriptors: %v", leakedFDs) + default: + } + } +} + +func BenchmarkSplicePipe(b *testing.B) { + b.Run("SplicePipeWithPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + p, _, err := poll.GetPipe() + if err != nil { + continue + } + poll.PutPipe(p) + } + }) + b.Run("SplicePipeWithoutPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + p := poll.NewPipe() + if p == nil { + b.Skip("newPipe returned nil") + } + poll.DestroyPipe(p) + } + }) +} + +func BenchmarkSplicePipePoolParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p, _, err := poll.GetPipe() + if err != nil { + continue + } + poll.PutPipe(p) + } + }) +} + +func BenchmarkSplicePipeNativeParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p := poll.NewPipe() + if p == nil { + b.Skip("newPipe returned nil") + } + poll.DestroyPipe(p) + } + }) +} -- cgit v1.2.3