Skip to content

Commit e726079

Browse files
Obstore workflow cache (#27)
1 parent 99e96e8 commit e726079

2 files changed

Lines changed: 65 additions & 9 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [0.48.0] - 2026-01-29
11+
1012
### Added
1113

12-
`tilebox-storage`: Added a `LocalFileSystemStorageClient` to access data on a local file system, a mounted network file
14+
- `tilebox-storage`: Added a `LocalFileSystemStorageClient` to access data on a local file system, a mounted network file
1315
system or a directory synchronized with a remote file system (e.g. Dropbox, Google Drive, etc.).
16+
- `tilebox-workflows`: Added an `ObstoreCache` implementation for the task cache powered by `obstore`.
1417

1518
### Changed
1619

17-
`tilebox-storage`: Renamed the existing `StorageClient` base class in `tilebox.storage.aio` to `CachingStorageClient`
20+
- `tilebox-storage`: Renamed the existing `StorageClient` base class in `tilebox.storage.aio` to `CachingStorageClient`
1821
to accommodate the new `StorageClient` base class that does not provide caching, since `LocalFileSystemStorageClient` is
1922
the first client that does not cache data (since it's already on the local file system).
2023

@@ -321,7 +324,8 @@ the first client that does not cache data (since it's already on the local file
321324
- Released under the [MIT](https://opensource.org/license/mit) license.
322325
- Released packages: `tilebox-datasets`, `tilebox-workflows`, `tilebox-storage`, `tilebox-grpc`
323326

324-
[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.47.0...HEAD
327+
[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.48.0...HEAD
328+
[0.48.0]: https://github.com/tilebox/tilebox-python/compare/v0.47.0...v0.48.0
325329
[0.47.0]: https://github.com/tilebox/tilebox-python/compare/v0.46.0...v0.47.0
326330
[0.46.0]: https://github.com/tilebox/tilebox-python/compare/v0.45.0...v0.46.0
327331
[0.45.0]: https://github.com/tilebox/tilebox-python/compare/v0.44.0...v0.45.0

tilebox-workflows/tilebox/workflows/cache.py

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1+
import contextlib
12
import warnings
23
from abc import ABC, abstractmethod
34
from collections.abc import Iterator
45
from io import BytesIO
56
from pathlib import Path
7+
from pathlib import PurePosixPath as ObjectPath
68

79
import boto3
810
from botocore.exceptions import ClientError
911
from google.cloud.exceptions import NotFound
1012
from google.cloud.storage import Blob, Bucket
13+
from obstore.store import ObjectStore
1114

1215

1316
class JobCache(ABC):
@@ -62,6 +65,53 @@ def group(self, key: str) -> "NoCache":
6265
return self
6366

6467

68+
class ObstoreCache(JobCache):
    """Job cache that stores every entry as one object in an obstore ``ObjectStore``.

    Keys are joined onto ``prefix`` with POSIX path semantics to form the object path, so a key may itself
    contain ``/`` separators.
    """

    def __init__(self, store: ObjectStore, prefix: str | ObjectPath = ObjectPath(".")) -> None:
        """A cache implementation backed by an obstore ObjectStore.

        This cache implementation is the recommended way of working with the cache, as it provides a unified interface
        for working with different object stores, while also providing a way to transparently work with local files
        as well.

        Args:
            store: The object store to use for the cache.
            prefix: A path prefix to prepend to all objects stored in the cache. Defaults to no prefix.
        """
        self.store = store
        self.prefix = ObjectPath(prefix)

    def _object_path(self, key: str) -> str:
        """Return the object path for ``key``, i.e. the key joined onto the cache prefix."""
        return str(self.prefix / key)

    def __contains__(self, key: str) -> bool:
        """Return True if an object for ``key`` exists in the store, False otherwise."""
        try:
            # head only asks for the object's metadata, so no object content is requested
            # just to answer an existence check
            self.store.head(self._object_path(key))
        except OSError:
            return False
        return True

    def __setitem__(self, key: str, value: bytes) -> None:
        """Store ``value`` under ``key``."""
        self.store.put(self._object_path(key), value)

    def __delitem__(self, key: str) -> None:
        """Delete the object stored under ``key``.

        Raises:
            KeyError: If the store reports an OSError while deleting the object.
        """
        # NOTE(review): some backends may treat deleting a missing object as a no-op, in which case
        # no KeyError is raised for unknown keys - confirm against the stores in use.
        try:
            self.store.delete(self._object_path(key))
        except OSError:
            raise KeyError(f"{key} is not cached!") from None

    def __getitem__(self, key: str) -> bytes:
        """Return the bytes stored under ``key``.

        Raises:
            KeyError: If the store reports an OSError while fetching or reading the object.
        """
        try:
            entry = self.store.get(self._object_path(key))
            return bytes(entry.bytes())
        except OSError:
            raise KeyError(f"{key} is not cached!") from None

    def __iter__(self) -> Iterator[str]:
        """Yield the keys of all objects directly below the cache prefix, relative to that prefix."""
        prefix = str(self.prefix)
        if prefix == ".":
            # An empty prefix is represented as "." by pathlib, but joining a key onto it drops the ".",
            # so entries live at the store root. List the root instead of a literal "." prefix.
            list_prefix = None
            strip = ""
        else:
            list_prefix = prefix
            strip = prefix + "/"

        for obj in self.store.list_with_delimiter(list_prefix)["objects"]:
            path: str = obj["path"]
            yield path.removeprefix(strip)

    def group(self, key: str) -> "ObstoreCache":
        """Return a cache on the same store whose prefix is this cache's prefix extended by ``key``."""
        return ObstoreCache(self.store, prefix=self.prefix / key)
113+
114+
65115
class InMemoryCache(JobCache):
66116
def __init__(self) -> None:
67117
"""A simple in-memory cache implementation.
@@ -153,7 +203,7 @@ def __init__(self, root: Path | str = Path("cache")) -> None:
153203
Args:
154204
root: File system path where the cache will be stored. Defaults to "cache" in the current working directory.
155205
"""
156-
self.root = root if isinstance(root, Path) else Path(root)
206+
self.root = Path(root)
157207

158208
def __contains__(self, key: str) -> bool:
    """Report whether an entry for `key` exists on disk below the cache root."""
    entry_path = self.root / key
    return entry_path.exists()
@@ -184,15 +234,17 @@ def group(self, key: str) -> "LocalFileSystemCache":
184234

185235

186236
class GoogleStorageCache(JobCache):
187-
def __init__(self, bucket: Bucket, prefix: str = "jobs") -> None:
237+
def __init__(self, bucket: Bucket, prefix: str | ObjectPath = "jobs") -> None:
188238
"""A cache implementation that stores data in Google Cloud Storage.
189239
190240
Args:
191241
bucket: The Google Cloud Storage bucket to use for the cache.
192242
prefix: A path prefix to append to all objects stored in the cache. Defaults to "jobs".
193243
"""
194244
self.bucket = bucket
195-
self.prefix = Path(prefix) # we still use pathlib here, because it's easier to work with when joining paths
245+
self.prefix = ObjectPath(
246+
prefix
247+
) # we still use pathlib here, because it's easier to work with when joining paths
196248

197249
def _blob(self, key: str) -> Blob:
    """Return the bucket's blob handle for `key`, located under the cache prefix."""
    object_name = str(self.prefix / key)
    return self.bucket.blob(object_name)
@@ -228,22 +280,22 @@ def __iter__(self) -> Iterator[str]:
228280

229281
# make the names relative to the cache prefix (but including the key in the name)
230282
for blob in blobs:
231-
yield str(Path(blob.name).relative_to(self.prefix))
283+
yield str(ObjectPath(blob.name).relative_to(self.prefix))
232284

233285
def group(self, key: str) -> "GoogleStorageCache":
    """Return a cache on the same bucket whose prefix is this cache's prefix extended by `key`."""
    nested_prefix = self.prefix / key
    return GoogleStorageCache(self.bucket, prefix=str(nested_prefix))
235287

236288

237289
class AmazonS3Cache(JobCache):
238-
def __init__(self, bucket: str, prefix: str = "jobs") -> None:
290+
def __init__(self, bucket: str, prefix: str | ObjectPath = "jobs") -> None:
239291
"""A cache implementation that stores data in Amazon S3.
240292
241293
Args:
242294
bucket: The Amazon S3 bucket to use for the cache.
243295
prefix: A path prefix to append to all objects stored in the cache. Defaults to "jobs".
244296
"""
245297
self.bucket = bucket
246-
self.prefix = Path(prefix)
298+
self.prefix = ObjectPath(prefix)
247299
with warnings.catch_warnings():
248300
# https://github.com/boto/boto3/issues/3889
249301
warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*datetime.utcnow.*")

0 commit comments

Comments
 (0)