Skip to content

Commit 33090b2

Browse files
authored
Merge pull request #5 from FindHotel/s3-upload
DIN-34 Add S3 transport
2 parents 8382a5f + 02b37d9 commit 33090b2

11 files changed

Lines changed: 309 additions & 28 deletions

File tree

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ dist
66
MANIFEST
77
build
88
.eggs
9-
.env
9+
.env
10+
.pytest_cache

.travis.yml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
language: python
2+
sudo: required
3+
dist: xenial
24
python:
3-
- "3.3"
4-
- "3.4"
55
- "3.5"
6+
- "3.6"
7+
- "3.7"
68
install:
79
- "pip install ."
810
script: make test

HISTORY.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
1.5.0 / 2018-12-14
2+
==================
3+
4+
* Add S3 transport to upload files directly to S3.
5+
16
1.3.1 / 2018-01-06
27
==================
38

README.md

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
analytics-python
2-
==============
1+
# analytics-python
32

43
[![Build Status](https://travis-ci.org/FindHotel/analytics-python.svg?branch=master)](https://travis-ci.org/FindHotel/analytics-python)
54

@@ -9,27 +8,87 @@ analytics-python is a python client is a slightly modified version of [Segment's
98

109
## Usage
1110

11+
The documentation for Segment's Python SDK that this repository is based on
12+
is available at [https://segment.com/libraries/python](https://segment.com/libraries/python).
13+
Check Segment's docs to get familiar with the API.
14+
15+
You can use the package directly; in this case the default `http` transport will be used:
16+
1217
```python
1318
import analytics
1419

1520
# This key will be passed in the `x-api-key` header of every request
1621
analytics.write_key='AWS_API_GATEWAY_KEY'
1722

1823
# The custom endpoint to where the events will be delivered to
19-
analytics.endpoint='https://polku.fih.io/dev/[hookname]'
24+
analytics.endpoint='https://segment.fih.io/v1/[endpoint-key]'
2025

2126
analytics.track('kljsdgs99', 'SignedUp', {'plan': 'Enterprise'})
27+
analytics.flush()
2228
```
2329

30+
Use client with custom error handling function:
2431

25-
## More information
32+
```python
2633

27-
The documentation for Segment's Python SDK that this repository is based on is available at [https://segment.com/libraries/python](https://segment.com/libraries/python). You can use Segment's docs to get familiar with the API.
34+
import analytics
2835

36+
ANALYTICS_WRITE_KEY='AWS_API_GATEWAY_KEY'
37+
ANALYTICS_ENDPOINT='https://segment.fih.io/v1/[endpoint-key]'
2938

30-
## License
39+
def log_error(e, batch):
40+
print("exception: {}, batch: {}".format(e, batch), flush=True)
41+
42+
client = analytics.Client(
43+
endpoint=ANALYTICS_ENDPOINT,
44+
write_key=ANALYTICS_WRITE_KEY,
45+
debug=analytics.debug,
46+
on_error=log_error,
47+
send=analytics.send,
48+
max_queue_size=analytics.max_queue_size,
49+
upload_size=analytics.upload_size
50+
)
3151

52+
client.track(...)
53+
client.flush()
3254
```
55+
56+
### Using S3 transport
57+
58+
When using the `s3` transport, the SDK will upload data directly to AWS S3, bypassing the HTTP interface.
59+
60+
```python
61+
62+
MB = 1024*1024
63+
64+
c = Client(
65+
write_key="write-key",
66+
endpoint="https://segment.fih.io/v1/[endpoint-key]",
67+
upload_size=1*MB,
68+
transport='s3',
69+
max_queue_size=1000000,
70+
)
71+
72+
for i in range(30000):
73+
c.track(
74+
user_id='pavel',
75+
event='UUIDGenerated',
76+
properties=dict(id=str(uuid.uuid4()), counter=i)
77+
)
78+
if i % 10000 == 0:
79+
c.flush()
80+
81+
c.flush()
82+
assert False
83+
```
84+
85+
## More information
86+
87+
The documentation for Segment's Python SDK that this repository is based on is available at [https://segment.com/libraries/python](https://segment.com/libraries/python). You can use Segment's docs to get familiar with the API.
88+
89+
## License
90+
91+
```txt
3392
WWWWWW||WWWWWW
3493
W W W||W W W
3594
||

analytics/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"""Settings."""
88
write_key = None
99
endpoint = 'https://api.segment.io/v1/batch'
10+
transport = 'http'
1011
max_queue_size = 10000
1112
upload_size = 100
1213
on_error = None
@@ -55,7 +56,7 @@ def _proxy(method, *args, **kwargs):
5556
default_client = Client(write_key, debug=debug, on_error=on_error,
5657
send=send, endpoint=endpoint,
5758
max_queue_size=max_queue_size,
58-
upload_size=upload_size)
59+
upload_size=upload_size, transport=transport)
5960

6061
fn = getattr(default_client, method)
6162
fn(*args, **kwargs)

analytics/client.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from analytics.utils import guess_timezone, clean
1212
from analytics.consumer import Consumer
13+
from analytics.s3_consumer import S3Consumer
1314
from analytics.version import VERSION
1415

1516
try:
@@ -22,22 +23,37 @@
2223

2324

2425
class Client(object):
25-
"""Create a new Segment client."""
26+
"""Create a new Segment client.
27+
28+
upload_size has different meaning, depending on chosen transport.
29+
For http transport upload_size means number of items to be batched
30+
in a single POST request to backend.
31+
For s3 transport upload_size means size in bytes of _uncompressed_
32+
partition of the data. Sane default value is between 10 and 100 MB
33+
depending on compressability of underlying data.
34+
"""
2635
log = logging.getLogger('segment')
2736

2837
def __init__(self, write_key=None, debug=False, max_queue_size=10000,
29-
send=True, on_error=None, endpoint=None, upload_size=100):
38+
send=True, on_error=None, endpoint=None, upload_size=100,
39+
transport='http'):
3040
require('write_key', write_key, string_types)
31-
3241
self.queue = queue.Queue(max_queue_size)
33-
self.consumer = Consumer(self.queue, write_key, endpoint=endpoint,
34-
on_error=on_error, upload_size=upload_size)
3542
self.write_key = write_key
3643
self.endpoint = endpoint
3744
self.on_error = on_error
3845
self.debug = debug
3946
self.send = send
4047

48+
if transport == 'http':
49+
self.consumer = Consumer(self.queue, write_key, endpoint=endpoint,
50+
on_error=on_error, upload_size=upload_size)
51+
elif transport == 's3':
52+
self.consumer = S3Consumer(self.queue, write_key, endpoint=endpoint,
53+
on_error=on_error, upload_size=upload_size)
54+
else:
55+
raise ValueError("transport should be either http or s3")
56+
4157
if debug:
4258
self.log.setLevel(logging.DEBUG)
4359

analytics/consumer.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
import logging
22
from threading import Thread
3+
from queue import Empty
34

45
import analytics
5-
from analytics.version import VERSION
66
from analytics.request import post
77

8-
try:
9-
from queue import Empty
10-
except:
11-
from Queue import Empty
128

139
class Consumer(Thread):
1410
"""Consumes the messages from the client's queue."""
@@ -59,7 +55,7 @@ def upload(self):
5955
self.on_error(e, batch)
6056
finally:
6157
# mark items as acknowledged from queue
62-
for item in batch:
58+
for _ in batch:
6359
self.queue.task_done()
6460
return success
6561

analytics/request.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,30 @@
1010

1111
_session = sessions.Session()
1212

13+
@retry(wait_exponential_multiplier=500, wait_exponential_max=5000,
14+
stop_max_delay=20000)
15+
def get(write_key, endpoint):
16+
log = logging.getLogger('segment')
17+
headers = {
18+
'content-type': 'application/json',
19+
'x-api-key': write_key,
20+
}
21+
res = _session.get(endpoint, headers=headers, timeout=15)
22+
23+
if res.status_code == 200:
24+
log.debug('get request is successful')
25+
return res.json()
26+
27+
try:
28+
payload = res.json()
29+
log.debug('received response: %s', payload)
30+
raise APIError(
31+
res.status_code,
32+
payload.get('code', '???'),
33+
payload.get('message', '???'))
34+
except ValueError:
35+
raise APIError(res.status_code, 'unknown', res.text)
36+
1337

1438
@retry(wait_exponential_multiplier=500, wait_exponential_max=5000,
1539
stop_max_delay=20000)
@@ -21,7 +45,10 @@ def post(write_key, endpoint, **kwargs):
2145
body["sentAt"] = int(time.time()*1000)
2246
auth = HTTPBasicAuth(write_key, '')
2347
data = json.dumps(body, cls=DatetimeSerializer)
24-
headers = { 'content-type': 'application/json', 'x-api-key': write_key }
48+
headers = {
49+
'content-type': 'application/json',
50+
'x-api-key': write_key,
51+
}
2552
log.debug('making request: %s', data)
2653
res = _session.post(endpoint, data=data, auth=auth, headers=headers, timeout=15)
2754

0 commit comments

Comments
 (0)