Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/lightning/invocation/step.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ defmodule Lightning.Invocation.Step do
:finished_at
])
|> validate_required([:finished_at, :exit_reason])
|> assoc_constraint(:output_dataclip)
end

@doc false
Expand Down
83 changes: 40 additions & 43 deletions lib/lightning/runs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -198,36 +198,32 @@ defmodule Lightning.Runs do
@spec update_run(Ecto.Changeset.t(Run.t())) ::
{:ok, Run.t()} | {:error, Ecto.Changeset.t(Run.t())}
def update_run(%Ecto.Changeset{data: %Run{}} = changeset) do
  # Reject invalid changesets up front so we never open a transaction
  # (or take a row lock) for an update that cannot succeed.
  if changeset.valid? do
    run_id = Ecto.Changeset.get_field(changeset, :id)

    Repo.transaction(fn ->
      # Pessimistic lock — serializes with the mark_run_lost janitor so a
      # concurrent "lost" sweep cannot interleave with this state change.
      _locked =
        from(r in Run, where: r.id == ^run_id, lock: "FOR UPDATE")
        |> Repo.one!()

      with {:ok, run} <- Repo.update(changeset),
           {:ok, _wo} <- Lightning.WorkOrders.update_state(run) do
        run
      else
        # Repo.rollback/1 aborts the transaction and surfaces
        # {:error, reason} from Repo.transaction/1.
        {:error, reason} -> Repo.rollback(reason)
      end
    end)
    |> tap(fn
      # Broadcast only after the transaction has committed.
      {:ok, run} -> broadcast_run_updates(run)
      _ -> :noop
    end)
  else
    {:error, changeset}
  end
end

def update_runs(update_query, updates) do
updates =
case updates do
%Ecto.Changeset{changes: changes} -> [set: changes |> Enum.into([])]
updates when is_list(updates) -> updates
end

def update_runs(update_query, updates) when is_list(updates) do
Ecto.Multi.new()
|> Ecto.Multi.update_all(:runs, update_query, updates)
|> Ecto.Multi.run(:post, fn _repo, %{runs: {_, runs}} ->
Expand All @@ -240,27 +236,28 @@ defmodule Lightning.Runs do
|> Repo.transaction()
|> tap(fn result ->
with {:ok, %{runs: {_n, runs}}} <- result do
# TODO: remove the requirement for events to be hydrated with a specific
# set of preloads.
runs
|> Enum.map(fn run ->
Repo.preload(run, [
:snapshot,
:created_by,
:starting_trigger,
workflow: [:project]
])
end)
|> Enum.each(fn run ->
# Broadcast to run-specific topic (for run viewer)
Events.run_updated(run)
# Broadcast to project topic (for workflow channel/history)
Lightning.WorkOrders.Events.run_updated(run.workflow.project_id, run)
end)
Enum.each(runs, &broadcast_run_updates/1)
end
end)
end

# Hydrates the run with the preloads the event payloads expect, then emits
# the update on both the run-scoped and the project-scoped topics.
defp broadcast_run_updates(%Run{} = run) do
  # TODO: remove the requirement for events to be hydrated with a specific
  # set of preloads.
  preloads = [:snapshot, :created_by, :starting_trigger, workflow: [:project]]
  hydrated = Repo.preload(run, preloads)

  # Run-specific topic (run viewer).
  Events.run_updated(hydrated)
  # Project topic (workflow channel / history).
  Lightning.WorkOrders.Events.run_updated(hydrated.workflow.project_id, hydrated)
end

def append_run_log(run, params, scrubber \\ nil) do
LogLine.new(run, params, scrubber)
|> Ecto.Changeset.validate_change(:step_id, fn _field, step_id ->
Expand Down
12 changes: 7 additions & 5 deletions lib/lightning/runs/handlers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ defmodule Lightning.Runs.Handlers do
with {:ok, start_run} <- params |> new() |> apply_action(:validate) do
run
|> Run.start(to_run_params(start_run))
|> Runs.update_run()
|> case do
%{valid?: false} = changeset -> {:error, changeset}
changeset -> Runs.update_run(changeset)
end
|> tap(&track_run_queue_delay/1)
end
end
Expand Down Expand Up @@ -145,10 +148,9 @@ defmodule Lightning.Runs.Handlers do
|> Map.put(:finished_at, complete_run.timestamp)
end

# When the worker sends an existing dataclip ID, verify it exists first:
# the pre-check gives a clean error message back to the worker, while the
# FK constraint on Run.complete/2 remains enforced at the DB level as a
# safety net (via Repo.update in Runs.update_run/1).
defp resolve_final_dataclip(
%__MODULE__{final_dataclip_id: id} = complete_run,
_options
Expand Down
12 changes: 11 additions & 1 deletion lib/lightning/runs/run.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ defmodule Lightning.Run do
"""
use Lightning.Schema

require Logger

import Lightning.ChangesetUtils
import Lightning.Validators

Expand Down Expand Up @@ -212,9 +214,17 @@ defmodule Lightning.Run do
changeset |> validate_required([:finished_at])

{from, to} when from == to ->
Logger.warning(
"Run state machine: same-state transition #{inspect(from)} -> #{inspect(to)}"
)

changeset

{_from, _to} ->
{from, to} ->
Logger.warning(
"Run state machine: unexpected transition #{inspect(from)} -> #{inspect(to)}"
)

changeset
end
end
Expand Down
Loading