Refactor indexer client handling of subscriptions #1012
Walkthrough
Consolidates subscription lifecycle into a single NewSubscription call returning subscription ID, event channel, and cleanup; adds UpdateSubscription; refactors scripts cache to track scripts per subscription ID with replacement resolution; updates reconnect callbacks and stream state to return and carry subscription IDs via metadata.
Sequence Diagram(s)
sequenceDiagram
participant Client
participant IndexerService
participant ScriptsCache
Client->>IndexerService: NewSubscription(ctx, scripts)
IndexerService-->>Client: subscriptionId, eventsCh, closeFn
Client->>ScriptsCache: add(subscriptionId, scripts)
alt connection lost
Client->>IndexerService: Reconnect()
IndexerService-->>Client: newSubscriptionId, newStream
Client->>ScriptsCache: replace(oldId, newSubscriptionId)
Client->>Client: emit RECONNECTED state with Metadata{"id": newSubscriptionId}
end
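The `replace(oldId, newSubscriptionId)` step in the diagram can be sketched as follows. This is a minimal illustration, not the PR's code: the type `scriptsCache` and the fields `scriptsBySubId` and `replacements` are named after identifiers mentioned in the reviews, but the method bodies, `newScriptsCache`, and `resolve` are assumptions made for this example.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// scriptsCache is a hypothetical stand-in for the client's cache: it maps a
// subscription ID to its set of scripts and records ID replacements.
type scriptsCache struct {
	lock           sync.Mutex
	scriptsBySubId map[string]map[string]struct{}
	replacements   map[string]string // old ID -> ID that replaced it
}

func newScriptsCache() *scriptsCache {
	return &scriptsCache{
		scriptsBySubId: make(map[string]map[string]struct{}),
		replacements:   make(map[string]string),
	}
}

// resolve follows the replacement chain to the live ID. Caller holds c.lock.
func (c *scriptsCache) resolve(id string) string {
	for {
		next, ok := c.replacements[id]
		if !ok {
			return id
		}
		id = next
	}
}

func (c *scriptsCache) add(id string, scripts []string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	id = c.resolve(id)
	if c.scriptsBySubId[id] == nil {
		c.scriptsBySubId[id] = make(map[string]struct{})
	}
	for _, s := range scripts {
		c.scriptsBySubId[id][s] = struct{}{}
	}
}

// get returns the scripts of the live subscription, sorted for stable output.
func (c *scriptsCache) get(id string) []string {
	c.lock.Lock()
	defer c.lock.Unlock()
	set := c.scriptsBySubId[c.resolve(id)]
	out := make([]string, 0, len(set))
	for s := range set {
		out = append(out, s)
	}
	sort.Strings(out)
	return out
}

// replace moves the scripts to newId and links the old live ID to it.
// The guards keep the chain linear, so resolve always terminates.
func (c *scriptsCache) replace(oldId, newId string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	current := c.resolve(oldId)
	scripts, ok := c.scriptsBySubId[current]
	if !ok || current == newId {
		return
	}
	if _, taken := c.replacements[newId]; taken {
		return
	}
	if _, taken := c.scriptsBySubId[newId]; taken {
		return
	}
	c.scriptsBySubId[newId] = scripts
	delete(c.scriptsBySubId, current)
	c.replacements[current] = newId
}

func main() {
	c := newScriptsCache()
	c.add("sub-1", []string{"script-a", "script-b"})
	c.replace("sub-1", "sub-2") // first reconnect
	c.replace("sub-1", "sub-3") // second reconnect, still keyed by the original ID
	fmt.Println(c.get("sub-1")) // [script-a script-b]
}
```

The caller keeps using `sub-1` throughout; only the cache knows that the server-side ID is now `sub-3`.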
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 1
🧹 Nitpick comments (3)
pkg/client-lib/indexer/grpc/cache.go (1)
62-71: Consider cleaning up replacement chain entries when removing a subscription.

When `removeSubscription` is called, the `replacements` map entries that point to or from the removed subscription ID are not cleaned up. Over time, stale entries could accumulate in the `replacements` map.

For a long-running client with many reconnections, this could cause unbounded growth of the `replacements` map, though the impact is likely minimal since these are just string-to-string mappings.

♻️ Optional cleanup of replacement chain

```diff
 func (s *scriptsCache) removeSubscription(id string) {
 	s.lock.Lock()
 	defer s.lock.Unlock()

 	subId := s.resolveId(id)
 	if _, ok := s.scriptsBySubId[subId]; !ok {
 		return
 	}
 	delete(s.scriptsBySubId, subId)
+	// Clean up replacement chain entries pointing to this subscription
+	for k, v := range s.replacements {
+		if v == subId {
+			delete(s.replacements, k)
+		}
+	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/client-lib/indexer/grpc/cache.go` around lines 62 - 71, removeSubscription in scriptsCache currently deletes scriptsBySubId but leaves stale entries in the replacements map; update removeSubscription (method on type scriptsCache) to additionally clean replacements entries that point to or from the resolved subId: after computing subId via resolveId(id) and deleting from scriptsBySubId, iterate the s.replacements map under the same lock and remove any map entries where the key == subId or the value == subId (to prevent both outgoing and incoming stale links); ensure you operate while s.lock is held and reference the replacements and scriptsBySubId fields by name.
pkg/client-lib/indexer/grpc/client.go (2)
548-548: Remove or complete the commented-out Metadata field.

This commented-out line appears to be leftover from development. Either remove it or add a `// TODO: Enable in #951` comment to clarify the intent if it's intentionally deferred.

Suggested fix

```diff
-				// Metadata: event.Metadata,
+				// TODO(#951): Enable Metadata field once stream_retry propagates it.
+				// Metadata: event.Metadata,
```

Or simply remove if not needed:

```diff
-				// Metadata: event.Metadata,
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/client-lib/indexer/grpc/client.go` at line 548, The commented-out "Metadata: event.Metadata," line in pkg/client-lib/indexer/grpc/client.go is leftover and should be clarified: either remove the commented line entirely or replace it with a clear placeholder comment (e.g. "// TODO: Enable in `#951`") to indicate intentional deferral; locate the commented line referencing event.Metadata within the gRPC request/struct construction (around the function handling event serialization/transport) and apply one of the two fixes so the intent is explicit.
468-487: Minor race condition with `cancelFn` during reconnect.

If `cancelFn()` is invoked while `Reconnect` is executing (between `a.scripts.get()` on line 471 and `a.scripts.replace()` on line 486), the `removeSubscription()` call could clear the cache, causing `scripts` to be nil. This would result in subscribing with an empty scripts list.

In practice, `closeFn()` should stop the retry loop before `removeSubscription()` runs, making this window very narrow. Given the PR notes this is temporary pending #951, this is acceptable, but consider adding a nil-check guard if reconnect stability becomes an issue.

Optional defensive check

```diff
 Reconnect: func(
 	ctx context.Context,
 ) (string, arkv1.IndexerService_GetSubscriptionClient, error) {
 	scripts := a.scripts.get(subscriptionId)
+	if len(scripts) == 0 {
+		return "", nil, fmt.Errorf("subscription %s no longer exists", subscriptionId)
+	}
 	resp, err := a.svc().SubscribeForScripts(ctx, &arkv1.SubscribeForScriptsRequest{
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/client-lib/indexer/grpc/client.go` around lines 468 - 487, The Reconnect implementation can race with cancelFn/removeSubscription causing scripts (from a.scripts.get(subscriptionId)) to be nil and leading to an empty SubscribeForScripts call; to fix, add a nil-check and early return before using scripts and before calling a.scripts.replace: in the Reconnect closure (function Reconnect) after fetching scripts via a.scripts.get(subscriptionId) validate that scripts is not nil (or length > 0) and if it is, return an appropriate error so we avoid subscribing with an empty list, and likewise ensure a.scripts.replace(subscriptionId, newSubscriptionId) is only called if scripts was valid; keep references to a.scripts.get, a.svc().SubscribeForScripts, a.svc().GetSubscription, and a.scripts.replace so the changes are easy to locate.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/client-lib/indexer/grpc/cache_test.go`:
- Around line 168-176: The test expectation is wrong because removeScripts
leaves an empty map entry so get (which uses slices.Collect(maps.Keys(scripts)))
returns an empty []string{}, not nil; fix by either updating the test in
cache_test.go for the "remove all scripts" case to expect an empty slice (e.g.,
expected: []string{}) or modify the scriptsCache.removeScripts implementation to
delete the scriptsBySubId entry when its map becomes empty so get returns nil
for nonexistent subs; reference the methods removeScripts, get and the
scriptsBySubId map when making the change.
📒 Files selected for processing (10)
- pkg/client-lib/client/grpc/client.go
- pkg/client-lib/funding.go
- pkg/client-lib/indexer/grpc/cache.go
- pkg/client-lib/indexer/grpc/cache_test.go
- pkg/client-lib/indexer/grpc/client.go
- pkg/client-lib/indexer/grpc/reconnect_get_subscription_stream_test.go
- pkg/client-lib/indexer/service.go
- pkg/client-lib/internal/utils/stream_retry.go
- pkg/client-lib/internal/utils/stream_retry_test.go
- pkg/client-lib/types/types.go
	At: event.At,
	DisconnectedAt: event.DisconnectedAt,
	Err: event.Err,
	// Metadata: event.Metadata,
Arkana Code Review — arkd#1012
Overall: solid refactor that simplifies the subscription lifecycle API. The consolidation of SubscribeForScripts + GetSubscription into NewSubscription is a good ergonomic improvement and the per-subscription script tracking in the cache is the correct fix for the old "all scripts merged into one bag" problem. A few issues to address:
🔴 Memory leak in replacement chain (cache.go)
removeSubscription deletes from scriptsBySubId but never cleans up replacements. Each reconnection adds an entry to replacements via replace(). Over the lifetime of a long-running client with periodic disconnects, this map grows without bound.
cache.go:81-89 (removeSubscription):
```go
func (s *scriptsCache) removeSubscription(id string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	subId := s.resolveId(id)
	// ... deletes scriptsBySubId[subId] but replacements chain is leaked
}
```
Fix: walk the replacement chain from id to subId and delete each intermediate entry from s.replacements, then delete the final resolved key.
🟡 Discarded Reconnect return value in stream_retry.go
Reconnect now returns (string, S, error) but stream_retry.go:326 discards the string:
```go
_, newStream, dialErr := cfg.Reconnect(ctx)
```
The StreamConnectionEvent struct (types.go:66) has a new TODO comment saying "Temporary field to be able to pass the new subscription id back to clients after reconnection", but no field was actually added. The comment is orphaned and misleading. Either:
- Add the field and propagate the new sub ID through ReconnectingStreamStateEvent → ConnectionEvent, or
- Remove the misleading comment since the cache handles it internally now.

Given the PR description says users don't need the new sub ID, option 2 seems right.
🟡 Cross-repo breakage — fulmine
This PR removes SubscribeForScripts, UnsubscribeForScripts, and GetSubscription from the Indexer interface. fulmine directly calls all three:
- fulmine/internal/core/application/subscription.go:67 (SubscribeForScripts)
- fulmine/internal/core/application/subscription.go:111 (UnsubscribeForScripts)
- fulmine/internal/core/application/subscription.go:200 (GetSubscription)
- fulmine/internal/core/application/subscription.go:276 (SubscribeForScripts)
fulmine will fail to compile once this merges. The PR description says "breaking changes" which is acknowledged, but please confirm there's a coordinated fulmine PR planned (or open it before merging this).
🟡 Concurrent test coverage gap (cache_test.go)
The concurrent access test (line ~420) only exercises add/get/removeScripts. It does not test replace or removeSubscription under concurrent access, which are the new complex code paths with replacement chain walking. Add goroutines that call replace concurrently with add/get to validate there are no data races.
🟢 Minor: empty scripts on reconnect
In client.go NewSubscription reconnect closure: if a.scripts.get(subscriptionId) returns nil (all scripts were removed), the reconnect sends an empty Scripts list to SubscribeForScripts. Depending on server behavior this could be a no-op or an error. Worth adding a guard or documenting the expected server behavior.
✅ What looks good
- Cache `resolveId` cycle prevention is correct: the three guards in `replace` (self-ref, existing chain key, existing scripts) prevent all cycle scenarios.
- `cancelFn` in `NewSubscription` correctly composes `closeFn()` + `removeSubscription()` for a clean teardown.
- The `funding.go` simplification from 10 lines to 3 is a clear win.
- Test coverage for the cache is thorough with good edge cases (multi-hop chains, no-ops, self-replacement).
Verdict: Request changes for the replacement chain memory leak and the orphaned comment. The cross-repo impact on fulmine needs a plan before merge. The rest is solid work.
🤖 Reviewed by Arkana
Arkana Follow-up Review — commit e858f45
The new commit addresses the 🔴 replacement chain memory leak I flagged in the previous review. The fix is correct:
- `removeSubscription` now walks the chain bidirectionally, downstream (id → ... → resolved) and upstream via `findAncestor`, deleting every `replacements` entry.
- `findAncestor` is O(n) over the map but acceptable since the map stays small and this only runs on teardown.
- New test cases ("remove from middle of chain", "remove from tail of chain") cover the critical positions, and `require.Empty(t, c.replacements)` is asserted on all `removeSubscription` tests. Good.
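A bidirectional cleanup of this kind might look like the sketch below. The review only states that the chain is walked downstream and upstream via `findAncestor`; the type `chainCache`, the lock-free layout, and the method bodies here are assumptions for illustration, and the sketch relies on the chain being linear (each ID has at most one ancestor).

```go
package main

import "fmt"

// chainCache is a hypothetical, lock-free reduction of the scripts cache,
// kept only to illustrate replacement-chain cleanup.
type chainCache struct {
	scriptsBySubId map[string][]string
	replacements   map[string]string // old ID -> newer ID
}

// findAncestor returns the ID that was replaced by id, if any.
// It scans the whole map, so it is O(n) in the number of replacements.
func (c *chainCache) findAncestor(id string) (string, bool) {
	for old, next := range c.replacements {
		if next == id {
			return old, true
		}
	}
	return "", false
}

// removeSubscription drops the scripts and every chain entry linked to id,
// whichever position of the chain id sits at.
func (c *chainCache) removeSubscription(id string) {
	// Upstream: delete the links that lead to id.
	cur := id
	for {
		prev, ok := c.findAncestor(cur)
		if !ok {
			break
		}
		delete(c.replacements, prev)
		cur = prev
	}
	// Downstream: delete the links from id to the live ID.
	cur = id
	for {
		next, ok := c.replacements[cur]
		if !ok {
			break
		}
		delete(c.replacements, cur)
		cur = next
	}
	// cur is now the live ID that owns the scripts.
	delete(c.scriptsBySubId, cur)
}

func main() {
	c := &chainCache{
		scriptsBySubId: map[string][]string{"c": {"script-a"}},
		replacements:   map[string]string{"a": "b", "b": "c"},
	}
	c.removeSubscription("b") // remove from the middle of the chain
	fmt.Println(len(c.replacements), len(c.scriptsBySubId)) // 0 0
}
```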
Remaining items from prior review (unchanged, lower severity)
- Orphaned comment (`types.go:66-67`): The `StreamConnectionEvent` struct has a TODO comment about passing the new subscription id but no field was added. Still misleading; it should be removed or the field added.
- Cross-repo breakage: fulmine still needs a coordinated PR before this merges. Already acknowledged in PR description as breaking.
- Concurrent test gap (`cache_test.go:~420`): The concurrent access test still only covers `add`/`get`/`removeScripts`. Adding `replace` + `removeSubscription` under goroutine contention would strengthen confidence, but not blocking.
None of these are blockers. The memory leak was the critical issue and it's properly fixed. Approving.
🤖 Reviewed by Arkana
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/client-lib/indexer/grpc/client.go`:
- Around line 575-578: The current existence check uses
a.scripts.get(subscriptionId) and treats an empty script list as "not found",
which breaks re-population because removeScripts leaves an empty bucket; replace
that logic to call the scripts cache existence method (e.g.
scripts.exists(subscriptionId) on the scriptsCache) instead of inspecting the
returned slice length, so the code in the function that references
a.scripts.get(subscriptionId) (and any related checks in
UpdateSubscription/removeScripts paths) uses scripts.exists to decide presence
and only returns the "subscription not found" error when exists returns false.
- Around line 634-669: Both subscribeForScripts and unsubscribeForScripts send
the caller-provided subscriptionId to the server, but after reconnect the server
expects the latest replacement ID; resolve the live ID from the local cache
before making the RPC. In subscribeForScripts and unsubscribeForScripts call
a.scripts.currentID(subscriptionId) (or equivalent) and use that resolved ID to
set req.SubscriptionId prior to invoking a.svc().SubscribeForScripts /
UnsubscribeForScripts; keep the local cache update calls (a.scripts.add /
a.scripts.removeScripts) unchanged.
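The first comment turns on the difference between "no scripts left" and "no such subscription". A minimal sketch of that distinction, assuming a reduced cache type (`subCache`, `count`, and the method bodies are hypothetical; only `exists`, `removeScripts`, and `scriptsBySubId` are names taken from the review):

```go
package main

import "fmt"

// subCache is a hypothetical reduction of the scripts cache.
type subCache struct {
	scriptsBySubId map[string]map[string]struct{}
}

func (c *subCache) add(id string, scripts ...string) {
	if c.scriptsBySubId[id] == nil {
		c.scriptsBySubId[id] = make(map[string]struct{})
	}
	for _, s := range scripts {
		c.scriptsBySubId[id][s] = struct{}{}
	}
}

// removeScripts deletes scripts but keeps the (possibly empty) bucket,
// so the subscription itself is still known to the cache.
func (c *subCache) removeScripts(id string, scripts ...string) {
	for _, s := range scripts {
		delete(c.scriptsBySubId[id], s)
	}
}

// count reports how many scripts a subscription holds; it cannot tell an
// emptied subscription apart from an unknown one.
func (c *subCache) count(id string) int { return len(c.scriptsBySubId[id]) }

// exists reports whether the subscription is tracked at all.
func (c *subCache) exists(id string) bool {
	_, ok := c.scriptsBySubId[id]
	return ok
}

func main() {
	c := &subCache{scriptsBySubId: map[string]map[string]struct{}{}}
	c.add("sub-1", "script-a")
	c.removeScripts("sub-1", "script-a")
	fmt.Println(c.count("sub-1"), c.exists("sub-1"))     // 0 true
	fmt.Println(c.count("unknown"), c.exists("unknown")) // 0 false
}
```

A length check reports both cases as zero, which is why an emptied subscription could not be re-populated before the change suggested above.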
📒 Files selected for processing (5)
- pkg/client-lib/indexer/grpc/cache.go
- pkg/client-lib/indexer/grpc/cache_test.go
- pkg/client-lib/indexer/grpc/client.go
- pkg/client-lib/internal/utils/stream_retry.go
- pkg/client-lib/types/types.go
✅ Files skipped from review due to trivial changes (1)
- pkg/client-lib/indexer/grpc/cache_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/client-lib/internal/utils/stream_retry.go
Arkana Follow-up Review — commits 87a7606, 47fca4e
All items from my two prior reviews are now resolved:
- ✅ Replacement chain memory leak: fixed in prior commit, unchanged
- ✅ Orphaned TODO comment in `types.go`: removed
- ✅ Concurrent test gap: new `concurrent access with replace and removeSubscription` test covers both distinct-ID and same-ID contention. The invariant assertions (no self-replacements, no dangling tails) are the right way to validate non-deterministic concurrent outcomes.
- ✅ `exists()` for subscription lookup: `UpdateSubscription` now uses `exists()` instead of `len(get()) <= 0`, correctly handling subscriptions with zero scripts remaining
- ✅ Resolve subscription ID before RPC: `subscribeForScripts`/`unsubscribeForScripts` now call `resolveId()` before sending to the server, preventing stale-ID failures after reconnection
New code review
resolveId/resolveIdLocked split (cache.go:148-167) — Clean. All internal callers that already hold s.lock use resolveIdLocked; the public resolveId acquires the lock. The findAncestor doc now correctly notes "Caller must hold s.lock."
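The locked/unlocked split exists because Go's `sync.Mutex` is not reentrant: a method that already holds the lock cannot call another method that takes it again. A minimal sketch of the pattern, assuming a reduced type (`idResolver` and the bodies are hypothetical; `resolveId` and `resolveIdLocked` are the names from the review):

```go
package main

import (
	"fmt"
	"sync"
)

// idResolver is a hypothetical reduction of the cache to its ID chain.
type idResolver struct {
	lock         sync.Mutex
	replacements map[string]string // old ID -> newer ID
}

// resolveId is the public entry point: it acquires the lock itself.
func (r *idResolver) resolveId(id string) string {
	r.lock.Lock()
	defer r.lock.Unlock()
	return r.resolveIdLocked(id)
}

// resolveIdLocked follows the chain to the live ID.
// Caller must hold r.lock.
func (r *idResolver) resolveIdLocked(id string) string {
	for {
		next, ok := r.replacements[id]
		if !ok {
			return id
		}
		id = next
	}
}

// replace already holds the lock, so it uses the Locked variant; calling
// resolveId here would block forever on the non-reentrant mutex.
func (r *idResolver) replace(oldId, newId string) {
	r.lock.Lock()
	defer r.lock.Unlock()
	current := r.resolveIdLocked(oldId)
	if current == newId {
		return
	}
	if _, isKey := r.replacements[newId]; isKey {
		return
	}
	r.replacements[current] = newId
}

func main() {
	r := &idResolver{replacements: map[string]string{}}
	r.replace("a", "b")
	r.replace("a", "c")
	fmt.Println(r.resolveId("a")) // c
}
```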
Pagination (client.go:670-786, paginated_fetch_test.go):
- `paginatedFetch` generic is well-structured: correct loop termination, `maxPages` safety cap, context-aware throttle via `select`.
- Tests cover single page, multi-page, nil page response, throttle timing, error propagation, context cancellation, and max-page exceeded. Solid.

One nit:
client.go:776 comment says "Throttle to avoid hitting the rate limit (20 req/sec)" but maxReqsPerSec = 10. The comment should say 10 req/sec to match the constant.
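For readers unfamiliar with the shape being reviewed, a generic paginated fetch with a page cap and a context-aware throttle can be sketched as below. This is not the PR's `paginatedFetch`: the names `fetchAll`, `page`, and `runDemo`, the callback signature, and the error text are all assumptions for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// page is a hypothetical page descriptor; the real response type may differ.
type page struct {
	Current, Total int
}

// fetchAll calls fetch for successive page indexes until the last page,
// waiting interval between requests and giving up after maxPages requests.
func fetchAll[T any](
	ctx context.Context,
	maxPages int,
	interval time.Duration,
	fetch func(ctx context.Context, index int) ([]T, *page, error),
) ([]T, error) {
	var all []T
	for index := 0; index < maxPages; index++ {
		items, p, err := fetch(ctx, index)
		if err != nil {
			return nil, err
		}
		all = append(all, items...)
		// A nil page, or the final page, means nothing is left to fetch.
		if p == nil || p.Current+1 >= p.Total {
			return all, nil
		}
		// Context-aware throttle: return early if ctx is cancelled.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(interval):
		}
	}
	return nil, errors.New("max pages exceeded")
}

// runDemo pages through five integers, two per page (three pages in total).
func runDemo(maxPages int) ([]int, error) {
	data := []int{1, 2, 3, 4, 5}
	const size = 2
	total := (len(data) + size - 1) / size
	fetch := func(_ context.Context, index int) ([]int, *page, error) {
		start := index * size
		end := start + size
		if end > len(data) {
			end = len(data)
		}
		return data[start:end], &page{Current: index, Total: total}, nil
	}
	return fetchAll(context.Background(), maxPages, time.Millisecond, fetch)
}

func main() {
	fmt.Println(runDemo(10)) // [1 2 3 4 5] <nil>
	fmt.Println(runDemo(2))  // [] max pages exceeded
}
```

Using `select` on `ctx.Done()` rather than a plain `time.Sleep` is what lets a cancelled caller stop waiting immediately between pages.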
One observation (not blocking):
GetFullVtxoTree (line ~196) does txMap[txids[i]] = txResp.Txs[i] — positional index mapping between input txids and output txs. Now that GetVirtualTxs can auto-paginate when len(txids) > 1000, this mapping depends on the server returning exactly one tx per txid in input order, preserved across page boundaries. If the server ever skips a txid or reorders, this silently corrupts the tree. Consider switching to a keyed response (txid→tx map) on the proto side in a future PR, or at minimum adding a len(txids) != len(txResp.Txs) guard before the loop.
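The minimal guard mentioned above could look like the following sketch. `mapTxsByID` is a hypothetical helper (the PR does the mapping inline), and transactions are represented as plain strings for brevity.

```go
package main

import "fmt"

// mapTxsByID pairs each requested txid with the tx at the same position,
// refusing to build the map if the two slices cannot line up.
func mapTxsByID(txids, txs []string) (map[string]string, error) {
	if len(txids) != len(txs) {
		return nil, fmt.Errorf("expected %d txs, got %d", len(txids), len(txs))
	}
	txMap := make(map[string]string, len(txids))
	for i, txid := range txids {
		txMap[txid] = txs[i]
	}
	return txMap, nil
}

func main() {
	txMap, err := mapTxsByID([]string{"tx1", "tx2"}, []string{"raw1", "raw2"})
	fmt.Println(len(txMap), err)

	// A skipped tx is caught instead of silently shifting every later entry.
	_, err = mapTxsByID([]string{"tx1", "tx2"}, []string{"raw2"})
	fmt.Println(err)
}
```

Note that the length check only catches skipped or extra transactions. A reordered response of the right length would still pass, which is why the keyed response is the more robust option.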
Cross-repo (fulmine): Still needs a coordinated PR before merge. Already acknowledged.
Approving — all prior blockers resolved, new code is correct and well-tested.
🤖 Reviewed by Arkana
Breaking changes to the indexer client that abstract subscription ID handling and hide it from the user.
These changes are temporary and subject to change in #951.
The indexer client's cache now tracks subscriptions and their scripts with a map `subId -> (indexed) scripts` (`map[string]map[string]struct{}`), so scripts are no longer merged into a single subscription when a reconnection happens.
The user doesn't even need to be notified about the new subscription ID. When reconnecting, the indexer client keeps track of the chain of subscription ID replacements, so callers can keep using the original ID they received from `NewSubscription`.
Please @louisinger @sekulicd review