Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions cmd/cartesi-rollups-cli/root/app/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package status

import (
"bufio"
"fmt"
"os"
"strings"
Expand All @@ -15,6 +16,8 @@ import (
"github.com/cartesi/rollups-node/internal/repository/factory"
)

var yesFlag bool

var Cmd = &cobra.Command{
Use: "status [app-name-or-address] [new-status]",
Short: "Display or set application status (enabled or disabled)",
Expand All @@ -31,9 +34,14 @@ cartesi-rollups-cli app status echo-dapp

# Set application status:
cartesi-rollups-cli app status echo-dapp enabled
cartesi-rollups-cli app status echo-dapp disabled`
cartesi-rollups-cli app status echo-dapp disabled

# Re-enable a FAILED application without confirmation prompt:
cartesi-rollups-cli app status echo-dapp enabled --yes`

func init() {
Cmd.Flags().BoolVarP(&yesFlag, "yes", "y", false, "Skip confirmation prompts")

origHelpFunc := Cmd.HelpFunc()
Cmd.SetHelpFunc(func(command *cobra.Command, strings []string) {
command.Flags().Lookup("verbose").Hidden = false
Expand Down Expand Up @@ -62,17 +70,26 @@ func run(cmd *cobra.Command, args []string) {
os.Exit(1)
}

// If no new status is provided, just display the current status
// If no new status is provided, display the current status and reason
if len(args) == 1 {
fmt.Println(app.State)
if app.Reason != nil && *app.Reason != "" {
fmt.Printf("Reason: %s\n", *app.Reason)
}
os.Exit(0)
}

// Handle status change
newStatus := strings.ToLower(args[1])

if app.State == model.ApplicationState_Inoperable {
fmt.Fprintf(os.Stderr, "Error: Cannot execute operation. Application %s is in %s state\n", app.Name, app.State)
fmt.Fprintf(os.Stderr,
"Error: Cannot change state of application %s. It is INOPERABLE (irrecoverable).\n",
app.Name)
if app.Reason != nil {
fmt.Fprintf(os.Stderr, "Reason: %s\n", *app.Reason)
}
fmt.Fprintf(os.Stderr, "Use 'app remove' to remove this application.\n")
os.Exit(1)
}

Expand All @@ -92,6 +109,32 @@ func run(cmd *cobra.Command, args []string) {
os.Exit(0)
}

// Re-enabling a FAILED application requires confirmation
if app.State == model.ApplicationState_Failed && targetState == model.ApplicationState_Enabled && !yesFlag {
fmt.Printf("Application %q is in FAILED state.\n", app.Name)
if app.Reason != nil {
fmt.Printf("Reason: %s\n", *app.Reason)
}
fmt.Println("Re-enabling will attempt to restart processing from the last snapshot.")
fmt.Print("Proceed? [y/N] ")
scanner := bufio.NewScanner(os.Stdin)
scanner.Scan()
if scanErr := scanner.Err(); scanErr != nil {
fmt.Fprintf(os.Stderr, "Error reading input: %v\n", scanErr)
os.Exit(1)
}
answer := strings.TrimSpace(strings.ToLower(scanner.Text()))
if answer != "y" && answer != "yes" {
fmt.Println("Aborted.")
os.Exit(0)
}
Comment on lines +119 to +130

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we create an issue to make this a utility function in the future and reuse it in places like here, here, and here?

}

// Show failure reason when changing state away from FAILED
if app.State == model.ApplicationState_Failed && app.Reason != nil && *app.Reason != "" {
fmt.Printf("Previous failure reason: %s\n", *app.Reason)
}

err = repo.UpdateApplicationState(ctx, app.ID, targetState, nil)
cobra.CheckErr(err)

Expand Down
100 changes: 60 additions & 40 deletions internal/advancer/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"strings"

"github.com/cartesi/rollups-node/internal/appstatus"
"github.com/cartesi/rollups-node/internal/manager"
. "github.com/cartesi/rollups-node/internal/model"
"github.com/cartesi/rollups-node/internal/repository"
Expand Down Expand Up @@ -57,8 +58,11 @@ func getUnprocessedInputs(
return repo.ListInputs(ctx, appAddress, f, repository.Pagination{Limit: batchSize}, false)
}

// Step performs one processing cycle of the advancer
// It updates machines, gets unprocessed inputs, processes them, and updates epochs
// Step performs one processing cycle of the advancer.
// It updates machines, gets unprocessed inputs, processes them, and updates epochs.
// Per-app errors are accumulated so that a failure in one application does not block
// processing of other healthy applications. Context cancellation is always propagated
// immediately.
func (s *Service) Step(ctx context.Context) error {
// Check for context cancellation
if err := ctx.Err(); err != nil {
Expand All @@ -74,38 +78,53 @@ func (s *Service) Step(ctx context.Context) error {
// Get all applications with active machines
apps := s.machineManager.Applications()

// Process inputs for each application
// Process inputs for each application, accumulating per-app errors
var errs []error
for _, app := range apps {
appAddress := app.IApplicationAddress.String()
if err := s.stepApp(ctx, app); err != nil {
// Context cancellation means the node is shutting down — stop immediately.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return err
}
errs = append(errs, err)
}
}

epochs, _, err := getUnprocessedEpochs(ctx, s.repository, appAddress)
if err != nil {
return errors.Join(errs...)
}

// stepApp processes all unprocessed epochs and inputs for a single application.
func (s *Service) stepApp(ctx context.Context, app *Application) error {
appAddress := app.IApplicationAddress.String()

epochs, _, err := getUnprocessedEpochs(ctx, s.repository, appAddress)
if err != nil {
return err
}

for _, epoch := range epochs {
if err := s.processEpochInputs(ctx, app, epoch.Index); err != nil {
return err
}

for _, epoch := range epochs {
if err := s.processEpochInputs(ctx, app, epoch.Index); err != nil {
return err
}
if epoch.Status == EpochStatus_Closed {
if allProcessed, perr := s.isAllEpochInputsProcessed(app, epoch); perr == nil && allProcessed {
err := s.handleEpochAfterInputsProcessed(ctx, app, epoch)
if err != nil {
return err
}

if epoch.Status == EpochStatus_Closed {
if allProcessed, perr := s.isAllEpochInputsProcessed(app, epoch); perr == nil && allProcessed {
err := s.handleEpochAfterInputsProcessed(ctx, app, epoch)
if err != nil {
return err
}

// Update epochs to mark inputs as processed
err = s.repository.UpdateEpochInputsProcessed(ctx, appAddress, epoch.Index)
if err != nil {
return err
}
s.Logger.Info("Epoch updated to Inputs Processed", "application", app.Name, "epoch_index", epoch.Index)
} else if perr != nil {
return perr
} else {
break // some inputs were not processed yet, check next time
// Update epochs to mark inputs as processed
err = s.repository.UpdateEpochInputsProcessed(ctx, appAddress, epoch.Index)
if err != nil {
return err
}
s.Logger.Info("Epoch updated to Inputs Processed",
"application", app.Name, "epoch_index", epoch.Index)
} else if perr != nil {
return perr
} else {
break // some inputs were not processed yet, check next time
}
}
}
Expand Down Expand Up @@ -177,27 +196,27 @@ func (s *Service) processInputs(ctx context.Context, app *Application, inputs []
result, err := machine.Advance(ctx, input.RawData, input.EpochIndex, input.Index, app.IsDaveConsensus())
input.RawData = nil // allow GC to collect payload while batch continues
if err != nil {
// If there's an error, mark the application as inoperable
// If there's an error, mark the application as failed
s.Logger.Error("Error executing advance",
"application", app.Name,
"index", input.Index,
"error", err)

// If the error is due to context cancellation, don't mark as inoperable
// If the error is due to context cancellation, don't mark as failed
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return err
}

reason := err.Error()
updateErr := s.repository.UpdateApplicationState(ctx, app.ID, ApplicationState_Inoperable, &reason)
if updateErr != nil {
s.Logger.Error("Failed to update application state",
"application", app.Name,
"error", updateErr)
if dbErr := appstatus.SetFailed(ctx, s.Logger, s.repository, app, err.Error()); dbErr != nil {
s.Logger.Error("Failed to persist FAILED state — machine will be closed "+
"but app remains ENABLED in DB; it will be re-created from the "+
"last snapshot on the next tick. If the root cause persists, "+
"this may loop.",
"application", app.Name, "db_error", dbErr)
}

// Eagerly close the machine to release the child process.
// The app is already inoperable, so no further operations will succeed.
// The app has failed, so no further operations will succeed.
// Skip if the runtime was already destroyed inside the manager.
if !errors.Is(err, manager.ErrMachineClosed) {
if closeErr := machine.Close(); closeErr != nil {
Expand Down Expand Up @@ -320,11 +339,12 @@ func (s *Service) handleEpochAfterInputsProcessed(ctx context.Context, app *Appl
outputsProof, err := machine.OutputsProof(ctx)
if err != nil {
// If the runtime was destroyed (e.g., child process crashed),
// mark the app inoperable to avoid an infinite retry loop.
// mark the app as failed to avoid an infinite retry loop.
if errors.Is(err, manager.ErrMachineClosed) {
reason := err.Error()
_ = s.repository.UpdateApplicationState(ctx, app.ID,
ApplicationState_Inoperable, &reason)
if dbErr := appstatus.SetFailed(ctx, s.Logger, s.repository, app, err.Error()); dbErr != nil {
s.Logger.Error("Failed to persist FAILED state for crashed machine",
"application", app.Name, "db_error", dbErr)
}
}
return fmt.Errorf("failed to get outputs proof from machine: %w", err)
}
Expand Down
58 changes: 57 additions & 1 deletion internal/advancer/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,49 @@ func (s *AdvancerSuite) TestStep() {
require.Nil(err)
require.Len(env.repo.StoredResults, 0)
})

s.Run("FailedAppDoesNotBlockOtherApps", func() {
require := s.Require()

mm := newMockMachineManager()
app1 := newMockMachine(1) // will fail
app2 := newMockMachine(2) // should still be processed
mm.Map[1] = newMockInstance(app1)
mm.Map[2] = newMockInstance(app2)
res2 := randomAdvanceResult(0)

repo := &MockRepository{
GetEpochsReturn: map[common.Address][]*Epoch{
app1.Application.IApplicationAddress: {
{Index: 0, Status: EpochStatus_Open},
},
app2.Application.IApplicationAddress: {
{Index: 0, Status: EpochStatus_Open},
},
},
GetInputsReturn: map[common.Address][]*Input{
app1.Application.IApplicationAddress: {
newInput(app1.Application.ID, 0, 0, []byte("advance error")),
},
app2.Application.IApplicationAddress: {
newInput(app2.Application.ID, 0, 0, marshal(res2)),
},
},
}

svc, err := newMockAdvancerService(mm, repo)
require.NoError(err)

err = svc.Step(context.Background())
// Step returns a combined error but the healthy app was still processed
require.Error(err)
require.Contains(err.Error(), "advance error")
require.Equal(1, repo.ApplicationStateUpdates)
require.Equal(ApplicationState_Failed, repo.LastApplicationState)

// app2's input was processed despite app1's failure
require.Len(repo.StoredResults, 1)
})
}

func (s *AdvancerSuite) TestGetUnprocessedInputs() {
Expand Down Expand Up @@ -262,7 +305,7 @@ func (s *AdvancerSuite) TestProcess() {
err := env.service.processInputs(context.Background(), env.app.Application, inputs)
require.Error(err)
require.Equal(1, env.repo.ApplicationStateUpdates)
require.Equal(ApplicationState_Inoperable, env.repo.LastApplicationState)
require.Equal(ApplicationState_Failed, env.repo.LastApplicationState)
require.NotNil(env.repo.LastApplicationStateReason)
require.Equal("advance error", *env.repo.LastApplicationStateReason)
})
Expand Down Expand Up @@ -640,6 +683,19 @@ func (s *AdvancerSuite) TestHandleEpochAfterInputsProcessed() {
require.Contains(err.Error(), "proof error")
})

s.Run("EmptyEpochIndex0ErrMachineClosedMarksAppFailed", func() {
require := s.Require()
env := s.setupOneApp()
env.app.OutputsProofError = manager.ErrMachineClosed

epoch := &Epoch{Index: 0, Status: EpochStatus_Closed, InputIndexLowerBound: 0, InputIndexUpperBound: 0}
err := env.service.handleEpochAfterInputsProcessed(context.Background(), env.app.Application, epoch)
require.Error(err)
require.ErrorIs(err, manager.ErrMachineClosed)
require.Equal(1, env.repo.ApplicationStateUpdates)
require.Equal(ApplicationState_Failed, env.repo.LastApplicationState)
})

s.Run("EmptyEpochIndexGt0RepeatsPreviousProof", func() {
require := s.Require()
env := s.setupOneApp()
Expand Down
Loading
Loading