Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions .github/workflows/code-coverage.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
name: Code Coverage
name: E2E Tests and Code Coverage

permissions:
contents: read
id-token: write

on: [pull_request, workflow_dispatch]
on:
push:
branches:
- main
pull_request:
workflow_dispatch:

jobs:
test-with-coverage:
Expand Down Expand Up @@ -32,25 +37,16 @@ jobs:
with:
python-version: "3.10"
install-args: "--all-extras"
- name: Run parallel tests with coverage
- name: Run all tests with coverage
continue-on-error: false
run: |
poetry run pytest tests/unit tests/e2e \
-m "not serial" \
-n auto \
-n 4 \
--dist=loadgroup \
--cov=src \
--cov-report=xml \
--cov-report=term \
-v
- name: Run telemetry tests with coverage (isolated)
Comment thread
vikrantpuppala marked this conversation as resolved.
continue-on-error: false
run: |
poetry run pytest tests/e2e/test_concurrent_telemetry.py \
--cov=src \
--cov-append \
--cov-report=xml \
--cov-report=term \
-v
- name: Check for coverage override
id: override
env:
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/code-quality-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
install-args: "--all-extras"
cache-path: ".venv-pyarrow"
cache-suffix: "pyarrow-${{ matrix.dependency-version }}-"
- name: Install Python tools for custom versions
if: matrix.dependency-version != 'default'
Expand Down
58 changes: 0 additions & 58 deletions .github/workflows/daily-telemetry-e2e.yml

This file was deleted.

71 changes: 0 additions & 71 deletions .github/workflows/integration.yml

This file was deleted.

158 changes: 31 additions & 127 deletions tests/e2e/common/large_queries_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,139 +2,43 @@
import math
import time

import pytest

log = logging.getLogger(__name__)


class LargeQueriesMixin:
def fetch_rows(test_case, cursor, row_count, fetchmany_size):
"""
This mixin expects to be mixed with a CursorTest-like class
A generator for rows. Fetches until the end or up to 5 minutes.
"""

def fetch_rows(self, cursor, row_count, fetchmany_size):
"""
A generator for rows. Fetches until the end or up to 5 minutes.
"""
# TODO: Remove fetchmany_size when we have fixed the performance issues with fetchone
# in the Python client
max_fetch_time = 5 * 60 # Fetch for at most 5 minutes

rows = self.get_some_rows(cursor, fetchmany_size)
start_time = time.time()
n = 0
while rows:
for row in rows:
n += 1
yield row
if time.time() - start_time >= max_fetch_time:
log.warning("Fetching rows timed out")
break
rows = self.get_some_rows(cursor, fetchmany_size)
if not rows:
# Read all the rows, row_count should match
self.assertEqual(n, row_count)

num_fetches = max(math.ceil(n / 10000), 1)
latency_ms = int((time.time() - start_time) * 1000 / num_fetches), 1
print(
"Fetched {} rows with an avg latency of {} per fetch, ".format(
n, latency_ms
)
+ "assuming 10K fetch size."
max_fetch_time = 5 * 60 # Fetch for at most 5 minutes

rows = _get_some_rows(cursor, fetchmany_size)
start_time = time.time()
n = 0
while rows:
for row in rows:
n += 1
yield row
if time.time() - start_time >= max_fetch_time:
log.warning("Fetching rows timed out")
break
rows = _get_some_rows(cursor, fetchmany_size)
if not rows:
# Read all the rows, row_count should match
test_case.assertEqual(n, row_count)

num_fetches = max(math.ceil(n / 10000), 1)
latency_ms = int((time.time() - start_time) * 1000 / num_fetches), 1
print(
"Fetched {} rows with an avg latency of {} per fetch, ".format(
n, latency_ms
)

@pytest.mark.parametrize(
"extra_params",
[
{},
{"use_sea": True},
],
+ "assuming 10K fetch size."
)
def test_query_with_large_wide_result_set(self, extra_params):
resultSize = 300 * 1000 * 1000 # 300 MB
width = 8192 # B
rows = resultSize // width
cols = width // 36

# Set the fetchmany_size to get 10MB of data a go
fetchmany_size = 10 * 1024 * 1024 // width
# This is used by PyHive tests to determine the buffer size
self.arraysize = 1000
with self.cursor(extra_params) as cursor:
for lz4_compression in [False, True]:
cursor.connection.lz4_compression = lz4_compression
uuids = ", ".join(["uuid() uuid{}".format(i) for i in range(cols)])
cursor.execute(
"SELECT id, {uuids} FROM RANGE({rows})".format(
uuids=uuids, rows=rows
)
)
assert lz4_compression == cursor.active_result_set.lz4_compressed
for row_id, row in enumerate(
self.fetch_rows(cursor, rows, fetchmany_size)
):
assert row[0] == row_id # Verify no rows are dropped in the middle.
assert len(row[1]) == 36

@pytest.mark.parametrize(
"extra_params",
[
{},
{"use_sea": True},
],
)
def test_query_with_large_narrow_result_set(self, extra_params):
resultSize = 300 * 1000 * 1000 # 300 MB
width = 8 # sizeof(long)
rows = resultSize / width

# Set the fetchmany_size to get 10MB of data a go
fetchmany_size = 10 * 1024 * 1024 // width
# This is used by PyHive tests to determine the buffer size
self.arraysize = 10000000
with self.cursor(extra_params) as cursor:
cursor.execute("SELECT * FROM RANGE({rows})".format(rows=rows))
for row_id, row in enumerate(self.fetch_rows(cursor, rows, fetchmany_size)):
assert row[0] == row_id

@pytest.mark.parametrize(
"extra_params",
[
{},
{"use_sea": True},
],
)
def test_long_running_query(self, extra_params):
"""Incrementally increase query size until it takes at least 3 minutes,
and asserts that the query completes successfully.
"""
minutes = 60
min_duration = 3 * minutes

duration = -1
scale0 = 10000
scale_factor = 1
with self.cursor(extra_params) as cursor:
while duration < min_duration:
assert scale_factor < 4096, "Detected infinite loop"
start = time.time()

cursor.execute(
"""SELECT count(*)
FROM RANGE({scale}) x
JOIN RANGE({scale0}) y
ON from_unixtime(x.id * y.id, "yyyy-MM-dd") LIKE "%not%a%date%"
""".format(
scale=scale_factor * scale0, scale0=scale0
)
)

(n,) = cursor.fetchone()
assert n == 0

duration = time.time() - start
current_fraction = duration / min_duration
print("Took {} s with scale factor={}".format(duration, scale_factor))
# Extrapolate linearly to reach 3 min and add 50% padding to push over the limit
scale_factor = math.ceil(1.5 * scale_factor / current_fraction)
def _get_some_rows(cursor, fetchmany_size):
row = cursor.fetchone()
if row:
return [row]
else:
return None
Loading
Loading