Commit 96ccce5

Merge pull request #1593 from Altinity/fix_query_cancelled_not_releasing_the_lock
Export Partition - release the part lock when the query is cancelled
2 parents aeaa6b7 + 022d159 commit 96ccce5

2 files changed: 184 additions & 8 deletions

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 35 additions & 8 deletions
@@ -354,20 +354,13 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
     size_t max_retries
 )
 {
-    LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export failed, will now increment counters", part_name);
+    LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export failed", part_name);
 
     if (!exception)
     {
         throw Exception(ErrorCodes::LOGICAL_ERROR, "ExportPartition scheduler task: No exception provided for error handling. Sounds like a bug");
     }
 
-    /// Early exit if the query was cancelled - no need to increment error counts
-    if (exception->code() == ErrorCodes::QUERY_WAS_CANCELLED)
-    {
-        LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export was cancelled, skipping error handling", part_name);
-        return;
-    }
-
     Coordination::Stat locked_by_stat;
     std::string locked_by;
 
@@ -385,6 +378,40 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
         return;
     }
 
+    /// Early exit if the query was cancelled - no need to increment error counts
+    if (exception->code() == ErrorCodes::QUERY_WAS_CANCELLED)
+    {
+        /// Releasing the lock is important because a query can be cancelled due to SYSTEM STOP MOVES. If this is the case,
+        /// other replicas should still be able to export this individual part. That's why there is a retry loop here.
+        /// It is very unlikely this will be a problem in practice. The lock is ephemeral, which means it is automatically released
+        /// if ClickHouse loses connection to ZooKeeper
+        std::size_t retry_count = 0;
+        static constexpr std::size_t max_lock_release_retries = 3;
+        while (retry_count < max_lock_release_retries)
+        {
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove);
+
+            const auto removal_code = zk->tryRemove(export_path / "locks" / part_name, locked_by_stat.version);
+
+            if (Coordination::Error::ZOK == removal_code)
+            {
+                break;
+            }
+
+            if (Coordination::Error::ZBADVERSION == removal_code)
+            {
+                LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} lock version mismatch, will not increment error counts", part_name);
+                break;
+            }
+
+            retry_count++;
+        }
+
+        LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export was cancelled, skipping error handling", part_name);
+        return;
+    }
+
     Coordination::Requests ops;
 
     const auto processing_part_path = processing_parts_path / part_name;
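
For context on the ZooKeeper idiom above: tryRemove is passed the lock node's version recorded earlier in locked_by_stat, so this is a compare-and-delete — the node is removed only if it has not been modified since it was read, and ZBADVERSION typically means another replica has already re-taken the lock, in which case it must be left alone. Below is a minimal standalone sketch of that pattern, using only the calls that appear in the hunk above; the helper function, its signature, and the include path are illustrative, not part of the patch.

#include <Common/ZooKeeper/ZooKeeper.h>   // ClickHouse ZooKeeper client (zkutil); assumed include path

/// Hypothetical helper, not part of the patch: version-checked release of a part lock.
/// Mirrors the loop above: delete our own lock, leave a re-acquired lock alone, retry transient errors.
bool tryReleasePartLock(
    const zkutil::ZooKeeperPtr & zk,
    const std::string & lock_path,              /// e.g. "<export_path>/locks/<part_name>"
    const Coordination::Stat & locked_by_stat,
    size_t max_retries = 3)
{
    for (size_t attempt = 0; attempt < max_retries; ++attempt)
    {
        /// Compare-and-delete: succeeds only if the node still has the version we observed.
        const auto code = zk->tryRemove(lock_path, locked_by_stat.version);

        if (code == Coordination::Error::ZOK)
            return true;    /// it was still our lock and it is now released

        if (code == Coordination::Error::ZBADVERSION)
            return true;    /// the lock was taken again with a newer version; do not touch it

        /// Any other code (e.g. a transient connection problem) is retried. The lock node is
        /// ephemeral, so it disappears anyway if the ZooKeeper session is lost.
    }
    return false;
}

In the patch itself this logic is inlined in handlePartExportFailure and additionally bumps the ExportPartition ZooKeeper profile events on every attempt.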

tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py

Lines changed: 149 additions & 0 deletions
@@ -1378,3 +1378,152 @@ def test_sharded_export_partition_default_pattern(cluster):
 
     # only one file with 3 rows should be present
     assert int(total_count) == 3, f"Expected 3 rows, got {total_count}"
+
+
+def test_export_partition_scheduler_skipped_when_moves_stopped(cluster):
+    node = cluster.instances["replica1"]
+
+    uid = str(uuid.uuid4()).replace("-", "_")
+    mt_table = f"sched_skip_mt_{uid}"
+    s3_table = f"sched_skip_s3_{uid}"
+
+    create_tables_and_insert_data(node, mt_table, s3_table, "replica1")
+
+    node.query(f"SYSTEM STOP MOVES {mt_table}")
+
+    node.query(
+        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
+    )
+
+    wait_for_export_to_start(node, mt_table, s3_table, "2020")
+
+    # Wait for several scheduler cycles (each fires every 5 s).
+    # If the guard is missing the scheduler would run and data would land in S3.
+    time.sleep(10)
+
+    status = node.query(
+        f"SELECT status FROM system.replicated_partition_exports"
+        f" WHERE source_table = '{mt_table}' AND destination_table = '{s3_table}'"
+        f" AND partition_id = '2020'"
+    ).strip()
+
+    assert status == "PENDING", (
+        f"Expected PENDING while moves are stopped, got '{status}'"
+    )
+
+    row_count = int(node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip())
+    assert row_count == 0, (
+        f"Expected 0 rows in S3 while scheduler is skipped, got {row_count}"
+    )
+
+    node.query(f"SYSTEM START MOVES {mt_table}")
+
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED", timeout=60)
+
+    row_count = int(node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip())
+    assert row_count == 3, f"Expected 3 rows in S3 after export completed, got {row_count}"
+
+
+def test_export_partition_resumes_after_stop_moves(cluster):
+    node = cluster.instances["replica1"]
+
+    uid = str(uuid.uuid4()).replace("-", "_")
+    mt_table = f"stop_moves_before_mt_{uid}"
+    s3_table = f"stop_moves_before_s3_{uid}"
+
+    create_tables_and_insert_data(node, mt_table, s3_table, "replica1")
+
+    node.query(f"SYSTEM STOP MOVES {mt_table}")
+
+    node.query(
+        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
+        f" SETTINGS export_merge_tree_partition_max_retries = 50"
+    )
+
+    wait_for_export_to_start(node, mt_table, s3_table, "2020")
+
+    # Give the scheduler enough time to attempt (and cancel) the part task at
+    # least once, exercising the lock-release code path.
+    time.sleep(5)
+
+    status = node.query(
+        f"SELECT status FROM system.replicated_partition_exports"
+        f" WHERE source_table = '{mt_table}' AND destination_table = '{s3_table}'"
+        f" AND partition_id = '2020'"
+    ).strip()
+    assert status == "PENDING", f"Expected PENDING while moves are stopped, got '{status}'"
+
+    row_count = int(node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip())
+    assert row_count == 0, f"Expected 0 rows in S3 while moves are stopped, got {row_count}"
+
+    node.query(f"SYSTEM START MOVES {mt_table}")
+
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED", timeout=60)
+
+    row_count = int(node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip())
+    assert row_count == 3, f"Expected 3 rows in S3 after export completed, got {row_count}"
+
+
+def test_export_partition_resumes_after_stop_moves_during_export(cluster):
+    skip_if_remote_database_disk_enabled(cluster)
+
+    node = cluster.instances["replica1"]
+
+    uid = str(uuid.uuid4()).replace("-", "_")
+    mt_table = f"stop_moves_during_mt_{uid}"
+    s3_table = f"stop_moves_during_s3_{uid}"
+
+    create_tables_and_insert_data(node, mt_table, s3_table, "replica1")
+
+    minio_ip = cluster.minio_ip
+    minio_port = cluster.minio_port
+
+    with PartitionManager() as pm:
+        pm.add_rule({
+            "instance": node,
+            "destination": node.ip_address,
+            "protocol": "tcp",
+            "source_port": minio_port,
+            "action": "REJECT --reject-with tcp-reset",
+        })
+        pm.add_rule({
+            "instance": node,
+            "destination": minio_ip,
+            "protocol": "tcp",
+            "destination_port": minio_port,
+            "action": "REJECT --reject-with tcp-reset",
+        })
+
+        node.query(
+            f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
+            f" SETTINGS export_merge_tree_partition_max_retries = 50"
+        )
+
+        wait_for_export_to_start(node, mt_table, s3_table, "2020")
+
+        # Let the tasks start executing and failing against the blocked S3.
+        time.sleep(2)
+
+        node.query(f"SYSTEM STOP MOVES {mt_table}")
+
+        # Give the cancel callback time to fire and the lock-release path to run.
+        time.sleep(3)
+
+        status = node.query(
+            f"SELECT status FROM system.replicated_partition_exports"
+            f" WHERE source_table = '{mt_table}' AND destination_table = '{s3_table}'"
+            f" AND partition_id = '2020'"
+        ).strip()
+
+        assert status == "PENDING", (
+            f"Expected PENDING while moves are stopped and S3 is blocked, got '{status}'"
+        )
+
+    node.query(f"SYSTEM START MOVES {mt_table}")
+
+    # MinIO is now unblocked; the next scheduler cycle should succeed.
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED", timeout=60)
+
+    row_count = int(node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip())
+    assert row_count == 3, f"Expected 3 rows in S3 after export completed, got {row_count}"
+
