-
Notifications
You must be signed in to change notification settings - Fork 386
feat: per-chunk stamped putter for WebSocket chunk uploads #5256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
acud
merged 7 commits into
ethersphere:master
from
agazso:fix/chunk-stream-stamped-putter
Apr 7, 2026
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
d5b6ca7
feat: per-chunk stamped putter for WebSocket chunk uploads
agazso 0430d51
fix: openapi
agazso d3676b4
fix: error handling
agazso 168a40e
fix: add tests
agazso 3b2bd9a
fix: linter
agazso 811edc0
fix: chunkPutter cleanup
agazso c860af6
fix: comment
agazso File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ import ( | |
| "context" | ||
| "errors" | ||
| "net/http" | ||
| "strconv" | ||
| "time" | ||
|
|
||
| "github.com/ethersphere/bee/v2/pkg/cac" | ||
|
|
@@ -28,14 +29,27 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques | |
| logger := s.logger.WithName("chunks_stream").Build() | ||
|
|
||
| headers := struct { | ||
| BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` | ||
| BatchID []byte `map:"Swarm-Postage-Batch-Id"` // Optional: omit if caller provides pre-signed stamps per chunk | ||
| SwarmTag uint64 `map:"Swarm-Tag"` | ||
| }{} | ||
| if response := s.mapStructure(r.Header, &headers); response != nil { | ||
| response("invalid header params", logger, w) | ||
| return | ||
| } | ||
|
|
||
| // Fallback: read tag from query parameter (browser WebSocket can't set headers) | ||
| if headers.SwarmTag == 0 { | ||
| if qTag := r.URL.Query().Get("swarm-tag"); qTag != "" { | ||
| parsed, err := strconv.ParseUint(qTag, 10, 64) | ||
| if err != nil { | ||
| logger.Debug("invalid swarm-tag query parameter", "value", qTag, "error", err) | ||
| jsonhttp.BadRequest(w, "invalid swarm-tag query parameter") | ||
| return | ||
| } | ||
| headers.SwarmTag = parsed | ||
| } | ||
| } | ||
|
|
||
| var ( | ||
| tag uint64 | ||
| err error | ||
|
|
@@ -55,29 +69,36 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques | |
| } | ||
| } | ||
|
|
||
| // if tag not specified use direct upload | ||
| // Using context.Background here because the putter's lifetime extends beyond that of the HTTP request. | ||
| putter, err := s.newStamperPutter(context.Background(), putterOptions{ | ||
| BatchID: headers.BatchID, | ||
| TagID: tag, | ||
| Deferred: tag != 0, | ||
| }) | ||
| if err != nil { | ||
| logger.Debug("get putter failed", "error", err) | ||
| logger.Error(nil, "get putter failed") | ||
| switch { | ||
| case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): | ||
| jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist") | ||
| case errors.Is(err, postage.ErrNotFound): | ||
| jsonhttp.NotFound(w, "batch with id not found") | ||
| case errors.Is(err, errInvalidPostageBatch): | ||
| jsonhttp.BadRequest(w, "invalid batch id") | ||
| case errors.Is(err, errUnsupportedDevNodeOperation): | ||
| jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation) | ||
| default: | ||
| jsonhttp.BadRequest(w, nil) | ||
| // Create connection-level putter only if BatchID is provided. | ||
| // If BatchID is not provided, the API caller is expected to provide | ||
| // pre-signed stamps with each chunk (and is also expected to keep | ||
| // track of stamp state over time). | ||
| var putter storer.PutterSession | ||
| if len(headers.BatchID) > 0 { | ||
| // if tag not specified use direct upload | ||
| // Using context.Background here because the putter's lifetime extends beyond that of the HTTP request. | ||
| putter, err = s.newStamperPutter(context.Background(), putterOptions{ | ||
| BatchID: headers.BatchID, | ||
| TagID: tag, | ||
| Deferred: tag != 0, | ||
| }) | ||
| if err != nil { | ||
| logger.Debug("get putter failed", "error", err) | ||
| logger.Error(nil, "get putter failed") | ||
| switch { | ||
| case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): | ||
| jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist") | ||
| case errors.Is(err, postage.ErrNotFound): | ||
| jsonhttp.NotFound(w, "batch with id not found") | ||
| case errors.Is(err, errInvalidPostageBatch): | ||
| jsonhttp.BadRequest(w, "invalid batch id") | ||
| case errors.Is(err, errUnsupportedDevNodeOperation): | ||
| jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation) | ||
| default: | ||
| jsonhttp.BadRequest(w, nil) | ||
| } | ||
| return | ||
| } | ||
| return | ||
| } | ||
|
|
||
| upgrader := websocket.Upgrader{ | ||
|
|
@@ -95,13 +116,46 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques | |
| } | ||
|
|
||
| s.wsWg.Add(1) | ||
| go s.handleUploadStream(logger, wsConn, putter) | ||
| var decode chunkDecoder | ||
| if len(headers.BatchID) > 0 { | ||
| decode = decodeChunkWithoutStamp | ||
| } else { | ||
| decode = decodeChunkWithStamp | ||
| } | ||
| go s.handleUploadStream(logger, wsConn, putter, tag, decode) | ||
| } | ||
|
|
||
| // chunkDecoder extracts chunk data and optionally a stamp from a websocket message. | ||
| // When BatchID is provided in headers, decodeChunkWithoutStamp is used (no stamp in message). | ||
| // When BatchID is not provided, decodeChunkWithStamp is used (stamp prepended to chunk data). | ||
| type chunkDecoder func(msg []byte) (chunkData []byte, stamp *postage.Stamp, err error) | ||
|
|
||
| // decodeChunkWithoutStamp returns the message as-is (used when BatchID provided in headers). | ||
| func decodeChunkWithoutStamp(msg []byte) ([]byte, *postage.Stamp, error) { | ||
| return msg, nil, nil | ||
| } | ||
|
|
||
| // decodeChunkWithStamp extracts a stamp from the first 113 bytes of the message. | ||
| // Returns an error if the message is too small or the stamp is invalid. | ||
| func decodeChunkWithStamp(msg []byte) ([]byte, *postage.Stamp, error) { | ||
| if len(msg) < postage.StampSize+swarm.SpanSize { | ||
| return nil, nil, errors.New("message too small for stamp + chunk") | ||
| } | ||
|
|
||
| stamp := &postage.Stamp{} | ||
| if err := stamp.UnmarshalBinary(msg[:postage.StampSize]); err != nil { | ||
| return nil, nil, errors.New("invalid stamp") | ||
| } | ||
|
|
||
| return msg[postage.StampSize:], stamp, nil | ||
| } | ||
|
|
||
| func (s *Service) handleUploadStream( | ||
| logger log.Logger, | ||
| conn *websocket.Conn, | ||
| putter storer.PutterSession, | ||
| tag uint64, | ||
| decode chunkDecoder, | ||
| ) { | ||
| defer s.wsWg.Done() | ||
|
|
||
|
|
@@ -111,11 +165,23 @@ func (s *Service) handleUploadStream( | |
| gone = make(chan struct{}) | ||
| err error | ||
| ) | ||
|
|
||
| // Cache for batch validation to avoid database lookups for every chunk | ||
| // Key: batch ID, Value: stored batch info | ||
| // This avoids the expensive batchStore.Get() call for each chunk | ||
| batchCache := make(map[string]*postage.Batch) | ||
|
|
||
| defer func() { | ||
| cancel() | ||
| _ = conn.Close() | ||
| if err = putter.Done(swarm.ZeroAddress); err != nil { | ||
| logger.Error(err, "chunk upload stream: syncing chunks failed") | ||
|
|
||
| // No cleanup needed for batch cache - it's just metadata | ||
|
|
||
| // Only call Done on connection-level putter if it exists | ||
| if putter != nil { | ||
| if err = putter.Done(swarm.ZeroAddress); err != nil { | ||
| logger.Error(err, "chunk upload stream: syncing chunks failed") | ||
| } | ||
| } | ||
| }() | ||
|
|
||
|
|
@@ -190,17 +256,80 @@ func (s *Service) handleUploadStream( | |
| return | ||
| } | ||
|
|
||
| chunk, err := cac.NewWithDataSpan(msg) | ||
| // Decode the message using the appropriate decoder | ||
| chunkData, stamp, err := decode(msg) | ||
| if err != nil { | ||
| logger.Debug("chunk upload stream: decode failed", "error", err) | ||
| logger.Error(nil, "chunk upload stream: "+err.Error()) | ||
| sendErrorClose(websocket.CloseInternalServerErr, err.Error()) | ||
| return | ||
| } | ||
|
|
||
| // Determine the putter to use | ||
| var ( | ||
| chunk swarm.Chunk | ||
| chunkPutter = putter | ||
| ) | ||
|
|
||
| // If stamp was extracted, create a per-chunk putter | ||
| if stamp != nil { | ||
| batchID := stamp.BatchID() | ||
| batchIDKey := string(batchID) | ||
|
|
||
| storedBatch, exists := batchCache[batchIDKey] | ||
| if !exists { | ||
| storedBatch, err = s.batchStore.Get(batchID) | ||
| if err != nil { | ||
| logger.Debug("chunk upload stream: batch validation failed", "error", err) | ||
| logger.Error(nil, "chunk upload stream: batch validation failed") | ||
| if errors.Is(err, storage.ErrNotFound) { | ||
| sendErrorClose(websocket.CloseInternalServerErr, "batch not found") | ||
| } else { | ||
| sendErrorClose(websocket.CloseInternalServerErr, "batch validation failed") | ||
| } | ||
| return | ||
| } | ||
| batchCache[batchIDKey] = storedBatch | ||
| } | ||
|
|
||
| chunkPutter, err = s.newStampedPutterWithBatch(ctx, putterOptions{ | ||
| BatchID: batchID, | ||
| TagID: tag, | ||
| Deferred: tag != 0, | ||
| }, stamp, storedBatch) | ||
| if err != nil { | ||
| logger.Debug("chunk upload stream: failed to create stamped putter", "error", err) | ||
| logger.Error(nil, "chunk upload stream: failed to create stamped putter") | ||
| switch { | ||
| case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): | ||
| sendErrorClose(websocket.CloseInternalServerErr, "batch not usable") | ||
| case errors.Is(err, postage.ErrNotFound): | ||
| sendErrorClose(websocket.CloseInternalServerErr, "batch not found") | ||
| default: | ||
| sendErrorClose(websocket.CloseInternalServerErr, "stamped putter creation failed") | ||
| } | ||
| return | ||
| } | ||
| } | ||
|
|
||
| chunk, err = cac.NewWithDataSpan(chunkData) | ||
| if err != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not sure if we can use |
||
| logger.Debug("chunk upload stream: create chunk failed", "error", err) | ||
| logger.Debug("chunk upload stream: create chunk failed", "error", err, "chunk_size", len(chunkData)) | ||
| logger.Error(nil, "chunk upload stream: create chunk failed") | ||
| if chunkPutter != putter { | ||
| _ = chunkPutter.Cleanup() | ||
| } | ||
| sendErrorClose(websocket.CloseInternalServerErr, "invalid chunk data") | ||
| return | ||
| } | ||
|
|
||
| err = putter.Put(ctx, chunk) | ||
| err = chunkPutter.Put(ctx, chunk) | ||
| if err != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not sure if we can use |
||
| logger.Debug("chunk upload stream: write chunk failed", "address", chunk.Address(), "error", err) | ||
| logger.Error(nil, "chunk upload stream: write chunk failed") | ||
| if chunkPutter != putter { | ||
| _ = chunkPutter.Cleanup() | ||
| } | ||
| switch { | ||
| case errors.Is(err, postage.ErrBucketFull): | ||
| sendErrorClose(websocket.CloseInternalServerErr, "batch is overissued") | ||
|
|
@@ -210,6 +339,13 @@ func (s *Service) handleUploadStream( | |
| return | ||
| } | ||
|
|
||
| // Clean up per-chunk putter | ||
| if chunkPutter != putter { | ||
| if err := chunkPutter.Done(swarm.ZeroAddress); err != nil { | ||
| logger.Error(err, "chunk upload stream: failed to finalize per-chunk putter") | ||
| } | ||
| } | ||
|
|
||
| err = sendMsg(websocket.BinaryMessage, successWsMsg) | ||
| if err != nil { | ||
| s.logger.Debug("chunk upload stream: sending success message failed", "error", err) | ||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.