Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions feapder/db/redisdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from typing import Union, List

import redis
from redis.connection import Encoder as _Encoder

try:
from redis.connection import Encoder as _Encoder
except ImportError:
from redis._parsers.encoders import Encoder as _Encoder
from redis.exceptions import ConnectionError, TimeoutError
from redis.exceptions import DataError
from redis.sentinel import Sentinel
Expand Down Expand Up @@ -50,7 +54,11 @@ def encode(self, value):
return value


redis.connection.Encoder = Encoder
try:
redis.connection.Encoder = Encoder
except AttributeError:
import redis._parsers.encoders
redis._parsers.encoders.Encoder = Encoder


class RedisDB:
Expand Down Expand Up @@ -150,22 +158,22 @@ def get_connect(self):
self._service_name,
password=self._user_pass,
db=self._db,
redis_class=redis.StrictRedis,
redis_class=redis.Redis,
decode_responses=self._decode_responses,
max_connections=self._max_connections,
**self._kwargs,
)

else:
try:
from rediscluster import RedisCluster
except ModuleNotFoundError as e:
log.error('请安装 pip install "feapder[all]"')
os._exit(0)

# log.debug("使用redis集群模式")
from redis.cluster import RedisCluster, ClusterNode
except ImportError as e:
raise ImportError(
"当前 redis 版本不支持集群模式,请安装 redis>=4.1.0"
) from e
cluster_nodes = [ClusterNode(host=node["host"], port=int(node["port"])) for node in startup_nodes]
self._redis = RedisCluster(
startup_nodes=startup_nodes,
startup_nodes=cluster_nodes,
decode_responses=self._decode_responses,
password=self._user_pass,
max_connections=self._max_connections,
Expand All @@ -175,7 +183,7 @@ def get_connect(self):
self._is_redis_cluster = True
else:
ip, port = ip_ports[0].split(":")
self._redis = redis.StrictRedis(
self._redis = redis.Redis(
host=ip,
port=port,
db=self._db,
Expand All @@ -186,7 +194,7 @@ def get_connect(self):
)
self._is_redis_cluster = False
else:
self._redis = redis.StrictRedis.from_url(
self._redis = redis.Redis.from_url(
self._url, decode_responses=self._decode_responses, **self._kwargs
)
self._is_redis_cluster = False
Expand Down Expand Up @@ -298,14 +306,13 @@ def sdelete(self, table):
"""

# 当 SCAN 命令的游标参数被设置为 0 时, 服务器将开始一次新的迭代, 而当服务器向用户返回值为 0 的游标时, 表示迭代已结束
cursor = "0"
while cursor != 0:
cursor = 0
while True:
cursor, data = self._redis.sscan(table, cursor=cursor, count=500)
for item in data:
# pipe.srem(table, item)
self._redis.srem(table, item)

# pipe.execute()
if cursor == 0:
break

def sismember(self, table, key):
"Return a boolean indicating if ``value`` is a member of set ``name``"
Expand Down Expand Up @@ -925,9 +932,9 @@ def current_status(self, show_key=True, filter_key_by_used_memory=10 * 1024 * 10
header_style="title",
)

keys = self._redis.execute_command("keys *")
keys = self._redis.execute_command("KEYS", "*")
for key in tqdm(keys):
key_type = self._redis.execute_command("type {}".format(key))
key_type = self._redis.execute_command("TYPE", key)
if key_type == "set":
value_count = self._redis.scard(key)
elif key_type == "zset":
Expand All @@ -943,7 +950,7 @@ def current_status(self, show_key=True, filter_key_by_used_memory=10 * 1024 * 10
else:
raise TypeError("尚不支持 {} 类型的key".format(key_type))

used_memory = self._redis.execute_command("memory usage {}".format(key))
used_memory = self._redis.execute_command("MEMORY", "USAGE", key)
if used_memory >= filter_key_by_used_memory:
used_memory_human = (
"%0.2fMB" % (used_memory / 1024 / 1024) if used_memory else 0
Expand Down
2 changes: 1 addition & 1 deletion feapder/network/proxy_pool_old.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def get_proxy_from_redis(proxy_source_url, **kwargs):
@return: [{'http':'http://xxx.xxx.xxx:xxx', 'https':'http://xxx.xxx.xxx.xxx:xxx'}]
"""

redis_conn = redis.StrictRedis.from_url(proxy_source_url)
redis_conn = redis.Redis.from_url(proxy_source_url)
key = kwargs.get("redis_proxies_key")
assert key, "从redis中获取代理 需要指定 redis_proxies_key"
proxies = redis_conn.zrange(key, 0, -1)
Expand Down
4 changes: 2 additions & 2 deletions feapder/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ parsel>=1.5.2
PyExecJS>=1.5.1
pymongo>=3.10.1
PyMySQL>=0.9.3
redis>=2.10.6,<4.0.0
redis>=4.1.0
requests>=2.22.0
selenium>=3.141.0
bs4>=0.0.1
ipython>=7.14.0
bitarray>=1.5.3
redis-py-cluster>=2.1.0

cryptography>=3.3.2
urllib3>=1.25.8
loguru>=0.5.3
Expand Down
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"DBUtils>=2.0",
"parsel>=1.5.2",
"PyMySQL>=0.9.3",
"redis>=2.10.6,<4.0.0",
"redis>=4.1.0",
"requests>=2.22.0",
"bs4>=0.0.1",
"ipython>=7.14.0",
Expand All @@ -60,7 +60,6 @@
"bitarray>=1.5.3",
"PyExecJS>=1.5.1",
"pymongo>=3.10.1",
"redis-py-cluster>=2.1.0",
] + render_requires

setuptools.setup(
Expand Down
59 changes: 59 additions & 0 deletions tests/test_redisdb_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import builtins
import inspect

import pytest

from feapder.db.redisdb import RedisDB


def test_redis_version_floor_synced():
with open("feapder/requirements.txt", "r", encoding="utf-8") as f:
requirements = f.read()
with open("setup.py", "r", encoding="utf-8") as f:
setup_content = f.read()
assert "redis>=4.1.0" in requirements
assert "redis>=4.1.0" in setup_content


def test_cluster_import_error_message(monkeypatch):
original_import = builtins.__import__

def fake_import(name, globals=None, locals=None, fromlist=(), level=0):
if name == "redis.cluster":
raise ImportError("mock cluster import error")
return original_import(name, globals, locals, fromlist, level)

monkeypatch.setattr(builtins, "__import__", fake_import)
with pytest.raises(ImportError, match="请安装 redis>=4.1.0"):
RedisDB(ip_ports="127.0.0.1:6379,127.0.0.1:6380", db=0, service_name="")


def test_sdelete_breaks_when_cursor_zero():
class FakeRedis:
def __init__(self):
self.calls = []
self._scan_result = [(1, ["a", "b"]), (0, ["c"])]
self._index = 0

def ping(self):
return True

def sscan(self, table, cursor=0, count=500):
result = self._scan_result[self._index]
self._index += 1
return result

def srem(self, table, item):
self.calls.append((table, item))

db = RedisDB.__new__(RedisDB)
db._RedisDB__redis = FakeRedis()
db.sdelete("test_set")
assert db._RedisDB__redis.calls == [("test_set", "a"), ("test_set", "b"), ("test_set", "c")]


def test_current_status_uses_tokenized_execute_command():
source = inspect.getsource(RedisDB.current_status)
assert 'execute_command("KEYS", "*")' in source
assert 'execute_command("TYPE", key)' in source
assert 'execute_command("MEMORY", "USAGE", key)' in source