From ad1fecedb672271652bb5db8777ec34d88c25f7d Mon Sep 17 00:00:00 2001 From: abhishekmadan30 Date: Mon, 9 Feb 2026 23:40:35 -0500 Subject: [PATCH 1/2] feat: convert all independent schemas from voluptuous --- src/taskgraph/decision.py | 14 +-- src/taskgraph/transforms/base.py | 4 +- src/taskgraph/transforms/chunking.py | 57 ++++------ src/taskgraph/transforms/docker_image.py | 109 +++++------------- src/taskgraph/transforms/matrix.py | 79 ++++++------- src/taskgraph/transforms/run/index_search.py | 22 ++-- src/taskgraph/transforms/run/toolchain.py | 102 +++++------------ src/taskgraph/transforms/task_context.py | 111 +++++++------------ src/taskgraph/util/schema.py | 14 +++ 9 files changed, 177 insertions(+), 335 deletions(-) diff --git a/src/taskgraph/decision.py b/src/taskgraph/decision.py index 888ad12fe..35193f06e 100644 --- a/src/taskgraph/decision.py +++ b/src/taskgraph/decision.py @@ -9,9 +9,9 @@ import shutil import time from pathlib import Path +from typing import Optional import yaml -from voluptuous import Optional from taskgraph.actions import render_actions_json from taskgraph.create import create_tasks @@ -20,7 +20,7 @@ from taskgraph.taskgraph import TaskGraph from taskgraph.util import json from taskgraph.util.python_path import find_object -from taskgraph.util.schema import LegacySchema, validate_schema +from taskgraph.util.schema import Schema, validate_schema from taskgraph.util.vcs import get_repository from taskgraph.util.yaml import load_yaml @@ -40,11 +40,11 @@ #: Schema for try_task_config.json version 2 -try_task_config_schema_v2 = LegacySchema( - { - Optional("parameters"): {str: object}, - } -) +class TryTaskConfigSchemaV2(Schema, forbid_unknown_fields=True): + parameters: Optional[dict[str, object]] = None + + +try_task_config_schema_v2 = TryTaskConfigSchemaV2 def full_task_graph_to_runnable_tasks(full_task_json): diff --git a/src/taskgraph/transforms/base.py b/src/taskgraph/transforms/base.py index a1d50ea4f..42705c3c5 100644 --- 
a/src/taskgraph/transforms/base.py +++ b/src/taskgraph/transforms/base.py @@ -12,7 +12,7 @@ from ..config import GraphConfig from ..parameters import Parameters -from ..util.schema import LegacySchema, validate_schema +from ..util.schema import Schema, validate_schema @dataclass(frozen=True) @@ -138,7 +138,7 @@ def add_validate(self, schema): @dataclass class ValidateSchema: - schema: LegacySchema + schema: Schema def __call__(self, config, tasks): for task in tasks: diff --git a/src/taskgraph/transforms/chunking.py b/src/taskgraph/transforms/chunking.py index 59818337b..f781b0c22 100644 --- a/src/taskgraph/transforms/chunking.py +++ b/src/taskgraph/transforms/chunking.py @@ -2,49 +2,30 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import copy -from textwrap import dedent - -from voluptuous import ALLOW_EXTRA, Optional, Required +from typing import Optional from taskgraph.transforms.base import TransformSequence -from taskgraph.util.schema import LegacySchema +from taskgraph.util.schema import Schema from taskgraph.util.templates import substitute + +class ChunkConfig(Schema): + # The total number of chunks to split the task into. + total_chunks: int + # A list of fields that need to have `{this_chunk}` and/or + # `{total_chunks}` replaced in them. + substitution_fields: list[str] = [] + + #: Schema for chunking transforms -CHUNK_SCHEMA = LegacySchema( - { - # Optional, so it can be used for a subset of tasks in a kind - Optional( - "chunk", - description=dedent( - """ - `chunk` can be used to split one task into `total-chunks` - tasks, substituting `this_chunk` and `total_chunks` into any - fields in `substitution-fields`. - """.lstrip() - ), - ): { - Required( - "total-chunks", - description=dedent( - """ - The total number of chunks to split the task into. 
- """.lstrip() - ), - ): int, - Optional( - "substitution-fields", - description=dedent( - """ - A list of fields that need to have `{this_chunk}` and/or - `{total_chunks}` replaced in them. - """.lstrip() - ), - ): [str], - } - }, - extra=ALLOW_EXTRA, -) +class ChunkSchema(Schema, forbid_unknown_fields=False): + # `chunk` can be used to split one task into `total-chunks` + # tasks, substituting `this_chunk` and `total_chunks` into any + # fields in `substitution-fields`. + chunk: Optional[ChunkConfig] = None + + +CHUNK_SCHEMA = ChunkSchema transforms = TransformSequence() transforms.add_validate(CHUNK_SCHEMA) diff --git a/src/taskgraph/transforms/docker_image.py b/src/taskgraph/transforms/docker_image.py index a9e76abfc..643be20e5 100644 --- a/src/taskgraph/transforms/docker_image.py +++ b/src/taskgraph/transforms/docker_image.py @@ -5,17 +5,13 @@ import logging import os import re -from textwrap import dedent - -from voluptuous import Optional, Required +from typing import Optional import taskgraph from taskgraph.transforms.base import TransformSequence from taskgraph.util import json from taskgraph.util.docker import create_context_tar, generate_context_hash -from taskgraph.util.schema import LegacySchema - -from .task import task_description_schema +from taskgraph.util.schema import IndexSchema, Schema logger = logging.getLogger(__name__) @@ -31,84 +27,31 @@ transforms = TransformSequence() + #: Schema for docker_image transforms -docker_image_schema = LegacySchema( - { - Required( - "name", - description=dedent( - """ - Name of the docker image. - """ - ).lstrip(), - ): str, - Optional( - "parent", - description=dedent( - """ - Name of the parent docker image. - """ - ).lstrip(), - ): str, - Optional( - "symbol", - description=dedent( - """ - Treeherder symbol. - """ - ).lstrip(), - ): str, - Optional( - "task-from", - description=dedent( - """ - Relative path (from config.path) to the file the docker image was defined in. 
- """ - ).lstrip(), - ): str, - Optional( - "args", - description=dedent( - """ - Arguments to use for the Dockerfile. - """ - ).lstrip(), - ): {str: str}, - Optional( - "definition", - description=dedent( - """ - Name of the docker image definition under taskcluster/docker, when - different from the docker image name. - """ - ).lstrip(), - ): str, - Optional( - "packages", - description=dedent( - """ - List of package tasks this docker image depends on. - """ - ).lstrip(), - ): [str], - Optional( - "index", - description=dedent( - """ - Information for indexing this build so its artifacts can be discovered. - """ - ).lstrip(), - ): task_description_schema["index"], - Optional( - "cache", - description=dedent( - """ - Whether this image should be cached based on inputs. - """ - ).lstrip(), - ): bool, - } -) +class DockerImageSchema(Schema): + # Name of the docker image. + name: str + # Name of the parent docker image. + parent: Optional[str] = None + # Treeherder symbol. + symbol: Optional[str] = None + # Relative path (from config.path) to the file the docker image was defined in. + task_from: Optional[str] = None + # Arguments to use for the Dockerfile. + args: Optional[dict[str, str]] = None + # Name of the docker image definition under taskcluster/docker, when + # different from the docker image name. + definition: Optional[str] = None + # List of package tasks this docker image depends on. + packages: Optional[list[str]] = None + # Information for indexing this build so its artifacts can be discovered. + index: Optional[IndexSchema] = None + # Whether this image should be cached based on inputs. 
+ cache: Optional[bool] = None + + +docker_image_schema = DockerImageSchema transforms.add_validate(docker_image_schema) diff --git a/src/taskgraph/transforms/matrix.py b/src/taskgraph/transforms/matrix.py index 855bffa41..659335515 100644 --- a/src/taskgraph/transforms/matrix.py +++ b/src/taskgraph/transforms/matrix.py @@ -8,59 +8,42 @@ """ from copy import deepcopy -from textwrap import dedent - -from voluptuous import ALLOW_EXTRA, Extra, Optional, Required +from typing import Optional from taskgraph.transforms.base import TransformSequence -from taskgraph.util.schema import LegacySchema +from taskgraph.util.schema import Schema from taskgraph.util.templates import substitute_task_fields + +class MatrixConfig(Schema, forbid_unknown_fields=False): + # Exclude the specified combination(s) of matrix values from the + # final list of tasks. + # + # If only a subset of the possible rows are present in the + # exclusion rule, then *all* combinations including that subset + # subset will be excluded. + exclude: Optional[list[dict[str, str]]] = None + # Sets the task name to the specified format string. + # + # Useful for cases where the default of joining matrix values by + # a dash is not desired. + set_name: Optional[str] = None + # List of fields in the task definition to substitute matrix values into. + # + # If not specified, all fields in the task definition will be + # substituted. + substitution_fields: Optional[list[str]] = None + # Extra dimension keys (e.g. "platform": ["linux", "win"]) allowed + # via forbid_unknown_fields=False + + #: Schema for matrix transforms -MATRIX_SCHEMA = LegacySchema( - { - Required("name"): str, - Optional("matrix"): { - Optional( - "exclude", - description=dedent( - """ - Exclude the specified combination(s) of matrix values from the - final list of tasks. - - If only a subset of the possible rows are present in the - exclusion rule, then *all* combinations including that subset - subset will be excluded. 
- """.lstrip() - ), - ): [{str: str}], - Optional( - "set-name", - description=dedent( - """ - Sets the task name to the specified format string. - - Useful for cases where the default of joining matrix values by - a dash is not desired. - """.lstrip() - ), - ): str, - Optional( - "substitution-fields", - description=dedent( - """ - List of fields in the task definition to substitute matrix values into. - - If not specified, all fields in the task definition will be - substituted. - """ - ), - ): [str], - Extra: [str], - }, - }, - extra=ALLOW_EXTRA, -) +class MatrixSchema(Schema, forbid_unknown_fields=False): + name: str + matrix: Optional[MatrixConfig] = None + + +MATRIX_SCHEMA = MatrixSchema transforms = TransformSequence() transforms.add_validate(MATRIX_SCHEMA) diff --git a/src/taskgraph/transforms/run/index_search.py b/src/taskgraph/transforms/run/index_search.py index d5c0c6109..fd3ada672 100644 --- a/src/taskgraph/transforms/run/index_search.py +++ b/src/taskgraph/transforms/run/index_search.py @@ -8,26 +8,24 @@ phase will replace the task with the task from the other graph. """ -from voluptuous import Required +from typing import Literal from taskgraph.transforms.base import TransformSequence from taskgraph.transforms.run import run_task_using -from taskgraph.util.schema import LegacySchema +from taskgraph.util.schema import Schema transforms = TransformSequence() #: Schema for run.using index-search -run_task_schema = LegacySchema( - { - Required("using"): "index-search", - Required( - "index-search", - "A list of indexes in decreasing order of priority at which to lookup for this " - "task. This is interpolated with the graph parameters.", - ): [str], - } -) +class IndexSearchRunSchema(Schema): + using: Literal["index-search"] + # A list of indexes in decreasing order of priority at which to lookup for this + # task. This is interpolated with the graph parameters. 
+ index_search: list[str] + + +run_task_schema = IndexSearchRunSchema @run_task_using("always-optimized", "index-search", schema=run_task_schema) diff --git a/src/taskgraph/transforms/run/toolchain.py b/src/taskgraph/transforms/run/toolchain.py index 77406ad61..aeba0e82d 100644 --- a/src/taskgraph/transforms/run/toolchain.py +++ b/src/taskgraph/transforms/run/toolchain.py @@ -5,9 +5,7 @@ Support for running toolchain-building tasks via dedicated scripts """ -from textwrap import dedent - -from voluptuous import ALLOW_EXTRA, Any, Optional, Required +from typing import Literal, Optional, Union import taskgraph from taskgraph.transforms.run import configure_taskdesc_for_run, run_task_using @@ -18,84 +16,36 @@ ) from taskgraph.util import path as mozpath from taskgraph.util.hash import hash_paths -from taskgraph.util.schema import LegacySchema +from taskgraph.util.schema import Schema from taskgraph.util.shell import quote as shell_quote CACHE_TYPE = "toolchains.v3" + #: Schema for run.using toolchain -toolchain_run_schema = LegacySchema( - { - Required( - "using", - description=dedent( - """ - Specifies the run type. Must be "toolchain-script". - """ - ), - ): "toolchain-script", - Required( - "script", - description=dedent( - """ - The script (in taskcluster/scripts/misc) to run. - """ - ), - ): str, - Optional( - "arguments", - description=dedent( - """ - Arguments to pass to the script. - """ - ), - ): [str], - Optional( - "resources", - description=dedent( - """ - Paths/patterns pointing to files that influence the outcome of - a toolchain build. - """ - ), - ): [str], - Required( - "toolchain-artifact", - description=dedent( - """ - Path to the artifact produced by the toolchain task. - """ - ), - ): str, - Optional( - "toolchain-alias", - description=dedent( - """ - An alias that can be used instead of the real toolchain task name in - fetch stanzas for tasks. 
- """ - ), - ): Any(str, [str]), - Optional( - "toolchain-env", - description=dedent( - """ - Additional env variables to add to the worker when using this - toolchain. - """ - ), - ): {str: object}, - Required( - "workdir", - description=dedent( - """ - Base work directory used to set up the task. - """ - ), - ): str, - }, - extra=ALLOW_EXTRA, -) +class ToolchainRunSchema(Schema, forbid_unknown_fields=False): + # Specifies the run type. Must be "toolchain-script". + using: Literal["toolchain-script"] + # The script (in taskcluster/scripts/misc) to run. + script: str + # Path to the artifact produced by the toolchain task. + toolchain_artifact: str + # Base work directory used to set up the task. + workdir: str + # Arguments to pass to the script. + arguments: Optional[list[str]] = None + # Paths/patterns pointing to files that influence the outcome of + # a toolchain build. + resources: Optional[list[str]] = None + # An alias that can be used instead of the real toolchain task name in + # fetch stanzas for tasks. + toolchain_alias: Optional[Union[str, list[str]]] = None + # Additional env variables to add to the worker when using this + # toolchain. 
+ toolchain_env: Optional[dict[str, object]] = None + + +toolchain_run_schema = ToolchainRunSchema def get_digest_data(config, run, taskdesc): diff --git a/src/taskgraph/transforms/task_context.py b/src/taskgraph/transforms/task_context.py index e38648cd3..26df43b4a 100644 --- a/src/taskgraph/transforms/task_context.py +++ b/src/taskgraph/transforms/task_context.py @@ -1,82 +1,55 @@ -from textwrap import dedent - -from voluptuous import ALLOW_EXTRA, Any, Optional, Required +from typing import Optional, Union from taskgraph.transforms.base import TransformSequence -from taskgraph.util.schema import LegacySchema +from taskgraph.util.schema import Schema from taskgraph.util.templates import deep_get, substitute_task_fields from taskgraph.util.yaml import load_yaml -#: Schema for the task_context transforms -SCHEMA = LegacySchema( - { - Optional("name"): str, - Optional( - "task-context", - description=dedent( - """ - `task-context` can be used to substitute values into any field in a - task with data that is not known until `taskgraph` runs. - This data can be provided via `from-parameters` or `from-file`, - which can pull in values from parameters and a defined yml file - respectively. +class TaskContextConfig(Schema): + # A list of fields in the task to substitute the provided values + # into. + substitution_fields: list[str] + # Retrieve task context values from parameters. A single + # parameter may be provided or a list of parameters in + # priority order. The latter can be useful in implementing a + # "default" value if some other parameter is not provided. + from_parameters: Optional[dict[str, Union[list[str], str]]] = None + # Retrieve task context values from a yaml file. The provided + # file should usually only contain top level keys and values + # (eg: nested objects will not be interpolated - they will be + # substituted as text representations of the object). 
+ from_file: Optional[str] = None + # Key/value pairs to be used as task context + from_object: Optional[object] = None - Data may also be provided directly in the `from-object` section of - `task-context`. This can be useful in `kinds` that define most of - their contents in `task-defaults`, but have some values that may - differ for various concrete `tasks` in the `kind`. - If the same key is found in multiple places the order of precedence - is as follows: - - Parameters - - `from-object` keys - - File +#: Schema for the task_context transforms +class TaskContextSchema(Schema, forbid_unknown_fields=False): + name: Optional[str] = None + # `task-context` can be used to substitute values into any field in a + # task with data that is not known until `taskgraph` runs. + # + # This data can be provided via `from-parameters` or `from-file`, + # which can pull in values from parameters and a defined yml file + # respectively. + # + # Data may also be provided directly in the `from-object` section of + # `task-context`. This can be useful in `kinds` that define most of + # their contents in `task-defaults`, but have some values that may + # differ for various concrete `tasks` in the `kind`. + # + # If the same key is found in multiple places the order of precedence + # is as follows: + # - Parameters + # - `from-object` keys + # - File + # + # That is to say: parameters will always override anything else. + task_context: Optional[TaskContextConfig] = None - That is to say: parameters will always override anything else. - """.lstrip(), - ), - ): { - Optional( - "from-parameters", - description=dedent( - """ - Retrieve task context values from parameters. A single - parameter may be provided or a list of parameters in - priority order. The latter can be useful in implementing a - "default" value if some other parameter is not provided. 
- """.lstrip() - ), - ): {str: Any([str], str)}, - Optional( - "from-file", - description=dedent( - """ - Retrieve task context values from a yaml file. The provided - file should usually only contain top level keys and values - (eg: nested objects will not be interpolated - they will be - substituted as text representations of the object). - """.lstrip() - ), - ): str, - Optional( - "from-object", - description="Key/value pairs to be used as task context", - ): object, - Required( - "substitution-fields", - description=dedent( - """ - A list of fields in the task to substitute the provided values - into. - """.lstrip() - ), - ): [str], - }, - }, - extra=ALLOW_EXTRA, -) +SCHEMA = TaskContextSchema transforms = TransformSequence() transforms.add_validate(SCHEMA) diff --git a/src/taskgraph/util/schema.py b/src/taskgraph/util/schema.py index 9f256f69d..71c9011a0 100644 --- a/src/taskgraph/util/schema.py +++ b/src/taskgraph/util/schema.py @@ -311,6 +311,20 @@ def validate(cls, data): raise msgspec.ValidationError(str(e)) +class IndexSchema(Schema): + # the name of the product this build produces + product: str + # the names to use for this task in the TaskCluster index + job_name: str + # Type of gecko v2 index to use + type: str + # The rank that the task will receive in the TaskCluster + # index. A newly completed task supersedes the currently + # indexed task iff it has a higher rank. If unspecified, + # 'by-tier' behavior will be used. 
+ rank: Union[Literal["by-tier", "build_date"], int] = "by-tier" + + class IndexSearchOptimizationSchema(Schema): """Search the index for the given index namespaces.""" From 1991917e60a6aa1a8700bba9aa92a8fb420b18ba Mon Sep 17 00:00:00 2001 From: abhishekmadan30 Date: Tue, 10 Feb 2026 00:08:52 -0500 Subject: [PATCH 2/2] feat: convert all interconnected/dependent schemas to msgspec --- docs/conf.py | 5 + src/taskgraph/config.py | 159 ++--- src/taskgraph/transforms/fetch.py | 205 +++--- src/taskgraph/transforms/from_deps.py | 119 +--- src/taskgraph/transforms/notify.py | 145 ++-- src/taskgraph/transforms/run/__init__.py | 244 +++---- src/taskgraph/transforms/run/run_task.py | 125 +--- src/taskgraph/transforms/task.py | 821 ++++++++--------------- src/taskgraph/util/dependencies.py | 3 +- src/taskgraph/util/schema.py | 22 +- test/test_transforms_run_run_task.py | 9 +- test/test_util_schema.py | 101 +-- 12 files changed, 753 insertions(+), 1205 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 73edaeb3d..ee00ecf30 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -43,6 +43,11 @@ # This pattern also affects html_static_path and html_extra_path. exclude_patterns = ["docs/_build", "Thumbs.db", ".DS_Store"] +# Suppress ambiguous cross-reference warnings from autodoc. +# Multiple Schema classes define fields with the same name (e.g. "path"), +# which Sphinx can't disambiguate. 
+suppress_warnings = ["ref.python"] + # -- Options for HTML output ------------------------------------------------- diff --git a/src/taskgraph/config.py b/src/taskgraph/config.py index b9a2b57d2..688a5dca3 100644 --- a/src/taskgraph/config.py +++ b/src/taskgraph/config.py @@ -8,106 +8,83 @@ import sys from dataclasses import dataclass from pathlib import Path +from typing import Literal, Optional, Union -from voluptuous import ALLOW_EXTRA, All, Any, Extra, Length, Optional, Required - -from .util.caches import CACHES from .util.python_path import find_object -from .util.schema import LegacySchema, optionally_keyed_by, validate_schema +from .util.schema import Schema, TaskPriority, optionally_keyed_by, validate_schema from .util.vcs import get_repository from .util.yaml import load_yaml logger = logging.getLogger(__name__) +CacheType = Literal["cargo", "checkout", "npm", "pip", "uv"] + + +class WorkerAlias(Schema): + provisioner: optionally_keyed_by("level", str, use_msgspec=True) # type: ignore + implementation: str + os: str + worker_type: optionally_keyed_by("level", str, use_msgspec=True) # type: ignore + + +class WorkersConfig(Schema): + aliases: dict[str, WorkerAlias] + + +class RunConfig(Schema): + # List of caches to enable, or a boolean to enable/disable all of them. + use_caches: Optional[Union[bool, list[CacheType]]] = None + + +class RepositoryConfig(Schema, forbid_unknown_fields=False): + name: str + project_regex: Optional[str] = None + ssh_secret_name: Optional[str] = None + # FIXME: Extra keys allowed via forbid_unknown_fields=False + + +class TaskgraphConfig(Schema): + repositories: dict[str, RepositoryConfig] + # Python function to call to register extensions. + register: Optional[str] = None + decision_parameters: Optional[str] = None + # The taskcluster index prefix to use for caching tasks. + # Defaults to `trust-domain`. 
+ cached_task_prefix: Optional[str] = None + # Should tasks from pull requests populate the cache + cache_pull_requests: Optional[bool] = None + # Regular expressions matching index paths to be summarized. + index_path_regexes: Optional[list[str]] = None + # Configuration related to the 'run' transforms. + run: Optional[RunConfig] = None + + def __post_init__(self): + # Validate repositories has at least 1 entry (was All(..., Length(min=1))) + if not self.repositories: + raise ValueError("'repositories' must have at least one entry") + #: Schema for the graph config -graph_config_schema = LegacySchema( - { - # The trust-domain for this graph. - # (See https://firefox-source-docs.mozilla.org/taskcluster/taskcluster/taskgraph.html#taskgraph-trust-domain) # noqa - Required("trust-domain"): str, - Optional( - "docker-image-kind", - description="Name of the docker image kind (default: docker-image)", - ): str, - Required("task-priority"): optionally_keyed_by( - "project", - "level", - Any( - "highest", - "very-high", - "high", - "medium", - "low", - "very-low", - "lowest", - ), - ), - Optional( - "task-deadline-after", - description="Default 'deadline' for tasks, in relative date format. " - "Eg: '1 week'", - ): optionally_keyed_by("project", str), - Optional( - "task-expires-after", - description="Default 'expires-after' for level 1 tasks, in relative date format. " - "Eg: '90 days'", - ): str, - Required("workers"): { - Required("aliases"): { - str: { - Required("provisioner"): optionally_keyed_by("level", str), - Required("implementation"): str, - Required("os"): str, - Required("worker-type"): optionally_keyed_by("level", str), - } - }, - }, - Required("taskgraph"): { - Optional( - "register", - description="Python function to call to register extensions.", - ): str, - Optional("decision-parameters"): str, - Optional( - "cached-task-prefix", - description="The taskcluster index prefix to use for caching tasks. 
" - "Defaults to `trust-domain`.", - ): str, - Optional( - "cache-pull-requests", - description="Should tasks from pull requests populate the cache", - ): bool, - Optional( - "index-path-regexes", - description="Regular expressions matching index paths to be summarized.", - ): [str], - Optional( - "run", - description="Configuration related to the 'run' transforms.", - ): { - Optional( - "use-caches", - description="List of caches to enable, or a boolean to " - "enable/disable all of them.", - ): Any(bool, list(CACHES.keys())), - }, - Required("repositories"): All( - { - str: { - Required("name"): str, - Optional("project-regex"): str, - Optional("ssh-secret-name"): str, - # FIXME - Extra: str, - } - }, - Length(min=1), - ), - }, - }, - extra=ALLOW_EXTRA, -) +class GraphConfigSchema(Schema, forbid_unknown_fields=False): + # The trust-domain for this graph. + trust_domain: str + task_priority: optionally_keyed_by( # type: ignore + "project", "level", TaskPriority, use_msgspec=True + ) + workers: WorkersConfig + taskgraph: TaskgraphConfig + # Name of the docker image kind (default: docker-image) + docker_image_kind: Optional[str] = None + # Default 'deadline' for tasks, in relative date format. Eg: '1 week' + task_deadline_after: Optional[ + optionally_keyed_by("project", str, use_msgspec=True) # type: ignore + ] = None + # Default 'expires-after' for level 1 tasks, in relative date format. 
+ # Eg: '90 days' + task_expires_after: Optional[str] = None + + +graph_config_schema = GraphConfigSchema @dataclass(frozen=True, eq=False) diff --git a/src/taskgraph/transforms/fetch.py b/src/taskgraph/transforms/fetch.py index e165eec31..b8e65d61d 100644 --- a/src/taskgraph/transforms/fetch.py +++ b/src/taskgraph/transforms/fetch.py @@ -9,77 +9,48 @@ import os import re from dataclasses import dataclass -from textwrap import dedent -from typing import Callable - -from voluptuous import Extra, Optional, Required +from typing import Callable, Literal, Optional import taskgraph from ..util import path from ..util.cached_tasks import add_optimization -from ..util.schema import LegacySchema, validate_schema +from ..util.schema import Schema, validate_schema from ..util.treeherder import join_symbol from .base import TransformSequence CACHE_TYPE = "content.v1" + +class FetchSubSchema(Schema, forbid_unknown_fields=False): + # The fetch type + type: str + + #: Schema for fetch transforms -FETCH_SCHEMA = LegacySchema( - { - Required( - "name", - description=dedent( - """ - Name of the task. - """.lstrip() - ), - ): str, - Optional( - "task-from", - description=dedent( - """ - Relative path (from config.path) to the file the task was defined - in. - """.lstrip() - ), - ): str, - Required( - "description", - description=dedent( - """ - Description of the task. - """.lstrip() - ), - ): str, - Optional("expires-after"): str, - Optional("docker-image"): object, - Optional( - "fetch-alias", - description=dedent( - """ - An alias that can be used instead of the real fetch task name in - fetch stanzas for tasks. - """.lstrip() - ), - ): str, - Optional( - "artifact-prefix", - description=dedent( - """ - The prefix of the taskcluster artifact being uploaded. - Defaults to `public/`; if it starts with something other than - `public/` the artifact will require scopes to access. 
- """.lstrip() - ), - ): str, - Optional("attributes"): {str: object}, - Required("fetch"): { - Required("type"): str, - Extra: object, - }, - } -) +class FetchSchema(Schema): + # Name of the task. + name: str + # Description of the task. + description: str + # The fetch configuration + fetch: FetchSubSchema + # Relative path (from config.path) to the file the task was defined + # in. + task_from: Optional[str] = None + expires_after: Optional[str] = None + docker_image: Optional[object] = None + # An alias that can be used instead of the real fetch task name in + # fetch stanzas for tasks. + fetch_alias: Optional[str] = None + # The prefix of the taskcluster artifact being uploaded. + # Defaults to `public/`; if it starts with something other than + # `public/` the artifact will require scopes to access. + artifact_prefix: Optional[str] = None + attributes: Optional[dict[str, object]] = None + + +FETCH_SCHEMA = FetchSchema # define a collection of payload builders, depending on the worker implementation fetch_builders = {} @@ -87,15 +58,13 @@ @dataclass(frozen=True) class FetchBuilder: - schema: LegacySchema + schema: Schema builder: Callable def fetch_builder(name, schema): - schema = LegacySchema({Required("type"): name}).extend(schema) - def wrap(func): - fetch_builders[name] = FetchBuilder(schema, func) # type: ignore + fetch_builders[name] = FetchBuilder(schema, func) return func return wrap @@ -204,45 +173,45 @@ def make_task(config, tasks): yield task_desc -@fetch_builder( - "static-url", - schema={ - # The URL to download. - Required("url"): str, - # The SHA-256 of the downloaded content. - Required("sha256"): str, - # Size of the downloaded entity, in bytes. - Required("size"): int, - # GPG signature verification. - Optional("gpg-signature"): { - # URL where GPG signature document can be obtained. Can contain the - # value ``{url}``, which will be substituted with the value from - # ``url``. 
- Required("sig-url"): str, - # Path to file containing GPG public key(s) used to validate - # download. - Required("key-path"): str, - }, - # The name to give to the generated artifact. Defaults to the file - # portion of the URL. Using a different extension converts the - # archive to the given type. Only conversion to .tar.zst is - # supported. - Optional("artifact-name"): str, - # Strip the given number of path components at the beginning of - # each file entry in the archive. - # Requires an artifact-name ending with .tar.zst. - Optional("strip-components"): int, - # Add the given prefix to each file entry in the archive. - # Requires an artifact-name ending with .tar.zst. - Optional("add-prefix"): str, - # Headers to pass alongside the request. - Optional("headers"): { - str: str, - }, - # IMPORTANT: when adding anything that changes the behavior of the task, - # it is important to update the digest data used to compute cache hits. - }, -) +class GpgSignatureConfig(Schema): + # URL where GPG signature document can be obtained. Can contain the + # value ``{url}``, which will be substituted with the value from + # ``url``. + sig_url: str + # Path to file containing GPG public key(s) used to validate + # download. + key_path: str + + +class StaticUrlFetchSchema(Schema, forbid_unknown_fields=False): + type: Literal["static-url"] + # The URL to download. + url: str + # The SHA-256 of the downloaded content. + sha256: str + # Size of the downloaded entity, in bytes. + size: int + # GPG signature verification. + gpg_signature: Optional[GpgSignatureConfig] = None + # The name to give to the generated artifact. Defaults to the file + # portion of the URL. Using a different extension converts the + # archive to the given type. Only conversion to .tar.zst is + # supported. + artifact_name: Optional[str] = None + # Strip the given number of path components at the beginning of + # each file entry in the archive. + # Requires an artifact-name ending with .tar.zst. 
+ strip_components: Optional[int] = None + # Add the given prefix to each file entry in the archive. + # Requires an artifact-name ending with .tar.zst. + add_prefix: Optional[str] = None + # Headers to pass alongside the request. + headers: Optional[dict[str, str]] = None + # IMPORTANT: when adding anything that changes the behavior of the task, + # it is important to update the digest data used to compute cache hits. + + +@fetch_builder("static-url", schema=StaticUrlFetchSchema) def create_fetch_url_task(config, name, fetch): artifact_name = fetch.get("artifact-name") if not artifact_name: @@ -305,21 +274,21 @@ def create_fetch_url_task(config, name, fetch): } -@fetch_builder( - "git", - schema={ - Required("repo"): str, - Required("revision"): str, - Optional("include-dot-git"): bool, - Optional("artifact-name"): str, - Optional("path-prefix"): str, - # ssh-key is a taskcluster secret path (e.g. project/civet/github-deploy-key) - # In the secret dictionary, the key should be specified as - # "ssh_privkey": "-----BEGIN OPENSSH PRIVATE KEY-----\nkfksnb3jc..." - # n.b. The OpenSSH private key file format requires a newline at the end of the file. - Optional("ssh-key"): str, - }, -) +class GitFetchSchema(Schema, forbid_unknown_fields=False): + type: Literal["git"] + repo: str + revision: str + include_dot_git: Optional[bool] = None + artifact_name: Optional[str] = None + path_prefix: Optional[str] = None + # ssh-key is a taskcluster secret path (e.g. project/civet/github-deploy-key) + # In the secret dictionary, the key should be specified as + # "ssh_privkey": "-----BEGIN OPENSSH PRIVATE KEY-----\nkfksnb3jc..." + # n.b. The OpenSSH private key file format requires a newline at the end of the file. 
+ ssh_key: Optional[str] = None + + +@fetch_builder("git", schema=GitFetchSchema) def create_git_fetch_task(config, name, fetch): path_prefix = fetch.get("path-prefix") if not path_prefix: diff --git a/src/taskgraph/transforms/from_deps.py b/src/taskgraph/transforms/from_deps.py index c03148c99..02444630f 100644 --- a/src/taskgraph/transforms/from_deps.py +++ b/src/taskgraph/transforms/from_deps.py @@ -13,105 +13,42 @@ from copy import deepcopy from textwrap import dedent - -from voluptuous import ALLOW_EXTRA, Any, Optional, Required +from typing import Optional, Union from taskgraph.transforms.base import TransformSequence -from taskgraph.transforms.run import fetches_schema +from taskgraph.transforms.run import FetchesEntrySchema from taskgraph.util.attributes import attrmatch from taskgraph.util.dependencies import GROUP_BY_MAP, get_dependencies -from taskgraph.util.schema import LegacySchema, validate_schema +from taskgraph.util.schema import Schema, validate_schema from taskgraph.util.set_name import SET_NAME_MAP + +class FromDepsConfig(Schema): + # Limit dependencies to specified kinds (defaults to all kinds in + # `kind-dependencies`). + kinds: Optional[list[str]] = None + # Set-name function (dynamic: validated at runtime against SET_NAME_MAP). + set_name: Optional[Union[bool, str, dict[str, object]]] = None + # Limit dependencies to tasks whose attributes match. + with_attributes: Optional[dict[str, Union[list, str]]] = None + # Group cross-kind dependencies using the given group-by function. + group_by: Optional[Union[str, dict[str, object]]] = None + # If True, copy attributes from the dependency matching the first kind + # in the `kinds` list. + copy_attributes: Optional[bool] = None + # If true (the default), there must be only a single unique task + # for each kind in a dependency group. + unique_kinds: Optional[bool] = None + # If present, a `fetches` entry will be added for each task dependency. 
+ fetches: Optional[dict[str, list[FetchesEntrySchema]]] = None + + #: Schema for from_deps transforms -FROM_DEPS_SCHEMA = LegacySchema( - { - Required("from-deps"): { - Optional( - "kinds", - description=dedent( - """ - Limit dependencies to specified kinds (defaults to all kinds in - `kind-dependencies`). +class FromDepsSchema(Schema, forbid_unknown_fields=False): + from_deps: FromDepsConfig - The first kind in the list is the "primary" kind. The - dependency of this kind will be used to derive the label - and copy attributes (if `copy-attributes` is True). - """.lstrip() - ), - ): [str], - Optional( - "set-name", - description=dedent( - """ - UPDATE ME AND DOCS - """.lstrip() - ), - ): Any( - None, - False, - *SET_NAME_MAP, - {Any(*SET_NAME_MAP): object}, - ), - Optional( - "with-attributes", - description=dedent( - """ - Limit dependencies to tasks whose attributes match - using :func:`~taskgraph.util.attributes.attrmatch`. - """.lstrip() - ), - ): {str: Any(list, str)}, - Optional( - "group-by", - description=dedent( - """ - Group cross-kind dependencies using the given group-by - function. One task will be created for each group. If not - specified, the 'single' function will be used which creates - a new task for each individual dependency. - """.lstrip() - ), - ): Any( - None, - *GROUP_BY_MAP, - {Any(*GROUP_BY_MAP): object}, - ), - Optional( - "copy-attributes", - description=dedent( - """ - If True, copy attributes from the dependency matching the - first kind in the `kinds` list (whether specified explicitly - or taken from `kind-dependencies`). - """.lstrip() - ), - ): bool, - Optional( - "unique-kinds", - description=dedent( - """ - If true (the default), there must be only a single unique task - for each kind in a dependency group. Setting this to false - disables that requirement. - """.lstrip() - ), - ): bool, - Optional( - "fetches", - description=dedent( - """ - If present, a `fetches` entry will be added for each task - dependency. 
Attributes of the upstream task may be used as - substitution values in the `artifact` or `dest` values of the - `fetches` entry. - """.lstrip() - ), - ): {str: [fetches_schema]}, - }, - }, - extra=ALLOW_EXTRA, -) + +FROM_DEPS_SCHEMA = FromDepsSchema transforms = TransformSequence() transforms.add_validate(FROM_DEPS_SCHEMA) diff --git a/src/taskgraph/transforms/notify.py b/src/taskgraph/transforms/notify.py index a7d118f10..cdd17cfe1 100644 --- a/src/taskgraph/transforms/notify.py +++ b/src/taskgraph/transforms/notify.py @@ -8,12 +8,12 @@ more information. """ -from voluptuous import ALLOW_EXTRA, Any, Exclusive, Optional, Required +from typing import Literal, Optional, Union from taskgraph.transforms.base import TransformSequence -from taskgraph.util.schema import LegacySchema, optionally_keyed_by, resolve_keyed_by +from taskgraph.util.schema import Schema, optionally_keyed_by, resolve_keyed_by -_status_type = Any( +StatusType = Literal[ "on-completed", "on-defined", "on-exception", @@ -21,31 +21,34 @@ "on-pending", "on-resolved", "on-running", -) - -_recipients = [ - { - Required("type"): "email", - Required("address"): optionally_keyed_by("project", "level", str), - Optional("status-type"): _status_type, - }, - { - Required("type"): "matrix-room", - Required("room-id"): str, - Optional("status-type"): _status_type, - }, - { - Required("type"): "pulse", - Required("routing-key"): str, - Optional("status-type"): _status_type, - }, - { - Required("type"): "slack-channel", - Required("channel-id"): str, - Optional("status-type"): _status_type, - }, ] + +class EmailRecipient(Schema, tag_field="type", tag="email"): + address: optionally_keyed_by("project", "level", str, use_msgspec=True) # type: ignore + status_type: Optional[StatusType] = None + + +class MatrixRoomRecipient(Schema, tag_field="type", tag="matrix-room"): + room_id: str + status_type: Optional[StatusType] = None + + +class PulseRecipient(Schema, tag_field="type", tag="pulse"): + routing_key: str + 
status_type: Optional[StatusType] = None + + +class SlackChannelRecipient(Schema, tag_field="type", tag="slack-channel"): + channel_id: str + status_type: Optional[StatusType] = None + + +Recipient = Union[ + EmailRecipient, MatrixRoomRecipient, PulseRecipient, SlackChannelRecipient +] + + _route_keys = { "email": "address", "matrix-room": "room-id", @@ -54,43 +57,61 @@ } """Map each type to its primary key that will be used in the route.""" + +class EmailLinkContent(Schema): + text: str + href: str + + +class EmailContent(Schema): + subject: Optional[str] = None + content: Optional[str] = None + link: Optional[EmailLinkContent] = None + + +class MatrixContent(Schema): + body: Optional[str] = None + formatted_body: Optional[str] = None + format: Optional[str] = None + msg_type: Optional[str] = None + + +class SlackContent(Schema): + text: Optional[str] = None + blocks: Optional[list] = None + attachments: Optional[list] = None + + +class NotifyContentConfig(Schema): + email: Optional[EmailContent] = None + matrix: Optional[MatrixContent] = None + slack: Optional[SlackContent] = None + + +class NotifyConfig(Schema): + recipients: list[Recipient] + content: Optional[NotifyContentConfig] = None + + +class LegacyNotificationsConfig(Schema): + # Continue supporting the legacy schema for backwards compat. 
+ emails: optionally_keyed_by("project", "level", list[str], use_msgspec=True) # type: ignore + subject: str + message: Optional[str] = None + status_types: Optional[list[StatusType]] = None + + #: Schema for notify transforms -NOTIFY_SCHEMA = LegacySchema( - { - Exclusive("notify", "config"): { - Required("recipients"): [Any(*_recipients)], - Optional("content"): { - Optional("email"): { - Optional("subject"): str, - Optional("content"): str, - Optional("link"): { - Required("text"): str, - Required("href"): str, - }, - }, - Optional("matrix"): { - Optional("body"): str, - Optional("formatted-body"): str, - Optional("format"): str, - Optional("msg-type"): str, - }, - Optional("slack"): { - Optional("text"): str, - Optional("blocks"): list, - Optional("attachments"): list, - }, - }, - }, - # Continue supporting the legacy schema for backwards compat. - Exclusive("notifications", "config"): { - Required("emails"): optionally_keyed_by("project", "level", [str]), - Required("subject"): str, - Optional("message"): str, - Optional("status-types"): [_status_type], - }, - }, - extra=ALLOW_EXTRA, -) +class NotifySchema(Schema, forbid_unknown_fields=False): + notify: Optional[NotifyConfig] = None + notifications: Optional[LegacyNotificationsConfig] = None + + def __post_init__(self): + if self.notify is not None and self.notifications is not None: + raise ValueError("'notify' and 'notifications' are mutually exclusive") + + +NOTIFY_SCHEMA = NotifySchema transforms = TransformSequence() transforms.add_validate(NOTIFY_SCHEMA) diff --git a/src/taskgraph/transforms/run/__init__.py b/src/taskgraph/transforms/run/__init__.py index ed3d7bf02..8bb20604f 100644 --- a/src/taskgraph/transforms/run/__init__.py +++ b/src/taskgraph/transforms/run/__init__.py @@ -11,159 +11,121 @@ import copy import logging -from textwrap import dedent - -from voluptuous import Exclusive, Extra, Optional, Required +from typing import Literal, Optional, Union from taskgraph.transforms.base import 
TransformSequence from taskgraph.transforms.cached_tasks import order_tasks -from taskgraph.transforms.task import task_description_schema +from taskgraph.transforms.task import TaskDescriptionSchema from taskgraph.util import json from taskgraph.util import path as mozpath from taskgraph.util.python_path import import_sibling_modules -from taskgraph.util.schema import LegacySchema, validate_schema +from taskgraph.util.schema import ( + OptimizationType, + Schema, + validate_schema, +) from taskgraph.util.taskcluster import get_artifact_prefix from taskgraph.util.workertypes import worker_type_implementation logger = logging.getLogger(__name__) + # Fetches may be accepted in other transforms and eventually passed along # to a `task` (eg: from_deps). Defining this here allows them to reuse # the schema and avoid duplication. -fetches_schema = { - Required("artifact"): str, - Optional("dest"): str, - Optional("extract"): bool, - Optional("verify-hash"): bool, -} +class FetchesEntrySchema(Schema): + artifact: str + dest: Optional[str] = None + extract: Optional[bool] = None + verify_hash: Optional[bool] = None + + +class WhenConfig(Schema): + # This task only needs to be run if a file matching one of the given + # patterns has changed in the push. + files_changed: Optional[list[str]] = None + + +class RunConfig(Schema, forbid_unknown_fields=False): + # The key to a run implementation in a peer module to this one. + using: str + # Base work directory used to set up the task. + workdir: Optional[str] = None + #: Schema for a run transforms -run_description_schema = LegacySchema( - { - Optional( - "name", - description=dedent( - """ - The name of the task. At least one of 'name' or 'label' must be - specified. If 'label' is not provided, it will be generated from - the 'name' by prepending the kind. - """ - ), - ): str, - Optional( - "label", - description=dedent( - """ - The label of the task. At least one of 'name' or 'label' must be - specified. 
If 'label' is not provided, it will be generated from - the 'name' by prepending the kind. - """ - ), - ): str, - # the following fields are passed directly through to the task description, - # possibly modified by the run implementation. See - # taskcluster/taskgraph/transforms/task.py for the schema details. - Required("description"): task_description_schema["description"], - Optional("priority"): task_description_schema["priority"], - Optional("attributes"): task_description_schema["attributes"], - Optional("task-from"): task_description_schema["task-from"], - Optional("dependencies"): task_description_schema["dependencies"], - Optional("soft-dependencies"): task_description_schema["soft-dependencies"], - Optional("if-dependencies"): task_description_schema["if-dependencies"], - Optional("requires"): task_description_schema["requires"], - Optional("deadline-after"): task_description_schema["deadline-after"], - Optional("expires-after"): task_description_schema["expires-after"], - Optional("routes"): task_description_schema["routes"], - Optional("scopes"): task_description_schema["scopes"], - Optional("tags"): task_description_schema["tags"], - Optional("extra"): task_description_schema["extra"], - Optional("treeherder"): task_description_schema["treeherder"], - Optional("index"): task_description_schema["index"], - Optional("run-on-projects"): task_description_schema["run-on-projects"], - Optional("run-on-tasks-for"): task_description_schema["run-on-tasks-for"], - Optional("run-on-git-branches"): task_description_schema["run-on-git-branches"], - Optional("shipping-phase"): task_description_schema["shipping-phase"], - Optional("always-target"): task_description_schema["always-target"], - Exclusive("optimization", "optimization"): task_description_schema[ - "optimization" - ], - Optional("needs-sccache"): task_description_schema["needs-sccache"], - Exclusive( - "when", - "optimization", - description=dedent( - """ - The "when" section contains descriptions of the 
circumstances under - which this task should be included in the task graph. This will be - converted into an optimization, so it cannot be specified in a run - description that also gives 'optimization'. - """ - ), - ): { - Optional( - "files-changed", - description=dedent( - """ - This task only needs to be run if a file matching one of the given - patterns has changed in the push. The patterns use the mozpack - match function (python/mozbuild/mozpack/path.py). - """ - ), - ): [str], - }, - Optional( - "fetches", - description=dedent( - """ - A list of artifacts to install from 'fetch' tasks. - """ - ), - ): { - str: [ - str, - fetches_schema, - ], - }, - Required( - "run", - description=dedent( - """ - A description of how to run this task. - """ - ), - ): { - Required( - "using", - description=dedent( - """ - The key to a run implementation in a peer module to this one. - """ - ), - ): str, - Optional( - "workdir", - description=dedent( - """ - Base work directory used to set up the task. - """ - ), - ): str, - # Any remaining content is verified against that run implementation's - # own schema. - Extra: object, - }, - Required("worker-type"): task_description_schema["worker-type"], - Optional( - "worker", - description=dedent( - """ - This object will be passed through to the task description, with additions - provided by the task's run-using function. - """ - ), - ): dict, - } -) +class RunDescriptionSchema(Schema, forbid_unknown_fields=False): + # A description of how to run this task. + run: RunConfig + worker_type: TaskDescriptionSchema.__annotations__["worker_type"] # type: ignore # noqa: F821 + # Description of the task (for metadata). + description: TaskDescriptionSchema.__annotations__["description"] # type: ignore # noqa: F821 + # The name of the task. + name: Optional[str] = None + # The label of the task. + label: Optional[str] = None + # Priority of the task. 
+ priority: TaskDescriptionSchema.__annotations__["priority"] = None # type: ignore + # Attributes for this task. + attributes: TaskDescriptionSchema.__annotations__["attributes"] = None # type: ignore + # Relative path (from config.path) to the file task was defined in. + task_from: TaskDescriptionSchema.__annotations__["task_from"] = None # type: ignore + # Dependencies of this task. + dependencies: TaskDescriptionSchema.__annotations__["dependencies"] = None # type: ignore + # Soft dependencies of this task, as a list of task labels. + soft_dependencies: TaskDescriptionSchema.__annotations__["soft_dependencies"] = None # type: ignore + # Dependencies that must be scheduled in order for this task to run. + if_dependencies: TaskDescriptionSchema.__annotations__["if_dependencies"] = None # type: ignore + # Specifies the condition for task execution. + requires: TaskDescriptionSchema.__annotations__["requires"] = None # type: ignore + # Deadline time relative to task creation, with units. + deadline_after: TaskDescriptionSchema.__annotations__["deadline_after"] = None # type: ignore + # Expiration time relative to task creation, with units. + expires_after: TaskDescriptionSchema.__annotations__["expires_after"] = None # type: ignore + # Custom routes for this task. + routes: TaskDescriptionSchema.__annotations__["routes"] = None # type: ignore + # Custom scopes for this task. + scopes: TaskDescriptionSchema.__annotations__["scopes"] = None # type: ignore + # Tags for this task. + tags: TaskDescriptionSchema.__annotations__["tags"] = None # type: ignore + # Custom 'task.extra' content. + extra: TaskDescriptionSchema.__annotations__["extra"] = None # type: ignore + # Treeherder-related information. + treeherder: TaskDescriptionSchema.__annotations__["treeherder"] = None # type: ignore + # Information for indexing this build. + index: TaskDescriptionSchema.__annotations__["index"] = None # type: ignore + # The `run_on_projects` attribute, defaulting to 'all'. 
+ run_on_projects: TaskDescriptionSchema.__annotations__["run_on_projects"] = None # type: ignore + # Specifies tasks for which this task should run. + run_on_tasks_for: TaskDescriptionSchema.__annotations__["run_on_tasks_for"] = None # type: ignore + # Specifies git branches for which this task should run. + run_on_git_branches: TaskDescriptionSchema.__annotations__[ # type: ignore + "run_on_git_branches" # type: ignore + ] = None + # The `shipping_phase` attribute, defaulting to None. + shipping_phase: TaskDescriptionSchema.__annotations__["shipping_phase"] = None # type: ignore + # The `always-target` attribute. + always_target: Optional[bool] = None + # Optimization to perform on this task during the optimization phase. + optimization: Optional[OptimizationType] = None + # Whether the task should use sccache compiler caching. + needs_sccache: Optional[bool] = None + # The "when" section contains descriptions of the circumstances under + # which this task should be included in the task graph. + when: Optional[WhenConfig] = None + # A list of artifacts to install from 'fetch' tasks. + fetches: Optional[dict[str, list[Union[str, FetchesEntrySchema]]]] = None + # This object will be passed through to the task description, with additions + # provided by the task's run-using function. 
+ worker: Optional[dict] = None + + def __post_init__(self): + # Exclusive: optimization and when are mutually exclusive + if self.optimization is not None and self.when is not None: + raise ValueError("'optimization' and 'when' are mutually exclusive") + + +run_description_schema = RunDescriptionSchema transforms = TransformSequence() transforms.add_validate(run_description_schema) @@ -456,9 +418,11 @@ def wrap(func): return wrap -@run_task_using( - "always-optimized", "always-optimized", LegacySchema({"using": "always-optimized"}) -) +class AlwaysOptimizedRunSchema(Schema): + using: Literal["always-optimized"] + + +@run_task_using("always-optimized", "always-optimized", AlwaysOptimizedRunSchema) def always_optimized(config, task, taskdesc): pass diff --git a/src/taskgraph/transforms/run/run_task.py b/src/taskgraph/transforms/run/run_task.py index ce81d7d23..c5af62eb9 100644 --- a/src/taskgraph/transforms/run/run_task.py +++ b/src/taskgraph/transforms/run/run_task.py @@ -7,111 +7,56 @@ import dataclasses import os -from textwrap import dedent - -from voluptuous import Any, Optional, Required +from typing import Literal, Optional, Union from taskgraph.transforms.run import run_task_using from taskgraph.transforms.run.common import ( support_caches, support_vcs_checkout, ) -from taskgraph.transforms.task import taskref_or_string from taskgraph.util import path, taskcluster -from taskgraph.util.caches import CACHES -from taskgraph.util.schema import LegacySchema +from taskgraph.util.schema import Schema, taskref_or_string_msgspec EXEC_COMMANDS = { "bash": ["bash", "-cx"], "powershell": ["powershell.exe", "-ExecutionPolicy", "Bypass"], } +CacheType = Literal["cargo", "checkout", "npm", "pip", "uv"] +ExecWith = Literal["bash", "powershell"] + #: Schema for run.using run_task -run_task_schema = LegacySchema( - { - Required( - "using", - description=dedent( - """ - Specifies the task type. Must be 'run-task'. 
- """.lstrip() - ), - ): "run-task", - Optional( - "use-caches", - description=dedent( - """ - Specifies which caches to use. May take a boolean in which case either all - (True) or no (False) caches will be used. Alternatively, it can accept a - list of caches to enable. Defaults to only the checkout cache enabled. - """.lstrip() - ), - ): Any(bool, list(CACHES.keys())), - Required( - "checkout", - description=dedent( - """ - If true (the default), perform a checkout on the worker. Can also be a - dictionary specifying explicit checkouts. - """.lstrip() - ), - ): Any(bool, {str: dict}), - Optional( - "cwd", - description=dedent( - """ - Path to run command in. If a checkout is present, the path to the checkout - will be interpolated with the key `checkout`. - """.lstrip() - ), - ): str, - Required( - "command", - description=dedent( - """ - The command arguments to pass to the `run-task` script, after the checkout - arguments. If a list, it will be passed directly; otherwise it will be - included in a single argument to the command specified by `exec-with`. - """.lstrip() - ), - ): Any([taskref_or_string], taskref_or_string), - Optional( - "exec-with", - description=dedent( - """ - Specifies what to execute the command with in the event the command is a - string. - """.lstrip() - ), - ): Any(*list(EXEC_COMMANDS)), - Optional( - "run-task-command", - description=dedent( - """ - Command used to invoke the `run-task` script. Can be used if the script - or Python installation is in a non-standard location on the workers. - """.lstrip() - ), - ): list, - Required( - "workdir", - description=dedent( - """ - Base work directory used to set up the task. - """.lstrip() - ), - ): str, - Optional( - "run-as-root", - description=dedent( - """ - Whether to run as root. Defaults to False. - """.lstrip() - ), - ): bool, - } -) +class RunTaskRunSchema(Schema, forbid_unknown_fields=False): + # Specifies the task type. Must be 'run-task'. 
+ using: Literal["run-task"] + # The command arguments to pass to the `run-task` script, after the checkout + # arguments. If a list, it will be passed directly; otherwise it will be + # included in a single argument to the command specified by `exec-with`. + command: Union[list[taskref_or_string_msgspec], taskref_or_string_msgspec] + # If true (the default), perform a checkout on the worker. Can also be a + # dictionary specifying explicit checkouts. + checkout: Union[bool, dict[str, dict]] + # Base work directory used to set up the task. + workdir: str + # Specifies which caches to use. May take a boolean in which case either all + # (True) or no (False) caches will be used. Alternatively, it can accept a + # list of caches to enable. Defaults to only the checkout cache enabled. + use_caches: Optional[Union[bool, list[CacheType]]] = None + # Path to run command in. If a checkout is present, the path to the checkout + # will be interpolated with the key `checkout`. + cwd: Optional[str] = None + # Specifies what to execute the command with in the event the command is a + # string. + exec_with: Optional[ExecWith] = None + # Command used to invoke the `run-task` script. Can be used if the script + # or Python installation is in a non-standard location on the workers. + run_task_command: Optional[list] = None + # Whether to run as root. Defaults to False. 
+ run_as_root: Optional[bool] = None + + +run_task_schema = RunTaskRunSchema def common_setup(config, task, taskdesc, command): diff --git a/src/taskgraph/transforms/task.py b/src/taskgraph/transforms/task.py index 59efbd8f3..e9e437cd3 100644 --- a/src/taskgraph/transforms/task.py +++ b/src/taskgraph/transforms/task.py @@ -14,20 +14,20 @@ import time from copy import deepcopy from dataclasses import dataclass -from textwrap import dedent -from typing import Callable - -from voluptuous import All, Any, Extra, NotIn, Optional, Required +from typing import Callable, Literal, Optional, Union from taskgraph.transforms.base import TransformSequence from taskgraph.util.hash import hash_path from taskgraph.util.keyed_by import evaluate_keyed_by from taskgraph.util.schema import ( - LegacySchema, - OptimizationSchema, + IndexSchema, + OptimizationType, + Schema, + TaskPriority, + TreeherderConfig, optionally_keyed_by, resolve_keyed_by, - taskref_or_string, + taskref_or_string_msgspec, validate_schema, ) from taskgraph.util.treeherder import split_symbol, treeherder_defaults @@ -47,343 +47,82 @@ def run_task_suffix(): return hash_path(RUN_TASK)[0:20] +class WorkerSchema(Schema, forbid_unknown_fields=False): + # The worker implementation type. + implementation: str + + #: Schema for the task transforms -task_description_schema = LegacySchema( - { - Required( - "label", - description=dedent( - """ - The label for this task. - """.lstrip() - ), - ): str, - Required( - "description", - description=dedent( - """ - Description of the task (for metadata). - """.lstrip() - ), - ): str, - Optional( - "attributes", - description=dedent( - """ - Attributes for this task. - """.lstrip() - ), - ): {str: object}, - Optional( - "task-from", - description=dedent( - """ - Relative path (from config.path) to the file task was defined - in. 
- """.lstrip() - ), - ): str, - Optional( - "dependencies", - description=dedent( - """ - Dependencies of this task, keyed by name; these are passed - through verbatim and subject to the interpretation of the - Task's get_dependencies method. - """.lstrip() - ), - ): { - All( - str, - NotIn( - ["self", "decision"], - "Can't use 'self` or 'decision' as dependency names.", - ), - ): object, - }, - Optional( - "priority", - description=dedent( - """ - Priority of the task. - """.lstrip() - ), - ): Any( - "highest", - "very-high", - "high", - "medium", - "low", - "very-low", - "lowest", - ), - Optional( - "soft-dependencies", - description=dedent( - """ - Soft dependencies of this task, as a list of task labels. - """.lstrip() - ), - ): [str], - Optional( - "if-dependencies", - description=dedent( - """ - Dependencies that must be scheduled in order for this task to run. - """.lstrip() - ), - ): [str], - Optional( - "requires", - description=dedent( - """ - Specifies the condition for task execution. - """.lstrip() - ), - ): Any("all-completed", "all-resolved"), - Optional( - "expires-after", - description=dedent( - """ - Expiration time relative to task creation, with units (e.g., - '14 days'). Defaults are set based on the project. - """.lstrip() - ), - ): str, - Optional( - "deadline-after", - description=dedent( - """ - Deadline time relative to task creation, with units (e.g., - '14 days'). Defaults are set based on the project. - """.lstrip() - ), - ): str, - Optional( - "routes", - description=dedent( - """ - Custom routes for this task; the default treeherder routes will - be added automatically. - """.lstrip() - ), - ): [str], - Optional( - "scopes", - description=dedent( - """ - Custom scopes for this task; any scopes required for the worker - will be added automatically. The following parameters will be - substituted in each scope: - - {level} -- the scm level of this push - {project} -- the project of this push. 
- """.lstrip() - ), - ): [str], - Optional( - "tags", - description=dedent( - """ - Tags for this task. - """.lstrip() - ), - ): {str: str}, - Optional( - "extra", - description=dedent( - """ - Custom 'task.extra' content. - """.lstrip() - ), - ): {str: object}, - Optional( - "treeherder", - description=dedent( - """ - Treeherder-related information. Can be a simple `true` to - auto-generate information or a dictionary with specific keys. - """.lstrip() - ), - ): Any( - True, - { - "symbol": Optional( - str, - description=dedent( - """ - Either a bare symbol, or 'grp(sym)'. Defaults to the - uppercased first letter of each section of the kind - (delimited by '-') all smooshed together. - """.lstrip() - ), - ), - "kind": Optional( - Any("build", "test", "other"), - description=dedent( - """ - The task kind. Defaults to 'build', 'test', or 'other' - based on the kind name. - """.lstrip() - ), - ), - "tier": Optional( - int, - description=dedent( - """ - Tier for this task. Defaults to 1. - """.lstrip() - ), - ), - "platform": Optional( - str, - description=dedent( - """ - Task platform in the form platform/collection, used to - set treeherder.machine.platform and - treeherder.collection or treeherder.labels Defaults to - 'default/opt'. - """.lstrip() - ), - ), - }, - ), - Optional( - "index", - description=dedent( - """ - Information for indexing this build so its artifacts can be - discovered. If omitted, the build will not be indexed. - """.lstrip() - ), - ): { - # the name of the product this build produces - "product": str, - # the names to use for this task in the TaskCluster index - "job-name": str, - # Type of gecko v2 index to use - "type": str, - # The rank that the task will receive in the TaskCluster - # index. A newly completed task supersedes the currently - # indexed task iff it has a higher rank. If unspecified, - # 'by-tier' behavior will be used. 
- "rank": Any( - # Rank is equal the timestamp of the build_date for tier-1 - # tasks, and zero for non-tier-1. This sorts tier-{2,3} - # builds below tier-1 in the index. - "by-tier", - # Rank is given as an integer constant (e.g. zero to make - # sure a task is last in the index). - int, - # Rank is equal to the timestamp of the build_date. This - # option can be used to override the 'by-tier' behavior - # for non-tier-1 tasks. - "build_date", - ), - }, - Optional( - "run-on-projects", - description=dedent( - """ - The `run_on_projects` attribute, defaulting to 'all'. Dictates - the projects on which this task should be included in the - target task set. See the attributes documentation for details. - """.lstrip() - ), - ): optionally_keyed_by("build-platform", [str]), - Optional( - "run-on-tasks-for", - description=dedent( - """ - Specifies tasks for which this task should run. - """.lstrip() - ), - ): [str], - Optional( - "run-on-git-branches", - description=dedent( - """ - Specifies git branches for which this task should run. - """.lstrip() - ), - ): [str], - Optional( - "shipping-phase", - description=dedent( - """ - The `shipping_phase` attribute, defaulting to None. Specifies - the release promotion phase that this task belongs to. - """.lstrip() - ), - ): Any( - None, - "build", - "promote", - "push", - "ship", - ), - Required( - "always-target", - description=dedent( - """ - The `always-target` attribute will cause the task to be - included in the target_task_graph regardless of filtering. - - Tasks included in this manner will be candidates for - optimization even when `optimize_target_tasks` is False, unless - the task was also explicitly chosen by the target_tasks method. - """.lstrip() - ), - ): bool, - Required( - "optimization", - description=dedent( - """ - Optimization to perform on this task during the optimization - phase. Defined in taskcluster/taskgraph/optimize.py. 
- """.lstrip() - ), - ): OptimizationSchema, - Required( - "worker-type", - description=dedent( - """ - The provisioner-id/worker-type for the task. The following - parameters will be substituted in this string: - - {level} -- the scm level of this push. - """.lstrip() - ), - ): str, - Required( - "needs-sccache", - description=dedent( - """ - Whether the task should use sccache compiler caching. - """.lstrip() - ), - ): bool, - Optional( - "worker", - description=dedent( - """ - Information specific to the worker implementation that will run - this task. - """.lstrip() - ), - ): { - Required( - "implementation", - description=dedent( - """ - The worker implementation type. - """.lstrip() - ), - ): str, - Extra: object, - }, - } -) +class TaskDescriptionSchema(Schema): + # The label for this task. + label: str + # Description of the task (for metadata). + description: str + # The `always-target` attribute will cause the task to be included in + # the target_task_graph regardless of filtering. + always_target: bool + # Optimization to perform on this task during the optimization phase. + optimization: OptimizationType + # The provisioner-id/worker-type for the task. + worker_type: str + # Whether the task should use sccache compiler caching. + needs_sccache: bool + # Attributes for this task. + attributes: Optional[dict[str, object]] = None + # Relative path (from config.path) to the file task was defined in. + task_from: Optional[str] = None + # Dependencies of this task, keyed by name; these are passed through + # verbatim and subject to the interpretation of the Task's + # get_dependencies method. + dependencies: Optional[dict[str, object]] = None + # Priority of the task. + priority: Optional[TaskPriority] = None + # Soft dependencies of this task, as a list of task labels. + soft_dependencies: Optional[list[str]] = None + # Dependencies that must be scheduled in order for this task to run. 
+ if_dependencies: Optional[list[str]] = None + # Specifies the condition for task execution. + requires: Optional[Literal["all-completed", "all-resolved"]] = None + # Expiration time relative to task creation, with units (e.g., '14 days'). + expires_after: Optional[str] = None + # Deadline time relative to task creation, with units (e.g., '14 days'). + deadline_after: Optional[str] = None + # Custom routes for this task. + routes: Optional[list[str]] = None + # Custom scopes for this task. + scopes: Optional[list[str]] = None + # Tags for this task. + tags: Optional[dict[str, str]] = None + # Custom 'task.extra' content. + extra: Optional[dict[str, object]] = None + # Treeherder-related information. Can be a simple `true` to + # auto-generate information or a dictionary with specific keys. + treeherder: Optional[Union[bool, TreeherderConfig]] = None + # Information for indexing this build so its artifacts can be discovered. + index: Optional[IndexSchema] = None + # The `run_on_projects` attribute, defaulting to 'all'. + run_on_projects: Optional[ + optionally_keyed_by("build-platform", list[str], use_msgspec=True) # type: ignore + ] = None + # Specifies tasks for which this task should run. + run_on_tasks_for: Optional[list[str]] = None + # Specifies git branches for which this task should run. + run_on_git_branches: Optional[list[str]] = None + # The `shipping_phase` attribute, defaulting to None. + shipping_phase: Optional[Literal["build", "promote", "push", "ship"]] = None + # Information specific to the worker implementation that will run this task. + worker: Optional[WorkerSchema] = None + + def __post_init__(self): + if self.dependencies: + for key in self.dependencies: + if key in ("self", "decision"): + raise ValueError( + "Can't use 'self' or 'decision' as dependency names." 
+ ) + + +task_description_schema = TaskDescriptionSchema TC_TREEHERDER_SCHEMA_URL = ( "https://github.com/taskcluster/taskcluster-treeherder/" @@ -430,18 +169,14 @@ def get_default_deadline(graph_config, project): @dataclass(frozen=True) class PayloadBuilder: - schema: LegacySchema + schema: object builder: Callable def payload_builder(name, schema): - schema = LegacySchema( - {Required("implementation"): name, Optional("os"): str} - ).extend(schema) - def wrap(func): assert name not in payload_builders, f"duplicate payload builder name {name}" - payload_builders[name] = PayloadBuilder(schema, func) # type: ignore + payload_builders[name] = PayloadBuilder(schema, func) return func return wrap @@ -472,86 +207,67 @@ def verify_index(config, index): raise Exception(UNSUPPORTED_INDEX_PRODUCT_ERROR.format(product=product)) -@payload_builder( - "docker-worker", - schema={ - Required("os"): "linux", - # For tasks that will run in docker-worker, this is the name of the docker - # image or in-tree docker image to run the task in. If in-tree, then a - # dependency will be created automatically. This is generally - # `desktop-test`, or an image that acts an awful lot like it. - Required("docker-image"): Any( - # a raw Docker image path (repo/image:tag) - str, - # an in-tree generated docker image (from `taskcluster/docker/`) - {"in-tree": str}, - # an indexed docker image - {"indexed": str}, - ), - # worker features that should be enabled - Required("relengapi-proxy"): bool, - Required("chain-of-trust"): bool, - Required("taskcluster-proxy"): bool, - Required("allow-ptrace"): bool, - Required("loopback-video"): bool, - Required("loopback-audio"): bool, - Required("docker-in-docker"): bool, # (aka 'dind') - Required("privileged"): bool, - # Paths to Docker volumes. - # - # For in-tree Docker images, volumes can be parsed from Dockerfile. - # This only works for the Dockerfile itself: if a volume is defined in - # a base image, it will need to be declared here. 
Out-of-tree Docker - # images will also require explicit volume annotation. - # - # Caches are often mounted to the same path as Docker volumes. In this - # case, they take precedence over a Docker volume. But a volume still - # needs to be declared for the path. - Optional("volumes"): [str], - # caches to set up for the task - Optional("caches"): [ - { - # only one type is supported by any of the workers right now - "type": "persistent", - # name of the cache, allowing reuse by subsequent tasks naming the - # same cache - "name": str, - # location in the task image where the cache will be mounted - "mount-point": str, - # Whether the cache is not used in untrusted environments - # (like the Try repo). - Optional("skip-untrusted"): bool, - } - ], - # artifacts to extract from the task image after completion - Optional("artifacts"): [ - { - # type of artifact -- simple file, or recursive directory, - # or a volume mounted directory. - "type": Any("file", "directory", "volume"), - # task image path from which to read artifact - "path": str, - # name of the produced artifact (root of the names for - # type=directory) - "name": str, - } - ], - # environment variables - Required("env"): {str: taskref_or_string}, - # the command to run; if not given, docker-worker will default to the - # command in the docker image - Optional("command"): [taskref_or_string], - # the maximum time to run, in seconds - Required("max-run-time"): int, - # the exit status code(s) that indicates the task should be retried - Optional("retry-exit-status"): [int], - # the exit status code(s) that indicates the caches used by the task - # should be purged - Optional("purge-caches-exit-status"): [int], - # Whether any artifacts are assigned to this worker - Optional("skip-artifacts"): bool, - }, -) +DockerImage = Union[str, dict[str, str]] + + +class DockerWorkerCacheEntry(Schema): + # only one type is supported by any of the workers right now + type: Literal["persistent"] + # name of the cache, 
allowing reuse by subsequent tasks naming the same cache + name: str + # location in the task image where the cache will be mounted + mount_point: str + # Whether the cache is not used in untrusted environments (like the Try repo). + skip_untrusted: Optional[bool] = None + + +class DockerWorkerArtifact(Schema): + # type of artifact -- simple file, or recursive directory, or a volume mounted directory. + type: Literal["file", "directory", "volume"] + # task image path from which to read artifact + path: str + # name of the produced artifact (root of the names for type=directory) + name: str + + +class DockerWorkerPayloadSchema(Schema, forbid_unknown_fields=False): + implementation: Literal["docker-worker"] + os: Literal["linux"] + # For tasks that will run in docker-worker, this is the name of the docker + # image or in-tree docker image to run the task in. + docker_image: DockerImage + # worker features that should be enabled + relengapi_proxy: bool + chain_of_trust: bool + taskcluster_proxy: bool + allow_ptrace: bool + loopback_video: bool + loopback_audio: bool + docker_in_docker: bool # (aka 'dind') + privileged: bool + # environment variables + env: dict[str, taskref_or_string_msgspec] + # the maximum time to run, in seconds + max_run_time: int + # Paths to Docker volumes. 
+ volumes: Optional[list[str]] = None + # caches to set up for the task + caches: Optional[list[DockerWorkerCacheEntry]] = None + # artifacts to extract from the task image after completion + artifacts: Optional[list[DockerWorkerArtifact]] = None + # the command to run; if not given, docker-worker will default to the + # command in the docker image + command: Optional[list[taskref_or_string_msgspec]] = None + # the exit status code(s) that indicates the task should be retried + retry_exit_status: Optional[list[int]] = None + # the exit status code(s) that indicates the caches used by the task + # should be purged + purge_caches_exit_status: Optional[list[int]] = None + # Whether any artifacts are assigned to this worker + skip_artifacts: Optional[bool] = None + + +@payload_builder("docker-worker", schema=DockerWorkerPayloadSchema) def build_docker_worker_payload(config, task, task_def): worker = task["worker"] level = int(config.params["level"]) @@ -762,89 +478,71 @@ def build_docker_worker_payload(config, task, task_def): payload["capabilities"] = capabilities -@payload_builder( - "generic-worker", - schema={ - Required("os"): Any("windows", "macosx", "linux", "linux-bitbar"), - # see http://schemas.taskcluster.net/generic-worker/v1/payload.json - # and https://docs.taskcluster.net/reference/workers/generic-worker/payload - # command is a list of commands to run, sequentially - # on Windows, each command is a string, on OS X and Linux, each command is - # a string array - Required("command"): Any( - [taskref_or_string], - [[taskref_or_string]], # Windows # Linux / OS X - ), - # artifacts to extract from the task image after completion; note that artifacts - # for the generic worker cannot have names - Optional("artifacts"): [ - { - # type of artifact -- simple file, or recursive directory - "type": Any("file", "directory"), - # filesystem path from which to read artifact - "path": str, - # if not specified, path is used for artifact name - Optional("name"): str, - 
} - ], - # Directories and/or files to be mounted. - # The actual allowed combinations are stricter than the model below, - # but this provides a simple starting point. - # See https://docs.taskcluster.net/reference/workers/generic-worker/payload - Optional("mounts"): [ - { - # A unique name for the cache volume, implies writable cache directory - # (otherwise mount is a read-only file or directory). - Optional("cache-name"): str, - # Optional content for pre-loading cache, or mandatory content for - # read-only file or directory. Pre-loaded content can come from either - # a task artifact or from a URL. - Optional("content"): { - # *** Either (artifact and task-id) or url must be specified. *** - # Artifact name that contains the content. - Optional("artifact"): str, - # Task ID that has the artifact that contains the content. - Optional("task-id"): taskref_or_string, - # URL that supplies the content in response to an unauthenticated - # GET request. - Optional("url"): str, - }, - # *** Either file or directory must be specified. *** - # If mounting a cache or read-only directory, the filesystem location of - # the directory should be specified as a relative path to the task - # directory here. - Optional("directory"): str, - # If mounting a file, specify the relative path within the task - # directory to mount the file (the file will be read only). - Optional("file"): str, - # Required if and only if `content` is specified and mounting a - # directory (not a file). This should be the archive format of the - # content (either pre-loaded cache or read-only directory). 
- Optional("format"): Any("rar", "tar.bz2", "tar.gz", "zip"), - } - ], - # environment variables - Required("env"): {str: taskref_or_string}, - # the maximum time to run, in seconds - Required("max-run-time"): int, - # the exit status code(s) that indicates the task should be retried - Optional("retry-exit-status"): [int], - # the exit status code(s) that indicates the caches used by the task - # should be purged - Optional("purge-caches-exit-status"): [int], - # os user groups for test task workers - Optional("os-groups"): [str], - # feature for test task to run as administarotr - Optional("run-as-administrator"): bool, - # feature for task to run as current OS user - Optional("run-task-as-current-user"): bool, - # optional features - Required("chain-of-trust"): bool, - Optional("taskcluster-proxy"): bool, - # Whether any artifacts are assigned to this worker - Optional("skip-artifacts"): bool, - }, -) +class GenericWorkerArtifact(Schema): + # type of artifact -- simple file, or recursive directory + type: Literal["file", "directory"] + # filesystem path from which to read artifact + path: str + # if not specified, path is used for artifact name + name: Optional[str] = None + + +class MountContentSchema(Schema): + # Artifact name that contains the content. + artifact: Optional[str] = None + # Task ID that has the artifact that contains the content. + task_id: Optional[taskref_or_string_msgspec] = None + # URL that supplies the content in response to an unauthenticated GET request. + url: Optional[str] = None + + +class MountSchema(Schema): + # A unique name for the cache volume. + cache_name: Optional[str] = None + # Optional content for pre-loading cache, or mandatory content for + # read-only file or directory. + content: Optional[MountContentSchema] = None + # If mounting a cache or read-only directory. + directory: Optional[str] = None + # If mounting a file. + file: Optional[str] = None + # Archive format of the content. 
+ format: Optional[Literal["rar", "tar.bz2", "tar.gz", "zip"]] = None + + +class GenericWorkerPayloadSchema(Schema, forbid_unknown_fields=False): + implementation: Literal["generic-worker"] + os: Literal["windows", "macosx", "linux", "linux-bitbar"] + # command is a list of commands to run, sequentially + # On Windows, each command is a string; on Linux/OS X, each command is a string array + command: list + # environment variables + env: dict[str, taskref_or_string_msgspec] + # the maximum time to run, in seconds + max_run_time: int + # optional features + chain_of_trust: bool + # artifacts to extract from the task image after completion + artifacts: Optional[list[GenericWorkerArtifact]] = None + # Directories and/or files to be mounted. + mounts: Optional[list[MountSchema]] = None + # the exit status code(s) that indicates the task should be retried + retry_exit_status: Optional[list[int]] = None + # the exit status code(s) that indicates the caches used by the task + # should be purged + purge_caches_exit_status: Optional[list[int]] = None + # os user groups for test task workers + os_groups: Optional[list[str]] = None + # feature for test task to run as administrator + run_as_administrator: Optional[bool] = None + # feature for task to run as current OS user + run_task_as_current_user: Optional[bool] = None + taskcluster_proxy: Optional[bool] = None + # Whether any artifacts are assigned to this worker + skip_artifacts: Optional[bool] = None + + +@payload_builder("generic-worker", schema=GenericWorkerPayloadSchema) def build_generic_worker_payload(config, task, task_def): worker = task["worker"] @@ -956,38 +654,41 @@ def build_generic_worker_payload(config, task, task_def): task_def["payload"]["features"] = features -@payload_builder( - "beetmover", - schema={ - # the maximum time to run, in seconds - Required("max-run-time"): int, - # locale key, if this is a locale beetmover task - Optional("locale"): str, - Optional("partner-public"): bool, - 
Required("release-properties"): { - "app-name": str, - "app-version": str, - "branch": str, - "build-id": str, - "hash-type": str, - "platform": str, - }, - # list of artifact URLs for the artifacts that should be beetmoved - Required("upstream-artifacts"): [ - { - # taskId of the task with the artifact - Required("taskId"): taskref_or_string, - # type of signing task (for CoT) - Required("taskType"): str, - # Paths to the artifacts to sign - Required("paths"): [str], - # locale is used to map upload path and allow for duplicate simple names - Required("locale"): str, - } - ], - Optional("artifact-map"): object, - }, -) +class ReleaseProperties(Schema): + app_name: str + app_version: str + branch: str + build_id: str + hash_type: str + platform: str + + +class UpstreamArtifact(Schema, rename="camel"): + # taskId of the task with the artifact + task_id: taskref_or_string_msgspec + # type of signing task (for CoT) + task_type: str + # Paths to the artifacts to sign + paths: list[str] + # locale is used to map upload path and allow for duplicate simple names + locale: str + + +class BeetmoverPayloadSchema(Schema, forbid_unknown_fields=False): + implementation: Literal["beetmover"] + # the maximum time to run, in seconds + max_run_time: int + # release properties + release_properties: ReleaseProperties + # list of artifact URLs for the artifacts that should be beetmoved + upstream_artifacts: list[UpstreamArtifact] + # locale key, if this is a locale beetmover task + locale: Optional[str] = None + partner_public: Optional[bool] = None + artifact_map: Optional[object] = None + + +@payload_builder("beetmover", schema=BeetmoverPayloadSchema) def build_beetmover_payload(config, task, task_def): worker = task["worker"] release_properties = worker["release-properties"] @@ -1013,25 +714,27 @@ def build_beetmover_payload(config, task, task_def): task_def["payload"]["is_partner_repack_public"] = worker["partner-public"] -@payload_builder( - "invalid", - schema={ - # an invalid 
task is one which should never actually be created; this is used in - # release automation on branches where the task just doesn't make sense - Extra: object, - }, -) +class InvalidPayloadSchema(Schema, forbid_unknown_fields=False): + # an invalid task is one which should never actually be created; this is used in + # release automation on branches where the task just doesn't make sense + implementation: Literal["invalid"] + + +@payload_builder("invalid", schema=InvalidPayloadSchema) def build_invalid_payload(config, task, task_def): task_def["payload"] = "invalid task - should never be created" -@payload_builder( - "always-optimized", - schema={ - Extra: object, - }, -) -@payload_builder("succeed", schema={}) +class AlwaysOptimizedPayloadSchema(Schema, forbid_unknown_fields=False): + implementation: Literal["always-optimized"] + + +class SucceedPayloadSchema(Schema): + implementation: Literal["succeed"] + + +@payload_builder("always-optimized", schema=AlwaysOptimizedPayloadSchema) +@payload_builder("succeed", schema=SucceedPayloadSchema) def build_dummy_payload(config, task, task_def): task_def["payload"] = {} diff --git a/src/taskgraph/util/dependencies.py b/src/taskgraph/util/dependencies.py index 17c41d732..47fc8072a 100644 --- a/src/taskgraph/util/dependencies.py +++ b/src/taskgraph/util/dependencies.py @@ -7,7 +7,6 @@ from taskgraph.task import Task from taskgraph.transforms.base import TransformConfig -from taskgraph.util.schema import LegacySchema # Define a collection of group_by functions GROUP_BY_MAP = {} @@ -36,7 +35,7 @@ def group_by_all(config, tasks): return [[task for task in tasks]] -@group_by("attribute", schema=LegacySchema(str)) +@group_by("attribute", schema=str) def group_by_attribute(config, tasks, attr): groups = {} for task in tasks: diff --git a/src/taskgraph/util/schema.py b/src/taskgraph/util/schema.py index 71c9011a0..a18305a13 100644 --- a/src/taskgraph/util/schema.py +++ b/src/taskgraph/util/schema.py @@ -45,6 +45,9 @@ def 
validate_schema(schema, obj, msg_prefix): else: # Fall back to msgspec.convert for validation msgspec.convert(obj, schema) + # Handle plain Python types (e.g. str, int) via msgspec.convert + elif isinstance(schema, type): + msgspec.convert(obj, schema) else: raise TypeError(f"Unsupported schema type: {type(schema)}") except ( @@ -317,7 +320,7 @@ class IndexSchema(Schema): # the names to use for this task in the TaskCluster index job_name: str # Type of gecko v2 index to use - type: str + type: str = "generic" # The rank that the task will receive in the TaskCluster # index. A newly completed task supersedes the currently # indexed task iff it has a higher rank. If unspecified, @@ -325,6 +328,23 @@ rank: Union[Literal["by-tier", "build_date"], int] = "by-tier" +class TreeherderConfig(Schema): + # Either a bare symbol, or 'grp(sym)'. Defaults to the + # uppercased first letter of each section of the kind + # (delimited by '-') all smooshed together. + symbol: Optional[str] = None + # The task kind. Defaults to 'build', 'test', or 'other' + # based on the kind name. + kind: Optional[Literal["build", "test", "other"]] = None + # Tier for this task. Defaults to 1. + tier: Optional[int] = None + # Task platform in the form platform/collection, used to + # set treeherder.machine.platform and + # treeherder.collection or treeherder.labels. Defaults to + # 'default/opt'. 
+ platform: Optional[str] = None + + class IndexSearchOptimizationSchema(Schema): """Search the index for the given index namespaces.""" diff --git a/test/test_transforms_run_run_task.py b/test/test_transforms_run_run_task.py index 2ef25b9f2..e41b74de4 100644 --- a/test/test_transforms_run_run_task.py +++ b/test/test_transforms_run_run_task.py @@ -5,12 +5,13 @@ import os.path from pprint import pprint +import msgspec import pytest from taskgraph.transforms.run import make_task_description from taskgraph.transforms.task import payload_builders, set_defaults from taskgraph.util.caches import CACHES -from taskgraph.util.schema import LegacySchema, validate_schema +from taskgraph.util.schema import validate_schema from taskgraph.util.taskcluster import get_root_url from taskgraph.util.templates import merge @@ -257,9 +258,9 @@ def inner(task, **kwargs): print("Dumping for copy/paste:") pprint(caches, indent=2) - # Create a new schema object with just the part relevant to caches. - partial_schema = LegacySchema(payload_builders[impl].schema.schema[key]) - validate_schema(partial_schema, caches, "validation error") + # Validate the caches against the relevant field type from the schema. 
+ field_type = payload_builders[impl].schema.__annotations__[key] + msgspec.convert(caches, field_type) return caches diff --git a/test/test_util_schema.py b/test/test_util_schema.py index 3364b2813..6a240e4da 100644 --- a/test/test_util_schema.py +++ b/test/test_util_schema.py @@ -4,61 +4,65 @@ import unittest +import msgspec import pytest -from voluptuous import Invalid, MultipleInvalid import taskgraph from taskgraph.util.schema import ( - LegacySchema, + Schema, optionally_keyed_by, resolve_keyed_by, validate_schema, ) -schema = LegacySchema( - { - "x": int, - "y": str, - } -) + +class SampleSchema(Schema): + x: int + y: str class TestValidateSchema(unittest.TestCase): def test_valid(self): - validate_schema(schema, {"x": 10, "y": "foo"}, "pfx") + validate_schema(SampleSchema, {"x": 10, "y": "foo"}, "pfx") def test_invalid(self): try: - validate_schema(schema, {"x": "not-int"}, "pfx") + validate_schema(SampleSchema, {"x": "not-int"}, "pfx") self.fail("no exception raised") except Exception as e: self.assertTrue(str(e).startswith("pfx\n")) -class TestCheckSchema(unittest.TestCase): - def test_schema(self): - "Creating a schema applies taskgraph checks." - with self.assertRaises(Exception): - LegacySchema({"camelCase": int}) +class TestSchemaFeatures(unittest.TestCase): + def test_kebab_rename(self): + """Schema renames snake_case fields to kebab-case.""" + result = msgspec.convert({"my-field": 42}, MyFieldSchema) + assert result.my_field == 42 + + def test_forbid_unknown_fields(self): + """Schema rejects unknown fields by default.""" + with self.assertRaises((msgspec.ValidationError, msgspec.DecodeError)): + msgspec.convert({"x": 1, "y": "a", "z": True}, SampleSchema) + + def test_allow_unknown_fields(self): + """Schema with forbid_unknown_fields=False allows extra fields.""" - def test_extend_schema(self): - "Extending a schema applies taskgraph checks." 
- with self.assertRaises(Exception): - LegacySchema({"kebab-case": int}).extend({"camelCase": int}) + class OpenSchema(Schema, forbid_unknown_fields=False): + x: int - def test_extend_schema_twice(self): - "Extending a schema twice applies taskgraph checks." - with self.assertRaises(Exception): - LegacySchema({"kebab-case": int}).extend({"more-kebab": int}).extend( - {"camelCase": int} - ) + result = msgspec.convert({"x": 1, "extra": "ok"}, OpenSchema) + assert result.x == 1 -def test_check_skipped(monkeypatch): - """Schema not validated if 'check=False' or taskgraph.fast is unset.""" - LegacySchema({"camelCase": int}, check=False) # assert no exception +class MyFieldSchema(Schema): + my_field: int + + +def test_validation_skipped(monkeypatch): + """Validation is skipped when taskgraph.fast is True.""" monkeypatch.setattr(taskgraph, "fast", True) - LegacySchema({"camelCase": int}) # assert no exception + # Pass invalid data — should not raise because validation is skipped + validate_schema(SampleSchema, {"x": "not-int"}, "pfx") class TestResolveKeyedBy(unittest.TestCase): @@ -238,29 +242,32 @@ def test_no_key(self): def test_optionally_keyed_by(): - validator = optionally_keyed_by("foo", str) - assert validator("baz") == "baz" - assert validator({"by-foo": {"a": "b", "c": "d"}}) == {"a": "b", "c": "d"} + typ = optionally_keyed_by("foo", str, use_msgspec=True) + assert msgspec.convert("baz", typ) == "baz" + assert msgspec.convert({"by-foo": {"a": "b", "c": "d"}}, typ) == { + "by-foo": {"a": "b", "c": "d"} + } - with pytest.raises(Invalid): - validator({"by-foo": {"a": 1, "c": "d"}}) + with pytest.raises(msgspec.ValidationError): + msgspec.convert({"by-foo": {"a": 1, "c": "d"}}, typ) - with pytest.raises(MultipleInvalid): - validator({"by-bar": {"a": "b"}}) + with pytest.raises(msgspec.ValidationError): + msgspec.convert({"by-bar": {"a": "b"}}, typ) def test_optionally_keyed_by_mulitple_keys(): - validator = optionally_keyed_by("foo", "bar", str) - assert 
validator("baz") == "baz" - assert validator({"by-foo": {"a": "b", "c": "d"}}) == {"a": "b", "c": "d"} - assert validator({"by-bar": {"x": "y"}}) == {"x": "y"} - assert validator({"by-foo": {"a": {"by-bar": {"x": "y"}}}}) == {"a": {"x": "y"}} + typ = optionally_keyed_by("foo", "bar", str, use_msgspec=True) + assert msgspec.convert("baz", typ) == "baz" + assert msgspec.convert({"by-foo": {"a": "b", "c": "d"}}, typ) == { + "by-foo": {"a": "b", "c": "d"} + } + assert msgspec.convert({"by-bar": {"x": "y"}}, typ) == {"by-bar": {"x": "y"}} - with pytest.raises(Invalid): - validator({"by-foo": {"a": 123, "c": "d"}}) + with pytest.raises(msgspec.ValidationError): + msgspec.convert({"by-foo": {"a": 123, "c": "d"}}, typ) - with pytest.raises(MultipleInvalid): - validator({"by-bar": {"a": 1}}) + with pytest.raises(msgspec.ValidationError): + msgspec.convert({"by-bar": {"a": 1}}, typ) - with pytest.raises(MultipleInvalid): - validator({"by-unknown": {"a": "b"}}) + with pytest.raises(msgspec.ValidationError): + msgspec.convert({"by-unknown": {"a": "b"}}, typ)