-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathbuffer.go
More file actions
167 lines (146 loc) · 3.65 KB
/
buffer.go
File metadata and controls
167 lines (146 loc) · 3.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package buffer
import (
"errors"
"io"
"time"
)
var (
// ErrTimeout indicates an operation has timed out.
ErrTimeout = errors.New("operation timed-out")
// ErrClosed indicates the buffer is closed and can no longer be used.
ErrClosed = errors.New("buffer is closed")
)
type (
// Buffer represents a data buffer that is asynchronously flushed, either manually or automatically.
Buffer[T any] struct {
io.Closer
flushFunc func([]T)
dataCh chan T
flushCh chan struct{}
closeCh chan struct{}
doneCh chan struct{}
options *Options
}
)
// New creates a new buffer instance with the provided flush function and options.
// It panics if provided with a nil flush function.
func New[T any](flushFunc func([]T), opts ...Option) *Buffer[T] {
if flushFunc == nil {
panic("flush function cannot be nil")
}
buffer := &Buffer[T]{
flushFunc: flushFunc,
dataCh: make(chan T),
flushCh: make(chan struct{}),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
options: resolveOptions(opts...),
}
go buffer.consume()
return buffer
}
// Push appends an item to the end of the buffer.
//
// It returns an ErrTimeout if it cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has been closed.
func (buffer *Buffer[T]) Push(item T) error {
if buffer.closed() {
return ErrClosed
}
select {
case buffer.dataCh <- item:
return nil
case <-time.After(buffer.options.PushTimeout):
return ErrTimeout
}
}
// Flush outputs the buffer to a permanent destination.
//
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has been closed.
func (buffer *Buffer[T]) Flush() error {
if buffer.closed() {
return ErrClosed
}
select {
case buffer.flushCh <- struct{}{}:
return nil
case <-time.After(buffer.options.FlushTimeout):
return ErrTimeout
}
}
// Close flushes the buffer and prevents it from being further used.
//
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has already been closed.
//
// An ErrTimeout can either mean that a flush could not be triggered, or it can
// mean that a flush was triggered but it has not finished yet. In any case it is
// safe to call Close again.
func (buffer *Buffer[T]) Close() error {
if buffer.closed() {
return ErrClosed
}
select {
case buffer.closeCh <- struct{}{}:
// noop
case <-time.After(buffer.options.CloseTimeout):
return ErrTimeout
}
select {
case <-buffer.doneCh:
close(buffer.dataCh)
close(buffer.flushCh)
close(buffer.closeCh)
return nil
case <-time.After(buffer.options.CloseTimeout):
return ErrTimeout
}
}
func (buffer *Buffer[T]) closed() bool {
select {
case <-buffer.doneCh:
return true
default:
return false
}
}
func (buffer *Buffer[T]) consume() {
count := 0
items := make([]T, buffer.options.Size)
mustFlush := false
ticker, stopTicker := newTicker(buffer.options.FlushInterval)
isOpen := true
for isOpen {
select {
case item := <-buffer.dataCh:
items[count] = item
count++
mustFlush = count >= len(items)
case <-ticker:
mustFlush = count > 0
case <-buffer.flushCh:
mustFlush = count > 0
case <-buffer.closeCh:
isOpen = false
mustFlush = count > 0
}
if mustFlush {
stopTicker()
buffer.flushFunc(items[:count])
count = 0
items = make([]T, buffer.options.Size)
mustFlush = false
ticker, stopTicker = newTicker(buffer.options.FlushInterval)
}
}
stopTicker()
close(buffer.doneCh)
}
func newTicker(interval time.Duration) (<-chan time.Time, func()) {
if interval == 0 {
return nil, func() {}
}
ticker := time.NewTicker(interval)
return ticker.C, ticker.Stop
}