Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,18 @@ paths:
- Chunk
parameters:
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmTagParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
- in: query
name: swarm-tag
schema:
$ref: "SwarmCommon.yaml#/components/schemas/Uid"
required: false
description: "Associate upload with an existing Tag UID (use when WebSocket client cannot set custom headers)"
- in: header
name: swarm-postage-batch-id
description: "ID of Postage Batch that is used to upload data with. Optional when chunks include pre-signed postage stamps."
required: false
schema:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmAddress"
responses:
"200":
description: "Connection established"
Expand Down
11 changes: 11 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,18 @@ func (s *Service) newStampedPutter(ctx context.Context, opts putterOptions, stam
return nil, errInvalidPostageBatch
}

return s.newStampedPutterWithBatch(ctx, opts, stamp, storedBatch)
}

// newStampedPutterWithBatch creates a stamped putter using a pre-fetched batch.
// This avoids the database lookup when batch info is already cached.
func (s *Service) newStampedPutterWithBatch(ctx context.Context, opts putterOptions, stamp *postage.Stamp, storedBatch *postage.Batch) (storer.PutterSession, error) {
if !opts.Deferred && s.beeMode == DevMode {
return nil, errUnsupportedDevNodeOperation
}

var session storer.PutterSession
var err error
if opts.Deferred || opts.Pin {
session, err = s.storer.Upload(ctx, opts.Pin, opts.TagID)
if err != nil {
Expand Down
194 changes: 165 additions & 29 deletions pkg/api/chunk_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"net/http"
"strconv"
"time"

"github.com/ethersphere/bee/v2/pkg/cac"
Expand All @@ -28,14 +29,27 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques
logger := s.logger.WithName("chunks_stream").Build()

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
BatchID []byte `map:"Swarm-Postage-Batch-Id"` // Optional: omit if caller provides pre-signed stamps per chunk
SwarmTag uint64 `map:"Swarm-Tag"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

// Fallback: read tag from query parameter (browser WebSocket can't set headers)
if headers.SwarmTag == 0 {
Comment thread
acud marked this conversation as resolved.
if qTag := r.URL.Query().Get("swarm-tag"); qTag != "" {
parsed, err := strconv.ParseUint(qTag, 10, 64)
if err != nil {
logger.Debug("invalid swarm-tag query parameter", "value", qTag, "error", err)
jsonhttp.BadRequest(w, "invalid swarm-tag query parameter")
return
}
headers.SwarmTag = parsed
}
}

var (
tag uint64
err error
Expand All @@ -55,29 +69,36 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques
}
}

// if tag not specified use direct upload
// Using context.Background here because the putter's lifetime extends beyond that of the HTTP request.
putter, err := s.newStamperPutter(context.Background(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Deferred: tag != 0,
})
if err != nil {
logger.Debug("get putter failed", "error", err)
logger.Error(nil, "get putter failed")
switch {
case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist")
case errors.Is(err, postage.ErrNotFound):
jsonhttp.NotFound(w, "batch with id not found")
case errors.Is(err, errInvalidPostageBatch):
jsonhttp.BadRequest(w, "invalid batch id")
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
default:
jsonhttp.BadRequest(w, nil)
// Create connection-level putter only if BatchID is provided.
// If BatchID is not provided, the API caller is expected to provide
// pre-signed stamps with each chunk (and is also expected to keep
// track of stamp state over time).
var putter storer.PutterSession
if len(headers.BatchID) > 0 {
// if tag not specified use direct upload
// Using context.Background here because the putter's lifetime extends beyond that of the HTTP request.
putter, err = s.newStamperPutter(context.Background(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Deferred: tag != 0,
})
if err != nil {
logger.Debug("get putter failed", "error", err)
logger.Error(nil, "get putter failed")
switch {
case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist")
case errors.Is(err, postage.ErrNotFound):
jsonhttp.NotFound(w, "batch with id not found")
case errors.Is(err, errInvalidPostageBatch):
jsonhttp.BadRequest(w, "invalid batch id")
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
default:
jsonhttp.BadRequest(w, nil)
}
return
}
return
}

upgrader := websocket.Upgrader{
Expand All @@ -95,13 +116,46 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques
}

s.wsWg.Add(1)
go s.handleUploadStream(logger, wsConn, putter)
var decode chunkDecoder
if len(headers.BatchID) > 0 {
decode = decodeChunkWithoutStamp
} else {
decode = decodeChunkWithStamp
}
go s.handleUploadStream(logger, wsConn, putter, tag, decode)
}

// chunkDecoder extracts chunk data and optionally a stamp from a websocket message.
// When BatchID is provided in headers, decodeChunkWithoutStamp is used (no stamp in message).
// When BatchID is not provided, decodeChunkWithStamp is used (stamp prepended to chunk data).
type chunkDecoder func(msg []byte) (chunkData []byte, stamp *postage.Stamp, err error)

// decodeChunkWithoutStamp is the passthrough decoder: the whole websocket
// message is the chunk payload and no per-chunk stamp is present, because the
// postage batch ID was supplied via the request headers instead.
func decodeChunkWithoutStamp(payload []byte) ([]byte, *postage.Stamp, error) {
	return payload, nil, nil
}

// decodeChunkWithStamp splits a websocket message into a leading postage
// stamp and the trailing chunk payload. The stamp occupies the first
// postage.StampSize bytes; the remainder must be at least swarm.SpanSize
// bytes long. An error is returned when the message is too short or the
// stamp bytes do not unmarshal into a valid stamp.
func decodeChunkWithStamp(msg []byte) ([]byte, *postage.Stamp, error) {
	minLen := postage.StampSize + swarm.SpanSize
	if len(msg) < minLen {
		return nil, nil, errors.New("message too small for stamp + chunk")
	}

	var stamp postage.Stamp
	if err := stamp.UnmarshalBinary(msg[:postage.StampSize]); err != nil {
		return nil, nil, errors.New("invalid stamp")
	}
	return msg[postage.StampSize:], &stamp, nil
}

func (s *Service) handleUploadStream(
logger log.Logger,
conn *websocket.Conn,
putter storer.PutterSession,
tag uint64,
decode chunkDecoder,
) {
defer s.wsWg.Done()

Expand All @@ -111,11 +165,23 @@ func (s *Service) handleUploadStream(
gone = make(chan struct{})
err error
)

// Cache for batch validation to avoid database lookups for every chunk
// Key: batch ID, Value: stored batch info
// This avoids the expensive batchStore.Get() call for each chunk
batchCache := make(map[string]*postage.Batch)

defer func() {
cancel()
_ = conn.Close()
if err = putter.Done(swarm.ZeroAddress); err != nil {
logger.Error(err, "chunk upload stream: syncing chunks failed")

// No cleanup needed for batch cache - it's just metadata

// Only call Done on connection-level putter if it exists
if putter != nil {
if err = putter.Done(swarm.ZeroAddress); err != nil {
logger.Error(err, "chunk upload stream: syncing chunks failed")
}
}
}()

Expand Down Expand Up @@ -190,17 +256,80 @@ func (s *Service) handleUploadStream(
return
}

chunk, err := cac.NewWithDataSpan(msg)
// Decode the message using the appropriate decoder
chunkData, stamp, err := decode(msg)
if err != nil {
logger.Debug("chunk upload stream: decode failed", "error", err)
logger.Error(nil, "chunk upload stream: "+err.Error())
sendErrorClose(websocket.CloseInternalServerErr, err.Error())
return
}

// Determine the putter to use
var (
chunk swarm.Chunk
chunkPutter = putter
)

// If stamp was extracted, create a per-chunk putter
if stamp != nil {
batchID := stamp.BatchID()
batchIDKey := string(batchID)

storedBatch, exists := batchCache[batchIDKey]
if !exists {
storedBatch, err = s.batchStore.Get(batchID)
if err != nil {
logger.Debug("chunk upload stream: batch validation failed", "error", err)
logger.Error(nil, "chunk upload stream: batch validation failed")
if errors.Is(err, storage.ErrNotFound) {
sendErrorClose(websocket.CloseInternalServerErr, "batch not found")
} else {
sendErrorClose(websocket.CloseInternalServerErr, "batch validation failed")
}
return
}
batchCache[batchIDKey] = storedBatch
}

chunkPutter, err = s.newStampedPutterWithBatch(ctx, putterOptions{
BatchID: batchID,
TagID: tag,
Deferred: tag != 0,
}, stamp, storedBatch)
if err != nil {
logger.Debug("chunk upload stream: failed to create stamped putter", "error", err)
logger.Error(nil, "chunk upload stream: failed to create stamped putter")
switch {
case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
sendErrorClose(websocket.CloseInternalServerErr, "batch not usable")
case errors.Is(err, postage.ErrNotFound):
sendErrorClose(websocket.CloseInternalServerErr, "batch not found")
default:
sendErrorClose(websocket.CloseInternalServerErr, "stamped putter creation failed")
}
return
}
}

chunk, err = cac.NewWithDataSpan(chunkData)
if err != nil {
Copy link
Copy Markdown
Contributor

@akrem-chabchoub akrem-chabchoub Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we can use chunkPutter.Cleanup() here ?

logger.Debug("chunk upload stream: create chunk failed", "error", err)
logger.Debug("chunk upload stream: create chunk failed", "error", err, "chunk_size", len(chunkData))
logger.Error(nil, "chunk upload stream: create chunk failed")
if chunkPutter != putter {
_ = chunkPutter.Cleanup()
}
sendErrorClose(websocket.CloseInternalServerErr, "invalid chunk data")
return
}

err = putter.Put(ctx, chunk)
err = chunkPutter.Put(ctx, chunk)
if err != nil {
Copy link
Copy Markdown
Contributor

@akrem-chabchoub akrem-chabchoub Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we can use chunkPutter.Cleanup() here ?

logger.Debug("chunk upload stream: write chunk failed", "address", chunk.Address(), "error", err)
logger.Error(nil, "chunk upload stream: write chunk failed")
if chunkPutter != putter {
_ = chunkPutter.Cleanup()
}
switch {
case errors.Is(err, postage.ErrBucketFull):
sendErrorClose(websocket.CloseInternalServerErr, "batch is overissued")
Expand All @@ -210,6 +339,13 @@ func (s *Service) handleUploadStream(
return
}

// Clean up per-chunk putter
if chunkPutter != putter {
if err := chunkPutter.Done(swarm.ZeroAddress); err != nil {
logger.Error(err, "chunk upload stream: failed to finalize per-chunk putter")
}
}

err = sendMsg(websocket.BinaryMessage, successWsMsg)
if err != nil {
s.logger.Debug("chunk upload stream: sending success message failed", "error", err)
Expand Down
Loading
Loading