Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
dfea6a4
single commit
vaibhavk1992 Apr 25, 2025
b75bc7c
adding delta kernel
vaibhavk1992 May 16, 2025
16134b3
adding the test file
vaibhavk1992 May 19, 2025
3929e95
adding workable code for iteration over data
vaibhavk1992 May 26, 2025
c6379b5
adding Kernel 4.0 code
vaibhavk1992 Jun 11, 2025
6deb5f7
adding the working code with xtable that check getcurrenttable
vaibhavk1992 Jun 24, 2025
05d9984
Merge branch 'main' into test-4
vaibhavk1992 Jun 30, 2025
c7ba4b9
adding the dependecies
vaibhavk1992 Jun 30, 2025
0ff36a5
adding getcurrentsnapshot code
vaibhavk1992 Jul 19, 2025
18ab9d6
spotless fix
vaibhavk1992 Jul 19, 2025
e906091
spotless fix 2
vaibhavk1992 Jul 19, 2025
e00241c
spotless fix 2
vaibhavk1992 Jul 19, 2025
3fdfd31
fixed partitioned test case
vaibhavk1992 Jul 26, 2025
e0102e3
setting junit parallel execution to true
vaibhavk1992 Jul 28, 2025
381722a
testInsertsUpsertsAndDeletes test case addition,internal datatype add…
vaibhavk1992 Aug 5, 2025
809bfe8
added the fix for table basepath listing wrong paths
vaibhavk1992 Aug 7, 2025
40172f2
added the fix for table basepath listing wrong paths
vaibhavk1992 Aug 7, 2025
e0b7829
adding all tests
vaibhavk1992 Aug 27, 2025
9ac022a
adding refactored code
vaibhavk1992 Aug 27, 2025
73f33b6
spotless fix
vaibhavk1992 Aug 27, 2025
bee3e8a
fix change extraction
the-other-tim-brown Oct 5, 2025
e75bb55
adding the commitbacklog test cases changes
vaibhavk1992 Oct 7, 2025
21044af
Merge branch 'apache:main' into test-4
vaibhavk1992 Oct 7, 2025
e212f52
adding a test case testConvertFromDeltaPartitionFormat
vaibhavk1992 Oct 13, 2025
988cda1
adding a test case testConvertFromDeltaPartitionFormat
vaibhavk1992 Oct 13, 2025
1705ce4
adding the KernelPartitionExtractor test under kernel
vaibhavk1992 Oct 23, 2025
8f81109
commiting schema extractor and stats extrator
vaibhavk1992 Nov 11, 2025
49ebf21
adding unit test cases with the request changes on the PR
vaibhavk1992 Nov 17, 2025
70fe0e3
spotless fix
vaibhavk1992 Nov 17, 2025
fba7e0e
spotless fix
vaibhavk1992 Nov 17, 2025
bb4dc4e
Merge branch 'main' into test-4
vaibhavk1992 Nov 17, 2025
6b1be2d
adding haddop common in xtable service POM
vaibhavk1992 Nov 18, 2025
2f46699
changed map type to java and removed print commands
vaibhavk1992 Nov 22, 2025
70469fb
changed map type to java and removed print commands
vaibhavk1992 Nov 22, 2025
ae61a28
removing hadoop common from xtable service
vaibhavk1992 Nov 24, 2025
a6f86ac
fixing POM
vaibhavk1992 Nov 25, 2025
cd30bab
resolving some minor comments from review
vaibhavk1992 Nov 25, 2025
cecf300
changing constructor for Datafile extractor
vaibhavk1992 Nov 26, 2025
253de3f
add exclusion hadoop-client-runtime in POM
vaibhavk1992 Nov 27, 2025
019855b
removing unused code and string comparison method
vaibhavk1992 Dec 1, 2025
977df2f
removing while True condition
vaibhavk1992 Dec 2, 2025
3a1df45
Merge branch 'apache:main' into test-4
vaibhavk1992 Dec 29, 2025
2ea30a8
Add Delta Kernel integration with disabled tests
vaibhavk1992 Feb 6, 2026
5d695cc
Merge remote-tracking branch 'upstream/main' into test-4
vaibhavk1992 Feb 6, 2026
10f7fbe
adding the conversion target files
vaibhavk1992 Feb 9, 2026
de57c88
corrected the test cases
vaibhavk1992 Feb 13, 2026
0345140
spotless fix
vaibhavk1992 Feb 13, 2026
b9f27af
adding read write integration test case
vaibhavk1992 Feb 21, 2026
2af1236
addressed comments over PR
vaibhavk1992 Mar 2, 2026
2d0e16e
addressed comments over PR
vaibhavk1992 Mar 2, 2026
ab0417c
addressed comments over PR
vaibhavk1992 Mar 2, 2026
935d835
adding data types
vaibhavk1992 Mar 2, 2026
23f6321
dummy commit to trigger actions
vaibhavk1992 Mar 2, 2026
cf5029f
Merge upstream/main into test-4
vaibhavk1992 Mar 2, 2026
d622ae7
Apply spotless formatting to remove wildcard imports
vaibhavk1992 Mar 2, 2026
66eb9df
Fix missing imports after spotless wildcard removal
vaibhavk1992 Mar 2, 2026
d405870
adding read write integration test case
vaibhavk1992 Mar 2, 2026
37caddf
Fix exception handling, Scala/Java mixing, and test quality in Delta …
vaibhavk1992 Mar 11, 2026
3915e22
added fixes
vaibhavk1992 Mar 16, 2026
5868207
adding fixes
vaibhavk1992 Mar 16, 2026
ed034d9
Enhance Delta Kernel test coverage with strict assertions and complex…
vaibhavk1992 Apr 11, 2026
144c60e
Add Delta Kernel statistics support and refactor stats extraction
vaibhavk1992 Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,293 +18,69 @@

package org.apache.xtable.delta;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Value;
import lombok.extern.log4j.Log4j2;

import org.apache.commons.lang3.StringUtils;

import org.apache.spark.sql.delta.actions.AddFile;

import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;

import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.model.exception.ParseException;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.FileStats;
import org.apache.xtable.model.stat.Range;

/**
* DeltaStatsExtractor extracts column stats and also responsible for their serialization leveraging
* {@link DeltaValueConverter}.
* Delta Standalone stats extractor - delegates to {@link DeltaStatsUtils} for shared logic.
*
* @deprecated This class is a thin wrapper around DeltaStatsUtils. Consider using DeltaStatsUtils
* directly.
*/
@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaStatsExtractor {
private static final Set<InternalType> FIELD_TYPES_WITH_STATS_SUPPORT =
new HashSet<>(
Arrays.asList(
InternalType.BOOLEAN,
InternalType.DATE,
InternalType.DECIMAL,
InternalType.DOUBLE,
InternalType.INT,
InternalType.LONG,
InternalType.FLOAT,
InternalType.STRING,
InternalType.TIMESTAMP,
InternalType.TIMESTAMP_NTZ));

private static final DeltaStatsExtractor INSTANCE = new DeltaStatsExtractor();

private static final ObjectMapper MAPPER = new ObjectMapper();

/* this data structure collects type names of all unrecognized Delta Lake stats. For instance
data file stats in presence of delete vectors would contain 'tightBounds' stat which is
currently not handled by XTable */
private final Set<String> unsupportedStats = new HashSet<>();

public static DeltaStatsExtractor getInstance() {
return INSTANCE;
}

/**
* Converts XTable column statistics to Delta format JSON.
*
* @param schema the table schema
* @param numRecords the number of records
* @param columnStats the column statistics
* @return JSON string in Delta format
* @throws JsonProcessingException if serialization fails
*/
public String convertStatsToDeltaFormat(
InternalSchema schema, long numRecords, List<ColumnStat> columnStats)
throws JsonProcessingException {
DeltaStats.DeltaStatsBuilder deltaStatsBuilder = DeltaStats.builder();
deltaStatsBuilder.numRecords(numRecords);
if (columnStats == null) {
return MAPPER.writeValueAsString(deltaStatsBuilder.build());
}
Set<String> validPaths = getPathsFromStructSchemaForMinAndMaxStats(schema);
List<ColumnStat> validColumnStats =
columnStats.stream()
.filter(stat -> validPaths.contains(stat.getField().getPath()))
.collect(Collectors.toList());
DeltaStats deltaStats =
deltaStatsBuilder
.minValues(getMinValues(validColumnStats))
.maxValues(getMaxValues(validColumnStats))
.nullCount(getNullCount(validColumnStats))
.build();
return MAPPER.writeValueAsString(deltaStats);
}

private Set<String> getPathsFromStructSchemaForMinAndMaxStats(InternalSchema schema) {
return schema.getAllFields().stream()
.filter(
field -> {
InternalType type = field.getSchema().getDataType();
return FIELD_TYPES_WITH_STATS_SUPPORT.contains(type);
})
.map(InternalField::getPath)
.collect(Collectors.toSet());
}

private Map<String, Object> getMinValues(List<ColumnStat> validColumnStats) {
return getValues(validColumnStats, columnStat -> columnStat.getRange().getMinValue());
}

private Map<String, Object> getMaxValues(List<ColumnStat> validColumnStats) {
return getValues(validColumnStats, columnStat -> columnStat.getRange().getMaxValue());
}

private Map<String, Object> getValues(
List<ColumnStat> validColumnStats, Function<ColumnStat, Object> valueExtractor) {
Map<String, Object> jsonObject = new HashMap<>();
validColumnStats.forEach(
columnStat -> {
InternalField field = columnStat.getField();
String[] pathParts = field.getPathParts();
insertValueAtPath(
jsonObject,
pathParts,
DeltaValueConverter.convertToDeltaColumnStatValue(
valueExtractor.apply(columnStat), field.getSchema()));
});
return jsonObject;
}

private Map<String, Object> getNullCount(List<ColumnStat> validColumnStats) {
// TODO: Additional work needed to track nulls maps & arrays.
Map<String, Object> jsonObject = new HashMap<>();
validColumnStats.forEach(
columnStat -> {
String[] pathParts = columnStat.getField().getPathParts();
insertValueAtPath(jsonObject, pathParts, columnStat.getNumNulls());
});
return jsonObject;
}

private void insertValueAtPath(Map<String, Object> jsonObject, String[] pathParts, Object value) {
if (pathParts == null || pathParts.length == 0) {
return;
}
Map<String, Object> currObject = jsonObject;
for (int i = 0; i < pathParts.length; i++) {
String part = pathParts[i];
if (i == pathParts.length - 1) {
currObject.put(part, value);
} else {
if (!currObject.containsKey(part)) {
currObject.put(part, new HashMap<String, Object>());
}
try {
currObject = (HashMap<String, Object>) currObject.get(part);
} catch (ClassCastException e) {
throw new RuntimeException(
String.format(
"Cannot cast to hashmap while inserting stats at path %s",
String.join("->", pathParts)),
e);
}
}
}
}

public FileStats getColumnStatsForFile(AddFile addFile, List<InternalField> fields) {
if (StringUtils.isEmpty(addFile.stats())) {
return FileStats.builder().columnStats(Collections.emptyList()).numRecords(0).build();
}
// TODO: Additional work needed to track maps & arrays.
try {
DeltaStats deltaStats = MAPPER.readValue(addFile.stats(), DeltaStats.class);
collectUnsupportedStats(deltaStats.getAdditionalStats());

Map<String, Object> fieldPathToMaxValue = flattenStatMap(deltaStats.getMaxValues());
Map<String, Object> fieldPathToMinValue = flattenStatMap(deltaStats.getMinValues());
Map<String, Object> fieldPathToNullCount = flattenStatMap(deltaStats.getNullCount());
List<ColumnStat> columnStats =
fields.stream()
.filter(field -> fieldPathToMaxValue.containsKey(field.getPath()))
.map(
field -> {
String fieldPath = field.getPath();
Object minValue =
DeltaValueConverter.convertFromDeltaColumnStatValue(
fieldPathToMinValue.get(fieldPath), field.getSchema());
Object maxValue =
DeltaValueConverter.convertFromDeltaColumnStatValue(
fieldPathToMaxValue.get(fieldPath), field.getSchema());
Number nullCount = (Number) fieldPathToNullCount.get(fieldPath);
Range range = Range.vector(minValue, maxValue);
return ColumnStat.builder()
.field(field)
.numValues(deltaStats.getNumRecords())
.numNulls(nullCount.longValue())
.range(range)
.build();
})
.collect(CustomCollectors.toList(fields.size()));
return FileStats.builder()
.columnStats(columnStats)
.numRecords(deltaStats.getNumRecords())
.build();
} catch (IOException ex) {
throw new ParseException("Unable to parse stats json", ex);
}
}

private void collectUnsupportedStats(Map<String, Object> additionalStats) {
if (additionalStats == null || additionalStats.isEmpty()) {
return;
}

additionalStats.keySet().stream()
.filter(key -> !unsupportedStats.contains(key))
.forEach(
key -> {
log.info("Unrecognized/unsupported Delta data file stat: {}", key);
unsupportedStats.add(key);
});
return DeltaStatsUtils.convertStatsToDeltaFormat(schema, numRecords, columnStats);
}

/**
* Takes the input map which represents a json object and flattens it.
* Extracts column statistics from Delta AddFile.
*
* @param statMap input json map
* @return map with keys representing the dot-path for the field
* @param addFile the Delta AddFile action
* @param fields the fields to extract stats for
* @return FileStats containing column statistics
*/
private Map<String, Object> flattenStatMap(Map<String, Object> statMap) {
Map<String, Object> result = new HashMap<>();
Queue<StatField> statFieldQueue = new ArrayDeque<>();
statFieldQueue.add(StatField.of("", statMap));
while (!statFieldQueue.isEmpty()) {
StatField statField = statFieldQueue.poll();
String prefix = statField.getParentPath().isEmpty() ? "" : statField.getParentPath() + ".";
statField
.getValues()
.forEach(
(fieldName, value) -> {
String fullName = prefix + fieldName;
if (value instanceof Map) {
statFieldQueue.add(StatField.of(fullName, (Map<String, Object>) value));
} else {
result.put(fullName, value);
}
});
}
return result;
public FileStats getColumnStatsForFile(AddFile addFile, List<InternalField> fields) {
return DeltaStatsUtils.parseColumnStatsFromJson(addFile.stats(), fields);
}

/**
* Returns the names of all unsupported stats that have been discovered during the parsing of
* Delta Lake stats.
* Returns unsupported stats discovered during parsing.
*
* @return set of unsupported stats
* @return set of unsupported stat names
*/
@VisibleForTesting
Set<String> getUnsupportedStats() {
return Collections.unmodifiableSet(unsupportedStats);
}

@Builder
@Value
private static class DeltaStats {
long numRecords;
Map<String, Object> minValues;
Map<String, Object> maxValues;
Map<String, Object> nullCount;

/* this is a catch-all for any additional stats that are not explicitly handled */
@JsonIgnore
@Getter(lazy = true)
Map<String, Object> additionalStats = new HashMap<>();

@JsonAnySetter
public void setAdditionalStat(String key, Object value) {
getAdditionalStats().put(key, value);
}
}

@Value
@AllArgsConstructor(staticName = "of")
private static class StatField {
String parentPath;
Map<String, Object> values;
return DeltaStatsUtils.getUnsupportedStats();
}
}
Loading