From 5e3d5f35db6df6ecc78b2676673275d99a3d3fb5 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 11 Mar 2026 16:42:13 -0400 Subject: [PATCH 1/3] portable date --- .../beam/model/pipeline/v1/schema.proto | 7 +++++ .../beam/sdk/schemas/SchemaTranslation.java | 2 ++ .../beam/sdk/schemas/logicaltypes/Date.java | 20 ++++++------- .../sdk/schemas/SchemaTranslationTest.java | 3 ++ .../apache_beam/portability/common_urns.py | 1 + .../transforms/managed_iceberg_it_test.py | 14 ++++++---- sdks/python/apache_beam/typehints/schemas.py | 28 +++++++++++++++++++ .../apache_beam/typehints/schemas_test.py | 2 ++ 8 files changed, 61 insertions(+), 16 deletions(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto index 5250aaaa1a7a..58d16b83749d 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto @@ -174,6 +174,13 @@ message LogicalTypes { // A variable-length string with its maximum length as the argument. VAR_CHAR = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:logical_type:var_char:v1"]; + + // A URN for Date type + // - Representation type: INT64 + // - A date without a timezone, represented by the number of days + // since the epoch. + DATE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:logical_type:date:v1"]; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java index 8ad5bb5ff97f..d527b2949235 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.schemas.logicaltypes.Date; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; import org.apache.beam.sdk.schemas.logicaltypes.FixedString; @@ -113,6 +114,7 @@ private static String getLogicalTypeUrn(String identifier) { .put(VariableBytes.IDENTIFIER, VariableBytes.class) .put(FixedString.IDENTIFIER, FixedString.class) .put(VariableString.IDENTIFIER, VariableString.class) + .put(Date.IDENTIFIER, Date.class) .build(); public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java index 894b585fe660..09cc040b609c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java @@ -18,7 +18,10 @@ package org.apache.beam.sdk.schemas.logicaltypes; import java.time.LocalDate; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.sdk.schemas.Schema; +import org.checkerframework.checker.nullness.qual.Nullable; /** * A date without a time-zone. @@ -30,23 +33,20 @@ * incrementing count of days where day 0 is 1970-01-01 (ISO). */ public class Date implements Schema.LogicalType { - public static final String IDENTIFIER = "beam:logical_type:date:v1"; + public static final String IDENTIFIER = + SchemaApi.LogicalTypes.Enum.DATE + .getValueDescriptor() + .getOptions() + .getExtension(RunnerApi.beamUrn); @Override public String getIdentifier() { return IDENTIFIER; } - // unused @Override - public Schema.FieldType getArgumentType() { - return Schema.FieldType.STRING; - } - - // unused - @Override - public String getArgument() { - return ""; + public Schema.@Nullable FieldType getArgumentType() { + return null; } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java index fb9a8308fcff..8a382c4ad1b8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java @@ -25,6 +25,7 @@ import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.HashMap; @@ -142,6 +143,7 @@ public static Iterable data() { Field.of("decimal", FieldType.DECIMAL), Field.of("datetime", FieldType.DATETIME))) .add(Schema.of(Field.of("fixed_bytes", FieldType.logicalType(FixedBytes.of(24))))) .add(Schema.of(Field.of("micros_instant", FieldType.logicalType(new MicrosInstant())))) + .add(Schema.of(Field.of("date", FieldType.logicalType(SqlTypes.DATE)))) .add(Schema.of(Field.of("python_callable", FieldType.logicalType(new PythonCallable())))) .add( Schema.of( @@ -388,6 +390,7 @@ public static Iterable data() { .add(simpleRow(FieldType.DECIMAL, BigDecimal.valueOf(100000))) .add(simpleRow(FieldType.logicalType(new PortableNullArgLogicalType()), "str")) .add(simpleRow(FieldType.logicalType(new DateTime()), LocalDateTime.of(2000, 1, 3, 3, 1))) + .add(simpleRow(FieldType.logicalType(SqlTypes.DATE), LocalDate.of(2000, 1, 3))) .add(simpleNullRow(FieldType.STRING)) .add(simpleNullRow(FieldType.INT32)) .add(simpleNullRow(FieldType.map(FieldType.STRING, FieldType.INT32))) diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index 74d9a39bb052..7777f63ffbe9 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -92,3 +92,4 @@ var_bytes = LogicalTypes.Enum.VAR_BYTES fixed_char = LogicalTypes.Enum.FIXED_CHAR var_char = LogicalTypes.Enum.VAR_CHAR +date = LogicalTypes.Enum.DATE diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index b1e53a79bd41..d8b362853730 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +import datetime import os import unittest import uuid @@ -28,10 +28,10 @@ @pytest.mark.uses_io_java_expansion_service -@unittest.skipUnless( - os.environ.get('EXPANSION_JARS'), - "EXPANSION_JARS environment var is not provided, " - "indicating that jars have not been built") +# @unittest.skipUnless( +# os.environ.get('EXPANSION_JARS'), +# "EXPANSION_JARS environment var is not provided, " +# "indicating that jars have not been built") class ManagedIcebergIT(unittest.TestCase): WAREHOUSE = "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java" @@ -49,7 +49,8 @@ def _create_row(self, num: int): bytes_=bytes(num), bool_=(num % 2 == 0), float_=(num + float(num) / 100), - arr_=[num, num, num]) + arr_=[num, num, num], + date_=datetime.date.today() - datetime.timedelta(days=num)) def test_write_read_pipeline(self): iceberg_config = { @@ -58,6 +59,7 @@ def test_write_read_pipeline(self): "catalog_properties": { "type": "hadoop", "warehouse": self.WAREHOUSE, + "io-impl": "org.apache.iceberg.gcp.gcs.GCSFileIO" } } diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 5dd8ff290c48..fd0909467d7a 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -34,6 +34,7 @@ bytes <-----> BYTES ByteString ------> BYTES Timestamp <-----> LogicalType(urn="beam:logical_type:micros_instant:v1") + datetime.date <---> LogicalType(urn="beam:logical_type:date:v1") Decimal <-----> LogicalType(urn="beam:logical_type:fixed_decimal:v1") Mapping <-----> MapType Sequence <-----> ArrayType @@ -991,6 +992,33 @@ def to_language_type(self, value): return Timestamp(seconds=int(value.seconds), micros=int(value.micros)) +# @LogicalType._register_internal +# class Date(NoArgumentLogicalType[datetime.date, np.int64]): +# """Date logical type that handles ``datetime.date``, days since epoch.""" +# EPOCH = datetime.date(1970, 1, 1) +# +# @classmethod +# def urn(cls): +# return common_urns.date.urn +# +# @classmethod +# def representation_type(cls): +# # type: () -> type +# return np.int64 +# +# @classmethod +# def language_type(cls): +# return datetime.date +# +# def to_representation_type(self, value): +# # type: (datetime.date) -> np.int64 +# return (value - self.EPOCH).days +# +# def to_language_type(self, value): +# # type: (np.int64) -> datetime.date +# return self.EPOCH + datetime.timedelta(days=value) + + @LogicalType._register_internal class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): """A logical type for PythonCallableSource objects.""" diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 5a5d7396ab30..d70bf0c47d33 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -20,6 +20,7 @@ # pytype: skip-file import dataclasses +import datetime import itertools import pickle import unittest @@ -105,6 +106,7 @@ class ComplexSchema(NamedTuple): optional_array: Optional[Sequence[np.float32]] array_optional: Sequence[Optional[bool]] timestamp: Timestamp + date: datetime.date def get_test_beam_fieldtype_protos(): From 957a702544b3062b3eb5ff17f917cc7522efa1b3 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 11 Mar 2026 16:45:01 -0400 Subject: [PATCH 2/3] trigger ITs --- .../trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json | 2 +- .../trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json index b60f5c4cc3c8..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json index b26833333238..c537844dc84a 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } From 6a52c085885b98a12a457567914b9c97665f280f Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 12 Mar 2026 00:22:09 -0400 Subject: [PATCH 3/3] trigger ITs; cleanup --- ...am_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- .../transforms/managed_iceberg_it_test.py | 1 - sdks/python/apache_beam/typehints/schemas.py | 50 +++++++++---------- 3 files changed, 26 insertions(+), 27 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index 83346d34aee0..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 16 + "modification": 1 } diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index d8b362853730..495fa28c5984 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -15,7 +15,6 @@ # limitations under the License. # import datetime -import os import unittest import uuid diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index fd0909467d7a..281518de6d41 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -992,31 +992,31 @@ def to_language_type(self, value): return Timestamp(seconds=int(value.seconds), micros=int(value.micros)) -# @LogicalType._register_internal -# class Date(NoArgumentLogicalType[datetime.date, np.int64]): -# """Date logical type that handles ``datetime.date``, days since epoch.""" -# EPOCH = datetime.date(1970, 1, 1) -# -# @classmethod -# def urn(cls): -# return common_urns.date.urn -# -# @classmethod -# def representation_type(cls): -# # type: () -> type -# return np.int64 -# -# @classmethod -# def language_type(cls): -# return datetime.date -# -# def to_representation_type(self, value): -# # type: (datetime.date) -> np.int64 -# return (value - self.EPOCH).days -# -# def to_language_type(self, value): -# # type: (np.int64) -> datetime.date -# return self.EPOCH + datetime.timedelta(days=value) +@LogicalType._register_internal +class Date(NoArgumentLogicalType[datetime.date, np.int64]): + """Date logical type that handles ``datetime.date``, days since epoch.""" + EPOCH = datetime.date(1970, 1, 1) + + @classmethod + def urn(cls): + return common_urns.date.urn + + @classmethod + def representation_type(cls): + # type: () -> type + return np.int64 + + @classmethod + def language_type(cls): + return datetime.date + + def to_representation_type(self, value): + # type: (datetime.date) -> np.int64 + return (value - self.EPOCH).days + + def to_language_type(self, value): + # type: (np.int64) -> datetime.date + return self.EPOCH + datetime.timedelta(days=value) @LogicalType._register_internal