Conversation
})

return exitCode, request.ContainerId, err
return -1, "", fmt.Errorf("checkpoint not found")
The parent function already handles this exact case, falling back to running the code without a checkpoint. This is just duplicated.
log.Error().Str("container_id", request.ContainerId).Msgf("failed to update checkpoint state: %v", updateStateErr)
}

err = exposeNetwork()
When we are checkpointing, we do not want requests to taint the checkpoint. It should capture only the ready state of the pod, not any in-flight requests. In the case of an API server, a running request would be an open connection.
pkg/worker/lifecycle.go
Outdated
return
}
}
// for idx, bindPort := range opts.BindPorts {
@luke-lombardi Would it be a problem if we delayed exposing the ports until right before the container starts instead?
if self.checkpoint_enabled and self.checkpoint_condition is not None:
    self.entrypoint = [
        "(python -m beta9.runner.checkpoint &) &&",
Given the current entrypoint snippet we have, this is the best place to put the script. We could also move this to the backend and pass USER_CODE_DIR:
["sh", "-c", f"cd {USER_CODE_DIR} && {' '.join(self.entrypoint)}"]
if not module and hasattr(user_obj, "cleanup_deployment_artifacts"):
    user_obj.cleanup_deployment_artifacts()
finally:
gRPC errors left pod artifacts in the repo, and they were not properly cleaned up.
updateStateErr := s.updateCheckpointState(request, types.CheckpointStatusRestoreFailed)
if updateStateErr != nil {
    log.Error().Str("container_id", request.ContainerId).Msgf("failed to update checkpoint state: %v", updateStateErr)
var e *runc.ExitError
[TODO]: This logic is not complete. @luke-lombardi I want to chat with you about stop and error conditions related to a running RestoreCheckpoint process.
A SIGKILL from a user-initiated container stop would put the checkpoint into a restore_failed state, which is not what we want, but the same SIGKILL can also occur for other reasons that are related to a failed restore. We need to hash out better indicators of failure in a restore checkpoint process.
user_obj.cleanup_deployment_artifacts()
finally:
if not module and hasattr(user_obj, "cleanup_deployment_artifacts"):
    user_obj.cleanup_deployment_artifacts()
Exceptions from cleanup_deployment_artifacts inside finally can mask the original failure or exit; wrap cleanup in its own try/except to avoid overshadowing errors.
Prompt for AI agents
Address the following comment on sdk/src/beta9/cli/deployment.py at line 197:
<comment>Exceptions from cleanup_deployment_artifacts inside finally can mask the original failure or exit; wrap cleanup in its own try/except to avoid overshadowing errors.</comment>
<file context>
@@ -180,17 +180,21 @@ def create_deployment(
- user_obj.cleanup_deployment_artifacts()
+ finally:
+ if not module and hasattr(user_obj, "cleanup_deployment_artifacts"):
+ user_obj.cleanup_deployment_artifacts()
if capture_logs.capture_logs:
</file context>
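A minimal Python sketch of the suggested fix (the wrapper name `deploy_with_cleanup` and the logging via `print` are hypothetical; only the `finally` shape mirrors the snippet above):

```python
def deploy_with_cleanup(do_deploy, user_obj, module=None):
    # Hypothetical wrapper illustrating the suggestion: cleanup gets its
    # own try/except so a cleanup error cannot replace the original one.
    try:
        return do_deploy()
    finally:
        if not module and hasattr(user_obj, "cleanup_deployment_artifacts"):
            try:
                user_obj.cleanup_deployment_artifacts()
            except Exception as cleanup_err:
                # Swallowed: the exception (if any) raised by do_deploy()
                # keeps propagating instead of being masked here.
                print(f"cleanup_deployment_artifacts failed: {cleanup_err}")
```

Without the inner try/except, an exception raised by cleanup inside `finally` silently replaces whatever `do_deploy()` raised.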

if self.checkpoint_enabled and self.checkpoint_condition is not None:
    self.entrypoint = [
        "(python -m beta9.runner.checkpoint &) &&",
Shell snippet is injected into entrypoint without a shell wrapper for custom images, causing exec failure and overriding the image's default ENTRYPOINT.
Prompt for AI agents
Address the following comment on sdk/src/beta9/abstractions/pod.py at line 310:
<comment>Shell snippet is injected into entrypoint without a shell wrapper for custom images, causing exec failure and overriding the image's default ENTRYPOINT.</comment>
<file context>
@@ -281,6 +305,12 @@ def deploy(
+ if self.checkpoint_enabled and self.checkpoint_condition is not None:
+ self.entrypoint = [
+ "(python -m beta9.runner.checkpoint &) &&",
+ *self.entrypoint,
+ ]
</file context>
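One way to address this, sketched under assumptions (the helper name `wrap_entrypoint` is hypothetical): prepend the checkpoint runner and wrap the whole command in a single `sh -c` so custom images whose ENTRYPOINT is not a shell still exec correctly.

```python
def wrap_entrypoint(entrypoint, checkpoint_enabled, checkpoint_condition):
    """Hypothetical sketch: join the checkpoint runner and the original
    entrypoint into one shell command instead of bare argv fragments."""
    if not (checkpoint_enabled and checkpoint_condition is not None):
        return entrypoint
    cmd = " ".join(["(python -m beta9.runner.checkpoint &) &&", *entrypoint])
    return ["sh", "-c", cmd]
```

This keeps the injected snippet valid as a shell expression rather than leaving `"(python -m beta9.runner.checkpoint &) &&"` as a literal argv[0] that exec cannot resolve.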
fmt.Sprintf("STUB_ID=%s", stub.ExternalId),
fmt.Sprintf("STUB_TYPE=%s", stub.Type),
fmt.Sprintf("KEEP_WARM_SECONDS=%d", stubConfig.KeepWarmSeconds),
fmt.Sprintf("CHECKPOINT_ENABLED=%t", stubConfig.CheckpointEnabled),
CHECKPOINT_ENABLED env var reflects stubConfig instead of the computed effective value, causing mismatches when checkpointing is disabled (e.g., multi-GPU).
Prompt for AI agents
Address the following comment on pkg/abstractions/pod/pod.go at line 285:
<comment>CHECKPOINT_ENABLED env var reflects stubConfig instead of the computed effective value, causing mismatches when checkpointing is disabled (e.g., multi-GPU).</comment>
<file context>
@@ -282,6 +282,8 @@ func (s *GenericPodService) run(ctx context.Context, authInfo *auth.AuthInfo, st
fmt.Sprintf("STUB_ID=%s", stub.ExternalId),
fmt.Sprintf("STUB_TYPE=%s", stub.Type),
fmt.Sprintf("KEEP_WARM_SECONDS=%d", stubConfig.KeepWarmSeconds),
+ fmt.Sprintf("CHECKPOINT_ENABLED=%t", stubConfig.CheckpointEnabled),
+ fmt.Sprintf("CHECKPOINT_CONDITION=%s", stubConfig.CheckpointCondition),
}...)
</file context>
workers_ready.value += 1

if workers_ready.value == config.workers:
    Path(CHECKPOINT_SIGNAL_FILE).touch(exist_ok=True)
Creating the checkpoint signal without waiting for all workers can race with worker coordination; delegate to wait_for_checkpoint() to ensure readiness across workers.
Prompt for AI agents
Address the following comment on sdk/src/beta9/runner/checkpoint.py at line 30:
<comment>Creating the checkpoint signal without waiting for all workers can race with worker coordination; delegate to wait_for_checkpoint() to ensure readiness across workers.</comment>
<file context>
@@ -0,0 +1,71 @@
+ workers_ready.value += 1
+
+ if workers_ready.value == config.workers:
+ Path(CHECKPOINT_SIGNAL_FILE).touch(exist_ok=True)
+ return _reload_config()
+
</file context>
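A rough sketch of the suggested coordination, assuming workers share a `multiprocessing.Value` counter (the function name `wait_for_all_workers` and its timeout are assumptions; the real fix would delegate to `wait_for_checkpoint()`):

```python
import multiprocessing as mp
import time

def wait_for_all_workers(workers_ready, total_workers, create_signal, timeout=30.0):
    """Increment the shared counter under its lock, then block until every
    worker has checked in before creating the checkpoint signal."""
    with workers_ready.get_lock():
        workers_ready.value += 1
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if workers_ready.value >= total_workers:
            create_signal()  # touch(exist_ok=True) keeps this idempotent
            return True
        time.sleep(0.01)
    return False
```

The unguarded `+=` and the equality check in the original can race: two workers can both read the pre-increment value, so nobody ever sees `== config.workers` and the signal file is never created.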
})

return exitCode, request.ContainerId, err
return -1, "", fmt.Errorf("checkpoint not found")
Missing fallback to normal run when checkpoint is not found; this now returns an error instead of starting the container normally.
(Based on the PR description that the worker falls back to a normal run if no checkpoint is found.)
Prompt for AI agents
Address the following comment on pkg/worker/criu.go at line 150:
<comment>Missing fallback to normal run when checkpoint is not found; this now returns an error instead of starting the container normally.
(Based on the PR description that the worker falls back to a normal run if no checkpoint is found.)</comment>
<file context>
@@ -123,31 +129,30 @@ func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types.
- })
-
- return exitCode, request.ContainerId, err
+ return -1, "", fmt.Errorf("checkpoint not found")
}
</file context>
if updateStateErr != nil {
    log.Error().Str("container_id", request.ContainerId).Msgf("failed to update checkpoint state: %v", updateStateErr)
var e *runc.ExitError
if errors.As(err, &e) {
Restore failures that are not runc.ExitError won't update checkpoint state or be logged, leaving stale/incorrect state.
Prompt for AI agents
Address the following comment on pkg/worker/criu.go at line 133:
<comment>Restore failures that are not runc.ExitError won't update checkpoint state or be logged, leaving stale/incorrect state.</comment>
<file context>
@@ -123,31 +129,30 @@ func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types.
- if updateStateErr != nil {
- log.Error().Str("container_id", request.ContainerId).Msgf("failed to update checkpoint state: %v", updateStateErr)
+ var e *runc.ExitError
+ if errors.As(err, &e) {
+ code := e.Status
+
</file context>
}
defer f.Close()

err = exposeNetwork()
Network is exposed before a successful restore, potentially allowing premature connections; expose after a successful restore instead.
(Based on the PR description stating the worker only exposes ports after checkpoint or restore.)
Prompt for AI agents
Address the following comment on pkg/worker/criu.go at line 117:
<comment>Network is exposed before a successful restore, potentially allowing premature connections; expose after a successful restore instead.
(Based on the PR description stating the worker only exposes ports after checkpoint or restore.)</comment>
<file context>
@@ -113,6 +114,11 @@ func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types.
}
defer f.Close()
+ err = exposeNetwork()
+ if err != nil {
+ return -1, "", fmt.Errorf("failed to expose network: %v", err)
</file context>
fmt.Sprintf("STUB_ID=%s", i.Stub.ExternalId),
fmt.Sprintf("STUB_TYPE=%s", i.Stub.Type),
fmt.Sprintf("KEEP_WARM_SECONDS=%d", i.StubConfig.KeepWarmSeconds),
fmt.Sprintf("CHECKPOINT_ENABLED=%t", i.StubConfig.CheckpointEnabled),
Adding CHECKPOINT_ENABLED to request.Env duplicates the worker-controlled flag and can override it due to merge order; it also ignores the computed effective value (GPU>1), risking inconsistent checkpoint behavior.
Prompt for AI agents
Address the following comment on pkg/abstractions/pod/instance.go at line 36:
<comment>Adding CHECKPOINT_ENABLED to request.Env duplicates the worker-controlled flag and can override it due to merge order; it also ignores the computed effective value (GPU>1), risking inconsistent checkpoint behavior.</comment>
<file context>
@@ -33,6 +33,8 @@ func (i *podInstance) startContainers(containersToRun int) error {
fmt.Sprintf("STUB_ID=%s", i.Stub.ExternalId),
fmt.Sprintf("STUB_TYPE=%s", i.Stub.Type),
fmt.Sprintf("KEEP_WARM_SECONDS=%d", i.StubConfig.KeepWarmSeconds),
+ fmt.Sprintf("CHECKPOINT_ENABLED=%t", i.StubConfig.CheckpointEnabled),
+ fmt.Sprintf("CHECKPOINT_CONDITION=%s", i.StubConfig.CheckpointCondition),
}...)
</file context>
return

module, func = config.checkpoint_condition.split(":")
target_module = importlib.import_module(module)
Rule violated: Prevent Redundant Code Duplication
Repeated dynamic import/lookup logic; extract a shared utility to load a callable from "module:function" to prevent code duplication across modules.
Prompt for AI agents
Address the following comment on sdk/src/beta9/runner/checkpoint.py at line 48:
<comment>Repeated dynamic import/lookup logic; extract a shared utility to load a callable from "module:function" to prevent code duplication across modules.</comment>
<file context>
@@ -0,0 +1,71 @@
+ return
+
+ module, func = config.checkpoint_condition.split(":")
+ target_module = importlib.import_module(module)
+ method = getattr(target_module, func)
+
</file context>
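A possible shared utility for this (the name `load_callable` is an assumption) that both call sites could import:

```python
import importlib

def load_callable(spec):
    """Resolve a "module:function" string to the callable it names,
    as suggested by the review to avoid duplicating this logic."""
    module_path, _, func_name = spec.partition(":")
    if not module_path or not func_name:
        raise ValueError(f"expected 'module:function', got {spec!r}")
    module = importlib.import_module(module_path)
    return getattr(module, func_name)
```

Using `partition` instead of a bare `split(":")` also gives a clear error for malformed conditions rather than an unpacking exception.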
Summary by cubic
Adds CRIU-based checkpoint/restore for pods with an optional user-defined checkpoint condition. This lets pods start from a pre-warmed state to reduce cold starts.
New Features
Migration