-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsplit.go
More file actions
151 lines (115 loc) · 2.6 KB
/
split.go
File metadata and controls
151 lines (115 loc) · 2.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/go-redis/redis/v8"
"io/ioutil"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// AppConfig is the root of the JSON configuration decoded by load.
//
// Source (JSON key "source") is the single Redis endpoint whose list is
// popped from; Sinks (JSON key "sinks") are the Redis endpoints whose lists
// every popped item is pushed onto.
type AppConfig struct {
	Source RedisConfig   `json:"source"`
	Sinks  []RedisConfig `json:"sinks"`
}
// RedisConfig identifies one Redis list: the server address and password
// used to create the client, and the key of the list to operate on.
//
// The JSON keys are "addr", "password" and "key".
type RedisConfig struct {
	Addr     string `json:"addr"`
	Password string `json:"password"`
	Key      string `json:"key"`
}
// load reads config.json from the current working directory and decodes it
// into an AppConfig.
//
// It returns a non-nil error when the file cannot be opened or read, or when
// its contents cannot be decoded as JSON into AppConfig. In every error case
// the returned AppConfig is the zero value. The underlying error is wrapped,
// so errors.Is / errors.As still work on the result.
func load() (AppConfig, error) {
	const path = "config.json"

	// Load App Config
	jsonFile, err := os.Open(path)
	if err != nil {
		return AppConfig{}, fmt.Errorf("opening %s: %w", path, err)
	}
	defer jsonFile.Close()

	byteValue, err := ioutil.ReadAll(jsonFile)
	if err != nil {
		return AppConfig{}, fmt.Errorf("reading %s: %w", path, err)
	}

	// A decode failure must surface to the caller; otherwise a typo in the
	// config would start the program with empty addresses and keys.
	var appConfig AppConfig
	if err := json.Unmarshal(byteValue, &appConfig); err != nil {
		return AppConfig{}, fmt.Errorf("parsing %s: %w", path, err)
	}
	return appConfig, nil
}
func main() {
var wg sync.WaitGroup
appConfig, err := load()
if err != nil {
fmt.Println(err)
return
}
// Create Sinks
// ------------
sinkConfigs := appConfig.Sinks
var sinkChans = make([]chan string, len(sinkConfigs))
var stopChans = make([]chan int, len(sinkConfigs)+1)
for i := 0; i < len(sinkConfigs); i++ {
wg.Add(1)
sinkChans[i] = make(chan string)
stopChans[i] = make(chan int)
sinkChan := sinkChans[i]
stopChan := stopChans[i]
sinkConf := sinkConfigs[i]
go func(data chan string, control chan int, wg *sync.WaitGroup) {
rdb := redis.NewClient(&redis.Options{
Addr: sinkConf.Addr,
Password: sinkConf.Password,
})
for {
select {
case item := <-data:
rdb.LPush(context.TODO(), sinkConf.Key, item)
case <-control:
wg.Done()
return
}
}
}(sinkChan, stopChan, &wg)
}
// Create Source
// -------------
wg.Add(1)
stopChans[len(sinkConfigs)] = make(chan int)
sourceStop := stopChans[len(sinkConfigs)]
go func(appConfig AppConfig, sinkChans []chan string, sourceStop chan int) {
rdb := redis.NewClient(&redis.Options{
Addr: appConfig.Source.Addr,
Password: appConfig.Source.Password,
})
_, err = rdb.Ping(context.TODO()).Result()
if err != nil {
fmt.Println(err)
return
}
timeout, _ := time.ParseDuration("1s")
sourceKey := appConfig.Source.Key
for {
select {
case <-sourceStop:
wg.Done()
return
default:
item, _ := rdb.BRPop(context.TODO(), timeout, sourceKey).Result()
if len(item) < 2 {
continue
}
for i := range sinkChans {
sinkChan := sinkChans[i]
sinkChan <- item[1]
}
}
}
}(appConfig, sinkChans, sourceStop)
// Cleanup Routine
// ---------------
control := make(chan os.Signal)
signal.Notify(control, os.Interrupt, syscall.SIGTERM)
go func(stopChans []chan int) {
<-control
for i := range stopChans {
stopChans[i] <- 0
}
}(stopChans)
wg.Wait()
}