diff --git a/feapder/db/redisdb.py b/feapder/db/redisdb.py index d882e68..41d4a91 100644 --- a/feapder/db/redisdb.py +++ b/feapder/db/redisdb.py @@ -11,7 +11,11 @@ from typing import Union, List import redis -from redis.connection import Encoder as _Encoder + +try: + from redis.connection import Encoder as _Encoder +except ImportError: + from redis._parsers.encoders import Encoder as _Encoder from redis.exceptions import ConnectionError, TimeoutError from redis.exceptions import DataError from redis.sentinel import Sentinel @@ -50,7 +54,11 @@ def encode(self, value): return value -redis.connection.Encoder = Encoder +try: + redis.connection.Encoder = Encoder +except AttributeError: + import redis._parsers.encoders + redis._parsers.encoders.Encoder = Encoder class RedisDB: @@ -150,7 +158,7 @@ def get_connect(self): self._service_name, password=self._user_pass, db=self._db, - redis_class=redis.StrictRedis, + redis_class=redis.Redis, decode_responses=self._decode_responses, max_connections=self._max_connections, **self._kwargs, @@ -158,14 +166,14 @@ def get_connect(self): else: try: - from rediscluster import RedisCluster - except ModuleNotFoundError as e: - log.error('请安装 pip install "feapder[all]"') - os._exit(0) - - # log.debug("使用redis集群模式") + from redis.cluster import RedisCluster, ClusterNode + except ImportError as e: + raise ImportError( + "当前 redis 版本不支持集群模式,请安装 redis>=4.1.0" + ) from e + cluster_nodes = [ClusterNode(host=node["host"], port=int(node["port"])) for node in startup_nodes] self._redis = RedisCluster( - startup_nodes=startup_nodes, + startup_nodes=cluster_nodes, decode_responses=self._decode_responses, password=self._user_pass, max_connections=self._max_connections, @@ -175,7 +183,7 @@ def get_connect(self): self._is_redis_cluster = True else: ip, port = ip_ports[0].split(":") - self._redis = redis.StrictRedis( + self._redis = redis.Redis( host=ip, port=port, db=self._db, @@ -186,7 +194,7 @@ def get_connect(self): ) self._is_redis_cluster = False else: - self._redis = redis.StrictRedis.from_url( + self._redis = redis.Redis.from_url( self._url, decode_responses=self._decode_responses, **self._kwargs ) self._is_redis_cluster = False @@ -298,14 +306,13 @@ def sdelete(self, table): """ # 当 SCAN 命令的游标参数被设置为 0 时, 服务器将开始一次新的迭代, 而当服务器向用户返回值为 0 的游标时, 表示迭代已结束 - cursor = "0" - while cursor != 0: + cursor = 0 + while True: cursor, data = self._redis.sscan(table, cursor=cursor, count=500) for item in data: - # pipe.srem(table, item) self._redis.srem(table, item) - - # pipe.execute() + if cursor == 0: + break def sismember(self, table, key): "Return a boolean indicating if ``value`` is a member of set ``name``" @@ -925,9 +932,9 @@ def current_status(self, show_key=True, filter_key_by_used_memory=10 * 1024 * 10 header_style="title", ) - keys = self._redis.execute_command("keys *") + keys = self._redis.execute_command("KEYS", "*") for key in tqdm(keys): - key_type = self._redis.execute_command("type {}".format(key)) + key_type = self._redis.execute_command("TYPE", key) if key_type == "set": value_count = self._redis.scard(key) elif key_type == "zset": @@ -943,7 +950,7 @@ def current_status(self, show_key=True, filter_key_by_used_memory=10 * 1024 * 10 else: raise TypeError("尚不支持 {} 类型的key".format(key_type)) - used_memory = self._redis.execute_command("memory usage {}".format(key)) + used_memory = self._redis.execute_command("MEMORY", "USAGE", key) if used_memory >= filter_key_by_used_memory: used_memory_human = ( "%0.2fMB" % (used_memory / 1024 / 1024) if used_memory else 0 diff --git a/feapder/network/proxy_pool_old.py b/feapder/network/proxy_pool_old.py index 2e3bb6c..fddaabc 100644 --- a/feapder/network/proxy_pool_old.py +++ b/feapder/network/proxy_pool_old.py @@ -147,7 +147,7 @@ def get_proxy_from_redis(proxy_source_url, **kwargs): @return: [{'http':'http://xxx.xxx.xxx:xxx', 'https':'http://xxx.xxx.xxx.xxx:xxx'}] """ - redis_conn = redis.StrictRedis.from_url(proxy_source_url) + redis_conn = redis.Redis.from_url(proxy_source_url) key = kwargs.get("redis_proxies_key") assert key, "从redis中获取代理 需要指定 redis_proxies_key" proxies = redis_conn.zrange(key, 0, -1) diff --git a/feapder/requirements.txt b/feapder/requirements.txt index 2171767..3cdb95d 100644 --- a/feapder/requirements.txt +++ b/feapder/requirements.txt @@ -4,13 +4,13 @@ parsel>=1.5.2 PyExecJS>=1.5.1 pymongo>=3.10.1 PyMySQL>=0.9.3 -redis>=2.10.6,<4.0.0 +redis>=4.1.0 requests>=2.22.0 selenium>=3.141.0 bs4>=0.0.1 ipython>=7.14.0 bitarray>=1.5.3 -redis-py-cluster>=2.1.0 + cryptography>=3.3.2 urllib3>=1.25.8 loguru>=0.5.3 diff --git a/setup.py b/setup.py index cf4fe54..21c62b4 100644 --- a/setup.py +++ b/setup.py @@ -38,7 +38,7 @@ "DBUtils>=2.0", "parsel>=1.5.2", "PyMySQL>=0.9.3", - "redis>=2.10.6,<4.0.0", + "redis>=4.1.0", "requests>=2.22.0", "bs4>=0.0.1", "ipython>=7.14.0", @@ -60,7 +60,6 @@ "bitarray>=1.5.3", "PyExecJS>=1.5.1", "pymongo>=3.10.1", - "redis-py-cluster>=2.1.0", ] + render_requires setuptools.setup( diff --git a/tests/test_redisdb_compat.py b/tests/test_redisdb_compat.py new file mode 100644 index 0000000..2b2c697 --- /dev/null +++ b/tests/test_redisdb_compat.py @@ -0,0 +1,59 @@ +import builtins +import inspect + +import pytest + +from feapder.db.redisdb import RedisDB + + +def test_redis_version_floor_synced(): + with open("feapder/requirements.txt", "r", encoding="utf-8") as f: + requirements = f.read() + with open("setup.py", "r", encoding="utf-8") as f: + setup_content = f.read() + assert "redis>=4.1.0" in requirements + assert "redis>=4.1.0" in setup_content + + +def test_cluster_import_error_message(monkeypatch): + original_import = builtins.__import__ + + def fake_import(name, globals=None, locals=None, fromlist=(), level=0): + if name == "redis.cluster": + raise ImportError("mock cluster import error") + return original_import(name, globals, locals, fromlist, level) + + monkeypatch.setattr(builtins, "__import__", fake_import) + with pytest.raises(ImportError, match="请安装 redis>=4.1.0"): + RedisDB(ip_ports="127.0.0.1:6379,127.0.0.1:6380", db=0, service_name="") + + +def test_sdelete_breaks_when_cursor_zero(): + class FakeRedis: + def __init__(self): + self.calls = [] + self._scan_result = [(1, ["a", "b"]), (0, ["c"])] + self._index = 0 + + def ping(self): + return True + + def sscan(self, table, cursor=0, count=500): + result = self._scan_result[self._index] + self._index += 1 + return result + + def srem(self, table, item): + self.calls.append((table, item)) + + db = RedisDB.__new__(RedisDB) + db._RedisDB__redis = FakeRedis() + db.sdelete("test_set") + assert db._RedisDB__redis.calls == [("test_set", "a"), ("test_set", "b"), ("test_set", "c")] + + +def test_current_status_uses_tokenized_execute_command(): + source = inspect.getsource(RedisDB.current_status) + assert 'execute_command("KEYS", "*")' in source + assert 'execute_command("TYPE", key)' in source + assert 'execute_command("MEMORY", "USAGE", key)' in source