diff --git a/Project.toml b/Project.toml index 44354cf..823a38d 100644 --- a/Project.toml +++ b/Project.toml @@ -1,9 +1,11 @@ name = "ClusterScripts" uuid = "0afbe08e-4e50-452c-a3af-5975c91048e8" authors = ["Alexander Spears and contributors"] -version = "1.0.0" +version = "1.0.1" [deps] +CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" +DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" DiffEqBase = "2b5f629d-d688-5b77-993f-72d75c75574e" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" FilePathsBase = "48062228-2e41-5def-b9a4-89aafe57970f" @@ -11,11 +13,12 @@ Glob = "c27321d9-0574-5035-807b-f59d2c89b15c" JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819" ProgressBars = "49802e3a-d2f1-5c88-81d8-b72133a6f568" RobustPmap = "27aeedcb-f738-516b-a0b8-3211cf1146e5" +UnicodePlots = "b8865327-cd53-5732-bb35-84acbb429228" [compat] DiffEqBase = "6" RobustPmap = "1" -julia = "1.6.2" +UnicodePlots = "3" [extras] Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" diff --git a/src/ClusterScripts.jl b/src/ClusterScripts.jl index 10c3053..5bb2005 100644 --- a/src/ClusterScripts.jl +++ b/src/ClusterScripts.jl @@ -7,6 +7,7 @@ using JLD2 using RobustPmap using Glob using ProgressBars +import UnicodePlots """ Struct to hold file paths and provide some basic functionality for working with them. @@ -50,4 +51,7 @@ export pmap_queue, merge_pmap_results include("file_based.jl") export build_job_queue, create_results_file, update_results_file, update_results_file!, serialise_queue!, save! +include("csv_out.jl") +export create_csv_file, update_csv_file! 
+ end diff --git a/src/concat_output.jl b/src/concat_output.jl index cf4062f..79cb18c 100644 --- a/src/concat_output.jl +++ b/src/concat_output.jl @@ -1,50 +1,96 @@ -function concatenate_results!(results_container::AbstractArray, glob_pattern::String, queue_file::String; trajectories_key="trajectories") +function concatenate_results!( + results_container::AbstractArray, + glob_pattern::String, + queue_file::String; + trajectories_key = "trajectories", + job_id_source = :filename, +) # Read in all files for a simulation queue. glob_pattern = SimulationFile(glob_pattern) all_files = map(SimulationFile, glob(glob_pattern.with_extension, glob_pattern.stem)) - progress = ProgressBar(total=length(all_files), printing_delay=1.0) + progress = ProgressBar(total = length(all_files), printing_delay = 1.0) set_description(progress, "Processing files: ") # Import simulation parameters simulation_parameters = jldopen(queue_file) # Go through each element in the input tensor and collect all jobs we have for it. - for index in eachindex(simulation_parameters["parameters"]) - # Read job ids from results if possible to avoid reading duplicates. - job_ids = !isassigned(results_container, index) ? 
simulation_parameters["parameters"][index]["job_ids"] : container[index][2]["job_ids"] - to_read = findall(x -> split(x.name, "_")[end] in string.(job_ids), all_files) - for file_index in to_read - try - file_results = jldopen(all_files[file_index].path)["results"] - @debug "File read successfully" - # Move data to the output tensor - if !isassigned(results_container, index) - results_container[index] = file_results - else - results_container[index] = push_nqcd_outputs!(results_container[index], [file_results]; trajectories_key=trajectories_key) + all_job_ids = [ + get(simulation_parameters["parameters"][idx], "job_ids", Int[]) for + idx in eachindex(simulation_parameters["parameters"]) + ] + for file_index in eachindex(all_files) + try + file_results = jldopen(all_files[file_index].path)["results"] + # Move data to the output tensor + if job_id_source == :filename + jobid = parse( + Int, + match( + r"-\d*-(\d*).jld2", + all_files[file_index].with_extension, + ).captures |> first, + ) + elseif job_id_source == :in_dict + jobid = file_results[2]["jobid"] + end + parameter_index = findfirst(jobid .∈ all_job_ids) + if !isnothing(parameter_index) + if !isassigned(results_container, parameter_index) + results_container[parameter_index] = file_results + symdiff!(results_container[parameter_index][2]["job_ids"], jobid) + elseif jobid ∈ results_container[parameter_index][2]["job_ids"] + results_container[parameter_index] = push_nqcd_outputs!( + results_container[parameter_index], + [file_results]; + trajectories_key = trajectories_key, + ) + symdiff!(results_container[parameter_index][2]["job_ids"], jobid) end - # Remove job id from parameters once that result has been added - jobid = parse(Int, split(all_files[file_index].name, "_")[end]) - deleteat!(results_container[index][2]["job_ids"], findall(results_container[index][2]["job_ids"] .== jobid)...) - catch - @warn "File $(all_files[file_index].name) could not be read. It may be incomplete or corrupted." 
- continue + else + @error "Couldn't find a job ID for this file - Was the correct parameters file selected?" end - update(progress) + # Remove job id from parameters once that result has been added + catch e + @warn "File $(all_files[file_index].name) could not be read. It may be incomplete or corrupted." + @debug "Concatenation logic failed due to the following error" error = e + continue end - # Trajectory completeness check - if !isassigned(results_container, index) || results_container[index][2]["total_trajectories"] != results_container[index][2]["trajectories"] - @info "Simulation results are incomplete or oversubscribed in results[$(index)]. Make sure you have run all sub-jobs. " + update(progress) + end + # Trajectory completeness check + parameter_sets = [] + completeness = Float64[] + for idx in eachindex(results_container) + push!(parameter_sets, idx) + if isassigned(results_container, idx) + push!( + completeness, + 1 - length(results_container[idx][2]["job_ids"]) / + length(simulation_parameters["parameters"][idx]["job_ids"]), + ) + else + push!(completeness, 0.0) end end + UnicodePlots.barplot( + parameter_sets, + completeness; + title = "Completeness of results file", + ) end -function concatenate_results!(results_container::ResultsLazyLoader, glob_pattern::String, queue_file::String; trajectories_key="trajectories") +function concatenate_results!( + results_container::ResultsLazyLoader, + glob_pattern::String, + queue_file::String; + trajectories_key = "trajectories", +) # Read in all files for a simulation queue. 
glob_pattern = SimulationFile(glob_pattern) all_files = map(SimulationFile, glob(glob_pattern.with_extension, glob_pattern.stem)) - progress = ProgressBar(total=length(all_files), printing_delay=1.0) + progress = ProgressBar(total = length(all_files), printing_delay = 1.0) set_description(progress, "Processing files: ") # Import simulation parameters simulation_parameters = jldopen(queue_file) @@ -55,14 +101,15 @@ function concatenate_results!(results_container::ResultsLazyLoader, glob_pattern data_to_append = [] trajectories_read = 0 ids_read = Int[] - to_read = findall(x -> split(x.name, "_")[end] in string.(job_ids), all_files) + to_read = findall(x -> parse(Int, split(x.name, "_")[end]) in job_ids, all_files) sizehint!(data_to_append, length(to_read)) sizehint!(ids_read, length(to_read)) for file_index in to_read try file_results = jldopen(all_files[file_index].path)["results"] # Put data into vector if not already - file_data = isa(file_results[1], Vector) ? file_results[1] : [file_results[1]] + file_data = + isa(file_results[1], Vector) ? 
file_results[1] : [file_results[1]] # Move to cache append!(data_to_append, file_data) # Update trajectory count @@ -81,13 +128,16 @@ function concatenate_results!(results_container::ResultsLazyLoader, glob_pattern if !haskey(results_container.file["results"], "$(index)") results_container[index] = data_to_append else - results_container[index] = append!(deepcopy(results_container[index]), data_to_append) + results_container[index] = + append!(deepcopy(results_container[index]), data_to_append) end results_container.parameters[index][trajectories_key] += trajectories_read setdiff!(results_container.parameters[index]["job_ids"], ids_read) end # Trajectory completeness check - if !haskey(results_container.file["results"], "$(index)") || results_container.parameters[index]["total_$(trajectories_key)"] != results_container.parameters[index][trajectories_key] + if !haskey(results_container.file["results"], "$(index)") || + results_container.parameters[index]["total_$(trajectories_key)"] != + results_container.parameters[index][trajectories_key] @info "Simulation results are incomplete or oversubscribed in results[$(index)]. Make sure you have run all sub-jobs. " end end @@ -98,10 +148,9 @@ end """ push_nqcd_outputs!!(first_output, other_outputs...) - Like a push!() function, but it also puts `first_output` into a vector if it wasn't already and adds the number of trajectories together. -TBW +Like a push!() function, but it also puts `first_output` into a vector if it wasn't already and adds the number of trajectories together. 
""" -function push_nqcd_outputs!(first_output, other_outputs; trajectories_key="trajectories") +function push_nqcd_outputs!(first_output, other_outputs; trajectories_key = "trajectories") for i in other_outputs for (k, v) in i[2] if k == trajectories_key diff --git a/src/csv_out.jl b/src/csv_out.jl new file mode 100644 index 0000000..5057bbe --- /dev/null +++ b/src/csv_out.jl @@ -0,0 +1,100 @@ +using CSV, DataFrames + +function create_csv_file(output_filename::String, glob_pattern::String, queue_file::String) + # Create an empty output CSV + output_dataframe = DataFrame(job_id=Int[], parameters_set=Int[]) + return update_csv_file!(output_filename, output_dataframe, glob_pattern, queue_file) +end + +function update_csv_file!(output_filename::String, input_file::DataFrame, glob_pattern::String, queue_file::String) + simulation_parameters = jldopen(queue_file) + # Create an empty output CSV + # Concatenate results + glob_pattern = SimulationFile(glob_pattern) + all_files = map(SimulationFile, glob(glob_pattern.with_extension, glob_pattern.stem)) + progress = ProgressBar(total=length(all_files), printing_delay=1.0) + set_description(progress, "Processing files: ") + for index in eachindex(vec(simulation_parameters["parameters"])) + # Read job ids from results if possible to avoid reading duplicates. + job_ids = convert(Vector{Int64}, simulation_parameters["parameters"][index]["job_ids"]) + to_read = findall(x -> parse(Int, split(x.name, "_")[end]) in job_ids, all_files) + for file_index in to_read + try + file_results = jldopen(all_files[file_index].path)["results"] + @debug "File read successfully" + # Columns to write out + output_columns = [:job_id, :parameter_set] + for (k, v) in pairs(file_results[1][1]) + # Only make entries for non-vector outputs. (Number, Bool, String are OK) + !isa(v, AbstractArray) ? push!(output_columns, k) : nothing + end + # Collect output values + output_values = Any[] + all_jobids = isa(file_results[2]["jobid"], Vector) ? 
file_results[2]["jobid"] : [file_results[2]["jobid"]] + new_jobids = findall(x -> !(x in input_file.job_id), all_jobids) + push!(output_values, all_jobids[new_jobids]) + parameter_set = fill(index, length(new_jobids)) + push!(output_values, parameter_set) + sizehint!(output_values, length(output_columns)) + for column in output_columns[3:end] #excluding job_id and parameters_set + col_values = getindex.(file_results[1][new_jobids], column) + push!(output_values, replace(col_values, nothing => missing)) + end + # Add to Dataframe + input_file = vcat(input_file, DataFrame([i => j for (i, j) in zip(output_columns, output_values)]), cols=:union) + catch e + @warn "Error reading file: $e" + end + update(progress) + end + end + CSV.write(output_filename, input_file) +end + +function results_array_to_dataframe(output_dicts::Vector, params_dict::Dict; params_index=missing) + output_dataframe = DataFrame(job_id=Int[], parameters_set=Int[]) + output_columns = [:job_id, :parameters_set] + for (k, v) in pairs(output_dicts[1]) + # Only make entries for non-vector outputs. (Number, Bool, String are OK) + !isa(v, AbstractArray) ? 
push!(output_columns, k) : nothing + end + # Collect output values + output_values = Any[] + sizehint!(output_values, length(output_columns)) + parameter_set = fill(params_index, length(output_dicts)) + push!(output_values, parameter_set) + for column in output_columns[3:end] #excluding job_id and parameters_set + col_values = getindex.(output_dicts, column) + push!(output_values, replace(col_values, nothing => missing)) + end + # Add to Dataframe + output_dataframe = vcat(output_dataframe, DataFrame([i => j for (i, j) in zip(output_columns, output_values)]), cols=:union) +end + +function jld2_ungrouped_to_csv(csv_output::String, jld2_input::String) + jld2_results = jldopen(jld2_input)["results"] + output_dataframe = DataFrame(job_id=Int[], parameters_set=Int[]) + for index in eachindex(jld2_results) + try + output_columns = [:job_id, :parameters_set] + for (k, v) in pairs(jld2_results[index][1][1]) + # Only make entries for non-vector outputs. (Number, Bool, String are OK) + !isa(v, AbstractArray) ? 
push!(output_columns, k) : nothing + end + # Collect output values + output_values = Any[] + sizehint!(output_values, length(output_columns)) + parameter_set = fill(index, length(jld2_results[index][1])) + push!(output_values, parameter_set) + for column in output_columns[3:end] #excluding job_id and parameters_set + col_values = getindex.(jld2_results[index][1], column) + push!(output_values, replace(col_values, nothing => missing)) + end + # Add to Dataframe + output_dataframe = vcat(output_dataframe, DataFrame([i => j for (i, j) in zip(output_columns, output_values)]), cols=:union) + catch e + @warn "Error reading index $(index): $(e)" + end + end + CSV.write(csv_output, output_dataframe) +end diff --git a/src/file_based.jl b/src/file_based.jl index e7f9a2e..07dcc80 100644 --- a/src/file_based.jl +++ b/src/file_based.jl @@ -4,7 +4,10 @@ """ save!(loader::ResultsLazyLoader) +**Warning: Don't run this on an unstable system or network connection to prevent data loss!** + Updates the stored parameters and derived quantities inside a grouped JLD2 file. + Run this function after modifying `loader.parameters` or `loader.derived_quantities` to save the changes. 
""" function save!(loader::ResultsLazyLoader) @@ -31,7 +34,8 @@ function save!(loader::ResultsLazyLoader) @info "Saved updated data to $(loader.file.path)" end -Base.show(io::IO, loader::ResultsLazyLoader) = print(io, "ResultsLazyLoader($(loader.file))") +Base.show(io::IO, loader::ResultsLazyLoader) = + print(io, "ResultsLazyLoader($(loader.file))") Base.size(loader::ResultsLazyLoader) = size(loader.parameters) Base.length(loader::ResultsLazyLoader) = length(loader.parameters) function Base.getindex(loader::ResultsLazyLoader, i::Int) @@ -62,7 +66,7 @@ function Base.setindex!(loader::ResultsLazyLoader, val, i::Int) end function save_as_jld2(filename, results_data) - jldsave(filename, compress=true; results=results_data) + jldsave(filename, compress = true; results = results_data) end """ @@ -72,20 +76,22 @@ Converts the results format of an ungrouped JLD2 file to the grouped format. **Warning: This method yields undefined simulation parameters which normally shouldn't occur in grouped JLD2 files.** """ function convert_to_grouped_jld2(filename, results_data) - jldopen(filename, "w"; compress=true) do file + jldopen(filename, "w"; compress = true) do file # Flag file as grouped file["grouped"] = true # Need to modify out-of place. 
parameters = Array{Dict{String,Any}}(undef, size(results_data)) # Store results in separate groups to load as required - indices_to_write = findall(x -> isassigned(results_data, x), eachindex(results_data)) + indices_to_write = + findall(x -> isassigned(results_data, x), eachindex(results_data)) for i in ProgressBar(indices_to_write) parameters[i] = results_data[i][2] file["results/$i"] = results_data[i][1] end # Create a group to store derived quantities file["parameters"] = parameters - file["derived_quantities"] = [Dict{Symbol,Any}() for i in eachindex(file["parameters"])] + file["derived_quantities"] = + [Dict{Symbol,Any}() for i in eachindex(file["parameters"])] end end @@ -94,15 +100,21 @@ end Converts the results format of an ungrouped JLD2 file to the grouped format and adds the simulation parameters from a simulation queue. """ -function convert_to_grouped_jld2(filename, results_data, simulation_queue; trajectories_key="trajectories") +function convert_to_grouped_jld2( + filename, + results_data, + simulation_queue; + trajectories_key = "trajectories", +) simulation_parameters = jldopen(simulation_queue, "r")["parameters"] - jldopen(filename, "w"; compress=true) do file + jldopen(filename, "w"; compress = true) do file # Flag file as grouped file["grouped"] = true # Can't in-place modify arrays with JLD2, so need to modify out-of place. 
parameters = Array{Dict{String,Any}}(undef, size(results_data)) # Store results in separate groups to load as required - indices_to_write = findall(x -> isassigned(results_data, x), eachindex(results_data)) + indices_to_write = + findall(x -> isassigned(results_data, x), eachindex(results_data)) for i in ProgressBar(indices_to_write) parameters[i] = results_data[i][2] file["results/$i"] = results_data[i][1] @@ -115,7 +127,8 @@ function convert_to_grouped_jld2(filename, results_data, simulation_queue; traje end # Create a group to store derived quantities file["parameters"] = parameters - file["derived_quantities"] = [Dict{Symbol,Any}() for i in eachindex(file["parameters"])] + file["derived_quantities"] = + [Dict{Symbol,Any}() for i in eachindex(file["parameters"])] end end @@ -142,15 +155,36 @@ This file contains the results of all jobs in the queue, as well as the input pa `truncate_times::Bool`: If true, the time array in the output will be truncated to the final value only. Useful to save space when a large number of identical trajectories are run with short time steps. 
""" -function create_results_file(output_filename::String, glob_pattern::String, queue_file::String; trajectories_key="trajectories", file_format::String="jld2") +function create_results_file( + output_filename::String, + glob_pattern::String, + queue_file::String; + trajectories_key = "trajectories", + file_format::String = "jld2", + job_id_source::Symbol = :filename, +) simulation_parameters = jldopen(queue_file) # Create an empty total output object output_tensor = Array{Tuple}(undef, (size(simulation_parameters["parameters"]))) - concatenate_results!(output_tensor, glob_pattern, queue_file; trajectories_key=trajectories_key) + concatenate_results!( + output_tensor, + glob_pattern, + queue_file; + trajectories_key = trajectories_key, + job_id_source = job_id_source, + ) if file_format == "jld2" - save_as_jld2(output_filename, reshape(output_tensor, size(simulation_parameters["parameters"]))) + save_as_jld2( + output_filename, + reshape(output_tensor, size(simulation_parameters["parameters"])), + ) elseif file_format == "jld2_grouped" - convert_to_grouped_jld2(output_filename, reshape(output_tensor, size(simulation_parameters["parameters"])), queue_file; trajectories_key=trajectories_key) + convert_to_grouped_jld2( + output_filename, + reshape(output_tensor, size(simulation_parameters["parameters"])), + queue_file; + trajectories_key = trajectories_key, + ) end return reshape(output_tensor, size(simulation_parameters["parameters"])) end @@ -171,19 +205,44 @@ Merges existing results from an **ungrouped JLD2 file** into a new **ungrouped J """ -function update_results_file(input_file::String, glob_pattern::String, queue_file::String, output_file::String; trajectories_key="trajectories", file_format::String="jld2") +function update_results_file( + input_file::String, + glob_pattern::String, + queue_file::String, + output_file::String; + trajectories_key = "trajectories", + file_format::String = "jld2", + job_id_source::Symbol = :filename, +) simulation_parameters = 
jldopen(queue_file) # Create an empty total output object output_tensor = jldopen(input_file)["results"] - concatenate_results!(output_tensor, glob_pattern, queue_file; trajectories_key=trajectories_key) + concatenate_results!( + output_tensor, + glob_pattern, + queue_file; + trajectories_key = trajectories_key, + job_id_source = job_id_source, + ) if file_format == "jld2" - save_as_jld2(output_filename, output_tensor) + save_as_jld2(output_file, output_tensor) end return reshape(output_tensor, size(simulation_parameters["parameters"])) end -function update_results_file!(input_file::ResultsLazyLoader, glob_pattern::String, queue_file::String; trajectories_key="trajectories", file_format::String="jld2") - concatenate_results!(input_file, glob_pattern, queue_file; trajectories_key=trajectories_key) +function update_results_file!( + input_file::ResultsLazyLoader, + glob_pattern::String, + queue_file::String; + trajectories_key = "trajectories", + file_format::String = "jld2", +) + concatenate_results!( + input_file, + glob_pattern, + queue_file; + trajectories_key = trajectories_key, + ) end """ @@ -196,7 +255,16 @@ function build_job_queue(fixed_parameters::Dict, variables::Dict) merged_combinations = Vector{Dict}() variable_combinations = reshape(collect(Iterators.product(values(variables)...)), :) for i in eachindex(variable_combinations) - push!(merged_combinations, merge(fixed_parameters, Dict([(collect(keys(variables))[j], variable_combinations[i][j]) for j in 1:length(keys(variables))]))) + push!( + merged_combinations, + merge( + fixed_parameters, + Dict([ + (collect(keys(variables))[j], variable_combinations[i][j]) for + j = 1:length(keys(variables)) + ]), + ), + ) end return merged_combinations end @@ -207,11 +275,24 @@ end Returns a Vector of all unique combinations of values in `variables` merged with `fixed_parameters`. By specifying a `postprocessing_function`, further actions can be performed each of the elements in the resulting vector. 
""" -function build_job_queue(fixed_parameters::Dict, variables::Dict, postprocessing_function::Function) +function build_job_queue( + fixed_parameters::Dict, + variables::Dict, + postprocessing_function::Function, +) merged_combinations = Vector{Dict}() variable_combinations = reshape(collect(Iterators.product(values(variables)...)), :) for i in eachindex(variable_combinations) - push!(merged_combinations, merge(fixed_parameters, Dict([(collect(keys(variables))[j], variable_combinations[i][j]) for j in 1:length(keys(variables))]))) + push!( + merged_combinations, + merge( + fixed_parameters, + Dict([ + (collect(keys(variables))[j], variable_combinations[i][j]) for + j = 1:length(keys(variables)) + ]), + ), + ) end # Accept a function that does in-place modification of the input parameters dictionary return map(postprocessing_function, merged_combinations) @@ -228,17 +309,22 @@ Set "trajectories_key" in case jobs should be split by something different. Set "filename" to save the resulting batch queue somewhere different than `simulation_parameters.jld2`. """ -function serialise_queue!(input_dict_tensor::Vector{<:Dict{<:Any}}; trajectories_key="trajectories", filename="simulation_parameters.jld2") +function serialise_queue!( + input_dict_tensor::Vector{<:Dict{<:Any}}; + trajectories_key = "trajectories", + filename = "simulation_parameters.jld2", +) queue = [] #Empty queue array to fill with views of input_dict_tensor job_id = 1 for index in eachindex(input_dict_tensor) # Save a list of jobs created from an input dict within it. input_dict_tensor[index]["job_ids"] = [] # Save the total number of trajectories before modification of the input dict to verify completeness on analysis. 
- input_dict_tensor[index]["total_trajectories"] = input_dict_tensor[index][trajectories_key] + input_dict_tensor[index]["total_trajectories"] = + input_dict_tensor[index][trajectories_key] if get!(input_dict_tensor[index], "batchsize", 1) == 1 # Case 1: Fully serialised operation - Split into as many jobs as trajectories. - for trj in 1:input_dict_tensor[index][trajectories_key] + for trj = 1:input_dict_tensor[index][trajectories_key] # Add a view of the input dict push!(queue, view(input_dict_tensor, index)) push!(input_dict_tensor[index]["job_ids"], job_id) @@ -248,19 +334,26 @@ function serialise_queue!(input_dict_tensor::Vector{<:Dict{<:Any}}; trajectories else # Case 2: Larger batch size - There might be some benefit like multithreading, so split into chunks of a certain size. # Work in batchsize chunks - input_dict_tensor[index][trajectories_key] = input_dict_tensor[index]["batchsize"] + input_dict_tensor[index][trajectories_key] = + input_dict_tensor[index]["batchsize"] # If there enough trajectories to fit in >1 batch: - for batch in 2:(floor(input_dict_tensor[index]["total_trajectories"] / input_dict_tensor[index]["batchsize"])) + for batch = + 2:(floor( + input_dict_tensor[index]["total_trajectories"] / + input_dict_tensor[index]["batchsize"], + )) push!(queue, view(input_dict_tensor, index)) push!(input_dict_tensor[index]["job_ids"], job_id) job_id += 1 end extra_parameters = copy(input_dict_tensor[index]) - extra_parameters[trajectories_key] += input_dict_tensor[index]["total_trajectories"] % input_dict_tensor[index]["batchsize"] + extra_parameters[trajectories_key] += + input_dict_tensor[index]["total_trajectories"] % + input_dict_tensor[index]["batchsize"] push!(queue, hcat(extra_parameters)) # This covers any cases where the number of trajectories isn't exactly divisible by the batch size. 
push!(input_dict_tensor[index]["job_ids"], job_id) job_id += 1 end end - jldsave(filename; parameters=input_dict_tensor, queue=queue) + jldsave(filename; parameters = input_dict_tensor, queue = queue) end