|
| 1 | +import gzip |
| 2 | +import json |
| 3 | +import os |
| 4 | +import os.path |
| 5 | + |
| 6 | +import appdirs |
| 7 | +import requests |
| 8 | + |
| 9 | +base_url = "https://www.https-rulesets.org/v1/" |
| 10 | +_ts = None |
| 11 | + |
| 12 | + |
| 13 | +def _storage_location(filename=None, timestamp=None): |
| 14 | + cache_dir = appdirs.user_cache_dir("https-everywhere-py") |
| 15 | + os.makedirs(cache_dir, exist_ok=True) |
| 16 | + |
| 17 | + if timestamp: |
| 18 | + filename = "default.rulesets.{}".format(timestamp) |
| 19 | + |
| 20 | + if filename: |
| 21 | + return os.path.join(cache_dir, filename) |
| 22 | + |
| 23 | + return cache_dir |
| 24 | + |
| 25 | + |
| 26 | +def fetch_channel_ts(): |
| 27 | + ruleset_ts_url = base_url + "latest-rulesets-timestamp" |
| 28 | + r = requests.get(ruleset_ts_url) |
| 29 | + r.raise_for_status() |
| 30 | + ts_string = r.text |
| 31 | + return int(ts_string) |
| 32 | + |
| 33 | + |
| 34 | +def _get_local_ts(): |
| 35 | + global _ts |
| 36 | + if not _ts: # pragma: no cover |
| 37 | + _ts = fetch_channel_ts() |
| 38 | + return _ts |
| 39 | + |
| 40 | + |
| 41 | +def _get_local(timestamp=None): |
| 42 | + if not timestamp: |
| 43 | + timestamp = _get_local_ts() # pragma: no cover |
| 44 | + location = _storage_location(timestamp=timestamp) |
| 45 | + if os.path.exists(location): |
| 46 | + with open(location) as f: |
| 47 | + return json.load(f) |
| 48 | + |
| 49 | + |
| 50 | +def fetch_update(timestamp=None): |
| 51 | + if not timestamp: # pragma: no cover |
| 52 | + timestamp = _get_local_ts() |
| 53 | + data = _get_local(timestamp) |
| 54 | + if data: |
| 55 | + return data |
| 56 | + ruleset_url = "{}rulesets-signature.{}.sha256".format(base_url, timestamp) |
| 57 | + r = requests.get(ruleset_url) |
| 58 | + r.raise_for_status() |
| 59 | + ruleset_url = "{}default.rulesets.{}.gz".format(base_url, timestamp) |
| 60 | + r = requests.get( |
| 61 | + ruleset_url, headers={"Accept-Encoding": "gzip, deflate, br"}, stream=True |
| 62 | + ) |
| 63 | + r.raise_for_status() |
| 64 | + location = _storage_location(timestamp=timestamp) |
| 65 | + data = gzip.GzipFile(fileobj=r.raw).read() |
| 66 | + with open(location, "wb") as f: |
| 67 | + f.write(data) |
| 68 | + return json.loads(data) |
0 commit comments