-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcamera_streamer.py
More file actions
171 lines (144 loc) · 6.39 KB
/
camera_streamer.py
File metadata and controls
171 lines (144 loc) · 6.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import base64
import cv2
import zmq
import time
import argparse
import multiprocessing
import signal
from typing import List
import os
class BroadcastConfig:
    """Settings for a single camera publisher.

    Attributes:
        port: TCP port the publisher binds its socket to.
        camera_id: Index handed to cv2.VideoCapture.
        jpg_quality: Value passed as cv2.IMWRITE_JPEG_QUALITY when encoding.
        target_fps: Requested capture rate; also used to pace publishing.
    """

    def __init__(self, port: int, camera_id: int, jpg_quality: int, target_fps: int):
        # Plain value holder: arguments are stored as given, without validation.
        self.port, self.camera_id = port, camera_id
        self.jpg_quality, self.target_fps = jpg_quality, target_fps
def _backoff_delay(failures: int) -> float:
    """Return the retry delay in seconds after `failures` consecutive failures.

    The delay doubles from 0.2s (0.2, 0.4, 0.8, 1.6, ...) and is capped at 10s.
    """
    # 0.2 * 2**6 = 12.8 already exceeds the 10s cap, so clamping the exponent
    # never changes the result. It only stops 2**failures from growing into an
    # int that is too large to convert to float, which raises OverflowError.
    return min(10.0, 0.2 * 2 ** min(failures, 6))


def broadcast_camera_data(config: BroadcastConfig, stop_event: multiprocessing.Event):
    """Publish frames from one camera on tcp://*:{port} until stop_event is set.

    Every captured frame is JPEG-encoded, base64-encoded and sent as a single
    message on a ZMQ PUB socket. Failed reads or encodes are retried with an
    exponential backoff. A ZMQ send error ends the loop.

    Args:
        config: Port, camera index, JPEG quality and target frame rate.
        stop_event: Shared event; the loop exits once it is set.

    Raises:
        zmq.ZMQError: If the socket cannot be configured or bound.
    """
    context = zmq.Context()
    footage_socket = context.socket(zmq.PUB)
    camera = None
    frame_count = 0  # read attempts, including failed ones (used in log lines)
    sent_count = 0   # frames actually handed to the socket

    # Setup lives inside the try so that a failed bind still closes the socket
    # and terminates the context.
    try:
        # CONFLATE keeps only the most recent message queued.
        footage_socket.setsockopt(zmq.CONFLATE, 1)
        bind_addr = f'tcp://*:{config.port}'
        print(f"\033[92m[stream-{config.port}] Binding PUB socket to {bind_addr}\033[0m")
        footage_socket.bind(bind_addr)

        camera = cv2.VideoCapture(config.camera_id)
        camera.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 640)
        camera.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        camera.set(cv2.CAP_PROP_FPS, config.target_fps)
        camera.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
        print(f"\033[92m[stream-{config.port}] Camera {config.camera_id} opened: {camera.isOpened()}\033[0m")

        # A non-positive target_fps disables pacing instead of dividing by zero.
        frame_interval = 1.0 / config.target_fps if config.target_fps > 0 else 0.0
        failed_frame_count = 0

        while not stop_event.is_set():
            # Monotonic clock: pacing must not be affected by wall-clock changes.
            frame_start = time.monotonic()
            grabbed, frame = camera.read()
            frame_count += 1
            if not grabbed or frame is None:
                print(f"\033[91m[stream-{config.port}] frame {frame_count}: camera read failed (grabbed={grabbed})\033[0m")
                # Wait on the event rather than sleeping so a stop request
                # ends the backoff immediately.
                stop_event.wait(_backoff_delay(failed_frame_count))
                failed_frame_count += 1
                continue

            encoded, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, config.jpg_quality])
            if not encoded:
                print(f"\033[91m[stream-{config.port}] frame {frame_count}: encoding failed\033[0m")
                stop_event.wait(_backoff_delay(failed_frame_count))
                failed_frame_count += 1
                continue

            failed_frame_count = 0  # reset on success
            jpg_as_text = base64.b64encode(buffer)
            try:
                footage_socket.send(jpg_as_text)
            except zmq.ZMQError as e:
                print(f"\033[91m[stream-{config.port}] ZMQ send error: {e}\033[0m")
                break
            sent_count += 1

            # Sleep off whatever is left of this frame's time slot.
            sleep_time = frame_interval - (time.monotonic() - frame_start)
            if sleep_time > 0:
                time.sleep(sleep_time)
    finally:
        print(f"[stream-{config.port}] cleaning up camera and socket (frames sent: {sent_count})")
        if camera is not None:
            camera.release()
        # cv2.destroyAllWindows() is intentionally not called: this worker
        # never creates a window, and GUI-less OpenCV builds may raise on
        # HighGUI calls, which would skip the socket cleanup below.
        try:
            footage_socket.close()
        except Exception as e:
            print(f"[stream-{config.port}] error closing socket: {e}")
        try:
            context.term()
        except Exception as e:
            print(f"[stream-{config.port}] error terminating context: {e}")
def start_multiple_streams(base_port: int, camera_ids: List[int], jpg_quality: int, target_fps: int):
    """Launch one publisher process per camera on consecutive ports.

    The camera at position i of camera_ids publishes on base_port + i.

    Returns:
        (stop_event, processes): the event shared with every worker, and the
        started multiprocessing.Process objects in camera_ids order.
    """
    stop_event = multiprocessing.Event()
    workers: List[multiprocessing.Process] = []

    for offset, cam_id in enumerate(camera_ids):
        port = base_port + offset
        # The log text says "thread", but each worker is a separate process.
        print(f"[main] Starting thread for camera {cam_id} on port {port}")
        worker = multiprocessing.Process(
            target=broadcast_camera_data,
            args=(
                BroadcastConfig(
                    port=port,
                    camera_id=cam_id,
                    jpg_quality=jpg_quality,
                    target_fps=target_fps,
                ),
                stop_event,
            ),
            daemon=True,
        )
        worker.start()
        workers.append(worker)
        print(f"\033[93mAttempting to start camera {cam_id} on port {port}\033[0m")
        # Short pause so the workers do not all start at the same moment.
        time.sleep(1)

    return stop_event, workers
def find_available_cameras() -> List[int]:
    """Return the even-numbered camera IDs in 0..6 that OpenCV can open.

    An ID is probed only when /dev/video{id} exists. The result is empty when
    no such device node is present.
    """
    available_cameras = []
    # Only even IDs (0, 2, 4, 6) are probed.
    # NOTE(review): presumably the odd-numbered nodes are not capture devices
    # on the target platform; confirm before widening the range.
    for cam_id in range(0, 8, 2):
        # Testing the single path avoids re-listing /dev on every iteration and
        # yields False, rather than raising, where /dev does not exist.
        if not os.path.exists(f"/dev/video{cam_id}"):
            continue
        cap = cv2.VideoCapture(cam_id)
        try:
            if cap.isOpened():
                available_cameras.append(cam_id)
        finally:
            # Release the handle whether or not the device opened.
            cap.release()
    return available_cameras
if __name__ == "__main__":
    # Command-line entry point: parse options, start one publisher process per
    # camera, then wait until the workers exit or SIGINT/SIGTERM arrives.
    parser = argparse.ArgumentParser(prog='camera_streamer', description='Streams one or more cameras using OpenCV over ZMQ')
    parser.add_argument('--base-port', type=int, default=5555, help='Starting port for the first camera. Subsequent cameras use base-port+index')
    parser.add_argument('--camera-ids', type=int, nargs='+', default=[0], help='List of camera IDs to stream (example: --camera-ids 0 2 4)')
    parser.add_argument('--auto-find-cameras', type=str, help='Automatically find available camera IDs (on or off, overrides --camera-ids)', choices=['on', 'off'], default='off')
    parser.add_argument('--jpg-quality', type=int, default=20, help='Quality of jpgs being transmitted (1-100)')
    parser.add_argument('--target-fps', type=int, default=30, help='Target frames per second for streaming')
    args = parser.parse_args()

    # Reject a frame rate the workers cannot pace against (they compute
    # 1.0 / target_fps) before any process is started.
    if args.target_fps <= 0:
        parser.error('--target-fps must be a positive integer')

    base_port = args.base_port
    camera_ids = args.camera_ids
    jpg_quality = args.jpg_quality
    target_fps = args.target_fps

    if args.auto_find_cameras.lower() == 'on':
        camera_ids = find_available_cameras()
    if not camera_ids:
        print("\033[91mNo available cameras found. Exiting.\033[0m")
        # SystemExit instead of exit(): the latter is injected by the site
        # module for interactive use and is not guaranteed to exist.
        raise SystemExit(1)

    stop_event, processes = start_multiple_streams(base_port, camera_ids, jpg_quality, target_fps)

    # Plain flag read by the wait loop below. The loop must not query
    # stop_event itself: the handler calls stop_event.set() and could block on
    # the event's internal lock if it interrupted such a query.
    shutdown_requested = False

    def _signal_handler(signum, frame):
        global shutdown_requested
        print('Stopping streams...')
        shutdown_requested = True
        stop_event.set()

    # Handlers are installed only after the workers have been started.
    signal.signal(signal.SIGINT, _signal_handler)
    signal.signal(signal.SIGTERM, _signal_handler)

    try:
        # Poll instead of an unbounded join(): once a stop is requested we move
        # on to the bounded join/terminate below even if a worker is stuck.
        while not shutdown_requested and any(p.is_alive() for p in processes):
            time.sleep(0.2)
    except KeyboardInterrupt:
        stop_event.set()

    for p in processes:
        # Give each worker a moment to clean up, then force it down.
        p.join(timeout=2)
        if p.is_alive():
            p.terminate()
    print('All streams stopped')