1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
package periodic
import (
"context"
"sync"
"time"
)
// Option configures Start.
type Option interface {
apply(*periodic)
}
// Stopper implements the Stop method,
// which stops a periodic task from Start().
type Stopper interface {
Stop() // Stops a periodic task.
}
// Tick is the value for periodic task callbacks that
// contains the time of the tick and
// the time elapsed since the start of the periodic task.
type Tick struct {
Elapsed time.Duration
Time time.Time
}
// Immediate starts the periodic task immediately instead of after the first tick.
func Immediate() Option {
return optionFunc(func(p *periodic) {
p.immediate = true
})
}
// OnStop configures a callback that is executed when a periodic task is stopped or canceled.
func OnStop(f func(Tick)) Option {
return optionFunc(func(p *periodic) {
p.onStop = f
})
}
// Start starts a periodic task with a ticker at the specified interval,
// which executes the given callback after each tick.
// Pending tasks do not overlap, but could start immediately if
// the previous task(s) takes longer than the interval.
// Call Stop() on the return value in order to stop the ticker and to release associated resources.
// The interval must be greater than zero.
func Start(ctx context.Context, interval time.Duration, callback func(Tick), options ...Option) Stopper {
t := &periodic{
interval: interval,
callback: callback,
}
for _, option := range options {
option.apply(t)
}
ctx, cancelCtx := context.WithCancel(ctx)
start := time.Now()
go func() {
done := false
if !t.immediate {
select {
case <-time.After(interval):
case <-ctx.Done():
done = true
}
}
if !done {
ticker := time.NewTicker(t.interval)
defer ticker.Stop()
for tickTime := time.Now(); !done; {
t.callback(Tick{
Elapsed: tickTime.Sub(start),
Time: tickTime,
})
select {
case tickTime = <-ticker.C:
case <-ctx.Done():
done = true
}
}
}
if t.onStop != nil {
now := time.Now()
t.onStop(Tick{
Elapsed: now.Sub(start),
Time: now,
})
}
}()
return stoperFunc(func() {
t.stop.Do(cancelCtx)
})
}
type optionFunc func(*periodic)
func (f optionFunc) apply(p *periodic) {
f(p)
}
type stoperFunc func()
func (f stoperFunc) Stop() {
f()
}
type periodic struct {
interval time.Duration
callback func(Tick)
immediate bool
stop sync.Once
onStop func(Tick)
}
|