-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrunAvaScenModelChain.py
More file actions
468 lines (402 loc) · 24.3 KB
/
runAvaScenModelChain.py
File metadata and controls
468 lines (402 loc) · 24.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
#
# ───────────────────────────────────────────────────────────────────────────────────────────────
#
# ██████╗ ██╗ ██╗ ██████╗ ████████╗ ██████╗ ███████╗ ███╗ ██╗
# ██╔══██╗ ██╗ ██║ ██╔══██╗ ╚██╔════╝ ██╔════╝ ██╔════╝ ████╗ ██║
# ███████║ ██║ ██╔╝ ███████║ ███████╗ ██║ █████╗ ██╔██╗ ██║
# ██╔══██║ ██║██╔╝ ██╔══██║ ╚════██║ ██║ ██╔══╝ ██║╚██╗██║
# ██║ ██║ ╚███╔╝ ██║ ███╗██╗████████║ ╚██████╗ ███████╗ ██║ ╚████║ █████╗ ███╗██╗
# ╚═╝ ╚═╝ ╚══╝ ╚═╝ ╚══╝╚═╝╚═══════╝ ╚═════╝ ╚══════╝ ╚═╝ ╚═══╝ ╚════╝ ╚══╝╚═╝
# ───────────────────────────────────────────────────────────────────────────────────────────────
# ███████  A V A L A N C H E  ·  S C E N A R I O  ·  M O D E L  ·  C H A I N   ████████
# ───────────────────────────────────────────────────────────────────────────────────────────────
#
# Purpose :
# Master orchestrator for the Avalanche Scenario Model Chain (Steps 00–15).
# Drives the end-to-end workflow:
# PRA delineation → PRA processing → FlowPy simulation → AvaDirectory compilation.
#
# Inputs :
# - local_avaScenModelChainCfg.ini / avaScenModelChainCfg.ini
# - 00_input/ directory with DEM, FOREST, BOUNDARY
#
# Outputs :
# - Structured scenario directories
# - FlowPy simulation outputs
# - AvaDirectory (Types, Results) for mapping / analysis
#
# Config :
# [MAIN] Project metadata, paths, input rasters
# [WORKFLOW] Activation flags for all steps
# [pra*] PRA preprocessing (Steps 01–08)
# [ava*] FlowPy parameterization, simulation and directory building (Steps 09–15)
#
# Consumes :
# com1PRA/ (Steps 01–08)
# com2AvaDirectory/ (Steps 13–15)
#
# Depends on :
# in1Utils.cfgUtils – config loading & GDAL/PROJ environment setup
# in1Utils.workflowUtils – unified workflow orchestration (stepEnabled, timers, logging)
# in1Utils.dataUtils – raster/vector I/O, compression utilities
# in2Parameter.compParams – FlowPy parameter generation and size back-mapping
#
# Provides :
# Fully automated, resumable Avalanche Scenario Model Chain execution
# under unified logging, path handling, timings, and config control.
#
# Author :
# Christoph Hesselbach
#
# Institution :
# Austrian Research Centre for Forests (BFW)
# Department of Natural Hazards | Snow and Avalanche Unit
#
# Date & Version :
# 2025-11 - 1.0
#
# ------------------------------------------------------------------------------- #
import os
import time
import logging
import configparser
import pathlib
from logging.handlers import MemoryHandler
# ------------------ AvaScenarioModelChain core imports ------------------ #
import runInitWorkDir as initWorkDir
import in1Utils.cfgUtils as cfgUtils
import in1Utils.workflowUtils as workflowUtils
import in1Utils.dataUtils as dataUtils
import in2Parameter.compParams as compParams
# ------------------ Component imports ----------------------------------- #
import com1PRA.praDelineation as praDelineation
import com1PRA.praSelection as praSelection
import com1PRA.praSubCatchments as subCatchments
import com1PRA.praProcessing as praProcessing
import com1PRA.praSegmentation as praSegmentation
import com1PRA.praAssignElevSize as praAssignElevSize
import com1PRA.praPrepForFlowPy as praPrepForFlowPy
import com1PRA.praMakeBigDataStructure as praMakeBigDataStructure
import com2AvaDirectory.avaDirBuildFromFlowPy as avaDirBuildFromFlowPy
import com2AvaDirectory.avaDirType as avaDirType
import com2AvaDirectory.avaDirResults as avaDirResults
# ------------------ AvaFrame interface ---------------------------------- #
from avaframe import runCom4FlowPy
# ------------------ Environment setup ----------------------------------- #
from in1Utils.cfgUtils import setupGdalEnv
# Configure the GDAL/PROJ environment once at import time (verbose prints the
# resolved paths); must happen before any raster I/O in the components below.
setupGdalEnv(verbose=True)
# Module-level logger; levels are configured in the __main__ runner below.
log = logging.getLogger(__name__)
# ───────────────────────────────────────────────────────────────────────────────────────────────
# MAIN DRIVER FUNCTION
# ───────────────────────────────────────────────────────────────────────────────────────────────
def runAvaScenModelChain(workDir: str = "") -> bool:
    """Run the full Avalanche Scenario Model Chain (Steps 00-15).

    Drives the end-to-end workflow: work-directory initialization (00),
    PRA preprocessing (01-08), size-dependent FlowPy parameterization,
    simulation and postprocessing (09-12), and AvaDirectory building
    (13-15), under unified logging, per-step timing and a final summary.

    Parameters
    ----------
    workDir : str, optional
        If non-empty, written into [MAIN]/workDir of the config file on
        disk (the file is rewritten) before initialization.

    Returns
    -------
    bool
        True when the chain completes successfully (or a resume run finds
        no FlowPy leaves left); False on any error or deliberate early
        stop (missing config sections, initWorkDir=False, failed step).
    """
    # -------------------------------------------------------------------------
    # Step 00: Initialization -------------------------------------------------
    # -------------------------------------------------------------------------
    t00 = time.perf_counter()  # reference point for the Step 00 elapsed-time log
    modPath = os.getcwd()
    # A local_* config (developer override) wins over the shipped default.
    localFile = os.path.join(modPath, "local_avaScenModelChainCfg.ini")
    configPath = localFile if os.path.isfile(localFile) else os.path.join(modPath, "avaScenModelChainCfg.ini")
    # Buffer all early log records in memory until the file handler exists;
    # they are flushed into the log file once it is attached further below.
    root_logger = logging.getLogger()
    early_buf = MemoryHandler(capacity=10000, flushLevel=logging.CRITICAL)
    root_logger.addHandler(early_buf)
    # Log header (as before, single INFO entry)
    log.info(
        "\n\n"
        " ===============================================================================\n"
        f" ... Start main driver for AvaScenarioModelChain ({time.strftime('%Y-%m-%d %H:%M:%S')}) ...\n"
        " ===============================================================================\n"
    )
    log.info("Config file: %s", os.path.abspath(configPath))
    # --- Update config if workDir provided ---
    if workDir:
        cfgTmp = configparser.ConfigParser()
        cfgTmp.read(configPath)
        cfgTmp.setdefault("MAIN", {})
        cfgTmp["MAIN"]["workDir"] = workDir
        with open(configPath, "w") as f:
            cfgTmp.write(f)
    # --- Initialize work directory ---
    cfgPreview = configparser.ConfigParser()
    cfgPreview.read(configPath)
    if "MAIN" not in cfgPreview:
        log.error("Step 00: Config missing [MAIN] section.")
        workflowUtils.closeEarlyBuffer(early_buf, root_logger)
        return False
    main = cfgPreview["MAIN"]
    if not main.getboolean("initWorkDir", fallback=False):
        log.info("Step 00: initWorkDir=False → no directories created.")
        workflowUtils.closeEarlyBuffer(early_buf, root_logger)
        return False
    workFlowDir = initWorkDir.initWorkDir(configPath)
    # FIX: log the elapsed time since function start; the previous code logged
    # the raw time.perf_counter() value, which is relative to an arbitrary
    # epoch and therefore meaningless as a duration.
    log.info("Step 00: Project initialized in %.2fs", time.perf_counter() - t00)
    # --- Attach log file ---
    log_dir = workFlowDir["cairosDir"]
    log_path = os.path.join(log_dir, f"runAvaScenModelChain_{time.strftime('%Y%m%d_%H%M%S')}.log")
    fh = logging.FileHandler(log_path, mode="w", encoding="utf-8")
    fh.setLevel(logging.INFO)
    fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
    root_logger.addHandler(fh)
    # Replay the buffered early records into the freshly created file handler.
    early_buf.setTarget(fh)
    early_buf.flush()
    workflowUtils.closeEarlyBuffer(early_buf, root_logger)
    log.info("Step 00: Log file created at %s", os.path.relpath(log_path, start=log_dir))
    # --- Load full config ---
    cfg = cfgUtils.getConfig()
    if "WORKFLOW" not in cfg:
        log.error("Step 00: Missing [WORKFLOW] section in config.")
        return False
    workflowFlags = cfg["WORKFLOW"]
    # --- Validate inputs ---
    if not workflowUtils.validateInputs(cfg, workFlowDir):
        return False
    # --- Master flags (force-enable whole step groups when True) ---
    masterPra = workflowFlags.getboolean("runAllPRASteps", fallback=False)
    masterFlowPy = workflowFlags.getboolean("runAllFlowPySteps", fallback=False)
    masterAvaDir = workflowFlags.getboolean("runAllAvaDirSteps", fallback=False)
    stepStats: dict[str, float] = {}  # per-step wall-clock durations for the summary
    # --- Kickoff banner ---
    log.info(
        "All inputs complete: %s/00_input\n\n"
        " ===============================================================================\n"
        " ... LET'S KICK IT - AVALANCHE SCENARIOS in 3... 2... 1...\n"
        " ===============================================================================\n",
        workFlowDir["cairosDir"],
    )
    # ───────────────────────────────────────────────────────────────────────────────────────────
    # Step 01–08: PRA Processing
    # ───────────────────────────────────────────────────────────────────────────────────────────
    praSteps = [
        ("01", "PRA delineation", praDelineation.runPraDelineation),
        ("02", "PRA selection", praSelection.runPraSelection),
        ("03", "Subcatchments", subCatchments.runSubcatchments),
        ("04", "PRA processing", praProcessing.runPraProcessing),
        ("05", "PRA segmentation", praSegmentation.runPraSegmentation),
        ("06", "PRA assign elevation & size", praAssignElevSize.runPraAssignElevSize),
        ("07", "PRA → FlowPy preparation", praPrepForFlowPy.runPraPrepForFlowPy),
        ("08", "Make Big Data Structure", praMakeBigDataStructure.runPraMakeBigDataStructure),
    ]
    for stepKey, label, func in praSteps:
        if not workflowUtils.runStep(stepKey, label, func, cfg, workFlowDir, stepStats, workflowFlags, masterPra):
            return False
    # ───────────────────────────────────────────────────────────────────────────────────────────
    # Step 09–12: Avalanche intensity and runout modelling
    # ───────────────────────────────────────────────────────────────────────────────────────────
    # -------------------------------------------------------------------------
    # Step 09: Size dependent parametrization
    # -------------------------------------------------------------------------
    avaDirs: list[pathlib.Path] = []
    if workflowUtils.stepEnabled(workflowFlags, "flowPyInputToSize", masterFlowPy):
        t9 = time.perf_counter()
        log.info("Step 09: Start size-dependent FlowPy parameterization...")
        try:
            avaDirs = workflowUtils.discoverAvaDirs(cfg, workFlowDir)
            avaDirs = workflowUtils.filterSingleTestDirs(cfg, avaDirs, "Step 09")
            demName = cfg["MAIN"].get("DEM", "").strip()
            demPath = pathlib.Path(workFlowDir["inputDir"]) / demName
            if not demPath.exists():
                log.error("Step 09: DEM missing at %s", demPath)
                return False
            for avaDir in avaDirs:
                relLeaf = os.path.relpath(avaDir, workFlowDir["cairosDir"])
                scen = avaDir.name.lower()
                # Per-leaf copy of [avaSIZE] so scenario-specific overrides
                # do not leak between directories.
                cfgSize = configparser.ConfigParser()
                cfgSize["avaSIZE"] = dict(cfg["avaSIZE"])
                sect = cfgSize["avaSIZE"]
                # Parent folder named "size<N>" encodes the maximum size class.
                size_parent = avaDir.parent.name.lower()
                if size_parent.startswith("size"):
                    try:
                        sect["sizeMax"] = str(int(size_parent[4:]))
                    except ValueError:
                        pass  # folder suffix not numeric → keep configured sizeMax
                # Dry/wet leaves run with a constant snow temperature picked
                # from TCold/TWarm (falling back to Tcons, then "0").
                if scen in ("dry", "wet"):
                    sect["constantTemperature"] = "True"
                    sect["Tcons"] = sect.get("TCold" if scen == "dry" else "TWarm", sect.get("Tcons", "0"))
                compParams.computeAndSaveParameters(
                    avaDir, cfg["avaPARAMETER"], sect, demOverride=demPath, compressFiles=False
                )
                log.info("Step 09: Parameterized ./%s (%s)", relLeaf, scen)
            stepStats["Step 09"] = time.perf_counter() - t9
            log.info("Step 09: Finished parameterization in %.2fs", stepStats["Step 09"])
        except Exception:
            log.exception("Step 09: Parameterization failed.")
            return False
    else:
        log.info("Step 09: ...Size dependent parameterization skipped (flag is False)")
    # -------------------------------------------------------------------------
    # Step 10–12: FlowPy run & postprocessing (resume-aware)
    # -------------------------------------------------------------------------
    if workflowUtils.stepEnabled(workflowFlags, "flowPyRun", masterFlowPy):
        t10 = time.perf_counter()
        log.info("Step 10: Start FlowPy run...")
        try:
            # -----------------------------------------------------------------
            # Discover and filter FlowPy leaves
            # -----------------------------------------------------------------
            avaDirs = workflowUtils.discoverAndFilterAvaDirs(cfg, workFlowDir, "Step 10")
            # resumeFlowPyStep → skip leaves with existing Outputs/
            avaDirs = workflowUtils.filterAlreadyCompletedLeaves(
                cfg, avaDirs, workFlowDir, "Step 10"
            )
            # No remaining dirs after resume filtering?
            if not avaDirs:
                if workflowFlags.getboolean("resumeFlowPyStep", fallback=False):
                    log.info(
                        "Step 10: All FlowPy leaves already completed → nothing to run "
                        "(resumeFlowPyStep=True)."
                    )
                    # NOTE(review): returning here also skips Steps 13–15 on a
                    # fully-resumed run — confirm this is intended.
                    return True
                else:
                    log.error("Step 10: No FlowPy directories available; cannot continue.")
                    return False
            # -----------------------------------------------------------------
            # Optional post-processing flags
            # -----------------------------------------------------------------
            doSize = workflowUtils.stepEnabled(workflowFlags, "flowPyOutputToSize", masterFlowPy)
            doCompress = workflowUtils.stepEnabled(workflowFlags, "flowPyOutputCompress", masterFlowPy)
            delOG = workflowUtils.stepEnabled(workflowFlags, "flowPyDOutputDeleteOGFiles", masterFlowPy)
            delTemp = workflowUtils.stepEnabled(workflowFlags, "flowPyDeleteTempFolder", masterFlowPy)
            # -----------------------------------------------------------------
            # Loop over FlowPy directories (resume-aware)
            # -----------------------------------------------------------------
            for avaDir in avaDirs:
                relLeaf = os.path.relpath(avaDir, workFlowDir["cairosDir"])
                log.info("Step 10: Running FlowPy for ./%s...", relLeaf)
                t_leaf = time.perf_counter()
                # Guard our logging config against AvaFrame's own logging setup.
                with workflowUtils.preserveLoggingForFlowPy():
                    runCom4FlowPy.main(avalancheDir=str(avaDir))
                log.info(
                    "Step 10: FlowPy run finished for ./%s in %.2fs",
                    relLeaf, time.perf_counter() - t_leaf
                )
                # -----------------------------------------------------------------
                # Step 11: Optional back-map
                # -----------------------------------------------------------------
                if doSize:
                    try:
                        log.info("Step 11: Back-map FlowPy output to size for ./%s", relLeaf)
                        compParams.computeAndSaveSize(pathlib.Path(avaDir), cfg["avaSIZE"])
                    except Exception:
                        log.exception("Step 11: Results → size failed for ./%s", relLeaf)
                        return False
                # -----------------------------------------------------------------
                # Step 12: Compression / cleanup
                # -----------------------------------------------------------------
                if doCompress:
                    try:
                        outDir = pathlib.Path(avaDir) / "Outputs"
                        log.info("Step 12: Compress outputs for ./%s", relLeaf)
                        dataUtils.tifCompress(outDir, delete_original=delOG)
                    except Exception:
                        log.exception("Step 12: Compression failed for ./%s", relLeaf)
                        return False
                if delTemp:
                    try:
                        log.info("Step 12: Delete temporary data for ./%s", relLeaf)
                        dataUtils.deleteTempFolder(pathlib.Path(avaDir))
                    except Exception:
                        log.exception("Step 12: Delete temp data failed for ./%s", relLeaf)
                        return False
            # -----------------------------------------------------------------
            # Final timing + log
            # -----------------------------------------------------------------
            stepStats["Step 10"] = time.perf_counter() - t10
            log.info("Step 10–12: FlowPy + postprocessing completed in %.2fs", stepStats["Step 10"])
        except Exception:
            log.exception("Step 10–12: FlowPy processing failed.")
            return False
    else:
        log.info("Step 10: ...FlowPy run skipped (flag is False)")
    # ───────────────────────────────────────────────────────────────────────────────────────────
    # Step 13–15: Avalanche Directory (Type and Result) Builder
    # ───────────────────────────────────────────────────────────────────────────────────────────
    # -------------------------------------------------------------------------
    # Step 13: Avalanche Directory Build from FlowPy
    # -------------------------------------------------------------------------
    t13 = time.perf_counter()
    if not workflowUtils.stepEnabled(workflowFlags, "avaDirBuildFromFlowPy", masterAvaDir):
        log.info("Step 13: ...Avalanche Directory Build from FlowPy skipped (flag is False)")
    else:
        log.info("Step 13: Start Avalanche Directory Build from FlowPy...")
        try:
            avaDirBuildFromFlowPy.runAvaDirBuildFromFlowPy(cfg, workFlowDir)
            stepStats["Step 13"] = time.perf_counter() - t13
            log.info("Step 13: Avalanche Directory Build from FlowPy finished successfully in %.2fs",
                     stepStats["Step 13"])
        except Exception:
            log.exception("Step 13: Avalanche Directory Build from FlowPy failed.")
            return False
    # -------------------------------------------------------------------------
    # Step 14: Avalanche Directory Type
    # -------------------------------------------------------------------------
    t14 = time.perf_counter()
    if not workflowUtils.stepEnabled(workflowFlags, "avaDirType", masterAvaDir):
        log.info("Step 14: ...Avalanche Directory Type skipped (flag is False)")
    else:
        log.info("Step 14: Start Avalanche Directory Type...")
        try:
            avaDirType.runAvaDirType(cfg, workFlowDir)
            stepStats["Step 14"] = time.perf_counter() - t14
            log.info("Step 14: Avalanche Directory Type finished successfully in %.2fs",
                     stepStats["Step 14"])
        except Exception:
            log.exception("Step 14: Avalanche Directory Type failed.")
            return False
    # -------------------------------------------------------------------------
    # Step 15: Avalanche Directory Results
    # -------------------------------------------------------------------------
    t15 = time.perf_counter()
    if not workflowUtils.stepEnabled(workflowFlags, "avaDirResults", masterAvaDir):
        log.info("Step 15: ...Avalanche Directory Results skipped (flag is False)")
    else:
        log.info("Step 15: Start Avalanche Directory Results Build...")
        try:
            avaDirResults.runAvaDirResults(cfg, workFlowDir)
            stepStats["Step 15"] = time.perf_counter() - t15
            log.info("Step 15: Avalanche Directory Results finished successfully in %.2fs",
                     stepStats["Step 15"])
        except Exception:
            log.exception("Step 15: Avalanche Directory Results failed.")
            return False
    # ───────────────────────────────────────────────────────────────────────────────────────────
    # Step 00–15: FINAL SUMMARY
    # ───────────────────────────────────────────────────────────────────────────────────────────
    total = sum(stepStats.values())
    log.info("\n\nAvaScenarioModelChain Summary...\n")
    for s, dur in stepStats.items():
        log.info("%-12s ✅ %.2fs", s, dur)
    log.info("Total runtime: %.2fs", total)
    return True
# ───────────────────────────────────────────────────────────────────────────────────────────────
# MAIN RUNNER
# ───────────────────────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Root logger stays quiet (WARNING); selected project modules are opted
    # into INFO below so the workflow narrative is visible.
    logging.basicConfig(level=logging.WARNING, format="%(levelname)s:%(name)s: %(message)s")
    # Enable informative logging for key AvaScenarioModelChain modules
    for name in [
        "__main__",
        "runAvaScenModelChain",
        "runInitWorkDir",
        "in1Utils.workflowUtils",
        "com2AvaDirectory.avaDirBuildFromFlowPy",
        "in2Parameter",
        "in2Parameter.compParams",
    ]:
        logging.getLogger(name).setLevel(logging.INFO)
    # Silence noisy AvaFrame internals
    for name in [
        "in2Parameter.sizeParameters",
        "avaframe.com4FlowPy.splitAndMerge",
        "in1Utils.cfgUtils",
        "avaframe.in3Utils.cfgUtils",
        "avaframe.com4FlowPy.cfgUtils",
    ]:
        logging.getLogger(name).setLevel(logging.WARNING)
    t_all = time.perf_counter()
    success = runAvaScenModelChain()
    if success:
        log.info(
            "\n\n ===============================================================================\n"
            " ... AvaScenarioModelChain WORKFLOW DONE - completed in %.2fs ...\n"
            " ===============================================================================\n",
            time.perf_counter() - t_all,
        )
    else:
        # FIX: a failed run previously exited silently with status 0, so
        # schedulers/CI could not detect the failure. Log it and return a
        # non-zero exit code.
        log.error(
            "AvaScenarioModelChain WORKFLOW FAILED after %.2fs - see log for details.",
            time.perf_counter() - t_all,
        )
        raise SystemExit(1)