From d4ce6a98c9c411005c7fc997e7365e78905b220b Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Fri, 24 May 2024 13:53:04 +0100 Subject: [PATCH 01/26] Test CSV out support --- Project.toml | 2 ++ src/ClusterScripts.jl | 2 ++ src/csv_out.jl | 44 +++++++++++++++++++++++++++++++++++++++++++ src/file_based.jl | 3 +++ 4 files changed, 51 insertions(+) create mode 100644 src/csv_out.jl diff --git a/Project.toml b/Project.toml index 44354cf..90e7049 100644 --- a/Project.toml +++ b/Project.toml @@ -4,6 +4,8 @@ authors = ["Alexander Spears and contributors"] version = "1.0.0" [deps] +CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" +DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" DiffEqBase = "2b5f629d-d688-5b77-993f-72d75c75574e" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" FilePathsBase = "48062228-2e41-5def-b9a4-89aafe57970f" diff --git a/src/ClusterScripts.jl b/src/ClusterScripts.jl index 10c3053..e5efb4d 100644 --- a/src/ClusterScripts.jl +++ b/src/ClusterScripts.jl @@ -50,4 +50,6 @@ export pmap_queue, merge_pmap_results include("file_based.jl") export build_job_queue, create_results_file, update_results_file, update_results_file!, serialise_queue!, save! +include("csv_out.jl") + end diff --git a/src/csv_out.jl b/src/csv_out.jl new file mode 100644 index 0000000..f706d12 --- /dev/null +++ b/src/csv_out.jl @@ -0,0 +1,44 @@ +using CSV, DataFrames + +function create_results_file(output_filename::String, glob_pattern::String, queue_file::String) + simulation_parameters = jldopen(queue_file) + # Create an empty output CSV + output_dataframe = DataFrame(job_id = Int[], parameters_set=Int[]) + # Concatenate results + glob_pattern = SimulationFile(glob_pattern) + all_files = map(SimulationFile, glob(glob_pattern.with_extension, glob_pattern.stem)) + progress = ProgressBar(total=length(all_files), printing_delay=1.0) + set_description(progress, "Processing files: ") + for index in eachindex(vec(simulation_parameters["parameters"])) + # Read job ids from results if possible to avoid reading duplicates. + job_ids = simulation_parameters["parameters"][index]["job_ids"] + to_read = findall(x -> split(x.name, "_")[end] in string.(job_ids), all_files) + for file_index in to_read + try + file_results = jldopen(all_files[file_index].path)["results"] + @debug "File read successfully" + # Columns to write out + output_columns = Symbol[:job_id, :parameters_set] + for (k,v) in file_results[1] + # Only make entries for non-vector outputs. (Number, Bool, String are OK) + !isa(v, AbstractArray) : push!(output_columns, k) : nothing + end + # Collect output values + output_values = Any[] + all_jobids = isa(file_results[2]["job_id"], Vector) ? file_results[2]["job_id"] : [file_results[2]["job_id"] + new_jobids = findall(x ->!(x in output_dataframe.job_id), all_jobids) + push!(output_values, jobid[new_jobids]) + parameter_set = fill(index, length(new_jobids)) + push!(output_values, parameter_set) + sizehint!(output_values, length(output_columns)) + for column in output_columns[2:end] #excluding job_id and parameters_set + push!(output_values, getindex.(file_results[1][new_jobids], column)) + end + # Add to Dataframe + vcat(output_dataframe, DataFrame([i => j for (i,j) in zip(output_columns, output_values)])) + end + end + update(progress) + end + CSV.write(output_filename, output_dataframe) +end diff --git a/src/file_based.jl b/src/file_based.jl index e7f9a2e..cfead36 100644 --- a/src/file_based.jl +++ b/src/file_based.jl @@ -4,7 +4,10 @@ """ save!(loader::ResultsLazyLoader) +**Warning: Don't run this on an unstable system or network connection to prevent data loss!** + Updates the stored parameters and derived quantities inside a grouped JLD2 file. + Run this function after modifying `loader.parameters` or `loader.derived_quantities` to save the changes. """ function save!(loader::ResultsLazyLoader) From 0e3eb49eb4fab67094a5214621c1149bc68b9473 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Fri, 24 May 2024 15:42:27 +0100 Subject: [PATCH 02/26] Mistake in results concat --- src/csv_out.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/csv_out.jl b/src/csv_out.jl index f706d12..65d759e 100644 --- a/src/csv_out.jl +++ b/src/csv_out.jl @@ -18,7 +18,7 @@ function create_results_file(output_filename::String, glob_pattern::String, queu file_results = jldopen(all_files[file_index].path)["results"] @debug "File read successfully" # Columns to write out - output_columns = Symbol[:job_id, :parameters_set] + output_columns = [:job_id, :parameters_set] for (k,v) in file_results[1] # Only make entries for non-vector outputs. (Number, Bool, String are OK) !isa(v, AbstractArray) : push!(output_columns, k) : nothing From 13725c168fb26f13591a51f2ec04d47ee5c45c09 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Fri, 24 May 2024 16:11:23 +0100 Subject: [PATCH 03/26] Add CSV update function and fix errors --- src/ClusterScripts.jl | 1 + src/csv_out.jl | 25 ++++++++++++++++--------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/ClusterScripts.jl b/src/ClusterScripts.jl index e5efb4d..d8ea7a7 100644 --- a/src/ClusterScripts.jl +++ b/src/ClusterScripts.jl @@ -51,5 +51,6 @@ include("file_based.jl") export build_job_queue, create_results_file, update_results_file, update_results_file!, serialise_queue!, save! include("csv_out.jl") +export create_csv_file, update_csv_file! end diff --git a/src/csv_out.jl b/src/csv_out.jl index 65d759e..facb746 100644 --- a/src/csv_out.jl +++ b/src/csv_out.jl @@ -1,9 +1,14 @@ using CSV, DataFrames -function create_results_file(output_filename::String, glob_pattern::String, queue_file::String) +function create_csv_file(output_filename::String, glob_pattern::String, queue_file::String) + # Create an empty output CSV + output_dataframe = DataFrame(job_id=Int[], parameters_set=Int[]) + return update_csv_file(output_filename, output_dataframe, glob_pattern, queue_file) +end + +function update_csv_file!(output_filename::String, input_file::DataFrame, glob_pattern::String, queue_file::String) simulation_parameters = jldopen(queue_file) # Create an empty output CSV - output_dataframe = DataFrame(job_id = Int[], parameters_set=Int[]) # Concatenate results glob_pattern = SimulationFile(glob_pattern) all_files = map(SimulationFile, glob(glob_pattern.with_extension, glob_pattern.stem)) @@ -19,14 +24,14 @@ function create_results_file(output_filename::String, glob_pattern::String, queu @debug "File read successfully" # Columns to write out output_columns = [:job_id, :parameters_set] - for (k,v) in file_results[1] + for (k, v) in file_results[1] # Only make entries for non-vector outputs. (Number, Bool, String are OK) - !isa(v, AbstractArray) : push!(output_columns, k) : nothing + !isa(v, AbstractArray):push!(output_columns, k):nothing end # Collect output values output_values = Any[] - all_jobids = isa(file_results[2]["job_id"], Vector) ? file_results[2]["job_id"] : [file_results[2]["job_id"] - new_jobids = findall(x ->!(x in output_dataframe.job_id), all_jobids) + all_jobids = isa(file_results[2]["job_id"], Vector) ? file_results[2]["job_id"] : [file_results[2]["job_id"]] + new_jobids = findall(x -> !(x in input_file.job_id), all_jobids) push!(output_values, jobid[new_jobids]) parameter_set = fill(index, length(new_jobids)) push!(output_values, parameter_set) @@ -35,10 +40,12 @@ function create_results_file(output_filename::String, glob_pattern::String, queu push!(output_values, getindex.(file_results[1][new_jobids], column)) end # Add to Dataframe - vcat(output_dataframe, DataFrame([i => j for (i,j) in zip(output_columns, output_values)])) - end + input_file = vcat(input_file, DataFrame([i => j for (i, j) in zip(output_columns, output_values)]), cols=:union) + catch e + @warn "Error reading file: $e" end update(progress) end - CSV.write(output_filename, output_dataframe) + end + CSV.write(output_filename, input_file) end From b826562d03b95d5cccb3bc3af49e46b834d8a206 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Fri, 24 May 2024 16:44:53 +0100 Subject: [PATCH 04/26] Update create function --- src/csv_out.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/csv_out.jl b/src/csv_out.jl index facb746..78df072 100644 --- a/src/csv_out.jl +++ b/src/csv_out.jl @@ -3,7 +3,7 @@ using CSV, DataFrames function create_csv_file(output_filename::String, glob_pattern::String, queue_file::String) # Create an empty output CSV output_dataframe = DataFrame(job_id=Int[], parameters_set=Int[]) - return update_csv_file(output_filename, output_dataframe, glob_pattern, queue_file) + return update_csv_file!(output_filename, output_dataframe, glob_pattern, queue_file) end function update_csv_file!(output_filename::String, input_file::DataFrame, glob_pattern::String, queue_file::String) From 02ef522f9d70a1ea08a7e088cbd7701772052402 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Tue, 28 May 2024 09:35:50 +0100 Subject: [PATCH 05/26] Debug issue with csv read --- src/csv_out.jl | 48 ++++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/src/csv_out.jl b/src/csv_out.jl index 78df072..2de860c 100644 --- a/src/csv_out.jl +++ b/src/csv_out.jl @@ -19,31 +19,31 @@ function update_csv_file!(output_filename::String, input_file::DataFrame, glob_p job_ids = simulation_parameters["parameters"][index]["job_ids"] to_read = findall(x -> split(x.name, "_")[end] in string.(job_ids), all_files) for file_index in to_read - try - file_results = jldopen(all_files[file_index].path)["results"] - @debug "File read successfully" - # Columns to write out - output_columns = [:job_id, :parameters_set] - for (k, v) in file_results[1] - # Only make entries for non-vector outputs. (Number, Bool, String are OK) - !isa(v, AbstractArray):push!(output_columns, k):nothing - end - # Collect output values - output_values = Any[] - all_jobids = isa(file_results[2]["job_id"], Vector) ? file_results[2]["job_id"] : [file_results[2]["job_id"]] - new_jobids = findall(x -> !(x in input_file.job_id), all_jobids) - push!(output_values, jobid[new_jobids]) - parameter_set = fill(index, length(new_jobids)) - push!(output_values, parameter_set) - sizehint!(output_values, length(output_columns)) - for column in output_columns[2:end] #excluding job_id and parameters_set - push!(output_values, getindex.(file_results[1][new_jobids], column)) - end - # Add to Dataframe - input_file = vcat(input_file, DataFrame([i => j for (i, j) in zip(output_columns, output_values)]), cols=:union) - catch e - @warn "Error reading file: $e" + #try + file_results = jldopen(all_files[file_index].path)["results"] + @debug "File read successfully" + # Columns to write out + output_columns = [:job_id, :parameters_set] + for (k, v) in file_results[1] + # Only make entries for non-vector outputs. (Number, Bool, String are OK) + !isa(v, AbstractArray):push!(output_columns, k):nothing end + # Collect output values + output_values = Any[] + all_jobids = isa(file_results[2]["job_id"], Vector) ? file_results[2]["job_id"] : [file_results[2]["job_id"]] + new_jobids = findall(x -> !(x in input_file.job_id), all_jobids) + push!(output_values, jobid[new_jobids]) + parameter_set = fill(index, length(new_jobids)) + push!(output_values, parameter_set) + sizehint!(output_values, length(output_columns)) + for column in output_columns[2:end] #excluding job_id and parameters_set + push!(output_values, getindex.(file_results[1][new_jobids], column)) + end + # Add to Dataframe + input_file = vcat(input_file, DataFrame([i => j for (i, j) in zip(output_columns, output_values)]), cols=:union) + #catch e + @warn "Error reading file: $e" + #end update(progress) end end From 3c21c9990fdfb803dbb2a77aee1aeeb85331631a Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Tue, 28 May 2024 10:38:54 +0100 Subject: [PATCH 06/26] Update CSV logic and transform nothings to missing --- src/csv_out.jl | 49 +++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/src/csv_out.jl b/src/csv_out.jl index 2de860c..e90bce7 100644 --- a/src/csv_out.jl +++ b/src/csv_out.jl @@ -19,31 +19,32 @@ function update_csv_file!(output_filename::String, input_file::DataFrame, glob_p job_ids = simulation_parameters["parameters"][index]["job_ids"] to_read = findall(x -> split(x.name, "_")[end] in string.(job_ids), all_files) for file_index in to_read - #try - file_results = jldopen(all_files[file_index].path)["results"] - @debug "File read successfully" - # Columns to write out - output_columns = [:job_id, :parameters_set] - for (k, v) in file_results[1] - # Only make entries for non-vector outputs. (Number, Bool, String are OK) - !isa(v, AbstractArray):push!(output_columns, k):nothing + try + file_results = jldopen(all_files[file_index].path)["results"] + @debug "File read successfully" + # Columns to write out + output_columns = [:job_id, :parameters_set] + for (k, v) in pairs(file_results[1][1]) + # Only make entries for non-vector outputs. (Number, Bool, String are OK) + !isa(v, AbstractArray) ? push!(output_columns, k) : nothing + end + # Collect output values + output_values = Any[] + all_jobids = isa(file_results[2]["jobid"], Vector) ? file_results[2]["jobid"] : [file_results[2]["jobid"]] + new_jobids = findall(x -> !(x in input_file.job_id), all_jobids) + push!(output_values, all_jobids[new_jobids]) + parameter_set = fill(index, length(new_jobids)) + push!(output_values, parameter_set) + sizehint!(output_values, length(output_columns)) + for column in output_columns[3:end] #excluding job_id and parameters_set + col_values = getindex.(file_results[1][new_jobids], column) + push!(output_values, replace(col_values, nothing => missing)) + end + # Add to Dataframe + input_file = vcat(input_file, DataFrame([i => j for (i, j) in zip(output_columns, output_values)]), cols=:union) + catch e + @warn "Error reading file: $e" end - # Collect output values - output_values = Any[] - all_jobids = isa(file_results[2]["job_id"], Vector) ? file_results[2]["job_id"] : [file_results[2]["job_id"]] - new_jobids = findall(x -> !(x in input_file.job_id), all_jobids) - push!(output_values, jobid[new_jobids]) - parameter_set = fill(index, length(new_jobids)) - push!(output_values, parameter_set) - sizehint!(output_values, length(output_columns)) - for column in output_columns[2:end] #excluding job_id and parameters_set - push!(output_values, getindex.(file_results[1][new_jobids], column)) - end - # Add to Dataframe - input_file = vcat(input_file, DataFrame([i => j for (i, j) in zip(output_columns, output_values)]), cols=:union) - #catch e - @warn "Error reading file: $e" - #end update(progress) end end From e926bda5cbf658db9097c0214ac3e713744a4b40 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Tue, 28 May 2024 11:18:32 +0100 Subject: [PATCH 07/26] Increase speed of job id comparison --- src/csv_out.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/csv_out.jl b/src/csv_out.jl index e90bce7..1943246 100644 --- a/src/csv_out.jl +++ b/src/csv_out.jl @@ -16,8 +16,8 @@ function update_csv_file!(output_filename::String, input_file::DataFrame, glob_p set_description(progress, "Processing files: ") for index in eachindex(vec(simulation_parameters["parameters"])) # Read job ids from results if possible to avoid reading duplicates. - job_ids = simulation_parameters["parameters"][index]["job_ids"] - to_read = findall(x -> split(x.name, "_")[end] in string.(job_ids), all_files) + job_ids = convert(Vector{Int64}, simulation_parameters["parameters"][index]["job_ids"]) + to_read = findall(x -> parse(Int, split(x.name, "_")[end]) in job_ids, all_files) for file_index in to_read try file_results = jldopen(all_files[file_index].path)["results"] From 9e973da7b8ef81c7fc13068be66c085f49e8b622 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Tue, 28 May 2024 13:47:32 +0100 Subject: [PATCH 08/26] Add conversion for ungrouped jld2 --- src/csv_out.jl | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/csv_out.jl b/src/csv_out.jl index 1943246..715e90f 100644 --- a/src/csv_out.jl +++ b/src/csv_out.jl @@ -50,3 +50,31 @@ function update_csv_file!(output_filename::String, input_file::DataFrame, glob_p end CSV.write(output_filename, input_file) end + +function jld2_ungrouped_to_csv(csv_output::String, jld2_input::String) + jld2_results = jldopen(jld2_input)["results"] + output_dataframe = DataFrame(job_id=Int[], parameters_set=Int[]) + for index in eachindex(jld2_results) + try + output_columns = [:job_id, :parameters_set] + for (k, v) in pairs(jld2_results[index][1][1]) + # Only make entries for non-vector outputs. (Number, Bool, String are OK) + !isa(v, AbstractArray) ? push!(output_columns, k) : nothing + end + # Collect output values + output_values = Any[] + sizehint!(output_values, length(output_columns)) + parameter_set = fill(index, length(jld2_results[index][1])) + push!(output_values, parameter_set) + for column in output_columns[3:end] #excluding job_id and parameters_set + col_values = getindex.(jld2_results[index][1], column) + push!(output_values, replace(col_values, nothing => missing)) + end + # Add to Dataframe + output_dataframe = vcat(output_dataframe, DataFrame([i => j for (i, j) in zip(output_columns, output_values)]), cols=:union) + catch e + @warn "Error reading index $(index): $(e)" + end + end + CSV.write(csv_output, output_dataframe) +end From e9cf31911756fc9e5e4d6fbd3f1b04ab37ce0abc Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Wed, 29 May 2024 10:41:26 +0100 Subject: [PATCH 09/26] Add conversion for postprocessed NQCD output dicts --- src/csv_out.jl | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/csv_out.jl b/src/csv_out.jl index 715e90f..018f8c8 100644 --- a/src/csv_out.jl +++ b/src/csv_out.jl @@ -51,6 +51,26 @@ function update_csv_file!(output_filename::String, input_file::DataFrame, glob_p CSV.write(output_filename, input_file) end +function results_array_to_dataframe(output_dicts::Vector, params_dict::Dict; params_index=missing) + output_dataframe = DataFrame(job_id=Int[], parameters_set=Int[]) + output_columns = [:job_id, :parameters_set] + for (k, v) in pairs(output_dicts[1]) + # Only make entries for non-vector outputs. (Number, Bool, String are OK) + !isa(v, AbstractArray) ? push!(output_columns, k) : nothing + end + # Collect output values + output_values = Any[] + sizehint!(output_values, length(output_columns)) + parameter_set = fill(params_index, length(output_dicts)) + push!(output_values, parameter_set) + for column in output_columns[3:end] #excluding job_id and parameters_set + col_values = getindex.(output_dicts, column) + push!(output_values, replace(col_values, nothing => missing)) + end + # Add to Dataframe + output_dataframe = vcat(output_dataframe, DataFrame([i => j for (i, j) in zip(output_columns, output_values)]), cols=:union) +end + function jld2_ungrouped_to_csv(csv_output::String, jld2_input::String) jld2_results = jldopen(jld2_input)["results"] output_dataframe = DataFrame(job_id=Int[], parameters_set=Int[]) From d9cdb42275623800ade1258603542dea0f6a4ff1 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Mon, 3 Jun 2024 13:31:09 +0100 Subject: [PATCH 10/26] Small bugfixes for JLD2-based workflow --- src/concat_output.jl | 2 +- src/csv_out.jl | 2 +- src/file_based.jl | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index cf4062f..a9d5150 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -12,7 +12,7 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St # Go through each element in the input tensor and collect all jobs we have for it. for index in eachindex(simulation_parameters["parameters"]) # Read job ids from results if possible to avoid reading duplicates. - job_ids = !isassigned(results_container, index) ? simulation_parameters["parameters"][index]["job_ids"] : container[index][2]["job_ids"] + job_ids = !isassigned(results_container, index) ? simulation_parameters["parameters"][index]["job_ids"] : results_container[index][2]["job_ids"] to_read = findall(x -> split(x.name, "_")[end] in string.(job_ids), all_files) for file_index in to_read try diff --git a/src/csv_out.jl b/src/csv_out.jl index 018f8c8..5057bbe 100644 --- a/src/csv_out.jl +++ b/src/csv_out.jl @@ -23,7 +23,7 @@ function update_csv_file!(output_filename::String, input_file::DataFrame, glob_p file_results = jldopen(all_files[file_index].path)["results"] @debug "File read successfully" # Columns to write out - output_columns = [:job_id, :parameters_set] + output_columns = [:job_id, :parameter_set] for (k, v) in pairs(file_results[1][1]) # Only make entries for non-vector outputs. (Number, Bool, String are OK) !isa(v, AbstractArray) ? push!(output_columns, k) : nothing diff --git a/src/file_based.jl b/src/file_based.jl index cfead36..3c2e644 100644 --- a/src/file_based.jl +++ b/src/file_based.jl @@ -180,7 +180,7 @@ function update_results_file(input_file::String, glob_pattern::String, queue_fil output_tensor = jldopen(input_file)["results"] concatenate_results!(output_tensor, glob_pattern, queue_file; trajectories_key=trajectories_key) if file_format == "jld2" - save_as_jld2(output_filename, output_tensor) + save_as_jld2(output_file, output_tensor) end return reshape(output_tensor, size(simulation_parameters["parameters"])) end From 8963ec400f5201c89ddb8c22ac4f8305d53d0349 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Mon, 3 Jun 2024 13:37:06 +0100 Subject: [PATCH 11/26] Faster job ID search for JLD2 workflow --- src/concat_output.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index a9d5150..edc3249 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -13,7 +13,7 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St for index in eachindex(simulation_parameters["parameters"]) # Read job ids from results if possible to avoid reading duplicates. job_ids = !isassigned(results_container, index) ? simulation_parameters["parameters"][index]["job_ids"] : results_container[index][2]["job_ids"] - to_read = findall(x -> split(x.name, "_")[end] in string.(job_ids), all_files) + to_read = findall(x -> parse(Int, split(x.name, "_")[end]) in job_ids, all_files) for file_index in to_read try file_results = jldopen(all_files[file_index].path)["results"] @@ -55,7 +55,7 @@ function concatenate_results!(results_container::ResultsLazyLoader, glob_pattern data_to_append = [] trajectories_read = 0 ids_read = Int[] - to_read = findall(x -> split(x.name, "_")[end] in string.(job_ids), all_files) + to_read = findall(x -> Parse(Int, split(x.name, "_")[end]) in job_ids, all_files) sizehint!(data_to_append, length(to_read)) sizehint!(ids_read, length(to_read)) for file_index in to_read From 13dfd5c4f28dca09edca3707b1767876df420e5d Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Fri, 11 Apr 2025 13:34:27 +0100 Subject: [PATCH 12/26] =?UTF-8?q?=E2=9C=A8=20Allow=20import=20with=20arbit?= =?UTF-8?q?rary=20filenames?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/concat_output.jl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index edc3249..1e7806e 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -13,8 +13,7 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St for index in eachindex(simulation_parameters["parameters"]) # Read job ids from results if possible to avoid reading duplicates. job_ids = !isassigned(results_container, index) ? simulation_parameters["parameters"][index]["job_ids"] : results_container[index][2]["job_ids"] - to_read = findall(x -> parse(Int, split(x.name, "_")[end]) in job_ids, all_files) - for file_index in to_read + for file_index in all_files try file_results = jldopen(all_files[file_index].path)["results"] @debug "File read successfully" @@ -25,7 +24,7 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St results_container[index] = push_nqcd_outputs!(results_container[index], [file_results]; trajectories_key=trajectories_key) end # Remove job id from parameters once that result has been added - jobid = parse(Int, split(all_files[file_index].name, "_")[end]) + jobid = parse(Int, file_results[2]["jobid"]) deleteat!(results_container[index][2]["job_ids"], findall(results_container[index][2]["job_ids"] .== jobid)...) catch @warn "File $(all_files[file_index].name) could not be read. It may be incomplete or corrupted." From bd04a6701474e630aac4617c6363aefad47ff236 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Fri, 11 Apr 2025 13:47:31 +0100 Subject: [PATCH 13/26] =?UTF-8?q?=F0=9F=9A=91=20Indexing=20mistake,=20only?= =?UTF-8?q?=20add=20trajectory=20if=20new?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/concat_output.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index 1e7806e..6bbf8d2 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -13,7 +13,7 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St for index in eachindex(simulation_parameters["parameters"]) # Read job ids from results if possible to avoid reading duplicates. job_ids = !isassigned(results_container, index) ? simulation_parameters["parameters"][index]["job_ids"] : results_container[index][2]["job_ids"] - for file_index in all_files + for file_index in eachindex(all_files) try file_results = jldopen(all_files[file_index].path)["results"] @debug "File read successfully" From 9fbc0b26c8b8129ccd89bdf490b894cb0cb56451 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Fri, 11 Apr 2025 13:47:44 +0100 Subject: [PATCH 14/26] =?UTF-8?q?=F0=9F=9A=91=20Only=20add=20job=20if=20ne?= =?UTF-8?q?w?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/concat_output.jl | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index 6bbf8d2..46ffada 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -18,14 +18,19 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St file_results = jldopen(all_files[file_index].path)["results"] @debug "File read successfully" # Move data to the output tensor + jobid = parse(Int, file_results[2]["jobid"]) if !isassigned(results_container, index) results_container[index] = file_results + jobid = parse(Int, file_results[2]["jobid"]) + deleteat!(results_container[index][2]["job_ids"], findall(results_container[index][2]["job_ids"] .== jobid)...) else - results_container[index] = push_nqcd_outputs!(results_container[index], [file_results]; trajectories_key=trajectories_key) + if jobid ∉ results_container[index][2]["job_ids"] + results_container[index] = push_nqcd_outputs!(results_container[index], [file_results]; trajectories_key=trajectories_key) + jobid = parse(Int, file_results[2]["jobid"]) + deleteat!(results_container[index][2]["job_ids"], findall(results_container[index][2]["job_ids"] .== jobid)...) + end end # Remove job id from parameters once that result has been added - jobid = parse(Int, file_results[2]["jobid"]) - deleteat!(results_container[index][2]["job_ids"], findall(results_container[index][2]["job_ids"] .== jobid)...) catch @warn "File $(all_files[file_index].name) could not be read. It may be incomplete or corrupted." continue From b0e4c5d93bb055d28f895865b22f5b695375bf3a Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Fri, 11 Apr 2025 13:58:54 +0100 Subject: [PATCH 15/26] =?UTF-8?q?=F0=9F=9A=91=20Mistake=20when=20parsing?= =?UTF-8?q?=20files?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/concat_output.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index 46ffada..9f00f8f 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -18,7 +18,7 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St file_results = jldopen(all_files[file_index].path)["results"] @debug "File read successfully" # Move data to the output tensor - jobid = parse(Int, file_results[2]["jobid"]) + jobid = file_results[2]["jobid"] if !isassigned(results_container, index) results_container[index] = file_results jobid = parse(Int, file_results[2]["jobid"]) From f77ef4cad87d87958f421aaf6b3b43df6b00c263 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Fri, 11 Apr 2025 14:07:06 +0100 Subject: [PATCH 16/26] =?UTF-8?q?=F0=9F=9A=91=20Removed=20incorrect=20job?= =?UTF-8?q?=20id=20parsing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/concat_output.jl | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index 9f00f8f..219aaca 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -21,13 +21,11 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St jobid = file_results[2]["jobid"] if !isassigned(results_container, index) results_container[index] = file_results - jobid = parse(Int, file_results[2]["jobid"]) - deleteat!(results_container[index][2]["job_ids"], findall(results_container[index][2]["job_ids"] .== jobid)...) + symdiff!(results_container[index][2]["job_ids"], jobid) else if jobid ∉ results_container[index][2]["job_ids"] results_container[index] = push_nqcd_outputs!(results_container[index], [file_results]; trajectories_key=trajectories_key) - jobid = parse(Int, file_results[2]["jobid"]) - deleteat!(results_container[index][2]["job_ids"], findall(results_container[index][2]["job_ids"] .== jobid)...) + symdiff!(results_container[index][2]["job_ids"], jobid) end end # Remove job id from parameters once that result has been added From d190698b3bc4f977e5741e3f1e0514b538ab814c Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Mon, 14 Apr 2025 09:36:50 +0100 Subject: [PATCH 17/26] =?UTF-8?q?=F0=9F=9A=A7=20Add=20debug=20error=20log?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/concat_output.jl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index 219aaca..9342c8a 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -29,8 +29,9 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St end end # Remove job id from parameters once that result has been added - catch - @warn "File $(all_files[file_index].name) could not be read. It may be incomplete or corrupted." + catch e + @warn "File $(all_files[file_index].name) could not be read. It may be incomplete or corrupted." + @debug "Concatenation logic failed due to the following error" error = e continue end update(progress) From 412858f411b4aabb7825a1a3be5996356dbbb82e Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Tue, 15 Apr 2025 10:51:39 +0100 Subject: [PATCH 18/26] =?UTF-8?q?=F0=9F=90=9B=20Second=20attempt=20at=20fi?= =?UTF-8?q?xing=20concat=20logic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/concat_output.jl | 63 +++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index 9342c8a..d5bc141 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -10,37 +10,47 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St # Import simulation parameters simulation_parameters = jldopen(queue_file) # Go through each element in the input tensor and collect all jobs we have for it. - for index in eachindex(simulation_parameters["parameters"]) - # Read job ids from results if possible to avoid reading duplicates. - job_ids = !isassigned(results_container, index) ? simulation_parameters["parameters"][index]["job_ids"] : results_container[index][2]["job_ids"] - for file_index in eachindex(all_files) - try - file_results = jldopen(all_files[file_index].path)["results"] - @debug "File read successfully" - # Move data to the output tensor - jobid = file_results[2]["jobid"] - if !isassigned(results_container, index) - results_container[index] = file_results - symdiff!(results_container[index][2]["job_ids"], jobid) + all_job_ids = get.(simulation_parameters["parameters"], "job_ids", Int[]) + for file_index in eachindex(all_files) + try + file_results = jldopen(all_files[file_index].path)["results"] + @debug "File read successfully" + # Move data to the output tensor + jobid = file_results[2]["jobid"] + parameter_index = findfirst(jobid .∈ all_job_ids) + if !isnothing(parameter_index) + if !isassigned(results_container, parameter_index) + results_container[parameter_index] = file_results + symdiff!(results_container[parameter_index][2]["job_ids"], jobid) else - if jobid ∉ results_container[index][2]["job_ids"] - results_container[index] = push_nqcd_outputs!(results_container[index], [file_results]; trajectories_key=trajectories_key) - symdiff!(results_container[index][2]["job_ids"], jobid) + if jobid ∉ results_container[parameter_index][2]["job_ids"] + results_container[parameter_index] = push_nqcd_outputs!(results_container[parameter_index], [file_results]; trajectories_key=trajectories_key) + symdiff!(results_container[parameter_index][2]["job_ids"], jobid) end end - # Remove job id from parameters once that result has been added - catch e - @warn "File $(all_files[file_index].name) could not be read. It may be incomplete or corrupted." - @debug "Concatenation logic failed due to the following error" error = e - continue + else + @error "Couldn't find a job ID for this file - Was the correct parameters file selected?" end - update(progress) + # Remove job id from parameters once that result has been added + catch e + @warn "File $(all_files[file_index].name) could not be read. It may be incomplete or corrupted." + @debug "Concatenation logic failed due to the following error" error = e + continue end - # Trajectory completeness check - if !isassigned(results_container, index) || results_container[index][2]["total_trajectories"] != results_container[index][2]["trajectories"] - @info "Simulation results are incomplete or oversubscribed in results[$(index)]. Make sure you have run all sub-jobs. " + update(progress) + end + # Trajectory completeness check + parameter_sets = [] + completeness = Float64[] + for idx in eachindex(results_container) + push!(parameter_sets, idx) + if isassigned(results_container, idx) + push!(completeness, length(results_container[idx][2]["job_ids"]) / length(simulation_parameters["parameters"][idx]["job_ids"])) + else + push!(completeness, 0.0) end end + UnicodePlots.barplot(parameter_sets, completeness; title = "Completeness of results file") end function concatenate_results!(results_container::ResultsLazyLoader, glob_pattern::String, queue_file::String; trajectories_key="trajectories") @@ -58,7 +68,7 @@ function concatenate_results!(results_container::ResultsLazyLoader, glob_pattern data_to_append = [] trajectories_read = 0 ids_read = Int[] - to_read = findall(x -> Parse(Int, split(x.name, "_")[end]) in job_ids, all_files) + to_read = findall(x -> parse(Int, split(x.name, "_")[end]) in job_ids, all_files) sizehint!(data_to_append, length(to_read)) sizehint!(ids_read, length(to_read)) for file_index in to_read @@ -101,8 +111,7 @@ end """ push_nqcd_outputs!!(first_output, other_outputs...) - Like a push!() function, but it also puts `first_output` into a vector if it wasn't already and adds the number of trajectories together. -TBW +Like a push!() function, but it also puts `first_output` into a vector if it wasn't already and adds the number of trajectories together. """ function push_nqcd_outputs!(first_output, other_outputs; trajectories_key="trajectories") for i in other_outputs From 2565242429353988a8a7bba10f49b37abb6e2685 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Tue, 15 Apr 2025 10:51:58 +0100 Subject: [PATCH 19/26] =?UTF-8?q?=E2=9C=A8=20Add=20UnicodePlot=20for=20res?= =?UTF-8?q?ults=20completeness?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Project.toml | 3 ++- src/ClusterScripts.jl | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 90e7049..4e956ec 100644 --- a/Project.toml +++ b/Project.toml @@ -13,11 +13,12 @@ Glob = "c27321d9-0574-5035-807b-f59d2c89b15c" JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819" ProgressBars = "49802e3a-d2f1-5c88-81d8-b72133a6f568" RobustPmap = "27aeedcb-f738-516b-a0b8-3211cf1146e5" +UnicodePlots = "b8865327-cd53-5732-bb35-84acbb429228" [compat] DiffEqBase = "6" RobustPmap = "1" -julia = "1.6.2" +UnicodePlots = "3.7.2" [extras] Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" diff --git a/src/ClusterScripts.jl b/src/ClusterScripts.jl index d8ea7a7..5bb2005 100644 --- a/src/ClusterScripts.jl +++ b/src/ClusterScripts.jl @@ -7,6 +7,7 @@ using JLD2 using RobustPmap using Glob using ProgressBars +import UnicodePlots """ Struct to hold file paths and provide some basic functionality for working with them. From ff8c018ab58e911633051b3081364d9dd0bcfdba Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Tue, 15 Apr 2025 10:54:41 +0100 Subject: [PATCH 20/26] =?UTF-8?q?=E2=AC=86=EF=B8=8F=20Removed=20unnecessar?= =?UTF-8?q?ily=20tight=20dependency?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 4e956ec..3dc53d5 100644 --- a/Project.toml +++ b/Project.toml @@ -18,7 +18,7 @@ UnicodePlots = "b8865327-cd53-5732-bb35-84acbb429228" [compat] DiffEqBase = "6" RobustPmap = "1" -UnicodePlots = "3.7.2" +UnicodePlots = "3" [extras] Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" From 26abaa9aaaa1d9d851598c2a41b2662d12e31706 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Tue, 15 Apr 2025 11:32:15 +0100 Subject: [PATCH 21/26] =?UTF-8?q?=F0=9F=90=9B=20Mistake=20when=20setting?= =?UTF-8?q?=20up=20job=20id=20distributions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/concat_output.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index d5bc141..d14225f 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -10,7 +10,7 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St # Import simulation parameters simulation_parameters = jldopen(queue_file) # Go through each element in the input tensor and collect all jobs we have for it. - all_job_ids = get.(simulation_parameters["parameters"], "job_ids", Int[]) + all_job_ids = [get(simulation_parameters["parameters"][idx], "job_ids", Int[]) for idx in simulation_parameters["parameters"]] for file_index in eachindex(all_files) try file_results = jldopen(all_files[file_index].path)["results"] From 9a4817e09a5f1b07b7149e2a79bd446d54197451 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Tue, 15 Apr 2025 11:53:41 +0100 Subject: [PATCH 22/26] =?UTF-8?q?=F0=9F=90=9B=20Wrong=20indexing=20again?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/concat_output.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index d14225f..6668289 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -10,7 +10,7 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St # Import simulation parameters simulation_parameters = jldopen(queue_file) # Go through each element in the input tensor and collect all jobs we have for it. - all_job_ids = [get(simulation_parameters["parameters"][idx], "job_ids", Int[]) for idx in simulation_parameters["parameters"]] + all_job_ids = [get(simulation_parameters["parameters"][idx], "job_ids", Int[]) for idx in eachindex(simulation_parameters["parameters"])] for file_index in eachindex(all_files) try file_results = jldopen(all_files[file_index].path)["results"] From 2596c78f0496dab8a963f13177d87c3ea203cd0b Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Tue, 15 Apr 2025 13:19:39 +0100 Subject: [PATCH 23/26] =?UTF-8?q?=F0=9F=90=9B=20Mistake=20in=20tracking=20?= =?UTF-8?q?which=20subjobs=20were=20added.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/concat_output.jl | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index 6668289..8b6732a 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -22,11 +22,9 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St if !isassigned(results_container, parameter_index) results_container[parameter_index] = file_results symdiff!(results_container[parameter_index][2]["job_ids"], jobid) - else - if jobid ∉ results_container[parameter_index][2]["job_ids"] - results_container[parameter_index] = push_nqcd_outputs!(results_container[parameter_index], [file_results]; trajectories_key=trajectories_key) - symdiff!(results_container[parameter_index][2]["job_ids"], jobid) - end + elseif jobid ∈ results_container[parameter_index][2]["job_ids"] + results_container[parameter_index] = push_nqcd_outputs!(results_container[parameter_index], [file_results]; trajectories_key=trajectories_key) + symdiff!(results_container[parameter_index][2]["job_ids"], jobid) end else @error "Couldn't find a job ID for this file - Was the correct parameters file selected?" From 6ba71083adb54c2b62c7b9df4c117ecf01b367b7 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Tue, 3 Jun 2025 13:36:36 +0100 Subject: [PATCH 24/26] =?UTF-8?q?=E2=9C=A8=20Add=20job=20id=20source=20to?= =?UTF-8?q?=20read=20from=20files=20or=20dict?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Project.toml | 2 +- src/concat_output.jl | 67 +++++++++++++++----- src/file_based.jl | 144 +++++++++++++++++++++++++++++++++++-------- 3 files changed, 170 insertions(+), 43 deletions(-) diff --git a/Project.toml b/Project.toml index 3dc53d5..823a38d 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ClusterScripts" uuid = "0afbe08e-4e50-452c-a3af-5975c91048e8" authors = ["Alexander Spears and contributors"] -version = "1.0.0" +version = "1.0.1" [deps] CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" diff --git a/src/concat_output.jl b/src/concat_output.jl index 8b6732a..f800f41 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -1,29 +1,49 @@ -function concatenate_results!(results_container::AbstractArray, glob_pattern::String, queue_file::String; trajectories_key="trajectories") +function concatenate_results!( + results_container::AbstractArray, + glob_pattern::String, + queue_file::String; + trajectories_key = "trajectories", + job_id_source::Symbol = :filename, +) # Read in all files for a simulation queue. glob_pattern = SimulationFile(glob_pattern) all_files = map(SimulationFile, glob(glob_pattern.with_extension, glob_pattern.stem)) - progress = ProgressBar(total=length(all_files), printing_delay=1.0) + progress = ProgressBar(total = length(all_files), printing_delay = 1.0) set_description(progress, "Processing files: ") # Import simulation parameters simulation_parameters = jldopen(queue_file) # Go through each element in the input tensor and collect all jobs we have for it. - all_job_ids = [get(simulation_parameters["parameters"][idx], "job_ids", Int[]) for idx in eachindex(simulation_parameters["parameters"])] + all_job_ids = [ + get(simulation_parameters["parameters"][idx], "job_ids", Int[]) for + idx in eachindex(simulation_parameters["parameters"]) + ] for file_index in eachindex(all_files) try file_results = jldopen(all_files[file_index].path)["results"] - @debug "File read successfully" # Move data to the output tensor - jobid = file_results[2]["jobid"] + if job_id_source == :filename + jobid = + match( + r"-\d*-(\d*).jld2", + all_files[file_index].with_extension, + ).captures |> first + elseif job_id_source == :in_dict + jobid = file_results[2]["jobid"] + end parameter_index = findfirst(jobid .∈ all_job_ids) if !isnothing(parameter_index) if !isassigned(results_container, parameter_index) results_container[parameter_index] = file_results symdiff!(results_container[parameter_index][2]["job_ids"], jobid) elseif jobid ∈ results_container[parameter_index][2]["job_ids"] - results_container[parameter_index] = push_nqcd_outputs!(results_container[parameter_index], [file_results]; trajectories_key=trajectories_key) + results_container[parameter_index] = push_nqcd_outputs!( + results_container[parameter_index], + [file_results]; + trajectories_key = trajectories_key, + ) symdiff!(results_container[parameter_index][2]["job_ids"], jobid) end else @@ -31,7 +51,7 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St end # Remove job id from parameters once that result has been added catch e - @warn "File $(all_files[file_index].name) could not be read. It may be incomplete or corrupted." + @warn "File $(all_files[file_index].name) could not be read. It may be incomplete or corrupted." @debug "Concatenation logic failed due to the following error" error = e continue end @@ -43,19 +63,32 @@ function concatenate_results!(results_container::AbstractArray, glob_pattern::St for idx in eachindex(results_container) push!(parameter_sets, idx) if isassigned(results_container, idx) - push!(completeness, length(results_container[idx][2]["job_ids"]) / length(simulation_parameters["parameters"][idx]["job_ids"])) + push!( + completeness, + length(results_container[idx][2]["job_ids"]) / + length(simulation_parameters["parameters"][idx]["job_ids"]), + ) else push!(completeness, 0.0) end end - UnicodePlots.barplot(parameter_sets, completeness; title = "Completeness of results file") + UnicodePlots.barplot( + parameter_sets, + completeness; + title = "Completeness of results file", + ) end -function concatenate_results!(results_container::ResultsLazyLoader, glob_pattern::String, queue_file::String; trajectories_key="trajectories") +function concatenate_results!( + results_container::ResultsLazyLoader, + glob_pattern::String, + queue_file::String; + trajectories_key = "trajectories", +) # Read in all files for a simulation queue. glob_pattern = SimulationFile(glob_pattern) all_files = map(SimulationFile, glob(glob_pattern.with_extension, glob_pattern.stem)) - progress = ProgressBar(total=length(all_files), printing_delay=1.0) + progress = ProgressBar(total = length(all_files), printing_delay = 1.0) set_description(progress, "Processing files: ") # Import simulation parameters simulation_parameters = jldopen(queue_file) @@ -73,7 +106,8 @@ function concatenate_results!(results_container::ResultsLazyLoader, glob_pattern try file_results = jldopen(all_files[file_index].path)["results"] # Put data into vector if not already - file_data = isa(file_results[1], Vector) ? file_results[1] : [file_results[1]] + file_data = + isa(file_results[1], Vector) ? file_results[1] : [file_results[1]] # Move to cache append!(data_to_append, file_data) # Update trajectory count @@ -92,13 +126,16 @@ function concatenate_results!(results_container::ResultsLazyLoader, glob_pattern if !haskey(results_container.file["results"], "$(index)") results_container[index] = data_to_append else - results_container[index] = append!(deepcopy(results_container[index]), data_to_append) + results_container[index] = + append!(deepcopy(results_container[index]), data_to_append) end results_container.parameters[index][trajectories_key] += trajectories_read setdiff!(results_container.parameters[index]["job_ids"], ids_read) end # Trajectory completeness check - if !haskey(results_container.file["results"], "$(index)") || results_container.parameters[index]["total_$(trajectories_key)"] != results_container.parameters[index][trajectories_key] + if !haskey(results_container.file["results"], "$(index)") || + results_container.parameters[index]["total_$(trajectories_key)"] != + results_container.parameters[index][trajectories_key] @info "Simulation results are incomplete or oversubscribed in results[$(index)]. Make sure you have run all sub-jobs. " end end @@ -111,7 +148,7 @@ end Like a push!() function, but it also puts `first_output` into a vector if it wasn't already and adds the number of trajectories together. """ -function push_nqcd_outputs!(first_output, other_outputs; trajectories_key="trajectories") +function push_nqcd_outputs!(first_output, other_outputs; trajectories_key = "trajectories") for i in other_outputs for (k, v) in i[2] if k == trajectories_key diff --git a/src/file_based.jl b/src/file_based.jl index 3c2e644..35002eb 100644 --- a/src/file_based.jl +++ b/src/file_based.jl @@ -34,7 +34,8 @@ function save!(loader::ResultsLazyLoader) @info "Saved updated data to $(loader.file.path)" end -Base.show(io::IO, loader::ResultsLazyLoader) = print(io, "ResultsLazyLoader($(loader.file))") +Base.show(io::IO, loader::ResultsLazyLoader) = + print(io, "ResultsLazyLoader($(loader.file))") Base.size(loader::ResultsLazyLoader) = size(loader.parameters) Base.length(loader::ResultsLazyLoader) = length(loader.parameters) function Base.getindex(loader::ResultsLazyLoader, i::Int) @@ -65,7 +66,7 @@ function Base.setindex!(loader::ResultsLazyLoader, val, i::Int) end function save_as_jld2(filename, results_data) - jldsave(filename, compress=true; results=results_data) + jldsave(filename, compress = true; results = results_data) end """ @@ -75,20 +76,22 @@ Converts the results format of an ungrouped JLD2 file to the grouped format. **Warning: This method yields undefined simulation parameters which normally shouldn't occur in grouped JLD2 files.** """ function convert_to_grouped_jld2(filename, results_data) - jldopen(filename, "w"; compress=true) do file + jldopen(filename, "w"; compress = true) do file # Flag file as grouped file["grouped"] = true # Need to modify out-of place. parameters = Array{Dict{String,Any}}(undef, size(results_data)) # Store results in separate groups to load as required - indices_to_write = findall(x -> isassigned(results_data, x), eachindex(results_data)) + indices_to_write = + findall(x -> isassigned(results_data, x), eachindex(results_data)) for i in ProgressBar(indices_to_write) parameters[i] = results_data[i][2] file["results/$i"] = results_data[i][1] end # Create a group to store derived quantities file["parameters"] = parameters - file["derived_quantities"] = [Dict{Symbol,Any}() for i in eachindex(file["parameters"])] + file["derived_quantities"] = + [Dict{Symbol,Any}() for i in eachindex(file["parameters"])] end end @@ -97,15 +100,21 @@ end Converts the results format of an ungrouped JLD2 file to the grouped format and adds the simulation parameters from a simulation queue. """ -function convert_to_grouped_jld2(filename, results_data, simulation_queue; trajectories_key="trajectories") +function convert_to_grouped_jld2( + filename, + results_data, + simulation_queue; + trajectories_key = "trajectories", +) simulation_parameters = jldopen(simulation_queue, "r")["parameters"] - jldopen(filename, "w"; compress=true) do file + jldopen(filename, "w"; compress = true) do file # Flag file as grouped file["grouped"] = true # Can't in-place modify arrays with JLD2, so need to modify out-of place. parameters = Array{Dict{String,Any}}(undef, size(results_data)) # Store results in separate groups to load as required - indices_to_write = findall(x -> isassigned(results_data, x), eachindex(results_data)) + indices_to_write = + findall(x -> isassigned(results_data, x), eachindex(results_data)) for i in ProgressBar(indices_to_write) parameters[i] = results_data[i][2] file["results/$i"] = results_data[i][1] @@ -118,7 +127,8 @@ function convert_to_grouped_jld2(filename, results_data, simulation_queue; traje end # Create a group to store derived quantities file["parameters"] = parameters - file["derived_quantities"] = [Dict{Symbol,Any}() for i in eachindex(file["parameters"])] + file["derived_quantities"] = + [Dict{Symbol,Any}() for i in eachindex(file["parameters"])] end end @@ -145,15 +155,36 @@ This file contains the results of all jobs in the queue, as well as the input pa `truncate_times::Bool`: If true, the time array in the output will be truncated to the final value only. Useful to save space when a large number of identical trajectories are run with short time steps. """ -function create_results_file(output_filename::String, glob_pattern::String, queue_file::String; trajectories_key="trajectories", file_format::String="jld2") +function create_results_file( + output_filename::String, + glob_pattern::String, + queue_file::String; + trajectories_key = "trajectories", + file_format::String = "jld2", + job_id_source::Symbol = :filename, +) simulation_parameters = jldopen(queue_file) # Create an empty total output object output_tensor = Array{Tuple}(undef, (size(simulation_parameters["parameters"]))) - concatenate_results!(output_tensor, glob_pattern, queue_file; trajectories_key=trajectories_key) + concatenate_results!( + output_tensor, + glob_pattern, + queue_file; + trajectories_key = trajectories_key, + job_id_source::Symbol = job_id_source, + ) if file_format == "jld2" - save_as_jld2(output_filename, reshape(output_tensor, size(simulation_parameters["parameters"]))) + save_as_jld2( + output_filename, + reshape(output_tensor, size(simulation_parameters["parameters"])), + ) elseif file_format == "jld2_grouped" - convert_to_grouped_jld2(output_filename, reshape(output_tensor, size(simulation_parameters["parameters"])), queue_file; trajectories_key=trajectories_key) + convert_to_grouped_jld2( + output_filename, + reshape(output_tensor, size(simulation_parameters["parameters"])), + queue_file; + trajectories_key = trajectories_key, + ) end return reshape(output_tensor, size(simulation_parameters["parameters"])) end @@ -174,19 +205,44 @@ Merges existing results from an **ungrouped JLD2 file** into a new **ungrouped J """ -function update_results_file(input_file::String, glob_pattern::String, queue_file::String, output_file::String; trajectories_key="trajectories", file_format::String="jld2") +function update_results_file( + input_file::String, + glob_pattern::String, + queue_file::String, + output_file::String; + trajectories_key = "trajectories", + file_format::String = "jld2", + job_id_source::Symbol = :filename, +) simulation_parameters = jldopen(queue_file) # Create an empty total output object output_tensor = jldopen(input_file)["results"] - concatenate_results!(output_tensor, glob_pattern, queue_file; trajectories_key=trajectories_key) + concatenate_results!( + output_tensor, + glob_pattern, + queue_file; + trajectories_key = trajectories_key, + job_id_source = job_id_source, + ) if file_format == "jld2" save_as_jld2(output_file, output_tensor) end return reshape(output_tensor, size(simulation_parameters["parameters"])) end -function update_results_file!(input_file::ResultsLazyLoader, glob_pattern::String, queue_file::String; trajectories_key="trajectories", file_format::String="jld2") - concatenate_results!(input_file, glob_pattern, queue_file; trajectories_key=trajectories_key) +function update_results_file!( + input_file::ResultsLazyLoader, + glob_pattern::String, + queue_file::String; + trajectories_key = "trajectories", + file_format::String = "jld2", +) + concatenate_results!( + input_file, + glob_pattern, + queue_file; + trajectories_key = trajectories_key, + ) end """ @@ -199,7 +255,16 @@ function build_job_queue(fixed_parameters::Dict, variables::Dict) merged_combinations = Vector{Dict}() variable_combinations = reshape(collect(Iterators.product(values(variables)...)), :) for i in eachindex(variable_combinations) - push!(merged_combinations, merge(fixed_parameters, Dict([(collect(keys(variables))[j], variable_combinations[i][j]) for j in 1:length(keys(variables))]))) + push!( + merged_combinations, + merge( + fixed_parameters, + Dict([ + (collect(keys(variables))[j], variable_combinations[i][j]) for + j = 1:length(keys(variables)) + ]), + ), + ) end return merged_combinations end @@ -210,11 +275,24 @@ end Returns a Vector of all unique combinations of values in `variables` merged with `fixed_parameters`. By specifying a `postprocessing_function`, further actions can be performed each of the elements in the resulting vector. """ -function build_job_queue(fixed_parameters::Dict, variables::Dict, postprocessing_function::Function) +function build_job_queue( + fixed_parameters::Dict, + variables::Dict, + postprocessing_function::Function, +) merged_combinations = Vector{Dict}() variable_combinations = reshape(collect(Iterators.product(values(variables)...)), :) for i in eachindex(variable_combinations) - push!(merged_combinations, merge(fixed_parameters, Dict([(collect(keys(variables))[j], variable_combinations[i][j]) for j in 1:length(keys(variables))]))) + push!( + merged_combinations, + merge( + fixed_parameters, + Dict([ + (collect(keys(variables))[j], variable_combinations[i][j]) for + j = 1:length(keys(variables)) + ]), + ), + ) end # Accept a function that does in-place modification of the input parameters dictionary return map(postprocessing_function, merged_combinations) @@ -231,17 +309,22 @@ Set "trajectories_key" in case jobs should be split by something different. Set "filename" to save the resulting batch queue somewhere different than `simulation_parameters.jld2`. """ -function serialise_queue!(input_dict_tensor::Vector{<:Dict{<:Any}}; trajectories_key="trajectories", filename="simulation_parameters.jld2") +function serialise_queue!( + input_dict_tensor::Vector{<:Dict{<:Any}}; + trajectories_key = "trajectories", + filename = "simulation_parameters.jld2", +) queue = [] #Empty queue array to fill with views of input_dict_tensor job_id = 1 for index in eachindex(input_dict_tensor) # Save a list of jobs created from an input dict within it. input_dict_tensor[index]["job_ids"] = [] # Save the total number of trajectories before modification of the input dict to verify completeness on analysis. - input_dict_tensor[index]["total_trajectories"] = input_dict_tensor[index][trajectories_key] + input_dict_tensor[index]["total_trajectories"] = + input_dict_tensor[index][trajectories_key] if get!(input_dict_tensor[index], "batchsize", 1) == 1 # Case 1: Fully serialised operation - Split into as many jobs as trajectories. - for trj in 1:input_dict_tensor[index][trajectories_key] + for trj = 1:input_dict_tensor[index][trajectories_key] # Add a view of the input dict push!(queue, view(input_dict_tensor, index)) push!(input_dict_tensor[index]["job_ids"], job_id) @@ -251,19 +334,26 @@ function serialise_queue!(input_dict_tensor::Vector{<:Dict{<:Any}}; trajectories else # Case 2: Larger batch size - There might be some benefit like multithreading, so split into chunks of a certain size. # Work in batchsize chunks - input_dict_tensor[index][trajectories_key] = input_dict_tensor[index]["batchsize"] + input_dict_tensor[index][trajectories_key] = + input_dict_tensor[index]["batchsize"] # If there enough trajectories to fit in >1 batch: - for batch in 2:(floor(input_dict_tensor[index]["total_trajectories"] / input_dict_tensor[index]["batchsize"])) + for batch = + 2:(floor( + input_dict_tensor[index]["total_trajectories"] / + input_dict_tensor[index]["batchsize"], + )) push!(queue, view(input_dict_tensor, index)) push!(input_dict_tensor[index]["job_ids"], job_id) job_id += 1 end extra_parameters = copy(input_dict_tensor[index]) - extra_parameters[trajectories_key] += input_dict_tensor[index]["total_trajectories"] % input_dict_tensor[index]["batchsize"] + extra_parameters[trajectories_key] += + input_dict_tensor[index]["total_trajectories"] % + input_dict_tensor[index]["batchsize"] push!(queue, hcat(extra_parameters)) # This covers any cases where the number of trajectories isn't exactly divisible by the batch size. push!(input_dict_tensor[index]["job_ids"], job_id) job_id += 1 end end - jldsave(filename; parameters=input_dict_tensor, queue=queue) + jldsave(filename; parameters = input_dict_tensor, queue = queue) end From 39f9024d3ce17bf45ebad6e9a42f6c54850073f4 Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Tue, 3 Jun 2025 13:39:28 +0100 Subject: [PATCH 25/26] =?UTF-8?q?=F0=9F=90=9B=20Wrong=20type=20definition?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/concat_output.jl | 2 +- src/file_based.jl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index f800f41..03bb0f2 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -6,7 +6,7 @@ function concatenate_results!( glob_pattern::String, queue_file::String; trajectories_key = "trajectories", - job_id_source::Symbol = :filename, + job_id_source = :filename, ) # Read in all files for a simulation queue. glob_pattern = SimulationFile(glob_pattern) diff --git a/src/file_based.jl b/src/file_based.jl index 35002eb..07dcc80 100644 --- a/src/file_based.jl +++ b/src/file_based.jl @@ -171,7 +171,7 @@ function create_results_file( glob_pattern, queue_file; trajectories_key = trajectories_key, - job_id_source::Symbol = job_id_source, + job_id_source = job_id_source, ) if file_format == "jld2" save_as_jld2( From 506e1ddd8bfa48ef71689a0533fb022a22f5707e Mon Sep 17 00:00:00 2001 From: Alexander Spears Date: Tue, 3 Jun 2025 13:50:30 +0100 Subject: [PATCH 26/26] =?UTF-8?q?=F0=9F=90=9B=20Forgot=20to=20parse=20jobi?= =?UTF-8?q?d=20as=20Int?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/concat_output.jl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/concat_output.jl b/src/concat_output.jl index 03bb0f2..79cb18c 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -25,11 +25,13 @@ function concatenate_results!( file_results = jldopen(all_files[file_index].path)["results"] # Move data to the output tensor if job_id_source == :filename - jobid = + jobid = parse( + Int, match( r"-\d*-(\d*).jld2", all_files[file_index].with_extension, - ).captures |> first + ).captures |> first, + ) elseif job_id_source == :in_dict jobid = file_results[2]["jobid"] end