-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathproxy_pool.py
More file actions
160 lines (132 loc) · 4.7 KB
/
proxy_pool.py
File metadata and controls
160 lines (132 loc) · 4.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""
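Proxy pool module - multi-proxy rotation and health checking.

Features:
- Automatic rotation across multiple proxies
- Proxy health checks
- Automatic exclusion of failing proxies
- HTTP/SOCKS5 support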
"""
import asyncio
import aiohttp
import random
from typing import List, Optional, Dict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from loguru import logger
@dataclass
class ProxyInfo:
    """Bookkeeping record for a single proxy.

    Attributes:
        url: Proxy URL.
        protocol: Protocol label for the proxy, "http" or "socks5".
        fail_count: Failure counter.
        success_count: Success counter.
        last_check: Timestamp of the most recent health check, or None.
        is_healthy: Whether the proxy is currently considered usable.
    """

    url: str
    # Either "http" or "socks5".
    protocol: str = "http"
    fail_count: int = 0
    success_count: int = 0
    last_check: Optional[datetime] = None
    is_healthy: bool = True
class ProxyPool:
    """Pool of proxies with round-robin/random selection and failure tracking.

    Each proxy is held as a ``ProxyInfo`` record. Callers draw a proxy with
    ``get_proxy`` or ``get_random_proxy`` and feed results back through
    ``report_success`` / ``report_failure``. A proxy whose consecutive
    failure count reaches ``max_fail_count`` is flagged unhealthy and is no
    longer handed out until a success report or a passing ``health_check``
    marks it healthy again.
    """

    def __init__(
        self,
        proxies: Optional[List[str]] = None,
        max_fail_count: int = 3,
        health_check_interval: int = 300
    ):
        """Create the pool and register any initial proxies.

        Args:
            proxies: Proxy URLs to register immediately; None or empty
                leaves the pool empty.
            max_fail_count: Failure count at which a proxy is flagged
                unhealthy.
            health_check_interval: Stored on the instance; nothing in this
                class reads it, so scheduling periodic checks is left to
                the caller.
        """
        self.proxies: List[ProxyInfo] = []
        self.max_fail_count = max_fail_count
        self.health_check_interval = health_check_interval
        # Position, within the healthy list, of the proxy to hand out next.
        self._index = 0
        self._lock = asyncio.Lock()

        if proxies:
            for p in proxies:
                self.add_proxy(p)

    def add_proxy(self, proxy_url: str):
        """Register a proxy URL.

        The protocol is recorded as "socks5" when the URL starts with
        "socks", otherwise as "http".
        """
        protocol = "socks5" if proxy_url.startswith("socks") else "http"
        self.proxies.append(ProxyInfo(url=proxy_url, protocol=protocol))
        # NOTE(review): only the first 30 characters are logged, but if the
        # URL embeds credentials they may fall inside that prefix.
        logger.info(f"添加代理: {proxy_url[:30]}...")

    def remove_proxy(self, proxy_url: str):
        """Drop every entry whose URL equals ``proxy_url``."""
        self.proxies = [p for p in self.proxies if p.url != proxy_url]

    async def get_proxy(self) -> Optional[str]:
        """Return the next healthy proxy URL in round-robin order.

        Returns:
            The proxy URL, or None when no proxy is currently healthy.
        """
        async with self._lock:
            healthy = [p for p in self.proxies if p.is_healthy]
            if not healthy:
                return None

            # Use the stored position first and advance afterwards, so the
            # very first call returns the first healthy proxy. The modulo
            # re-wraps the position when the healthy list has shrunk.
            position = self._index % len(healthy)
            self._index = position + 1
            return healthy[position].url

    async def get_random_proxy(self) -> Optional[str]:
        """Return a randomly chosen healthy proxy URL, or None if there is none."""
        healthy = [p for p in self.proxies if p.is_healthy]
        if not healthy:
            return None
        return random.choice(healthy).url

    async def report_success(self, proxy_url: str):
        """Record a successful use of ``proxy_url``.

        Increments the success counter, clears the failure counter and marks
        the proxy healthy. Only the first entry with a matching URL is
        updated; an unknown URL is ignored.
        """
        for p in self.proxies:
            if p.url == proxy_url:
                p.success_count += 1
                p.fail_count = 0
                p.is_healthy = True
                break

    async def report_failure(self, proxy_url: str):
        """Record a failed use of ``proxy_url``.

        Increments the failure counter and flags the proxy unhealthy once
        the counter reaches ``max_fail_count``. Only the first entry with a
        matching URL is updated; an unknown URL is ignored.
        """
        for p in self.proxies:
            if p.url == proxy_url:
                p.fail_count += 1
                if p.fail_count >= self.max_fail_count:
                    p.is_healthy = False
                    logger.warning(f"代理标记为不健康: {proxy_url[:30]}...")
                break

    async def health_check(self, test_url: str = "https://api.openai.com"):
        """Probe every proxy in turn by requesting ``test_url`` through it.

        A response with status below 500 marks the proxy healthy and clears
        its failure counter. A 5xx response or any exception counts as one
        failure. A proxy whose failure counter has reached
        ``max_fail_count`` is flagged unhealthy. ``last_check`` is updated
        for every proxy probed.

        Args:
            test_url: URL requested through each proxy.
        """
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
            for proxy in self.proxies:
                try:
                    async with session.get(test_url, proxy=proxy.url) as resp:
                        if resp.status < 500:
                            proxy.is_healthy = True
                            proxy.fail_count = 0
                        else:
                            proxy.fail_count += 1
                except Exception:
                    # Best-effort probe: any error, including a timeout,
                    # simply counts as one failure for this proxy.
                    proxy.fail_count += 1

                # Checked after both failure paths so that 5xx responses and
                # exceptions are treated the same way.
                if proxy.fail_count >= self.max_fail_count:
                    proxy.is_healthy = False

                proxy.last_check = datetime.now()

    def get_stats(self) -> Dict:
        """Return a summary of the pool.

        Returns:
            A dict with the total, healthy and unhealthy counts plus one
            entry per proxy. Each URL is cut to its first 30 characters
            followed by "...".
        """
        healthy = sum(1 for p in self.proxies if p.is_healthy)
        return {
            "total": len(self.proxies),
            "healthy": healthy,
            "unhealthy": len(self.proxies) - healthy,
            "proxies": [
                {
                    "url": p.url[:30] + "...",
                    "healthy": p.is_healthy,
                    "success": p.success_count,
                    "fail": p.fail_count
                }
                for p in self.proxies
            ]
        }

    @property
    def has_healthy_proxy(self) -> bool:
        """True when at least one proxy is currently marked healthy."""
        return any(p.is_healthy for p in self.proxies)
# Module-wide ProxyPool instance; None until init_proxy_pool() assigns it.
_proxy_pool: Optional[ProxyPool] = None
def init_proxy_pool(proxies: List[str]) -> ProxyPool:
    """Build a ProxyPool from ``proxies`` and install it as the module-wide pool.

    Any previously installed pool is replaced. The new pool is returned.
    """
    global _proxy_pool
    pool = ProxyPool(proxies)
    _proxy_pool = pool
    return pool
def get_proxy_pool() -> Optional[ProxyPool]:
    """Return the module-wide ProxyPool, or None if none has been installed."""
    current = _proxy_pool
    return current
async def get_next_proxy() -> Optional[str]:
    """Return the next proxy URL from the module-wide pool.

    Returns None when no pool is installed; otherwise returns whatever the
    pool's ``get_proxy`` returns.
    """
    pool = _proxy_pool
    if not pool:
        return None
    return await pool.get_proxy()