Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ disallow_untyped_defs = False
# is just not worth the effort.
disallow_untyped_defs = False

# These modules are deprecated (maybe implicitly, as being Gen2-only). Not
# worth adding new annotations to them.
[mypy-lsst.pipe.base.argumentParser.*]
disallow_untyped_defs = False
[mypy-lsst.pipe.base.shims.*]
disallow_untyped_defs = False

# ConfigOverrides uses the Python built-in ast module, and figuring out how
# to correctly type its visitation interface doesn't seem worth the effort
# right now.
Expand Down
173 changes: 173 additions & 0 deletions python/lsst/pipe/base/blocking_limited_butler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["BlockingLimitedButler"]

import logging
import time
from collections.abc import Iterable, Mapping
from typing import Any

from lsst.daf.butler import (
ButlerMetrics,
DatasetProvenance,
DatasetRef,
DeferredDatasetHandle,
DimensionUniverse,
LimitedButler,
StorageClass,
)

_LOG = logging.getLogger(__name__)


class BlockingLimitedButler(LimitedButler):
    """A `LimitedButler` that blocks until certain dataset types exist.

    Parameters
    ----------
    wrapped : `LimitedButler`
        The butler to wrap.
    timeouts : `~collections.abc.Mapping` [ `str`, `float` or `None` ]
        Timeouts in seconds to wait for different dataset types, keyed by
        parent dataset type name.  A value of `None` means wait forever.
        Dataset types not included are not blocked on (i.e. their timeout
        is ``0.0``).

    Notes
    -----
    When a timeout is exceeded, `get` will raise `FileNotFoundError` (as usual
    for a dataset that does not exist) and `stored_many` will mark the dataset
    as non-existent. `getDeferred` does not block.
    """

    # Seconds to sleep between existence polls in `get` and `stored_many`.
    _POLL_INTERVAL: float = 0.5

    def __init__(
        self,
        wrapped: LimitedButler,
        timeouts: Mapping[str, float | None],
    ):
        self._wrapped = wrapped
        self._timeouts = timeouts

    def close(self) -> None:
        self._wrapped.close()

    @property
    def _metrics(self) -> ButlerMetrics:
        # Need to always forward from the wrapped metrics object.
        return self._wrapped._metrics

    @_metrics.setter
    def _metrics(self, metrics: ButlerMetrics) -> None:
        # Allow record_metrics() context manager to override the wrapped
        # butler.
        self._wrapped._metrics = metrics

    def get(
        self,
        ref: DatasetRef,
        /,
        *,
        parameters: dict[str, Any] | None = None,
        storageClass: StorageClass | str | None = None,
    ) -> Any:
        # Timeouts are keyed by the parent dataset type, so component refs
        # share their parent's timeout.
        parent_dataset_type_name = ref.datasetType.nameAndComponent()[0]
        timeout = self._timeouts.get(parent_dataset_type_name, 0.0)
        # Use a monotonic clock for elapsed-time measurement; wall-clock
        # time can jump (NTP, DST) and break the timeout logic.
        start = time.monotonic()
        while True:
            try:
                return self._wrapped.get(ref, parameters=parameters, storageClass=storageClass)
            except FileNotFoundError as err:
                # timeout is None means wait forever.
                if timeout is not None:
                    elapsed = time.monotonic() - start
                    if elapsed > timeout:
                        err.add_note(f"Timed out after {elapsed:.3f}s.")
                        raise
                _LOG.info(
                    "Dataset %s not immediately available for %s, waiting %ss",
                    ref.datasetType,
                    ref.id,
                    timeout,
                )
                time.sleep(self._POLL_INTERVAL)

    def getDeferred(
        self,
        ref: DatasetRef,
        /,
        *,
        parameters: dict[str, Any] | None = None,
        storageClass: str | StorageClass | None = None,
    ) -> DeferredDatasetHandle:
        # Note that this does not block at all; the deferred handle's own
        # get() goes straight to the wrapped butler.
        return self._wrapped.getDeferred(ref, parameters=parameters, storageClass=storageClass)

    def stored_many(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]:
        start = time.monotonic()
        result = self._wrapped.stored_many(refs)
        # Resolve each ref's timeout once, up front (keyed by dataset ID).
        timeouts = {ref.id: self._timeouts.get(ref.datasetType.nameAndComponent()[0], 0.0) for ref in result}
        while True:
            elapsed = time.monotonic() - start
            # Refs that do not exist yet but whose timeout has not expired.
            remaining: list[DatasetRef] = []
            for ref, exists in result.items():
                timeout = timeouts[ref.id]
                if not exists and (timeout is None or elapsed <= timeout):
                    _LOG.info(
                        "Dataset %s not immediately available for %s, waiting %ss",
                        ref.datasetType,
                        ref.id,
                        timeout,
                    )
                    remaining.append(ref)
            if not remaining:
                # Everything either exists or has timed out; timed-out refs
                # are reported as non-existent (False), per class docs.
                return result
            # Sleep *before* re-querying so a successful re-query returns
            # immediately on the next iteration instead of after an extra
            # poll interval.
            time.sleep(self._POLL_INTERVAL)
            result.update(self._wrapped.stored_many(remaining))

    def isWriteable(self) -> bool:
        return self._wrapped.isWriteable()

    def put(self, obj: Any, ref: DatasetRef, /, *, provenance: DatasetProvenance | None = None) -> DatasetRef:
        return self._wrapped.put(obj, ref, provenance=provenance)

    def pruneDatasets(
        self,
        refs: Iterable[DatasetRef],
        *,
        disassociate: bool = True,
        unstore: bool = False,
        tags: Iterable[str] = (),
        purge: bool = False,
    ) -> None:
        return self._wrapped.pruneDatasets(
            refs, disassociate=disassociate, unstore=unstore, tags=tags, purge=purge
        )

    @property
    def dimensions(self) -> DimensionUniverse:
        return self._wrapped.dimensions

    @property
    def _datastore(self) -> Any:
        return self._wrapped._datastore

    @_datastore.setter  # demanded by MyPy since we declare it to be an instance attribute in LimitedButler.
    def _datastore(self, value: Any) -> None:
        self._wrapped._datastore = value
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/pipeline_graph/_task_subsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def discard(self, value: str) -> None:
self._members.discard(value)

@classmethod
def _from_iterable(cls, iterable: Iterable[str]) -> set[str]:
def _from_iterable[S](cls, iterable: Iterable[S]) -> set[S]:
# This is the hook used by collections.abc.Set when implementing
# operators that return new sets. In this case, we want those to be
# regular `set` (builtin) objects, not `TaskSubset` instances.
Expand Down
42 changes: 26 additions & 16 deletions python/lsst/pipe/base/single_quantum_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ class SingleQuantumExecutor(QuantumExecutor):

Parameters
----------
butler : `~lsst.daf.butler.Butler` or `None`, optional
Data butler, `None` means that a limited butler should be used instead.
butler : `~lsst.daf.butler.LimitedButler` or `None`, optional
Data butler; `None` means that ``limited_butler_factory`` should be
used instead.
task_factory : `.TaskFactory`, optional
Instance of a task factory. Defaults to a new instance of
`lsst.pipe.base.TaskFactory`.
Expand All @@ -94,8 +95,8 @@ class SingleQuantumExecutor(QuantumExecutor):
Enable debugging with ``lsstDebug`` facility for a task.
limited_butler_factory : `~collections.abc.Callable`, optional
A method that creates a `~lsst.daf.butler.LimitedButler` instance for a
given Quantum. This parameter must be defined if ``butler`` is `None`.
If ``butler`` is not `None` then this parameter is ignored.
given Quantum. This parameter must be provided if ``butler`` is
`None`. If ``butler`` is not `None` then this parameter is ignored.
resources : `.ExecutionResources`, optional
The resources available to this quantum when executing.
skip_existing : `bool`, optional
Expand All @@ -115,15 +116,15 @@ class SingleQuantumExecutor(QuantumExecutor):
continuing to run downstream tasks.
job_metadata : `~collections.abc.Mapping`
Mapping with extra metadata to embed within the quantum metadata under
the "job" key. This is intended to correspond to information common
to all quanta being executed in a single process, such as the time
taken to load the quantum graph in a BPS job.
the "job" key. This is intended to correspond to information common to
all quanta being executed in a single process, such as the time taken
to load the quantum graph in a BPS job.
"""

def __init__(
self,
*,
butler: Butler | None = None,
butler: LimitedButler | None = None,
task_factory: TaskFactory | None = None,
skip_existing_in: Any = None,
clobber_outputs: bool = False,
Expand All @@ -135,7 +136,17 @@ def __init__(
raise_on_partial_outputs: bool = True,
job_metadata: Mapping[str, int | str | float] | None = None,
):
self._butler = butler
self._butler: Butler | None = None
self._limited_butler: LimitedButler | None = None
match butler:
case Butler():
self._butler = butler
self._limited_butler = butler
case LimitedButler():
self._limited_butler = butler
case None:
if limited_butler_factory is None:
raise ValueError("limited_butler_factory is needed when butler is None")
self._task_factory = task_factory if task_factory is not None else TaskFactory()
self._clobber_outputs = clobber_outputs
self._enable_lsst_debug = enable_lsst_debug
Expand All @@ -144,10 +155,6 @@ def __init__(
self._assume_no_existing_outputs = assume_no_existing_outputs
self._raise_on_partial_outputs = raise_on_partial_outputs
self._job_metadata = job_metadata

if self._butler is None:
assert limited_butler_factory is not None, "limited_butler_factory is needed when butler is None"

# Find whether output run is in skip_existing_in.
self._skip_existing = skip_existing
if self._butler is not None and skip_existing_in and not self._skip_existing:
Expand Down Expand Up @@ -190,9 +197,12 @@ def _execute(
limited_butler = self._butler
else:
# We check this in constructor, but mypy needs this check here.
assert self._limited_butler_factory is not None
limited_butler = self._limited_butler_factory(quantum)
used_butler_factory = True
if self._limited_butler is not None:
limited_butler = self._limited_butler
else:
assert self._limited_butler_factory is not None
limited_butler = self._limited_butler_factory(quantum)
used_butler_factory = True

try:
return self._execute_with_limited_butler(
Expand Down
Loading
Loading