diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index 83346d34aee0..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 16 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json index b60f5c4cc3c8..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json index b26833333238..c537844dc84a 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto index 5250aaaa1a7a..58d16b83749d 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto @@ -174,6 +174,13 @@ message LogicalTypes { // A variable-length string with its maximum length as the argument. VAR_CHAR = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:logical_type:var_char:v1"]; + + // A URN for Date type + // - Representation type: INT64 + // - A date without a timezone, represented by the number of days + // since the epoch. + DATE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:logical_type:date:v1"]; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java index 8ad5bb5ff97f..d527b2949235 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.schemas.logicaltypes.Date; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; import org.apache.beam.sdk.schemas.logicaltypes.FixedString; @@ -113,6 +114,7 @@ private static String getLogicalTypeUrn(String identifier) { .put(VariableBytes.IDENTIFIER, VariableBytes.class) .put(FixedString.IDENTIFIER, FixedString.class) .put(VariableString.IDENTIFIER, VariableString.class) + .put(Date.IDENTIFIER, Date.class) .build(); public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java index 894b585fe660..09cc040b609c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java @@ -18,7 +18,10 @@ package org.apache.beam.sdk.schemas.logicaltypes; import java.time.LocalDate; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.sdk.schemas.Schema; +import org.checkerframework.checker.nullness.qual.Nullable; /** * A date without a time-zone. @@ -30,23 +33,20 @@ * incrementing count of days where day 0 is 1970-01-01 (ISO). */ public class Date implements Schema.LogicalType { - public static final String IDENTIFIER = "beam:logical_type:date:v1"; + public static final String IDENTIFIER = + SchemaApi.LogicalTypes.Enum.DATE + .getValueDescriptor() + .getOptions() + .getExtension(RunnerApi.beamUrn); @Override public String getIdentifier() { return IDENTIFIER; } - // unused @Override - public Schema.FieldType getArgumentType() { - return Schema.FieldType.STRING; - } - - // unused - @Override - public String getArgument() { - return ""; + public Schema.@Nullable FieldType getArgumentType() { + return null; } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java index fb9a8308fcff..8a382c4ad1b8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java @@ -25,6 +25,7 @@ import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.HashMap; @@ -142,6 +143,7 @@ public static Iterable data() { Field.of("decimal", FieldType.DECIMAL), Field.of("datetime", FieldType.DATETIME))) .add(Schema.of(Field.of("fixed_bytes", FieldType.logicalType(FixedBytes.of(24))))) .add(Schema.of(Field.of("micros_instant", FieldType.logicalType(new MicrosInstant())))) + .add(Schema.of(Field.of("date", FieldType.logicalType(SqlTypes.DATE)))) .add(Schema.of(Field.of("python_callable", FieldType.logicalType(new PythonCallable())))) .add( Schema.of( @@ -388,6 +390,7 @@ public static Iterable data() { .add(simpleRow(FieldType.DECIMAL, BigDecimal.valueOf(100000))) .add(simpleRow(FieldType.logicalType(new PortableNullArgLogicalType()), "str")) .add(simpleRow(FieldType.logicalType(new DateTime()), LocalDateTime.of(2000, 1, 3, 3, 1))) + .add(simpleRow(FieldType.logicalType(SqlTypes.DATE), LocalDate.of(2000, 1, 3))) .add(simpleNullRow(FieldType.STRING)) .add(simpleNullRow(FieldType.INT32)) .add(simpleNullRow(FieldType.map(FieldType.STRING, FieldType.INT32))) diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index 74d9a39bb052..7777f63ffbe9 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -92,3 +92,4 @@ var_bytes = LogicalTypes.Enum.VAR_BYTES fixed_char = LogicalTypes.Enum.FIXED_CHAR var_char = LogicalTypes.Enum.VAR_CHAR +date = LogicalTypes.Enum.DATE diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index b1e53a79bd41..495fa28c5984 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -14,8 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # - -import os +import datetime import unittest import uuid @@ -28,10 +27,10 @@ @pytest.mark.uses_io_java_expansion_service -@unittest.skipUnless( - os.environ.get('EXPANSION_JARS'), - "EXPANSION_JARS environment var is not provided, " - "indicating that jars have not been built") +# @unittest.skipUnless( +# os.environ.get('EXPANSION_JARS'), +# "EXPANSION_JARS environment var is not provided, " +# "indicating that jars have not been built") class ManagedIcebergIT(unittest.TestCase): WAREHOUSE = "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java" @@ -49,7 +48,8 @@ def _create_row(self, num: int): bytes_=bytes(num), bool_=(num % 2 == 0), float_=(num + float(num) / 100), - arr_=[num, num, num]) + arr_=[num, num, num], + date_=datetime.date.today() - datetime.timedelta(days=num)) def test_write_read_pipeline(self): iceberg_config = { @@ -58,6 +58,7 @@ def test_write_read_pipeline(self): "catalog_properties": { "type": "hadoop", "warehouse": self.WAREHOUSE, + "io-impl": "org.apache.iceberg.gcp.gcs.GCSFileIO" } } diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 5dd8ff290c48..281518de6d41 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -34,6 +34,7 @@ bytes <-----> BYTES ByteString ------> BYTES Timestamp <-----> LogicalType(urn="beam:logical_type:micros_instant:v1") + datetime.date <---> LogicalType(urn="beam:logical_type:date:v1") Decimal <-----> LogicalType(urn="beam:logical_type:fixed_decimal:v1") Mapping <-----> MapType Sequence <-----> ArrayType @@ -991,6 +992,33 @@ def to_language_type(self, value): return Timestamp(seconds=int(value.seconds), micros=int(value.micros)) +@LogicalType._register_internal +class Date(NoArgumentLogicalType[datetime.date, np.int64]): + """Date logical type that handles ``datetime.date``, days since epoch.""" + EPOCH = datetime.date(1970, 1, 1) + + @classmethod + def urn(cls): + return common_urns.date.urn + + @classmethod + def representation_type(cls): + # type: () -> type + return np.int64 + + @classmethod + def language_type(cls): + return datetime.date + + def to_representation_type(self, value): + # type: (datetime.date) -> np.int64 + return (value - self.EPOCH).days + + def to_language_type(self, value): + # type: (np.int64) -> datetime.date + return self.EPOCH + datetime.timedelta(days=value) + + @LogicalType._register_internal class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): """A logical type for PythonCallableSource objects.""" diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 5a5d7396ab30..d70bf0c47d33 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -20,6 +20,7 @@ # pytype: skip-file import dataclasses +import datetime import itertools import pickle import unittest @@ -105,6 +106,7 @@ class ComplexSchema(NamedTuple): optional_array: Optional[Sequence[np.float32]] array_optional: Sequence[Optional[bool]] timestamp: Timestamp + date: datetime.date def get_test_beam_fieldtype_protos():