Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ and this project adheres to

### Fixed

- Fix version-stuck bug where the collaborative editor shows stale state after a
sandbox merge or CLI deploy.
[#4535](https://github.com/OpenFn/lightning/issues/4535)
- Copying api tokens doesn't work on unsecure non-localhost contexts
[PR#4551](https://github.com/OpenFn/lightning/pull/4551)
- Fix AI assistant authorization for support users on projects with support
Expand Down
19 changes: 18 additions & 1 deletion lib/lightning/collaboration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Lightning.Collaborate do

Collaborate.start(user: user, workflow: workflow)
"""
alias Lightning.Accounts.User
alias Lightning.Collaboration.DocumentSupervisor
alias Lightning.Collaboration.Registry
alias Lightning.Collaboration.Session
Expand All @@ -36,6 +37,20 @@ defmodule Lightning.Collaborate do

@spec start(opts :: Keyword.t()) :: GenServer.on_start()
def start(opts) do
case do_start(opts) do
{:error, {:error, :shared_doc_not_found}} ->
# A SharedDoc was registered in :pg but died before the Session could
# observe it (0ms auto_exit race). Yield one ms — enough for the timer
# to fire and clear :pg — then try once more from scratch.
Process.sleep(1)
do_start(opts)

result ->
result
end
end

defp do_start(opts) do
session_id = Ecto.UUID.generate()
parent_pid = Keyword.get(opts, :parent_pid, self())

Expand Down Expand Up @@ -64,13 +79,15 @@ defmodule Lightning.Collaborate do
end

# Start session for this user
user_id = if is_struct(user, User), do: user.id, else: nil

SessionSupervisor.start_child({
Session,
workflow: workflow,
user: user,
parent_pid: parent_pid,
document_name: document_name,
name: Registry.via({:session, "#{document_name}:#{session_id}", user.id})
name: Registry.via({:session, "#{document_name}:#{session_id}", user_id})
})
end

Expand Down
94 changes: 0 additions & 94 deletions lib/lightning/collaboration/persistence.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ defmodule Lightning.Collaboration.Persistence do
alias Lightning.Collaboration.DocumentState
alias Lightning.Collaboration.PersistenceWriter
alias Lightning.Collaboration.Session
alias Lightning.Collaboration.WorkflowSerializer

require Logger

Expand All @@ -30,7 +29,6 @@ defmodule Lightning.Collaboration.Persistence do
case DocumentState.get_checkpoint_and_updates(doc_name) do
{:ok, checkpoint, updates} ->
apply_persisted_state(doc, doc_name, checkpoint, updates)
reconcile_or_reset(doc, doc_name, workflow)

{:error, :not_found} ->
Logger.info(
Expand Down Expand Up @@ -89,96 +87,4 @@ defmodule Lightning.Collaboration.Persistence do
DocumentState.apply_to_doc(doc, checkpoint, updates)
Logger.debug("Loaded #{length(updates)} updates. document=#{doc_name}")
end

defp reconcile_or_reset(doc, doc_name, workflow) do
workflow_map = Yex.Doc.get_map(doc, "workflow")
persisted_lock_version = extract_lock_version(workflow_map)
current_lock_version = workflow.lock_version

if stale?(persisted_lock_version, current_lock_version) do
Logger.warning("""
Persisted Y.Doc is stale (persisted: #{inspect(persisted_lock_version)}, \
current: #{current_lock_version})
Discarding persisted state and reloading from database.
document=#{doc_name}
""")

clear_and_reset_workflow(doc, workflow)
else
Logger.debug(
"Persisted Y.Doc is current (lock_version: #{current_lock_version}). document=#{doc_name}"
)

reconcile_workflow_metadata(doc, workflow)
end
end

defp extract_lock_version(workflow_map) do
case Yex.Map.fetch(workflow_map, "lock_version") do
{:ok, version} when is_float(version) -> trunc(version)
{:ok, version} when is_integer(version) -> version
{:ok, nil} -> nil
:error -> nil
end
end

defp stale?(nil, current_version), do: not is_nil(current_version)

defp stale?(persisted_version, current_version),
do: persisted_version != current_version

defp clear_and_reset_workflow(doc, workflow) do
# Same pattern as Session.clear_and_reset_doc
# Get all Yex collections BEFORE transaction to avoid VM deadlock
jobs_array = Yex.Doc.get_array(doc, "jobs")
edges_array = Yex.Doc.get_array(doc, "edges")
triggers_array = Yex.Doc.get_array(doc, "triggers")

# Transaction 1: Clear all arrays
Yex.Doc.transaction(doc, "clear_stale_workflow", fn ->
clear_array(jobs_array)
clear_array(edges_array)
clear_array(triggers_array)
end)

# Transaction 2: Re-serialize workflow from database
Session.initialize_workflow_document(doc, workflow)

:ok
end

defp clear_array(array) do
length = Yex.Array.length(array)

if length > 0 do
Yex.Array.delete_range(array, 0, length)
end
end

defp reconcile_workflow_metadata(doc, workflow) do
# Update workflow metadata fields to match current database state
# This is critical when loading persisted Y.Doc state that may be stale
workflow_map = Yex.Doc.get_map(doc, "workflow")

Yex.Doc.transaction(doc, "reconcile_metadata", fn ->
# Update lock_version to current database value
Yex.Map.set(workflow_map, "lock_version", workflow.lock_version)

# Update name in case it changed
Yex.Map.set(workflow_map, "name", workflow.name)

# Update deleted_at if present
Yex.Map.set(
workflow_map,
"deleted_at",
WorkflowSerializer.datetime_to_string(workflow.deleted_at)
)
end)

Logger.debug(
"Reconciled workflow metadata: lock_version=#{workflow.lock_version}, name=#{workflow.name}"
)

:ok
end
end
53 changes: 33 additions & 20 deletions lib/lightning/collaboration/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ defmodule Lightning.Collaboration.Session do

alias Lightning.Accounts.User
alias Lightning.Collaboration.WorkflowSerializer
alias Lightning.VersionControl.ProjectRepoConnection
alias Lightning.Workflows.Presence
alias Lightning.Workflows.WorkflowUsageLimiter
alias Yex.Sync.SharedDoc
Expand All @@ -41,7 +42,7 @@ defmodule Lightning.Collaboration.Session do

@type start_opts :: [
workflow: Lightning.Workflows.Workflow.t(),
user: User.t(),
user: User.t() | ProjectRepoConnection.t(),
parent_pid: pid()
]

Expand Down Expand Up @@ -69,6 +70,10 @@ defmodule Lightning.Collaboration.Session do
GenServer.stop(session_pid)
end

def shared_doc_pid(session_pid) do
GenServer.call(session_pid, :shared_doc_pid)
end

def child_spec(opts) do
{opts, args} =
Keyword.put_new_lazy(opts, :session_id, fn -> Ecto.UUID.generate() end)
Expand Down Expand Up @@ -114,20 +119,26 @@ defmodule Lightning.Collaboration.Session do
{:stop, {:error, :shared_doc_not_found}}

shared_doc_pid ->
SharedDoc.observe(shared_doc_pid)
Logger.info("Joined SharedDoc for #{document_name}")

# We track the user presence here so the the original WorkflowLive.Edit
# can be stopped from editing the workflow when someone else is editing it.
# Note: Presence tracking uses workflow.id, not document_name, because
# presence is about showing who is editing the workflow, not which version
Presence.track_user_presence(
user,
workflow.id,
self()
)

{:ok, %{state | shared_doc_pid: shared_doc_pid}}
try do
SharedDoc.observe(shared_doc_pid)
Logger.info("Joined SharedDoc for #{document_name}")

# We track the user presence here so the the original WorkflowLive.Edit
# can be stopped from editing the workflow when someone else is editing it.
# Note: Presence tracking uses workflow.id, not document_name, because
# presence is about showing who is editing the workflow, not which version.
# Only track presence for real users — not system actors like ProjectRepoConnection.
if is_struct(user, User),
do: Presence.track_user_presence(user, workflow.id, self())

{:ok, %{state | shared_doc_pid: shared_doc_pid}}
catch
# GenServer.call raises an exit (not a rescuable exception) when the
# target process is dead. SharedDoc may have been registered in :pg
# but died before we could observe it (0ms auto_exit race).
# Return cleanly so Collaborate.start can retry.
:exit, _ -> {:stop, {:error, :shared_doc_not_found}}
end
end
end

Expand All @@ -144,11 +155,8 @@ defmodule Lightning.Collaboration.Session do
SharedDoc.unobserve(shared_doc_pid)
end

Presence.untrack_user_presence(
state.user,
state.workflow.id,
self()
)
if is_struct(state.user, User),
do: Presence.untrack_user_presence(state.user, state.workflow.id, self())

:ok
end
Expand Down Expand Up @@ -259,6 +267,11 @@ defmodule Lightning.Collaboration.Session do
GenServer.call(session_pid, {:reset_workflow, user}, 10_000)
end

@impl true
def handle_call(:shared_doc_pid, _from, state) do
{:reply, state.shared_doc_pid, state}
end

@impl true
def handle_call(:get_doc, _from, %{shared_doc_pid: shared_doc_pid} = state) do
{:reply, SharedDoc.get_doc(shared_doc_pid), state}
Expand Down
Loading
Loading