Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,14 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string,
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
doneChan := make(chan struct{})
go func() {
visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
close(doneChan)
}()
defer func() {
errChan <- nil
<-doneChan
}()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This exact pattern is duplicated verbatim in cleanDeletedUsers below. Consider extracting a small helper to reduce the duplication, e.g.:

func runWithHeartBeat(ctx context.Context, mgr *VisitMarkerManager, interval time.Duration, fn func() error) error {
    errChan := make(chan error, 1)
    doneChan := make(chan struct{})
    go func() {
        mgr.HeartBeat(ctx, errChan, interval, true)
        close(doneChan)
    }()
    defer func() {
        errChan <- nil
        <-doneChan
    }()
    return fn()
}

This is optional — the duplication is small and the current code is clear.

return errors.Wrapf(c.cleanUser(ctx, userLogger, userBucket, userID, firstRun), "failed to delete blocks for user: %s", userID)
})
Expand Down Expand Up @@ -392,9 +397,14 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
doneChan := make(chan struct{})
go func() {
visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
close(doneChan)
}()
defer func() {
errChan <- nil
<-doneChan
}()
return errors.Wrapf(c.deleteUserMarkedForDeletion(ctx, userLogger, userBucket, userID), "failed to delete user marked for deletion: %s", userID)
})
Expand Down
Loading