Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d4ce6a9
Test CSV out support
Alexsp32 May 24, 2024
0e3eb49
Mistake in results concat
Alexsp32 May 24, 2024
13725c1
Add CSV update function and fix errors
Alexsp32 May 24, 2024
b826562
Update create function
Alexsp32 May 24, 2024
02ef522
Debug issue with csv read
Alexsp32 May 28, 2024
3c21c99
Update CSV logic and transform nothings to missing
Alexsp32 May 28, 2024
e926bda
Increase speed of job id comparison
Alexsp32 May 28, 2024
9e973da
Add conversion for ungrouped jld2
Alexsp32 May 28, 2024
e9cf319
Add conversion for postprocessed NQCD output dicts
Alexsp32 May 29, 2024
d9cdb42
Small bugfixes for JLD2-based workflow
Alexsp32 Jun 3, 2024
8963ec4
Faster job ID search for JLD2 workflow
Alexsp32 Jun 3, 2024
13dfd5c
✨ Allow import with arbitrary filenames
Alexsp32 Apr 11, 2025
bd04a67
🚑 Indexing mistake, only add trajectory if new
Alexsp32 Apr 11, 2025
9fbc0b2
🚑 Only add job if new
Alexsp32 Apr 11, 2025
b0e4c5d
🚑 Mistake when parsing files
Alexsp32 Apr 11, 2025
f77ef4c
🚑 Removed incorrect job id parsing
Alexsp32 Apr 11, 2025
d190698
🚧 Add debug error log
Alexsp32 Apr 14, 2025
412858f
🐛 Second attempt at fixing concat logic
Alexsp32 Apr 15, 2025
2565242
✨ Add UnicodePlot for results completeness
Alexsp32 Apr 15, 2025
ff8c018
⬆️ Removed unnecessarily tight dependency
Alexsp32 Apr 15, 2025
26abaa9
🐛 Mistake when setting up job id distributions
Alexsp32 Apr 15, 2025
9a4817e
🐛 Wrong indexing again
Alexsp32 Apr 15, 2025
2596c78
🐛 Mistake in tracking which subjobs were added.
Alexsp32 Apr 15, 2025
6ba7108
✨ Add job id source to read from files or dict
Alexsp32 Jun 3, 2025
39f9024
🐛 Wrong type definition
Alexsp32 Jun 3, 2025
506e1dd
🐛 Forgot to parse jobid as Int
Alexsp32 Jun 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
name = "ClusterScripts"
uuid = "0afbe08e-4e50-452c-a3af-5975c91048e8"
authors = ["Alexander Spears <alexander.spears@warwick.ac.uk> and contributors"]
version = "1.0.0"
version = "1.0.1"

[deps]
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
DiffEqBase = "2b5f629d-d688-5b77-993f-72d75c75574e"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
FilePathsBase = "48062228-2e41-5def-b9a4-89aafe57970f"
Glob = "c27321d9-0574-5035-807b-f59d2c89b15c"
JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819"
ProgressBars = "49802e3a-d2f1-5c88-81d8-b72133a6f568"
RobustPmap = "27aeedcb-f738-516b-a0b8-3211cf1146e5"
UnicodePlots = "b8865327-cd53-5732-bb35-84acbb429228"

[compat]
DiffEqBase = "6"
RobustPmap = "1"
julia = "1.6.2"
UnicodePlots = "3"

[extras]
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
Expand Down
4 changes: 4 additions & 0 deletions src/ClusterScripts.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ using JLD2
using RobustPmap
using Glob
using ProgressBars
import UnicodePlots

"""
Struct to hold file paths and provide some basic functionality for working with them.
Expand Down Expand Up @@ -50,4 +51,7 @@ export pmap_queue, merge_pmap_results
include("file_based.jl")
export build_job_queue, create_results_file, update_results_file, update_results_file!, serialise_queue!, save!

include("csv_out.jl")
export create_csv_file, update_csv_file!

end
117 changes: 83 additions & 34 deletions src/concat_output.jl
Original file line number Diff line number Diff line change
@@ -1,50 +1,96 @@



function concatenate_results!(results_container::AbstractArray, glob_pattern::String, queue_file::String; trajectories_key="trajectories")
function concatenate_results!(
results_container::AbstractArray,
glob_pattern::String,
queue_file::String;
trajectories_key = "trajectories",
job_id_source = :filename,
)
# Read in all files for a simulation queue.
glob_pattern = SimulationFile(glob_pattern)
all_files = map(SimulationFile, glob(glob_pattern.with_extension, glob_pattern.stem))
progress = ProgressBar(total=length(all_files), printing_delay=1.0)
progress = ProgressBar(total = length(all_files), printing_delay = 1.0)
set_description(progress, "Processing files: ")
# Import simulation parameters
simulation_parameters = jldopen(queue_file)
# Go through each element in the input tensor and collect all jobs we have for it.
for index in eachindex(simulation_parameters["parameters"])
# Read job ids from results if possible to avoid reading duplicates.
job_ids = !isassigned(results_container, index) ? simulation_parameters["parameters"][index]["job_ids"] : container[index][2]["job_ids"]
to_read = findall(x -> split(x.name, "_")[end] in string.(job_ids), all_files)
for file_index in to_read
try
file_results = jldopen(all_files[file_index].path)["results"]
@debug "File read successfully"
# Move data to the output tensor
if !isassigned(results_container, index)
results_container[index] = file_results
else
results_container[index] = push_nqcd_outputs!(results_container[index], [file_results]; trajectories_key=trajectories_key)
all_job_ids = [
get(simulation_parameters["parameters"][idx], "job_ids", Int[]) for
idx in eachindex(simulation_parameters["parameters"])
]
for file_index in eachindex(all_files)
try
file_results = jldopen(all_files[file_index].path)["results"]
# Move data to the output tensor
if job_id_source == :filename
jobid = parse(
Int,
match(
r"-\d*-(\d*).jld2",
all_files[file_index].with_extension,
).captures |> first,
)
elseif job_id_source == :in_dict
jobid = file_results[2]["jobid"]
end
parameter_index = findfirst(jobid .∈ all_job_ids)
if !isnothing(parameter_index)
if !isassigned(results_container, parameter_index)
results_container[parameter_index] = file_results
symdiff!(results_container[parameter_index][2]["job_ids"], jobid)
elseif jobid ∈ results_container[parameter_index][2]["job_ids"]
results_container[parameter_index] = push_nqcd_outputs!(
results_container[parameter_index],
[file_results];
trajectories_key = trajectories_key,
)
symdiff!(results_container[parameter_index][2]["job_ids"], jobid)
end
# Remove job id from parameters once that result has been added
jobid = parse(Int, split(all_files[file_index].name, "_")[end])
deleteat!(results_container[index][2]["job_ids"], findall(results_container[index][2]["job_ids"] .== jobid)...)
catch
@warn "File $(all_files[file_index].name) could not be read. It may be incomplete or corrupted."
continue
else
@error "Couldn't find a job ID for this file - Was the correct parameters file selected?"
end
update(progress)
# Remove job id from parameters once that result has been added
catch e
@warn "File $(all_files[file_index].name) could not be read. It may be incomplete or corrupted."
@debug "Concatenation logic failed due to the following error" error = e
continue
end
# Trajectory completeness check
if !isassigned(results_container, index) || results_container[index][2]["total_trajectories"] != results_container[index][2]["trajectories"]
@info "Simulation results are incomplete or oversubscribed in results[$(index)]. Make sure you have run all sub-jobs. "
update(progress)
end
# Trajectory completeness check
parameter_sets = []
completeness = Float64[]
for idx in eachindex(results_container)
push!(parameter_sets, idx)
if isassigned(results_container, idx)
push!(
completeness,
length(results_container[idx][2]["job_ids"]) /
length(simulation_parameters["parameters"][idx]["job_ids"]),
)
else
push!(completeness, 0.0)
end
end
UnicodePlots.barplot(
parameter_sets,
completeness;
title = "Completeness of results file",
)
end

function concatenate_results!(results_container::ResultsLazyLoader, glob_pattern::String, queue_file::String; trajectories_key="trajectories")
function concatenate_results!(
results_container::ResultsLazyLoader,
glob_pattern::String,
queue_file::String;
trajectories_key = "trajectories",
)
# Read in all files for a simulation queue.
glob_pattern = SimulationFile(glob_pattern)
all_files = map(SimulationFile, glob(glob_pattern.with_extension, glob_pattern.stem))
progress = ProgressBar(total=length(all_files), printing_delay=1.0)
progress = ProgressBar(total = length(all_files), printing_delay = 1.0)
set_description(progress, "Processing files: ")
# Import simulation parameters
simulation_parameters = jldopen(queue_file)
Expand All @@ -55,14 +101,15 @@ function concatenate_results!(results_container::ResultsLazyLoader, glob_pattern
data_to_append = []
trajectories_read = 0
ids_read = Int[]
to_read = findall(x -> split(x.name, "_")[end] in string.(job_ids), all_files)
to_read = findall(x -> parse(Int, split(x.name, "_")[end]) in job_ids, all_files)
sizehint!(data_to_append, length(to_read))
sizehint!(ids_read, length(to_read))
for file_index in to_read
try
file_results = jldopen(all_files[file_index].path)["results"]
# Put data into vector if not already
file_data = isa(file_results[1], Vector) ? file_results[1] : [file_results[1]]
file_data =
isa(file_results[1], Vector) ? file_results[1] : [file_results[1]]
# Move to cache
append!(data_to_append, file_data)
# Update trajectory count
Expand All @@ -81,13 +128,16 @@ function concatenate_results!(results_container::ResultsLazyLoader, glob_pattern
if !haskey(results_container.file["results"], "$(index)")
results_container[index] = data_to_append
else
results_container[index] = append!(deepcopy(results_container[index]), data_to_append)
results_container[index] =
append!(deepcopy(results_container[index]), data_to_append)
end
results_container.parameters[index][trajectories_key] += trajectories_read
setdiff!(results_container.parameters[index]["job_ids"], ids_read)
end
# Trajectory completeness check
if !haskey(results_container.file["results"], "$(index)") || results_container.parameters[index]["total_$(trajectories_key)"] != results_container.parameters[index][trajectories_key]
if !haskey(results_container.file["results"], "$(index)") ||
results_container.parameters[index]["total_$(trajectories_key)"] !=
results_container.parameters[index][trajectories_key]
@info "Simulation results are incomplete or oversubscribed in results[$(index)]. Make sure you have run all sub-jobs. "
end
end
Expand All @@ -98,10 +148,9 @@ end
"""
push_nqcd_outputs!!(first_output, other_outputs...)

Like a push!() function, but it also puts `first_output` into a vector if it wasn't already and adds the number of trajectories together.
TBW
Like a push!() function, but it also puts `first_output` into a vector if it wasn't already and adds the number of trajectories together.
"""
function push_nqcd_outputs!(first_output, other_outputs; trajectories_key="trajectories")
function push_nqcd_outputs!(first_output, other_outputs; trajectories_key = "trajectories")
for i in other_outputs
for (k, v) in i[2]
if k == trajectories_key
Expand Down
100 changes: 100 additions & 0 deletions src/csv_out.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using CSV, DataFrames

"""
    create_csv_file(output_filename::String, glob_pattern::String, queue_file::String)

Initialise a results CSV at `output_filename` and populate it from the JLD2
result files matching `glob_pattern`, using the simulation queue stored in
`queue_file`.

Seeds an empty table with `job_id` and `parameters_set` columns, then delegates
the file scan and the actual write to [`update_csv_file!`](@ref).
"""
function create_csv_file(output_filename::String, glob_pattern::String, queue_file::String)
    # Empty seed frame: update_csv_file! appends one row per newly-read trajectory.
    seed_frame = DataFrame(job_id = Int[], parameters_set = Int[])
    return update_csv_file!(output_filename, seed_frame, glob_pattern, queue_file)
end

"""
    update_csv_file!(output_filename::String, input_file::DataFrame, glob_pattern::String, queue_file::String)

Scan the JLD2 result files matching `glob_pattern`, append any trajectories
whose job id is not already present in `input_file.job_id`, and write the
combined table to `output_filename` as CSV.

# Arguments
- `output_filename`: path the CSV is (re)written to.
- `input_file`: existing results table; must have a `job_id` column.
- `glob_pattern`: glob matching the per-job JLD2 output files.
- `queue_file`: JLD2 file containing the `"parameters"` queue.

Only scalar outputs (non-`AbstractArray` values) become columns; `nothing`
values are converted to `missing` so they serialise cleanly. Files that cannot
be read are skipped with a warning. Returns the result of `CSV.write`.
"""
function update_csv_file!(output_filename::String, input_file::DataFrame, glob_pattern::String, queue_file::String)
    # Collect all candidate result files for this queue.
    glob_pattern = SimulationFile(glob_pattern)
    all_files = map(SimulationFile, glob(glob_pattern.with_extension, glob_pattern.stem))
    progress = ProgressBar(total=length(all_files), printing_delay=1.0)
    set_description(progress, "Processing files: ")
    simulation_parameters = jldopen(queue_file)
    try
        for index in eachindex(vec(simulation_parameters["parameters"]))
            # Job ids expected for this parameter set; files are matched via the
            # trailing "_<jobid>" component of their name.
            job_ids = convert(Vector{Int64}, simulation_parameters["parameters"][index]["job_ids"])
            to_read = findall(x -> parse(Int, split(x.name, "_")[end]) in job_ids, all_files)
            for file_index in to_read
                try
                    # NOTE(review): this handle is never closed explicitly; JLD2
                    # leaves it to finalisers. Consider a do-block if many files
                    # are scanned per call.
                    file_results = jldopen(all_files[file_index].path)["results"]
                    @debug "File read successfully"
                    # Columns to write out. BUGFIX: must be :parameters_set (not
                    # :parameter_set) to line up with the column seeded by
                    # create_csv_file — the mismatch made vcat(...; cols=:union)
                    # silently add a second, near-duplicate column.
                    output_columns = [:job_id, :parameters_set]
                    for (k, v) in pairs(file_results[1][1])
                        # Only make entries for non-vector outputs. (Number, Bool, String are OK)
                        isa(v, AbstractArray) || push!(output_columns, k)
                    end
                    # Collect output values, skipping job ids already in the table.
                    output_values = Any[]
                    sizehint!(output_values, length(output_columns))
                    all_jobids = isa(file_results[2]["jobid"], Vector) ? file_results[2]["jobid"] : [file_results[2]["jobid"]]
                    new_jobids = findall(x -> !(x in input_file.job_id), all_jobids)
                    push!(output_values, all_jobids[new_jobids])
                    push!(output_values, fill(index, length(new_jobids)))
                    for column in output_columns[3:end] # excluding job_id and parameters_set
                        col_values = getindex.(file_results[1][new_jobids], column)
                        # CSV cannot represent `nothing`; convert to `missing`.
                        push!(output_values, replace(col_values, nothing => missing))
                    end
                    # Append the new rows; :union keeps columns other files may lack.
                    input_file = vcat(input_file, DataFrame([i => j for (i, j) in zip(output_columns, output_values)]), cols=:union)
                catch e
                    @warn "Error reading file: $e"
                end
                update(progress)
            end
        end
    finally
        # Release the queue-file handle even if iteration throws.
        close(simulation_parameters)
    end
    return CSV.write(output_filename, input_file)
end

"""
    results_array_to_dataframe(output_dicts::Vector, params_dict::Dict; params_index=missing)

Convert a vector of per-trajectory NQCD output dicts into a `DataFrame` with
one row per trajectory.

Only scalar (non-`AbstractArray`) outputs become columns; `nothing` values are
converted to `missing`. Every row's `parameters_set` is set to `params_index`.
`job_id` is filled with `missing` because the output dicts carry no job id.
`params_dict` is currently unused but kept for interface stability.
"""
function results_array_to_dataframe(output_dicts::Vector, params_dict::Dict; params_index=missing)
    output_dataframe = DataFrame(job_id=Int[], parameters_set=Int[])
    output_columns = [:job_id, :parameters_set]
    for (k, v) in pairs(output_dicts[1])
        # Only make entries for non-vector outputs. (Number, Bool, String are OK)
        isa(v, AbstractArray) || push!(output_columns, k)
    end
    # Collect output values, one vector per column, in column order.
    output_values = Any[]
    sizehint!(output_values, length(output_columns))
    # BUGFIX: previously no value vector was pushed for :job_id, so zip() shifted
    # every subsequent column one place left (job_id received the parameter-set
    # values and the last data column was dropped). The dicts carry no job id,
    # so fill with `missing` — TODO confirm with callers.
    push!(output_values, fill(missing, length(output_dicts)))
    push!(output_values, fill(params_index, length(output_dicts)))
    for column in output_columns[3:end] # excluding job_id and parameters_set
        col_values = getindex.(output_dicts, column)
        # CSV cannot represent `nothing`; convert to `missing`.
        push!(output_values, replace(col_values, nothing => missing))
    end
    # :union promotes column eltypes and fills columns absent on either side.
    return vcat(output_dataframe, DataFrame([i => j for (i, j) in zip(output_columns, output_values)]), cols=:union)
end

"""
    jld2_ungrouped_to_csv(csv_output::String, jld2_input::String)

Flatten the `"results"` entry of an ungrouped JLD2 results file into a CSV with
one row per trajectory.

Only scalar (non-`AbstractArray`) outputs become columns; `nothing` values are
converted to `missing`. `parameters_set` records the index of the parameter set
each row came from; `job_id` is filled with `missing` since this layout stores
no per-trajectory job id. Indices that fail to convert are skipped with a
warning. Returns the result of `CSV.write`.
"""
function jld2_ungrouped_to_csv(csv_output::String, jld2_input::String)
    # NOTE(review): this handle is never closed explicitly; JLD2 leaves it to
    # finalisers. Kept as-is in case "results" is read lazily.
    jld2_results = jldopen(jld2_input)["results"]
    output_dataframe = DataFrame(job_id=Int[], parameters_set=Int[])
    for index in eachindex(jld2_results)
        try
            output_columns = [:job_id, :parameters_set]
            for (k, v) in pairs(jld2_results[index][1][1])
                # Only make entries for non-vector outputs. (Number, Bool, String are OK)
                isa(v, AbstractArray) || push!(output_columns, k)
            end
            n_rows = length(jld2_results[index][1])
            # Collect output values, one vector per column, in column order.
            output_values = Any[]
            sizehint!(output_values, length(output_columns))
            # BUGFIX: previously no value vector was pushed for :job_id, so zip()
            # shifted every subsequent column one place left. This layout stores
            # no per-trajectory job id — fill with `missing`.
            push!(output_values, fill(missing, n_rows))
            push!(output_values, fill(index, n_rows))
            for column in output_columns[3:end] # excluding job_id and parameters_set
                col_values = getindex.(jld2_results[index][1], column)
                # CSV cannot represent `nothing`; convert to `missing`.
                push!(output_values, replace(col_values, nothing => missing))
            end
            # :union promotes column eltypes and fills columns absent on either side.
            output_dataframe = vcat(output_dataframe, DataFrame([i => j for (i, j) in zip(output_columns, output_values)]), cols=:union)
        catch e
            @warn "Error reading index $(index): $(e)"
        end
    end
    return CSV.write(csv_output, output_dataframe)
end
Loading
Loading