Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ocp/rpc/currency/live_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (s *currencyServer) StreamLiveMintData(

// Register stream with state worker
stream := s.liveMintStateWorker.registerStream(streamID, requestedMints)
if stream == nil {
return status.Error(codes.Unavailable, "server is shutting down")
}
defer s.liveMintStateWorker.unregisterStream(streamID)

log.Debug("stream registered")
Expand Down
6 changes: 4 additions & 2 deletions ocp/rpc/currency/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ func NewCurrencyServer(
antispamGuard *antispam.Guard,
s3Client *s3.Client,
configProvider ConfigProvider,
) currencypb.CurrencyServer {
) (currencypb.CurrencyServer, func()) {
conf := configProvider()

liveMintStateWorker := newLiveMintStateWorker(log, data, conf)
liveMintStateWorker.start(context.Background())

return &currencyServer{
s := &currencyServer{
log: log,

conf: conf,
Expand All @@ -78,4 +78,6 @@ func NewCurrencyServer(

liveMintStateWorker: liveMintStateWorker,
}

return s, s.liveMintStateWorker.stop
}
19 changes: 14 additions & 5 deletions ocp/rpc/currency/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type liveMintStateWorker struct {

streamsMu sync.RWMutex
streams map[string]*liveMintDataStream
stopped bool

exchangeRatesReady chan struct{}
exchangeRatesReadyOnce sync.Once
Expand Down Expand Up @@ -89,13 +90,16 @@ func (m *liveMintStateWorker) start(ctx context.Context) error {
return nil
}

// stop cancels the polling goroutines and closes all streams
// stop cancels the polling goroutines and closes all streams.
// After stop is called, no new streams can be registered.
func (m *liveMintStateWorker) stop() {
m.cancel()

m.streamsMu.Lock()
defer m.streamsMu.Unlock()

m.stopped = true

for _, stream := range m.streams {
stream.close()
}
Expand Down Expand Up @@ -152,13 +156,18 @@ func (m *liveMintStateWorker) trackMints(ctx context.Context, mints []*common.Ac
return nil
}

// registerStream creates and registers a new stream for the given mints
// registerStream creates and registers a new stream for the given mints.
// Returns nil if the worker has been stopped.
func (m *liveMintStateWorker) registerStream(id string, mints []*common.Account) *liveMintDataStream {
stream := newLiveMintDataStream(id, mints, streamBufferSize)

m.streamsMu.Lock()
defer m.streamsMu.Unlock()

if m.stopped {
return nil
}

stream := newLiveMintDataStream(id, mints, streamBufferSize)
m.streams[id] = stream
m.streamsMu.Unlock()

return stream
}
Expand Down
Loading