diff --git a/.github/goreleaser.yml b/.github/goreleaser.yml index 05d95ad..5a1352f 100644 --- a/.github/goreleaser.yml +++ b/.github/goreleaser.yml @@ -14,13 +14,13 @@ builds: - darwin - windows goarch: - - 386 + - "386" - amd64 - arm - arm64 ignore: - goos: darwin - goarch: 386 + goarch: "386" - goos: darwin goarch: arm - goos: windows @@ -28,7 +28,7 @@ builds: - goos: windows goarch: arm64 - goos: windows - goarch: 386 + goarch: "386" checksum: name_template: '{{.ProjectName}}_{{.Version}}_checksums.txt' changelog: diff --git a/.golangci.yml b/.golangci.yml index 4dceca4..7fa69e1 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,3 +1,5 @@ +version: "2" + # Options for analysis running. run: # The default concurrency value is the number of available CPU. @@ -32,8 +34,8 @@ linters: # Default: false # fast: true - # Disable all linters. - disable-all: true + # Disable all linters for now. + default: none # Enable specific linter # https://golangci-lint.run/usage/linters/#enabled-by-default @@ -41,7 +43,7 @@ linters: - bodyclose - containedctx # - deadcode - - depguard + # - depguard - dogsled - dupl - dupword @@ -50,23 +52,16 @@ linters: - errchkjson - errname - errorlint - - execinquery - exhaustive - - exportloopref - gochecknoinits - gocritic - godot # - godox - - gofmt - # - gofumpt - - goimports - # - golint # - gomnd - gomoddirectives - gomodguard - goprintffuncname - gosec - - gosimple - govet - grouper - importas @@ -91,9 +86,6 @@ linters: - reassign - revive - staticcheck - - stylecheck - - tenv - - typecheck - unconvert - unparam - unused @@ -102,43 +94,51 @@ linters: - wastedassign - whitespace - wrapcheck - - wsl + # - wsl_v5 + + exclusions: + rules: + # Exclude some linters from running on tests files. + - path: _test\.go + linters: + - nlreturn -linters-settings: - nlreturn: - # Size of the block (including return statement that is still "OK") so no return split required. - # Default: 1 - block-size: 2 - wsl: - enforce-err-cuddling: true - allow-cuddle-declarations: true - allow-assign-and-call: true - allow-cuddle-with-calls: - - log.Println - - log.Printf - - RLock - - RUnlock - - Lock - - Unlock - allow-assign-and-anything: true - gofumpt: - extra-rules: true - lll: - line-length: 160 - varnamelen: - ignore-decls: - - w io.Writer - - w io.WriteCloser - - w http.ResponseWriter - - r *http.Request - - r chi.Router - - r *chi.Mux - - i int + settings: + nlreturn: + # Size of the block (including return statement that is still "OK") so no return split required. + # Default: 1 + block-size: 2 + wsl: + force-err-cuddling: true + allow-cuddle-declarations: true + allow-assign-and-call: true + allow-cuddle-with-calls: + - log.Println + - log.Printf + - RLock + - RUnlock + - Lock + - Unlock + allow-assign-and-anything: true + lll: + line-length: 160 + varnamelen: + ignore-decls: + - w io.Writer + - w io.WriteCloser + - w http.ResponseWriter + - r *http.Request + - r chi.Router + - r *chi.Mux + - i int + +formatters: + enable: + # - gofmt + # - gofumpt + - goimports + # - golint -issues: - # List of regexps of issue texts to exclude. - exclude-rules: - # Exclude some linters from running on tests files. - - path: _test\.go - linters: - - nlreturn + settings: + gofumpt: + extra-rules: true diff --git a/cmd/certpicker/main.go b/cmd/certpicker/main.go index 22cc530..3a58136 100644 --- a/cmd/certpicker/main.go +++ b/cmd/certpicker/main.go @@ -46,7 +46,9 @@ func main() { } // Get entries from CT log - c, _ := context.WithTimeout(context.Background(), 10*time.Second) + c, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + entries, getEntriesErr := jsonClient.GetRawEntries(c, certID, certID) if getEntriesErr != nil { log.Fatalln("Error getting entries from CT log: ", getEntriesErr) diff --git a/config.sample.yaml b/config.sample.yaml index c442d9c..7787d65 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -11,6 +11,11 @@ webserver: cert_path: "" cert_key_path: "" compression_enabled: false + # Use True-Client-IP, X-Real-IP or the X-Forwarded-For headers (in that order) to determine the real IP address of the client. + # If you are using a reverse proxy, you should set this to true. + real_ip: false + whitelist: + - "127.0.0.1/8" prometheus: enabled: true @@ -22,6 +27,20 @@ prometheus: whitelist: - "127.0.0.1/8" +# Configuration related to external stream processing tools go here. +stream_processing: + - name: "kafka" + enabled: false + server_addr: "127.0.0.1" + server_port: 9092 + topic: "certstream" + + - name: "nqs" + enabled: true + server_addr: "127.0.0.1" + server_port: 9092 + topic: "certstream" + general: # DisableDefaultLogs indicates whether the default logs used in Google Chrome and provided by Google should be disabled. disable_default_logs: false @@ -39,9 +58,8 @@ general: websocket: 300 # Buffer for each CT log connection ctlog: 1000 - # Combined buffer for the broadcast manager - broadcastmanager: 10000 - + # Combined buffer for the cert dispatcher + dispatcher: 10000 # Google regularly updates the log list. If this option is set to true, the server will remove all logs no longer listed in the Google log list. # This option defaults to true. See https://github.com/d-Rickyy-b/certstream-server-go/issues/51 drop_old_logs: true diff --git a/docker/docker-compose.metrics.yml b/docker/docker-compose.metrics.yml index 316a8ec..9c17a7f 100644 --- a/docker/docker-compose.metrics.yml +++ b/docker/docker-compose.metrics.yml @@ -1,6 +1,6 @@ version: '2' -# Make sure to create the sub directories "prometheus", "prometheus_data", "grafana", "grafana_data" and "certstream" +# Make sure to create the subdirectories "prometheus", "prometheus_data", "grafana", "grafana_data" and "certstream" # and create the config files for all three services. For further details please refer to https://github.com/d-Rickyy-b/certstream-server-go/wiki/Collecting-and-Visualizing-Metrics networks: @@ -30,7 +30,7 @@ services: ports: # Exposing Prometheus is NOT required, if you don't want to access it from outside the Docker network. # Using localhost enables you to use a reverse proxy (e.g. with basic auth) to access Prometheus in a more secure way. - - 127.0.0.1:9090:9090 + - "127.0.0.1:9090:9090" networks: - monitoring extra_hosts: @@ -44,7 +44,7 @@ services: depends_on: - prometheus ports: - - 127.0.0.1:8082:3000 + - "127.0.0.1:8082:3000" volumes: - ./grafana_data:/var/lib/grafana - ./grafana/provisioning/:/etc/grafana/provisioning/ @@ -60,7 +60,7 @@ services: # Configure the service to run as specific user. # user: "1000:1000" ports: - - 127.0.0.1:8080:80 + - "127.0.0.1:8080:80" # Don't forget to open the other port in case you run the Prometheus endpoint on another port than the websocket server. # - 127.0.0.1:8081:81 volumes: diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 4edc2fe..1cdeef8 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -7,7 +7,7 @@ services: # Configure the service to run as specific user # user: "1000:1000" ports: - - 127.0.0.1:8080:80 + - "127.0.0.1:8080:80" # Don't forget to open the other port in case you run the Prometheus endpoint on another port than the websocket server. # - 127.0.0.1:8081:81 volumes: diff --git a/go.mod b/go.mod index d121c0a..3aeb889 100644 --- a/go.mod +++ b/go.mod @@ -5,23 +5,27 @@ go 1.24.0 toolchain go1.24.2 require ( - github.com/VictoriaMetrics/metrics v1.40.1 + github.com/VictoriaMetrics/metrics v1.40.2 github.com/go-chi/chi/v5 v5.2.3 github.com/google/certificate-transparency-go v1.3.2 github.com/gorilla/websocket v1.5.3 + github.com/nsqio/go-nsq v1.1.0 + github.com/segmentio/kafka-go v0.4.49 gopkg.in/yaml.v3 v3.0.1 ) require ( github.com/go-logr/logr v1.4.3 // indirect + github.com/golang/snappy v1.0.0 // indirect github.com/google/trillian v1.7.2 // indirect + github.com/klauspost/compress v1.18.1 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/histogram v1.2.0 // indirect - golang.org/x/crypto v0.42.0 // indirect - golang.org/x/net v0.44.0 // indirect - golang.org/x/sys v0.36.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9 // indirect - google.golang.org/grpc v1.75.1 // indirect - google.golang.org/protobuf v1.36.9 // indirect + golang.org/x/crypto v0.45.0 // indirect + golang.org/x/sys v0.38.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect + google.golang.org/grpc v1.77.0 // indirect + google.golang.org/protobuf v1.36.10 // indirect k8s.io/klog/v2 v2.130.1 // indirect ) diff --git a/go.sum b/go.sum index f0ce5f4..3b66df9 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,16 @@ -github.com/VictoriaMetrics/metrics v1.40.1 h1:FrF5uJRpIVj9fayWcn8xgiI+FYsKGMslzPuOXjdeyR4= -github.com/VictoriaMetrics/metrics v1.40.1/go.mod h1:XE4uudAAIRaJE614Tl5HMrtoEU6+GDZO4QTnNSsZRuA= +github.com/VictoriaMetrics/metrics v1.40.2 h1:OVSjKcQEx6JAwGeu8/KQm9Su5qJ72TMEW4xYn5vw3Ac= +github.com/VictoriaMetrics/metrics v1.40.2/go.mod h1:XE4uudAAIRaJE614Tl5HMrtoEU6+GDZO4QTnNSsZRuA= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE= github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/certificate-transparency-go v1.3.2 h1:9ahSNZF2o7SYMaKaXhAumVEzXB2QaayzII9C8rv7v+A= github.com/google/certificate-transparency-go v1.3.2/go.mod h1:H5FpMUaGa5Ab2+KCYsxg6sELw3Flkl7pGZzWdBoYLXs= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= @@ -14,28 +19,46 @@ github.com/google/trillian v1.7.2 h1:EPBxc4YWY4Ak8tcuhyFleY+zYlbCDCa4Sn24e1Ka8Js github.com/google/trillian v1.7.2/go.mod h1:mfQJW4qRH6/ilABtPYNBerVJAJ/upxHLX81zxNQw05s= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= +github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk= +github.com/segmentio/kafka-go v0.4.49/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= -golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= -golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= -golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= -golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= -golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= -golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= -golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9 h1:V1jCN2HBa8sySkR5vLcCSqJSTMv093Rw9EJefhQGP7M= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9/go.mod h1:HSkG/KdJWusxU1F6CNrwNDjBMgisKxGnc5dAZfT0mjQ= -google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI= -google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= -google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw= -google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= +golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= +golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= +golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba h1:UKgtfRM7Yh93Sya0Fo8ZzhDP4qBckrrxEr2oF5UIVb8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= +google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM= +google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/broadcast/baseclient.go b/internal/broadcast/baseclient.go new file mode 100644 index 0000000..9b69e3e --- /dev/null +++ b/internal/broadcast/baseclient.go @@ -0,0 +1,48 @@ +package broadcast + +import "log" + +// BaseClient defines the basic structure for a client that can receive broadcast messages. +// Other client types can embed this struct to inherit its functionality. +type BaseClient struct { + broadcastChan chan []byte + stopChan chan struct{} + name string + subType SubscriptionType + skippedCerts uint64 +} + +// Close cleans up the client's resources by closing the stop and broadcast channels. +func (c *BaseClient) Close() { + close(c.stopChan) + close(c.broadcastChan) +} + +// Name returns the name of the client. +func (c *BaseClient) Name() string { + return c.name +} + +// SubType returns the subscription type of the client. +func (c *BaseClient) SubType() SubscriptionType { + return c.subType +} + +// SkippedCerts returns the number of certificates that were skipped due to the client's broadcast channel being full. +func (c *BaseClient) SkippedCerts() uint64 { + return c.skippedCerts +} + +// Write sends a message to the client's broadcast channel. +// If the channel is full, it increments the skippedCerts counter and logs a message. +func (c *BaseClient) Write(data []byte) { + select { + case c.broadcastChan <- data: + default: + // Default case is executed if the client's broadcast channel is full. + c.skippedCerts++ + if c.skippedCerts%1000 == 1 { + log.Printf("Not providing client '%s' with cert because client's buffer is full. The client can't keep up. Skipped certs: %d\n", c.name, c.skippedCerts) + } + } +} diff --git a/internal/web/broadcastmanager.go b/internal/broadcast/broadcastmanager.go similarity index 52% rename from internal/web/broadcastmanager.go rename to internal/broadcast/broadcastmanager.go index 18382be..6977780 100644 --- a/internal/web/broadcastmanager.go +++ b/internal/broadcast/broadcastmanager.go @@ -1,34 +1,48 @@ -package web +package broadcast import ( "log" "sync" + "github.com/d-Rickyy-b/certstream-server-go/internal/config" "github.com/d-Rickyy-b/certstream-server-go/internal/models" ) -type BroadcastManager struct { - Broadcast chan models.Entry - clients []*client - clientLock sync.RWMutex +// ClientHandler dispatches certificate entries to registered clients. +var ClientHandler *Dispatcher + +type Dispatcher struct { + MessageQueue chan models.Entry + clients []CertProcessor + clientLock sync.RWMutex +} + +// NewDispatcher creates a new Dispatcher instance and assigns it to the ClientHandler variable. +func NewDispatcher() *Dispatcher { + d := &Dispatcher{} + d.MessageQueue = make(chan models.Entry, config.AppConfig.General.BufferSizes.Dispatcher) + ClientHandler = d + + return d } -// registerClient adds a client to the list of clients of the BroadcastManager. +// RegisterClient adds a client to the list of clients of the Dispatcher. // The client will receive certificate broadcasts right after registration. -func (bm *BroadcastManager) registerClient(c *client) { +func (bm *Dispatcher) RegisterClient(c CertProcessor) { + // TODO: check if the client is already registered bm.clientLock.Lock() bm.clients = append(bm.clients, c) - log.Printf("Clients: %d, Capacity: %d\n", len(bm.clients), cap(bm.clients)) + log.Printf("Added new client. Clients: %d, Capacity: %d\n", len(bm.clients), cap(bm.clients)) bm.clientLock.Unlock() } -// unregisterClient removes a client from the list of clients of the BroadcastManager. +// UnregisterClient removes a client from the list of clients of the Dispatcher. // The client will no longer receive certificate broadcasts right after unregistering. -func (bm *BroadcastManager) unregisterClient(c *client) { +func (bm *Dispatcher) UnregisterClient(clientName string) { bm.clientLock.Lock() for i, client := range bm.clients { - if c == client { + if clientName == client.Name() { // Copy the last element of the slice to the position of the removed element // Then remove the last element by re-slicing bm.clients[i] = bm.clients[len(bm.clients)-1] @@ -36,7 +50,7 @@ func (bm *BroadcastManager) unregisterClient(c *client) { bm.clients = bm.clients[:len(bm.clients)-1] // Close the broadcast channel of the client, otherwise this leads to a memory leak - close(c.broadcastChan) + client.Close() break } @@ -46,28 +60,28 @@ func (bm *BroadcastManager) unregisterClient(c *client) { } // ClientFullCount returns the current number of clients connected to the service on the `full` endpoint. -func (bm *BroadcastManager) ClientFullCount() (count int64) { +func (bm *Dispatcher) ClientFullCount() (count int64) { return bm.clientCountByType(SubTypeFull) } // ClientLiteCount returns the current number of clients connected to the service on the `lite` endpoint. -func (bm *BroadcastManager) ClientLiteCount() (count int64) { +func (bm *Dispatcher) ClientLiteCount() (count int64) { return bm.clientCountByType(SubTypeLite) } // ClientDomainsCount returns the current number of clients connected to the service on the `domains-only` endpoint. -func (bm *BroadcastManager) ClientDomainsCount() (count int64) { +func (bm *Dispatcher) ClientDomainsCount() (count int64) { return bm.clientCountByType(SubTypeDomain) } // clientCountByType returns the current number of clients connected to the service on the endpoint matching // the specified SubscriptionType. -func (bm *BroadcastManager) clientCountByType(subType SubscriptionType) (count int64) { +func (bm *Dispatcher) clientCountByType(subType SubscriptionType) (count int64) { bm.clientLock.RLock() defer bm.clientLock.RUnlock() for _, c := range bm.clients { - if c.subType == subType { + if c.SubType() == subType { count++ } } @@ -75,24 +89,25 @@ func (bm *BroadcastManager) clientCountByType(subType SubscriptionType) (count i return count } -func (bm *BroadcastManager) GetSkippedCerts() map[string]uint64 { +// GetSkippedCerts returns a map of client names to the number of skipped certificates for each client. +func (bm *Dispatcher) GetSkippedCerts() map[string]uint64 { bm.clientLock.RLock() defer bm.clientLock.RUnlock() skippedCerts := make(map[string]uint64, len(bm.clients)) for _, c := range bm.clients { - skippedCerts[c.name] = c.skippedCerts + skippedCerts[c.Name()] = c.SkippedCerts() } return skippedCerts } -// broadcaster is run in a goroutine and handles the dispatching of entries to clients. -func (bm *BroadcastManager) broadcaster() { +// broadcaster is run in a goroutine and handles the dispatching of certs to clients. +func (bm *Dispatcher) broadcaster() { for { var data []byte - entry := <-bm.Broadcast + entry := <-bm.MessageQueue dataLite := entry.JSONLite() dataFull := entry.JSON() dataDomain := entry.JSONDomains() @@ -100,7 +115,7 @@ func (bm *BroadcastManager) broadcaster() { bm.clientLock.RLock() for _, c := range bm.clients { - switch c.subType { + switch c.SubType() { case SubTypeLite: data = dataLite case SubTypeFull: @@ -108,21 +123,20 @@ func (bm *BroadcastManager) broadcaster() { case SubTypeDomain: data = dataDomain default: - log.Printf("Unknown subscription type '%d' for client '%s'. Skipping this client!\n", c.subType, c.name) + // This should never happen, but if it does, we log it and skip the client. + log.Printf("Unknown subscription type '%d' for client '%s'. Skipping this client!\n", c.SubType(), c.Name()) continue } - select { - case c.broadcastChan <- data: - default: - // Default case is executed if the client's broadcast channel is full. - c.skippedCerts++ - if c.skippedCerts%1000 == 1 { - log.Printf("Not providing client '%s' with cert because client's buffer is full. The client can't keep up. Skipped certs: %d\n", c.name, c.skippedCerts) - } - } + c.Write(data) } bm.clientLock.RUnlock() } } + +// Start starts the broadcaster goroutine. +func (bm *Dispatcher) Start() { + go bm.broadcaster() + log.Println("Dispatcher started. Listening for certificate entries...") +} diff --git a/internal/broadcast/certprocessor.go b/internal/broadcast/certprocessor.go new file mode 100644 index 0000000..d10d47a --- /dev/null +++ b/internal/broadcast/certprocessor.go @@ -0,0 +1,26 @@ +package broadcast + +// CertProcessor defines the interface for processing certificate messages. +// Different implementations can be used for different types of clients, such as WebSocket clients, kafka, or other message queues for example. +type CertProcessor interface { + // Write sends a message to the client's broadcast channel. + Write(message []byte) + // Close closes the client's connection and cleans up resources. + Close() + + // To obtain client details, there are getter methods for several values + Name() string + SubType() SubscriptionType + SkippedCerts() uint64 +} + +const ( + // SubTypeFull represents full certificate updates. + SubTypeFull SubscriptionType = iota + // SubTypeLite represents certificate updates with less details. + SubTypeLite + // SubTypeDomain represents updates that only include domain information. + SubTypeDomain +) + +type SubscriptionType int diff --git a/internal/broadcast/kafkaclient.go b/internal/broadcast/kafkaclient.go new file mode 100644 index 0000000..a10a938 --- /dev/null +++ b/internal/broadcast/kafkaclient.go @@ -0,0 +1,119 @@ +package broadcast + +import ( + "context" + "log" + "time" + + "github.com/segmentio/kafka-go" +) + +const ( + TOPIC = "certstream" +) + +// KafkaClient connects to a Kafka server in order to provide it with certificates. +type KafkaClient struct { + conn *kafka.Conn // Kafka connection + addr string + topic string + isConnected bool + BaseClient +} + +// NewKafkaClient creates a new Kafka client that immediately connects to the configured Kafka server. +func NewKafkaClient(subType SubscriptionType, addr, name, topic string, certBufferSize int) *KafkaClient { + // Connect to the Kafka server + conn, err := kafka.DialLeader(context.Background(), "tcp", addr, topic, 0) + if err != nil { + log.Println("failed to connect to kafka:", err) + } + + kc := &KafkaClient{ + conn: conn, + addr: addr, + topic: topic, + BaseClient: BaseClient{ + broadcastChan: make(chan []byte, certBufferSize), + stopChan: make(chan struct{}), + name: name, + subType: subType, + }, + } + + go kc.broadcastHandler() + go kc.reconnectHandler() + + return kc +} + +// reconnectHandler is a background job that attempts to reconnect to the Kafka server if the connection is lost. +func (c *KafkaClient) reconnectHandler() { + for { + select { + case <-c.stopChan: + log.Println("Stopping reconnectHandler for kafka producer:", c.addr) + c.conn.Close() + + return + default: + if c.isConnected { + // If already connected or no connection exists, skip reconnection + time.Sleep(5 * time.Second) + continue + } + + // Attempt to connect to the Kafka server + conn, err := kafka.DialLeader(context.Background(), "tcp", c.addr, c.topic, 0) + if err != nil { + log.Printf("Reconnect failed: %v. Retrying in 5s...", err) + time.Sleep(5 * time.Second) + + continue + } + // Close old connection if exists + if c.conn != nil { + _ = c.conn.Close() + } + c.conn = conn + c.isConnected = true + log.Println("Reconnected to Kafka at", c.addr) + } + } +} + +// Each client has a broadcastHandler that runs in the background and sends out the broadcast messages to the client. +func (c *KafkaClient) broadcastHandler() { + writeWait := 60 * time.Second + + defer func() { + log.Println("Closing broadcast handler for kafka producer:", c.addr) + if err := c.conn.Close(); err != nil { + log.Println("failed to close writer:", err) + } + + ClientHandler.UnregisterClient(c.name) + }() + + for { + select { + case <-c.stopChan: + return + case message := <-c.broadcastChan: + if !c.isConnected { + continue + } + + _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + + c.conn.Broker() + _, err := c.conn.WriteMessages( + kafka.Message{Value: message}, + ) + if err != nil { + c.isConnected = false + log.Println("Failed to write messages to kafka:", err) + } + } + } +} diff --git a/internal/broadcast/nsqclient.go b/internal/broadcast/nsqclient.go new file mode 100644 index 0000000..9aa6b4e --- /dev/null +++ b/internal/broadcast/nsqclient.go @@ -0,0 +1,119 @@ +package broadcast + +import ( + "github.com/nsqio/go-nsq" + + "log" + "time" +) + +// NSQClient connects to a NSQ server in order to provide it with certificates. +type NSQClient struct { + conn *nsq.Producer // nsq connection + addr string + topic string + isConnected bool + BaseClient +} + +// nullLogger is a logger that does not output anything. +// It is used to silence the NSQ logger. +type nullLogger struct{} + +func (l nullLogger) Output(calldepth int, s string) error { + // Do nothing, effectively silencing the logger + return nil +} + +// NewNSQClient creates a new NSQ client that immediately connects to the configured NSQ server +func NewNSQClient(subType SubscriptionType, addr, name, topic string, certBufferSize int) *NSQClient { + log.Println("Initializing NSQ client...") + + // Instantiate a producer. + conf := nsq.NewConfig() + conn, err := nsq.NewProducer(addr, conf) + if err != nil { + log.Println(err) + } + + log.Println("Connected to NSQ server at", addr) + + // Silence log output from NSQ + conn.SetLogger(nullLogger{}, nsq.LogLevelError) + + nsqc := &NSQClient{ + conn: conn, + addr: addr, + topic: topic, + BaseClient: BaseClient{ + broadcastChan: make(chan []byte, certBufferSize), + stopChan: make(chan struct{}), + name: name, + subType: subType, + }, + } + nsqc.isConnected = true + + go nsqc.broadcastHandler() + go nsqc.reconnectHandler() + + return nsqc +} + +// reconnectHandler is a background job that attempts to reconnect to the NSQ server if the connection is lost. +func (c *NSQClient) reconnectHandler() { + for { + select { + case <-c.stopChan: + log.Println("Stopping reconnectHandler for nsq producer:", c.addr) + return + default: + if c.isConnected { + // If already connected or no connection exists, skip reconnection + time.Sleep(5 * time.Second) + continue + } + // Attempt to connect to the NSQ server + err := c.conn.Ping() + if err != nil { + log.Printf("Reconnect to NSQ server failed: '%v'. Retrying in 5s...", err) + time.Sleep(5 * time.Second) + + continue + } + + c.isConnected = true + log.Println("Reconnected to NSQ server at", c.addr) + } + } +} + +// Each client has a broadcastHandler that runs in the background and sends out the broadcast messages to the client. +func (c *NSQClient) broadcastHandler() { + // writeWait := 60 * time.Second + + defer func() { + log.Println("Closing broadcast handler for nsq producer:", c.addr) + // Gracefully stop the producer when appropriate (e.g. before shutting down the service) + c.conn.Stop() + }() + + for { + select { + case <-c.stopChan: + return + case message := <-c.broadcastChan: + if !c.isConnected { + continue + } + + // Synchronously publish a single message to the specified topic. + // Messages can also be sent asynchronously and/or in batches. + err := c.conn.Publish(c.topic, message) + if err != nil { + log.Println("Error writing to NSQ topic:", err) + c.isConnected = false + } + } + } +} diff --git a/internal/web/client.go b/internal/broadcast/websocketclient.go similarity index 80% rename from internal/web/client.go rename to internal/broadcast/websocketclient.go index 464d6c0..ef083ff 100644 --- a/internal/web/client.go +++ b/internal/broadcast/websocketclient.go @@ -1,4 +1,4 @@ -package web +package broadcast import ( "log" @@ -8,34 +8,30 @@ import ( "github.com/gorilla/websocket" ) -const ( - SubTypeFull SubscriptionType = iota - SubTypeLite - SubTypeDomain -) - -type SubscriptionType int - -// client represents a single client's connection to the server. -type client struct { - conn *websocket.Conn - broadcastChan chan []byte - name string - subType SubscriptionType - skippedCerts uint64 +// WebsocketClient represents a single WebSocket client's connection to the server. +type WebsocketClient struct { + conn *websocket.Conn + *BaseClient } -func newClient(conn *websocket.Conn, subType SubscriptionType, name string, certBufferSize int) *client { - return &client{ - conn: conn, - broadcastChan: make(chan []byte, certBufferSize), - name: name, - subType: subType, +// NewWebsocketClient creates a new WebSocket client from the given connection. +func NewWebsocketClient(conn *websocket.Conn, subType SubscriptionType, name string, certBufferSize int) *WebsocketClient { + c := &WebsocketClient{ + conn: conn, + BaseClient: &BaseClient{ + broadcastChan: make(chan []byte, certBufferSize), + name: name, + subType: subType, + }, } + go c.broadcastHandler() + go c.listenWebsocket() + + return c } // Each client has a broadcastHandler that runs in the background and sends out the broadcast messages to the client. -func (c *client) broadcastHandler() { +func (c *WebsocketClient) broadcastHandler() { writeWait := 60 * time.Second pingTicker := time.NewTicker(30 * time.Second) @@ -82,10 +78,10 @@ func (c *client) broadcastHandler() { // listenWebsocket is running in the background on a goroutine and listens for messages from the client. // It responds to ping messages with a pong message. It closes the connection if the client sends // a close message or no ping is received within 65 seconds. -func (c *client) listenWebsocket() { +func (c *WebsocketClient) listenWebsocket() { defer func() { _ = c.conn.Close() - ClientHandler.unregisterClient(c) + ClientHandler.UnregisterClient(c.name) }() readWait := 65 * time.Second diff --git a/internal/certificatetransparency/ct-watcher.go b/internal/certificatetransparency/ct-watcher.go index 8895cbd..86b21fd 100644 --- a/internal/certificatetransparency/ct-watcher.go +++ b/internal/certificatetransparency/ct-watcher.go @@ -13,6 +13,7 @@ import ( "sync/atomic" "time" + "github.com/d-Rickyy-b/certstream-server-go/internal/broadcast" "github.com/d-Rickyy-b/certstream-server-go/internal/config" "github.com/d-Rickyy-b/certstream-server-go/internal/models" "github.com/d-Rickyy-b/certstream-server-go/internal/web" @@ -471,7 +472,7 @@ func certHandler(entryChan chan models.Entry) { } // Run json encoding in the background and send the result to the clients. - web.ClientHandler.Broadcast <- entry + broadcast.ClientHandler.MessageQueue <- entry // Update metrics url := entry.Data.Source.NormalizedURL diff --git a/internal/certificatetransparency/logmetrics.go b/internal/certificatetransparency/logmetrics.go index 064f263..7ecc2a7 100644 --- a/internal/certificatetransparency/logmetrics.go +++ b/internal/certificatetransparency/logmetrics.go @@ -273,10 +273,12 @@ func GetProcessedPrecerts() int64 { return processedPrecerts } +// GetCertMetrics returns a copy of the internal metrics map. func GetCertMetrics() CTMetrics { return metrics.GetCTMetrics() } +// GetLogOperators returns a map of operator names to a list of CT logs. func GetLogOperators() map[string][]string { return metrics.OperatorLogMapping() } diff --git a/internal/certstream/certstream.go b/internal/certstream/certstream.go index 6ade396..6e19da5 100644 --- a/internal/certstream/certstream.go +++ b/internal/certstream/certstream.go @@ -6,10 +6,13 @@ package certstream import ( "log" + "net" "os" "os/signal" + "strconv" "syscall" + "github.com/d-Rickyy-b/certstream-server-go/internal/broadcast" "github.com/d-Rickyy-b/certstream-server-go/internal/certificatetransparency" "github.com/d-Rickyy-b/certstream-server-go/internal/config" "github.com/d-Rickyy-b/certstream-server-go/internal/metrics" @@ -35,6 +38,11 @@ func NewCertstreamServer(config config.Config) (*Certstream, error) { cs := Certstream{} cs.config = config + // Start the broadcast dispatcher + broadcast.NewDispatcher() + broadcast.ClientHandler.Start() + + // TODO: add support do disable websocket Server // Initialize the webserver used for the websocket server webserver := web.NewWebsocketServer(config.Webserver.ListenAddr, config.Webserver.ListenPort, config.Webserver.CertPath, config.Webserver.CertKeyPath) cs.webserver = webserver @@ -42,6 +50,41 @@ func NewCertstreamServer(config config.Config) (*Certstream, error) { // Setup metrics server cs.setupMetrics(webserver) + // Initialize the stream processors if configured and enabled. + for _, streamProcessor := range config.StreamProcessing { + if !streamProcessor.Enabled { + continue + } + + addr := net.JoinHostPort(streamProcessor.ServerAddr, strconv.Itoa(streamProcessor.ServerPort)) + log.Printf("Initializing stream processor: %s at %s\n", streamProcessor.Name, addr) + + switch streamProcessor.Type { + case "nsq": + log.Println("Initializing NSQ client...") + nc := broadcast.NewNSQClient( + broadcast.SubTypeFull, + addr, + streamProcessor.Name, + streamProcessor.Topic, + config.General.BufferSizes.Websocket, + ) + broadcast.ClientHandler.RegisterClient(nc) + case "kafka": + log.Println("Initializing Kafka client...") + kc := broadcast.NewKafkaClient( + broadcast.SubTypeFull, + addr, + streamProcessor.Name, + streamProcessor.Topic, + config.General.BufferSizes.Websocket, + ) + broadcast.ClientHandler.RegisterClient(kc) + default: + log.Printf("Unknown stream processor type '%s' for %s. Skipping...\n", streamProcessor.Type, streamProcessor.Name) + } + } + return &cs, nil } @@ -65,7 +108,11 @@ func (cs *Certstream) setupMetrics(webserver *web.WebServer) { webserver.RegisterPrometheus(cs.config.Prometheus.MetricsURL, metrics.WritePrometheus) } else { log.Println("Starting prometheus server on new interface") - cs.metricsServer = web.NewMetricsServer(cs.config.Prometheus.ListenAddr, cs.config.Prometheus.ListenPort, cs.config.Prometheus.CertPath, cs.config.Prometheus.CertKeyPath) + cs.metricsServer = web.NewMetricsServer( + cs.config.Prometheus.ListenAddr, + cs.config.Prometheus.ListenPort, + cs.config.Prometheus.CertPath, + cs.config.Prometheus.CertKeyPath) cs.metricsServer.RegisterPrometheus(cs.config.Prometheus.MetricsURL, metrics.WritePrometheus) } } diff --git a/internal/config/config.go b/internal/config/config.go index 24dd3fb..5d91b70 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -32,8 +32,10 @@ type LogConfig struct { } type BufferSizes struct { - Websocket int `yaml:"websocket"` - CTLog int `yaml:"ctlog"` + Websocket int `yaml:"websocket"` + CTLog int `yaml:"ctlog"` + Dispatcher int `yaml:"dispatcher"` + // Deprecated: BroadcastManager was renamed to Dispatcher. BroadcastManager int `yaml:"broadcastmanager"` } @@ -51,13 +53,23 @@ type Config struct { MetricsURL string `yaml:"metrics_url"` ExposeSystemMetrics bool `yaml:"expose_system_metrics"` } + StreamProcessing []struct { + Name string `yaml:"name"` + Type string `yaml:"type"` + Enabled bool `yaml:"enabled"` + ServerAddr string `yaml:"server_addr"` + ServerPort int `yaml:"server_port"` + Topic string `yaml:"topic"` + } `yaml:"stream_processing"` General struct { // DisableDefaultLogs indicates whether the default logs used in Google Chrome and provided by Google should be disabled. DisableDefaultLogs bool `yaml:"disable_default_logs"` // AdditionalLogs contains additional logs provided by the user that can be used in addition to the default logs. AdditionalLogs []LogConfig `yaml:"additional_logs"` - BufferSizes BufferSizes `yaml:"buffer_sizes"` - DropOldLogs *bool `yaml:"drop_old_logs"` + // BufferSizes contains the buffer sizes for the different components of the server. They usually don't need any adjustments. + BufferSizes BufferSizes `yaml:"buffer_sizes"` + // DropOldLogs indicates whether downloading CT-Logs should start at the latest index (true) or should from the beginning (false). + DropOldLogs *bool `yaml:"drop_old_logs"` Recovery struct { Enabled bool `yaml:"enabled"` CTIndexFile string `yaml:"ct_index_file"` @@ -235,8 +247,13 @@ func validateConfig(config *Config) bool { config.General.BufferSizes.CTLog = 1000 } - if config.General.BufferSizes.BroadcastManager <= 0 { - config.General.BufferSizes.BroadcastManager = 10000 + // For backward compatibility, copy value from deprecated BroadcastManager field + if config.General.BufferSizes.BroadcastManager != 0 { + config.General.BufferSizes.Dispatcher = config.General.BufferSizes.BroadcastManager + } + + if config.General.BufferSizes.Dispatcher <= 0 { + config.General.BufferSizes.Dispatcher = 10000 } // If the cleanup flag is not set, default to true diff --git a/internal/metrics/prometheus.go b/internal/metrics/prometheus.go index aad8d2e..ac25f6d 100644 --- a/internal/metrics/prometheus.go +++ b/internal/metrics/prometheus.go @@ -7,8 +7,8 @@ import ( "sync" "time" + "github.com/d-Rickyy-b/certstream-server-go/internal/broadcast" "github.com/d-Rickyy-b/certstream-server-go/internal/certificatetransparency" - "github.com/d-Rickyy-b/certstream-server-go/internal/web" "github.com/VictoriaMetrics/metrics" ) @@ -22,13 +22,13 @@ var ( // Number of currently connected clients. fullClientCount = metrics.NewGauge("certstreamservergo_clients_total{type=\"full\"}", func() float64 { - return float64(web.ClientHandler.ClientFullCount()) + return float64(broadcast.ClientHandler.ClientFullCount()) }) liteClientCount = metrics.NewGauge("certstreamservergo_clients_total{type=\"lite\"}", func() float64 { - return float64(web.ClientHandler.ClientLiteCount()) + return float64(broadcast.ClientHandler.ClientLiteCount()) }) domainClientCount = metrics.NewGauge("certstreamservergo_clients_total{type=\"domain\"}", func() float64 { - return float64(web.ClientHandler.ClientDomainsCount()) + return float64(broadcast.ClientHandler.ClientDomainsCount()) }) // Number of certificates processed by the CT watcher. @@ -90,7 +90,7 @@ func getCertCountForLog(operatorName, logname string) int64 { // getSkippedCertMetrics gets the number of skipped certificates for each client and creates metrics for it. // It also removes metrics for clients that are not connected anymore. func getSkippedCertMetrics() { - skippedCerts := web.ClientHandler.GetSkippedCerts() + skippedCerts := broadcast.ClientHandler.GetSkippedCerts() for clientName := range skippedCerts { // Get or register a new counter for each client metricName := fmt.Sprintf("certstreamservergo_skipped_certs{client=\"%s\"}", clientName) diff --git a/internal/web/examplecert.go b/internal/web/examplecert.go index e841323..05576f3 100644 --- a/internal/web/examplecert.go +++ b/internal/web/examplecert.go @@ -29,7 +29,7 @@ func exampleDomains(w http.ResponseWriter, _ *http.Request) { w.Write(exampleCert.JSONDomains()) //nolint:errcheck } -// SetExampleCert sets one certificate as the example Cert that is returned by the example endpoints. +// SetExampleCert sets the example certificate to be used in the example endpoints. func SetExampleCert(cert models.Entry) { exampleCert = cert } diff --git a/internal/web/server.go b/internal/web/server.go index 50a55b6..079f6b8 100644 --- a/internal/web/server.go +++ b/internal/web/server.go @@ -11,19 +11,15 @@ import ( "strconv" "time" - "github.com/go-chi/chi/v5" - "github.com/go-chi/chi/v5/middleware" - + "github.com/d-Rickyy-b/certstream-server-go/internal/broadcast" "github.com/d-Rickyy-b/certstream-server-go/internal/config" - "github.com/d-Rickyy-b/certstream-server-go/internal/models" + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" "github.com/gorilla/websocket" ) -var ( - ClientHandler = BroadcastManager{} - upgrader websocket.Upgrader -) +var upgrader websocket.Upgrader // WebServer is a struct that holds the necessary information to run a webserver. // It is used for the websocket server as well as the metrics server. @@ -54,15 +50,14 @@ func IPWhitelist(whitelist []string) func(next http.Handler) http.Handler { var cidrList []net.IPNet for _, element := range whitelist { - _, ipNet, err := net.ParseCIDR(element) + ip, ipNet, err := net.ParseCIDR(element) if err != nil { - var ip net.IP + // If there is an error parsing the CIDR, it might be an IP address if ip = net.ParseIP(element); ip == nil { log.Println("Invalid IP in metrics whitelist: ", element) continue } - ipList = append(ipList, ip) continue @@ -119,7 +114,7 @@ func initFullWebsocket(w http.ResponseWriter, r *http.Request) { return } - setupClient(connection, SubTypeFull, r.RemoteAddr) + setupClient(connection, broadcast.SubTypeFull, r.RemoteAddr) } // initLiteWebsocket is called when a client connects to the / endpoint. @@ -131,7 +126,7 @@ func initLiteWebsocket(w http.ResponseWriter, r *http.Request) { return } - setupClient(connection, SubTypeLite, r.RemoteAddr) + setupClient(connection, broadcast.SubTypeLite, r.RemoteAddr) } // initDomainWebsocket is called when a client connects to the /domains-only endpoint. @@ -143,7 +138,7 @@ func initDomainWebsocket(w http.ResponseWriter, r *http.Request) { return } - setupClient(connection, SubTypeDomain, r.RemoteAddr) + setupClient(connection, broadcast.SubTypeDomain, r.RemoteAddr) } // upgradeConnection upgrades the connection to a websocket and returns the connection. @@ -174,12 +169,9 @@ func upgradeConnection(w http.ResponseWriter, r *http.Request) (*websocket.Conn, } // setupClient initializes a client struct and starts the broadcastHandler and websocket listener. -func setupClient(connection *websocket.Conn, subscriptionType SubscriptionType, name string) { - c := newClient(connection, subscriptionType, name, config.AppConfig.General.BufferSizes.Websocket) - go c.broadcastHandler() - go c.listenWebsocket() - - ClientHandler.registerClient(c) +func setupClient(connection *websocket.Conn, subscriptionType broadcast.SubscriptionType, name string) { + c := broadcast.NewWebsocketClient(connection, subscriptionType, name, config.AppConfig.General.BufferSizes.Websocket) + broadcast.ClientHandler.RegisterClient(c) } // setupWebsocketRoutes configures all the routes necessary for the websocket webserver. @@ -255,8 +247,7 @@ func NewMetricsServer(networkIf string, port int, certPath, keyPath string) *Web } // NewWebsocketServer starts a new webserver and initialized it with the necessary routes. -// It also starts the broadcaster in ClientHandler as a background job and takes care of -// setting up websocket.Upgrader. +// It also takes care of setting up websocket.Upgrader. func NewWebsocketServer(networkIf string, port int, certPath, keyPath string) *WebServer { server := &WebServer{ networkIf: networkIf, @@ -286,9 +277,6 @@ func NewWebsocketServer(networkIf string, port int, certPath, keyPath string) *W setupWebsocketRoutes(server.routes) server.initServer() - ClientHandler.Broadcast = make(chan models.Entry, config.AppConfig.General.BufferSizes.BroadcastManager) - go ClientHandler.broadcaster() - return server }