From 9e58c05289b94795461f0c6667441203b0ced567 Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Tue, 7 Apr 2026 17:11:12 -0400 Subject: [PATCH 01/10] feat: support defining schemas as dicts This provides an alternative way to define schemas, which IMO looks a bit nicer than the class based approach. One big benefit of this method is that you can use dashes in the keys, so it's possible to use both underscores and dashes as identifiers and it's clear to the user which is which. --- src/taskgraph/util/schema.py | 79 ++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/src/taskgraph/util/schema.py b/src/taskgraph/util/schema.py index ad9a93e5a..45bfcf4ae 100644 --- a/src/taskgraph/util/schema.py +++ b/src/taskgraph/util/schema.py @@ -2,6 +2,7 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. +import inspect import pprint import re import threading @@ -318,6 +319,11 @@ def __getitem__(self, item): return self.schema[item] # type: ignore +def _caller_module_name(depth=1): + frame = inspect.stack()[depth + 1].frame + return frame.f_globals.get("__name__", "schema") + + class Schema( msgspec.Struct, kw_only=True, @@ -345,6 +351,11 @@ class MySchema(Schema, forbid_unknown_fields=False, kw_only=True): foo: str """ + def __init_subclass__(cls, exclusive=None, **kwargs): + super().__init_subclass__(**kwargs) + if exclusive is not None: + cls.exclusive = exclusive + def __post_init__(self): if taskgraph.fast: return @@ -370,6 +381,74 @@ def __post_init__(self): keyed_by.validate(obj) + # Validate mutually exclusive field groups. + for group in getattr(self, "exclusive", []): + set_fields = [f for f in group if getattr(self, f) is not None] + if len(set_fields) > 1: + raise ValueError( + f"{' and '.join(repr(f) for f in set_fields)} are mutually exclusive" + ) + + @classmethod + def from_dict( + cls, + fields_dict: dict[str, Any], + name: Optional[str] = None, + optional: bool = False, + **kwargs, + ) -> Union[type[msgspec.Struct], type[Optional[msgspec.Struct]]]: + """Create a Schema subclass dynamically from a dict of field definitions. + + Each key is a field name and each value is either a type annotation or a + ``(type, default)`` tuple. Fields typed as ``Optional[...]`` automatically + receive a default of ``None`` when no explicit default is provided. + + Usage:: + + Schema.from_dict("MySchema", { + "required_field": str, + "optional_field": Optional[int], # default None inferred + "explicit_default": (list[str], []), # explicit default + }) + + Keyword arguments are forwarded to ``msgspec.defstruct`` (e.g. + ``forbid_unknown_fields=False``). + """ + # Don't use `rename=kebab` by default as we can define kebab case + # properly in dicts. + kwargs.setdefault("rename", None) + + # Ensure name and module are set correctly for error messages. + caller_module = _caller_module_name() + kwargs.setdefault("module", caller_module) + name = name or caller_module.rsplit(".", 1)[-1] + + fields = [] + for field_name, field_spec in fields_dict.items(): + python_name = field_name.replace("-", "_") + + if isinstance(field_spec, tuple): + typ, default = field_spec + else: + typ = field_spec + if get_origin(typ) is Union and type(None) in get_args(typ): + default = None + else: + default = msgspec.NODEFAULT + + if field_name != python_name: + # Use msgspec.field to preserve the kebab-case encoded name. + # Explicit field names take priority over the struct-level rename. + fields.append((python_name, typ, msgspec.field(name=field_name, default=default))) + else: + fields.append((python_name, typ, default)) + + exclusive = kwargs.pop("exclusive", None) + result = msgspec.defstruct(name, fields, bases=(cls,), **kwargs) + if exclusive: + result.exclusive = exclusive + return Optional[result] if optional else result # type: ignore[valid-type] + @classmethod def validate(cls, data): """Validate data against this schema.""" From 57ecbbc28d5174072425bedd0a1fbfcb21fc680d Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Tue, 7 Apr 2026 17:11:12 -0400 Subject: [PATCH 02/10] refactor: use Schema.from_dict in from_deps transforms --- src/taskgraph/transforms/from_deps.py | 86 +++++++++++++-------------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/src/taskgraph/transforms/from_deps.py b/src/taskgraph/transforms/from_deps.py index 101579394..70bda2730 100644 --- a/src/taskgraph/transforms/from_deps.py +++ b/src/taskgraph/transforms/from_deps.py @@ -22,46 +22,46 @@ from taskgraph.util.schema import Schema, validate_schema from taskgraph.util.set_name import SET_NAME_MAP - -class FromDepsConfig(Schema): - # Limit dependencies to specified kinds (defaults to all kinds in - # `kind-dependencies`). - # - # The first kind in the list is the "primary" kind. The - # dependency of this kind will be used to derive the label - # and copy attributes (if `copy-attributes` is True). - kinds: Optional[list[str]] = None - # Set-name function (dynamic: validated at runtime against SET_NAME_MAP). - set_name: Optional[Union[bool, str, dict[str, object]]] = None - # Limit dependencies to tasks whose attributes match - # using :func:`~taskgraph.util.attributes.attrmatch`. - with_attributes: Optional[dict[str, Union[list, str]]] = None - # Group cross-kind dependencies using the given group-by - # function. One task will be created for each group. If not - # specified, the 'single' function will be used which creates - # a new task for each individual dependency. - group_by: Optional[Union[str, dict[str, object]]] = None - # If True, copy attributes from the dependency matching the - # first kind in the `kinds` list (whether specified explicitly - # or taken from `kind-dependencies`). - copy_attributes: Optional[bool] = None - # If true (the default), there must be only a single unique task - # for each kind in a dependency group. Setting this to false - # disables that requirement. - unique_kinds: Optional[bool] = None - # If present, a `fetches` entry will be added for each task - # dependency. Attributes of the upstream task may be used as - # substitution values in the `artifact` or `dest` values of the - # `fetches` entry. - fetches: Optional[dict[str, list[FetchesEntrySchema]]] = None - - -#: Schema for from_deps transforms -class FromDepsSchema(Schema, forbid_unknown_fields=False, kw_only=True): - from_deps: FromDepsConfig - - -FROM_DEPS_SCHEMA = FromDepsSchema +FROM_DEPS_SCHEMA = Schema.from_dict( + { + "from-deps": Schema.from_dict( + { + # Limit dependencies to specified kinds (defaults to all kinds in + # `kind-dependencies`). + # + # The first kind in the list is the "primary" kind. The + # dependency of this kind will be used to derive the label + # and copy attributes (if `copy-attributes` is True). + "kinds": Optional[list[str]], + # Set-name function (dynamic: validated at runtime against SET_NAME_MAP). + "set-name": Optional[Union[bool, str, dict[str, object]]], + # Limit dependencies to tasks whose attributes match + # using :func:`~taskgraph.util.attributes.attrmatch`. + "with-attributes": Optional[dict[str, Union[list, str]]], + # Group cross-kind dependencies using the given group-by + # function. One task will be created for each group. If not + # specified, the 'single' function will be used which creates + # a new task for each individual dependency. + "group-by": Optional[Union[str, dict[str, object]]], + # If True, copy attributes from the dependency matching the + # first kind in the `kinds` list (whether specified explicitly + # or taken from `kind-dependencies`). + "copy-attributes": Optional[bool], + # If true (the default), there must be only a single unique task + # for each kind in a dependency group. Setting this to false + # disables that requirement. + "unique-kinds": Optional[bool], + # If present, a `fetches` entry will be added for each task + # dependency. Attributes of the upstream task may be used as + # substitution values in the `artifact` or `dest` values of the + # `fetches` entry. + "fetches": Optional[dict[str, list[FetchesEntrySchema]]], + }, + ), + }, + name="FromDepsSchema", + forbid_unknown_fields=False, +) transforms = TransformSequence() transforms.add_validate(FROM_DEPS_SCHEMA) @@ -151,9 +151,9 @@ def from_deps(config, tasks): else: raise Exception("Could not detect primary kind!") - new_task.setdefault("attributes", {})["primary-kind-dependency"] = ( - primary_kind - ) + new_task.setdefault("attributes", {})[ + "primary-kind-dependency" + ] = primary_kind primary_dep = [dep for dep in group if dep.kind == primary_kind][0] new_task["attributes"]["primary-dependency-label"] = primary_dep.label From 6eddcdc59600cceaa63bf01131ec52a4fb57444d Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Wed, 8 Apr 2026 09:27:11 -0400 Subject: [PATCH 03/10] refactor: use Schema.from_dict in chunking transforms --- src/taskgraph/transforms/chunking.py | 37 ++++++++++++++-------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/src/taskgraph/transforms/chunking.py b/src/taskgraph/transforms/chunking.py index 6547708bd..ba596d7d7 100644 --- a/src/taskgraph/transforms/chunking.py +++ b/src/taskgraph/transforms/chunking.py @@ -2,30 +2,29 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import copy -from typing import Optional from taskgraph.transforms.base import TransformSequence from taskgraph.util.schema import Schema from taskgraph.util.templates import substitute - -class ChunkConfig(Schema): - # The total number of chunks to split the task into. - total_chunks: int - # A list of fields that need to have `{this_chunk}` and/or - # `{total_chunks}` replaced in them. - substitution_fields: list[str] = [] - - -#: Schema for chunking transforms -class ChunkSchema(Schema, forbid_unknown_fields=False, kw_only=True): - # `chunk` can be used to split one task into `total-chunks` - # tasks, substituting `this_chunk` and `total_chunks` into any - # fields in `substitution-fields`. - chunk: Optional[ChunkConfig] = None - - -CHUNK_SCHEMA = ChunkSchema +CHUNK_SCHEMA = Schema.from_dict( + { + # `chunk` can be used to split one task into `total-chunks` + # tasks, substituting `this_chunk` and `total_chunks` into any + # fields in `substitution-fields`. + "chunk": Schema.from_dict( + { + # The total number of chunks to split the task into. + "total-chunks": int, + # A list of fields that need to have `{this_chunk}` and/or + # `{total_chunks}` replaced in them. + "substitution-fields": (list[str], []), + }, + optional=True, + ), + }, + forbid_unknown_fields=False, +) transforms = TransformSequence() transforms.add_validate(CHUNK_SCHEMA) From a4397d0241238ac191f1ec2bc2a3160be26acede Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Wed, 8 Apr 2026 09:49:02 -0400 Subject: [PATCH 04/10] refactor: use Schema.from_dict in docker_image transforms --- src/taskgraph/transforms/docker_image.py | 50 ++++++++++++------------ 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/src/taskgraph/transforms/docker_image.py b/src/taskgraph/transforms/docker_image.py index 643be20e5..b1d94afe0 100644 --- a/src/taskgraph/transforms/docker_image.py +++ b/src/taskgraph/transforms/docker_image.py @@ -5,7 +5,6 @@ import logging import os import re -from typing import Optional import taskgraph from taskgraph.transforms.base import TransformSequence @@ -27,31 +26,32 @@ transforms = TransformSequence() - #: Schema for docker_image transforms -class DockerImageSchema(Schema): - # Name of the docker image. - name: str - # Name of the parent docker image. - parent: Optional[str] = None - # Treeherder symbol. - symbol: Optional[str] = None - # Relative path (from config.path) to the file the docker image was defined in. - task_from: Optional[str] = None - # Arguments to use for the Dockerfile. - args: Optional[dict[str, str]] = None - # Name of the docker image definition under taskcluster/docker, when - # different from the docker image name. - definition: Optional[str] = None - # List of package tasks this docker image depends on. - packages: Optional[list[str]] = None - # Information for indexing this build so its artifacts can be discovered. - index: Optional[IndexSchema] = None - # Whether this image should be cached based on inputs. - cache: Optional[bool] = None - - -docker_image_schema = DockerImageSchema +DOCKER_IMAGE_SCHEMA = Schema.from_dict( + { + # Name of the docker image. + "name": str, + # Name of the parent docker image. + "parent": (str, None), + # Treeherder symbol. + "symbol": (str, None), + # Relative path (from config.path) to the file the docker image was defined in. + "task-from": (str, None), + # Arguments to use for the Dockerfile. + "args": (dict[str, str], None), + # Name of the docker image definition under taskcluster/docker, when + # different from the docker image name. + "definition": (str, None), + # List of package tasks this docker image depends on. + "packages": (list[str], None), + # Information for indexing this build so its artifacts can be discovered. + "index": (IndexSchema, None), + # Whether this image should be cached based on inputs. + "cache": (bool, None), + }, +) + +docker_image_schema = DOCKER_IMAGE_SCHEMA transforms.add_validate(docker_image_schema) From 6b660a704eeb61344bc749f20ecd26580b0d844a Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Wed, 8 Apr 2026 09:49:02 -0400 Subject: [PATCH 05/10] refactor: use Schema.from_dict in matrix transforms --- src/taskgraph/transforms/matrix.py | 63 +++++++++++++++--------------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/src/taskgraph/transforms/matrix.py b/src/taskgraph/transforms/matrix.py index b3ac53f16..2a591f2e9 100644 --- a/src/taskgraph/transforms/matrix.py +++ b/src/taskgraph/transforms/matrix.py @@ -8,42 +8,43 @@ """ from copy import deepcopy -from typing import Optional from taskgraph.transforms.base import TransformSequence from taskgraph.util.schema import Schema from taskgraph.util.templates import substitute_task_fields - -class MatrixConfig(Schema, forbid_unknown_fields=False, kw_only=True): - # Exclude the specified combination(s) of matrix values from the - # final list of tasks. - # - # If only a subset of the possible rows are present in the - # exclusion rule, then *all* combinations including that subset - # subset will be excluded. - exclude: Optional[list[dict[str, str]]] = None - # Sets the task name to the specified format string. - # - # Useful for cases where the default of joining matrix values by - # a dash is not desired. - set_name: Optional[str] = None - # List of fields in the task definition to substitute matrix values into. - # - # If not specified, all fields in the task definition will be - # substituted. - substitution_fields: Optional[list[str]] = None - # Extra dimension keys (e.g. "platform": ["linux", "win"]) allowed - # via forbid_unknown_fields=False - - -#: Schema for matrix transforms -class MatrixSchema(Schema, forbid_unknown_fields=False, kw_only=True): - name: str - matrix: Optional[MatrixConfig] = None - - -MATRIX_SCHEMA = MatrixSchema +MATRIX_SCHEMA = Schema.from_dict( + { + "name": str, + # `matrix` holds the configuration for splitting tasks. + "matrix": Schema.from_dict( + { + # Exclude the specified combination(s) of matrix values from the + # final list of tasks. + # + # If only a subset of the possible rows are present in the + # exclusion rule, then *all* combinations including that subset + # subset will be excluded. + "exclude": (list[dict[str, str]], None), + # Sets the task name to the specified format string. + # + # Useful for cases where the default of joining matrix values by + # a dash is not desired. + "set-name": (str, None), + # List of fields in the task definition to substitute matrix values into. + # + # If not specified, all fields in the task definition will be + # substituted. + "substitution-fields": (list[str], None), + # Extra dimension keys (e.g. "platform": ["linux", "win"]) allowed + # via forbid_unknown_fields=False + }, + optional=True, + forbid_unknown_fields=False, + ), + }, + forbid_unknown_fields=False, +) transforms = TransformSequence() transforms.add_validate(MATRIX_SCHEMA) From a372c075c3d7d427bd1266b64e914d73c03f7e4c Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Wed, 8 Apr 2026 09:49:02 -0400 Subject: [PATCH 06/10] refactor: use Schema.from_dict in task_context transforms --- src/taskgraph/transforms/task_context.py | 92 ++++++++++++------------ 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/src/taskgraph/transforms/task_context.py b/src/taskgraph/transforms/task_context.py index 5911bd52c..aff763f5f 100644 --- a/src/taskgraph/transforms/task_context.py +++ b/src/taskgraph/transforms/task_context.py @@ -1,55 +1,55 @@ -from typing import Optional, Union +from typing import Union from taskgraph.transforms.base import TransformSequence from taskgraph.util.schema import Schema from taskgraph.util.templates import deep_get, substitute_task_fields from taskgraph.util.yaml import load_yaml - -class TaskContextConfig(Schema): - # A list of fields in the task to substitute the provided values - # into. - substitution_fields: list[str] - # Retrieve task context values from parameters. A single - # parameter may be provided or a list of parameters in - # priority order. The latter can be useful in implementing a - # "default" value if some other parameter is not provided. - from_parameters: Optional[dict[str, Union[list[str], str]]] = None - # Retrieve task context values from a yaml file. The provided - # file should usually only contain top level keys and values - # (eg: nested objects will not be interpolated - they will be - # substituted as text representations of the object). - from_file: Optional[str] = None - # Key/value pairs to be used as task context - from_object: Optional[object] = None - - -#: Schema for the task_context transforms -class TaskContextSchema(Schema, forbid_unknown_fields=False, kw_only=True): - name: Optional[str] = None - # `task-context` can be used to substitute values into any field in a - # task with data that is not known until `taskgraph` runs. - # - # This data can be provided via `from-parameters` or `from-file`, - # which can pull in values from parameters and a defined yml file - # respectively. - # - # Data may also be provided directly in the `from-object` section of - # `task-context`. This can be useful in `kinds` that define most of - # their contents in `task-defaults`, but have some values that may - # differ for various concrete `tasks` in the `kind`. - # - # If the same key is found in multiple places the order of precedence - # is as follows: - # - Parameters - # - `from-object` keys - # - File - # - # That is to say: parameters will always override anything else. - task_context: Optional[TaskContextConfig] = None - - -SCHEMA = TaskContextSchema +SCHEMA = Schema.from_dict( + { + "name": (str, None), + # `task-context` can be used to substitute values into any field in a + # task with data that is not known until `taskgraph` runs. + # + # This data can be provided via `from-parameters` or `from-file`, + # which can pull in values from parameters and a defined yml file + # respectively. + # + # Data may also be provided directly in the `from-object` section of + # `task-context`. This can be useful in `kinds` that define most of + # their contents in `task-defaults`, but have some values that may + # differ for various concrete `tasks` in the `kind`. + # + # If the same key is found in multiple places the order of precedence + # is as follows: + # - Parameters + # - `from-object` keys + # - File + # + # That is to say: parameters will always override anything else. + "task-context": Schema.from_dict( + { + # A list of fields in the task to substitute the provided values + # into. + "substitution-fields": list[str], + # Retrieve task context values from parameters. A single + # parameter may be provided or a list of parameters in + # priority order. The latter can be useful in implementing a + # "default" value if some other parameter is not provided. + "from-parameters": (dict[str, Union[list[str], str]], None), + # Retrieve task context values from a yaml file. The provided + # file should usually only contain top level keys and values + # (eg: nested objects will not be interpolated - they will be + # substituted as text representations of the object). + "from-file": (str, None), + # Key/value pairs to be used as task context + "from-object": (object, None), + }, + optional=True, + ), + }, + forbid_unknown_fields=False, +) transforms = TransformSequence() transforms.add_validate(SCHEMA) From 57f632cda1ea53581083af384e70c25c2265fe39 Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Wed, 8 Apr 2026 10:00:17 -0400 Subject: [PATCH 07/10] refactor: use Schema.from_deps in notify transforms --- src/taskgraph/transforms/notify.py | 179 ++++++++++++++++------------- 1 file changed, 102 insertions(+), 77 deletions(-) diff --git a/src/taskgraph/transforms/notify.py b/src/taskgraph/transforms/notify.py index 9f73d8d04..b986212eb 100644 --- a/src/taskgraph/transforms/notify.py +++ b/src/taskgraph/transforms/notify.py @@ -23,33 +23,86 @@ "on-running", ] - -class EmailRecipient(Schema, tag_field="type", tag="email", kw_only=True): - address: optionally_keyed_by("project", "level", str, use_msgspec=True) # type: ignore - status_type: Optional[StatusType] = None - - -class MatrixRoomRecipient(Schema, tag_field="type", tag="matrix-room", kw_only=True): - room_id: str - status_type: Optional[StatusType] = None - - -class PulseRecipient(Schema, tag_field="type", tag="pulse", kw_only=True): - routing_key: str - status_type: Optional[StatusType] = None - - -class SlackChannelRecipient( - Schema, tag_field="type", tag="slack-channel", kw_only=True -): - channel_id: str - status_type: Optional[StatusType] = None - - Recipient = Union[ - EmailRecipient, MatrixRoomRecipient, PulseRecipient, SlackChannelRecipient + Schema.from_dict( + { + "address": optionally_keyed_by("project", "level", str, use_msgspec=True), + "status-type": Optional[StatusType], + }, + name="EmailRecipient", + tag_field="type", + tag="email", + ), # type: ignore [invalid-type-form] + Schema.from_dict( + { + "room-id": str, + "status-type": Optional[StatusType], + }, + name="MatrixRoomRecipient", + tag_field="type", + tag="matrix-room", + ), # type: ignore [invalid-type-form] + Schema.from_dict( + { + "routing-key": str, + "status-type": Optional[StatusType], + }, + name="PulseRecipient", + tag_field="type", + tag="pulse", + ), # type: ignore [invalid-type-form] + Schema.from_dict( + { + "channel-id": str, + "status-type": Optional[StatusType], + }, + name="SlackChannelRecipient", + tag_field="type", + tag="slack-channel", + ), # type: ignore [invalid-type-form] ] +Content = Schema.from_dict( + { + "email": Schema.from_dict( + { + "subject": Optional[str], + "content": Optional[str], + "link": Schema.from_dict( + { + "text": str, + "href": str, + }, + optional=True, + ), + }, + name="EmailContent", + optional=True, + ), + "matrix": Schema.from_dict( + { + "body": Optional[str], + "formatted-body": Optional[str], + "format": Optional[str], + "msg-type": Optional[str], + }, + name="MatrixContent", + optional=True, + ), + "slack": Schema.from_dict( + { + "text": Optional[str], + "blocks": Optional[list], + "attachments": Optional[list], + }, + name="SlackContent", + optional=True, + ), + }, + optional=True, + name="NotifyContentConfig", +) + _route_keys = { "email": "address", @@ -60,60 +113,32 @@ class SlackChannelRecipient( """Map each type to its primary key that will be used in the route.""" -class EmailLinkContent(Schema): - text: str - href: str - - -class EmailContent(Schema): - subject: Optional[str] = None - content: Optional[str] = None - link: Optional[EmailLinkContent] = None - - -class MatrixContent(Schema): - body: Optional[str] = None - formatted_body: Optional[str] = None - format: Optional[str] = None - msg_type: Optional[str] = None - - -class SlackContent(Schema): - text: Optional[str] = None - blocks: Optional[list] = None - attachments: Optional[list] = None - - -class NotifyContentConfig(Schema): - email: Optional[EmailContent] = None - matrix: Optional[MatrixContent] = None - slack: Optional[SlackContent] = None - - -class NotifyConfig(Schema): - recipients: list[Recipient] - content: Optional[NotifyContentConfig] = None - - -class LegacyNotificationsConfig(Schema): - # Continue supporting the legacy schema for backwards compat. - emails: optionally_keyed_by("project", "level", list[str], use_msgspec=True) # type: ignore - subject: str - message: Optional[str] = None - status_types: Optional[list[StatusType]] = None - - #: Schema for notify transforms -class NotifySchema(Schema, forbid_unknown_fields=False, kw_only=True): - notify: Optional[NotifyConfig] = None - notifications: Optional[LegacyNotificationsConfig] = None - - def __post_init__(self): - if self.notify is not None and self.notifications is not None: - raise ValueError("'notify' and 'notifications' are mutually exclusive") - - -NOTIFY_SCHEMA = NotifySchema +NOTIFY_SCHEMA = Schema.from_dict( + { + "notify": Schema.from_dict( + { + "recipients": list[Recipient], + "content": Content, + }, + optional=True, + ), + # Continue supporting the legacy schema for backwards compat. + "notifications": Schema.from_dict( + { + "emails": optionally_keyed_by( + "project", "level", list[str], use_msgspec=True + ), + "subject": str, + "message": Optional[str], + "status-types": Optional[list[StatusType]], + }, + optional=True, + ), + }, + exclusive=[("notify", "notifications")], + forbid_unknown_fields=False, +) transforms = TransformSequence() transforms.add_validate(NOTIFY_SCHEMA) From 20ebd5232dabf2563a856f3be8b27fcebf21bf9e Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Wed, 8 Apr 2026 09:49:02 -0400 Subject: [PATCH 08/10] refactor: use Schema.from_dict in fetch transforms --- src/taskgraph/transforms/fetch.py | 164 ++++++++++++++++-------------- 1 file changed, 88 insertions(+), 76 deletions(-) diff --git a/src/taskgraph/transforms/fetch.py b/src/taskgraph/transforms/fetch.py index 2258cb85a..e3508542f 100644 --- a/src/taskgraph/transforms/fetch.py +++ b/src/taskgraph/transforms/fetch.py @@ -21,36 +21,38 @@ CACHE_TYPE = "content.v1" - -class FetchSubSchema(Schema, forbid_unknown_fields=False, kw_only=True): - # The fetch type - type: str - +_FETCH_SUB_SCHEMA = Schema.from_dict( + { + # The fetch type + "type": str, + }, + forbid_unknown_fields=False, +) #: Schema for fetch transforms -class FetchSchema(Schema): - # Name of the task. - name: str - # Description of the task. - description: str - # The fetch configuration - fetch: FetchSubSchema - # Relative path (from config.path) to the file the task was defined - # in. - task_from: Optional[str] = None - expires_after: Optional[str] = None - docker_image: Optional[object] = None - # An alias that can be used instead of the real fetch task name in - # fetch stanzas for tasks. - fetch_alias: Optional[str] = None - # The prefix of the taskcluster artifact being uploaded. - # Defaults to `public/`; if it starts with something other than - # `public/` the artifact will require scopes to access. - artifact_prefix: Optional[str] = None - attributes: Optional[dict[str, object]] = None - - -FETCH_SCHEMA = FetchSchema +FETCH_SCHEMA = Schema.from_dict( + { + # Name of the task. + "name": str, + # Description of the task. + "description": str, + # The fetch configuration + "fetch": _FETCH_SUB_SCHEMA, + # Relative path (from config.path) to the file the task was defined + # in. + "task-from": (str, None), + "expires-after": (str, None), + "docker-image": (object, None), + # An alias that can be used instead of the real fetch task name in + # fetch stanzas for tasks. + "fetch-alias": (str, None), + # The prefix of the taskcluster artifact being uploaded. + # Defaults to `public/`; if it starts with something other than + # `public/` the artifact will require scopes to access. + "artifact-prefix": (str, None), + "attributes": (dict[str, object], None), + }, +) # define a collection of payload builders, depending on the worker implementation fetch_builders = {} @@ -173,42 +175,48 @@ def make_task(config, tasks): yield task_desc -class GpgSignatureConfig(Schema): - # URL where GPG signature document can be obtained. Can contain the - # value ``{url}``, which will be substituted with the value from - # ``url``. - sig_url: str - # Path to file containing GPG public key(s) used to validate - # download. - key_path: str - - -class StaticUrlFetchSchema(Schema, forbid_unknown_fields=False, kw_only=True): - type: Literal["static-url"] - # The URL to download. - url: str - # The SHA-256 of the downloaded content. - sha256: str - # Size of the downloaded entity, in bytes. - size: int - # GPG signature verification. - gpg_signature: Optional[GpgSignatureConfig] = None - # The name to give to the generated artifact. Defaults to the file - # portion of the URL. Using a different extension converts the - # archive to the given type. Only conversion to .tar.zst is - # supported. - artifact_name: Optional[str] = None - # Strip the given number of path components at the beginning of - # each file entry in the archive. - # Requires an artifact-name ending with .tar.zst. - strip_components: Optional[int] = None - # Add the given prefix to each file entry in the archive. - # Requires an artifact-name ending with .tar.zst. - add_prefix: Optional[str] = None - # Headers to pass alongside the request. - headers: Optional[dict[str, str]] = None - # IMPORTANT: when adding anything that changes the behavior of the task, - # it is important to update the digest data used to compute cache hits. +_GPG_SIGNATURE_SCHEMA = Schema.from_dict( + { + # URL where GPG signature document can be obtained. Can contain the + # value ``{url}``, which will be substituted with the value from + # ``url``. + "sig-url": str, + # Path to file containing GPG public key(s) used to validate + # download. + "key-path": str, + }, +) + +StaticUrlFetchSchema = Schema.from_dict( + { + "type": Literal["static-url"], + # The URL to download. + "url": str, + # The SHA-256 of the downloaded content. + "sha256": str, + # Size of the downloaded entity, in bytes. + "size": int, + # GPG signature verification. + "gpg-signature": (_GPG_SIGNATURE_SCHEMA, None), + # The name to give to the generated artifact. Defaults to the file + # portion of the URL. Using a different extension converts the + # archive to the given type. Only conversion to .tar.zst is + # supported. + "artifact-name": (str, None), + # Strip the given number of path components at the beginning of + # each file entry in the archive. + # Requires an artifact-name ending with .tar.zst. + "strip-components": (int, None), + # Add the given prefix to each file entry in the archive. + # Requires an artifact-name ending with .tar.zst. + "add-prefix": (str, None), + # Headers to pass alongside the request. + "headers": (dict[str, str], None), + # IMPORTANT: when adding anything that changes the behavior of the task, + # it is important to update the digest data used to compute cache hits. + }, + forbid_unknown_fields=False, +) @fetch_builder("static-url", schema=StaticUrlFetchSchema) @@ -274,18 +282,22 @@ def create_fetch_url_task(config, name, fetch): } -class GitFetchSchema(Schema, forbid_unknown_fields=False, kw_only=True): - type: Literal["git"] - repo: str - revision: str - include_dot_git: Optional[bool] = None - artifact_name: Optional[str] = None - path_prefix: Optional[str] = None - # ssh-key is a taskcluster secret path (e.g. project/civet/github-deploy-key) - # In the secret dictionary, the key should be specified as - # "ssh_privkey": "-----BEGIN OPENSSH PRIVATE KEY-----\nkfksnb3jc..." - # n.b. The OpenSSH private key file format requires a newline at the end of the file. - ssh_key: Optional[str] = None +GitFetchSchema = Schema.from_dict( + { + "type": Literal["git"], + "repo": str, + "revision": str, + "include-dot-git": (bool, None), + "artifact-name": (str, None), + "path-prefix": (str, None), + # ssh-key is a taskcluster secret path (e.g. project/civet/github-deploy-key) + # In the secret dictionary, the key should be specified as + # "ssh_privkey": "-----BEGIN OPENSSH PRIVATE KEY-----\nkfksnb3jc..." + # n.b. The OpenSSH private key file format requires a newline at the end of the file. + "ssh-key": (str, None), + }, + forbid_unknown_fields=False, +) @fetch_builder("git", schema=GitFetchSchema) From 8cbe49166a431c1a2de6fa799f662f143c19bac1 Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Wed, 8 Apr 2026 09:49:02 -0400 Subject: [PATCH 09/10] refactor: use Schema.from_dict in run transforms --- src/taskgraph/transforms/run/__init__.py | 51 ++++++++++------ src/taskgraph/transforms/run/index_search.py | 17 +++--- src/taskgraph/transforms/run/run_task.py | 62 ++++++++++---------- src/taskgraph/transforms/run/toolchain.py | 48 +++++++-------- 4 files changed, 98 insertions(+), 80 deletions(-) diff --git a/src/taskgraph/transforms/run/__init__.py b/src/taskgraph/transforms/run/__init__.py index e4ad08716..3322ee62b 100644 --- a/src/taskgraph/transforms/run/__init__.py +++ b/src/taskgraph/transforms/run/__init__.py @@ -33,25 +33,36 @@ # Fetches may be accepted in other transforms and eventually passed along # to a `task` (eg: from_deps). Defining this here allows them to reuse # the schema and avoid duplication. -class FetchesEntrySchema(Schema): - artifact: str - dest: Optional[str] = None - extract: Optional[bool] = None - verify_hash: Optional[bool] = None - - -class WhenConfig(Schema): - # This task only needs to be run if a file matching one of the given - # patterns has changed in the push. The patterns use the mozpack - # match function (python/mozbuild/mozpack/path.py). - files_changed: Optional[list[str]] = None +FetchesEntrySchema = Schema.from_dict( + { + "artifact": str, + "dest": Optional[str], + "extract": Optional[bool], + "verify-hash": Optional[bool], + }, + name="FetchesEntrySchema", +) +WhenConfig = Schema.from_dict( + { + # This task only needs to be run if a file matching one of the given + # patterns has changed in the push. The patterns use the mozpack + # match function (python/mozbuild/mozpack/path.py). + "files-changed": Optional[list[str]], + }, + name="WhenConfig", +) -class RunConfig(Schema, forbid_unknown_fields=False, kw_only=True): - # The key to a run implementation in a peer module to this one. - using: str - # Base work directory used to set up the task. - workdir: Optional[str] = None +RunConfig = Schema.from_dict( + { + # The key to a run implementation in a peer module to this one. + "using": str, + # Base work directory used to set up the task. + "workdir": Optional[str], + }, + name="RunConfig", + forbid_unknown_fields=False, +) #: Schema for a run transforms @@ -405,8 +416,10 @@ def wrap(func): return wrap -class AlwaysOptimizedRunSchema(Schema): - using: Literal["always-optimized"] +AlwaysOptimizedRunSchema = Schema.from_dict( + {"using": Literal["always-optimized"]}, + name="AlwaysOptimizedRunSchema", +) @run_task_using("always-optimized", "always-optimized", AlwaysOptimizedRunSchema) diff --git a/src/taskgraph/transforms/run/index_search.py b/src/taskgraph/transforms/run/index_search.py index fd3ada672..0b86dc644 100644 --- a/src/taskgraph/transforms/run/index_search.py +++ b/src/taskgraph/transforms/run/index_search.py @@ -18,14 +18,15 @@ #: Schema for run.using index-search -class IndexSearchRunSchema(Schema): - using: Literal["index-search"] - # A list of indexes in decreasing order of priority at which to lookup for this - # task. This is interpolated with the graph parameters. - index_search: list[str] - - -run_task_schema = IndexSearchRunSchema +run_task_schema = Schema.from_dict( + { + "using": Literal["index-search"], + # A list of indexes in decreasing order of priority at which to lookup for this + # task. This is interpolated with the graph parameters. + "index-search": list[str], + }, + name="IndexSearchRunSchema", +) @run_task_using("always-optimized", "index-search", schema=run_task_schema) diff --git a/src/taskgraph/transforms/run/run_task.py b/src/taskgraph/transforms/run/run_task.py index 7c3ccb7a6..3cafaf7b7 100644 --- a/src/taskgraph/transforms/run/run_task.py +++ b/src/taskgraph/transforms/run/run_task.py @@ -27,36 +27,38 @@ #: Schema for run.using run_task -class RunTaskRunSchema(Schema, forbid_unknown_fields=False, kw_only=True): - # Specifies the task type. Must be 'run-task'. - using: Literal["run-task"] - # The command arguments to pass to the `run-task` script, after the checkout - # arguments. If a list, it will be passed directly; otherwise it will be - # included in a single argument to the command specified by `exec-with`. - command: Union[list[taskref_or_string_msgspec], taskref_or_string_msgspec] - # If true (the default), perform a checkout on the worker. Can also be a - # dictionary specifying explicit checkouts. - checkout: Union[bool, dict[str, dict]] - # Base work directory used to set up the task. - workdir: str - # Specifies which caches to use. May take a boolean in which case either all - # (True) or no (False) caches will be used. Alternatively, it can accept a - # list of caches to enable. Defaults to only the checkout cache enabled. - use_caches: Optional[Union[bool, list[CacheType]]] = None - # Path to run command in. If a checkout is present, the path to the checkout - # will be interpolated with the key `checkout`. - cwd: Optional[str] = None - # Specifies what to execute the command with in the event the command is a - # string. - exec_with: Optional[ExecWith] = None - # Command used to invoke the `run-task` script. Can be used if the script - # or Python installation is in a non-standard location on the workers. - run_task_command: Optional[list] = None - # Whether to run as root. Defaults to False. - run_as_root: Optional[bool] = None - - -run_task_schema = RunTaskRunSchema +run_task_schema = Schema.from_dict( + { + # Specifies the task type. Must be 'run-task'. + "using": Literal["run-task"], + # The command arguments to pass to the `run-task` script, after the checkout + # arguments. If a list, it will be passed directly; otherwise it will be + # included in a single argument to the command specified by `exec-with`. + "command": Union[list[taskref_or_string_msgspec], taskref_or_string_msgspec], + # If true (the default), perform a checkout on the worker. Can also be a + # dictionary specifying explicit checkouts. + "checkout": Union[bool, dict[str, dict]], + # Base work directory used to set up the task. + "workdir": str, + # Specifies which caches to use. May take a boolean in which case either all + # (True) or no (False) caches will be used. Alternatively, it can accept a + # list of caches to enable. Defaults to only the checkout cache enabled. + "use-caches": Optional[Union[bool, list[CacheType]]], + # Path to run command in. If a checkout is present, the path to the checkout + # will be interpolated with the key `checkout`. + "cwd": Optional[str], + # Specifies what to execute the command with in the event the command is a + # string. + "exec-with": Optional[ExecWith], + # Command used to invoke the `run-task` script. Can be used if the script + # or Python installation is in a non-standard location on the workers. + "run-task-command": Optional[list], + # Whether to run as root. Defaults to False. + "run-as-root": Optional[bool], + }, + name="RunTaskRunSchema", + forbid_unknown_fields=False, +) def common_setup(config, task, taskdesc, command): diff --git a/src/taskgraph/transforms/run/toolchain.py b/src/taskgraph/transforms/run/toolchain.py index 44067801f..3800d0643 100644 --- a/src/taskgraph/transforms/run/toolchain.py +++ b/src/taskgraph/transforms/run/toolchain.py @@ -23,29 +23,31 @@ #: Schema for run.using toolchain -class ToolchainRunSchema(Schema, forbid_unknown_fields=False, kw_only=True): - # Specifies the run type. Must be "toolchain-script". - using: Literal["toolchain-script"] - # The script (in taskcluster/scripts/misc) to run. - script: str - # Path to the artifact produced by the toolchain task. - toolchain_artifact: str - # Base work directory used to set up the task. - workdir: str - # Arguments to pass to the script. - arguments: Optional[list[str]] = None - # Paths/patterns pointing to files that influence the outcome of - # a toolchain build. - resources: Optional[list[str]] = None - # An alias that can be used instead of the real toolchain task name in - # fetch stanzas for tasks. - toolchain_alias: Optional[Union[str, list[str]]] = None - # Additional env variables to add to the worker when using this - # toolchain. - toolchain_env: Optional[dict[str, object]] = None - - -toolchain_run_schema = ToolchainRunSchema +toolchain_run_schema = Schema.from_dict( + { + # Specifies the run type. Must be "toolchain-script". + "using": Literal["toolchain-script"], + # The script (in taskcluster/scripts/misc) to run. + "script": str, + # Path to the artifact produced by the toolchain task. + "toolchain-artifact": str, + # Base work directory used to set up the task. + "workdir": str, + # Arguments to pass to the script. + "arguments": Optional[list[str]], + # Paths/patterns pointing to files that influence the outcome of + # a toolchain build. + "resources": Optional[list[str]], + # An alias that can be used instead of the real toolchain task name in + # fetch stanzas for tasks. + "toolchain-alias": Optional[Union[str, list[str]]], + # Additional env variables to add to the worker when using this + # toolchain. + "toolchain-env": Optional[dict[str, object]], + }, + name="ToolchainRunSchema", + forbid_unknown_fields=False, +) def get_digest_data(config, run, taskdesc): From 4b61b910ec5b6fa14df051ae631ff821ed6673d8 Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Wed, 8 Apr 2026 09:49:02 -0400 Subject: [PATCH 10/10] refactor: use Schema.from_dict in task transforms --- src/taskgraph/transforms/task.py | 332 +++++++++++++++++-------------- 1 file changed, 185 insertions(+), 147 deletions(-) diff --git a/src/taskgraph/transforms/task.py b/src/taskgraph/transforms/task.py index 53995a8ad..2c8192e03 100644 --- a/src/taskgraph/transforms/task.py +++ b/src/taskgraph/transforms/task.py @@ -50,9 +50,13 @@ def run_task_suffix(): return hash_path(RUN_TASK)[0:20] -class WorkerSchema(Schema, forbid_unknown_fields=False, kw_only=True): - # The worker implementation type. - implementation: str +WORKER_SCHEMA = Schema.from_dict( + { + # The worker implementation type. + "implementation": str, + }, + forbid_unknown_fields=False, +) #: Schema for the task transforms @@ -134,7 +138,7 @@ class TaskDescriptionSchema(Schema): # the release promotion phase that this task belongs to. shipping_phase: Optional[Literal["build", "promote", "push", "ship"]] = None # Information specific to the worker implementation that will run this task. - worker: Optional[WorkerSchema] = None + worker: Optional[WORKER_SCHEMA] = None def __post_init__(self): if self.dependencies: @@ -241,58 +245,66 @@ def verify_index(config, index): DockerImage = Union[str, dict[str, str]] -class DockerWorkerCacheEntry(Schema): - # only one type is supported by any of the workers right now - type: Literal["persistent"] = "persistent" - # name of the cache, allowing reuse by subsequent tasks naming the same cache - name: Optional[str] = None - # location in the task image where the cache will be mounted - mount_point: Optional[str] = None - # Whether the cache is not used in untrusted environments (like the Try repo). - skip_untrusted: Optional[bool] = None - - -class DockerWorkerArtifact(Schema): - # type of artifact -- simple file, or recursive directory, or a volume mounted directory. - type: Optional[Literal["file", "directory", "volume"]] = None - # task image path from which to read artifact - path: Optional[str] = None - # name of the produced artifact (root of the names for type=directory) - name: Optional[str] = None - - -class DockerWorkerPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True): - implementation: Literal["docker-worker"] - os: Literal["linux"] - # For tasks that will run in docker-worker, this is the name of the docker - # image or in-tree docker image to run the task in. - docker_image: DockerImage - # worker features that should be enabled - relengapi_proxy: bool - chain_of_trust: bool - taskcluster_proxy: bool - allow_ptrace: bool - loopback_video: bool - # environment variables - env: dict[str, taskref_or_string_msgspec] - # the maximum time to run, in seconds - max_run_time: int - # Paths to Docker volumes. - volumes: Optional[list[str]] = None - # caches to set up for the task - caches: Optional[list[DockerWorkerCacheEntry]] = None - # artifacts to extract from the task image after completion - artifacts: Optional[list[DockerWorkerArtifact]] = None - # the command to run; if not given, docker-worker will default to the - # command in the docker image - command: Optional[list[taskref_or_string_msgspec]] = None - # the exit status code(s) that indicates the task should be retried - retry_exit_status: Optional[list[int]] = None - # the exit status code(s) that indicates the caches used by the task - # should be purged - purge_caches_exit_status: Optional[list[int]] = None - # Whether any artifacts are assigned to this worker - skip_artifacts: Optional[bool] = None +DOCKER_WORKER_CACHE_ENTRY = Schema.from_dict( + { + # only one type is supported by any of the workers right now + "type": (Literal["persistent"], "persistent"), + # name of the cache, allowing reuse by subsequent tasks naming the same cache + "name": (str, None), + # location in the task image where the cache will be mounted + "mount-point": (str, None), + # Whether the cache is not used in untrusted environments (like the Try repo). + "skip-untrusted": (bool, None), + }, +) + +DOCKER_WORKER_ARTIFACT = Schema.from_dict( + { + # type of artifact -- simple file, or recursive directory, or a volume mounted directory. + "type": (Optional[Literal["file", "directory", "volume"]], None), + # task image path from which to read artifact + "path": (str, None), + # name of the produced artifact (root of the names for type=directory) + "name": (str, None), + }, +) + +DockerWorkerPayloadSchema = Schema.from_dict( + { + "implementation": Literal["docker-worker"], + "os": Literal["linux"], + # For tasks that will run in docker-worker, this is the name of the docker + # image or in-tree docker image to run the task in. + "docker-image": DockerImage, + # worker features that should be enabled + "relengapi-proxy": bool, + "chain-of-trust": bool, + "taskcluster-proxy": bool, + "allow-ptrace": bool, + "loopback-video": bool, + # environment variables + "env": dict[str, taskref_or_string_msgspec], + # the maximum time to run, in seconds + "max-run-time": int, + # Paths to Docker volumes. + "volumes": (list[str], None), + # caches to set up for the task + "caches": (list[DOCKER_WORKER_CACHE_ENTRY], None), + # artifacts to extract from the task image after completion + "artifacts": (list[DOCKER_WORKER_ARTIFACT], None), + # the command to run; if not given, docker-worker will default to the + # command in the docker image + "command": (list[taskref_or_string_msgspec], None), + # the exit status code(s) that indicates the task should be retried + "retry-exit-status": (list[int], None), + # the exit status code(s) that indicates the caches used by the task + # should be purged + "purge-caches-exit-status": (list[int], None), + # Whether any artifacts are assigned to this worker + "skip-artifacts": (bool, None), + }, + forbid_unknown_fields=False, +) @payload_builder("docker-worker", schema=DockerWorkerPayloadSchema) @@ -497,69 +509,78 @@ def build_docker_worker_payload(config, task, task_def): payload["capabilities"] = capabilities -class GenericWorkerArtifact(Schema): - # type of artifact -- simple file, or recursive directory - type: Literal["file", "directory"] - # filesystem path from which to read artifact - path: str - # if not specified, path is used for artifact name - name: Optional[str] = None - - -class MountContentSchema(Schema): - # Artifact name that contains the content. - artifact: Optional[str] = None - # Task ID that has the artifact that contains the content. - task_id: Optional[taskref_or_string_msgspec] = None - # URL that supplies the content in response to an unauthenticated GET request. - url: Optional[str] = None - - -class MountSchema(Schema): - # A unique name for the cache volume. - cache_name: Optional[str] = None - # Optional content for pre-loading cache, or mandatory content for - # read-only file or directory. - content: Optional[MountContentSchema] = None - # If mounting a cache or read-only directory. - directory: Optional[str] = None - # If mounting a file. - file: Optional[str] = None - # Archive format of the content. - format: Optional[Literal["rar", "tar.bz2", "tar.gz", "zip"]] = None - - -class GenericWorkerPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True): - implementation: Literal["generic-worker"] - os: Literal["windows", "macosx", "linux", "linux-bitbar"] - # command is a list of commands to run, sequentially - # On Windows, each command is a string; on Linux/OS X, each command is a string array - command: list - # environment variables - env: dict[str, taskref_or_string_msgspec] - # the maximum time to run, in seconds - max_run_time: int - # optional features - chain_of_trust: bool - # artifacts to extract from the task image after completion - artifacts: Optional[list[GenericWorkerArtifact]] = None - # Directories and/or files to be mounted. - mounts: Optional[list[MountSchema]] = None - # the exit status code(s) that indicates the task should be retried - retry_exit_status: Optional[list[int]] = None - # the exit status code(s) that indicates the caches used by the task - # should be purged - purge_caches_exit_status: Optional[list[int]] = None - # os user groups for test task workers - os_groups: Optional[list[str]] = None - # feature for test task to run as administrator - run_as_administrator: Optional[bool] = None - # feature for task to run as current OS user - run_task_as_current_user: Optional[bool] = None - taskcluster_proxy: Optional[bool] = None - hide_cmd_window: Optional[bool] = None - # Whether any artifacts are assigned to this worker - skip_artifacts: Optional[bool] = None +GENERIC_WORKER_ARTIFACT = Schema.from_dict( + { + # type of artifact -- simple file, or recursive directory + "type": Literal["file", "directory"], + # filesystem path from which to read artifact + "path": str, + # if not specified, path is used for artifact name + "name": (str, None), + }, +) + +MOUNT_SCHEMA = Schema.from_dict( + { + # A unique name for the cache volume. + "cache-name": (str, None), + # Optional content for pre-loading cache, or mandatory content for + # read-only file or directory. + "content": Schema.from_dict( + { + # Artifact name that contains the content. + "artifact": (str, None), + # Task ID that has the artifact that contains the content. + "task-id": (taskref_or_string_msgspec, None), + # URL that supplies the content in response to an unauthenticated GET request. + "url": (str, None), + }, + optional=True, + ), + # If mounting a cache or read-only directory. + "directory": (str, None), + # If mounting a file. + "file": (str, None), + # Archive format of the content. + "format": (Optional[Literal["rar", "tar.bz2", "tar.gz", "zip"]], None), + }, +) + +GenericWorkerPayloadSchema = Schema.from_dict( + { + "implementation": Literal["generic-worker"], + "os": Literal["windows", "macosx", "linux", "linux-bitbar"], + # command is a list of commands to run, sequentially + # On Windows, each command is a string; on Linux/OS X, each command is a string array + "command": list, + # environment variables + "env": dict[str, taskref_or_string_msgspec], + # the maximum time to run, in seconds + "max-run-time": int, + # optional features + "chain-of-trust": bool, + # artifacts to extract from the task image after completion + "artifacts": (list[GENERIC_WORKER_ARTIFACT], None), + # Directories and/or files to be mounted. + "mounts": (list[MOUNT_SCHEMA], None), + # the exit status code(s) that indicates the task should be retried + "retry-exit-status": (list[int], None), + # the exit status code(s) that indicates the caches used by the task + # should be purged + "purge-caches-exit-status": (list[int], None), + # os user groups for test task workers + "os-groups": (list[str], None), + # feature for test task to run as administrator + "run-as-administrator": (bool, None), + # feature for task to run as current OS user + "run-task-as-current-user": (bool, None), + "taskcluster-proxy": (bool, None), + "hide-cmd-window": (bool, None), + # Whether any artifacts are assigned to this worker + "skip-artifacts": (bool, None), + }, + forbid_unknown_fields=False, +) @payload_builder("generic-worker", schema=GenericWorkerPayloadSchema) @@ -677,13 +698,16 @@ def build_generic_worker_payload(config, task, task_def): task_def["payload"]["features"] = features -class ReleaseProperties(Schema): - app_name: Optional[str] = None - app_version: Optional[str] = None - branch: Optional[str] = None - build_id: Optional[str] = None - hash_type: Optional[str] = None - platform: Optional[str] = None +RELEASE_PROPERTIES = Schema.from_dict( + { + "app-name": (str, None), + "app-version": (str, None), + "branch": (str, None), + "build-id": (str, None), + "hash-type": (str, None), + "platform": (str, None), + }, +) class UpstreamArtifact(Schema, rename="camel"): @@ -697,18 +721,22 @@ class UpstreamArtifact(Schema, rename="camel"): locale: str -class BeetmoverPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True): - implementation: Literal["beetmover"] - # the maximum time to run, in seconds - max_run_time: int - # release properties - release_properties: ReleaseProperties - # list of artifact URLs for the artifacts that should be beetmoved - upstream_artifacts: list[UpstreamArtifact] - # locale key, if this is a locale beetmover task - locale: Optional[str] = None - partner_public: Optional[bool] = None - artifact_map: Optional[object] = None +BeetmoverPayloadSchema = Schema.from_dict( + { + "implementation": Literal["beetmover"], + # the maximum time to run, in seconds + "max-run-time": int, + # release properties + "release-properties": RELEASE_PROPERTIES, + # list of artifact URLs for the artifacts that should be beetmoved + "upstream-artifacts": list[UpstreamArtifact], + # locale key, if this is a locale beetmover task + "locale": (str, None), + "partner-public": (bool, None), + "artifact-map": (object, None), + }, + forbid_unknown_fields=False, +) @payload_builder("beetmover", schema=BeetmoverPayloadSchema) @@ -737,10 +765,14 @@ def build_beetmover_payload(config, task, task_def): task_def["payload"]["is_partner_repack_public"] = worker["partner-public"] -class InvalidPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True): - # an invalid task is one which should never actually be created; this is used in - # release automation on branches where the task just doesn't make sense - implementation: Literal["invalid"] +InvalidPayloadSchema = Schema.from_dict( + { + # an invalid task is one which should never actually be created; this is used in + # release automation on branches where the task just doesn't make sense + "implementation": Literal["invalid"], + }, + forbid_unknown_fields=False, +) @payload_builder("invalid", schema=InvalidPayloadSchema) @@ -748,12 +780,18 @@ def build_invalid_payload(config, task, task_def): task_def["payload"] = "invalid task - should never be created" -class AlwaysOptimizedPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True): - implementation: Literal["always-optimized"] - +AlwaysOptimizedPayloadSchema = Schema.from_dict( + { + "implementation": Literal["always-optimized"], + }, + forbid_unknown_fields=False, +) -class SucceedPayloadSchema(Schema): - implementation: Literal["succeed"] +SucceedPayloadSchema = Schema.from_dict( + { + "implementation": Literal["succeed"], + }, +) @payload_builder("always-optimized", schema=AlwaysOptimizedPayloadSchema)