1313import time
1414import warnings
1515from pathlib import Path
16- from typing import Any , Callable , Dict , Iterable , List , Tuple , Union , Sequence
16+ from typing import Any , Callable , Dict , Iterable , List , Tuple , Union , Sequence , Optional
1717
1818import geopandas as gpd
1919import numpy as np
@@ -936,22 +936,18 @@ def save_result(args: Dict, env: EvalEnv) -> SaveResult: # TODO: return type no
936936
937937@process_registry_100 .add_function (spec = read_spec ("openeo-processes/experimental/save_ml_model.json" ))
938938@process_registry_2xx .add_function (spec = read_spec ("openeo-processes/experimental/save_ml_model.json" ))
939- def save_ml_model (args : dict , env : EvalEnv ) -> MlModelResult :
940- data : DriverMlModel = extract_arg (args , "data" , process_id = "save_ml_model" )
941- if not isinstance (data , DriverMlModel ):
942- raise ProcessParameterInvalidException (
943- parameter = "data" , process = "save_ml_model" , reason = f"Invalid data type { type (data )!r} expected raster-cube."
944- )
945- options = args .get ("options" , {})
939+ def save_ml_model (args : ProcessArgs , env : EvalEnv ) -> MlModelResult :
940+ data = args .get_required ("data" , expected_type = DriverMlModel )
941+ options = args .get_optional ("options" , default = {}, expected_type = dict )
946942 return MlModelResult (ml_model = data , options = options )
947943
948944
949945@process_registry_100 .add_function (spec = read_spec ("openeo-processes/experimental/load_ml_model.json" ))
950946@process_registry_2xx .add_function (spec = read_spec ("openeo-processes/experimental/load_ml_model.json" ))
951- def load_ml_model (args : dict , env : EvalEnv ) -> DriverMlModel :
947+ def load_ml_model (args : ProcessArgs , env : EvalEnv ) -> DriverMlModel :
952948 if env .get (ENV_DRY_RUN_TRACER ):
953949 return DriverMlModel ()
954- job_id = extract_arg ( args , "id" )
950+ job_id = args . get_required ( "id" , expected_type = str )
955951 return env .backend_implementation .load_ml_model (job_id )
956952
957953
@@ -1187,51 +1183,34 @@ def add_dimension(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
11871183
11881184
11891185@process
1190- def drop_dimension (args : dict , env : EvalEnv ) -> DriverDataCube :
1191- data_cube = extract_arg (args , 'data' )
1192- if not isinstance (data_cube , DriverDataCube ):
1193- raise ProcessParameterInvalidException (
1194- parameter = "data" , process = "drop_dimension" ,
1195- reason = f"Invalid data type { type (data_cube )!r} expected raster-cube."
1196- )
1197- return data_cube .drop_dimension (name = extract_arg (args , 'name' ))
1186+ def drop_dimension (args : ProcessArgs , env : EvalEnv ) -> DriverDataCube :
1187+ cube : DriverDataCube = args .get_required ("data" , expected_type = DriverDataCube )
1188+ name : str = args .get_required ("name" , expected_type = str )
1189+ return cube .drop_dimension (name = name )
11981190
11991191
12001192@process
1201- def dimension_labels (args : dict , env : EvalEnv ) -> DriverDataCube :
1202- data_cube = extract_arg (args , 'data' )
1203- if not isinstance (data_cube , DriverDataCube ):
1204- raise ProcessParameterInvalidException (
1205- parameter = "data" , process = "dimension_labels" ,
1206- reason = f"Invalid data type { type (data_cube )!r} expected raster-cube."
1207- )
1208- return data_cube .dimension_labels (dimension = extract_arg (args , 'dimension' ))
1193+ def dimension_labels (args : ProcessArgs , env : EvalEnv ) -> List [str ]:
1194+ cube : DriverDataCube = args .get_required ("data" , expected_type = DriverDataCube )
1195+ dimension : str = args .get_required ("dimension" , expected_type = str )
1196+ return cube .dimension_labels (dimension = dimension )
12091197
12101198
12111199@process
1212- def rename_dimension (args : dict , env : EvalEnv ) -> DriverDataCube :
1213- data_cube = extract_arg (args , 'data' )
1214- if not isinstance (data_cube , DriverDataCube ):
1215- raise ProcessParameterInvalidException (
1216- parameter = "data" , process = "rename_dimension" ,
1217- reason = f"Invalid data type { type (data_cube )!r} expected raster-cube."
1218- )
1219- return data_cube .rename_dimension (source = extract_arg (args , 'source' ),target = extract_arg (args , 'target' ))
1200+ def rename_dimension (args : ProcessArgs , env : EvalEnv ) -> DriverDataCube :
1201+ cube : DriverDataCube = args .get_required ("data" , expected_type = DriverDataCube )
1202+ source : str = args .get_required ("source" , expected_type = str )
1203+ target : str = args .get_required ("target" , expected_type = str )
1204+ return cube .rename_dimension (source = source , target = target )
12201205
12211206
12221207@process
1223- def rename_labels (args : dict , env : EvalEnv ) -> DriverDataCube :
1224- data_cube = extract_arg (args , 'data' )
1225- if not isinstance (data_cube , DriverDataCube ):
1226- raise ProcessParameterInvalidException (
1227- parameter = "data" , process = "rename_labels" ,
1228- reason = f"Invalid data type { type (data_cube )!r} expected raster-cube."
1229- )
1230- return data_cube .rename_labels (
1231- dimension = extract_arg (args , 'dimension' ),
1232- target = extract_arg (args , 'target' ),
1233- source = args .get ('source' ,[])
1234- )
1208+ def rename_labels (args : ProcessArgs , env : EvalEnv ) -> DriverDataCube :
1209+ cube : DriverDataCube = args .get_required ("data" , expected_type = DriverDataCube )
1210+ dimension : str = args .get_required ("dimension" , expected_type = str )
1211+ target : List = args .get_required ("target" , expected_type = list )
1212+ source : Optional [list ] = args .get_optional ("source" , default = None , expected_type = list )
1213+ return cube .rename_labels (dimension = dimension , target = target , source = source )
12351214
12361215
12371216@process
@@ -1377,14 +1356,10 @@ def aggregate_spatial(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
13771356
13781357
13791358@process
1380- def mask (args : dict , env : EvalEnv ) -> DriverDataCube :
1381- cube = extract_arg (args , 'data' )
1382- if not isinstance (cube , DriverDataCube ):
1383- raise ProcessParameterInvalidException (
1384- parameter = "data" , process = "mask" , reason = f"Invalid data type { type (cube )!r} expected raster-cube."
1385- )
1386- mask = extract_arg (args , 'mask' )
1387- replacement = args .get ('replacement' , None )
1359+ def mask (args : ProcessArgs , env : EvalEnv ) -> DriverDataCube :
1360+ cube : DriverDataCube = args .get_required ("data" , expected_type = DriverDataCube )
1361+ mask : DriverDataCube = args .get_required ("mask" , expected_type = DriverDataCube )
1362+ replacement = args .get_optional ("replacement" , default = None )
13881363 return cube .mask (mask = mask , replacement = replacement )
13891364
13901365
@@ -1416,7 +1391,10 @@ def mask_polygon(args: dict, env: EvalEnv) -> DriverDataCube:
14161391 return image_collection
14171392
14181393
1419- def _extract_temporal_extent (args : dict , field = "extent" , process_id = "filter_temporal" ) -> Tuple [str , str ]:
1394+ def _extract_temporal_extent (
1395+ args : Union [dict , ProcessArgs ], field = "extent" , process_id = "filter_temporal"
1396+ ) -> Tuple [str , str ]:
1397+ # TODO #346: make this a ProcessArgs method?
14201398 extent = extract_arg (args , name = field , process_id = process_id )
14211399 if len (extent ) != 2 :
14221400 raise ProcessParameterInvalidException (
@@ -1441,29 +1419,27 @@ def _extract_temporal_extent(args: dict, field="extent", process_id="filter_temp
14411419
14421420
14431421@process
1444- def filter_temporal (args : dict , env : EvalEnv ) -> DriverDataCube :
1445- cube = extract_arg (args , 'data' )
1446- if not isinstance (cube , DriverDataCube ):
1447- raise ProcessParameterInvalidException (
1448- parameter = "data" , process = "filter_temporal" ,
1449- reason = f"Invalid data type { type (cube )!r} expected raster-cube."
1450- )
1422+ def filter_temporal (args : ProcessArgs , env : EvalEnv ) -> DriverDataCube :
1423+ cube : DriverDataCube = args .get_required ("data" , expected_type = DriverDataCube )
14511424 extent = _extract_temporal_extent (args , field = "extent" , process_id = "filter_temporal" )
14521425 return cube .filter_temporal (start = extent [0 ], end = extent [1 ])
14531426
1427+
14541428@process_registry_100 .add_function (spec = read_spec ("openeo-processes/1.x/proposals/filter_labels.json" ))
14551429@process_registry_2xx .add_function (spec = read_spec ("openeo-processes/2.x/proposals/filter_labels.json" ))
1456- def filter_labels (args : dict , env : EvalEnv ) -> DriverDataCube :
1457- cube = extract_arg ( args , 'data' )
1458- if not isinstance ( cube , DriverDataCube ):
1459- raise ProcessParameterInvalidException (
1460- parameter = "data " , process = "filter_labels" ,
1461- reason = f"Invalid data type { type ( cube )!r } expected cube."
1462- )
1430+ def filter_labels (args : ProcessArgs , env : EvalEnv ) -> DriverDataCube :
1431+ cube : DriverDataCube = args . get_required ( "data" , expected_type = DriverDataCube )
1432+ # TODO: validation that condition is a process graph construct
1433+ condition = args . get_required ( "condition" , expected_type = dict )
1434+ dimension = args . get_required ( "dimension" , expected_type = str )
1435+ context = args . get_optional ( "context" , default = None )
1436+ return cube . filter_labels ( condition = condition , dimension = dimension , context = context , env = env )
14631437
1464- return cube .filter_labels (condition = extract_arg (args ,"condition" ),dimension = extract_arg (args ,"dimension" ),context = args .get ("context" ,None ),env = env )
14651438
1466- def _extract_bbox_extent (args : dict , field = "extent" , process_id = "filter_bbox" , handle_geojson = False ) -> dict :
1439+ def _extract_bbox_extent (
1440+ args : Union [dict , ProcessArgs ], field = "extent" , process_id = "filter_bbox" , handle_geojson = False
1441+ ) -> dict :
1442+ # TODO #346: make this a ProcessArgs method?
14671443 extent = extract_arg (args , name = field , process_id = process_id )
14681444 if handle_geojson and extent .get ("type" ) in [
14691445 "Polygon" ,
@@ -1488,24 +1464,16 @@ def _extract_bbox_extent(args: dict, field="extent", process_id="filter_bbox", h
14881464
14891465
14901466@process
1491- def filter_bbox (args : Dict , env : EvalEnv ) -> DriverDataCube :
1492- cube = extract_arg (args , 'data' )
1493- if not isinstance (cube , DriverDataCube ):
1494- raise ProcessParameterInvalidException (
1495- parameter = "data" , process = "filter_bbox" , reason = f"Invalid data type { type (cube )!r} expected raster-cube."
1496- )
1467+ def filter_bbox (args : ProcessArgs , env : EvalEnv ) -> DriverDataCube :
1468+ cube : DriverDataCube = args .get_required ("data" , expected_type = DriverDataCube )
14971469 spatial_extent = _extract_bbox_extent (args , "extent" , process_id = "filter_bbox" )
14981470 return cube .filter_bbox (** spatial_extent )
14991471
15001472
15011473@process
1502- def filter_spatial (args : Dict , env : EvalEnv ) -> DriverDataCube :
1503- cube = extract_arg (args , 'data' )
1504- geometries = extract_arg (args , 'geometries' )
1505- if not isinstance (cube , DriverDataCube ):
1506- raise ProcessParameterInvalidException (
1507- parameter = "data" , process = "filter_spatial" , reason = f"Invalid data type { type (cube )!r} expected raster-cube."
1508- )
1474+ def filter_spatial (args : ProcessArgs , env : EvalEnv ) -> DriverDataCube :
1475+ cube : DriverDataCube = args .get_required ("data" , expected_type = DriverDataCube )
1476+ geometries = args .get_required ("geometries" )
15091477
15101478 if isinstance (geometries , dict ):
15111479 if "type" in geometries and geometries ["type" ] != "GeometryCollection" :
@@ -1534,32 +1502,22 @@ def filter_spatial(args: Dict, env: EvalEnv) -> DriverDataCube:
15341502
15351503
15361504@process
1537- def filter_bands (args : Dict , env : EvalEnv ) -> Union [DriverDataCube , DriverVectorCube ]:
1538- cube : Union [DriverDataCube , DriverVectorCube ] = extract_arg (args , "data" )
1539- if not isinstance (cube , DriverDataCube ) and not isinstance (cube , DriverVectorCube ):
1540- raise ProcessParameterInvalidException (
1541- parameter = "data" , process = "filter_bands" , reason = f"Invalid data type { type (cube )!r} expected raster-cube."
1542- )
1543- bands = extract_arg (args , "bands" , process_id = "filter_bands" )
1505+ def filter_bands (args : ProcessArgs , env : EvalEnv ) -> Union [DriverDataCube , DriverVectorCube ]:
1506+ cube : Union [DriverDataCube , DriverVectorCube ] = args .get_required (
1507+ "data" , expected_type = (DriverDataCube , DriverVectorCube )
1508+ )
1509+ bands = args .get_required ("bands" , expected_type = list )
15441510 return cube .filter_bands (bands = bands )
15451511
15461512
15471513@process
1548- def apply_kernel (args : Dict , env : EvalEnv ) -> DriverDataCube :
1549- image_collection = extract_arg (args , 'data' )
1550- kernel = np .asarray (extract_arg (args , 'kernel' ))
1551- factor = args .get ('factor' , 1.0 )
1552- border = args .get ('border' , 0 )
1553- if not isinstance (image_collection , DriverDataCube ):
1554- raise ProcessParameterInvalidException (
1555- parameter = "data" , process = "apply_kernel" ,
1556- reason = f"Invalid data type { type (image_collection )!r} expected raster-cube."
1557- )
1558- if border == "0" :
1559- # R-client sends `0` border as a string
1560- border = 0
1561- replace_invalid = args .get ('replace_invalid' , 0 )
1562- return image_collection .apply_kernel (kernel = kernel , factor = factor , border = border , replace_invalid = replace_invalid )
1514+ def apply_kernel (args : ProcessArgs , env : EvalEnv ) -> DriverDataCube :
1515+ cube : DriverDataCube = args .get_required ("data" , expected_type = DriverDataCube )
1516+ kernel = np .asarray (args .get_required ("kernel" , expected_type = list ))
1517+ factor = args .get_optional ("factor" , default = 1.0 , expected_type = (int , float ))
1518+ border = args .get_optional ("border" , default = 0 , expected_type = int )
1519+ replace_invalid = args .get_optional ("replace_invalid" , default = 0 , expected_type = (int , float ))
1520+ return cube .apply_kernel (kernel = kernel , factor = factor , border = border , replace_invalid = replace_invalid )
15631521
15641522
15651523@process
@@ -1698,20 +1656,17 @@ def run_udf(args: dict, env: EvalEnv):
16981656
16991657
17001658@process
1701- def linear_scale_range (args : dict , env : EvalEnv ) -> DriverDataCube :
1702- image_collection = extract_arg (args , 'x' )
1703-
1704- inputMin = extract_arg (args , "inputMin" )
1705- inputMax = extract_arg (args , "inputMax" )
1706- outputMax = args .get ("outputMax" , 1.0 )
1707- outputMin = args .get ("outputMin" , 0.0 )
1708- if not isinstance (image_collection , DriverDataCube ):
1709- raise ProcessParameterInvalidException (
1710- parameter = "data" , process = "linear_scale_range" ,
1711- reason = f"Invalid data type { type (image_collection )!r} expected raster-cube."
1712- )
1713-
1714- return image_collection .linear_scale_range (inputMin , inputMax , outputMin , outputMax )
1659+ def linear_scale_range (args : ProcessArgs , env : EvalEnv ) -> DriverDataCube :
1660+ # TODO: eliminate this top-level linear_scale_range process implementation (should be used as `apply` callback)
1661+ _log .warning ("DEPRECATED: linear_scale_range usage directly on cube is deprecated/non-standard." )
1662+ cube : DriverDataCube = args .get_required ("x" , expected_type = DriverDataCube )
1663+ # Note: non-standard camelCase parameter names (https://github.com/Open-EO/openeo-processes/issues/302)
1664+ input_min = args .get_required ("inputMin" )
1665+ input_max = args .get_required ("inputMax" )
1666+ output_min = args .get_optional ("outputMin" , default = 0.0 )
1667+ output_max = args .get_optional ("outputMax" , default = 1.0 )
1668+ # TODO linear_scale_range is defined on GeopysparkDataCube, but not on DriverDataCube
1669+ return cube .linear_scale_range (input_min , input_max , output_min , output_max )
17151670
17161671
17171672@process
@@ -1991,14 +1946,10 @@ def get_geometries(args: Dict, env: EvalEnv) -> Union[DelayedVector, dict]:
19911946 .param ('data' , description = "A raster data cube." , schema = {"type" : "object" , "subtype" : "raster-cube" })
19921947 .returns ("vector-cube" , schema = {"type" : "object" , "subtype" : "vector-cube" })
19931948)
1994- def raster_to_vector (args : Dict , env : EvalEnv ):
1995- image_collection = extract_arg (args , 'data' )
1996- if not isinstance (image_collection , DriverDataCube ):
1997- raise ProcessParameterInvalidException (
1998- parameter = "data" , process = "raster_to_vector" ,
1999- reason = f"Invalid data type { type (image_collection )!r} expected raster-cube."
2000- )
2001- return image_collection .raster_to_vector ()
1949+ def raster_to_vector (args : ProcessArgs , env : EvalEnv ):
1950+ cube : DriverDataCube = args .get_required ("data" , expected_type = DriverDataCube )
1951+ # TODO: raster_to_vector is only defined on GeopysparkDataCube, not DriverDataCube
1952+ return cube .raster_to_vector ()
20021953
20031954
20041955@non_standard_process (
@@ -2238,13 +2189,8 @@ def discard_result(args: ProcessArgs, env: EvalEnv):
22382189
22392190@process_registry_100 .add_function (spec = read_spec ("openeo-processes/experimental/mask_scl_dilation.json" ))
22402191@process_registry_2xx .add_function (spec = read_spec ("openeo-processes/experimental/mask_scl_dilation.json" ))
2241- def mask_scl_dilation (args : Dict , env : EvalEnv ):
2242- cube : DriverDataCube = extract_arg (args , 'data' )
2243- if not isinstance (cube , DriverDataCube ):
2244- raise ProcessParameterInvalidException (
2245- parameter = "data" , process = "mask_scl_dilation" ,
2246- reason = f"Invalid data type { type (cube )!r} expected raster-cube."
2247- )
2192+ def mask_scl_dilation (args : ProcessArgs , env : EvalEnv ):
2193+ cube : DriverDataCube = args .get_required ("data" , expected_type = DriverDataCube )
22482194 if hasattr (cube , "mask_scl_dilation" ):
22492195 the_args = args .copy ()
22502196 del the_args ["data" ]
@@ -2275,13 +2221,8 @@ def to_scl_dilation_mask(args: ProcessArgs, env: EvalEnv):
22752221
22762222@process_registry_100 .add_function (spec = read_spec ("openeo-processes/experimental/mask_l1c.json" ))
22772223@process_registry_2xx .add_function (spec = read_spec ("openeo-processes/experimental/mask_l1c.json" ))
2278- def mask_l1c (args : Dict , env : EvalEnv ):
2279- cube : DriverDataCube = extract_arg (args , 'data' )
2280- if not isinstance (cube , DriverDataCube ):
2281- raise ProcessParameterInvalidException (
2282- parameter = "data" , process = "mask_l1c" ,
2283- reason = f"Invalid data type { type (cube )!r} expected raster-cube."
2284- )
2224+ def mask_l1c (args : ProcessArgs , env : EvalEnv ):
2225+ cube : DriverDataCube = args .get_required ("data" , expected_type = DriverDataCube )
22852226 if hasattr (cube , "mask_l1c" ):
22862227 return cube .mask_l1c ()
22872228 else :
@@ -2376,10 +2317,11 @@ def load_result(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
23762317
23772318@process_registry_100 .add_function (spec = read_spec ("openeo-processes/1.x/proposals/inspect.json" ))
23782319@process_registry_2xx .add_function (spec = read_spec ("openeo-processes/2.x/proposals/inspect.json" ))
2379- def inspect (args : dict , env : EvalEnv ):
2380- data = extract_arg (args , "data" )
2381- message = args .get ("message" , "" )
2382- level = args .get ("level" , "info" )
2320+ def inspect (args : ProcessArgs , env : EvalEnv ):
2321+ data = args .get_required ("data" )
2322+ message = args .get_optional ("message" , default = "" )
2323+ code = args .get_optional ("code" , default = "User" )
2324+ level = args .get_optional ("level" , default = "info" )
23832325 if message :
23842326 _log .log (level = logging .getLevelName (level .upper ()), msg = message )
23852327 data_message = str (data )