-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathexample.py
More file actions
475 lines (424 loc) · 23.8 KB
/
example.py
File metadata and controls
475 lines (424 loc) · 23.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
import time
import cv2
from typing import Any, List
from frame_source import FrameSourceFactory
from frame_processors.equirectangular360_processor import Equirectangular2PinholeProcessor
from frame_source.realsense_capture import RealsenseCapture
# Colormap selection keys for the spectrogram viewer:
# digit key -> (OpenCV colormap constant or None for grayscale, printed label).
_SPECTROGRAM_COLORMAPS = {
    ord('0'): (None, "Grayscale"),
    ord('1'): (cv2.COLORMAP_VIRIDIS, "Viridis"),
    ord('2'): (cv2.COLORMAP_PLASMA, "Plasma"),
    ord('3'): (cv2.COLORMAP_INFERNO, "Inferno"),
    ord('4'): (cv2.COLORMAP_HOT, "Hot"),
    ord('5'): (cv2.COLORMAP_JET, "Jet"),
}


def _print_spectrogram_help():
    """Print the interactive key bindings for the spectrogram viewer."""
    print("\nKey controls:")
    print(" ESC - Quit")
    print(" h - Show this help")
    print(" 0 - Grayscale (default)")
    print(" 1 - Viridis colormap")
    print(" 2 - Plasma colormap")
    print(" 3 - Inferno colormap")
    print(" 4 - Hot colormap")
    print(" 5 - Jet colormap")
    print(" +/- - Adjust mel bands (requires restart)")
    print(" c - Cycle contrast methods (fixed/adaptive/percentile)")
    print(" g/G - Decrease/increase gamma correction")
    print(" n/N - Decrease/increase noise floor")
    print(" p/P - Adjust percentile range")


def _print_audio_source_info(frame_source):
    """Print frame geometry plus whichever audio-specific parameters the source exposes.

    Each accessor is hasattr-guarded because not all capture sources implement
    the audio-specific getters.
    """
    print(f"Audio spectrogram params:")
    print(f" Frame size: {frame_source.get_frame_size()}")
    print(f" FPS: {frame_source.get_fps()}")
    if hasattr(frame_source, 'get_n_mels'):
        print(f" N mels: {frame_source.get_n_mels()}")  # type: ignore
    if hasattr(frame_source, 'get_window_duration'):
        print(f" Window duration: {frame_source.get_window_duration()}s")  # type: ignore
    if hasattr(frame_source, 'get_freq_range'):
        print(f" Frequency range: {frame_source.get_freq_range()}")  # type: ignore
    if hasattr(frame_source, 'get_sample_rate'):
        print(f" Sample rate: {frame_source.get_sample_rate()}Hz")  # type: ignore
    if hasattr(frame_source, 'get_nyquist_frequency'):
        print(f" Nyquist frequency (max): {frame_source.get_nyquist_frequency()}Hz")  # type: ignore
    if hasattr(frame_source, 'get_fft_size'):
        print(f" FFT size: {frame_source.get_fft_size()}")  # type: ignore
    if hasattr(frame_source, 'get_contrast_method'):
        print(f" Contrast method: {frame_source.get_contrast_method()}")  # type: ignore
    if hasattr(frame_source, 'get_gamma_correction'):
        print(f" Gamma correction: {frame_source.get_gamma_correction():.2f}")  # type: ignore
    if hasattr(frame_source, 'get_noise_floor'):
        print(f" Noise floor: {frame_source.get_noise_floor()} dB")  # type: ignore
    if hasattr(frame_source, 'get_percentile_range'):
        print(f" Percentile range: {frame_source.get_percentile_range()}%")  # type: ignore


def _handle_spectrogram_key(frame_source, key):
    """Apply a single key press to the spectrogram source.

    Returns:
        False when the viewer should quit (ESC), True otherwise.
    """
    if key == 27:  # ESC key to quit
        return False
    if key == ord('h'):
        _print_spectrogram_help()
    elif key in _SPECTROGRAM_COLORMAPS and hasattr(frame_source, 'set_colormap'):
        cmap, label = _SPECTROGRAM_COLORMAPS[key]
        frame_source.set_colormap(cmap)  # type: ignore
        print(f"Colormap: {label}")
    elif key in (ord('+'), ord('=')):
        if hasattr(frame_source, 'get_n_mels') and hasattr(frame_source, 'set_n_mels'):
            current_mels = frame_source.get_n_mels()  # type: ignore
            frame_source.set_n_mels(min(current_mels + 16, 256))  # type: ignore
            print(f"Mel bands: {frame_source.get_n_mels()} (restart to apply)")  # type: ignore
    elif key == ord('-'):
        if hasattr(frame_source, 'get_n_mels') and hasattr(frame_source, 'set_n_mels'):
            current_mels = frame_source.get_n_mels()  # type: ignore
            frame_source.set_n_mels(max(current_mels - 16, 32))  # type: ignore
            print(f"Mel bands: {frame_source.get_n_mels()} (restart to apply)")  # type: ignore
    elif key == ord('c'):  # Cycle contrast methods
        if hasattr(frame_source, 'get_contrast_method') and hasattr(frame_source, 'set_contrast_method'):
            methods = ['fixed', 'adaptive', 'percentile']
            current_method = frame_source.get_contrast_method()  # type: ignore
            # Guard against an unknown method so index() cannot raise ValueError;
            # an unrecognized value simply restarts the cycle at 'fixed'.
            try:
                current_index = methods.index(current_method)
            except ValueError:
                current_index = -1
            next_method = methods[(current_index + 1) % len(methods)]
            frame_source.set_contrast_method(next_method)  # type: ignore
            print(f"Contrast method: {next_method}")
    elif key == ord('g'):  # Decrease gamma (floor at 0.1)
        if hasattr(frame_source, 'get_gamma_correction') and hasattr(frame_source, 'set_gamma_correction'):
            new_gamma = max(frame_source.get_gamma_correction() - 0.1, 0.1)  # type: ignore
            frame_source.set_gamma_correction(new_gamma)  # type: ignore
            print(f"Gamma correction: {new_gamma:.2f} ({'more contrast' if new_gamma < 1.0 else 'less contrast'})")
    elif key == ord('G'):  # Increase gamma (cap at 3.0)
        if hasattr(frame_source, 'get_gamma_correction') and hasattr(frame_source, 'set_gamma_correction'):
            new_gamma = min(frame_source.get_gamma_correction() + 0.1, 3.0)  # type: ignore
            frame_source.set_gamma_correction(new_gamma)  # type: ignore
            print(f"Gamma correction: {new_gamma:.2f} ({'more contrast' if new_gamma < 1.0 else 'less contrast'})")
    elif key == ord('n'):  # Decrease noise floor (less noise suppression)
        if hasattr(frame_source, 'get_noise_floor') and hasattr(frame_source, 'set_noise_floor'):
            new_floor = max(frame_source.get_noise_floor() - 5, -100)  # type: ignore
            frame_source.set_noise_floor(new_floor)  # type: ignore
            print(f"Noise floor: {new_floor} dB")
    elif key == ord('N'):  # Increase noise floor (more noise suppression)
        if hasattr(frame_source, 'get_noise_floor') and hasattr(frame_source, 'set_noise_floor'):
            new_floor = min(frame_source.get_noise_floor() + 5, -10)  # type: ignore
            frame_source.set_noise_floor(new_floor)  # type: ignore
            print(f"Noise floor: {new_floor} dB")
    elif key == ord('p'):  # Decrease percentile range (more aggressive)
        if hasattr(frame_source, 'get_percentile_range') and hasattr(frame_source, 'set_percentile_range'):
            low, high = frame_source.get_percentile_range()  # type: ignore
            new_low = min(low + 2, 20)
            new_high = max(high - 2, 80)
            if new_low < new_high:
                frame_source.set_percentile_range(new_low, new_high)  # type: ignore
                print(f"Percentile range: {new_low}-{new_high}% (more aggressive)")
    elif key == ord('P'):  # Increase percentile range (less aggressive)
        if hasattr(frame_source, 'get_percentile_range') and hasattr(frame_source, 'set_percentile_range'):
            low, high = frame_source.get_percentile_range()  # type: ignore
            new_low = max(low - 2, 0)
            new_high = min(high + 2, 100)
            frame_source.set_percentile_range(new_low, new_high)  # type: ignore
            print(f"Percentile range: {new_low}-{new_high}% (less aggressive)")
    return True


def test_audio_spectrogram(source=None, **kwargs):
    """Test audio spectrogram capture from microphone or audio file.

    Args:
        source: Audio source handed to the factory (None presumably selects
            the default microphone — confirm against the capture backend).
        **kwargs: Extra spectrogram parameters; they override the defaults
            below. May include ``threaded`` (bool, default True) to select
            background capture.
    """
    cv2.namedWindow("Audio Spectrogram", cv2.WINDOW_NORMAL)
    print("Testing Audio Spectrogram Capture:")
    # Default audio parameters for good visualization; caller kwargs win.
    audio_params = {
        'n_mels': 128,
        'n_fft': 2048,
        'window_duration': 3.0,
        'freq_range': (20, 8000),
        'frame_rate': 30,
        'colormap': cv2.COLORMAP_VIRIDIS,
        **kwargs
    }
    frame_source = FrameSourceFactory.create('audio_spectrogram', source=source, **audio_params)
    if not frame_source.connect():
        print("Failed to connect to audio source")
        return
    threaded = kwargs.get('threaded', True)  # Default to threaded mode
    print(f"Running in {'threaded' if threaded else 'blocking'} mode")
    if threaded:
        frame_source.start_async()
        print("Started background spectrogram capture thread")
    if frame_source.is_connected:
        _print_audio_source_info(frame_source)
    _print_spectrogram_help()
    while frame_source.is_connected:
        ret, frame = frame_source.read()
        if ret and frame is not None:
            cv2.imshow("Audio Spectrogram", frame)
        # Poll keys on every iteration (not only on good frames) so ESC still
        # quits when the source momentarily stops delivering frames.
        key = cv2.waitKey(1) & 0xFF
        if not _handle_spectrogram_key(frame_source, key):
            break
    if threaded:
        frame_source.stop()
        print("Stopped background spectrogram capture thread")
    frame_source.disconnect()
    cv2.destroyWindow("Audio Spectrogram")
# Angle-adjust keys for the 360 viewer: key -> (processor parameter, delta in
# degrees, label used when printing the new value).
_ANGLE_KEYS = {
    ord('w'): ('pitch', 5.0, 'Pitch'),   # Pitch up
    ord('s'): ('pitch', -5.0, 'Pitch'),  # Pitch down
    ord('a'): ('yaw', -5.0, 'Yaw'),      # Yaw left
    ord('d'): ('yaw', 5.0, 'Yaw'),       # Yaw right
    ord('q'): ('roll', -5.0, 'Roll'),    # Roll left
    ord('e'): ('roll', 5.0, 'Roll'),     # Roll right
}


def test_360_camera(name, **kwargs):
    """Test a 360 camera with equirectangular to pinhole projection.

    Args:
        name: Capture type name for FrameSourceFactory.
        **kwargs: Factory kwargs. May include 'threaded' (bool, default False)
            and 'processor' (dict with a 'type' key plus processor params).
    """
    cv2.namedWindow("camera", cv2.WINDOW_NORMAL)
    print("Testing 360 Camera Capture:")
    # Bug fix: pop the processor config BEFORE creating the camera. The
    # original popped it after FrameSourceFactory.create(name, **kwargs), so
    # the 'processor' dict was forwarded to the factory as an unexpected kwarg.
    processor_config = kwargs.pop('processor', None)
    camera = FrameSourceFactory.create(name, **kwargs)
    camera.connect()
    # Set camera resolution and fps - insta360 x5 webcam mode settings
    camera.set_frame_size(2880, 1440)
    camera.set_fps(30)
    # Attach a projection processor if one was specified.
    if processor_config is not None:
        processor_type = processor_config.pop('type')
        if processor_type == 'equirectangular':
            processor = Equirectangular2PinholeProcessor(**processor_config)
            camera.attach_processor(processor)
    threaded = kwargs.get('threaded', False)
    if threaded:
        camera.start_async()
    if camera.is_connected:
        print(f"Frame size: {camera.get_frame_size()}")
        print(f"FPS: {camera.get_fps()}")
    # Display frames until ESC is pressed or the camera disconnects.
    while camera.is_connected:
        ret, frame = camera.read()
        if ret:
            if frame is not None:
                cv2.imshow("camera", frame)
            key = cv2.waitKey(1) & 0xFF
            if key == 27:  # ESC key to quit
                break
            elif key == ord('h'):  # Show help
                print("\nKey controls:")
                print(" ESC - Quit")
                print(" h - Show this help")
                # Only advertise processor controls when a processor is attached.
                if hasattr(camera, '_processors') and camera._processors:
                    print("\nProcessor controls:")
                    print(" w/s - Adjust pitch (up/down)")
                    print(" a/d - Adjust yaw (left/right)")
                    print(" q/e - Adjust roll (left/right)")
                    print(" r - Reset processor angles")
            elif key in _ANGLE_KEYS:
                # One table-driven handler replaces six copy-pasted branches.
                param, delta, label = _ANGLE_KEYS[key]
                for processor in getattr(camera, '_processors', []):
                    if hasattr(processor, 'get_parameter'):
                        current = processor.get_parameter(param) or 0
                        processor.set_parameter(param, current + delta)
                        print(f"{label}: {processor.get_parameter(param):.1f}°")
            elif key == ord('r'):  # Reset processor angles
                for processor in getattr(camera, '_processors', []):
                    if hasattr(processor, 'set_parameter'):
                        processor.set_parameter('pitch', 0.0)
                        processor.set_parameter('yaw', 0.0)
                        processor.set_parameter('roll', 0.0)
                print("Processor angles reset to 0°")
        else:
            print(f"Failed to read frame")
    camera.disconnect()
def test_camera(name, **kwargs):
    """Live-view test for a single camera source with exposure/gain controls.

    Args:
        name: Either a capture-type string for FrameSourceFactory, or an
            already-constructed capture instance (see the realsense demo in
            ``__main__``).
        **kwargs: Forwarded to the factory when ``name`` is a string; also
            read for 'threaded' (default False), 'width' (1920),
            'height' (1080) and 'fps' (30).
    """
    cv2.namedWindow("camera", cv2.WINDOW_NORMAL)
    print(f"Testing {name} Capture:")
    if isinstance(name, str):
        camera = FrameSourceFactory.create(name, **kwargs)
    else:
        # A pre-built capture object was passed in directly.
        camera = name
    camera.connect()
    threaded = kwargs.get('threaded', False)
    if threaded:
        camera.start_async()
    width = kwargs.get('width', 1920)
    height = kwargs.get('height', 1080)
    fps = kwargs.get('fps', 30)
    camera.set_frame_size(width, height)
    camera.set_fps(fps)
    # Exposure/gain ranges stay None when the backend does not report them;
    # the key handlers below check for None before adjusting.
    min_exp, max_exp = None, None
    min_gain, max_gain = None, None
    if camera.is_connected:
        exposure_range = camera.get_exposure_range()
        if exposure_range is not None:
            min_exp, max_exp = exposure_range
        gain_range = camera.get_gain_range()
        if gain_range is not None:
            min_gain, max_gain = gain_range
        # Lock exposure time but allow gain to vary for auto exposure
        try:
            camera.enable_auto_exposure(True)  # Enable auto exposure/gain
            print("Auto exposure/gain configured: exposure locked, gain variable")
        except Exception as e:
            # Fixed message: this generic tester runs any camera type, so the
            # original hard-coded "Ximea" wording was misleading.
            print(f"Error configuring auto exposure/gain: {e}")
        print(f"Exposure: {camera.get_exposure()}")
        print(f"Gain: {camera.get_gain()}")
        print(f"Frame size: {camera.get_frame_size()}")
    # Display frames until 'q' is pressed or the camera disconnects.
    while camera.is_connected:
        ret, frame = camera.read()
        if ret:
            if frame is not None:
                cv2.imshow("camera", frame)
            # Key controls for exposure and gain adjustment.
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
            elif key in (ord('='), ord('+')):  # Increase exposure
                current_exposure = camera.get_exposure()
                if current_exposure is not None and min_exp is not None and max_exp is not None:
                    # Step of 1000 exposure units (presumably microseconds —
                    # confirm per backend), clamped to the reported range.
                    new_exposure = min(current_exposure + 1000, max_exp)
                    camera.set_exposure(new_exposure)
                    print(f"Exposure increased to: {new_exposure} (range: {min_exp}-{max_exp})")
            elif key == ord('-'):  # Decrease exposure
                current_exposure = camera.get_exposure()
                if current_exposure is not None and min_exp is not None and max_exp is not None:
                    new_exposure = max(current_exposure - 1000, min_exp)
                    camera.set_exposure(new_exposure)
                    print(f"Exposure decreased to: {new_exposure} (range: {min_exp}-{max_exp})")
            elif key == ord(']'):  # Increase gain
                current_gain = camera.get_gain()
                if current_gain is not None and min_gain is not None and max_gain is not None:
                    new_gain = min(current_gain + 1, max_gain)
                    camera.set_gain(new_gain)
                    print(f"Gain increased to: {new_gain} (range: {min_gain}-{max_gain})")
            elif key == ord('['):  # Decrease gain
                current_gain = camera.get_gain()
                if current_gain is not None and min_gain is not None and max_gain is not None:
                    new_gain = max(current_gain - 1, min_gain)
                    camera.set_gain(new_gain)
                    print(f"Gain decreased to: {new_gain} (range: {min_gain}-{max_gain})")
            elif key == ord('a'):  # Toggle auto exposure
                print("Toggling auto exposure...")
                camera.enable_auto_exposure(True)
            elif key == ord('m'):  # Manual exposure mode
                print("Switching to manual exposure...")
                camera.enable_auto_exposure(False)
            elif key == ord('h'):  # Show help
                print("\nKey controls:")
                print(" q - Quit")
                print(" + or = - Increase exposure")
                print(" - - Decrease exposure")
                print(" ] - Increase gain")
                print(" [ - Decrease gain")
                print(" a - Enable auto exposure")
                print(" m - Manual exposure mode")
                print(" h - Show this help")
        else:
            print(f"Failed to read frame")
    camera.disconnect()
def test_multiple_cameras(cameras: List[Any], threaded: bool = True):
    """Test connecting to multiple different cameras types and viewing them live concurrently.

    Args:
        cameras: List of config dicts, each containing a 'capture_type' key
            plus factory kwargs. Configs are copied before use, so the
            caller's dicts are not mutated.
        threaded: Start each camera's background capture thread after connect.
    """
    capture_instances = []
    # Windows are tiled on a 3-column grid of 640x480 panes.
    grid_cols = 3
    win_w, win_h = 640, 480
    for idx, cam_cfg in enumerate(cameras):
        # Bug fix: copy the config so pop() does not mutate the caller's dict
        # (the original destructively removed 'capture_type' from the input).
        cam_cfg = dict(cam_cfg)
        name = cam_cfg.pop('capture_type', None)
        if not name:
            print(f"Camera config missing 'capture_type': {cam_cfg}")
            continue
        # NOTE(review): window titles are keyed by capture type, so two configs
        # with the same capture_type share one window — confirm intended.
        cv2.namedWindow(f"{name}", cv2.WINDOW_NORMAL)
        # Set window size and position for grid
        cv2.resizeWindow(f"{name}", win_w, win_h)
        col = idx % grid_cols
        row = idx // grid_cols
        x = col * win_w
        y = row * win_h
        cv2.moveWindow(f"{name}", x, y + (25 * row))  # Add some vertical spacing
        print(f"Testing {name} Capture:")
        camera = FrameSourceFactory.create(name, **cam_cfg)
        if camera.connect():
            camera.enable_auto_exposure(True)  # Enable auto exposure by default
            if threaded:
                camera.start_async()
            capture_instances.append((name, camera))
            print(f"Connected to {name} camera")
        else:
            print(f"Failed to connect to {name} camera")
    try:
        # Round-robin poll every connected camera until 'q' is pressed.
        while True:
            for name, camera in capture_instances:
                if camera.is_connected:
                    ret, frame = camera.read()
                    if ret:
                        cv2.imshow(f"{name}", frame)
                    else:
                        print(f"Failed to read frame from {name}")
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always release every camera, even on exceptions or Ctrl-C.
        for name, camera in capture_instances:
            if camera.is_connected:
                camera.stop()
                camera.disconnect()
                print(f"Disconnected from {name}")
if __name__ == "__main__":
    # Audio spectrogram demo (uncomment to run):
    # test_audio_spectrogram(source=None, threaded=True, n_mels=256, window_duration=3.0, freq_range=(20, 20000),
    #     sample_rate=44100, db_range=(-60, 0), contrast_method='adaptive',
    #     gamma_correction=2.0, noise_floor=-45, percentile_range=(0, 100), colormap=cv2.COLORMAP_INFERNO)

    #### Advanced realsense demo
    # Imported here so the other demos work without the realsense extras installed.
    from frame_processors import RealsenseDepthProcessor
    from frame_processors.realsense_depth_processor import RealsenseProcessingOutput

    realsense_camera = RealsenseCapture(width=640, height=480)
    depth_processor = RealsenseDepthProcessor(output_format=RealsenseProcessingOutput.ALIGNED_SIDE_BY_SIDE)
    realsense_camera.attach_processor(depth_processor)
    test_camera(realsense_camera)

    #### Other demos
    # test_camera('basler')
    # test_camera('ximea')
    # test_camera('webcam', source=0, threaded=True, width=1920, height=1080, fps=30) # standard 1080p webcam
    # test_camera('webcam', source=0, threaded=True, width=2880, height=1440, fps=30) # insta360 x5 webcam mode settings
    # test_camera('video_file', source="media/geti_demo.mp4", loop=True)
    # test_camera('ipcam', source="rtsp://192.168.1.153:554/h264Preview_01_sub", username="admin", password="password")
    # test_camera('folder', source="media/image_seq", sort_by='date', fps=30, real_time=True, loop=True)
    # test_camera('screen', x=100, y=100, w=800, h=600, fps=30, threaded=True)
    # test_360_camera('webcam', source=0, threaded=True, processor={'type': 'equirectangular', 'output_width': 1920, 'output_height': 1080, 'fov': 90})
    # cameras = [
    #     {'capture_type': 'genicam', 'threaded': True},
    #     {'capture_type': 'basler', 'threaded': True},
    #     {'capture_type': 'ximea', 'threaded': True},
    #     {'capture_type': 'webcam', 'threaded': True},
    #     {'capture_type': 'ipcam', 'source': "http://pendelcam.kip.uni-heidelberg.de/mjpg/video.mjpg", 'threaded': True},
    #     {'capture_type': 'video_file', 'source': "media/geti_demo.mp4", 'loop': True, 'threaded': True},
    #     {'capture_type': 'folder', 'source': "media/image_seq", 'sort_by': 'date', 'fps': 30, 'real_time': True, 'loop': True, 'threaded': False},
    #     {'capture_type': 'screen', 'x': 100, 'y': 100, 'w': 800, 'h': 600, 'fps': 30, 'threaded': True}
    # ]
    # test_multiple_cameras(cameras, threaded=True)