From ea83821d9a883a51763a7668b3f9863ffe98ec96 Mon Sep 17 00:00:00 2001 From: "Ochui, Princewill Patrick" <21917688+ochui@users.noreply.github.com> Date: Tue, 24 Mar 2026 17:40:13 +0100 Subject: [PATCH 1/4] Refactors config to accept mapping, removes INI support Updates configuration handling to use a Python mapping instead of an INI file, simplifying initialization and validation. Removes all file-based config logic, adjusts documentation and tests accordingly, and improves type safety for clustered Redis config. Enhances maintainability and reduces test setup complexity. --- README.md | 59 +++++++++++++------- src/fq/queue.py | 89 +++++++++++++---------------- tests/config.py | 30 ++++++++++ tests/test_edge_cases.py | 100 ++++++++++----------------------- tests/test_func.py | 118 ++++++++++++++------------------------- tests/test_queue.py | 8 +-- 6 files changed, 184 insertions(+), 220 deletions(-) create mode 100644 tests/config.py diff --git a/README.md b/README.md index 82d150b..625cd66 100644 --- a/README.md +++ b/README.md @@ -33,22 +33,25 @@ pip install -e . ## Configuration -FQ reads a simple INI config file. Intervals are in milliseconds. -``` -[fq] -job_expire_interval : 5000 -job_requeue_interval : 5000 -default_job_requeue_limit : -1 ; -1 retries forever, 0 means no retries - -[redis] -db : 0 -key_prefix : queue_server -conn_type : tcp_sock ; or unix_sock -host : 127.0.0.1 -port : 6379 -password : -clustered : false -unix_socket_path : /tmp/redis.sock +FQ accepts a simple config mapping. Intervals are in milliseconds. +```python +config = { + "fq": { + "job_expire_interval": 5000, + "job_requeue_interval": 5000, + "default_job_requeue_limit": -1, # -1 retries forever, 0 means no retries + }, + "redis": { + "db": 0, + "key_prefix": "queue_server", + "conn_type": "tcp_sock", # or "unix_sock" + "host": "127.0.0.1", + "port": 6379, + "password": "", + "clustered": False, + "unix_socket_path": "/tmp/redis.sock", + }, +} ``` > If you connect via Unix sockets, uncomment the `unixsocket` lines in your `redis.conf`: @@ -66,8 +69,26 @@ from fq import FQ async def main(): - fq = FQ("config.conf") - await fq.initialize() # load config, connect to Redis, register Lua scripts + config = { + "fq": { + "job_expire_interval": 5000, + "job_requeue_interval": 5000, + "default_job_requeue_limit": -1, + }, + "redis": { + "db": 0, + "key_prefix": "queue_server", + "conn_type": "tcp_sock", + "host": "127.0.0.1", + "port": 6379, + "password": "", + "clustered": False, + "unix_socket_path": "/tmp/redis.sock", + }, + } + + fq = FQ(config) + await fq.initialize() # connect to Redis and register Lua scripts job_id = str(uuid.uuid4()) await fq.enqueue( @@ -102,7 +123,7 @@ Common operations: ## Development -- Start Redis for local development: `make redis-up` (binds to `localhost:6379`; matches `tests/test.conf`). +- Start Redis for local development: `make redis-up` (binds to `localhost:6379`). - Run the suite: `make test` (automatically starts and tears down Redis). - Build a wheel: `make build` - Install/uninstall from the build: `make install` / `make uninstall` diff --git a/src/fq/queue.py b/src/fq/queue.py index 6434dae..3574306 100644 --- a/src/fq/queue.py +++ b/src/fq/queue.py @@ -4,7 +4,7 @@ import asyncio import os -import configparser +from collections.abc import Mapping from redis.asyncio import Redis from redis.asyncio.cluster import RedisCluster @@ -25,19 +25,35 @@ class FQ(object): """The FQ object is the core of this queue. FQ does the following. - 1. Accepts a configuration file. + 1. Accepts structured configuration. 2. Initializes the queue. 3. Exposes functions to interact with the queue. """ - def __init__(self, config_path): + def __init__(self, config): """Construct a FQ object by doing the following. - 1. Read the configuration path. - 2. Load the config. + 1. Store the queue configuration. + 2. Validate the config shape. """ - self.config_path = config_path - self._load_config() self._r = None # redis client placeholder + if not isinstance(config, Mapping): + raise FQException("Config must be a mapping with redis and fq sections") + + normalized = {} + for section_name, section_values in config.items(): + if not isinstance(section_values, Mapping): + raise FQException( + "Config section '%s' must be a mapping" % section_name + ) + + normalized[str(section_name)] = { + str(option): value for option, value in section_values.items() + } + + if "redis" not in normalized or "fq" not in normalized: + raise FQException("Config missing required sections: redis, fq") + + self.config = normalized async def initialize(self): """Async initializer to set up redis and lua scripts.""" @@ -45,31 +61,33 @@ async def initialize(self): async def _initialize(self): """Read the FQ configuration and set up redis + Lua scripts.""" + fq_config = self.config["fq"] + redis_config = self.config["redis"] - self._key_prefix = self._config.get("redis", "key_prefix") - self._job_expire_interval = int(self._config.get("fq", "job_expire_interval")) - self._default_job_requeue_limit = int( - self._config.get("fq", "default_job_requeue_limit") - ) + self._key_prefix = redis_config["key_prefix"] + self._job_expire_interval = int(fq_config["job_expire_interval"]) + self._default_job_requeue_limit = int(fq_config["default_job_requeue_limit"]) - redis_connection_type = self._config.get("redis", "conn_type") - db = self._config.get("redis", "db") + redis_connection_type = redis_config["conn_type"] + db = redis_config["db"] if redis_connection_type == "unix_sock": self._r = Redis( db=db, - unix_socket_path=self._config.get("redis", "unix_socket_path"), + unix_socket_path=redis_config["unix_socket_path"], ) elif redis_connection_type == "tcp_sock": isclustered = False - if self._config.has_option("redis", "clustered"): - isclustered = self._config.getboolean("redis", "clustered") + if "clustered" in redis_config: + if not isinstance(redis_config["clustered"], bool): + raise FQException("redis.clustered must be a boolean") + isclustered = redis_config["clustered"] if isclustered: startup_nodes = [ { - "host": self._config.get("redis", "host"), - "port": int(self._config.get("redis", "port")), + "host": redis_config["host"], + "port": int(redis_config["port"]), } ] self._r = RedisCluster( @@ -80,9 +98,9 @@ async def _initialize(self): else: self._r = Redis( db=db, - host=self._config.get("redis", "host"), - port=int(self._config.get("redis", "port")), - password=self._config.get("redis", "password"), + host=redis_config["host"], + port=int(redis_config["port"]), + password=redis_config.get("password"), ) else: raise FQException("Unknown redis conn_type: %s" % redis_connection_type) @@ -107,36 +125,9 @@ async def _validate_redis_connection(self): if result is False: raise FQException("Failed to connect to Redis: ping returned False") - def _load_config(self): - """Read the configuration file and load it into memory.""" - if not os.path.isfile(self.config_path): - raise FQException("Config file not found: %s" % self.config_path) - - self._config = configparser.ConfigParser() - read_files = self._config.read(self.config_path) - - if not read_files: - raise FQException("Unable to read config file: %s" % self.config_path) - - if not self._config.has_section("redis") or not self._config.has_section( - "fq" - ): - raise FQException( - "Config file missing required sections: redis, fq (path: %s)" - % self.config_path - ) - def redis_client(self): return self._r - def reload_config(self, config_path=None): - """Reload the configuration from the new config file if provided - else reload the current config file. - """ - if config_path: - self.config_path = config_path - self._load_config() - def _load_lua_scripts(self): """Loads all lua scripts required by FQ.""" # load lua scripts diff --git a/tests/config.py b/tests/config.py new file mode 100644 index 0000000..617030c --- /dev/null +++ b/tests/config.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- + +from copy import deepcopy + + +TEST_CONFIG = { + "fq": { + "job_expire_interval": 5000, + "job_requeue_interval": 5000, + "default_job_requeue_limit": -1, + }, + "redis": { + "db": 0, + "key_prefix": "test_fq", + "conn_type": "tcp_sock", + "unix_socket_path": "/tmp/redis.sock", + "port": 6379, + "host": "127.0.0.1", + "clustered": False, + "password": "", + }, +} + + +def build_test_config(**section_overrides): + config = deepcopy(TEST_CONFIG) + for section_name, overrides in section_overrides.items(): + config.setdefault(section_name, {}) + config[section_name].update(overrides) + return config diff --git a/tests/test_edge_cases.py b/tests/test_edge_cases.py index 87c8721..d6924dc 100644 --- a/tests/test_edge_cases.py +++ b/tests/test_edge_cases.py @@ -2,14 +2,13 @@ # Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. -import os -import tempfile import unittest from unittest.mock import patch from fq import FQ from fq.utils import is_valid_identifier from fq.exceptions import BadArgumentException, FQException +from tests.config import build_test_config class FakeCluster: @@ -113,8 +112,7 @@ async def delete(self, key): class TestEdgeCases(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): - cwd = os.path.dirname(os.path.realpath(__file__)) - self.config_path = os.path.join(cwd, "test.conf") + self.config = build_test_config() self.fq_instance = None async def asyncTearDown(self): @@ -131,49 +129,36 @@ async def asyncTearDown(self): pass self.fq_instance = None - def test_missing_config_file_raises(self): - with self.assertRaisesRegex(FQException, "Config file not found"): + def test_invalid_config_type_raises(self): + with self.assertRaisesRegex(FQException, "Config must be a mapping"): FQ("/tmp/does-not-exist.conf") async def test_initialize_fails_fast_on_bad_redis(self): with patch("fq.queue.Redis", FakeRedisConnectionFailure): - fq = FQ(self.config_path) + fq = FQ(self.config) with self.assertRaisesRegex(FQException, "Failed to connect to Redis"): await fq.initialize() async def test_cluster_initialization(self): """Covers clustered Redis path (queue.py lines 69-75, 104-106).""" - with tempfile.NamedTemporaryFile(mode="w", suffix=".conf", delete=False) as f: - f.write( - """[fq] -job_expire_interval : 5000 -job_requeue_interval : 5000 -default_job_requeue_limit : -1 - -[redis] -db : 0 -key_prefix : test_fq_cluster -conn_type : tcp_sock -host : 127.0.0.1 -port : 6379 -clustered : true -password : -""" - ) - config_path = f.name - - try: - with patch("fq.queue.RedisCluster", FakeCluster): - fq = FQ(config_path) - await fq._initialize() - self.assertIsInstance(fq.redis_client(), FakeCluster) - await fq.close() - finally: - os.unlink(config_path) + config = build_test_config( + redis={"key_prefix": "test_fq_cluster", "clustered": True} + ) + with patch("fq.queue.RedisCluster", FakeCluster): + fq = FQ(config) + await fq._initialize() + self.assertIsInstance(fq.redis_client(), FakeCluster) + await fq.close() + + async def test_clustered_config_must_be_boolean(self): + config = build_test_config(redis={"clustered": "true"}) + fq = FQ(config) + with self.assertRaisesRegex(FQException, "redis.clustered must be a boolean"): + await fq._initialize() async def test_dequeue_payload_none(self): """Covers dequeue branch where payload is None (queue.py line 212).""" - fq = FQ(self.config_path) + fq = FQ(self.config) self.fq_instance = fq await fq._initialize() fake_dequeue = FakeLuaDequeue() @@ -184,7 +169,7 @@ async def test_dequeue_payload_none(self): async def test_clear_queue_delete_only(self): """Covers clear_queue else branch (queue.py lines 499, 502).""" - fq = FQ(self.config_path) + fq = FQ(self.config) self.fq_instance = fq await fq._initialize() await fq._r.flushdb() @@ -193,15 +178,15 @@ async def test_clear_queue_delete_only(self): async def test_close_fallback_paths(self): """Covers close() fallback paths (queue.py lines 528-549).""" - fq = FQ(self.config_path) + fq = FQ(self.config) fq._r = FakeRedisForClose() await fq.close() self.assertIsNone(fq._r) async def test_deep_status_calls_set(self): """Covers deep_status (queue.py line 420).""" - fq = FQ(self.config_path) - fq._key_prefix = fq._config.get("redis", "key_prefix") + fq = FQ(self.config) + fq._key_prefix = fq.config["redis"]["key_prefix"] fq._r = FakeRedisForDeepStatus() await fq.deep_status() self.assertEqual( @@ -215,39 +200,10 @@ def test_is_valid_identifier_non_string(self): self.assertFalse(is_valid_identifier(None)) self.assertFalse(is_valid_identifier(["a"])) - async def test_reload_config_with_new_path(self): - """Covers reload_config branch (queue.py lines 104-106).""" - with tempfile.NamedTemporaryFile(mode="w", suffix=".conf", delete=False) as f: - f.write( - """[fq] -job_expire_interval : 5000 -job_requeue_interval : 5000 -default_job_requeue_limit : -1 - -[redis] -db : 0 -key_prefix : new_prefix -conn_type : tcp_sock -port : 6379 -host : 127.0.0.1 -clustered : false -password : -""" - ) - new_config = f.name - - try: - fq = FQ(self.config_path) - fq.reload_config(new_config) - self.assertEqual(fq.config_path, new_config) - self.assertEqual(fq._config.get("redis", "key_prefix"), "new_prefix") - finally: - os.unlink(new_config) - async def test_clear_queue_purge_all_with_mixed_job_ids(self): """Covers purge_all loop branches (queue.py lines 463-468, 474-479).""" - fq = FQ(self.config_path) - fq._key_prefix = fq._config.get("redis", "key_prefix") + fq = FQ(self.config) + fq._key_prefix = fq.config["redis"]["key_prefix"] fq._r = FakeRedisForClear() response = await fq.clear_queue("qt", "qid", purge_all=True) self.assertEqual(response["status"], "Success") @@ -255,7 +211,7 @@ async def test_clear_queue_purge_all_with_mixed_job_ids(self): async def test_get_queue_length_invalid_params(self): """Covers validation branches (queue.py lines 499, 502).""" - fq = FQ(self.config_path) + fq = FQ(self.config) with self.assertRaises(BadArgumentException): await fq.get_queue_length("bad type", "qid") with self.assertRaises(BadArgumentException): @@ -263,7 +219,7 @@ async def test_get_queue_length_invalid_params(self): async def test_deep_status_real_redis(self): """Covers deep_status with real redis (queue.py line 420).""" - fq = FQ(self.config_path) + fq = FQ(self.config) self.fq_instance = fq await fq._initialize() result = await fq.deep_status() diff --git a/tests/test_func.py b/tests/test_func.py index 0116167..fd1aff3 100644 --- a/tests/test_func.py +++ b/tests/test_func.py @@ -1,17 +1,16 @@ # -*- coding: utf-8 -*- # Copyright (c) 2014 Plivo Team. See LICENSE.txt for details. -import os import uuid import time import math import asyncio import unittest import msgpack -import tempfile from unittest.mock import AsyncMock, MagicMock from fq import FQ from fq.exceptions import FQException from fq.utils import generate_epoch, deserialize_payload +from tests.config import build_test_config @@ -23,9 +22,7 @@ class FQTestCase(unittest.IsolatedAsyncioTestCase): """ async def asyncSetUp(self): - cwd = os.path.dirname(os.path.realpath(__file__)) - config_path = os.path.join(cwd, "test.conf") # test config - self.queue = FQ(config_path) + self.queue = FQ(build_test_config()) # flush all the keys in the test db before starting test await self.queue._initialize() await self.queue._r.flushdb() @@ -1734,9 +1731,7 @@ async def test_deep_status(self): async def test_initialize_public_method(self): """Test the public initialize() method.""" - cwd = os.path.dirname(os.path.realpath(__file__)) - config_path = os.path.join(cwd, "test.conf") - fq = FQ(config_path) + fq = FQ(build_test_config()) # Public initialize() should work await fq.initialize() @@ -1797,9 +1792,7 @@ async def test_redis_client_getter(self): async def test_close_properly_closes_connection(self): """Test close() method properly closes Redis connection.""" - cwd = os.path.dirname(os.path.realpath(__file__)) - config_path = os.path.join(cwd, "test.conf") - fq = FQ(config_path) + fq = FQ(build_test_config()) await fq._initialize() self.assertIsNotNone(fq._r) @@ -1808,79 +1801,54 @@ async def test_close_properly_closes_connection(self): async def test_close_with_none_client(self): """Test close() when redis client is None.""" - cwd = os.path.dirname(os.path.realpath(__file__)) - config_path = os.path.join(cwd, "test.conf") - fq = FQ(config_path) + fq = FQ(build_test_config()) # Don't initialize, so _r is None await fq.close() # Should not crash self.assertIsNone(fq._r) async def test_initialize_unix_socket_connection(self): """Test initialization with Unix socket connection - tests line 59.""" - # Create a temporary config with unix_sock - with tempfile.NamedTemporaryFile(mode='w', suffix='.conf', delete=False) as f: - f.write("""[fq] -job_expire_interval : 5000 -job_requeue_interval : 5000 -default_job_requeue_limit : -1 - -[redis] -db : 0 -key_prefix : test_fq_unix -conn_type : unix_sock -unix_socket_path : /tmp/redis_nonexistent.sock -""") - config_path = f.name - - try: - # Create a mock Redis class to capture initialization parameters - mock_redis_instance = MagicMock() - mock_redis_instance.ping = AsyncMock(return_value=True) - mock_redis_instance.register_script = MagicMock(return_value=MagicMock()) - mock_redis_instance.aclose = AsyncMock() - - redis_init_kwargs = {} - - def mock_redis_constructor(**kwargs): - redis_init_kwargs.update(kwargs) - return mock_redis_instance - - # Patch Redis to intercept the initialization - with unittest.mock.patch('fq.queue.Redis', side_effect=mock_redis_constructor): - fq = FQ(config_path) - await fq._initialize() - - # Verify that Redis was initialized with unix_socket_path - self.assertIn('unix_socket_path', redis_init_kwargs) - self.assertEqual(redis_init_kwargs['unix_socket_path'], '/tmp/redis_nonexistent.sock') - self.assertEqual(int(redis_init_kwargs['db']), 0) - - await fq.close() - finally: - os.unlink(config_path) + config = build_test_config( + redis={ + "key_prefix": "test_fq_unix", + "conn_type": "unix_sock", + "unix_socket_path": "/tmp/redis_nonexistent.sock", + } + ) + + # Create a mock Redis class to capture initialization parameters + mock_redis_instance = MagicMock() + mock_redis_instance.ping = AsyncMock(return_value=True) + mock_redis_instance.register_script = MagicMock(return_value=MagicMock()) + mock_redis_instance.aclose = AsyncMock() + + redis_init_kwargs = {} + + def mock_redis_constructor(**kwargs): + redis_init_kwargs.update(kwargs) + return mock_redis_instance + + # Patch Redis to intercept the initialization + with unittest.mock.patch("fq.queue.Redis", side_effect=mock_redis_constructor): + fq = FQ(config) + await fq._initialize() + + # Verify that Redis was initialized with unix_socket_path + self.assertIn("unix_socket_path", redis_init_kwargs) + self.assertEqual( + redis_init_kwargs["unix_socket_path"], "/tmp/redis_nonexistent.sock" + ) + self.assertEqual(int(redis_init_kwargs["db"]), 0) + + await fq.close() async def test_initialize_unknown_connection_type(self): """Test initialization with invalid connection type raises error - tests line 88.""" - with tempfile.NamedTemporaryFile(mode='w', suffix='.conf', delete=False) as f: - f.write("""[fq] -job_expire_interval : 5000 -job_requeue_interval : 5000 -default_job_requeue_limit : -1 - -[redis] -db : 0 -key_prefix : test_fq -conn_type : invalid_type -""") - config_path = f.name - - try: - fq = FQ(config_path) - # This tests line 88 - unknown conn_type - with self.assertRaisesRegex(FQException, "Unknown redis conn_type"): - await fq._initialize() - finally: - os.unlink(config_path) + config = build_test_config(redis={"conn_type": "invalid_type"}) + fq = FQ(config) + # This tests line 88 - unknown conn_type + with self.assertRaisesRegex(FQException, "Unknown redis conn_type"): + await fq._initialize() async def test_clear_queue_with_purge_all_and_string_job_uuid(self): """Test clear_queue with purge_all=True handles string job UUIDs - tests lines 464, 468.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index 840996d..78565e5 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,20 +1,18 @@ # -*- coding: utf-8 -*- # Copyright (c) 2014 Plivo Team. See LICENSE.txt for details. -import os import unittest from datetime import date from fq import FQ from fq.exceptions import BadArgumentException +from tests.config import build_test_config class FQTest(unittest.IsolatedAsyncioTestCase): """The FQTest contains test cases which validate the FQ interface.""" async def asyncSetUp(self): - cwd = os.path.dirname(os.path.realpath(__file__)) - config_path = os.path.join(cwd, "test.conf") # test config - self.queue = FQ(config_path) + self.queue = FQ(build_test_config()) await self.queue._initialize() self.valid_queue_type = "5m5_qu-eue" @@ -63,7 +61,7 @@ async def asyncSetUp(self): async def asyncTearDown(self): # flush redis at the end and close connection await self.queue._r.flushdb() - await self.queue._r.aclose() + await self.queue.close() # ---------- enqueue ---------- From 56675148a678c147f58ad95a76e6782098462cad Mon Sep 17 00:00:00 2001 From: "Ochui, Princewill Patrick" <21917688+ochui@users.noreply.github.com> Date: Tue, 24 Mar 2026 17:47:23 +0100 Subject: [PATCH 2/4] Remove legacy configuration files for fq and test --- src/fq/default.conf | 14 -------------- tests/test.conf | 17 ----------------- 2 files changed, 31 deletions(-) delete mode 100644 src/fq/default.conf delete mode 100644 tests/test.conf diff --git a/src/fq/default.conf b/src/fq/default.conf deleted file mode 100644 index 6163c9b..0000000 --- a/src/fq/default.conf +++ /dev/null @@ -1,14 +0,0 @@ -[fq] -job_expire_interval : 120000 ; in milliseconds -job_requeue_interval : 5000 ; in milliseconds -default_job_requeue_limit : 0 ; value of -1 retries infinitely - -[redis] -db = 0 -key_prefix = sharq_server -conn_type = tcp_sock -unix_socket_path = /tmp/redis.sock -port = 6379 -host = 127.0.0.1 -clustered = false -password = diff --git a/tests/test.conf b/tests/test.conf deleted file mode 100644 index b8fd6eb..0000000 --- a/tests/test.conf +++ /dev/null @@ -1,17 +0,0 @@ -[fq] -job_expire_interval : 5000 -job_requeue_interval : 5000 -default_job_requeue_limit : -1 - -[redis] -db : 0 -key_prefix : test_fq -conn_type : tcp_sock -; or unix_sock -;; unix connection settings -unix_socket_path : /tmp/redis.sock -;; tcp connection settings -port : 6379 -host : 127.0.0.1 -clustered : false -password : From 18ecef87501be21d5551fce78f8781244aa4e3e1 Mon Sep 17 00:00:00 2001 From: "Ochui, Princewill Patrick" <21917688+ochui@users.noreply.github.com> Date: Tue, 24 Mar 2026 18:02:40 +0100 Subject: [PATCH 3/4] Adds strict config validation and improves error messages Strengthens configuration validation by raising descriptive errors for missing or invalid config values during object construction, ensuring misconfigurations are caught early. Updates tests to reflect the new validation logic and error paths, improving clarity and robustness of error handling. --- src/fq/queue.py | 87 +++++++++++++++++++++++++++++++++++++--- tests/test_edge_cases.py | 31 ++++++++++---- tests/test_func.py | 17 ++++---- tests/test_queue.py | 2 +- 4 files changed, 114 insertions(+), 23 deletions(-) diff --git a/src/fq/queue.py b/src/fq/queue.py index 3574306..cde545c 100644 --- a/src/fq/queue.py +++ b/src/fq/queue.py @@ -53,14 +53,91 @@ def __init__(self, config): if "redis" not in normalized or "fq" not in normalized: raise FQException("Config missing required sections: redis, fq") + redis_config = normalized["redis"] + fq_config = normalized["fq"] + + if "key_prefix" not in redis_config: + raise FQException("Missing config: redis.key_prefix") + if not isinstance(redis_config["key_prefix"], str) or not redis_config[ + "key_prefix" + ]: + raise FQException( + "Invalid config: redis.key_prefix must be a non-empty string" + ) + + if "conn_type" not in redis_config: + raise FQException("Missing config: redis.conn_type") + if redis_config["conn_type"] not in {"tcp_sock", "unix_sock"}: + raise FQException( + "Invalid config: redis.conn_type must be 'tcp_sock' or 'unix_sock'" + ) + + if "db" not in redis_config: + raise FQException("Missing config: redis.db") + if isinstance(redis_config["db"], bool) or not isinstance( + redis_config["db"], int + ): + raise FQException("Invalid config: redis.db must be an integer") + + if "job_expire_interval" not in fq_config: + raise FQException("Missing config: fq.job_expire_interval") + if not is_valid_interval(fq_config["job_expire_interval"]): + raise FQException( + "Invalid config: fq.job_expire_interval must be a positive integer" + ) + + if "job_requeue_interval" not in fq_config: + raise FQException("Missing config: fq.job_requeue_interval") + if not is_valid_interval(fq_config["job_requeue_interval"]): + raise FQException( + "Invalid config: fq.job_requeue_interval must be a positive integer" + ) + + if "default_job_requeue_limit" not in fq_config: + raise FQException("Missing config: fq.default_job_requeue_limit") + if not is_valid_requeue_limit(fq_config["default_job_requeue_limit"]): + raise FQException( + "Invalid config: fq.default_job_requeue_limit must be an integer >= -1" + ) + + if redis_config["conn_type"] == "unix_sock": + if "unix_socket_path" not in redis_config: + raise FQException("Missing config: redis.unix_socket_path") + if not isinstance(redis_config["unix_socket_path"], str) or not redis_config[ + "unix_socket_path" + ]: + raise FQException( + "Invalid config: redis.unix_socket_path must be a non-empty string" + ) + + if redis_config["conn_type"] == "tcp_sock": + if "host" not in redis_config: + raise FQException("Missing config: redis.host") + if not isinstance(redis_config["host"], str) or not redis_config["host"]: + raise FQException( + "Invalid config: redis.host must be a non-empty string" + ) + + if "port" not in redis_config: + raise FQException("Missing config: redis.port") + if isinstance(redis_config["port"], bool) or not isinstance( + redis_config["port"], int + ): + raise FQException("Invalid config: redis.port must be an integer") + + if "clustered" in redis_config and not isinstance( + redis_config["clustered"], bool + ): + raise FQException("Invalid config: redis.clustered must be a boolean") + + if "password" in redis_config and redis_config["password"] is not None: + if not isinstance(redis_config["password"], str): + raise FQException("Invalid config: redis.password must be a string") + self.config = normalized async def initialize(self): """Async initializer to set up redis and lua scripts.""" - await self._initialize() - - async def _initialize(self): - """Read the FQ configuration and set up redis + Lua scripts.""" fq_config = self.config["fq"] redis_config = self.config["redis"] @@ -79,8 +156,6 @@ async def _initialize(self): elif redis_connection_type == "tcp_sock": isclustered = False if "clustered" in redis_config: - if not isinstance(redis_config["clustered"], bool): - raise FQException("redis.clustered must be a boolean") isclustered = redis_config["clustered"] if isclustered: diff --git a/tests/test_edge_cases.py b/tests/test_edge_cases.py index d6924dc..b84a51d 100644 --- a/tests/test_edge_cases.py +++ b/tests/test_edge_cases.py @@ -146,21 +146,36 @@ async def test_cluster_initialization(self): ) with patch("fq.queue.RedisCluster", FakeCluster): fq = FQ(config) - await fq._initialize() + await fq.initialize() self.assertIsInstance(fq.redis_client(), FakeCluster) await fq.close() - async def test_clustered_config_must_be_boolean(self): + def test_clustered_config_must_be_boolean(self): config = build_test_config(redis={"clustered": "true"}) - fq = FQ(config) - with self.assertRaisesRegex(FQException, "redis.clustered must be a boolean"): - await fq._initialize() + with self.assertRaisesRegex( + FQException, "Invalid config: redis.clustered must be a boolean" + ): + FQ(config) + + def test_missing_required_config_key_raises_with_path(self): + config = build_test_config() + del config["redis"]["key_prefix"] + with self.assertRaisesRegex(FQException, "Missing config: redis.key_prefix"): + FQ(config) + + def test_invalid_config_value_raises_with_path(self): + config = build_test_config(fq={"job_expire_interval": "5000"}) + with self.assertRaisesRegex( + FQException, + "Invalid config: fq.job_expire_interval must be a positive integer", + ): + FQ(config) async def test_dequeue_payload_none(self): """Covers dequeue branch where payload is None (queue.py line 212).""" fq = FQ(self.config) self.fq_instance = fq - await fq._initialize() + await fq.initialize() fake_dequeue = FakeLuaDequeue() fq._lua_dequeue = fake_dequeue result = await fq.dequeue() @@ -171,7 +186,7 @@ async def test_clear_queue_delete_only(self): """Covers clear_queue else branch (queue.py lines 499, 502).""" fq = FQ(self.config) self.fq_instance = fq - await fq._initialize() + await fq.initialize() await fq._r.flushdb() response = await fq.clear_queue(queue_type="noqueue", queue_id="missing") self.assertEqual(response["status"], "Failure") @@ -221,7 +236,7 @@ async def test_deep_status_real_redis(self): """Covers deep_status with real redis (queue.py line 420).""" fq = FQ(self.config) self.fq_instance = fq - await fq._initialize() + await fq.initialize() result = await fq.deep_status() self.assertTrue(result) diff --git a/tests/test_func.py b/tests/test_func.py index fd1aff3..a2035d6 100644 --- a/tests/test_func.py +++ b/tests/test_func.py @@ -24,7 +24,7 @@ class FQTestCase(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): self.queue = FQ(build_test_config()) # flush all the keys in the test db before starting test - await self.queue._initialize() + await self.queue.initialize() await self.queue._r.flushdb() # test specific values self._test_queue_id = "johndoe" @@ -1793,7 +1793,7 @@ async def test_redis_client_getter(self): async def test_close_properly_closes_connection(self): """Test close() method properly closes Redis connection.""" fq = FQ(build_test_config()) - await fq._initialize() + await fq.initialize() self.assertIsNotNone(fq._r) await fq.close() @@ -1831,7 +1831,7 @@ def mock_redis_constructor(**kwargs): # Patch Redis to intercept the initialization with unittest.mock.patch("fq.queue.Redis", side_effect=mock_redis_constructor): fq = FQ(config) - await fq._initialize() + await fq.initialize() # Verify that Redis was initialized with unix_socket_path self.assertIn("unix_socket_path", redis_init_kwargs) @@ -1843,12 +1843,13 @@ def mock_redis_constructor(**kwargs): await fq.close() async def test_initialize_unknown_connection_type(self): - """Test initialization with invalid connection type raises error - tests line 88.""" + """Test constructor validation with invalid connection type.""" config = build_test_config(redis={"conn_type": "invalid_type"}) - fq = FQ(config) - # This tests line 88 - unknown conn_type - with self.assertRaisesRegex(FQException, "Unknown redis conn_type"): - await fq._initialize() + with self.assertRaisesRegex( + FQException, + "Invalid config: redis.conn_type must be 'tcp_sock' or 'unix_sock'", + ): + FQ(config) async def test_clear_queue_with_purge_all_and_string_job_uuid(self): """Test clear_queue with purge_all=True handles string job UUIDs - tests lines 464, 468.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index 78565e5..e94a2f0 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -13,7 +13,7 @@ class FQTest(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): self.queue = FQ(build_test_config()) - await self.queue._initialize() + await self.queue.initialize() self.valid_queue_type = "5m5_qu-eue" self.invalid_queue_type_1 = "s!ms_queue" From 2968d718d29434f0b4494e053cad18e76591cf62 Mon Sep 17 00:00:00 2001 From: "Ochui, Princewill Patrick" <21917688+ochui@users.noreply.github.com> Date: Tue, 24 Mar 2026 18:13:10 +0100 Subject: [PATCH 4/4] Update authors in pyproject.toml to reflect the Flowdacity Development Team --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 3c1c34b..80a1492 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ dynamic = ["version"] description = "A simple Redis-based job queue system." readme = "README.md" license = {text = "MIT"} -authors = [{name = "Ochui Princewill", email = "ochui@flowdacity.com"}] +authors = [{name = "Flowdacity Development Team", email = "admin@flowdacity.com"}] requires-python = ">=3.12" dependencies = [ "msgpack>=1.1.2",