From 9a0c1f6b8f36f6fbe6b5637913fc721c5d0b3c4f Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Tue, 3 Mar 2026 14:54:16 -0500 Subject: [PATCH] StreamLiveMintData now handles app shutdown more gracefully --- ocp/rpc/currency/live_data.go | 3 +++ ocp/rpc/currency/server.go | 6 ++++-- ocp/rpc/currency/worker.go | 19 ++++++++++++++----- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/ocp/rpc/currency/live_data.go b/ocp/rpc/currency/live_data.go index 377f0e8..b1e8ca3 100644 --- a/ocp/rpc/currency/live_data.go +++ b/ocp/rpc/currency/live_data.go @@ -67,6 +67,9 @@ func (s *currencyServer) StreamLiveMintData( // Register stream with state worker stream := s.liveMintStateWorker.registerStream(streamID, requestedMints) + if stream == nil { + return status.Error(codes.Unavailable, "server is shutting down") + } defer s.liveMintStateWorker.unregisterStream(streamID) log.Debug("stream registered") diff --git a/ocp/rpc/currency/server.go b/ocp/rpc/currency/server.go index 8d3cf70..3f71eee 100644 --- a/ocp/rpc/currency/server.go +++ b/ocp/rpc/currency/server.go @@ -52,13 +52,13 @@ func NewCurrencyServer( antispamGuard *antispam.Guard, s3Client *s3.Client, configProvider ConfigProvider, -) currencypb.CurrencyServer { +) (currencypb.CurrencyServer, func()) { conf := configProvider() liveMintStateWorker := newLiveMintStateWorker(log, data, conf) liveMintStateWorker.start(context.Background()) - return ¤cyServer{ + s := ¤cyServer{ log: log, conf: conf, @@ -78,4 +78,6 @@ func NewCurrencyServer( liveMintStateWorker: liveMintStateWorker, } + + return s, s.liveMintStateWorker.stop } diff --git a/ocp/rpc/currency/worker.go b/ocp/rpc/currency/worker.go index 44ac25e..59b0ffa 100644 --- a/ocp/rpc/currency/worker.go +++ b/ocp/rpc/currency/worker.go @@ -55,6 +55,7 @@ type liveMintStateWorker struct { streamsMu sync.RWMutex streams map[string]*liveMintDataStream + stopped bool exchangeRatesReady chan struct{} exchangeRatesReadyOnce sync.Once @@ -89,13 +90,16 @@ func (m *liveMintStateWorker) start(ctx context.Context) error { return nil } -// stop cancels the polling goroutines and closes all streams +// stop cancels the polling goroutines and closes all streams. +// After stop is called, no new streams can be registered. func (m *liveMintStateWorker) stop() { m.cancel() m.streamsMu.Lock() defer m.streamsMu.Unlock() + m.stopped = true + for _, stream := range m.streams { stream.close() } @@ -152,13 +156,18 @@ func (m *liveMintStateWorker) trackMints(ctx context.Context, mints []*common.Ac return nil } -// registerStream creates and registers a new stream for the given mints +// registerStream creates and registers a new stream for the given mints. +// Returns nil if the worker has been stopped. func (m *liveMintStateWorker) registerStream(id string, mints []*common.Account) *liveMintDataStream { - stream := newLiveMintDataStream(id, mints, streamBufferSize) - m.streamsMu.Lock() + defer m.streamsMu.Unlock() + + if m.stopped { + return nil + } + + stream := newLiveMintDataStream(id, mints, streamBufferSize) m.streams[id] = stream - m.streamsMu.Unlock() return stream }