-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtorchcodec_reader.py
More file actions
65 lines (54 loc) · 2.67 KB
/
torchcodec_reader.py
File metadata and controls
65 lines (54 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from typing import Literal
from pathlib import Path
import torch
from torchcodec.decoders import VideoDecoder
from src.video_io.abstract_reader import AbstractVideoReader
class TorchcodecVideoReader(AbstractVideoReader):
    """Video reader using the TorchCodec library.

    Args:
        video_path (str | Path): Path to the input video file.
        mode (Literal["seek", "stream"], optional): Reading mode: "seek" -
            find each frame individually, "stream" - decode all frames from
            the range of requested indices and subsample.
            Defaults to "stream".
        output_format (Literal["THWC", "TCHW"], optional): Data format:
            channel last or first. Defaults to "THWC".
        device (str, optional): Device to send the resulting tensor to.
            Defaults to "cuda:0".
    """

    def __init__(self, video_path: str | Path,
                 mode: Literal["seek", "stream"] = "stream",
                 output_format: Literal["THWC", "TCHW"] = "THWC",
                 device: str = "cuda:0"):
        # All arguments are forwarded unchanged to the base class; this
        # override only pins the defaults documented on the class.
        super().__init__(video_path, mode=mode, output_format=output_format,
                         device=device)

    def _initialize_reader(self) -> None:
        """Create the TorchCodec decoder and cache basic video metadata.

        Sets ``self.decoder``, ``self.num_frames`` and ``self.fps``.
        """
        # Tensor axis order rearranged in _finalize_tensor if required
        self.decoder = VideoDecoder(self.video_path,
                                    dimension_order="NHWC",
                                    device=self.device)
        metadata = self.decoder.metadata
        self.num_frames = metadata.num_frames
        self.fps = metadata.average_fps

    def seek_read(self, frame_indices: list[int]) -> list[torch.Tensor]:
        """Retrieve frames by their indices using random access.

        Args:
            frame_indices (list[int]): Indices of the frames to decode, in
                the order they should be returned.

        Returns:
            list[torch.Tensor]: One entry per requested index, each passed
                through ``self._process_frame``.

        Raises:
            ValueError: If any index is negative or not less than
                ``self.num_frames``.
        """
        # Validate every index before decoding anything, so a bad request
        # fails fast instead of after some frames were already decoded.
        for idx in frame_indices:
            if idx < 0 or idx >= self.num_frames:
                raise ValueError(f"Invalid frame index: {idx}")
        return [self._process_frame(self.decoder[idx])
                for idx in frame_indices]

    def stream_read(self, frame_indices: list[int]) -> list[torch.Tensor]:
        """Decode the contiguous range covering the indices and subsample.

        Every frame from ``min(frame_indices)`` to ``max(frame_indices)``
        (inclusive) is decoded in one call, then the requested frames are
        picked out of that batch in the order given.

        Args:
            frame_indices (list[int]): Indices of the frames to return.

        Returns:
            The requested frames selected from the decoded batch.
            NOTE(review): the result of indexing ``batch.data`` is returned
            as-is (presumably a single stacked tensor, not a Python list)
            and is not passed through ``self._process_frame`` as in
            ``seek_read`` - confirm against the base-class contract.

        Raises:
            ValueError: If ``frame_indices`` is empty, or any index is
                negative or not less than ``self.num_frames``.
        """
        # An empty request would otherwise fail inside min() with an
        # unrelated-looking message; report it the same way as a bad index.
        if len(frame_indices) == 0:
            raise ValueError(f"Invalid frame indices: {frame_indices}")
        start_idx, end_idx = min(frame_indices), max(frame_indices)
        if start_idx < 0 or end_idx >= self.num_frames:
            raise ValueError(f"Invalid frame indices: {frame_indices}")
        batch = self.decoder.get_frames_in_range(
            start=start_idx, stop=end_idx + 1, step=1)
        # Axis order follows the decoder's dimension_order ("NHWC")
        frames = batch.data
        # Positions inside the decoded batch, which starts at start_idx
        relative_indices = [idx - start_idx for idx in frame_indices]
        return frames[relative_indices]

    def _read_frames_slice(self, start_idx: int, stop_idx: int,
                           step: int) -> torch.Tensor:
        """Return the frames selected by ``start_idx:stop_idx:step``."""
        return self.decoder[start_idx:stop_idx:step]

    def release(self) -> None:
        """Drop the reference to the decoder."""
        self.decoder = None