diff --git a/environment/environment.yml b/environment/environment.yml index 3de64b3..a1bd8a7 100644 --- a/environment/environment.yml +++ b/environment/environment.yml @@ -2,34 +2,31 @@ name: pyciam channels: - conda-forge dependencies: - - bottleneck=1.3 - - bokeh=2.4.3 # for use of dask dashboard - - cartopy=0.21.1 - - cloudpathlib=0.13 - - distributed=2023.3.1 - - flox=0.6.8 - - fsspec=2023.3.0 - - geopandas=0.12.2 - - gitpython=3.1.31 # unmarked dependency of rhg_compute_tools - - jupyterlab=3.6.1 - - matplotlib=3.7.1 - - numpy=1.24.2 - - numpy_groupies=0.9.20 - - oct2py=5.6.0 - - octave=7.3.0 - - openpyxl=3.1.1 - - pandas=1.5.3 - - papermill=2.3.4 - - pint-xarray=0.3 - - pip=23.0.1 - - python=3.11.0 - - requests=2.28.2 - - scikit-learn=1.2.2 - - scipy=1.10.1 - - tqdm=4.65.0 - - xarray=2023.2.0 - - zarr=2.14.2 + - bottleneck + - bokeh # for use of dask dashboard + - cartopy + - cloudpathlib + - distributed + - flox + - fsspec + - geopandas + - gitpython # unmarked dependency of rhg_compute_tools + - jupyterlab + - matplotlib + - numpy + - openpyxl + - pandas + - papermill + - pint-xarray + - pip + - python=3.14 + - requests + - scikit-learn + - scipy + - tqdm + - xarray + - zarr - pip: - - python-ciam==1.1 - - rhg_compute_tools==1.3 - - parameterize_jobs==0.1 \ No newline at end of file + - python-ciam + - rhg_compute_tools + - parameterize_jobs \ No newline at end of file diff --git a/pyCIAM/io.py b/pyCIAM/io.py index bd211ac..d688943 100644 --- a/pyCIAM/io.py +++ b/pyCIAM/io.py @@ -22,7 +22,7 @@ from fsspec import FSTimeoutError from fsspec.implementations.zip import ZipFileSystem -from pyCIAM.utils import copy +from pyCIAM.utils import _get_exp_year, copy from pyCIAM.utils import spherical_nearest_neighbor as snn from .utils import _s2d @@ -36,7 +36,7 @@ def prep_sliiders( selectors={}, calc_popdens_with_wetland_area=True, expand_exposure=True, - storage_options={}, + storage_options=None, ): """Import the SLIIDERS dataset (or a different dataset formatted analogously). 
@@ -102,7 +102,10 @@ def prep_sliiders( """ inputs_all = xr.open_zarr( str(input_store), chunks=None, storage_options=storage_options - ).sel(selectors, drop=True) + ) + inputs_all = inputs_all.sel( + {k: v for k, v in selectors.items() if k in inputs_all.dims}, drop=True + ) inputs = inputs_all.sel({seg_var: seg_vals}) inputs = _s2d(inputs).assign(constants) @@ -124,19 +127,16 @@ def prep_sliiders( * (inputs.ypcc / ref_income) ** inputs.vsl_inc_elast ) - if expand_exposure and "pop" not in inputs.data_vars: - exp_year = [ - v for v in inputs.data_vars if v.startswith("pop_") and "scale" not in v - ] - assert len(exp_year) == 1, exp_year - exp_year = exp_year[0].split("_")[1] - pop_var = "pop_" + exp_year - inputs["pop"] = inputs[pop_var] * inputs.pop_scale - inputs = inputs.drop(pop_var) - if expand_exposure and "K" not in inputs.data_vars: - K_var = "K_" + exp_year - inputs["K"] = inputs[K_var] * inputs.K_scale - inputs = inputs.drop(K_var) + if expand_exposure: + exp_year = _get_exp_year(inputs) + if "pop" not in inputs.data_vars: + pop_var = f"pop_{exp_year}" + inputs["pop"] = inputs[pop_var] * inputs.pop_scale + inputs = inputs.drop(pop_var) + if "K" not in inputs.data_vars: + K_var = f"K_{exp_year}" + inputs["K"] = inputs[K_var] * inputs.K_scale + inputs = inputs.drop(K_var) if "dfact" not in inputs.data_vars and "npv_start" in inputs.data_vars: inputs["dfact"] = (1 / (1 + inputs.dr)) ** (inputs.year - inputs.npv_start) @@ -196,30 +196,28 @@ def _load_scenario_mc( include_cc=True, quantiles=None, ncc_name="ncc", - storage_options={}, + storage_options=None, ): scen_mc_filter = xr.open_zarr( str(slr_store), chunks=None, storage_options=storage_options )[["scenario", mc_dim]] if quantiles is not None: if mc_dim == "quantile": - scen_mc_filter = scen_mc_filter.sel(quantile=quantiles) + scen_mc_filter = scen_mc_filter.sel(quantile=quantiles).sortby( + ["scenario", "quantile"] + ) else: - scen_mc_filter = scen_mc_filter.quantile(quantiles, dim=mc_dim) - - scen_mc_filter = ( - scen_mc_filter.to_dataframe().sort_values(["scenario", mc_dim]).index - ) + scen_mc_filter = scen_mc_filter.scenario.sortby("scenario") + scen_mc_filter = scen_mc_filter.to_dataframe().index if include_ncc: - scen_mc_filter = scen_mc_filter.append( - pd.MultiIndex.from_product( - ( - [ncc_name], - scen_mc_filter.get_level_values(mc_dim).unique().sort_values(), - ), - names=["scenario", mc_dim], - ) + scen_mc_filter = scen_mc_filter.union( + pd.DataFrame(index=scen_mc_filter) + .reset_index() + .assign(scenario=ncc_name) + .drop_duplicates() + .set_index(scen_mc_filter.names) + .index ) if not include_cc: @@ -239,9 +237,10 @@ def _load_lslr_for_ciam( mc_dim="mc_sample_id", lsl_var="lsl_msl05", lsl_ncc_var="lsl_ncc_msl05", + site_id_dim="site_id", ncc_name="ncc", slr_0_year=2005, - storage_options={}, + storage_options=None, quantiles=None, ): if scen_mc_filter is None: @@ -254,9 +253,16 @@ def _load_lslr_for_ciam( quantiles=quantiles, ncc_name=ncc_name, ) + elif isinstance(scen_mc_filter, pd.MultiIndex): + scen_mc_filter = scen_mc_filter.reorder_levels(("scenario", mc_dim)) wcc = scen_mc_filter.get_level_values("scenario") != ncc_name - scen_mc_ncc = scen_mc_filter[~wcc].droplevel("scenario").values + + # it's possible that the scen_mc_filter only filters scenarios not monte carlos + if mc_dim in scen_mc_filter.names: + scen_mc_ncc = scen_mc_filter[~wcc].droplevel("scenario").values + else: + scen_mc_ncc = None scen_mc_xr_wcc = ( scen_mc_filter[wcc] .to_frame() @@ -269,45 +275,56 @@ def _load_lslr_for_ciam( # select 
the nearest SLR locations to the passed locations slr = _s2d( - slr.sel(site_id=get_nearest_slrs(slr, lonlats).to_xarray()).drop("site_id") + slr.sel({site_id_dim: get_nearest_slrs(slr, lonlats).to_xarray()}).drop( + site_id_dim + ) ).drop(["lat", "lon"], errors="ignore") # select only the scenarios we wish to model if len(scen_mc_xr_wcc.scen_mc): slr_out = ( slr[lsl_var] - .sel({"scenario": scen_mc_xr_wcc.scenario, mc_dim: scen_mc_xr_wcc[mc_dim]}) - .set_index(scen_mc=["scenario", mc_dim]) + .sel({k: scen_mc_xr_wcc[k] for k in scen_mc_xr_wcc.data_vars}) + .set_index(scen_mc=list(scen_mc_xr_wcc.data_vars.keys())) ) else: slr_out = xr.DataArray( [], dims=("scen_mc",), coords={ - "scen_mc": pd.MultiIndex.from_tuples([], names=["scenario", mc_dim]) + "scen_mc": pd.MultiIndex.from_tuples( + [], names=list(scen_mc_xr_wcc.data_vars.keys()) + ) + if mc_dim in scen_mc_xr_wcc.data_vars + else pd.Index([], name="scen_mc") }, ) - if len(scen_mc_ncc): - slr_ncc = ( - slr[lsl_ncc_var] - .sel({mc_dim: scen_mc_ncc}) - .expand_dims(scenario=[ncc_name]) - .stack(scen_mc=["scenario", mc_dim]) - ) - slr_out = xr.concat((slr_out, slr_ncc), dim="scen_mc").sel( - scen_mc=scen_mc_filter + if include_ncc: + slr_ncc = slr[lsl_ncc_var] + stack_dims = ["scenario"] + if scen_mc_ncc is not None: + slr_ncc = slr_ncc.sel({mc_dim: scen_mc_ncc}) + stack_dims.append(mc_dim) + + slr_ncc = slr_ncc.expand_dims(scenario=[ncc_name]) + if len(stack_dims) > 1: + slr_ncc = slr_ncc.stack(scen_mc=stack_dims) + else: + slr_ncc = slr_ncc.rename({stack_dims[0]: "scen_mc"}) + slr_out = xr.concat( + (slr_out, slr_ncc), dim="scen_mc", combine_attrs="no_conflicts" + ).sel(scen_mc=scen_mc_filter) + + if quantiles is not None and mc_dim != "quantile": + slr_out = slr_out.quantile(quantiles, dim=mc_dim) + slr_out = slr_out.rename(scen_mc="scenario").stack( + scen_mc=["scenario", "quantile"] ) + # convert to meters if "units" in slr_out.attrs: - ix_names = slr_out.indexes["scen_mc"].names - # hack to avoid pint destroying multi-indexed coords - slr_out = ( - slr_out.pint.quantify() - .pint.to("meters") - .pint.dequantify() - .set_index(scen_mc=ix_names) - ) + slr_out = slr_out.pint.quantify().pint.to("meters").pint.dequantify() # add on base year where slr is 0 slr_out = slr_out.reindex( @@ -319,6 +336,16 @@ def _load_lslr_for_ciam( if interp_years is not None: slr_out = slr_out.interp(year=interp_years) + # use string concatenated variable instead of multiindex to facilitate zarr + # compression + if "scen_mc" in slr_out.coords and isinstance( + slr_out.indexes["scen_mc"], pd.MultiIndex + ): + slr_out = slr_out.reset_index("scen_mc").assign_coords( + scen_mc=slr_out.scenario.values + + "_" + + slr_out[mc_dim if quantiles is None else "quantile"].values.astype(str) + ) return slr_out @@ -350,14 +377,19 @@ def create_template_dataarray(dims, coords, chunks, dtype="float32", name=None): An empty dask-backed DataArray. 
""" lens = {k: len(v) for k, v in coords.items()} - return xr.DataArray( - da.empty( - [lens[k] for k in dims], chunks=[chunks[k] for k in dims], dtype=dtype - ), + out = xr.DataArray( + da.empty([lens[k] for k in dims], chunks=[-1] * len(dims), dtype=dtype), dims=dims, coords={k: v for k, v in coords.items() if k in dims}, name=name, ) + out.encoding["chunks"] = [chunks[k] for k in dims] + if np.issubdtype(np.dtype(dtype), np.integer): + fill_value = np.iinfo(dtype).max + else: + fill_value = "NaN" + out.encoding["fill_value"] = fill_value + return out def create_template_dataset(var_dims, coords, chunks, dtypes): @@ -403,7 +435,7 @@ def check_finished_zarr_workflow( varname=None, final_selector={}, mask=None, - storage_options={}, + storage_options=None, ): """Check if a workflow that writes to a particular region of a zarr store has already run. This is useful when running pyCIAM in "probabilistic" mode across a @@ -487,80 +519,6 @@ def check_finished_zarr_workflow( return finished -def save_to_zarr_region(ds_in, store, already_aligned=False, storage_options={}): - """Wrapper around :py:method:`xarray.Dataset.to_zarr` when specifying the `region` - kwarg. This function allows you to avoid boilerplate to figure out the integer slice - objects needed to pass as `region` when calling `:py:meth:xarray.Dataset.to_zarr`. - - Parameters - ---------- - ds_in : :py:class:`xarray.Dataset` or :py:class:`xarray.DataArray` - Dataset or DataArray to save to a specific region of a Zarr store - store : Path-like - Path to Zarr store - already_aligned : bool, default False - If True, assume that the coordinates of `ds_in` are already ordered the same - way as those of `store`. May save some computation, but will miss-attribute - values to coordinates if set to True when coords are not aligned. - storage_options : dict, optional - Passed to :py:function:`xarray.open_zarr` - - Returns - ------- - None : - No return value but `ds_in` is saved to the appropriate region of `store`. - - Raises - ------ - ValueError - If `ds_in` is an unnamed DataArray and `store` has more than one variable. - AssertionError - If any coordinate values of `ds_in` are not contiguous within `store`. - """ - ds_out = xr.open_zarr(str(store), chunks=None, storage_options=storage_options) - - # convert dataarray to dataset if needed - if isinstance(ds_in, xr.DataArray): - if ds_in.name is not None: - ds_in = ds_in.to_dataset() - else: - if len(ds_out.data_vars) != 1: - raise ValueError( - "``ds_in`` is an unnamed DataArray and ``store`` has more than one " - "variable." 
- ) - ds_in = ds_in.to_dataset(name=list(ds_out.data_vars)[0]) - - # align - for v in ds_in.data_vars: - ds_in[v] = ds_in[v].transpose(*ds_out[v].dims).astype(ds_out[v].dtype) - - # find appropriate regions - alignment_dims = {} - regions = {} - for r in ds_in.dims: - if len(ds_in[r]) == len(ds_out[r]): - alignment_dims[r] = ds_out[r].values - continue - alignment_dims[r] = [v for v in ds_out[r].values if v in ds_in[r].values] - valid_ixs = np.arange(len(ds_out[r]))[ds_out[r].isin(alignment_dims[r]).values] - n_valid = len(valid_ixs) - st = valid_ixs[0] - end = valid_ixs[-1] - assert end - st == n_valid - 1, ( - f"Indices are not continuous along dimension {r}" - ) - regions[r] = slice(st, end + 1) - - # align coords - if not already_aligned: - ds_in = ds_in.sel(alignment_dims) - - ds_in.drop_vars(ds_in.coords).to_zarr( - str(store), region=regions, storage_options=storage_options - ) - - def get_nearest_slrs(slr_ds, lonlats, x1="seg_lon", y1="seg_lat"): unique_lonlats = lonlats[[x1, y1]].drop_duplicates() slr_lonlat = slr_ds[["lon", "lat"]].to_dataframe() @@ -585,9 +543,11 @@ def load_ciam_inputs( input_store, slr_store, params, - seg_vals, + selectors={}, slr_names=None, seg_var="seg", + lsl_var="lsl_msl05", + slr_site_id_dim="site_id", surge_lookup_store=None, ssp=None, iam=None, @@ -596,7 +556,7 @@ def load_ciam_inputs( include_cc=True, mc_dim="mc_sample_id", quantiles=None, - storage_options={}, + storage_options=None, ): """Load, process, and format all inputs needed to run pyCIAM. @@ -611,9 +571,8 @@ def load_ciam_inputs( params : dict Dictionary of model parameters, typically loaded from a JSON file. See :file:`../params.json` for an example of the required parameters. - seg_vals : list of str - Defines the subset of regions (along dimension `seg_var`) that the function - will prep. Subsets are used to run CIAM in parallel. + selectors : list of str + Defines the subset of regions (along dimension `seg_var`) and/or scenario that the function will prep. Subsets are used to run CIAM in parallel. slr_names : list of str, optional If `slr_store` is a list of multiple SLR datasets, this must be a list of the same length providing names for each SLR dataset. This is used as a suffix for @@ -623,6 +582,10 @@ def load_ciam_inputs( seg_var : str, default "seg_var" The name of the dimension in `input_store` along which the function will subset using `seg_vals` + lsl_var : str, default "lsl_msl05" + The name of the variable in ``slr_store`` containing local SLR values + slr_site_id_dim : str, default "site_id" + The name of the location dimension in ``slr_store``. surge_lookup_store : Path-like, optional If not None, will also load and process data from an ESL impacts lookup table (see `lookup.create_surge_lookup`). If included in a call to @@ -681,14 +644,14 @@ def load_ciam_inputs( If `ssp` or `iam` is specified and the corresponding variables are not present in the Zarr store located at `input_store`. 
""" - selectors = {"year": slice(params.model_start, None)} + selectors = {"year": slice(params.model_start, None), **selectors} if ssp is not None: selectors["ssp"] = ssp if iam is not None: selectors["iam"] = iam inputs = prep_sliiders( input_store, - seg_vals, + selectors[seg_var], # dropping the "refA_scenario_selectors" b/c this doesn't need to be added to # the input dataset object constants=params[params.map(type) != dict].to_dict(), # noqa: E721 @@ -707,7 +670,7 @@ def load_ciam_inputs( xr.open_zarr( str(surge_lookup_store), chunks=None, storage_options=storage_options ) - .sel({seg_var: seg_vals}) + .sel({seg_var: selectors[seg_var]}) .load() ) if seg_var != "seg": @@ -716,7 +679,7 @@ def load_ciam_inputs( surge = None # get SLR - if not isinstance(slr_store, (list, np.ndarray, tuple, set)): + if not pd.api.types.is_list_like(slr_store): slr_store = [slr_store] ncc_names = ["ncc"] else: @@ -733,8 +696,10 @@ def load_ciam_inputs( include_ncc=include_ncc, include_cc=include_cc, ncc_name=ncc_names[sx], + lsl_var=lsl_var, mc_dim=mc_dim, quantiles=quantiles, + site_id_dim=slr_site_id_dim, storage_options=storage_options, ) for sx, s in enumerate(slr_store) @@ -742,11 +707,18 @@ def load_ciam_inputs( dim="scen_mc", ) + slr = slr.sel({k: v for k, v in selectors.items() if k in slr.dims}) + return inputs, slr, surge def load_diaz_inputs( - input_store, seg_vals, params, include_ncc=True, include_cc=True, storage_options={} + input_store, + seg_vals, + params, + include_ncc=True, + include_cc=True, + storage_options=None, ): """Load the original inputs used in Diaz 2016. diff --git a/pyCIAM/run.py b/pyCIAM/run.py index 861d438..c492474 100644 --- a/pyCIAM/run.py +++ b/pyCIAM/run.py @@ -8,15 +8,21 @@ * execute_pyciam """ +import warnings from collections import OrderedDict +from math import ceil from shutil import rmtree +from time import sleep +import dask.config import numpy as np import pandas as pd import xarray as xr from cloudpathlib import AnyPath, CloudPath -from distributed import Client -from rhg_compute_tools.xarray import dataarray_from_delayed +from distributed import Client, as_completed +from distributed.client import FutureCancelledError +from numpy.dtypes import StringDType +from rhg_compute_tools.xarray import dataset_from_delayed from pyCIAM.constants import CASE_DICT, CASES, COSTTYPES, PLIST, RLIST, SOLVCASES from pyCIAM.io import ( @@ -24,7 +30,6 @@ create_template_dataarray, load_ciam_inputs, load_diaz_inputs, - save_to_zarr_region, ) from pyCIAM.surge import damage_funcs, lookup from pyCIAM.surge._calc import ( @@ -264,15 +269,16 @@ def calc_costs( sigma = xr.concat((sigma_r, sigma_p), dim="adapttype") surge_cap = sigma / tot_landarea surge_pop = surge_cap * 0.01 # floodmortality from diaz + surge_dim = xr.DataArray(["stormCapital", "stormPopulation"], dims=["costtype"]) surge = xr.concat( (surge_cap, surge_pop), - dim=pd.Index(["stormCapital", "stormPopulation"], name="costtype"), + dim=surge_dim, ).to_dataset("costtype") surge_cap_noadapt = sigma_noadapt / tot_landarea surge_pop_noadapt = surge_cap_noadapt * 0.01 # floodmortality from diaz surge_noadapt = xr.concat( (surge_cap_noadapt, surge_pop_noadapt), - dim=pd.Index(["stormCapital", "stormPopulation"], name="costtype"), + dim=surge_dim, ).to_dataset("costtype") # multiply fractional losses by seg-ir-level total capital and population @@ -323,7 +329,7 @@ def calc_costs( ) ) surge = xr.concat( - surge, dim=pd.Index(["retreat", "protect"], name="adapttype") + surge, dim=xr.DataArray(["retreat", "protect"], 
dims=["adapttype"]) ) else: # Interpolate to the SLR and rh_diff values for all scenario/mc/year/iam/ssp @@ -783,7 +789,8 @@ def caseify(ds, prefix): "protection": xr.zeros_like(abandonment), } ), - ) + ), + compat="no_conflicts", ).to_array("costtype") del abandonment, relocation_r, wetland_r_noadapt @@ -800,7 +807,8 @@ def caseify(ds, prefix): "protection": protection, } ), - ) + ), + compat="no_conflicts", ).to_array("costtype") del protection, wetland_p @@ -850,11 +858,11 @@ def caseify(ds, prefix): def select_optimal_case( all_case_cost_path, - region, - seg_regions, + selectors, + econ_input_path, eps=1, region_var="seg_adm", - storage_options={}, + storage_options=None, ): """Calculate the least-cost adaptation path for a given `region`, which is nested within a given coastal segment. All regions within a segment must take the same @@ -865,11 +873,11 @@ def select_optimal_case( all_case_cost_path : Path-like Path to Zarr store that contains ``costs`` and ``npv`` variables for each adaptation choice for all regions. - region : str - Name of region that you will calculate optimal case for - seg_regions : list of str - Names of all regions within this segment. NPV across all regions will be summed - to calculate the segment-level least-cost adaptation choice. + selectors : dict[str, Any] + Selectors for the output dataset, which must include ``region_var`` and may + include others if you are running on subsets of the full set of SLR scenarios. + econ_input_path : Path-like + Path to the econ input dataset for this pyCIAM run (e.g. SLIIDERS) eps : int, default 1 Dollars of NPV to shave off of noAdaptation npv when choosing optimal case, in order to avoid floating point noise driving decision for some regions. Probably @@ -886,13 +894,30 @@ def select_optimal_case( ``optimalfixed``, which represents the optimal adaptation choice for this region for each socioeconomic and SLR trajectory. 
""" + with xr.open_zarr( + str(econ_input_path), storage_options=storage_options, chunks=None + ) as ds: + all_segs = ds.seg.load() + all_seg_vars = ds[region_var].load() + + segadm_seg_map = all_segs.sel({region_var: selectors[region_var]}) + all_seg_adms = all_seg_vars.isel( + {region_var: all_segs.isin(np.unique(segadm_seg_map))} + ) + opt_case = ( xr.open_zarr( str(all_case_cost_path), chunks=None, storage_options=storage_options ) - .npv.sel({region_var: seg_regions}) + .npv.sel( + { + region_var: all_seg_adms.values, + **{k: v for k, v in selectors.items() if k != region_var}, + } + ) .drop_sel(case="optimalfixed") - .sum(region_var) + .groupby(all_seg_adms.seg) + .sum() ) # in case of a tie, we don't want floating point precision noise to determine the @@ -909,21 +934,403 @@ def select_optimal_case( ) .to_xarray() .astype("uint8") + .sel(seg=segadm_seg_map) ) + # get opt_case back into seg-region dimension + opt_case = opt_case.sel(seg=segadm_seg_map) + out = ( xr.open_zarr( str(all_case_cost_path), chunks=None, storage_options=storage_options )[["costs", "npv"]] - .sel({region_var: [region]}) + .sel(selectors) .sel(case=opt_case) .drop("case") .expand_dims(case=["optimalfixed"]) ) - out["optimal_case"] = opt_val.expand_dims({region_var: [region]}) + out["optimal_case"] = opt_val return out +def _get_selectors( + ciam_in, seg_adm_var, seg_chunksize, other_chunksizes, out_ds, client_batchsize +): + seg_vars = ciam_in[seg_adm_var].values + seg_groups = [ + slice(seg_vars[i], seg_vars[min(i + seg_chunksize - 1, len(seg_vars) - 1)]) + for i in range(0, len(seg_vars), seg_chunksize) + ] + + # add on groupings by any other dimensions than seg_var + names = [seg_adm_var] + ixs = [seg_groups] + for c, s in other_chunksizes.items(): + this_dim = out_ds[c].values + ixs.append( + [ + slice(this_dim[i], this_dim[min(i + s - 1, len(this_dim) - 1)]) + for i in range(0, len(this_dim), s) + ] + ) + names.append(c) + master_ix = pd.MultiIndex.from_product(ixs, names=names) + dimnames = master_ix.names + dimnum = range(len(dimnames)) + selectors = master_ix.map(lambda x: {dimnames[k]: x[k] for k in dimnum}).values + + # compute in batches so that dask scheduler doesn't get overwhelmed with 1M+ tasks + # in case of large workflow + n_batches = ceil(len(selectors) / client_batchsize) + selector_groups = np.array_split(selectors, n_batches) + ix_groups = np.array_split(master_ix, n_batches) + + return selector_groups, ix_groups + + +def _check_finished( + other_selectors, output_path, case_selector=CASES[:-1], storage_options=None +): + return check_finished_zarr_workflow( + finalstore=output_path, + varname="costs", + final_selector={"case": case_selector, **other_selectors}, + storage_options=storage_options, + ) + + +def _check_finished_full_chunk( + selector_group, + output_path, + client, + case_selector=CASES[:-1], + storage_options=None, +): + kwargs = dict( + output_path=output_path, + case_selector=case_selector, + storage_options=storage_options, + ) + is_finished = pd.Series( + selector_group, + index=client.map(_check_finished, selector_group, **kwargs), + name="selectors", + ) + to_run = [] + + while len(is_finished): + sleep(5) + status = np.array([fut.status for fut in is_finished.index]) + all_futs = is_finished.index[status == "finished"] + cancelled = is_finished[~np.isin(status, ["finished", "pending", "processing"])] + if len(cancelled): + bad_spec = cancelled.tolist() + new_futs = client.map(_check_finished, bad_spec, **kwargs) + is_finished = pd.concat( + ( + 
is_finished.drop(cancelled.index), + pd.Series( + bad_spec, + index=pd.Index(new_futs, name=is_finished.index.name), + name=is_finished.name, + ), + ) + ) + if not len(all_futs): + continue + try: + finished = client.gather(all_futs.tolist()) + except FutureCancelledError: + continue + + unfinished = ~np.array(finished) + if unfinished.sum(): + unfinished_futs = all_futs[unfinished] + to_run += is_finished.loc[unfinished_futs].tolist() + del unfinished_futs + + is_finished = is_finished.drop(all_futs) + del all_futs + return to_run + + +def _check_completed_fut_batch(batch, futs_ser, client, retry_func, **retry_kwargs): + status = np.array([fut.status for fut in batch]) + errored = status == "error" + cancelled = status == "cancelled" + arr_batch = np.array(batch) + if sum(errored): + bad_futs = arr_batch[errored] + bad_spec = futs_ser.loc[bad_futs] + try: + bad_futs[0].result() + except Exception: + warnings.warn(f"Error in spec: {bad_spec}") + raise + if sum(cancelled): + bad_futs = arr_batch[cancelled] + bad_spec = futs_ser.loc[bad_futs].tolist() + new_futs = client.map(retry_func, bad_spec, **retry_kwargs) + futs_ser = pd.concat( + ( + futs_ser.drop(bad_futs), + pd.Series( + bad_spec, + index=pd.Index(new_futs, name=futs_ser.index.name), + name=futs_ser.name, + ), + ) + ) + batch = arr_batch[~cancelled] + return futs_ser.drop(batch) + + +def _aggregate_costs_to_adm( + adms, + adm_groups, + adm_var, + seg_adm_var, + mc_dim, + output_path, + tmp_output_path, + postprocess, + storage_options=None, +): + if check_finished_zarr_workflow( + finalstore=output_path, + varname="costs", + final_selector={adm_var: adms}, + storage_options=storage_options, + ): + return None + out = [] + all_adm_groups = np.concatenate(adm_groups.loc[adms].values) + dims = xr.open_zarr(str(tmp_output_path), chunks=None).dims + full_input = postprocess( + xr.open_zarr( + str(tmp_output_path), + chunks={ + d: -1 + for d in dims + if d not in [seg_adm_var, "scenario", mc_dim, "scen_mc"] + }, + ).sel({seg_adm_var: all_adm_groups}) + ) + # aggregating costs and npv, accounting for whether postprocess func dropped either + full_input = full_input[[c for c in ["costs", "npv"] if c in full_input.data_vars]] + for adm in adms: + this_out = full_input.sel({seg_adm_var: adm_groups.loc[adm]}) + ix_start = np.concatenate([[0], np.cumsum(this_out.chunksizes[seg_adm_var])]) + this_sum = 0 + with dask.config.set(scheduler="single-threaded"): + for ix in range(len(ix_start) - 1): + this_sum += ( + this_out.isel( + {seg_adm_var: slice(ix_start[ix], ix_start[ix + 1])}, drop=True + ) + .load() + .sum(seg_adm_var) + ) + out.append(this_sum) + out = xr.concat(out, dim=xr.DataArray(adms, dims=[adm_var])).drop_encoding() + if out.to_array().isnull().any(): + raise ValueError("Null values found.") + out.drop_encoding().to_zarr(str(output_path), region="auto") + + +def _aggregate_optimal_case_to_seg( + segs, + seg_map, + seg_adm_var, + output_path, + tmp_output_path, + postprocess, + storage_options=None, +): + if check_finished_zarr_workflow( + finalstore=output_path, + varname="optimal_case", + final_selector={"seg": segs}, + storage_options=storage_options, + ): + return None + all_seg_adms = seg_map.sel(seg=segs) + dims = xr.open_zarr(str(tmp_output_path), chunks=None).dims + out = postprocess( + xr.open_zarr( + str(tmp_output_path), chunks={d: -1 for d in dims if d != seg_adm_var} + ) + .sel({seg_adm_var: all_seg_adms}) + .drop_vars(seg_adm_var) + )[["optimal_case"]] + with dask.config.set(scheduler="single-threaded"): + out = 
out.load() + if out.to_array().isnull().any(): + raise ValueError("Null values found.") + out.drop_encoding().to_zarr(str(output_path), region="auto") + + +def _get_adm_groups(econ_input_path, adm_var, seg_adm_var, adms_unique): + return ( + xr.open_zarr(str(econ_input_path), chunks=None)[adm_var] + .to_series() + .reset_index() + .set_index(adm_var)[seg_adm_var] + .groupby(adm_var) + .apply(list) + .loc[adms_unique] + ) + + +def _get_seg_map(econ_input_path, seg_adm_var, segs_unique): + return ( + xr.open_zarr(str(econ_input_path), chunks=None) + .seg.to_series() + .reset_index() + .set_index("seg")[seg_adm_var] + .groupby("seg") + .first() + .loc[segs_unique] + .to_xarray() + ) + + +def _aggregate_results( + tmppath, + outpath, + econ_input_path, + seg_var, + adm_var, + mc_dim, + client, + postprocess_func=None, + scen_mc_filter=None, + overwrite=False, + storage_options=None, +): + def postprocess(ds): + if postprocess_func is None: + return ds + return postprocess_func(ds) + + # final output will be aggregated across segments within each adm region. We first + # create the appropriate template for a single IR and then later expand dims + template = postprocess( + xr.open_zarr(str(tmppath)).isel({seg_var: 0}, drop=True) + ).chunk(-1) + + # want chunks to be roughly 100MB in size + adm_chunksize = ceil(100 / (template.costs.nbytes / 2**20)) + seg_chunksize = ceil(100 / (template.optimal_case.nbytes / 2**20)) + + # get unique adms in same order as they appear in seg_ir + with xr.open_zarr(str(econ_input_path), chunks=None) as ciam_in: + adms = ciam_in[adm_var].load() + segs = ciam_in.seg.load() + adms_unique = adms.values[np.sort(np.unique(adms, return_index=True)[1])] + segs_unique = segs.values[np.sort(np.unique(segs, return_index=True)[1])] + + # expand dims across adm regions and segs + for varname in ["costs", "npv"]: + if varname in template.data_vars: + template[varname] = ( + template[varname] + .expand_dims({adm_var: adms_unique}) + .chunk({adm_var: adm_chunksize}) + ) + template["optimal_case"] = template.optimal_case.expand_dims(seg=segs_unique).chunk( + {"seg": seg_chunksize} + ) + + # provide scenario and sample coords if needed + if "scen_mc" in template.dims: + template = template.assign_coords( + scenario=("scen_mc", scen_mc_filter.get_level_values("scenario")), + sample=("scen_mc", scen_mc_filter.get_level_values("sample")), + ) + + # clean up before saving + template = template.unify_chunks().drop_encoding() + + # save + if overwrite or not outpath.exists(): + template.to_zarr(str(outpath), mode="w", compute=False) + + adm_groups = np.array_split(adms_unique, ceil(len(adms_unique) / adm_chunksize)) + seg_groups = np.array_split(segs_unique, ceil(len(segs_unique) / seg_chunksize)) + seg_fut = client.submit( + _get_seg_map, + econ_input_path=econ_input_path, + seg_adm_var=seg_var, + segs_unique=segs_unique, + ) + adm_fut = client.submit( + _get_adm_groups, + econ_input_path=econ_input_path, + adm_var=adm_var, + seg_adm_var=seg_var, + adms_unique=adms_unique, + ) + + def this_aggregate_costs(adm_group, adm_groups_all): + return _aggregate_costs_to_adm( + adm_group, + adm_groups=adm_groups_all, + adm_var=adm_var, + seg_adm_var=seg_var, + mc_dim=mc_dim, + output_path=outpath, + tmp_output_path=tmppath, + postprocess=postprocess, + storage_options=storage_options, + ) + + final_futs = pd.Series( + adm_groups, + index=client.map(this_aggregate_costs, adm_groups, adm_groups_all=adm_fut), + ) + for batch in as_completed(final_futs.index.tolist()).batches(): + final_futs = 
_check_completed_fut_batch( + batch, final_futs, client, this_aggregate_costs, adm_groups_all=adm_fut + ) + del batch + if not len(final_futs): + break + sleep(10) + + def this_aggregate_optimal_case(seg_group, seg_map): + return _aggregate_optimal_case_to_seg( + seg_group, + seg_map=seg_map, + seg_adm_var=seg_var, + output_path=outpath, + tmp_output_path=tmppath, + postprocess=postprocess, + storage_options=storage_options, + ) + + final_futs = pd.concat( + ( + final_futs, + pd.Series( + seg_groups, + index=client.map( + this_aggregate_optimal_case, seg_groups, seg_map=seg_fut + ), + ), + ) + ) + + for batch in as_completed(final_futs.index.tolist()).batches(): + final_futs = _check_completed_fut_batch( + batch, final_futs, client, this_aggregate_optimal_case, seg_map=seg_fut + ) + del batch + if not len(final_futs): + break + sleep(10) + + def execute_pyciam( params_path, econ_input_path, @@ -936,22 +1343,30 @@ def execute_pyciam( tmp_output_path=AnyPath("pyciam_tmp_results.zarr"), remove_tmpfile=True, overwrite=False, + no_surge_check=False, mc_dim="quantile", + slr_site_id_dim="site_id", seg_var="seg_adm", seg_var_subset=None, adm_var="adm1", + lsl_var="lsl_msl05", + scen_mc_filter=None, quantiles=[0.5], + refA_quantile=0.5, extra_attrs={}, econ_input_seg_chunksize=100, + client_batchsize=10000, surge_batchsize=700, surge_seg_chunksize=5, refA_seg_chunksize=500, pyciam_seg_chunksize=3, + other_chunksizes={}, diaz_inputs=False, diaz_config=False, dask_client_func=Client, storage_options=None, params_override={}, + postprocess_func=None, **model_kwargs, ): """Execute the full pyCIAM model. The following inputs are assumed: @@ -1019,11 +1434,16 @@ def execute_pyciam( want to examine seg-adm level results. ovewrwrite : bool, default False If True, overwrite all intermediate output files + no_surge_check : bool, default False + If True, assume surge lookup tables are complete and do not load to check. mc_dim : str, default "quantile" - The dimension of the sea level rise datasets specified at `slr_input_paths` that - indexes different simulations within the same scenario. This could reflect Monte - Carlo simulations *or* different quantiles of SLR. Ignored if + The dimension of the sea level rise datasets specified at ``slr_input_paths`` + that indexes different simulations within the same scenario. This could reflect + Monte Carlo simulations *or* different quantiles of SLR. Ignored if `diaz_inputs=True`. + slr_site_id_dim : str, default "site_id" + The dimension of the SLR datasets specified at ``slr_input_paths`` that indexes + location of the estimate. seg_var : str, default "seg_adm" The coordinate of the socioeconomic input data specified at `econ_input_path` that indexes the intersection of coastal segments and administrative units. @@ -1035,6 +1455,12 @@ def execute_pyciam( The coordinate of the socioeconomic input data specified at `econ_input_path` that specifies the administrative unit associated with each admin/seg intersection (`seg_var`). Ignored if `seg_var=="seg"` + lsl_var : str, default "lsl_msl05" + The name of the variable in ``slr_input_paths`` containing local SLR values + scen_mc_filter : :py:class:`pandas.MultiIndex`, optional + A Index of paired ``scenario`` (str) and ``mc_sample_id`` (int) values that + specify a subset of the individual SLR trajectories contained in the zarr stores + at ``slr_input_paths``. If None, run all scenario X MC sample combinations. 
quantiles : Optional[Iterable[float]], default [0.5] The quantiles of the sea level rise datasets specified at `slr_input_paths` that will be used to estimate costs within this function. If `mc_dim=="quantile"`, it @@ -1042,6 +1468,9 @@ def execute_pyciam( values of the "quantile" coordinate in each SLR dataset. If not, then quantiles over the simulations indexed by `mc_dim` will be calculated on-the-fly. If None, all simulations indexed by `mc_dim` will be used. + refA_quantile : float, default 0.5 + Quantile of SLR for "no-climate-change" scenario to use for defining refA (the + initial adaptation height). extra_attrs : dict[str, str], optional Additional attributes to write to the output dataset. econ_input_seg_chunksize : int, default 100 @@ -1079,16 +1508,29 @@ def execute_pyciam( some environment variables in order for the :py:mod:`cloudpathlib` objects to function correctly. For example, if your data exists on Google Cloud Storage and requires authentication, you would need to set the - `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the same path as - reflected in `storage_options["token"]`. Other cloud storage providers will have + ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable to the same path as + reflected in ``storage_options["token"]``. Other cloud storage providers will have different authentication methods and have not yet been tested with this function. params_override : dict, default {} - Used to override params specified in `params_path` + Used to override params specified in ``params_path``. + postprocess_func : func, optional + If not None, perform this on the intermediate seg-adm-level output before saving + to an adm-level aggregated output. For example, you may wish to only save some + values of case (e.g. noAdaptation and optimalfixed), or you may wish to sum + over costtype. This is especially useful when running on large monte carlo + samples with TB-scale outputs. Function must take a dataset as the only + argument. 
**model_kwargs Passed directly to :py:func:`pyCIAM.calc_costs` """ # convert filepaths to appropriate path representation + if not pd.api.types.is_list_like(slr_input_paths): + slr_input_paths = [slr_input_paths] + + if not pd.api.types.is_list_like(slr_names): + slr_names = [slr_names] + ( params_path, econ_input_path, @@ -1120,12 +1562,6 @@ def execute_pyciam( params = pd.read_json(params_path)["values"] params.update(params_override) - # determine whether to check for finished jobs - if output_path is None: - check = False - else: - check = True - attr_dict = { "updated": pd.Timestamp.now(tz="US/Pacific").strftime("%c"), "planning_period_start_years": params.at_start, @@ -1160,6 +1596,7 @@ def execute_pyciam( econ_input_path_seg = econ_input_path else: if overwrite or not econ_input_path_seg.is_dir(): + print("Creating seg-level econ input dataset...") collapse_econ_inputs_to_seg( econ_input_path, econ_input_path_seg, @@ -1172,18 +1609,20 @@ def execute_pyciam( ######################################## # create surge lookup table if necessary ######################################## - surge_futs = {} + + surge_futs = [] + print("Creating ESL impacts lookup table if it does not exist...") for var, path in surge_input_paths.items(): if path is None: continue - if overwrite or not path.is_dir(): + if not no_surge_check: if var == seg_var: this_econ_input = econ_input_path elif var == "seg": this_econ_input = econ_input_path_seg else: raise ValueError(var) - surge_futs[var] = lookup.create_surge_lookup( + surge_futs += lookup.create_surge_lookup( this_econ_input, slr_input_paths, path, @@ -1199,13 +1638,13 @@ def execute_pyciam( slr_0_years=params.slr_0_year, client=client, client_kwargs={"batch_size": surge_batchsize}, - force_overwrite=True, + force_overwrite=overwrite, seg_chunksize=surge_seg_chunksize, mc_dim=mc_dim, + slr_site_id_dim=slr_site_id_dim, + lsl_var=lsl_var, storage_options=storage_options, ) - # block on this calculation - client.gather(surge_futs) ############################### # define temporary output store @@ -1228,246 +1667,284 @@ def execute_pyciam( econ_input_path, slr_input_paths, params, - [this_seg], + selectors={seg_var: [this_seg]}, slr_names=slr_names, seg_var=seg_var, + lsl_var=lsl_var, surge_lookup_store=None, mc_dim=mc_dim, + slr_site_id_dim=slr_site_id_dim, + scen_mc_filter=scen_mc_filter, quantiles=quantiles, storage_options=storage_options, ) - slr = test_inputs[1].unstack("scen_mc") - test_inputs = test_inputs[0] + test_inputs, slr, _ = test_inputs + if scen_mc_filter is None: + coords = {"scenario": slr.scenario.values} + if quantiles is not None: + coords["quantiles"] = quantiles + else: + coords[mc_dim] = slr[mc_dim] + else: + if quantiles is not None: + raise ValueError( + "Cannot specify both ``scen_mc_filter`` and ``quantiles``." 
+ ) + coords = {"scen_mc": slr["scen_mc"]} + + coords = OrderedDict( + { + "case": np.array(CASES, dtype=StringDType), + "costtype": np.array(COSTTYPES, dtype=StringDType), + seg_var: ciam_in[seg_var].values, + "year": np.arange(params.model_start, ciam_in.year.max().item() + 1), + **coords, + **{ + dim: ciam_in[dim].values + for dim in ["ssp", "iam"] + if dim in ciam_in.dims + }, + } + ) - if output_path is not None: - coords = OrderedDict( - { - "case": CASES, - "costtype": COSTTYPES, - seg_var: ciam_in[seg_var].values, - "scenario": slr.scenario.astype("unicode"), - "quantile": quantiles, - "year": np.arange(params.model_start, ciam_in.year.max().item() + 1), - **{ - dim: ciam_in[dim].values - for dim in ["ssp", "iam"] - if dim in ciam_in.dims - }, - } - ) + chunks = {seg_var: 1, "case": len(coords["case"]) - 1, **other_chunksizes} + chunks = {k: len(coords[k]) if k not in chunks else chunks[k] for k in coords} - chunks = {seg_var: 1, "case": len(coords["case"]) - 1} - chunks = {k: -1 if k not in chunks else chunks[k] for k in coords} + # create arrays + cost_dims = coords.keys() + costs = create_template_dataarray(cost_dims, coords, chunks) - # create arrays - cost_dims = coords.keys() + nonpvdims = ["year", "costtype"] + nooptcasedims = nonpvdims + ["case"] - out_ds = create_template_dataarray(cost_dims, coords, chunks).to_dataset( - name="costs" - ) - out_ds["npv"] = out_ds.costs.isel(year=0, costtype=0, drop=True).astype( - "float64" + def _create_smaller_array(nodims, dtype="float32"): + return create_template_dataarray( + [k for k in cost_dims if k not in nodims], coords, chunks, dtype=dtype ) - out_ds["optimal_case"] = out_ds.npv.isel(case=0, drop=True).astype("uint8") - # add attrs - out_ds.attrs.update(attr_dict) - out_ds = add_attrs_to_result(out_ds) + npv = _create_smaller_array(nonpvdims) + optcase = _create_smaller_array(nooptcasedims, dtype="uint8") - if overwrite or not tmp_output_path.is_dir(): - out_ds.to_zarr( - str(tmp_output_path), - compute=False, - mode="w", - storage_options=storage_options, - encoding={ - "costs": {"fill_value": "NaN"}, - "optimal_case": {"fill_value": 255}, - }, - ) + out_ds = xr.Dataset({"costs": costs, "npv": npv, "optimal_case": optcase}) + + # add attrs + out_ds.attrs.update(attr_dict) + out_ds = add_attrs_to_result(out_ds, seg_var, mc_dim=mc_dim) + + if output_path is not None and (overwrite or not tmp_output_path.is_dir()): + print("Creating output skeleton...") + out_ds.to_zarr( + str(tmp_output_path), + compute=False, + mode="w", + storage_options=storage_options, + safe_chunks=False, + ) + + # block on surge calculation before we start calculating actual costs + for f in as_completed(surge_futs, raise_errors=False): + if f.status != "finished": + f.result() + surge_futs.pop(f) #################################################### # Create initial adaptaion heights dataset if needed #################################################### if overwrite or not refA_path.is_dir(): + print("Estimating baseline adaptation RPs...") segs = np.unique(ciam_in.seg) - seg_grps = [ - segs[i : i + refA_seg_chunksize] + seg_groups = [ + slice(segs[i], segs[min(i + refA_seg_chunksize - 1, len(segs) - 1)]) for i in range(0, len(segs), refA_seg_chunksize) ] - ( - dataarray_from_delayed( - client.map( - get_refA, - seg_grps, - econ_input_path=econ_input_path_seg, - slr_input_path=slr_input_paths[0], - params=params, - surge_input_path=surge_input_paths["seg"], - mc_dim=mc_dim, - storage_options=storage_options, - quantiles=quantiles, - diaz_inputs=diaz_inputs, 
- eps=eps, - **model_kwargs, - ), - dim="seg", - ) - .to_dataset(name="refA") - .chunk({"seg": -1}) - .to_zarr(str(refA_path), storage_options=storage_options, mode="w") + refA_futs = client.map( + get_refA, + seg_groups, + econ_input_path=econ_input_path_seg, + slr_input_path=slr_input_paths[0], + params=params, + surge_input_path=surge_input_paths["seg"], + mc_dim=mc_dim, + slr_site_id_dim=slr_site_id_dim, + lsl_var=lsl_var, + storage_options=storage_options, + quantile=refA_quantile, + scen_mc_filter=None, + diaz_inputs=diaz_inputs, + eps=eps, + **model_kwargs, ) + refA_ds = dataset_from_delayed(refA_futs, dim="seg") + for v in refA_ds.data_vars: + refA_ds[v].attrs = {} + refA_ds = add_attrs_to_result( + refA_ds.rename(case="optimal_case"), "seg", mc_dim=mc_dim + ).rename(optimal_case="case") + refA_ds.to_zarr(str(refA_path), mode="w") + else: + print("Baseline adaptation RPs already calculated...") ############################### # get groups for running pyCIAM ############################### - groups = [ - ciam_in[seg_var].isel({seg_var: slice(i, i + pyciam_seg_chunksize)}).values - for i in np.arange(0, len(ciam_in[seg_var]), pyciam_seg_chunksize) - ] - - # get groups for aggregating seg-adms up to segs - if seg_var == "seg": - most_segadm = 1 - else: - most_segadm = ciam_in.length.groupby("seg").count().max().item() - i = 0 - agg_groups = [] - while i < len(ciam_in.seg): - this_group = ciam_in.isel({seg_var: slice(i, i + most_segadm)}) - if len(np.unique(this_group.seg)) == 1: - i += most_segadm - else: - this_group = this_group.isel( - {seg_var: this_group.seg != this_group.seg.isel(seg_adm=-1, drop=True)} - ) - i += len(this_group[seg_var]) - - agg_groups.append(this_group[seg_var].values) - - groups_ser = ( - pd.Series(groups) - .explode() - .reset_index() - .rename(columns={"index": "group_id", 0: seg_var}) - .set_index(seg_var) - .group_id + selector_groups, ix_groups = _get_selectors( + ciam_in, + seg_var, + pyciam_seg_chunksize, + other_chunksizes, + out_ds, + client_batchsize, ) ######################################################### # Run 1st stage (estimate costs for each adaptation type) ######################################################### - ciam_futs = np.array( - client.map( - calc_all_cases, - groups, - params=params, - econ_input_path=econ_input_path, - slr_input_paths=slr_input_paths, - slr_names=slr_names, - output_path=tmp_output_path, - refA_path=refA_path, - surge_input_path=surge_input_paths[seg_var], - seg_var=seg_var, - mc_dim=mc_dim, - quantiles=quantiles, - storage_options=storage_options, - diaz_inputs=diaz_inputs, - check=check, - **model_kwargs, - ) - ) - - if output_path is None and seg_var == "seg": - out = add_attrs_to_result( - xr.concat( - client.gather( - client.map( - optimize_case_seg, - ciam_futs, - dfact=test_inputs.dfact, - npv_start=test_inputs.npv_start, - ) - ), - dim="seg", - ) - ) - out.attrs.update(attr_dict) - return out - - ############################################## - # Run 2nd stage (calculate optimal adaptation) - ############################################## - seg_adm_ser = pd.Series(ciam_in[seg_var].values) - seg_adm_ser.index = ciam_in.seg.values - seg_grps = seg_adm_ser.groupby(seg_adm_ser.index).apply(list) - precurser_futs = ( - seg_adm_ser.to_frame(seg_var) - .join(seg_grps.rename("seg_group")) - .set_index(seg_var) - .seg_group.explode() - .to_frame() - .join(groups_ser, on="seg_group") - .groupby(seg_var) - .group_id.apply(set) - .apply(list) - .apply(lambda x: ciam_futs[x]) - ) - ciam_futs_2 = 
precurser_futs.reset_index(drop=False).apply( - lambda row: client.submit( - optimize_case, - row[seg_var], - *row.group_id, - econ_input_path=econ_input_path, - output_path=tmp_output_path, - seg_var=seg_var, - eps=eps, - check=check, - storage_options=storage_options, - ), - axis=1, - ) + # print("Queueing jobs to calculate costs for all adaptation scenarios...") + + # ciam_futs = None + # n_selector_groups = len(selector_groups) + # for sx, s in enumerate(selector_groups): + # final_batch = (sx + 1) == n_selector_groups + # print(f"computing group {sx + 1} / {n_selector_groups}") + # if check: + # to_run = _check_finished_full_chunk( + # s, + # tmp_output_path, + # client, + # storage_options=storage_options, + # ) + # else: + # to_run = s + + # if not len(to_run): + # continue + + # print(f"...adding {len(to_run)} tasks to queue") + + # def _calc_all_cases(selector): + # return calc_all_cases( + # selector, + # params=params, + # econ_input_path=econ_input_path, + # slr_input_paths=slr_input_paths, + # slr_names=slr_names, + # output_path=tmp_output_path, + # refA_path=refA_path, + # surge_input_path=surge_input_paths[seg_var], + # seg_var=seg_var, + # mc_dim=mc_dim, + # slr_site_id_dim=slr_site_id_dim, + # lsl_var=lsl_var, + # scen_mc_filter=scen_mc_filter, + # quantiles=quantiles, + # storage_options=storage_options, + # diaz_inputs=diaz_inputs, + # check=check, + # **model_kwargs, + # ) + + # this_futs = pd.Series( + # to_run, + # index=client.map( + # _calc_all_cases, + # to_run, + # ), + # ) + # if ciam_futs is None: + # ciam_futs = this_futs + # else: + # ciam_futs = pd.concat((ciam_futs, this_futs)) + # del this_futs + + # for batch in as_completed(ciam_futs.index.tolist()).batches(): + # ciam_futs = _check_completed_fut_batch( + # batch, ciam_futs, client, _calc_all_cases + # ) + # del batch + # if ((not final_batch) and len(ciam_futs) < (client_batchsize / 10)) or ( + # final_batch and (not len(ciam_futs)) + # ): + # break + # sleep(10) + + # ############################################## + # # Run 2nd stage (calculate optimal adaptation) + # ############################################## + + # print("Queueing jobs to optimize adaptation scenario by segment...") + # stage_2_futs = None + # for sx, s in enumerate(selector_groups): + # final_batch = (sx + 1) == n_selector_groups + # print(f"computing group {sx + 1} / {n_selector_groups}") + # if check: + # to_run = _check_finished_full_chunk( + # s, + # tmp_output_path, + # client, + # storage_options=storage_options, + # case_selector=CASES[-1], + # ) + # else: + # to_run = s + + # if not len(to_run): + # continue + + # print(f"...adding {len(to_run)} tasks to queue") + + # def _optimize_case(selector): + # return optimize_case( + # selector, + # econ_input_path=econ_input_path, + # output_path=tmp_output_path, + # seg_var=seg_var, + # eps=eps, + # check=check, + # storage_options=storage_options, + # ) + + # this_futs = pd.Series( + # to_run, + # index=client.map( + # _optimize_case, + # to_run, + # ), + # ) + # if stage_2_futs is None: + # stage_2_futs = this_futs + # else: + # stage_2_futs = pd.concat((stage_2_futs, this_futs)) + # del this_futs + + # for batch in as_completed(stage_2_futs.index.tolist()).batches(): + # stage_2_futs = _check_completed_fut_batch( + # batch, stage_2_futs, client, _optimize_case + # ) + # del batch + # if ((not final_batch) and len(stage_2_futs) < (client_batchsize / 10)) or ( + # final_batch and (not len(stage_2_futs)) + # ): + # break + # sleep(10) ############################### # Rechunk 
and save final ############################### - client.gather(ciam_futs_2.tolist()) - client.cancel(ciam_futs_2) - del ciam_futs_2 - - this_chunksize = pyciam_seg_chunksize * 3 - out = ( - xr.open_zarr( - str(tmp_output_path), - storage_options=storage_options, - chunks={"case": -1, seg_var: this_chunksize}, - ) - .drop_vars("npv") - .chunk({"year": 10}) - .persist() + _aggregate_results( + tmp_output_path, + output_path, + econ_input_path, + seg_var, + adm_var, + mc_dim, + client, + postprocess_func=postprocess_func, + scen_mc_filter=scen_mc_filter, + overwrite=overwrite, + storage_options=storage_options, ) - if adm_var != seg_var: - out["costs"] = ( - out.costs.groupby(ciam_in[adm_var]).sum().chunk({adm_var: this_chunksize}) - ).persist() - out["optimal_case"] = ( - out.optimal_case.load().groupby(ciam_in.seg).first(skipna=False).chunk() - ).persist() - out = out.drop(seg_var) - out = out.unify_chunks() - - for v in out.data_vars: - out[v].encoding.clear() - - for k, v in out.coords.items(): - if v.dtype == object: - out[k] = v.astype("unicode") - - out = out.persist() - - out.to_zarr(str(output_path), storage_options=storage_options, mode="w") ############################### # Final checks and cleanup @@ -1491,12 +1968,24 @@ def get_refA( params, surge_input_path=None, mc_dim="quantile", - storage_options={}, - quantiles=[0.5], + slr_site_id_dim="site_id", + lsl_var="lsl_msl05", + storage_options=None, + quantile=0.5, eps=1, + scen_mc_filter=None, diaz_inputs=False, + output_path=None, **model_kwargs, ): + if check_finished_zarr_workflow( + finalstore=output_path, + varname="refA", + storage_options=storage_options, + final_selector={"seg": segs}, + ): + return None + if diaz_inputs: inputs, slr = load_diaz_inputs( econ_input_path, @@ -1512,16 +2001,18 @@ def get_refA( econ_input_path, slr_input_path, params, - segs, + selectors={"seg": segs}, surge_lookup_store=surge_input_path, mc_dim=mc_dim, + lsl_var=lsl_var, + slr_site_id_dim=slr_site_id_dim, include_cc=False, include_ncc=True, + scen_mc_filter=scen_mc_filter, storage_options=storage_options, - quantiles=quantiles, + quantiles=[quantile], **params.refA_scenario_selectors, ) - slr = slr.unstack("scen_mc") slr = slr.squeeze( [d for d in slr.dims if len(slr[d]) == 1 and d != "seg"], drop=True ) @@ -1552,13 +2043,17 @@ def get_refA( lowest = npv.argmin("case").astype("uint8") refA = refA.isel(case=lowest) + refA = refA.to_dataset(name="refA").reset_coords(drop=True) refA["case"] = lowest + if output_path is not None: + refA.to_zarr(str(output_path), storage_options=storage_options, region="auto") + return None return refA def calc_all_cases( - seg_adms, + selectors, params, econ_input_path, slr_input_paths, @@ -1568,8 +2063,11 @@ def calc_all_cases( surge_input_path=None, seg_var="seg_adm", mc_dim="quantile", + slr_site_id_dim="site_id", + lsl_var="lsl_msl05", quantiles=[0.5], - storage_options={}, + scen_mc_filter=None, + storage_options=None, check=True, diaz_inputs=False, **model_kwargs, @@ -1577,16 +2075,14 @@ def calc_all_cases( if check_finished_zarr_workflow( finalstore=output_path if check else None, varname="costs", - final_selector={seg_var: seg_adms, "case": CASES[:-1]}, + final_selector={"case": CASES[:-1], **selectors}, storage_options=storage_options, ): return None - segs = ["_".join(seg_adm.split("_")[:2]) for seg_adm in seg_adms] - if diaz_inputs: inputs, slr = load_diaz_inputs( - econ_input_path, segs, params, storage_options=storage_options + econ_input_path, selectors[seg_var], params, 
storage_options=storage_options ) surge = None else: @@ -1594,26 +2090,35 @@ def calc_all_cases( econ_input_path, slr_input_paths, params, - seg_adms, + selectors=selectors, slr_names=slr_names, seg_var=seg_var, surge_lookup_store=surge_input_path, mc_dim=mc_dim, + lsl_var=lsl_var, + slr_site_id_dim=slr_site_id_dim, + scen_mc_filter=scen_mc_filter, quantiles=quantiles, storage_options=storage_options, ) + segs = ( + xr.open_zarr( + str(econ_input_path), storage_options=storage_options, chunks=None + )["seg"] + .sel({seg_var: selectors[seg_var]}) + .values + ) + assert inputs.notnull().all().to_array("tmp").all() assert slr.notnull().all() if surge is not None: assert surge.notnull().all().to_array("tmp").all() # get initial adaptation height - refA = ( - xr.open_zarr(str(refA_path), storage_options=storage_options, chunks=None) - .refA.sel(seg=segs) - .drop_vars("case") - ) - refA["seg"] = seg_adms + refA = xr.open_zarr( + str(refA_path), storage_options=storage_options, chunks=None + ).refA.sel(seg=segs) + refA["seg"] = inputs["seg"].values if "movefactor" in refA.dims: refA = refA.sel(movefactor=params.movefactor, drop=True) @@ -1633,50 +2138,43 @@ def calc_all_cases( .sel(year=slice(inputs.npv_start, None)) .sum("year") ) + + # get proper ordering of case + out = out.sel(case=SOLVCASES, costtype=COSTTYPES).reset_coords(drop=True) if output_path is not None: - save_to_zarr_region(out, output_path, storage_options=storage_options) + out.to_zarr(str(output_path), storage_options=storage_options, region="auto") return None return out def optimize_case( - seg_adm, + selectors, *wait_futs, econ_input_path=None, output_path=None, seg_var="seg_adm", check=True, eps=1, - storage_options={}, + storage_options=None, ): # use last fpath to check if this task has already been run if check and check_finished_zarr_workflow( finalstore=output_path if check else None, varname="costs", - final_selector={seg_var: seg_adm, "case": CASES[-1]}, + final_selector={"case": CASES[-1], **selectors}, storage_options=storage_options, ): return None - seg = "_".join(seg_adm.split("_")[:2]) - with xr.open_zarr( - str(econ_input_path), storage_options=storage_options, chunks=None - ) as ds: - all_segs = ds.seg.load() - - this_seg_adms = all_segs[seg_var].isel({seg_var: all_segs.seg == seg}).values - - save_to_zarr_region( - select_optimal_case( - output_path, - seg_adm, - this_seg_adms, - eps=eps, - region_var=seg_var, - storage_options=storage_options, - ), + select_optimal_case( output_path, + selectors, + econ_input_path, + eps=eps, + region_var=seg_var, storage_options=storage_options, + ).reset_coords(drop=True).to_zarr( + str(output_path), storage_options=storage_options, region="auto" ) return None diff --git a/pyCIAM/surge/lookup.py b/pyCIAM/surge/lookup.py index 935e626..bd4599b 100644 --- a/pyCIAM/surge/lookup.py +++ b/pyCIAM/surge/lookup.py @@ -21,10 +21,13 @@ import dask.array as da import numpy as np -import pandas as pd import xarray as xr +from numpy.dtypes import StringDType -from pyCIAM.io import _load_lslr_for_ciam, save_to_zarr_region +from pyCIAM.io import ( + _load_lslr_for_ciam, + check_finished_zarr_workflow, +) from pyCIAM.surge._calc import ( _calc_storm_damages_no_resilience, _get_surge_heights_probs, @@ -61,7 +64,9 @@ def _get_lslr_rhdiff_range( include_ncc=True, slr_0_years=2005, mc_dim="mc_sample_id", - storage_options={}, + site_id_dim="site_id", + lsl_var="lsl_msl05", + storage_options=None, ): """Get range of lslr and rhdiff that we need to model to cover the full range. 
@@ -78,7 +83,7 @@ def _get_lslr_rhdiff_range( pc_in = _s2d( xr.open_zarr( str(sliiders_store), chunks=None, storage_options=storage_options - ).sel({seg_var: seg_vals}) + ).sel({seg_var: seg_vals, "year": slice(min(slr_0_years), None)}) ) if interp_years is None: @@ -100,9 +105,11 @@ def _get_lslr_rhdiff_range( include_cc=include_cc, include_ncc=include_ncc, mc_dim=mc_dim, + lsl_var=lsl_var, slr_0_year=slr_0_years[sx], storage_options=storage_options, - quantiles=quantiles, + site_id_dim=site_id_dim, + quantiles=[0, 1] if mc_dim != "quantile" else quantiles, ) # get the max LSLR experienced @@ -175,7 +182,7 @@ def _create_surge_lookup_skeleton_store( seg_var="seg", seg_var_subset=None, force_overwrite=True, - storage_options={}, + storage_options=None, ): pc_in = subset_econ_inputs( xr.open_zarr(str(sliiders_store), storage_options=storage_options), @@ -187,14 +194,17 @@ def _create_surge_lookup_skeleton_store( da.empty( (len(pc_in[seg_var]), n_interp_pts_lslr, n_interp_pts_rhdiff, 2, 2), chunks=(seg_chunksize, -1, -1, -1, -1), + dtype="float64", ), dims=[seg_var, "lslr", "rh_diff", "costtype", "adapttype"], coords={ seg_var: pc_in[seg_var].values, "lslr": np.arange(n_interp_pts_lslr), "rh_diff": np.arange(n_interp_pts_rhdiff), - "adapttype": ["retreat", "protect"], - "costtype": ["stormCapital", "stormPopulation"], + "adapttype": np.array(["retreat", "protect"], dtype=StringDType), + "costtype": np.array( + ["stormCapital", "stormPopulation"], dtype=StringDType + ), }, ).to_dataset(name="frac_losses") to_save["rh_diff_by_seg"] = ( @@ -228,6 +238,7 @@ def _create_surge_lookup_skeleton_store( def _save_storm_dam( seg_vals, seg_var="seg", + lsl_var="lsl_msl05", sliiders_store=None, slr_stores=None, surge_lookup_store=None, @@ -239,11 +250,21 @@ def _save_storm_dam( quantiles=None, scen_mc_filter=None, mc_dim="mc_sample_id", + slr_site_id_dim="site_id", start_year=None, slr_0_years=2005, - storage_options={}, + overwrite=False, + storage_options=None, ): """Map over each chunk to run through damage calcs.""" + + if not overwrite and check_finished_zarr_workflow( + surge_lookup_store, + varname="frac_losses", + final_selector={seg_var: seg_vals}, + storage_options=storage_options, + ): + return None diff_ranges = _get_lslr_rhdiff_range( sliiders_store, slr_stores, @@ -255,6 +276,8 @@ def _save_storm_dam( quantiles=quantiles, scen_mc_filter=scen_mc_filter, mc_dim=mc_dim, + lsl_var=lsl_var, + site_id_dim=slr_site_id_dim, slr_0_years=slr_0_years, storage_options=storage_options, ) @@ -268,18 +291,26 @@ def _save_storm_dam( # these must be unique otherwise interp function will raise error template["lslr_by_seg"] = ( (seg_var, "lslr"), - np.tile(np.arange(len(template.lslr))[np.newaxis, :], (len(seg_vals), 1)), + np.tile( + np.arange(len(template.lslr), dtype=template.lslr_by_seg.dtype)[ + np.newaxis, : + ], + (len(seg_vals), 1), + ), ) template["rh_diff_by_seg"] = ( (seg_var, "rh_diff"), np.tile( - np.arange(len(template.rh_diff))[np.newaxis, :], (len(seg_vals), 1) + np.arange(len(template.rh_diff), dtype=template.rh_diff_by_seg.dtype)[ + np.newaxis, : + ], + (len(seg_vals), 1), ), ) if surge_lookup_store is None: return template - save_to_zarr_region( - template, surge_lookup_store, storage_options=storage_options + template.to_zarr( + str(surge_lookup_store), storage_options=storage_options, region="auto" ) return None @@ -343,7 +374,7 @@ def _save_storm_dam( ).to_array("costtype") ) res = ( - xr.concat(res, dim=pd.Index(["retreat", "protect"], name="adapttype")) + xr.concat(res, 
-        xr.concat(res, dim=pd.Index(["retreat", "protect"], name="adapttype"))
+        xr.concat(res, dim=xr.DataArray(["retreat", "protect"], dims=["adapttype"]))
         .reindex(costtype=["stormCapital", "stormPopulation"])
         .to_dataset(name="frac_losses")
     )
@@ -354,7 +385,7 @@ def _save_storm_dam(
         return res
 
     # identify which index to save to in template zarr
-    save_to_zarr_region(res, surge_lookup_store, storage_options=storage_options)
+    res.to_zarr(str(surge_lookup_store), storage_options=storage_options, region="auto")
 
 
 def create_surge_lookup(
@@ -374,10 +405,12 @@
     scen_mc_filter=None,
     quantiles=None,
     mc_dim="mc_sample_id",
+    slr_site_id_dim="site_id",
+    lsl_var="lsl_msl05",
     force_overwrite=False,
     client=None,
     client_kwargs={},
-    storage_options={},
+    storage_options=None,
 ):
     """Create storm surge lookup table.
 
@@ -405,6 +438,10 @@ def create_surge_lookup(
         equivalent to segments. The reason you may wish to have nested regions is to
         be able to aggregate impacts to a different regions than those that are
         defined by the segments.
+    lsl_var : str, default "lsl_msl05"
+        The name of the variable in ``slr_stores`` containing local SLR values
+    slr_site_id_dim : str, default "site_id"
+        The name of the location dimension in ``slr_stores``.
     at_start : list of int
         A list specifying the starting years of each adpatation period. In pyCIAM, each
         segment chooses a new retreat or protection height at the start of each of these
@@ -492,6 +529,8 @@ def create_surge_lookup(
             n_interp_pts_lslr=n_interp_pts_lslr,
             n_interp_pts_rhdiff=n_interp_pts_rhdiff,
             mc_dim=mc_dim,
+            lsl_var=lsl_var,
+            slr_site_id_dim=slr_site_id_dim,
             ddf_i=ddf_i,
             dmf_i=dmf_i,
             quantiles=quantiles,
@@ -499,6 +538,7 @@ def create_surge_lookup(
             start_year=start_year,
             slr_0_years=slr_0_years,
             storage_options=storage_options,
+            overwrite=force_overwrite,
             **client_kwargs,
         )
     )
diff --git a/pyCIAM/utils.py b/pyCIAM/utils.py
index fd635b4..1cac079 100644
--- a/pyCIAM/utils.py
+++ b/pyCIAM/utils.py
@@ -64,10 +64,7 @@ def _get_lslr_plan_data(
         lslr_plan = lslr.sel(year=design_years).rename(year="at")
         lslr_plan["at"] = plan_years
     else:
-        # hack to handle newer xarray not being able to groupby with multiindex
-        lslr_plan = lslr.unstack().groupby(planning_periods).max().rename("lslr_plan")
-        if "scen_mc" in lslr.dims:
-            lslr_plan = lslr_plan.stack(scen_mc=lslr.xindexes["scen_mc"].index.names)
+        lslr_plan = lslr.groupby(planning_periods).max().rename("lslr_plan")
 
     # hack to reduce surge height by 50% for protect 10 as in Diaz2016
     if diaz_protect_height:
@@ -76,10 +73,9 @@ def _get_lslr_plan_data(
         )
     else:
         surge_heights_p = surge_heights
-
     surge_heights = xr.concat(
         (surge_heights, surge_heights_p),
-        dim=pd.Index(["retreat", "protect"], name="adapttype"),
+        dim=xr.DataArray(["retreat", "protect"], dims=["adapttype"]),
     )
 
     # calculate retreat and protect heights
@@ -116,7 +112,7 @@ def spherical_nearest_neighbor(df1, df2, x1="lon", y1="lat", x2="lon", y2="lat")
     return pd.Series(df2.index[ixs[:, 0]], index=df1.index)
 
 
-def add_attrs_to_result(ds):
+def add_attrs_to_result(ds, seg_var, mc_dim=None):
     attr_dict = {
         "case": {
             "long_name": "Adaptation Strategy",
@@ -180,11 +176,32 @@ def add_attrs_to_result(ds):
             "long_name": "Shared Socioeconomic Pathway",
             "description": "Socioeconomic growth model used",
         },
+        "refA": {
+            "long_name": "Initial adaptation height",
+            "description": (
+                "Initial retreat height assumed in model. Determined by choosing "
+                "optimal adaptation pathway under a 'no-climate-change' scenario and "
+                "selecting the initial height. Retreat is assumed regardless of "
+                "whether optimal path is retreat or protect."
+            ),
+        },
+        "scen_mc": {
+            "long_name": "Scenario and Monte Carlo Sample Index",
+            "description": (
+                "If filtering to a subset of the full combination of SLR scenario and "
+                "Monte Carlo sample using `scen_mc_filter`, the result is no longer a "
+                "dense array across both 'scenario' and '`mc_sample_dim`. Instead, we "
+                "have a 1D list of scenario/mcID values that were run. This coordinate "
+                "contains that list."
+            ),
+        },
     }
+    if mc_dim is not None:
+        attr_dict[mc_dim] = {"long_name": "Monte carlo sample index"}
     extra_vars = [
         v
         for v in ds.variables
-        if v not in ["year", "seg_adm", "npv"] + list(attr_dict.keys())
+        if v not in ["year", seg_var, "npv"] + list(attr_dict.keys())
     ]
     assert not len(extra_vars), f"Unexpected variables: {extra_vars}"
     for v in ds.variables:
@@ -193,13 +210,19 @@ def add_attrs_to_result(ds):
     return ds
 
 
+def _get_exp_year(da):
+    exp_year = [v for v in da.data_vars if v.startswith("pop_") and "scale" not in v]
+    assert len(exp_year) == 1, exp_year
+    return int(exp_year[0].split("_")[1])
+
+
 def collapse_econ_inputs_to_seg(
     econ_input_path,
     output_path,
     seg_var_subset=None,
     output_chunksize=100,
     seg_var="seg_adm",
-    storage_options={},
+    storage_options=None,
 ):
     sliiders = subset_econ_inputs(
         xr.open_zarr(
@@ -222,10 +245,15 @@ def collapse_econ_inputs_to_seg(
         sliiders.ypcc.sel(country="USA", drop=True).load().reset_coords(drop=True)
     )
 
+    # allow for different base years in K and pop spatial variables
+    exp_year = _get_exp_year(sliiders)
+    pop_var = f"pop_{exp_year}"
+    k_var = f"K_{exp_year}"
+
     out = (
-        sliiders[["K_2019", "pop_2019", "landarea", "length", "wetland"]]
+        sliiders[[k_var, pop_var, "landarea", "length", "wetland"]]
         .groupby(grouper)
-        .sum("seg_adm")
+        .sum(seg_var)
     )
 
     out[["surge_height", "gumbel_params", "seg_lon", "seg_lat"]] = (
@@ -246,16 +274,18 @@ def weighted_avg(varname, wts_in):
 
     for v, w in [
         (
             "mobcapfrac",
-            sliiders.K_2019.sum("elev"),
+            sliiders[k_var].sum("elev"),
         ),
-        ("pop_scale", sliiders.pop_2019.sum("elev")),
-        ("K_scale", sliiders.K_2019.sum("elev")),
+        ("pop_scale", sliiders[pop_var].sum("elev")),
+        ("K_scale", sliiders[k_var].sum("elev")),
         ("interior", sliiders.landarea.sum("elev")),
         ("pc", sliiders.length),
-        ("ypcc", sliiders.pop_2019.sum("elev")),
+        ("ypcc", sliiders[pop_var].sum("elev")),
         ("wetlandservice", sliiders.wetland.sum("elev")),
+        ("vsl", sliiders[pop_var].sum("elev")),
     ]:
-        weighted_avg(v, w)
+        if v in sliiders.data_vars:
+            weighted_avg(v, w)
 
     out["rho"] = out.ypcc / (out.ypcc + usa_ypcc_ref.sel(year=2000, drop=True))
diff --git a/pyproject.toml b/pyproject.toml
index 7e08fc8..e7bcfa8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -20,9 +20,10 @@ maintainers = [{ name = "Ian Bolliger", email = "ian@reask.earth"}]
 dependencies = [
     "cloudpathlib",
     "gitpython",
-    "rhg_compute_tools",
+    "numpy >= 2",
     "parameterize_jobs",
     "pint-xarray",
+    "rhg_compute_tools",
     "scikit-learn",
     "xarray[accel,parallel]",
     "zarr"
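Usage sketch (not part of the patch): the `scen_mc_filter` argument threaded through `calc_all_cases` and the surge-lookup helpers above is a MultiIndex of scenario / Monte Carlo sample combinations, and the subset it selects is what the new `scen_mc` attribute entry describes. A minimal, hand-built example follows; the scenario names and sample ids are illustrative assumptions, while the level names use the "scenario" label and the default `mc_dim` ("mc_sample_id") that appear in this diff.

import pandas as pd

# Hypothetical subset of SLR scenarios and Monte Carlo sample ids to run;
# the level names match the "scenario" / mc_dim pairing used by pyCIAM.
scen_mc_filter = pd.MultiIndex.from_product(
    [["ssp245", "ssp585"], [0, 1, 2]],
    names=["scenario", "mc_sample_id"],
)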