Skip to content

Commit 5e0a92e

Browse files
fix: block store when 0 peers (#269)
1 parent b3d9a3d commit 5e0a92e

6 files changed

Lines changed: 147 additions & 7 deletions

File tree

p2p/kademlia/dht.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2312,15 +2312,21 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
23122312
logtrace.Debug(ctx, "Iterate batch store: dispatching to nodes", logtrace.Fields{"task_id": id, "nodes": len(knownNodes)})
23132313

23142314
// If there are no candidate nodes, there's nothing to fan out to. The caller
2315-
// already persisted the batch locally (see StoreBatch), so treat this as a
2316-
// no-op success rather than an error.
2315+
// already persisted the batch locally (see StoreBatch), but local-only
2316+
// persistence is not sufficient for network durability. Treat this as an
2317+
// error so callers do not finalize actions or delete source data under the
2318+
// assumption that replication occurred.
23172319
if len(knownNodes) == 0 {
2318-
logtrace.Info(ctx, "dht: batch store skipped (no candidate nodes)", logtrace.Fields{
2319-
logtrace.FieldModule: "dht",
2320-
"task_id": id,
2321-
"keys": len(values),
2320+
logtrace.Error(ctx, "dht: batch store skipped (no candidate nodes)", logtrace.Fields{
2321+
logtrace.FieldModule: "dht",
2322+
"task_id": id,
2323+
"keys": len(values),
2324+
"len_nodes": len(s.ht.nodes()),
2325+
"banned_nodes": len(ignoreList),
2326+
"routing_allow_ready": s.routingAllowReady.Load(),
2327+
"routing_allow_count": s.routingAllowCount.Load(),
23222328
})
2323-
return nil
2329+
return fmt.Errorf("no candidate nodes for batch store")
23242330
}
23252331
storeResponses := s.batchStoreNetwork(ctx, values, knownNodes, storageMap, typ)
23262332
for response := range storeResponses {
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package kademlia
2+
3+
import (
4+
"context"
5+
"strings"
6+
"testing"
7+
)
8+
9+
func TestIterateBatchStore_NoCandidateNodes_ReturnsError(t *testing.T) {
10+
ctx, cancel := context.WithCancel(context.Background())
11+
defer cancel()
12+
13+
ht, err := NewHashTable(&Options{
14+
ID: []byte("self"),
15+
IP: "0.0.0.0",
16+
Port: 4445,
17+
})
18+
if err != nil {
19+
t.Fatalf("NewHashTable: %v", err)
20+
}
21+
22+
dht := &DHT{
23+
ht: ht,
24+
ignorelist: NewBanList(ctx),
25+
}
26+
27+
err = dht.IterateBatchStore(ctx, [][]byte{[]byte("value")}, 0, "task")
28+
if err == nil {
29+
t.Fatalf("expected error, got nil")
30+
}
31+
if !strings.Contains(err.Error(), "no candidate nodes") {
32+
t.Fatalf("unexpected error: %v", err)
33+
}
34+
}

p2p/p2p.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,16 @@ func (s *p2p) Retrieve(ctx context.Context, key string, localOnly ...bool) ([]by
139139
return s.dht.Retrieve(ctx, key, localOnly...)
140140
}
141141

142+
// PeersCount returns the current number of peers in the routing table.
143+
// This is intentionally not part of the public Client interface; it is used by
144+
// internal guardrails (e.g., registration) to avoid local-only stores.
145+
func (s *p2p) PeersCount() int {
146+
if s == nil || s.dht == nil {
147+
return 0
148+
}
149+
return s.dht.PeersCount()
150+
}
151+
142152
// BatchRetrieve retrieves the data from the kademlia network
143153
func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error) {
144154

p2p/p2p_peerscount_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package p2p
2+
3+
import "testing"
4+
5+
type peersCounter interface {
6+
PeersCount() int
7+
}
8+
9+
var _ peersCounter = (*p2p)(nil)
10+
11+
func TestPeersCount_NilReceiver_ReturnsZero(t *testing.T) {
12+
var s *p2p
13+
if got := s.PeersCount(); got != 0 {
14+
t.Fatalf("expected 0, got %d", got)
15+
}
16+
}
17+
18+
func TestPeersCount_NilDHT_ReturnsZero(t *testing.T) {
19+
s := &p2p{}
20+
if got := s.PeersCount(); got != 0 {
21+
t.Fatalf("expected 0, got %d", got)
22+
}
23+
}
24+

supernode/adaptors/p2p.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,18 @@ type StoreArtefactsRequest struct {
4545
}
4646

4747
func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) error {
48+
// Registration must never proceed when the node is not connected to any peers.
49+
// Otherwise, StoreBatch can devolve into local-only persistence and actions may
50+
// be finalized without durable replication.
51+
type peersCounter interface {
52+
PeersCount() int
53+
}
54+
if pc, ok := p.p2p.(peersCounter); ok {
55+
if peers := pc.PeersCount(); peers <= 0 {
56+
return fmt.Errorf("p2p has zero peers; refusing to store artefacts (would be non-durable)")
57+
}
58+
}
59+
4860
idFilesBytes := totalBytes(req.IDFiles)
4961
logtrace.Info(ctx, "store: p2p start", logtrace.Fields{
5062
"taskID": req.TaskID,

supernode/adaptors/p2p_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package adaptors
2+
3+
import (
4+
"context"
5+
"errors"
6+
"strings"
7+
"testing"
8+
9+
"github.com/LumeraProtocol/supernode/v2/p2p"
10+
"github.com/LumeraProtocol/supernode/v2/pkg/storage/rqstore"
11+
"go.uber.org/mock/gomock"
12+
)
13+
14+
type clientWithPeersCount struct {
15+
p2p.Client
16+
peers int
17+
}
18+
19+
func (c clientWithPeersCount) PeersCount() int { return c.peers }
20+
21+
func TestStoreArtefacts_ZeroPeers_ReturnsError(t *testing.T) {
22+
svc := NewP2PService(clientWithPeersCount{peers: 0}, nil)
23+
24+
err := svc.StoreArtefacts(context.Background(), StoreArtefactsRequest{TaskID: "task"}, nil)
25+
if err == nil {
26+
t.Fatalf("expected error, got nil")
27+
}
28+
if !strings.Contains(err.Error(), "zero peers") {
29+
t.Fatalf("unexpected error: %v", err)
30+
}
31+
}
32+
33+
func TestStoreArtefacts_PeersPresent_DoesNotTripGuard(t *testing.T) {
34+
ctrl := gomock.NewController(t)
35+
t.Cleanup(ctrl.Finish)
36+
37+
store := rqstore.NewMockStore(ctrl)
38+
storeErr := errors.New("store down")
39+
store.EXPECT().StoreSymbolDirectory("task", "").Return(storeErr)
40+
41+
svc := NewP2PService(clientWithPeersCount{peers: 1}, store)
42+
43+
err := svc.StoreArtefacts(context.Background(), StoreArtefactsRequest{TaskID: "task"}, nil)
44+
if err == nil {
45+
t.Fatalf("expected error, got nil")
46+
}
47+
if strings.Contains(err.Error(), "zero peers") {
48+
t.Fatalf("guard should not have fired, got: %v", err)
49+
}
50+
if !strings.Contains(err.Error(), storeErr.Error()) {
51+
t.Fatalf("expected wrapped store error, got: %v", err)
52+
}
53+
}
54+

0 commit comments

Comments
 (0)