forked from puxxustc/invlib
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlib_util.py
More file actions
131 lines (104 loc) · 3.04 KB
/
lib_util.py
File metadata and controls
131 lines (104 loc) · 3.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2020 - , puxxustc
import datetime
import multiprocessing
import multiprocessing.dummy
import threading
from wcwidth import wcswidth
import requests
def has_key(data, key):
    """Report whether a dotted path can be followed through nested containers.

    Args:
        data: The outermost container (typically a dict of dicts).
        key: Path segments joined by ``'.'``, e.g. ``'a.b.c'``.

    Returns:
        True if every segment is present at its level, False otherwise.
        A path that runs into a value that cannot be searched or indexed by
        a string segment (an int, ``None``, a str, a list, ...) also yields
        False instead of raising ``TypeError``.
    """
    segments = key.split('.')
    node = data
    try:
        for segment in segments:
            if segment not in node:
                return False
            node = node[segment]
    except TypeError:
        # The path descends into something that is not a string-keyed
        # container, so the requested key cannot exist.
        return False
    return True
def get_key(data, key):
    """Fetch the value found by following a dotted path through nested containers.

    Args:
        data: The outermost container (typically a dict of dicts).
        key: Path segments joined by ``'.'``, e.g. ``'a.b.c'``.

    Returns:
        The value at the end of the path, or None when any segment is
        missing.  A path that runs into a value that cannot be searched or
        indexed by a string segment (an int, ``None``, a str, a list, ...)
        also yields None instead of raising ``TypeError``.
    """
    segments = key.split('.')
    node = data
    try:
        for segment in segments:
            if segment not in node:
                return None
            node = node[segment]
    except TypeError:
        # The path descends into something that is not a string-keyed
        # container; treat it the same as a missing key.
        return None
    return node
def wpad(s, width):
    """Right-pad ``s`` with spaces based on its display width.

    The number of spaces appended is ``width`` minus the value reported by
    ``wcswidth`` for ``s``; when that difference is zero or negative the
    string comes back unchanged.
    """
    missing_columns = width - wcswidth(s)
    padding = ' ' * missing_columns
    return s + padding
def wcut(s, width):
    """Fit ``s`` into ``width`` display columns, padding or truncating.

    The string is first padded with spaces through ``wpad`` so that short
    input comes back at full width.  Longer input is cut at the first prefix
    whose display width (as reported by ``wcswidth``) equals ``width``.  If
    a prefix overshoots ``width`` -- its last character is wider than the
    one column that was left -- that character is replaced by a single
    space.

    Args:
        s: Text to fit.
        width: Target display width in columns.

    Returns:
        The fitted string, or the padded string unchanged when no prefix in
        the scanned range reaches ``width``.
    """
    padded = wpad(s, width)
    # Scanning starts at width // 2 (clamped to 0 so a negative width never
    # turns into a negative slice index).  The upper bound is inclusive:
    # text made only of single-column characters needs a prefix of exactly
    # `width` characters, which the former exclusive bound never tried,
    # leaving over-long narrow text uncut.
    for end in range(max(width // 2, 0), width + 1):
        prefix = padded[:end]
        columns = wcswidth(prefix)
        if columns == width:
            return prefix
        if columns > width:
            return prefix[:-1] + ' '
    return padded
def grace_format(fmt, data, key):
    """Format the value at ``key`` with ``fmt``, or return blanks if absent.

    The value is looked up with ``get_key``.  When it is not None the result
    is ``fmt % value``.  Otherwise ``fmt`` is rendered with the placeholders
    ``0`` and then ``''``; the first one that does not raise ``TypeError``
    determines how many spaces are returned (the length of the rendered
    text).  If both placeholders fail, an empty string is returned.
    """
    value = get_key(data, key)
    if value is not None:
        return fmt % value

    # Try a numeric placeholder first, then a string one.
    for placeholder in (0, ''):
        try:
            rendered = fmt % placeholder
        except TypeError:
            continue
        return ' ' * len(rendered)
    return ''
def day2ts(day):
    """Convert a day given in one of several forms to a POSIX timestamp.

    Args:
        day: One of
            * a ``'YYYY-MM-DD'`` string,
            * a tuple or list of ``datetime.datetime`` constructor arguments
              such as ``(2020, 1, 2)``,
            * a ``datetime.datetime``,
            * a ``datetime.date`` (taken as midnight of that day), or
            * any other object exposing a ``timestamp()`` method.

    Returns:
        Seconds since the epoch as a float.  Naive values are interpreted by
        ``datetime.timestamp()``, i.e. in local time.

    Raises:
        ValueError: If a string does not match ``%Y-%m-%d``.
    """
    if isinstance(day, str):
        return datetime.datetime.strptime(day, '%Y-%m-%d').timestamp()
    if isinstance(day, (tuple, list)):
        return datetime.datetime(*day).timestamp()
    # datetime.datetime subclasses datetime.date, so it must be tested first.
    if isinstance(day, datetime.datetime):
        return day.timestamp()
    if isinstance(day, datetime.date):
        # Plain dates have no timestamp(); use midnight, matching what the
        # string branch produces for the same calendar day.
        return datetime.datetime.combine(day, datetime.time()).timestamp()
    return day.timestamp()
def day2msts(day):
    """Convert ``day`` to a timestamp in milliseconds.

    Accepts whatever ``day2ts`` accepts and scales its result (seconds) by
    1000.
    """
    seconds = day2ts(day)
    return seconds * 1000
# Concurrent execution
def parallel_call(max_workers, call_data):
    """Run a batch of calls on a thread pool and collect their results.

    Args:
        max_workers: Number of worker threads passed to the pool.
        call_data: Iterable of ``(func, args, kwargs)`` triples; each is
            executed as ``func(*args, **kwargs)``.

    Returns:
        A list holding the return value of every call, in the same order as
        ``call_data``.

    Raises:
        Whatever exception one of the calls raises (re-raised by
        ``Pool.map``).
    """
    def call_func(_args):
        func, args, kwargs = _args
        return func(*args, **kwargs)

    # multiprocessing.dummy.Pool is the thread-backed pool.  The ``with``
    # block shuts the pool down once map() has returned, so worker threads
    # are not left behind on every call.
    with multiprocessing.dummy.Pool(max_workers) as pool:
        return pool.map(call_func, call_data)
#
# requests wrapper with a connection pool
#
class HttpApi(object):
    """Proxy around a pooled ``requests.Session`` that is rebuilt per process.

    The session is tagged with the pid of the process that created it.  When
    the instance is used from a process with a different pid, a new session
    is created before the call is forwarded.

    Every public attribute that is not defined on this class is exposed as a
    callable that forwards to the same-named attribute of the current
    session, e.g. ``api.get(url)`` calls ``session.get(url)``.
    """

    def __init__(self, **kwargs):
        """Create the lock guarding session replacement and the first session.

        Keyword arguments are accepted but not used.
        """
        self._session_lock = threading.Lock()
        self.init_session()

    def init_session(self):
        """Build a new session and record the pid of the current process."""
        session = requests.Session()
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=10,
            pool_maxsize=100,
            max_retries=3)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        self._session = session
        self._session_pid = multiprocessing.current_process().pid

    @property
    def s(self):
        """Return the session belonging to the current process.

        If the stored session was created under a different pid, a new one
        is built while holding ``_session_lock``.  The lock is acquired with
        a 0.1 s timeout and the pid check is retried after every failed
        attempt.
        """
        pid = multiprocessing.current_process().pid
        while True:
            # Fast path: the session already belongs to this process.
            if self._session_pid == pid:
                return self._session
            if not self._session_lock.acquire(timeout=0.1):
                continue
            try:
                # Re-check under the lock: the session may have been rebuilt
                # while this thread was waiting.
                if self._session_pid != pid:
                    self.init_session()
                return self._session
            finally:
                # Always release, including on the early-return path and
                # when init_session() raises; otherwise the lock would stay
                # held forever.
                self._session_lock.release()

    def __getattr__(self, method):
        """Return a callable that forwards ``method`` to the current session.

        Only invoked for names not found through normal attribute lookup.
        The session attribute is resolved when the returned callable is
        invoked, not when it is fetched.

        Raises:
            AttributeError: For underscore-prefixed names, so that protocol
                probes and not-yet-initialised private attributes are not
                answered with a bogus callable.
        """
        if method.startswith('_'):
            raise AttributeError(method)

        def wrapper(*args, **kwargs):
            fun = getattr(self.s, method)
            return fun(*args, **kwargs)
        return wrapper
# Module-level HttpApi instance, created when this module is imported.
httpapi = HttpApi()