-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathvalidator.py
More file actions
1517 lines (1275 loc) · 61.9 KB
/
validator.py
File metadata and controls
1517 lines (1275 loc) · 61.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
验证器模块 - 异步 API Key 验证 + 深度价值评估
核心特性:
1. AsyncIO + aiohttp 高并发验证 (100 并发)
2. 深度价值评估 (GPT-4 探测、余额检测、RPM 透视)
3. 状态细分(valid, invalid, quota_exceeded, connection_error)
4. UI 仪表盘集成
v2.1 优化:
- 导出 OptimizedAsyncValidator(使用连接池和智能重试)
- 保持原有 AsyncValidator 向后兼容
"""
import asyncio
import ssl
from typing import Tuple, Optional, Dict, Any
from datetime import datetime
from dataclasses import dataclass
import aiohttp
from aiohttp import ClientTimeout, TCPConnector
from loguru import logger
from config import (
config,
PROTECTED_DOMAINS,
SAFE_HTTP_STATUS_CODES,
CIRCUIT_BREAKER_HTTP_CODES,
CIRCUIT_BREAKER_FAILURE_THRESHOLD,
CIRCUIT_BREAKER_RECOVERY_TIMEOUT,
CIRCUIT_BREAKER_HALF_OPEN_REQUESTS
)
from database import Database, LeakedKey, KeyStatus
# ============================================================================
# 常量与配置
# ============================================================================
# 最大并发数
MAX_CONCURRENCY = 100
# 请求超时
REQUEST_TIMEOUT = ClientTimeout(total=15, connect=10)
# 高价值模型列表
HIGH_VALUE_MODELS = ['gpt-4', 'gpt-4-turbo', 'gpt-4o', 'gpt-4-32k', 'claude-3-opus']
# RPM 阈值分级
RPM_ENTERPRISE_THRESHOLD = 3000 # >= 3000 为企业级
RPM_FREE_TRIAL_THRESHOLD = 20 # <= 20 为免费试用
# ============================================================================
# 熍断器 (Circuit Breaker) - 防误杀保护
# ============================================================================
from enum import Enum
from urllib.parse import urlparse
import time
import threading
class CircuitState(Enum):
"""熍断器状态"""
CLOSED = "closed" # 正常,允许请求
OPEN = "open" # 熍断,拒绝请求
HALF_OPEN = "half_open" # 半开,允许试探请求
class CircuitBreaker:
"""
域名熍断器 - 带防误杀保护
核心安全逻辑:
1. 受保护域名白名单 - 永远不熍断
2. 严格错误界定 - 只有连接层错误才触发熍断
"""
def __init__(self):
# 域名 -> 状态信息
self._domain_states: Dict[str, dict] = {}
self._lock = threading.Lock() # 线程安全
@staticmethod
def _extract_domain(url: str) -> str:
"""从 URL 提取域名"""
try:
parsed = urlparse(url)
return parsed.netloc.lower().split(':')[0] # 移除端口号
except Exception:
return ""
@staticmethod
def _is_protected_domain(domain: str) -> bool:
"""
检查域名是否受保护
支持后缀匹配,例如 my-resource.openai.azure.com 会匹配 openai.azure.com
"""
if not domain:
return True # 空域名默认保护
# 精确匹配
if domain in PROTECTED_DOMAINS:
return True
# 后缀匹配(用于 Azure 等动态子域名)
for protected in PROTECTED_DOMAINS:
if domain.endswith('.' + protected) or domain.endswith(protected):
return True
return False
@staticmethod
def _is_network_error(error: Exception = None, http_status: int = None) -> bool:
"""
判断是否为网络层错误(应触发熍断)
网络层错误:
- 连接拒绝/DNS 失败 (ClientConnectorError)
- 超时 (TimeoutError)
- 网关错误 502/503/504
应用层错误(不触发熍断):
- 401/403/429 等业务错误
"""
# 检查异常类型
if error is not None:
if isinstance(error, (aiohttp.ClientConnectorError, asyncio.TimeoutError)):
return True
# ServerDisconnectedError 等也算网络错误
if isinstance(error, aiohttp.ServerDisconnectedError):
return True
# 检查 HTTP 状态码
if http_status is not None:
if http_status in CIRCUIT_BREAKER_HTTP_CODES:
return True
# 应用层错误不触发熍断
if http_status in SAFE_HTTP_STATUS_CODES:
return False
return False
async def get_state(self, url: str) -> CircuitState:
"""获取域名的熍断状态"""
domain = self._extract_domain(url)
# 受保护域名永远返回 CLOSED
if self._is_protected_domain(domain):
return CircuitState.CLOSED
with self._lock:
if domain not in self._domain_states:
return CircuitState.CLOSED
state_info = self._domain_states[domain]
current_state = state_info.get('state', CircuitState.CLOSED)
# 检查是否应从 OPEN 转为 HALF_OPEN
if current_state == CircuitState.OPEN:
open_time = state_info.get('open_time', 0)
if time.time() - open_time >= CIRCUIT_BREAKER_RECOVERY_TIMEOUT:
state_info['state'] = CircuitState.HALF_OPEN
state_info['half_open_requests'] = 0
return CircuitState.HALF_OPEN
return current_state
async def is_allowed(self, url: str) -> bool:
"""检查请求是否允许"""
state = await self.get_state(url)
if state == CircuitState.CLOSED:
return True
elif state == CircuitState.OPEN:
return False
else: # HALF_OPEN
domain = self._extract_domain(url)
with self._lock:
state_info = self._domain_states.get(domain, {})
half_open_requests = state_info.get('half_open_requests', 0)
if half_open_requests < CIRCUIT_BREAKER_HALF_OPEN_REQUESTS:
state_info['half_open_requests'] = half_open_requests + 1
return True
return False
async def record_success(self, url: str):
"""记录成功请求"""
domain = self._extract_domain(url)
if self._is_protected_domain(domain):
return
with self._lock:
if domain in self._domain_states:
# 成功后重置状态
self._domain_states[domain] = {
'state': CircuitState.CLOSED,
'failure_count': 0
}
async def record_failure(
self,
url: str,
error: Exception = None,
http_status: int = None
):
"""
记录失败请求 - 带防误杀保护
Args:
url: 请求 URL
error: 异常对象
http_status: HTTP 状态码
"""
domain = self._extract_domain(url)
# ========== 安全检查 1: 受保护域名 ==========
if self._is_protected_domain(domain):
return # 直接忽略,不记录失败
# ========== 安全检查 2: 应用层错误 ==========
if not self._is_network_error(error, http_status):
return # 业务错误不触发熍断
# ========== 记录网络层失败 ==========
with self._lock:
if domain not in self._domain_states:
self._domain_states[domain] = {
'state': CircuitState.CLOSED,
'failure_count': 0
}
state_info = self._domain_states[domain]
state_info['failure_count'] = state_info.get('failure_count', 0) + 1
# 检查是否触发熍断
if state_info['failure_count'] >= CIRCUIT_BREAKER_FAILURE_THRESHOLD:
state_info['state'] = CircuitState.OPEN
state_info['open_time'] = time.time()
async def get_stats(self) -> Dict[str, Any]:
"""获取熍断器统计信息"""
with self._lock:
stats = {}
for domain, info in self._domain_states.items():
stats[domain] = {
'state': info.get('state', CircuitState.CLOSED).value,
'failure_count': info.get('failure_count', 0)
}
return stats
# 全局熍断器实例
circuit_breaker = CircuitBreaker()
def mask_key(key: str) -> str:
"""遮蔽 API Key 中间部分"""
if len(key) <= 12:
return key[:4] + "..." + key[-4:]
return key[:8] + "..." + key[-4:]
@dataclass
class ValidationResult:
"""验证结果数据类"""
status: KeyStatus
info: str
model_tier: str = ""
rpm: int = 0
balance_usd: float = 0.0
is_high_value: bool = False
class AsyncValidator:
"""异步 API Key 验证器 - 集成熍断器保护"""
def __init__(self, db: Database, dashboard=None):
self.db = db
self.dashboard = dashboard
self.semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
self._session: Optional[aiohttp.ClientSession] = None
self._circuit_breaker = circuit_breaker # 使用全局熍断器
async def _get_session(self) -> aiohttp.ClientSession:
"""获取或创建 aiohttp session"""
if self._session is None or self._session.closed:
# 配置代理
connector = TCPConnector(
limit=MAX_CONCURRENCY,
ssl=ssl.create_default_context(),
force_close=True
)
self._session = aiohttp.ClientSession(
connector=connector,
timeout=REQUEST_TIMEOUT,
trust_env=True # 支持环境变量代理
)
return self._session
async def close(self):
"""关闭 session"""
if self._session and not self._session.closed:
await self._session.close()
def _get_proxy(self) -> Optional[str]:
"""获取代理 URL"""
return config.proxy_url if config.proxy_url else None
def _log(self, message: str, level: str = "INFO"):
"""输出日志"""
if self.dashboard:
self.dashboard.add_log(message, level)
def _try_url_variants(self, base_url: str, path: str) -> list:
"""生成 URL 变体"""
base_url = base_url.rstrip('/')
path = path.lstrip('/')
variants = [f"{base_url}/{path}"]
if '/v1' not in base_url:
variants.append(f"{base_url}/v1/{path}")
if '/v1' in base_url:
base_without_v1 = base_url.replace('/v1', '')
variants.append(f"{base_without_v1}/v1/{path}")
return variants
# ========================================================================
# 熍断器集成方法
# ========================================================================
async def _check_circuit_breaker(self, base_url: str) -> Optional[ValidationResult]:
"""
检查熍断器状态
Returns:
如果被熍断,返回 CONNECTION_ERROR 结果;否则返回 None
"""
if not config.circuit_breaker_enabled:
return None
if not await self._circuit_breaker.is_allowed(base_url):
self._log(f"熍断中: {base_url[:30]}...", "WARN")
return ValidationResult(KeyStatus.CONNECTION_ERROR, "域名熍断中")
return None
async def _record_circuit_result(
self,
url: str,
success: bool = False,
error: Exception = None,
http_status: int = None
):
"""记录请求结果到熍断器"""
if not config.circuit_breaker_enabled:
return
if success:
await self._circuit_breaker.record_success(url)
else:
await self._circuit_breaker.record_failure(url, error, http_status)
# ========================================================================
# 异步验证方法
# ========================================================================
def _is_likely_valid_relay(self, base_url: str) -> bool:
"""
检查 URL 是否可能是有效的中转站 + SSRF 防护
安全检查:
1. 排除明显不是 API 中转站的 URL
2. 阻止私有 IP 地址 (RFC1918, loopback, link-local)
3. 阻止危险域名后缀
"""
if not base_url:
return True # 空 URL 会使用默认值
url_lower = base_url.lower()
# ========== SSRF 防护: 强制 HTTPS ==========
if not url_lower.startswith('https://'):
if not (url_lower.startswith('http://localhost') or url_lower.startswith('http://127.0.0.1')):
if url_lower.startswith('http://'):
return False
# ========== SSRF 防护: 阻止私有 IP ==========
try:
from urllib.parse import urlparse
import ipaddress
parsed = urlparse(base_url)
host = parsed.hostname or ''
try:
ip = ipaddress.ip_address(host)
if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved:
return False
except ValueError:
pass
for suffix in ['.local', '.internal', '.corp', '.lan', '.home']:
if host.endswith(suffix):
return False
except Exception:
return False
# 无效域名黑名单
invalid_domains = [
'docs.djangoproject.com',
'docs.python.org',
'developer.mozilla.org',
'stackoverflow.com',
'themoviedb.org',
'prisma.io',
'pris.ly',
'every.to',
'makersuite.google.com',
'/settings',
'/ref/',
'/docs/',
'/guide',
]
for invalid in invalid_domains:
if invalid in url_lower:
return False
return True
async def validate_openai(self, api_key: str, base_url: str) -> ValidationResult:
"""异步验证 OpenAI / 中转站 - 集成熍断器保护"""
# ========== 新增:预检查 base_url 有效性 ==========
if not self._is_likely_valid_relay(base_url):
return ValidationResult(KeyStatus.INVALID, "base_url 无效")
if not base_url:
base_url = config.default_base_urls["openai"]
# 熍断器检查
circuit_result = await self._check_circuit_breaker(base_url)
if circuit_result:
return circuit_result
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
session = await self._get_session()
proxy = self._get_proxy()
model_tier = "GPT-3.5"
rpm = 0
models_list = []
# Step 1: GET /models
for url in self._try_url_variants(base_url, "models"):
try:
async with session.get(url, headers=headers, proxy=proxy) as resp:
# 提取 RPM
rpm = int(resp.headers.get('x-ratelimit-limit-requests', 0))
if resp.status == 200:
# 记录成功
await self._record_circuit_result(url, success=True)
data = await resp.json()
models_list = [m.get("id", "") for m in data.get("data", [])]
# 检测高价值模型
for m in models_list:
if any(hv in m.lower() for hv in ['gpt-4', 'gpt-4o']):
model_tier = "GPT-4"
break
model_names = [m[:15] for m in models_list[:3]]
info = f"{len(models_list)}模型: {', '.join(model_names)}"
# RPM 透视标记
rpm_tier = ""
if rpm >= RPM_ENTERPRISE_THRESHOLD:
rpm_tier = "Enterprise"
elif rpm > 0 and rpm <= RPM_FREE_TRIAL_THRESHOLD:
rpm_tier = "Free Trial"
if rpm_tier:
info = f"{info} [{rpm_tier}]"
is_high = model_tier == "GPT-4" or rpm >= RPM_ENTERPRISE_THRESHOLD
return ValidationResult(KeyStatus.VALID, info, model_tier, rpm, 0.0, is_high)
elif resp.status == 429:
# 429 是应用层错误,不触发熍断
await self._record_circuit_result(url, http_status=429)
return ValidationResult(KeyStatus.QUOTA_EXCEEDED, "配额耗尽")
elif resp.status in CIRCUIT_BREAKER_HTTP_CODES:
# 502/503/504 网关错误
await self._record_circuit_result(url, http_status=resp.status)
continue
else:
# 其他状态码 (401/403 等)
await self._record_circuit_result(url, http_status=resp.status)
except asyncio.TimeoutError as e:
await self._record_circuit_result(url, error=e)
continue
except aiohttp.ClientConnectorError as e:
await self._record_circuit_result(url, error=e)
return ValidationResult(KeyStatus.CONNECTION_ERROR, "连接失败")
except Exception as e:
logger.debug(f"验证异常: {type(e).__name__}: {e}")
continue
# Step 2: POST /chat/completions (Fallback)
chat_body = {"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "Hi"}], "max_tokens": 1}
for url in self._try_url_variants(base_url, "chat/completions"):
try:
async with session.post(url, headers=headers, json=chat_body, proxy=proxy) as resp:
if resp.status == 200:
await self._record_circuit_result(url, success=True)
return ValidationResult(KeyStatus.VALID, "有效(chat)", model_tier, rpm, 0.0, False)
elif resp.status == 429:
await self._record_circuit_result(url, http_status=429)
return ValidationResult(KeyStatus.QUOTA_EXCEEDED, "配额耗尽")
elif resp.status in CIRCUIT_BREAKER_HTTP_CODES:
await self._record_circuit_result(url, http_status=resp.status)
continue
else:
await self._record_circuit_result(url, http_status=resp.status)
except aiohttp.ClientConnectorError as e:
await self._record_circuit_result(url, error=e)
return ValidationResult(KeyStatus.CONNECTION_ERROR, "连接失败")
except asyncio.TimeoutError as e:
await self._record_circuit_result(url, error=e)
continue
except Exception as e:
logger.debug(f"验证异常: {type(e).__name__}: {e}")
continue
return ValidationResult(KeyStatus.INVALID, "认证失败")
async def validate_gemini(self, api_key: str, base_url: str) -> ValidationResult:
"""异步验证 Gemini - 集成熍断器保护"""
url = f"https://generativelanguage.googleapis.com/v1beta/models?key={api_key}"
# 熍断器检查(Gemini 域名在白名单中,不会被熍断)
circuit_result = await self._check_circuit_breaker(url)
if circuit_result:
return circuit_result
session = await self._get_session()
proxy = self._get_proxy()
try:
async with session.get(url, proxy=proxy) as resp:
if resp.status == 200:
await self._record_circuit_result(url, success=True)
data = await resp.json()
models = data.get("models", [])
# Gemini Pro 检测
has_pro = any('gemini-1.5-pro' in m.get('name', '').lower() for m in models)
tier = "Gemini-Pro" if has_pro else "Gemini"
return ValidationResult(KeyStatus.VALID, f"{len(models)}模型", tier, 0, 0.0, has_pro)
elif resp.status == 429:
await self._record_circuit_result(url, http_status=429)
return ValidationResult(KeyStatus.QUOTA_EXCEEDED, "Gemini配额耗尽")
elif resp.status in CIRCUIT_BREAKER_HTTP_CODES:
await self._record_circuit_result(url, http_status=resp.status)
return ValidationResult(KeyStatus.CONNECTION_ERROR, f"网关错误 {resp.status}")
else:
await self._record_circuit_result(url, http_status=resp.status)
return ValidationResult(KeyStatus.INVALID, f"HTTP {resp.status}")
except aiohttp.ClientConnectorError as e:
await self._record_circuit_result(url, error=e)
return ValidationResult(KeyStatus.CONNECTION_ERROR, "Gemini连接失败")
except asyncio.TimeoutError as e:
await self._record_circuit_result(url, error=e)
return ValidationResult(KeyStatus.CONNECTION_ERROR, "Gemini超时")
except Exception as e:
return ValidationResult(KeyStatus.INVALID, str(e)[:20])
async def validate_anthropic(self, api_key: str, base_url: str) -> ValidationResult:
"""
异步验证 Anthropic Claude Key - 集成熍断器保护
Anthropic 使用专用 Headers:
- x-api-key: API Key
- anthropic-version: API 版本
必须用 POST /v1/messages 验证(不支持 GET /models)
特殊处理:
- 400 + "credit balance is too low" → QUOTA_EXCEEDED(Key 有效但没钱)
- 401 → INVALID(认证失败)
"""
if not base_url:
base_url = config.default_base_urls["anthropic"]
# 熍断器检查
circuit_result = await self._check_circuit_breaker(base_url)
if circuit_result:
return circuit_result
# Anthropic 专用 Headers
headers = {
"x-api-key": api_key,
"anthropic-version": "2023-06-01",
"content-type": "application/json"
}
# 必须 POST 请求
body = {
"model": "claude-3-haiku-20240307",
"max_tokens": 1,
"messages": [{"role": "user", "content": "Hi"}]
}
session = await self._get_session()
proxy = self._get_proxy()
try:
url = f"{base_url.rstrip('/')}/v1/messages"
async with session.post(url, headers=headers, json=body, proxy=proxy) as resp:
response_text = await resp.text()
if resp.status == 200:
await self._record_circuit_result(url, success=True)
# 尝试解析模型信息
try:
data = await resp.json()
model_used = data.get("model", "claude-3")
# 检测是否为高价值模型 (Opus/Sonnet)
is_high = "opus" in model_used.lower() or "sonnet" in model_used.lower()
tier = "Claude-3-Opus" if "opus" in model_used.lower() else "Claude-3"
return ValidationResult(KeyStatus.VALID, "Claude有效", tier, 0, 0.0, is_high)
except Exception:
return ValidationResult(KeyStatus.VALID, "Claude有效", "Claude-3", 0, 0.0, True)
elif resp.status == 400:
# 关键:检查是否为余额不足(Key 有效但没钱)
await self._record_circuit_result(url, http_status=400)
if "credit" in response_text.lower() and "balance" in response_text.lower():
# "credit balance is too low" - Key 有效但配额耗尽
return ValidationResult(KeyStatus.QUOTA_EXCEEDED, "Claude余额不足", "Claude-3", 0, 0.0, False)
elif "billing" in response_text.lower():
# 账单问题也视为有效但无配额
return ValidationResult(KeyStatus.QUOTA_EXCEEDED, "Claude账单问题", "Claude-3", 0, 0.0, False)
else:
# 其他 400 错误(可能是请求格式问题,但 Key 可能有效)
return ValidationResult(KeyStatus.VALID, "有效(请求错误)", "Claude", 0, 0.0, False)
elif resp.status == 401:
# 认证失败 - Key 无效
await self._record_circuit_result(url, http_status=401)
return ValidationResult(KeyStatus.INVALID, "Claude认证失败")
elif resp.status == 403:
# 权限不足 - 可能 Key 有效但被禁用
await self._record_circuit_result(url, http_status=403)
if "disabled" in response_text.lower():
return ValidationResult(KeyStatus.INVALID, "Claude Key已禁用")
return ValidationResult(KeyStatus.QUOTA_EXCEEDED, "Claude权限受限", "Claude", 0, 0.0, False)
elif resp.status == 429:
await self._record_circuit_result(url, http_status=429)
return ValidationResult(KeyStatus.QUOTA_EXCEEDED, "Claude速率限制")
elif resp.status in CIRCUIT_BREAKER_HTTP_CODES:
await self._record_circuit_result(url, http_status=resp.status)
return ValidationResult(KeyStatus.CONNECTION_ERROR, f"网关错误 {resp.status}")
else:
await self._record_circuit_result(url, http_status=resp.status)
return ValidationResult(KeyStatus.INVALID, f"HTTP {resp.status}")
except aiohttp.ClientConnectorError as e:
await self._record_circuit_result(base_url, error=e)
return ValidationResult(KeyStatus.CONNECTION_ERROR, "Claude连接失败")
except asyncio.TimeoutError as e:
await self._record_circuit_result(base_url, error=e)
return ValidationResult(KeyStatus.CONNECTION_ERROR, "Claude超时")
except Exception as e:
return ValidationResult(KeyStatus.INVALID, str(e)[:20])
async def validate_azure(self, api_key: str, base_url: str) -> ValidationResult:
"""异步验证 Azure - 集成熍断器保护"""
if not base_url:
return ValidationResult(KeyStatus.UNVERIFIED, "缺少Endpoint")
# 熍断器检查(Azure 域名在白名单中,不会被熍断)
circuit_result = await self._check_circuit_breaker(base_url)
if circuit_result:
return circuit_result
headers = {"api-key": api_key, "Content-Type": "application/json"}
session = await self._get_session()
proxy = self._get_proxy()
try:
url = f"{base_url.rstrip('/')}/openai/deployments?api-version=2023-05-15"
async with session.get(url, headers=headers, proxy=proxy) as resp:
if resp.status == 200:
await self._record_circuit_result(url, success=True)
return ValidationResult(KeyStatus.VALID, "Azure有效", "Azure-GPT", 0, 0.0, True)
elif resp.status == 429:
await self._record_circuit_result(url, http_status=429)
return ValidationResult(KeyStatus.QUOTA_EXCEEDED, "Azure配额耗尽")
elif resp.status in CIRCUIT_BREAKER_HTTP_CODES:
await self._record_circuit_result(url, http_status=resp.status)
return ValidationResult(KeyStatus.CONNECTION_ERROR, f"网关错误 {resp.status}")
else:
await self._record_circuit_result(url, http_status=resp.status)
return ValidationResult(KeyStatus.INVALID, f"HTTP {resp.status}")
except aiohttp.ClientConnectorError as e:
await self._record_circuit_result(base_url, error=e)
return ValidationResult(KeyStatus.CONNECTION_ERROR, "Azure连接失败")
except asyncio.TimeoutError as e:
await self._record_circuit_result(base_url, error=e)
return ValidationResult(KeyStatus.CONNECTION_ERROR, "Azure超时")
except Exception as e:
return ValidationResult(KeyStatus.INVALID, str(e)[:20])
async def probe_gpt4(self, api_key: str, base_url: str) -> bool:
"""探测是否支持 GPT-4"""
if not base_url:
base_url = config.default_base_urls["openai"]
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
body = {"model": "gpt-4", "messages": [{"role": "user", "content": "1"}], "max_tokens": 1}
session = await self._get_session()
proxy = self._get_proxy()
for url in self._try_url_variants(base_url, "chat/completions"):
try:
async with session.post(url, headers=headers, json=body, proxy=proxy) as resp:
return resp.status == 200
except Exception as e:
logger.debug(f"验证异常: {type(e).__name__}: {e}")
continue
return False
async def probe_billing(self, api_key: str, base_url: str) -> dict:
"""
探测中转站/官方 API 余额
返回:
{
'balance': float, # 余额 (USD)
'used': float, # 已使用
'limit': float, # 总额度
'source': str # 余额来源
}
"""
result = {'balance': 0.0, 'used': 0.0, 'limit': 0.0, 'source': ''}
headers = {"Authorization": f"Bearer {api_key}"}
session = await self._get_session()
proxy = self._get_proxy()
# ========== 1. OpenAI 官方余额检测 ==========
if not base_url or "api.openai.com" in base_url:
# 官方 API 余额查询需要 organization header
# 但大多数泄露的 Key 没有这个信息,跳过
return result
# ========== 2. 中转站余额检测 ==========
billing_endpoints = [
# one-api / new-api 格式 (最常见)
{
'path': '/api/user/self',
'fields': ['quota', 'used_quota', 'data.quota', 'data.used_quota']
},
{
'path': '/api/user/info',
'fields': ['quota', 'used_quota', 'balance', 'data.quota']
},
# OpenAI 兼容格式
{
'path': '/dashboard/billing/subscription',
'fields': ['hard_limit_usd', 'soft_limit_usd', 'system_hard_limit_usd']
},
{
'path': '/v1/dashboard/billing/subscription',
'fields': ['hard_limit_usd', 'soft_limit_usd']
},
{
'path': '/dashboard/billing/credit_grants',
'fields': ['total_granted', 'total_used', 'total_available']
},
# 其他中转站
{
'path': '/user/info',
'fields': ['balance', 'quota', 'credits', 'remaining']
},
{
'path': '/api/status',
'fields': ['quota', 'balance', 'credits']
},
]
for endpoint in billing_endpoints:
try:
url = f"{base_url.rstrip('/')}{endpoint['path']}"
async with session.get(url, headers=headers, proxy=proxy) as resp:
if resp.status == 200:
data = await resp.json()
balance = self._extract_balance_from_response(data, endpoint['fields'])
if balance > 0:
result['balance'] = balance
result['source'] = endpoint['path']
return result
except Exception as e:
logger.debug(f"验证异常: {type(e).__name__}: {e}")
continue
return result
def _extract_balance_from_response(self, data: dict, fields: list) -> float:
"""
从响应数据中提取余额
支持嵌套字段如 'data.quota'
"""
for field in fields:
try:
value = data
for key in field.split('.'):
if isinstance(value, dict):
value = value.get(key)
else:
value = None
break
if value is not None and value != 0:
balance = float(value)
# one-api 的 quota 单位是 "500000" 表示 $5
# 需要除以 100000 转换为美元
if balance > 10000:
balance = balance / 100000
return balance
except (ValueError, TypeError, AttributeError):
continue
return 0.0
async def probe_quota_by_request(self, api_key: str, base_url: str) -> dict:
"""
通过实际请求测试额度
这是最准确的方法:尝试发送一个最小请求
返回:
{
'has_quota': bool, # 是否有额度
'error_type': str, # 错误类型 (quota/rate_limit/auth/other)
'message': str # 详细信息
}
"""
if not base_url:
base_url = config.default_base_urls["openai"]
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
session = await self._get_session()
proxy = self._get_proxy()
# 最小化请求:1 token
chat_body = {
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "1"}],
"max_tokens": 1
}
for url in self._try_url_variants(base_url, "chat/completions"):
try:
async with session.post(url, headers=headers, json=chat_body, proxy=proxy) as resp:
response_text = await resp.text()
if resp.status == 200:
return {'has_quota': True, 'error_type': None, 'message': '有额度'}
elif resp.status == 429:
# 区分配额耗尽 vs 速率限制
if 'quota' in response_text.lower() or 'exceeded' in response_text.lower():
return {'has_quota': False, 'error_type': 'quota', 'message': '配额耗尽'}
elif 'rate' in response_text.lower():
# 速率限制说明 Key 有效,只是请求太快
return {'has_quota': True, 'error_type': 'rate_limit', 'message': '速率限制(有额度)'}
else:
return {'has_quota': False, 'error_type': 'quota', 'message': '429错误'}
elif resp.status == 401:
return {'has_quota': False, 'error_type': 'auth', 'message': '认证失败'}
elif resp.status == 402:
# Payment Required - 明确表示没钱
return {'has_quota': False, 'error_type': 'quota', 'message': '需要付款'}
elif resp.status == 400:
# 检查是否为余额不足
if 'insufficient' in response_text.lower() or 'quota' in response_text.lower():
return {'has_quota': False, 'error_type': 'quota', 'message': '余额不足'}
# 其他 400 错误可能是请求格式问题,Key 可能有效
return {'has_quota': True, 'error_type': 'other', 'message': '请求错误'}
else:
return {'has_quota': False, 'error_type': 'other', 'message': f'HTTP {resp.status}'}
except asyncio.TimeoutError:
continue
except aiohttp.ClientConnectorError:
return {'has_quota': False, 'error_type': 'connection', 'message': '连接失败'}
except Exception as e:
logger.debug(f"验证异常: {type(e).__name__}: {e}")
continue
return {'has_quota': False, 'error_type': 'other', 'message': '无法验证'}
# ========================================================================
# 新增平台验证方法
# ========================================================================
async def validate_huggingface(self, api_key: str, base_url: str) -> ValidationResult:
"""验证 Hugging Face API Key"""
if not base_url:
base_url = "https://api-inference.huggingface.co"
headers = {"Authorization": f"Bearer {api_key}"}
proxy = config.proxy_url or None
session = await self._get_session()
try:
async with session.get(
f"{base_url.rstrip('/')}/models/gpt2",
headers=headers, proxy=proxy
) as resp:
if resp.status == 200:
return ValidationResult(KeyStatus.VALID, "HuggingFace 有效")
elif resp.status == 401:
return ValidationResult(KeyStatus.INVALID, "无效")
elif resp.status == 429:
return ValidationResult(KeyStatus.QUOTA_EXCEEDED, "配额耗尽")
except Exception as e:
logger.debug(f"HuggingFace 验证异常: {e}")
return ValidationResult(KeyStatus.CONNECTION_ERROR, "连接失败")
async def validate_groq(self, api_key: str, base_url: str) -> ValidationResult:
"""验证 Groq API Key"""
if not base_url:
base_url = "https://api.groq.com/openai/v1"
headers = {"Authorization": f"Bearer {api_key}"}
proxy = config.proxy_url or None
session = await self._get_session()
try:
async with session.get(
f"{base_url.rstrip('/')}/models",
headers=headers, proxy=proxy
) as resp:
if resp.status == 200:
return ValidationResult(KeyStatus.VALID, "Groq 有效")
elif resp.status == 401:
return ValidationResult(KeyStatus.INVALID, "无效")
elif resp.status == 429:
return ValidationResult(KeyStatus.QUOTA_EXCEEDED, "配额耗尽")
except Exception as e:
logger.debug(f"Groq 验证异常: {e}")
return ValidationResult(KeyStatus.CONNECTION_ERROR, "连接失败")
async def validate_deepseek(self, api_key: str, base_url: str) -> ValidationResult:
"""验证 DeepSeek API Key"""
if not base_url:
base_url = "https://api.deepseek.com"
headers = {"Authorization": f"Bearer {api_key}"}
proxy = config.proxy_url or None
session = await self._get_session()
try:
async with session.get(
f"{base_url.rstrip('/')}/models",
headers=headers, proxy=proxy
) as resp:
if resp.status == 200:
return ValidationResult(KeyStatus.VALID, "DeepSeek 有效")
elif resp.status == 401:
return ValidationResult(KeyStatus.INVALID, "无效")
elif resp.status == 429:
return ValidationResult(KeyStatus.QUOTA_EXCEEDED, "配额耗尽")
except Exception as e:
logger.debug(f"DeepSeek 验证异常: {e}")
return ValidationResult(KeyStatus.CONNECTION_ERROR, "连接失败")