Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion OnlineMongoMigrationProcessor/Helpers/Mongo/MongoHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public static long GetActualDocumentCount(IMongoCollection<BsonDocument> collect
{
FilterDefinition<BsonDocument>? userFilter = GetFilterDoc(mu.UserFilter);
var filter = userFilter ?? Builders<BsonDocument>.Filter.Empty;
return collection.CountDocuments(filter);
return collection.CountDocuments(filter, new CountOptions { MaxTime = TimeSpan.FromMinutes(10) });
}

/// <summary>
Expand Down
87 changes: 41 additions & 46 deletions OnlineMongoMigrationProcessor/Partitioner/SamplePartitioner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,62 +143,46 @@ public static int GetMaxSamples()
}
}

if (chunkCount > adjustedMaxSamples)
bool usePaginationPartitioner = dataType != DataType.ObjectId && config.NonObjectIdPartitioner == PartitionerType.UsePagination;

if (optimizeForMongoDump)
{
int count = 2;
long newCount = docCountByType / ((long)minDocsPerSegment * count);
while (newCount > adjustedMaxSamples)
{
count++;
// DumpAndRestore: oversample then pick equidistant quantile boundaries
chunkCount = Math.Max(1, (int)Math.Ceiling((double)docCountByType / minDocsPerChunk));
segmentCount = 1;
adjustedMaxSamples = (int)Math.Min((long)Math.Floor(docCountByType * 0.04), 500000);

// Check for potential overflow before multiplication
long multiplier = (long)minDocsPerSegment * count;
if (multiplier <= 0 || multiplier > docCountByType)
{
break;
}
newCount = docCountByType / multiplier;
MigrationJobContext.AddVerboseLog($"SamplePartitioner.Calculating adjustedMaxSamples: collection={collection.CollectionNamespace}, newCount={newCount}, dataType={dataType}, docCountByType={docCountByType}, multiplier={multiplier}");
}

// Oversample ~200 per chunk, capped by adjustedMaxSamples
sampleCount = (int)Math.Min((long)chunkCount * 200, adjustedMaxSamples);
sampleCount = Math.Max(sampleCount, chunkCount); // at least chunkCount samples

log.WriteLine($"Requested chunk count {chunkCount} exceeds maximum samples {adjustedMaxSamples} for {collection.CollectionNamespace}. Adjusting to {newCount}", LogType.Warning);
chunkCount = (int)newCount;
MigrationJobContext.AddVerboseLog($"SamplePartitioner DumpAndRestore: collection={collection.CollectionNamespace}, dataType={dataType}, chunkCount={chunkCount}, sampleCount={sampleCount}");
}
else
{
// MongoDriver path: compute segments for parallel writes within each chunk
chunkCount = Math.Max(1, (int)Math.Ceiling((double)docCountByType / minDocsPerChunk));
docsInChunk = docCountByType / chunkCount;

segmentCount = Math.Min(
Math.Max(1, (int)Math.Ceiling((double)docsInChunk / minDocsPerSegment)),
GetMaxSegments()
);

// Ensure minimum documents per chunk
chunkCount = Math.Max(1, (int)Math.Ceiling((double)docCountByType / minDocsPerChunk));
docsInChunk = docCountByType / chunkCount;

// Calculate segments based on documents per chunk
segmentCount = Math.Min(
Math.Max(1, (int)Math.Ceiling((double)docsInChunk / minDocsPerSegment)),
GetMaxSegments()
);


// Calculate the total sample count
sampleCount = Math.Min(chunkCount * segmentCount, adjustedMaxSamples);


MigrationJobContext.AddVerboseLog($"SamplePartitioner.Calculating sampleCount: collection={collection.CollectionNamespace}, dataType={dataType}, segmentCount={segmentCount}, sampleCount{sampleCount}");

// Adjust segments per chunk based on the new sample count
segmentCount = Math.Max(1, sampleCount / chunkCount);

bool usePaginationPartitioner = dataType != DataType.ObjectId && config.NonObjectIdPartitioner == PartitionerType.UsePagination;
sampleCount = Math.Min(chunkCount * segmentCount, adjustedMaxSamples);
segmentCount = Math.Max(1, sampleCount / chunkCount);

// Optimize for non-dump scenarios
if (!optimizeForMongoDump && !usePaginationPartitioner)
{
while (chunkCount > segmentCount && segmentCount < GetMaxSegments())
if (!usePaginationPartitioner)
{
chunkCount--;
segmentCount++;
while (chunkCount > segmentCount && segmentCount < GetMaxSegments())
{
chunkCount--;
segmentCount++;
}
chunkCount = sampleCount / segmentCount;
}

chunkCount = sampleCount / segmentCount;
MigrationJobContext.AddVerboseLog($"SamplePartitioner MongoDriver: collection={collection.CollectionNamespace}, dataType={dataType}, chunkCount={chunkCount}, segmentCount={segmentCount}, sampleCount={sampleCount}");
}

MigrationJobContext.AddVerboseLog($"SamplePartitioner.Calculating chunkCount: collection={collection.CollectionNamespace}, dataType={dataType}, chunkCount={chunkCount}");
Expand Down Expand Up @@ -355,6 +339,17 @@ public static int GetMaxSamples()
return null;
}
// Step 3: Calculate partition boundaries
// For DumpAndRestore: pick equidistant quantile boundaries from oversampled sorted list
if (optimizeForMongoDump && partitionValues.Count > chunkCount)
{
var quantileBoundaries = new List<BsonValue>();
for (int k = 0; k < chunkCount; k++)
{
int idx = (int)((long)k * partitionValues.Count / chunkCount);
quantileBoundaries.Add(partitionValues[idx]);
}
partitionValues = quantileBoundaries;
}

chunkBoundaries = ConvertToBoundaries(partitionValues, segmentCount);

Expand Down