Skip to content

Commit a9cfa36

Browse files
committed
address comments and fix test race condition
1 parent 2ee0815 commit a9cfa36

3 files changed

Lines changed: 16 additions & 25 deletions

File tree

pkg/compactor/compactor.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,8 +1048,10 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
10481048
block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
10491049
}
10501050

1051-
// Add ignoreDeletionMarkFilter only when not using bucket index discovery.
1052-
if blockDiscoveryStrategy != cortex_tsdb.BucketIndexDiscovery {
1051+
// Add ignoreDeletionMarkFilter only when not using bucket index discovery or when using the default compaction strategy.
1052+
// CompactionStrategyDefault would mark parent blocks for deletion after compaction is finished. ShuffleShardingGrouper
1053+
// should ignore blocks marked for deletion directly during the grouping stage.
1054+
if blockDiscoveryStrategy != cortex_tsdb.BucketIndexDiscovery || c.compactorCfg.CompactionStrategy == util.CompactionStrategyDefault {
10531055
filterList = append(filterList, ignoreDeletionMarkFilter)
10541056
}
10551057

pkg/compactor/partition_compaction_planner.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB
182182
}
183183
err = p.ignoreDeletionMarkFilter.Filter(p.ctx, resultMetasMap, p.compactorMetrics.metaFetcherSynced, p.compactorMetrics.metaFetcherModified)
184184
if err != nil {
185+
visitMarkerManager.MarkWithStatus(p.ctx, Failed)
186+
level.Warn(p.logger).Log("msg", "unable to filter blocks by deletion marker", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "err", err)
185187
return nil, err
186188
}
187189
var deletedBlocks []string

pkg/compactor/partition_compaction_planner_test.go

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"io"
87
"path"
98
"testing"
109
"time"
@@ -367,9 +366,8 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) {
367366
partitionID := 0
368367
visitMarkerTimeout := 5 * time.Minute
369368

370-
setupBucket := func(t *testing.T, deletionMarkBlockIDs []ulid.ULID) (*bucket.ClientMock, *partitionVisitMarker) {
369+
setupBucket := func(t *testing.T, deletionMarkBlockIDs []ulid.ULID) *bucket.ClientMock {
371370
bkt := &bucket.ClientMock{}
372-
uploadedVisitMarker := &partitionVisitMarker{}
373371

374372
expireTime := time.Now()
375373
visitMarker := partitionVisitMarker{
@@ -400,14 +398,7 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) {
400398
bkt.MockGet(visitMarkerFile, string(visitMarkerFileContent), nil)
401399
bkt.MockGet(partitionedGroupFile, string(partitionedGroupContent), nil)
402400

403-
// Capture uploaded visit marker content
404-
bkt.On("Upload", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
405-
reader := args.Get(2).(io.Reader)
406-
data, err := io.ReadAll(reader)
407-
if err == nil {
408-
_ = json.Unmarshal(data, uploadedVisitMarker)
409-
}
410-
}).Return(nil)
401+
bkt.On("Upload", mock.Anything, mock.Anything, mock.Anything).Return(nil)
411402

412403
// Mock deletion marks for specified blocks
413404
deletionMarkIDs := make(map[ulid.ULID]struct{})
@@ -431,10 +422,10 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) {
431422
}
432423
}
433424

434-
return bkt, uploadedVisitMarker
425+
return bkt
435426
}
436427

437-
createPlanner := func(bkt *bucket.ClientMock, partitionedGroupInfo PartitionedGroupInfo) *PartitionCompactionPlanner {
428+
createPlanner := func(bkt *bucket.ClientMock) *PartitionCompactionPlanner {
438429
registerer := prometheus.NewPedanticRegistry()
439430
metrics := newCompactorMetrics(registerer)
440431
logger := log.NewLogfmtLogger(&concurrency.SyncBuffer{})
@@ -458,15 +449,15 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) {
458449
}
459450

460451
t.Run("should plan successfully when no blocks are marked for deletion", func(t *testing.T) {
461-
bkt, uploadedVisitMarker := setupBucket(t, nil)
452+
bkt := setupBucket(t, nil)
462453
partitionedGroupContent := PartitionedGroupInfo{}
463454
partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID)
464455
raw, _ := bkt.Get(context.Background(), partitionedGroupFile)
465456
buf := make([]byte, 4096)
466457
n, _ := raw.Read(buf)
467458
_ = json.Unmarshal(buf[:n], &partitionedGroupContent)
468459

469-
p := createPlanner(bkt, partitionedGroupContent)
460+
p := createPlanner(bkt)
470461

471462
blocks := []*metadata.Meta{
472463
{BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}},
@@ -486,20 +477,18 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) {
486477
require.Len(t, actual, 2)
487478
assert.Equal(t, block1ulid, actual[0].ULID)
488479
assert.Equal(t, block2ulid, actual[1].ULID)
489-
// Visit marker should not be marked as Failed
490-
assert.NotEqual(t, Failed, uploadedVisitMarker.GetStatus())
491480
})
492481

493482
t.Run("should fail when blocks are marked for deletion", func(t *testing.T) {
494-
bkt, uploadedVisitMarker := setupBucket(t, []ulid.ULID{block2ulid})
483+
bkt := setupBucket(t, []ulid.ULID{block2ulid})
495484
partitionedGroupContent := PartitionedGroupInfo{}
496485
partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID)
497486
raw, _ := bkt.Get(context.Background(), partitionedGroupFile)
498487
buf := make([]byte, 4096)
499488
n, _ := raw.Read(buf)
500489
_ = json.Unmarshal(buf[:n], &partitionedGroupContent)
501490

502-
p := createPlanner(bkt, partitionedGroupContent)
491+
p := createPlanner(bkt)
503492

504493
blocks := []*metadata.Meta{
505494
{BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}},
@@ -519,19 +508,18 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) {
519508
require.Error(t, err)
520509
assert.Contains(t, err.Error(), "partitioned group contains 1 deleted blocks")
521510
assert.Nil(t, actual)
522-
assert.Equal(t, Failed, uploadedVisitMarker.GetStatus())
523511
})
524512

525513
t.Run("should fail when multiple blocks are marked for deletion", func(t *testing.T) {
526-
bkt, uploadedVisitMarker := setupBucket(t, []ulid.ULID{block1ulid, block3ulid})
514+
bkt := setupBucket(t, []ulid.ULID{block1ulid, block3ulid})
527515
partitionedGroupContent := PartitionedGroupInfo{}
528516
partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID)
529517
raw, _ := bkt.Get(context.Background(), partitionedGroupFile)
530518
buf := make([]byte, 4096)
531519
n, _ := raw.Read(buf)
532520
_ = json.Unmarshal(buf[:n], &partitionedGroupContent)
533521

534-
p := createPlanner(bkt, partitionedGroupContent)
522+
p := createPlanner(bkt)
535523

536524
blocks := []*metadata.Meta{
537525
{BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}},
@@ -551,6 +539,5 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) {
551539
require.Error(t, err)
552540
assert.Contains(t, err.Error(), "partitioned group contains 2 deleted blocks")
553541
assert.Nil(t, actual)
554-
assert.Equal(t, Failed, uploadedVisitMarker.GetStatus())
555542
})
556543
}

0 commit comments

Comments (0)