From 40f46d877a3c8554f1018b949af69de5142bbcfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E5=B7=9D?= <531829935@qq.com> Date: Tue, 24 Mar 2026 13:36:37 +0800 Subject: [PATCH] =?UTF-8?q?Doubao-Seed-2.0-Code:=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E9=A2=91=E7=8E=87=E9=99=90=E5=88=B6=EF=BC=8C=E4=B8=8D=E5=BD=B1?= =?UTF-8?q?=E5=93=8D=E7=8E=B0=E6=9C=89=E6=8E=A5=E5=8F=A3=EF=BC=8C=E9=BB=98?= =?UTF-8?q?=E8=AE=A41=E5=88=86=E9=92=9F30=E6=AC=A1=E8=AF=B7=E6=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- adata/__init__.py | 13 +++++ adata/common/exception/exception_msg.py | 2 + adata/common/utils/__init__.py | 1 + adata/common/utils/rate_limiter.py | 64 +++++++++++++++++++++++++ adata/common/utils/sunrequests.py | 21 +++++++- test_etf_rate_limit.py | 57 ++++++++++++++++++++++ 6 files changed, 157 insertions(+), 1 deletion(-) create mode 100644 adata/common/utils/rate_limiter.py create mode 100644 test_etf_rate_limit.py diff --git a/adata/__init__.py b/adata/__init__.py index dee08e2..8c175b7 100644 --- a/adata/__init__.py +++ b/adata/__init__.py @@ -11,6 +11,7 @@ from adata.__version__ import __version__ from adata.bond import bond from adata.common.utils.sunrequests import SunProxy +from adata.common.utils.rate_limiter import rate_limiter from adata.fund import fund from adata.sentiment import sentiment from adata.stock import stock @@ -33,6 +34,18 @@ def proxy(is_proxy=False, ip: str = None, proxy_url: str = None): return +def reset_rate_limit(url: str = None): + """ + 重置请求频率限制记录 + :param url: 可选,指定URL来重置特定域名的记录,不传则重置所有域名 + """ + if url: + rate_limiter.reset_domain(url) + else: + rate_limiter.reset_all() + return + + # set up logging logger = logging.getLogger("adata") diff --git a/adata/common/exception/exception_msg.py b/adata/common/exception/exception_msg.py index 455feb5..61fa32e 100644 --- a/adata/common/exception/exception_msg.py +++ b/adata/common/exception/exception_msg.py @@ -10,3 +10,5 @@ """同花顺Ip限制的返回结果""" THS_IP_LIMIT_MSG = "ths流量防控:当前ip被限制,请降低请求频率或更换ip或使用代理设置,勿使用国外ip!!!" """同花顺IP:403限制提醒""" +RATE_LIMIT_MSG = "请求频率超限:域名 {domain} 在 {window_seconds} 秒内最多允许 {max_count} 次请求,请稍后再试" +"""请求频率限制提醒""" diff --git a/adata/common/utils/__init__.py b/adata/common/utils/__init__.py index 9b4eeb9..9aee9ed 100644 --- a/adata/common/utils/__init__.py +++ b/adata/common/utils/__init__.py @@ -7,5 +7,6 @@ """ from .snowflake import worker from .sunrequests import sun_requests as requests +from .rate_limiter import rate_limiter diff --git a/adata/common/utils/rate_limiter.py b/adata/common/utils/rate_limiter.py new file mode 100644 index 0000000..3c6d9a6 --- /dev/null +++ b/adata/common/utils/rate_limiter.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +""" +@desc: 滑动窗口频率限制器 +@author: +@time: 2026/03/24 +""" +import threading +import time +from collections import defaultdict +from urllib.parse import urlparse + + +class SlidingWindowRateLimiter: + """滑动窗口频率限制器""" + + def __init__(self): + self._lock = threading.Lock() + self._requests = defaultdict(list) + + def check_and_record(self, url, max_count=30, window_seconds=60): + """ + 检查请求是否超过频率限制,并记录当前请求 + :param url: 请求的URL + :param max_count: 时间窗口内最大请求次数 + :param window_seconds: 时间窗口大小,单位秒 + :return: (是否允许请求, 剩余请求次数, 重置时间) + """ + domain = self._extract_domain(url) + current_time = time.time() + + with self._lock: + timestamps = self._requests[domain] + + cutoff = current_time - window_seconds + valid_timestamps = [ts for ts in timestamps if ts > cutoff] + self._requests[domain] = valid_timestamps + + if len(valid_timestamps) >= max_count: + valid_timestamps.sort() + reset_time = valid_timestamps[0] + window_seconds + return False, 0, reset_time + + self._requests[domain].append(current_time) + return True, max_count - len(self._requests[domain]), current_time + window_seconds + + def _extract_domain(self, url): + """提取完整域名""" + parsed = urlparse(url) + return parsed.netloc + + def reset_domain(self, url): + """重置指定域名的请求记录""" + domain = self._extract_domain(url) + with self._lock: + if domain in self._requests: + del self._requests[domain] + + def reset_all(self): + """重置所有域名的请求记录""" + with self._lock: + self._requests.clear() + + +rate_limiter = SlidingWindowRateLimiter() diff --git a/adata/common/utils/sunrequests.py b/adata/common/utils/sunrequests.py index 9fec0ed..e9b27db 100644 --- a/adata/common/utils/sunrequests.py +++ b/adata/common/utils/sunrequests.py @@ -13,6 +13,9 @@ import requests +from adata.common.exception.exception_msg import RATE_LIMIT_MSG +from adata.common.utils.rate_limiter import rate_limiter + class SunProxy(object): _data = {} @@ -46,7 +49,8 @@ def __init__(self, sun_proxy: SunProxy = None) -> None: super().__init__() self.sun_proxy = sun_proxy - def request(self, method='get', url=None, times=3, retry_wait_time=1588, proxies=None, wait_time=None, **kwargs): + def request(self, method='get', url=None, times=3, retry_wait_time=1588, proxies=None, wait_time=None, + rate_limit_enabled=True, rate_limit_count=30, rate_limit_window=60, **kwargs): """ 简单封装的请求,参考requests,增加循环次数和次数之间的等待时间 :param proxies: 代理配置 @@ -55,9 +59,24 @@ def request(self, method='get', url=None, times=3, retry_wait_time=1588, proxies :param times: 次数,int :param retry_wait_time: 重试等待时间,毫秒 :param wait_time: 等待时间:毫秒;表示每个请求的间隔时间,在请求之前等待sleep,主要用于防止请求太频繁的限制。 + :param rate_limit_enabled: 是否启用频率限制,默认:True + :param rate_limit_count: 时间窗口内最大请求次数,默认:30 + :param rate_limit_window: 时间窗口大小,单位秒,默认:60 :param kwargs: 其它 requests 参数,用法相同 :return: res """ + # 0. 频率限制检查 + if rate_limit_enabled and url: + allowed, remaining, reset_time = rate_limiter.check_and_record( + url, max_count=rate_limit_count, window_seconds=rate_limit_window + ) + if not allowed: + from urllib.parse import urlparse + domain = urlparse(url).netloc + raise Exception(RATE_LIMIT_MSG.format( + domain=domain, max_count=rate_limit_count, window_seconds=rate_limit_window + )) + # 1. 获取设置代理 proxies = self.__get_proxies(proxies) # 2. 请求数据结果 diff --git a/test_etf_rate_limit.py b/test_etf_rate_limit.py new file mode 100644 index 0000000..497c299 --- /dev/null +++ b/test_etf_rate_limit.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +""" +测试ETF行情接口的频率限制功能 +""" +import time +from adata.fund.market.etf_market_ths import ETFMarketThs +from adata import reset_rate_limit + +def test_etf_rate_limit(): + print("=== 测试ETF行情接口频率限制 ===") + + # 重置所有频率限制 + reset_rate_limit() + + etf_market = ETFMarketThs() + fund_code = '512880' + start_date = '2024-01-01' + + print(f"\n测试目标:调用 {fund_code} 的ETF行情接口") + print(f"默认限制:每分钟最多30次请求") + print("-" * 60) + + success_count = 0 + try: + # 尝试调用35次,应该在第31次触发限流 + for i in range(35): + print(f"\n第 {i+1} 次请求...") + try: + result = etf_market.get_market_etf_ths( + fund_code=fund_code, + start_date=start_date + ) + if isinstance(result, Exception): + print(f" 请求返回异常: {result}") + else: + success_count += 1 + print(f" 请求成功!获取到 {len(result)} 条数据") + except Exception as e: + print(f" 捕获到异常: {type(e).__name__}: {e}") + print(f"\n{'='*60}") + print(f"频率限制已触发!") + print(f"成功请求次数: {success_count}") + print(f"{'='*60}") + break + + # 稍微间隔一下,避免请求过快同时也能测试频率限制 + time.sleep(0.1) + + except KeyboardInterrupt: + print("\n测试被用户中断") + except Exception as e: + print(f"\n发生未预期的错误: {e}") + + print("\n=== 测试完成 ===") + +if __name__ == "__main__": + test_etf_rate_limit()