-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueueManager.go
More file actions
89 lines (77 loc) · 2.51 KB
/
queueManager.go
File metadata and controls
89 lines (77 loc) · 2.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package queue
import (
"sync"
"time"
"github.com/farseer-go/collections"
"github.com/farseer-go/fs/flog"
)
// 队列
// key = name
// value = 队列
var dicQueue collections.Dictionary[string, *queueManager]
// 队列列表
type queueManager struct {
// 当前队列名称
name string
// 全局队列
queue collections.ListAny
// 当前消费到的索引位置(如果是多个消费者,只记录最早的索引位置)
// 用于定时移除queue已被消费的数据,以节省内存空间
minOffset int
// 订阅者
subscribers collections.List[*subscriber]
// 读写锁
queueLock *sync.RWMutex
}
func newQueueManager(queueName string) *queueManager {
return &queueManager{
name: queueName,
minOffset: -1,
queue: collections.NewListAny(),
subscribers: collections.NewList[*subscriber](),
queueLock: &sync.RWMutex{},
}
}
// 定时检查一下队列的消费长度
func (receiver *queueManager) stat() {
for {
time.Sleep(MoveQueueInterval)
// 计算当前所有订阅者的最后消费的位置的最小值
if receiver.subscribers.Count() > 0 {
receiver.queueLock.Lock()
// 先校验所有订阅者的 offset 是否合法
// queueCount := receiver.queue.Count()
// for i := 0; i < receiver.subscribers.Count(); i++ {
// subscriber := receiver.subscribers.Index(i)
// if subscriber.offset >= queueCount {
// subscriber.offset = queueCount - 1
// }
// }
// 计算最小 offset
receiver.minOffset = receiver.subscribers.Min(func(item *subscriber) any {
return item.offset //return atomic.LoadInt64(&item.offset)
}).(int)
// 所有订阅者没有在执行的时候,做一次队列合并
if receiver.minOffset > -1 {
receiver.moveQueue()
}
receiver.queueLock.Unlock()
}
}
}
// 缩减使用过的队列
func (receiver *queueManager) moveQueue() {
// ⚡ 防御性检查:确保 minOffset 合法
if receiver.minOffset < 0 || receiver.minOffset >= receiver.queue.Count() {
flog.Warningf("本地Queue '%s' minOffset=%d 大于 整个queue的数量=%d,", receiver.name, receiver.minOffset, receiver.queue.Count())
return
}
// 裁剪队列,将头部已消费的移除
arr := receiver.queue.RangeStart(receiver.minOffset + 1).ToArray()
receiver.queue = collections.NewListAny(arr...)
// 设置每个订阅者的偏移量
for i := 0; i < receiver.subscribers.Count(); i++ {
//atomic.AddInt64(&receiver.subscribers.Index(i).offset, -int64(receiver.minOffset)-1)
receiver.subscribers.Index(i).offset -= receiver.minOffset + 1
}
}