From 0b1e61e1eee742a3848a2f3eeccb32e41d2d8c21 Mon Sep 17 00:00:00 2001 From: Rico Date: Sun, 3 Aug 2025 19:12:26 +0200 Subject: [PATCH 01/18] docs: fix typo in docker-compose config --- docker/docker-compose.metrics.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/docker-compose.metrics.yml b/docker/docker-compose.metrics.yml index 316a8ec..301a165 100644 --- a/docker/docker-compose.metrics.yml +++ b/docker/docker-compose.metrics.yml @@ -1,6 +1,6 @@ version: '2' -# Make sure to create the sub directories "prometheus", "prometheus_data", "grafana", "grafana_data" and "certstream" +# Make sure to create the subdirectories "prometheus", "prometheus_data", "grafana", "grafana_data" and "certstream" # and create the config files for all three services. For further details please refer to https://github.com/d-Rickyy-b/certstream-server-go/wiki/Collecting-and-Visualizing-Metrics networks: From b86c2a32adbe6e7a9be9187c46364d5913570045 Mon Sep 17 00:00:00 2001 From: Rico Date: Mon, 4 Aug 2025 01:57:24 +0200 Subject: [PATCH 02/18] feat: first implementation for streaming proessors such as kafka Relates to #23 and #35 --- config.sample.yaml | 13 +- go.mod | 1 + go.sum | 96 ++++++++++++-- internal/broadcast/baseclient.go | 48 +++++++ .../{web => broadcast}/broadcastmanager.go | 82 +++++++----- internal/broadcast/kafkaclient.go | 122 ++++++++++++++++++ .../websocketclient.go} | 46 +++---- .../certificatetransparency/ct-watcher.go | 3 +- .../certificatetransparency/logmetrics.go | 2 + internal/certstream/certstream.go | 19 ++- internal/config/config.go | 23 +++- internal/metrics/prometheus.go | 10 +- internal/web/examplecert.go | 2 +- internal/web/server.go | 25 ++-- 14 files changed, 389 insertions(+), 103 deletions(-) create mode 100644 internal/broadcast/baseclient.go rename internal/{web => broadcast}/broadcastmanager.go (52%) create mode 100644 internal/broadcast/kafkaclient.go rename internal/{web/client.go => broadcast/websocketclient.go} (80%) diff --git a/config.sample.yaml b/config.sample.yaml index c442d9c..cf3e395 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -22,6 +22,14 @@ prometheus: whitelist: - "127.0.0.1/8" +# Configuration related to external stream processing tools go here. +streamprocessing: + kafka: + enabled: true + server_address: "127.0.0.1" + server_port: 9092 + topic: "certstream" + general: # DisableDefaultLogs indicates whether the default logs used in Google Chrome and provided by Google should be disabled. disable_default_logs: false @@ -39,9 +47,8 @@ general: websocket: 300 # Buffer for each CT log connection ctlog: 1000 - # Combined buffer for the broadcast manager - broadcastmanager: 10000 - + # Combined buffer for the cert dispatcher + dispatcher: 10000 # Google regularly updates the log list. If this option is set to true, the server will remove all logs no longer listed in the Google log list. # This option defaults to true. See https://github.com/d-Rickyy-b/certstream-server-go/issues/51 drop_old_logs: true diff --git a/go.mod b/go.mod index d121c0a..a38a885 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/go-chi/chi/v5 v5.2.3 github.com/google/certificate-transparency-go v1.3.2 github.com/gorilla/websocket v1.5.3 + github.com/segmentio/kafka-go v0.4.48 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index f0ce5f4..f7e46ad 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,15 @@ -github.com/VictoriaMetrics/metrics v1.40.1 h1:FrF5uJRpIVj9fayWcn8xgiI+FYsKGMslzPuOXjdeyR4= +github.com/VictoriaMetrics/metrics v1.35.2 h1:Bj6L6ExfnakZKYPpi7mGUnkJP4NGQz2v5wiChhXNyWQ= +github.com/VictoriaMetrics/metrics v1.35.2/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8= github.com/VictoriaMetrics/metrics v1.40.1/go.mod h1:XE4uudAAIRaJE614Tl5HMrtoEU6+GDZO4QTnNSsZRuA= -github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-chi/chi/v5 v5.2.1 h1:KOIHODQj58PmL80G2Eak4WdvUzjSJSm0vG72crDCqb8= +github.com/go-chi/chi/v5 v5.2.1/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= -github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= @@ -14,30 +21,97 @@ github.com/google/trillian v1.7.2 h1:EPBxc4YWY4Ak8tcuhyFleY+zYlbCDCa4Sn24e1Ka8Js github.com/google/trillian v1.7.2/go.mod h1:mfQJW4qRH6/ilABtPYNBerVJAJ/upxHLX81zxNQw05s= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/kafka-go v0.4.48 h1:9jyu9CWK4W5W+SroCe8EffbrRZVqAOkuaLd/ApID4Vs= +github.com/segmentio/kafka-go v0.4.48/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= -golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= +golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= -golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= +golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= -golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= -golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9 h1:V1jCN2HBa8sySkR5vLcCSqJSTMv093Rw9EJefhQGP7M= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f h1:N/PrbTw4kdkqNRzVfWPrBekzLuarFREcbFOiOLkXon4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9/go.mod h1:HSkG/KdJWusxU1F6CNrwNDjBMgisKxGnc5dAZfT0mjQ= -google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI= +google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM= +google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= -google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= diff --git a/internal/broadcast/baseclient.go b/internal/broadcast/baseclient.go new file mode 100644 index 0000000..9b69e3e --- /dev/null +++ b/internal/broadcast/baseclient.go @@ -0,0 +1,48 @@ +package broadcast + +import "log" + +// BaseClient defines the basic structure for a client that can receive broadcast messages. +// Other client types can embed this struct to inherit its functionality. +type BaseClient struct { + broadcastChan chan []byte + stopChan chan struct{} + name string + subType SubscriptionType + skippedCerts uint64 +} + +// Close cleans up the client's resources by closing the stop and broadcast channels. +func (c *BaseClient) Close() { + close(c.stopChan) + close(c.broadcastChan) +} + +// Name returns the name of the client. +func (c *BaseClient) Name() string { + return c.name +} + +// SubType returns the subscription type of the client. +func (c *BaseClient) SubType() SubscriptionType { + return c.subType +} + +// SkippedCerts returns the number of certificates that were skipped due to the client's broadcast channel being full. +func (c *BaseClient) SkippedCerts() uint64 { + return c.skippedCerts +} + +// Write sends a message to the client's broadcast channel. +// If the channel is full, it increments the skippedCerts counter and logs a message. +func (c *BaseClient) Write(data []byte) { + select { + case c.broadcastChan <- data: + default: + // Default case is executed if the client's broadcast channel is full. + c.skippedCerts++ + if c.skippedCerts%1000 == 1 { + log.Printf("Not providing client '%s' with cert because client's buffer is full. The client can't keep up. Skipped certs: %d\n", c.name, c.skippedCerts) + } + } +} diff --git a/internal/web/broadcastmanager.go b/internal/broadcast/broadcastmanager.go similarity index 52% rename from internal/web/broadcastmanager.go rename to internal/broadcast/broadcastmanager.go index 18382be..2a9d600 100644 --- a/internal/web/broadcastmanager.go +++ b/internal/broadcast/broadcastmanager.go @@ -1,34 +1,48 @@ -package web +package broadcast import ( "log" "sync" + "github.com/d-Rickyy-b/certstream-server-go/internal/config" "github.com/d-Rickyy-b/certstream-server-go/internal/models" ) -type BroadcastManager struct { - Broadcast chan models.Entry - clients []*client - clientLock sync.RWMutex +// ClientHandler dispatches certificate entries to registered clients. +var ClientHandler *Dispatcher + +type Dispatcher struct { + MessageQueue chan models.Entry + clients []CertProcessor + clientLock sync.RWMutex } -// registerClient adds a client to the list of clients of the BroadcastManager. +// NewDispatcher creates a new Dispatcher instance and assigns it to the ClientHandler variable. +func NewDispatcher() *Dispatcher { + d := &Dispatcher{} + d.MessageQueue = make(chan models.Entry, config.AppConfig.General.BufferSizes.Dispatcher) + ClientHandler = d + + return d +} + +// RegisterClient adds a client to the list of clients of the Dispatcher. // The client will receive certificate broadcasts right after registration. -func (bm *BroadcastManager) registerClient(c *client) { +func (bm *Dispatcher) RegisterClient(c CertProcessor) { + // TODO: check if the client is already registered bm.clientLock.Lock() bm.clients = append(bm.clients, c) - log.Printf("Clients: %d, Capacity: %d\n", len(bm.clients), cap(bm.clients)) + log.Printf("Added new client. Clients: %d, Capacity: %d\n", len(bm.clients), cap(bm.clients)) bm.clientLock.Unlock() } -// unregisterClient removes a client from the list of clients of the BroadcastManager. +// UnregisterClient removes a client from the list of clients of the Dispatcher. // The client will no longer receive certificate broadcasts right after unregistering. -func (bm *BroadcastManager) unregisterClient(c *client) { +func (bm *Dispatcher) UnregisterClient(clientName string) { bm.clientLock.Lock() for i, client := range bm.clients { - if c == client { + if clientName == client.Name() { // Copy the last element of the slice to the position of the removed element // Then remove the last element by re-slicing bm.clients[i] = bm.clients[len(bm.clients)-1] @@ -36,7 +50,7 @@ func (bm *BroadcastManager) unregisterClient(c *client) { bm.clients = bm.clients[:len(bm.clients)-1] // Close the broadcast channel of the client, otherwise this leads to a memory leak - close(c.broadcastChan) + client.Close() break } @@ -46,28 +60,28 @@ func (bm *BroadcastManager) unregisterClient(c *client) { } // ClientFullCount returns the current number of clients connected to the service on the `full` endpoint. -func (bm *BroadcastManager) ClientFullCount() (count int64) { +func (bm *Dispatcher) ClientFullCount() (count int64) { return bm.clientCountByType(SubTypeFull) } // ClientLiteCount returns the current number of clients connected to the service on the `lite` endpoint. -func (bm *BroadcastManager) ClientLiteCount() (count int64) { +func (bm *Dispatcher) ClientLiteCount() (count int64) { return bm.clientCountByType(SubTypeLite) } // ClientDomainsCount returns the current number of clients connected to the service on the `domains-only` endpoint. -func (bm *BroadcastManager) ClientDomainsCount() (count int64) { +func (bm *Dispatcher) ClientDomainsCount() (count int64) { return bm.clientCountByType(SubTypeDomain) } // clientCountByType returns the current number of clients connected to the service on the endpoint matching // the specified SubscriptionType. -func (bm *BroadcastManager) clientCountByType(subType SubscriptionType) (count int64) { +func (bm *Dispatcher) clientCountByType(subType SubscriptionType) (count int64) { bm.clientLock.RLock() defer bm.clientLock.RUnlock() for _, c := range bm.clients { - if c.subType == subType { + if c.SubType() == subType { count++ } } @@ -75,24 +89,23 @@ func (bm *BroadcastManager) clientCountByType(subType SubscriptionType) (count i return count } -func (bm *BroadcastManager) GetSkippedCerts() map[string]uint64 { +// GetSkippedCerts returns a map of client names to the number of skipped certificates for each client. +func (bm *Dispatcher) GetSkippedCerts() map[string]uint64 { bm.clientLock.RLock() defer bm.clientLock.RUnlock() skippedCerts := make(map[string]uint64, len(bm.clients)) for _, c := range bm.clients { - skippedCerts[c.name] = c.skippedCerts + skippedCerts[c.Name()] = c.SkippedCerts() } return skippedCerts } -// broadcaster is run in a goroutine and handles the dispatching of entries to clients. -func (bm *BroadcastManager) broadcaster() { +// broadcaster is run in a goroutine and handles the dispatching of certs to clients. +func (bm *Dispatcher) broadcaster() { for { - var data []byte - - entry := <-bm.Broadcast + entry := <-bm.MessageQueue dataLite := entry.JSONLite() dataFull := entry.JSON() dataDomain := entry.JSONDomains() @@ -100,7 +113,7 @@ func (bm *BroadcastManager) broadcaster() { bm.clientLock.RLock() for _, c := range bm.clients { - switch c.subType { + switch c.SubType() { case SubTypeLite: data = dataLite case SubTypeFull: @@ -108,21 +121,20 @@ func (bm *BroadcastManager) broadcaster() { case SubTypeDomain: data = dataDomain default: - log.Printf("Unknown subscription type '%d' for client '%s'. Skipping this client!\n", c.subType, c.name) + // This should never happen, but if it does, we log it and skip the client. + log.Printf("Unknown subscription type '%d' for client '%s'. Skipping this client!\n", c.SubType(), c.Name()) continue } - select { - case c.broadcastChan <- data: - default: - // Default case is executed if the client's broadcast channel is full. - c.skippedCerts++ - if c.skippedCerts%1000 == 1 { - log.Printf("Not providing client '%s' with cert because client's buffer is full. The client can't keep up. Skipped certs: %d\n", c.name, c.skippedCerts) - } - } + c.Write(data) } bm.clientLock.RUnlock() } } + +// Start starts the broadcaster goroutine. +func (bm *Dispatcher) Start() { + go bm.broadcaster() + log.Println("Dispatcher started. Listening for certificate entries...") +} diff --git a/internal/broadcast/kafkaclient.go b/internal/broadcast/kafkaclient.go new file mode 100644 index 0000000..b89b845 --- /dev/null +++ b/internal/broadcast/kafkaclient.go @@ -0,0 +1,122 @@ +package broadcast + +import ( + "context" + "log" + "net" + "strconv" + "time" + + "github.com/d-Rickyy-b/certstream-server-go/internal/config" + + "github.com/segmentio/kafka-go" +) + +const ( + TOPIC = "certstream" +) + +// KafkaClient connects to a Kafka server in order to provide it with certificates. +type KafkaClient struct { + conn *kafka.Conn // Kafka connection + addr string + isConnected bool + BaseClient +} + +// NewKafkaClient creates a new Kafka client that immediately connects to the configured Kafka server. +func NewKafkaClient(subType SubscriptionType, name string, certBufferSize int) *KafkaClient { + addr := net.JoinHostPort(config.AppConfig.StreamProcessing.Kafka.ServerAddr, strconv.Itoa(config.AppConfig.StreamProcessing.Kafka.ServerPort)) + + // Connect to the Kafka server + // TODO make topic configurable + conn, err := kafka.DialLeader(context.Background(), "tcp", addr, TOPIC, 0) + if err != nil { + log.Println("failed to connect to kafka:", err) + } + + kc := &KafkaClient{ + conn: conn, + addr: addr, + BaseClient: BaseClient{ + broadcastChan: make(chan []byte, certBufferSize), + stopChan: make(chan struct{}), + name: name, + subType: subType, + }, + } + + go kc.broadcastHandler() + go kc.reconnectHandler() + + return kc +} + +// reconnectHandler is a background job that attempts to reconnect to the Kafka server if the connection is lost. +func (c *KafkaClient) reconnectHandler() { + for { + select { + case <-c.stopChan: + log.Println("Stopping reconnectHandler for kafka producer:", c.addr) + c.conn.Close() + + return + default: + if c.isConnected { + // If already connected or no connection exists, skip reconnection + time.Sleep(5 * time.Second) + continue + } + conn, err := kafka.DialLeader(context.Background(), "tcp", c.addr, TOPIC, 0) + if err != nil { + log.Printf("Reconnect failed: %v. Retrying in 5s...", err) + time.Sleep(5 * time.Second) + + continue + } + // Close old connection if exists + if c.conn != nil { + _ = c.conn.Close() + } + c.conn = conn + c.isConnected = true + log.Println("Reconnected to Kafka at", c.addr) + } + } +} + +// Each client has a broadcastHandler that runs in the background and sends out the broadcast messages to the client. +func (c *KafkaClient) broadcastHandler() { + writeWait := 60 * time.Second + + defer func() { + log.Println("Closing broadcast handler for kafka producer:", c.addr) + if err := c.conn.Close(); err != nil { + log.Println("failed to close writer:", err) + } + + ClientHandler.UnregisterClient(c.name) + }() + + for { + select { + case <-c.stopChan: + return + case message := <-c.broadcastChan: + if !c.isConnected { + continue + } + + _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + + c.conn.Broker() + _, err := c.conn.WriteMessages( + kafka.Message{Value: message}, + ) + if err != nil { + c.isConnected = false + log.Println("Failed to write messages to kafka:", err) + } + } + } +} diff --git a/internal/web/client.go b/internal/broadcast/websocketclient.go similarity index 80% rename from internal/web/client.go rename to internal/broadcast/websocketclient.go index 464d6c0..ef083ff 100644 --- a/internal/web/client.go +++ b/internal/broadcast/websocketclient.go @@ -1,4 +1,4 @@ -package web +package broadcast import ( "log" @@ -8,34 +8,30 @@ import ( "github.com/gorilla/websocket" ) -const ( - SubTypeFull SubscriptionType = iota - SubTypeLite - SubTypeDomain -) - -type SubscriptionType int - -// client represents a single client's connection to the server. -type client struct { - conn *websocket.Conn - broadcastChan chan []byte - name string - subType SubscriptionType - skippedCerts uint64 +// WebsocketClient represents a single WebSocket client's connection to the server. +type WebsocketClient struct { + conn *websocket.Conn + *BaseClient } -func newClient(conn *websocket.Conn, subType SubscriptionType, name string, certBufferSize int) *client { - return &client{ - conn: conn, - broadcastChan: make(chan []byte, certBufferSize), - name: name, - subType: subType, +// NewWebsocketClient creates a new WebSocket client from the given connection. +func NewWebsocketClient(conn *websocket.Conn, subType SubscriptionType, name string, certBufferSize int) *WebsocketClient { + c := &WebsocketClient{ + conn: conn, + BaseClient: &BaseClient{ + broadcastChan: make(chan []byte, certBufferSize), + name: name, + subType: subType, + }, } + go c.broadcastHandler() + go c.listenWebsocket() + + return c } // Each client has a broadcastHandler that runs in the background and sends out the broadcast messages to the client. -func (c *client) broadcastHandler() { +func (c *WebsocketClient) broadcastHandler() { writeWait := 60 * time.Second pingTicker := time.NewTicker(30 * time.Second) @@ -82,10 +78,10 @@ func (c *client) broadcastHandler() { // listenWebsocket is running in the background on a goroutine and listens for messages from the client. // It responds to ping messages with a pong message. It closes the connection if the client sends // a close message or no ping is received within 65 seconds. -func (c *client) listenWebsocket() { +func (c *WebsocketClient) listenWebsocket() { defer func() { _ = c.conn.Close() - ClientHandler.unregisterClient(c) + ClientHandler.UnregisterClient(c.name) }() readWait := 65 * time.Second diff --git a/internal/certificatetransparency/ct-watcher.go b/internal/certificatetransparency/ct-watcher.go index 8895cbd..86b21fd 100644 --- a/internal/certificatetransparency/ct-watcher.go +++ b/internal/certificatetransparency/ct-watcher.go @@ -13,6 +13,7 @@ import ( "sync/atomic" "time" + "github.com/d-Rickyy-b/certstream-server-go/internal/broadcast" "github.com/d-Rickyy-b/certstream-server-go/internal/config" "github.com/d-Rickyy-b/certstream-server-go/internal/models" "github.com/d-Rickyy-b/certstream-server-go/internal/web" @@ -471,7 +472,7 @@ func certHandler(entryChan chan models.Entry) { } // Run json encoding in the background and send the result to the clients. - web.ClientHandler.Broadcast <- entry + broadcast.ClientHandler.MessageQueue <- entry // Update metrics url := entry.Data.Source.NormalizedURL diff --git a/internal/certificatetransparency/logmetrics.go b/internal/certificatetransparency/logmetrics.go index 064f263..7ecc2a7 100644 --- a/internal/certificatetransparency/logmetrics.go +++ b/internal/certificatetransparency/logmetrics.go @@ -273,10 +273,12 @@ func GetProcessedPrecerts() int64 { return processedPrecerts } +// GetCertMetrics returns a copy of the internal metrics map. func GetCertMetrics() CTMetrics { return metrics.GetCTMetrics() } +// GetLogOperators returns a map of operator names to a list of CT logs. func GetLogOperators() map[string][]string { return metrics.OperatorLogMapping() } diff --git a/internal/certstream/certstream.go b/internal/certstream/certstream.go index 6ade396..13fc238 100644 --- a/internal/certstream/certstream.go +++ b/internal/certstream/certstream.go @@ -10,6 +10,7 @@ import ( "os/signal" "syscall" + "github.com/d-Rickyy-b/certstream-server-go/internal/broadcast" "github.com/d-Rickyy-b/certstream-server-go/internal/certificatetransparency" "github.com/d-Rickyy-b/certstream-server-go/internal/config" "github.com/d-Rickyy-b/certstream-server-go/internal/metrics" @@ -35,6 +36,11 @@ func NewCertstreamServer(config config.Config) (*Certstream, error) { cs := Certstream{} cs.config = config + // Start the broadcast dispatcher + broadcast.NewDispatcher() + broadcast.ClientHandler.Start() + + // TODO: add support do disable websocket Server // Initialize the webserver used for the websocket server webserver := web.NewWebsocketServer(config.Webserver.ListenAddr, config.Webserver.ListenPort, config.Webserver.CertPath, config.Webserver.CertKeyPath) cs.webserver = webserver @@ -42,6 +48,13 @@ func NewCertstreamServer(config config.Config) (*Certstream, error) { // Setup metrics server cs.setupMetrics(webserver) + if config.StreamProcessing.Kafka.Enabled { + log.Println("Initializing Kafka client...") + + kc := broadcast.NewKafkaClient(broadcast.SubTypeFull, "kafka-producer", config.General.BufferSizes.Websocket) + broadcast.ClientHandler.RegisterClient(kc) + } + return &cs, nil } @@ -65,7 +78,11 @@ func (cs *Certstream) setupMetrics(webserver *web.WebServer) { webserver.RegisterPrometheus(cs.config.Prometheus.MetricsURL, metrics.WritePrometheus) } else { log.Println("Starting prometheus server on new interface") - cs.metricsServer = web.NewMetricsServer(cs.config.Prometheus.ListenAddr, cs.config.Prometheus.ListenPort, cs.config.Prometheus.CertPath, cs.config.Prometheus.CertKeyPath) + cs.metricsServer = web.NewMetricsServer( + cs.config.Prometheus.ListenAddr, + cs.config.Prometheus.ListenPort, + cs.config.Prometheus.CertPath, + cs.config.Prometheus.CertKeyPath) cs.metricsServer.RegisterPrometheus(cs.config.Prometheus.MetricsURL, metrics.WritePrometheus) } } diff --git a/internal/config/config.go b/internal/config/config.go index 24dd3fb..4ad58c0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -32,8 +32,10 @@ type LogConfig struct { } type BufferSizes struct { - Websocket int `yaml:"websocket"` - CTLog int `yaml:"ctlog"` + Websocket int `yaml:"websocket"` + CTLog int `yaml:"ctlog"` + Dispatcher int `yaml:"dispatcher"` + // Deprecated: BroadcastManager was renamed to Dispatcher. BroadcastManager int `yaml:"broadcastmanager"` } @@ -51,6 +53,14 @@ type Config struct { MetricsURL string `yaml:"metrics_url"` ExposeSystemMetrics bool `yaml:"expose_system_metrics"` } + StreamProcessing struct { + Kafka struct { + Enabled bool `yaml:"enabled"` + ServerAddr string `yaml:"server_addr"` + ServerPort int `yaml:"server_port"` + Topic string `yaml:"topic"` + } + } General struct { // DisableDefaultLogs indicates whether the default logs used in Google Chrome and provided by Google should be disabled. DisableDefaultLogs bool `yaml:"disable_default_logs"` @@ -235,8 +245,13 @@ func validateConfig(config *Config) bool { config.General.BufferSizes.CTLog = 1000 } - if config.General.BufferSizes.BroadcastManager <= 0 { - config.General.BufferSizes.BroadcastManager = 10000 + // For backward compatibility, copy value from deprecated BroadcastManager field + if config.General.BufferSizes.BroadcastManager != 0 { + config.General.BufferSizes.Dispatcher = config.General.BufferSizes.BroadcastManager + } + + if config.General.BufferSizes.Dispatcher <= 0 { + config.General.BufferSizes.Dispatcher = 10000 } // If the cleanup flag is not set, default to true diff --git a/internal/metrics/prometheus.go b/internal/metrics/prometheus.go index aad8d2e..ac25f6d 100644 --- a/internal/metrics/prometheus.go +++ b/internal/metrics/prometheus.go @@ -7,8 +7,8 @@ import ( "sync" "time" + "github.com/d-Rickyy-b/certstream-server-go/internal/broadcast" "github.com/d-Rickyy-b/certstream-server-go/internal/certificatetransparency" - "github.com/d-Rickyy-b/certstream-server-go/internal/web" "github.com/VictoriaMetrics/metrics" ) @@ -22,13 +22,13 @@ var ( // Number of currently connected clients. fullClientCount = metrics.NewGauge("certstreamservergo_clients_total{type=\"full\"}", func() float64 { - return float64(web.ClientHandler.ClientFullCount()) + return float64(broadcast.ClientHandler.ClientFullCount()) }) liteClientCount = metrics.NewGauge("certstreamservergo_clients_total{type=\"lite\"}", func() float64 { - return float64(web.ClientHandler.ClientLiteCount()) + return float64(broadcast.ClientHandler.ClientLiteCount()) }) domainClientCount = metrics.NewGauge("certstreamservergo_clients_total{type=\"domain\"}", func() float64 { - return float64(web.ClientHandler.ClientDomainsCount()) + return float64(broadcast.ClientHandler.ClientDomainsCount()) }) // Number of certificates processed by the CT watcher. @@ -90,7 +90,7 @@ func getCertCountForLog(operatorName, logname string) int64 { // getSkippedCertMetrics gets the number of skipped certificates for each client and creates metrics for it. // It also removes metrics for clients that are not connected anymore. func getSkippedCertMetrics() { - skippedCerts := web.ClientHandler.GetSkippedCerts() + skippedCerts := broadcast.ClientHandler.GetSkippedCerts() for clientName := range skippedCerts { // Get or register a new counter for each client metricName := fmt.Sprintf("certstreamservergo_skipped_certs{client=\"%s\"}", clientName) diff --git a/internal/web/examplecert.go b/internal/web/examplecert.go index e841323..05576f3 100644 --- a/internal/web/examplecert.go +++ b/internal/web/examplecert.go @@ -29,7 +29,7 @@ func exampleDomains(w http.ResponseWriter, _ *http.Request) { w.Write(exampleCert.JSONDomains()) //nolint:errcheck } -// SetExampleCert sets one certificate as the example Cert that is returned by the example endpoints. +// SetExampleCert sets the example certificate to be used in the example endpoints. func SetExampleCert(cert models.Entry) { exampleCert = cert } diff --git a/internal/web/server.go b/internal/web/server.go index 50a55b6..50660c6 100644 --- a/internal/web/server.go +++ b/internal/web/server.go @@ -21,8 +21,7 @@ import ( ) var ( - ClientHandler = BroadcastManager{} - upgrader websocket.Upgrader + upgrader websocket.Upgrader ) // WebServer is a struct that holds the necessary information to run a webserver. @@ -62,7 +61,6 @@ func IPWhitelist(whitelist []string) func(next http.Handler) http.Handler { continue } - ipList = append(ipList, ip) continue @@ -119,7 +117,7 @@ func initFullWebsocket(w http.ResponseWriter, r *http.Request) { return } - setupClient(connection, SubTypeFull, r.RemoteAddr) + setupClient(connection, broadcast.SubTypeFull, r.RemoteAddr) } // initLiteWebsocket is called when a client connects to the / endpoint. @@ -131,7 +129,7 @@ func initLiteWebsocket(w http.ResponseWriter, r *http.Request) { return } - setupClient(connection, SubTypeLite, r.RemoteAddr) + setupClient(connection, broadcast.SubTypeLite, r.RemoteAddr) } // initDomainWebsocket is called when a client connects to the /domains-only endpoint. @@ -143,7 +141,7 @@ func initDomainWebsocket(w http.ResponseWriter, r *http.Request) { return } - setupClient(connection, SubTypeDomain, r.RemoteAddr) + setupClient(connection, broadcast.SubTypeDomain, r.RemoteAddr) } // upgradeConnection upgrades the connection to a websocket and returns the connection. @@ -174,12 +172,9 @@ func upgradeConnection(w http.ResponseWriter, r *http.Request) (*websocket.Conn, } // setupClient initializes a client struct and starts the broadcastHandler and websocket listener. -func setupClient(connection *websocket.Conn, subscriptionType SubscriptionType, name string) { - c := newClient(connection, subscriptionType, name, config.AppConfig.General.BufferSizes.Websocket) - go c.broadcastHandler() - go c.listenWebsocket() - - ClientHandler.registerClient(c) +func setupClient(connection *websocket.Conn, subscriptionType broadcast.SubscriptionType, name string) { + c := broadcast.NewWebsocketClient(connection, subscriptionType, name, config.AppConfig.General.BufferSizes.Websocket) + broadcast.ClientHandler.RegisterClient(c) } // setupWebsocketRoutes configures all the routes necessary for the websocket webserver. @@ -255,8 +250,7 @@ func NewMetricsServer(networkIf string, port int, certPath, keyPath string) *Web } // NewWebsocketServer starts a new webserver and initialized it with the necessary routes. -// It also starts the broadcaster in ClientHandler as a background job and takes care of -// setting up websocket.Upgrader. +// It also takes care of setting up websocket.Upgrader. func NewWebsocketServer(networkIf string, port int, certPath, keyPath string) *WebServer { server := &WebServer{ networkIf: networkIf, @@ -286,9 +280,6 @@ func NewWebsocketServer(networkIf string, port int, certPath, keyPath string) *W setupWebsocketRoutes(server.routes) server.initServer() - ClientHandler.Broadcast = make(chan models.Entry, config.AppConfig.General.BufferSizes.BroadcastManager) - go ClientHandler.broadcaster() - return server } From 5396fa401bd1575a2016245cd03687091f302c67 Mon Sep 17 00:00:00 2001 From: Rico Date: Mon, 4 Aug 2025 01:57:53 +0200 Subject: [PATCH 03/18] chore: surround yaml strings by quotes --- docker/docker-compose.metrics.yml | 6 +++--- docker/docker-compose.yml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docker/docker-compose.metrics.yml b/docker/docker-compose.metrics.yml index 301a165..9c17a7f 100644 --- a/docker/docker-compose.metrics.yml +++ b/docker/docker-compose.metrics.yml @@ -30,7 +30,7 @@ services: ports: # Exposing Prometheus is NOT required, if you don't want to access it from outside the Docker network. # Using localhost enables you to use a reverse proxy (e.g. with basic auth) to access Prometheus in a more secure way. - - 127.0.0.1:9090:9090 + - "127.0.0.1:9090:9090" networks: - monitoring extra_hosts: @@ -44,7 +44,7 @@ services: depends_on: - prometheus ports: - - 127.0.0.1:8082:3000 + - "127.0.0.1:8082:3000" volumes: - ./grafana_data:/var/lib/grafana - ./grafana/provisioning/:/etc/grafana/provisioning/ @@ -60,7 +60,7 @@ services: # Configure the service to run as specific user. # user: "1000:1000" ports: - - 127.0.0.1:8080:80 + - "127.0.0.1:8080:80" # Don't forget to open the other port in case you run the Prometheus endpoint on another port than the websocket server. # - 127.0.0.1:8081:81 volumes: diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 4edc2fe..1cdeef8 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -7,7 +7,7 @@ services: # Configure the service to run as specific user # user: "1000:1000" ports: - - 127.0.0.1:8080:80 + - "127.0.0.1:8080:80" # Don't forget to open the other port in case you run the Prometheus endpoint on another port than the websocket server. # - 127.0.0.1:8081:81 volumes: From 24a89d7e5f311c0c6b167f94f7b4b5397bf94bbc Mon Sep 17 00:00:00 2001 From: Rico Date: Mon, 4 Aug 2025 01:58:13 +0200 Subject: [PATCH 04/18] docs: add missing options to sample config --- config.sample.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/config.sample.yaml b/config.sample.yaml index cf3e395..dabf583 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -11,6 +11,11 @@ webserver: cert_path: "" cert_key_path: "" compression_enabled: false + # Use True-Client-IP, X-Real-IP or the X-Forwarded-For headers (in that order) to determine the real IP address of the client. + # If you are using a reverse proxy, you should set this to true. + real_ip: false + whitelist: + - "127.0.0.1/8" prometheus: enabled: true From 0316caf550cb1315b2c707030a1a4d2a28fe37a1 Mon Sep 17 00:00:00 2001 From: Rico Date: Mon, 4 Aug 2025 01:59:30 +0200 Subject: [PATCH 05/18] fix: add missing imports and sort them --- internal/web/server.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/web/server.go b/internal/web/server.go index 50660c6..3311c27 100644 --- a/internal/web/server.go +++ b/internal/web/server.go @@ -11,12 +11,11 @@ import ( "strconv" "time" - "github.com/go-chi/chi/v5" - "github.com/go-chi/chi/v5/middleware" - + "github.com/d-Rickyy-b/certstream-server-go/internal/broadcast" "github.com/d-Rickyy-b/certstream-server-go/internal/config" - "github.com/d-Rickyy-b/certstream-server-go/internal/models" + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" "github.com/gorilla/websocket" ) From de6860ddaf88b2d77810c3afcabdd839c96e111d Mon Sep 17 00:00:00 2001 From: Rico Date: Mon, 4 Aug 2025 01:59:52 +0200 Subject: [PATCH 06/18] fix: use context.TODO instead of nil --- internal/web/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/web/server.go b/internal/web/server.go index 3311c27..99e8fff 100644 --- a/internal/web/server.go +++ b/internal/web/server.go @@ -54,7 +54,7 @@ func IPWhitelist(whitelist []string) func(next http.Handler) http.Handler { for _, element := range whitelist { _, ipNet, err := net.ParseCIDR(element) if err != nil { - var ip net.IP + // If there is an error parsing the CIDR, it might be an IP address if ip = net.ParseIP(element); ip == nil { log.Println("Invalid IP in metrics whitelist: ", element) From 8d7fbb6600095dd23f1b46a0c4be406457cce039 Mon Sep 17 00:00:00 2001 From: Rico Date: Mon, 4 Aug 2025 02:00:17 +0200 Subject: [PATCH 07/18] ci: update goreleaser yaml config --- .github/goreleaser.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/goreleaser.yml b/.github/goreleaser.yml index 05d95ad..5a1352f 100644 --- a/.github/goreleaser.yml +++ b/.github/goreleaser.yml @@ -14,13 +14,13 @@ builds: - darwin - windows goarch: - - 386 + - "386" - amd64 - arm - arm64 ignore: - goos: darwin - goarch: 386 + goarch: "386" - goos: darwin goarch: arm - goos: windows @@ -28,7 +28,7 @@ builds: - goos: windows goarch: arm64 - goos: windows - goarch: 386 + goarch: "386" checksum: name_template: '{{.ProjectName}}_{{.Version}}_checksums.txt' changelog: From 940c7cafe6caebaeb764a9443403e5e7f2a8f298 Mon Sep 17 00:00:00 2001 From: Rico Date: Mon, 4 Aug 2025 02:00:48 +0200 Subject: [PATCH 08/18] ci: migrate golangci config to v2 --- .golangci.yml | 102 +++++++++++++++++++++++++------------------------- 1 file changed, 51 insertions(+), 51 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 4dceca4..7fa69e1 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,3 +1,5 @@ +version: "2" + # Options for analysis running. run: # The default concurrency value is the number of available CPU. @@ -32,8 +34,8 @@ linters: # Default: false # fast: true - # Disable all linters. - disable-all: true + # Disable all linters for now. + default: none # Enable specific linter # https://golangci-lint.run/usage/linters/#enabled-by-default @@ -41,7 +43,7 @@ linters: - bodyclose - containedctx # - deadcode - - depguard + # - depguard - dogsled - dupl - dupword @@ -50,23 +52,16 @@ linters: - errchkjson - errname - errorlint - - execinquery - exhaustive - - exportloopref - gochecknoinits - gocritic - godot # - godox - - gofmt - # - gofumpt - - goimports - # - golint # - gomnd - gomoddirectives - gomodguard - goprintffuncname - gosec - - gosimple - govet - grouper - importas @@ -91,9 +86,6 @@ linters: - reassign - revive - staticcheck - - stylecheck - - tenv - - typecheck - unconvert - unparam - unused @@ -102,43 +94,51 @@ linters: - wastedassign - whitespace - wrapcheck - - wsl + # - wsl_v5 + + exclusions: + rules: + # Exclude some linters from running on tests files. + - path: _test\.go + linters: + - nlreturn -linters-settings: - nlreturn: - # Size of the block (including return statement that is still "OK") so no return split required. - # Default: 1 - block-size: 2 - wsl: - enforce-err-cuddling: true - allow-cuddle-declarations: true - allow-assign-and-call: true - allow-cuddle-with-calls: - - log.Println - - log.Printf - - RLock - - RUnlock - - Lock - - Unlock - allow-assign-and-anything: true - gofumpt: - extra-rules: true - lll: - line-length: 160 - varnamelen: - ignore-decls: - - w io.Writer - - w io.WriteCloser - - w http.ResponseWriter - - r *http.Request - - r chi.Router - - r *chi.Mux - - i int + settings: + nlreturn: + # Size of the block (including return statement that is still "OK") so no return split required. + # Default: 1 + block-size: 2 + wsl: + force-err-cuddling: true + allow-cuddle-declarations: true + allow-assign-and-call: true + allow-cuddle-with-calls: + - log.Println + - log.Printf + - RLock + - RUnlock + - Lock + - Unlock + allow-assign-and-anything: true + lll: + line-length: 160 + varnamelen: + ignore-decls: + - w io.Writer + - w io.WriteCloser + - w http.ResponseWriter + - r *http.Request + - r chi.Router + - r *chi.Mux + - i int + +formatters: + enable: + # - gofmt + # - gofumpt + - goimports + # - golint -issues: - # List of regexps of issue texts to exclude. - exclude-rules: - # Exclude some linters from running on tests files. - - path: _test\.go - linters: - - nlreturn + settings: + gofumpt: + extra-rules: true From 56f6acf3a23e1a6f27a9584879fb2122a51636ae Mon Sep 17 00:00:00 2001 From: Rico Date: Mon, 4 Aug 2025 02:01:09 +0200 Subject: [PATCH 09/18] fix: properly cancel context --- cmd/certpicker/main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/certpicker/main.go b/cmd/certpicker/main.go index 22cc530..3a58136 100644 --- a/cmd/certpicker/main.go +++ b/cmd/certpicker/main.go @@ -46,7 +46,9 @@ func main() { } // Get entries from CT log - c, _ := context.WithTimeout(context.Background(), 10*time.Second) + c, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + entries, getEntriesErr := jsonClient.GetRawEntries(c, certID, certID) if getEntriesErr != nil { log.Fatalln("Error getting entries from CT log: ", getEntriesErr) From e6948810337bf2b00ecd8bf0956295ab265da552 Mon Sep 17 00:00:00 2001 From: Rico Date: Mon, 4 Aug 2025 02:01:33 +0200 Subject: [PATCH 10/18] chore: add missing dependencies --- go.mod | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go.mod b/go.mod index a38a885..f52242b 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,8 @@ require ( require ( github.com/go-logr/logr v1.4.3 // indirect github.com/google/trillian v1.7.2 // indirect + github.com/klauspost/compress v1.17.11 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/histogram v1.2.0 // indirect golang.org/x/crypto v0.42.0 // indirect From a54515cc5496ec947098f5032a8e43f72705d569 Mon Sep 17 00:00:00 2001 From: Rico Date: Mon, 4 Aug 2025 02:04:35 +0200 Subject: [PATCH 11/18] docs: add comments for config --- internal/config/config.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 4ad58c0..f7ae4a9 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -66,8 +66,10 @@ type Config struct { DisableDefaultLogs bool `yaml:"disable_default_logs"` // AdditionalLogs contains additional logs provided by the user that can be used in addition to the default logs. AdditionalLogs []LogConfig `yaml:"additional_logs"` - BufferSizes BufferSizes `yaml:"buffer_sizes"` - DropOldLogs *bool `yaml:"drop_old_logs"` + // BufferSizes contains the buffer sizes for the different components of the server. They usually don't need any adjustments. + BufferSizes BufferSizes `yaml:"buffer_sizes"` + // DropOldLogs indicates whether downloading CT-Logs should start at the latest index (true) or should from the beginning (false). + DropOldLogs *bool `yaml:"drop_old_logs"` Recovery struct { Enabled bool `yaml:"enabled"` CTIndexFile string `yaml:"ct_index_file"` From 33fb23eaeb5ffb3f7a14f7f50d84c50a2e41bb17 Mon Sep 17 00:00:00 2001 From: Rico Date: Tue, 5 Aug 2025 17:37:09 +0200 Subject: [PATCH 12/18] feat: add certprocessor interface --- internal/broadcast/certprocessor.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 internal/broadcast/certprocessor.go diff --git a/internal/broadcast/certprocessor.go b/internal/broadcast/certprocessor.go new file mode 100644 index 0000000..d10d47a --- /dev/null +++ b/internal/broadcast/certprocessor.go @@ -0,0 +1,26 @@ +package broadcast + +// CertProcessor defines the interface for processing certificate messages. +// Different implementations can be used for different types of clients, such as WebSocket clients, kafka, or other message queues for example. +type CertProcessor interface { + // Write sends a message to the client's broadcast channel. + Write(message []byte) + // Close closes the client's connection and cleans up resources. + Close() + + // To obtain client details, there are getter methods for several values + Name() string + SubType() SubscriptionType + SkippedCerts() uint64 +} + +const ( + // SubTypeFull represents full certificate updates. + SubTypeFull SubscriptionType = iota + // SubTypeLite represents certificate updates with less details. + SubTypeLite + // SubTypeDomain represents updates that only include domain information. + SubTypeDomain +) + +type SubscriptionType int From 756d52ded7b3a9d1f8a8e088f29062a355c475ca Mon Sep 17 00:00:00 2001 From: Rico Date: Tue, 5 Aug 2025 18:08:46 +0200 Subject: [PATCH 13/18] feat: implement NSQ client refers to #23 and #35 I needed to change the config to an arrary in order to have proper support for multiple kinds of queues and stream processors. Currently NSQ and kafka are supported. Future releases could support amazon SNS/SQS by creating a new client in the broadcast package and by extending the config. --- config.sample.yaml | 12 ++- go.mod | 2 + go.sum | 4 + internal/broadcast/kafkaclient.go | 21 +++--- internal/broadcast/nsqclient.go | 119 ++++++++++++++++++++++++++++++ internal/certstream/certstream.go | 39 +++++++++- internal/config/config.go | 16 ++-- 7 files changed, 186 insertions(+), 27 deletions(-) create mode 100644 internal/broadcast/nsqclient.go diff --git a/config.sample.yaml b/config.sample.yaml index dabf583..7787d65 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -28,10 +28,16 @@ prometheus: - "127.0.0.1/8" # Configuration related to external stream processing tools go here. -streamprocessing: - kafka: +stream_processing: + - name: "kafka" + enabled: false + server_addr: "127.0.0.1" + server_port: 9092 + topic: "certstream" + + - name: "nqs" enabled: true - server_address: "127.0.0.1" + server_addr: "127.0.0.1" server_port: 9092 topic: "certstream" diff --git a/go.mod b/go.mod index f52242b..cabf194 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,14 @@ require ( github.com/go-chi/chi/v5 v5.2.3 github.com/google/certificate-transparency-go v1.3.2 github.com/gorilla/websocket v1.5.3 + github.com/nsqio/go-nsq v1.1.0 github.com/segmentio/kafka-go v0.4.48 gopkg.in/yaml.v3 v3.0.1 ) require ( github.com/go-logr/logr v1.4.3 // indirect + github.com/golang/snappy v0.0.1 // indirect github.com/google/trillian v1.7.2 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect diff --git a/go.sum b/go.sum index f7e46ad..2814adc 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/trillian v1.7.2 h1:EPBxc4YWY4Ak8tcuhyFleY+zYlbCDCa4Sn24e1Ka8Js= github.com/google/trillian v1.7.2/go.mod h1:mfQJW4qRH6/ilABtPYNBerVJAJ/upxHLX81zxNQw05s= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= @@ -26,6 +28,8 @@ github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IX github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/internal/broadcast/kafkaclient.go b/internal/broadcast/kafkaclient.go index b89b845..a10a938 100644 --- a/internal/broadcast/kafkaclient.go +++ b/internal/broadcast/kafkaclient.go @@ -3,12 +3,8 @@ package broadcast import ( "context" "log" - "net" - "strconv" "time" - "github.com/d-Rickyy-b/certstream-server-go/internal/config" - "github.com/segmentio/kafka-go" ) @@ -20,24 +16,23 @@ const ( type KafkaClient struct { conn *kafka.Conn // Kafka connection addr string + topic string isConnected bool BaseClient } // NewKafkaClient creates a new Kafka client that immediately connects to the configured Kafka server. -func NewKafkaClient(subType SubscriptionType, name string, certBufferSize int) *KafkaClient { - addr := net.JoinHostPort(config.AppConfig.StreamProcessing.Kafka.ServerAddr, strconv.Itoa(config.AppConfig.StreamProcessing.Kafka.ServerPort)) - +func NewKafkaClient(subType SubscriptionType, addr, name, topic string, certBufferSize int) *KafkaClient { // Connect to the Kafka server - // TODO make topic configurable - conn, err := kafka.DialLeader(context.Background(), "tcp", addr, TOPIC, 0) + conn, err := kafka.DialLeader(context.Background(), "tcp", addr, topic, 0) if err != nil { log.Println("failed to connect to kafka:", err) } kc := &KafkaClient{ - conn: conn, - addr: addr, + conn: conn, + addr: addr, + topic: topic, BaseClient: BaseClient{ broadcastChan: make(chan []byte, certBufferSize), stopChan: make(chan struct{}), @@ -67,7 +62,9 @@ func (c *KafkaClient) reconnectHandler() { time.Sleep(5 * time.Second) continue } - conn, err := kafka.DialLeader(context.Background(), "tcp", c.addr, TOPIC, 0) + + // Attempt to connect to the Kafka server + conn, err := kafka.DialLeader(context.Background(), "tcp", c.addr, c.topic, 0) if err != nil { log.Printf("Reconnect failed: %v. Retrying in 5s...", err) time.Sleep(5 * time.Second) diff --git a/internal/broadcast/nsqclient.go b/internal/broadcast/nsqclient.go new file mode 100644 index 0000000..56091ec --- /dev/null +++ b/internal/broadcast/nsqclient.go @@ -0,0 +1,119 @@ +package broadcast + +import ( + "github.com/nsqio/go-nsq" + + "log" + "time" +) + +// NSQClient connects to a NSQ server in order to provide it with certificates. +type NSQClient struct { + conn *nsq.Producer // Kafka connection + addr string + topic string + isConnected bool + BaseClient +} + +// nullLogger is a logger that does not output anything. +// It is used to silence the NSQ logger. +type nullLogger struct{} + +func (l nullLogger) Output(calldepth int, s string) error { + // Do nothing, effectively silencing the logger + return nil +} + +// NewNSQClient creates a new NSQ client that immediately connects to the configured NSQ server +func NewNSQClient(subType SubscriptionType, addr, name, topic string, certBufferSize int) *NSQClient { + log.Println("Initializing NSQ client...") + + // Instantiate a producer. + conf := nsq.NewConfig() + conn, err := nsq.NewProducer(addr, conf) + if err != nil { + log.Println(err) + } + + log.Println("Connected to NSQ server at", addr) + + // Silence log output from NSQ + conn.SetLogger(nullLogger{}, nsq.LogLevelError) + + nsqc := &NSQClient{ + conn: conn, + addr: addr, + topic: topic, + BaseClient: BaseClient{ + broadcastChan: make(chan []byte, certBufferSize), + stopChan: make(chan struct{}), + name: name, + subType: subType, + }, + } + nsqc.isConnected = true + + go nsqc.broadcastHandler() + go nsqc.reconnectHandler() + + return nsqc +} + +// reconnectHandler is a background job that attempts to reconnect to the Kafka server if the connection is lost. +func (c *NSQClient) reconnectHandler() { + for { + select { + case <-c.stopChan: + log.Println("Stopping reconnectHandler for kafka producer:", c.addr) + return + default: + if c.isConnected { + // If already connected or no connection exists, skip reconnection + time.Sleep(5 * time.Second) + continue + } + // Attempt to connect to the NSQ server + err := c.conn.Ping() + if err != nil { + log.Printf("Reconnect to NSQ server failed: '%v'. Retrying in 5s...", err) + time.Sleep(5 * time.Second) + + continue + } + + c.isConnected = true + log.Println("Reconnected to NSQ server at", c.addr) + } + } +} + +// Each client has a broadcastHandler that runs in the background and sends out the broadcast messages to the client. +func (c *NSQClient) broadcastHandler() { + // writeWait := 60 * time.Second + + defer func() { + log.Println("Closing broadcast handler for nsq producer:", c.addr) + // Gracefully stop the producer when appropriate (e.g. before shutting down the service) + c.conn.Stop() + }() + + for { + select { + case <-c.stopChan: + return + case message := <-c.broadcastChan: + if !c.isConnected { + continue + } + + // Synchronously publish a single message to the specified topic. + // Messages can also be sent asynchronously and/or in batches. + err := c.conn.Publish(c.topic, message) + if err != nil { + log.Println("Error writing to NSQ topic:", err) + c.isConnected = false + } + } + } +} diff --git a/internal/certstream/certstream.go b/internal/certstream/certstream.go index 13fc238..e77e12f 100644 --- a/internal/certstream/certstream.go +++ b/internal/certstream/certstream.go @@ -6,8 +6,10 @@ package certstream import ( "log" + "net" "os" "os/signal" + "strconv" "syscall" "github.com/d-Rickyy-b/certstream-server-go/internal/broadcast" @@ -48,11 +50,40 @@ func NewCertstreamServer(config config.Config) (*Certstream, error) { // Setup metrics server cs.setupMetrics(webserver) - if config.StreamProcessing.Kafka.Enabled { - log.Println("Initializing Kafka client...") + log.Println(config.StreamProcessing) + // Initialize the stream processors if configured and enabled. + for _, streamProcessor := range config.StreamProcessing { + if !streamProcessor.Enabled { + continue + } - kc := broadcast.NewKafkaClient(broadcast.SubTypeFull, "kafka-producer", config.General.BufferSizes.Websocket) - broadcast.ClientHandler.RegisterClient(kc) + addr := net.JoinHostPort(streamProcessor.ServerAddr, strconv.Itoa(streamProcessor.ServerPort)) + log.Printf("Initializing stream processor: %s at %s\n", streamProcessor.Name, addr) + + switch streamProcessor.Type { + case "nsq": + log.Println("Initializing NSQ client...") + nc := broadcast.NewNSQClient( + broadcast.SubTypeFull, + addr, + streamProcessor.Name, + streamProcessor.Topic, + config.General.BufferSizes.Websocket, + ) + broadcast.ClientHandler.RegisterClient(nc) + case "kafka": + log.Println("Initializing Kafka client...") + kc := broadcast.NewKafkaClient( + broadcast.SubTypeFull, + addr, + streamProcessor.Name, + streamProcessor.Topic, + config.General.BufferSizes.Websocket, + ) + broadcast.ClientHandler.RegisterClient(kc) + default: + log.Printf("Unknown stream processor type '%s' for %s. Skipping...\n", streamProcessor.Type, streamProcessor.Name) + } } return &cs, nil diff --git a/internal/config/config.go b/internal/config/config.go index f7ae4a9..5d91b70 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -53,14 +53,14 @@ type Config struct { MetricsURL string `yaml:"metrics_url"` ExposeSystemMetrics bool `yaml:"expose_system_metrics"` } - StreamProcessing struct { - Kafka struct { - Enabled bool `yaml:"enabled"` - ServerAddr string `yaml:"server_addr"` - ServerPort int `yaml:"server_port"` - Topic string `yaml:"topic"` - } - } + StreamProcessing []struct { + Name string `yaml:"name"` + Type string `yaml:"type"` + Enabled bool `yaml:"enabled"` + ServerAddr string `yaml:"server_addr"` + ServerPort int `yaml:"server_port"` + Topic string `yaml:"topic"` + } `yaml:"stream_processing"` General struct { // DisableDefaultLogs indicates whether the default logs used in Google Chrome and provided by Google should be disabled. DisableDefaultLogs bool `yaml:"disable_default_logs"` From 6582c08d94767bccf9737ec1b1899733b48f7962 Mon Sep 17 00:00:00 2001 From: Rico Date: Tue, 5 Aug 2025 22:29:01 +0200 Subject: [PATCH 14/18] refactor: remove unnecessary log --- internal/certstream/certstream.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/certstream/certstream.go b/internal/certstream/certstream.go index e77e12f..6e19da5 100644 --- a/internal/certstream/certstream.go +++ b/internal/certstream/certstream.go @@ -50,7 +50,6 @@ func NewCertstreamServer(config config.Config) (*Certstream, error) { // Setup metrics server cs.setupMetrics(webserver) - log.Println(config.StreamProcessing) // Initialize the stream processors if configured and enabled. for _, streamProcessor := range config.StreamProcessing { if !streamProcessor.Enabled { From 79ba8d7cd5202e797cebd8666c43010d9eb45048 Mon Sep 17 00:00:00 2001 From: Rico Date: Wed, 6 Aug 2025 01:01:37 +0200 Subject: [PATCH 15/18] refactor: change "kafka" in comments to "nsq" --- internal/broadcast/nsqclient.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/broadcast/nsqclient.go b/internal/broadcast/nsqclient.go index 56091ec..9aa6b4e 100644 --- a/internal/broadcast/nsqclient.go +++ b/internal/broadcast/nsqclient.go @@ -9,7 +9,7 @@ import ( // NSQClient connects to a NSQ server in order to provide it with certificates. type NSQClient struct { - conn *nsq.Producer // Kafka connection + conn *nsq.Producer // nsq connection addr string topic string isConnected bool @@ -60,12 +60,12 @@ func NewNSQClient(subType SubscriptionType, addr, name, topic string, certBuffer return nsqc } -// reconnectHandler is a background job that attempts to reconnect to the Kafka server if the connection is lost. +// reconnectHandler is a background job that attempts to reconnect to the NSQ server if the connection is lost. func (c *NSQClient) reconnectHandler() { for { select { case <-c.stopChan: - log.Println("Stopping reconnectHandler for kafka producer:", c.addr) + log.Println("Stopping reconnectHandler for nsq producer:", c.addr) return default: if c.isConnected { From 5aa85cdfd014256ad44a0e7be14fe3ae2dde11d7 Mon Sep 17 00:00:00 2001 From: Rico Date: Thu, 7 Aug 2025 12:53:36 +0200 Subject: [PATCH 16/18] chore: update dependencies --- go.sum | 62 ++++++++++++++++++++++++++-------------------------------- 1 file changed, 28 insertions(+), 34 deletions(-) diff --git a/go.sum b/go.sum index 2814adc..a0f323f 100644 --- a/go.sum +++ b/go.sum @@ -1,39 +1,38 @@ -github.com/VictoriaMetrics/metrics v1.35.2 h1:Bj6L6ExfnakZKYPpi7mGUnkJP4NGQz2v5wiChhXNyWQ= -github.com/VictoriaMetrics/metrics v1.35.2/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8= github.com/VictoriaMetrics/metrics v1.40.1/go.mod h1:XE4uudAAIRaJE614Tl5HMrtoEU6+GDZO4QTnNSsZRuA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-chi/chi/v5 v5.2.1 h1:KOIHODQj58PmL80G2Eak4WdvUzjSJSm0vG72crDCqb8= -github.com/go-chi/chi/v5 v5.2.1/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= -github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= -github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= -github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618= +github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/certificate-transparency-go v1.3.2 h1:9ahSNZF2o7SYMaKaXhAumVEzXB2QaayzII9C8rv7v+A= github.com/google/certificate-transparency-go v1.3.2/go.mod h1:H5FpMUaGa5Ab2+KCYsxg6sELw3Flkl7pGZzWdBoYLXs= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/trillian v1.7.2 h1:EPBxc4YWY4Ak8tcuhyFleY+zYlbCDCa4Sn24e1Ka8Js= github.com/google/trillian v1.7.2/go.mod h1:mfQJW4qRH6/ilABtPYNBerVJAJ/upxHLX81zxNQw05s= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= -github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/segmentio/kafka-go v0.4.48 h1:9jyu9CWK4W5W+SroCe8EffbrRZVqAOkuaLd/ApID4Vs= github.com/segmentio/kafka-go v0.4.48/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= @@ -41,8 +40,9 @@ github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NF github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= @@ -57,9 +57,8 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= -golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= -golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= +golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= +golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -68,9 +67,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= -golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= -golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= +golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -82,9 +80,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= -golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= +golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -97,22 +94,19 @@ golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= -golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= +golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f h1:N/PrbTw4kdkqNRzVfWPrBekzLuarFREcbFOiOLkXon4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9/go.mod h1:HSkG/KdJWusxU1F6CNrwNDjBMgisKxGnc5dAZfT0mjQ= -google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM= -google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= -google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= -google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= -google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= -google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b h1:zPKJod4w6F1+nRGDI9ubnXYhU9NSWoFAijkHkUXeTK8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4= +google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM= +google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= +google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 400f2e378036ee9781900079d511675f8e12a576 Mon Sep 17 00:00:00 2001 From: Rico Date: Sun, 23 Nov 2025 18:51:06 +0100 Subject: [PATCH 17/18] chore: update dependencies --- go.mod | 21 +++++++------ go.sum | 93 ++++++++++++++-------------------------------------------- 2 files changed, 32 insertions(+), 82 deletions(-) diff --git a/go.mod b/go.mod index cabf194..3aeb889 100644 --- a/go.mod +++ b/go.mod @@ -5,28 +5,27 @@ go 1.24.0 toolchain go1.24.2 require ( - github.com/VictoriaMetrics/metrics v1.40.1 + github.com/VictoriaMetrics/metrics v1.40.2 github.com/go-chi/chi/v5 v5.2.3 github.com/google/certificate-transparency-go v1.3.2 github.com/gorilla/websocket v1.5.3 github.com/nsqio/go-nsq v1.1.0 - github.com/segmentio/kafka-go v0.4.48 + github.com/segmentio/kafka-go v0.4.49 gopkg.in/yaml.v3 v3.0.1 ) require ( github.com/go-logr/logr v1.4.3 // indirect - github.com/golang/snappy v0.0.1 // indirect + github.com/golang/snappy v1.0.0 // indirect github.com/google/trillian v1.7.2 // indirect - github.com/klauspost/compress v1.17.11 // indirect - github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/klauspost/compress v1.18.1 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/histogram v1.2.0 // indirect - golang.org/x/crypto v0.42.0 // indirect - golang.org/x/net v0.44.0 // indirect - golang.org/x/sys v0.36.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9 // indirect - google.golang.org/grpc v1.75.1 // indirect - google.golang.org/protobuf v1.36.9 // indirect + golang.org/x/crypto v0.45.0 // indirect + golang.org/x/sys v0.38.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect + google.golang.org/grpc v1.77.0 // indirect + google.golang.org/protobuf v1.36.10 // indirect k8s.io/klog/v2 v2.130.1 // indirect ) diff --git a/go.sum b/go.sum index a0f323f..3b66df9 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,9 @@ -github.com/VictoriaMetrics/metrics v1.40.1/go.mod h1:XE4uudAAIRaJE614Tl5HMrtoEU6+GDZO4QTnNSsZRuA= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/VictoriaMetrics/metrics v1.40.2 h1:OVSjKcQEx6JAwGeu8/KQm9Su5qJ72TMEW4xYn5vw3Ac= +github.com/VictoriaMetrics/metrics v1.40.2/go.mod h1:XE4uudAAIRaJE614Tl5HMrtoEU6+GDZO4QTnNSsZRuA= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618= -github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= +github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE= +github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= @@ -20,27 +19,20 @@ github.com/google/trillian v1.7.2 h1:EPBxc4YWY4Ak8tcuhyFleY+zYlbCDCa4Sn24e1Ka8Js github.com/google/trillian v1.7.2/go.mod h1:mfQJW4qRH6/ilABtPYNBerVJAJ/upxHLX81zxNQw05s= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= +github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= -github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/segmentio/kafka-go v0.4.48 h1:9jyu9CWK4W5W+SroCe8EffbrRZVqAOkuaLd/ApID4Vs= -github.com/segmentio/kafka-go v0.4.48/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk= +github.com/segmentio/kafka-go v0.4.49/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= @@ -53,63 +45,22 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= -golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= -golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= -golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= -golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b h1:zPKJod4w6F1+nRGDI9ubnXYhU9NSWoFAijkHkUXeTK8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= -google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4= -google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM= -google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= -google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= +golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= +golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= +golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba h1:UKgtfRM7Yh93Sya0Fo8ZzhDP4qBckrrxEr2oF5UIVb8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= +google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM= +google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= From 1603cce9d275c60e95a31f7d4a22e708d8f9211e Mon Sep 17 00:00:00 2001 From: Rico Date: Sun, 23 Nov 2025 18:51:31 +0100 Subject: [PATCH 18/18] fix: broken code due to rebase --- internal/broadcast/broadcastmanager.go | 2 ++ internal/web/server.go | 6 ++---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/broadcast/broadcastmanager.go b/internal/broadcast/broadcastmanager.go index 2a9d600..6977780 100644 --- a/internal/broadcast/broadcastmanager.go +++ b/internal/broadcast/broadcastmanager.go @@ -105,6 +105,8 @@ func (bm *Dispatcher) GetSkippedCerts() map[string]uint64 { // broadcaster is run in a goroutine and handles the dispatching of certs to clients. func (bm *Dispatcher) broadcaster() { for { + var data []byte + entry := <-bm.MessageQueue dataLite := entry.JSONLite() dataFull := entry.JSON() diff --git a/internal/web/server.go b/internal/web/server.go index 99e8fff..079f6b8 100644 --- a/internal/web/server.go +++ b/internal/web/server.go @@ -19,9 +19,7 @@ import ( "github.com/gorilla/websocket" ) -var ( - upgrader websocket.Upgrader -) +var upgrader websocket.Upgrader // WebServer is a struct that holds the necessary information to run a webserver. // It is used for the websocket server as well as the metrics server. @@ -52,7 +50,7 @@ func IPWhitelist(whitelist []string) func(next http.Handler) http.Handler { var cidrList []net.IPNet for _, element := range whitelist { - _, ipNet, err := net.ParseCIDR(element) + ip, ipNet, err := net.ParseCIDR(element) if err != nil { // If there is an error parsing the CIDR, it might be an IP address if ip = net.ParseIP(element); ip == nil {