From f39bf5b8f3f44a46e44396634411f3fe8ee7e059 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sylvain=20Veyri=C3=A9?= Date: Wed, 18 Mar 2026 11:10:51 +0100 Subject: [PATCH 1/2] Upgrade Snowflake driver to v4 --- sdks/java/io/snowflake/build.gradle | 8 ++-- .../beam/sdk/io/snowflake/SnowflakeIO.java | 43 ++++++++++--------- .../test/FakeSnowflakeBasicDataSource.java | 6 ++- .../test/FakeSnowflakeBatchServiceImpl.java | 2 +- .../snowflake/test/FakeSnowflakeDatabase.java | 2 +- .../unit/DataSourceConfigurationTest.java | 2 +- .../unit/write/SchemaDispositionTest.java | 2 +- .../test/unit/write/SnowflakeIOWriteTest.java | 2 +- .../test/unit/write/StreamingWriteTest.java | 2 +- 9 files changed, 36 insertions(+), 33 deletions(-) diff --git a/sdks/java/io/snowflake/build.gradle b/sdks/java/io/snowflake/build.gradle index 9be257033edb..8d9a9a46557f 100644 --- a/sdks/java/io/snowflake/build.gradle +++ b/sdks/java/io/snowflake/build.gradle @@ -30,16 +30,16 @@ dependencies { implementation project(path: ":sdks:java:extensions:google-cloud-platform-core") permitUnusedDeclared project(path: ":sdks:java:extensions:google-cloud-platform-core") implementation library.java.slf4j_api - implementation group: 'net.snowflake', name: 'snowflake-jdbc', version: '3.20.0' - implementation group: 'com.opencsv', name: 'opencsv', version: '5.0' - implementation 'net.snowflake:snowflake-ingest-sdk:0.9.9' + implementation group: 'net.snowflake', name: 'snowflake-jdbc', version: '4.0.2' + implementation group: 'com.opencsv', name: 'opencsv', version: '5.12.0' + implementation 'net.snowflake:snowflake-ingest-sdk:4.4.2' implementation "org.bouncycastle:bcprov-jdk15on:1.70" implementation library.java.joda_time testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration") testImplementation project(path: ":sdks:java:io:common") testImplementation project(path: ":sdks:java:testing:test-utils") - testImplementation 'com.google.cloud:google-cloud-storage:1.102.0' + testImplementation 'com.google.cloud:google-cloud-storage:1.118.1' testImplementation library.java.avro testImplementation library.java.junit testImplementation library.java.slf4j_api diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java index 5350e0e2c1fe..4ebec3944423 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java @@ -36,7 +36,8 @@ import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.sql.DataSource; -import net.snowflake.client.jdbc.SnowflakeBasicDataSource; +import net.snowflake.client.api.datasource.SnowflakeDataSource; +import net.snowflake.client.api.datasource.SnowflakeDataSourceFactory; import net.snowflake.ingest.SimpleIngestManager; import net.snowflake.ingest.connection.HistoryResponse; import org.apache.beam.sdk.coders.Coder; @@ -1801,7 +1802,7 @@ public DataSourceConfiguration withAuthenticator(String authenticator) { } /** - * Sets loginTimeout that will be used in {@link SnowflakeBasicDataSource#setLoginTimeout}. + * Sets loginTimeout that will be used in {@link SnowflakeDataSource#setLoginTimeout}. * * @param loginTimeout Integer with timeout value. */ @@ -1818,59 +1819,59 @@ void populateDisplayData(DisplayData.Builder builder) { } } - /** Builds {@link SnowflakeBasicDataSource} based on the current configuration. */ + /** Builds {@link SnowflakeDataSource} based on the current configuration. */ public DataSource buildDatasource() { if (getDataSource() == null) { - SnowflakeBasicDataSource basicDataSource = new SnowflakeBasicDataSource(); - basicDataSource.setUrl(buildUrl()); + SnowflakeDataSource dataSource = SnowflakeDataSourceFactory.createDataSource(); + dataSource.setUrl(buildUrl()); if (isNotEmpty(getOauthToken())) { - basicDataSource.setOauthToken(getOauthToken().get()); + dataSource.setToken(getOauthToken().get()); } else if (isNotEmpty(getUsername()) && getPrivateKey() != null) { - basicDataSource.setUser(getUsername().get()); - basicDataSource.setPrivateKey(getPrivateKey()); + dataSource.setUser(getUsername().get()); + dataSource.setPrivateKey(getPrivateKey()); } else if (isNotEmpty(getUsername()) && isNotEmpty(getRawPrivateKey())) { PrivateKey privateKey = KeyPairUtils.preparePrivateKey( getRawPrivateKey().get(), getValueOrNull(getPrivateKeyPassphrase())); - basicDataSource.setPrivateKey(privateKey); - basicDataSource.setUser(getUsername().get()); + dataSource.setPrivateKey(privateKey); + dataSource.setUser(getUsername().get()); } else if (isNotEmpty(getUsername()) && isNotEmpty(getPassword())) { - basicDataSource.setUser(getUsername().get()); - basicDataSource.setPassword(getPassword().get()); + dataSource.setUser(getUsername().get()); + dataSource.setPassword(getPassword().get()); } else { throw new RuntimeException("Missing credentials values. Please check your credentials"); } if (isNotEmpty(getDatabase())) { - basicDataSource.setDatabaseName(getDatabase().get()); + dataSource.setDatabaseName(getDatabase().get()); } if (isNotEmpty(getWarehouse())) { - basicDataSource.setWarehouse(getWarehouse().get()); + dataSource.setWarehouse(getWarehouse().get()); } if (isNotEmpty(getSchema())) { - basicDataSource.setSchema(getSchema().get()); + dataSource.setSchema(getSchema().get()); } if (isNotEmpty(getServerName())) { - basicDataSource.setServerName(getServerName().get()); + dataSource.setServerName(getServerName().get()); } if (getPortNumber() != null) { - basicDataSource.setPortNumber(getPortNumber()); + dataSource.setPortNumber(getPortNumber()); } if (isNotEmpty(getRole())) { - basicDataSource.setRole(getRole().get()); + dataSource.setRole(getRole().get()); } if (getAuthenticator() != null) { - basicDataSource.setAuthenticator(getAuthenticator()); + dataSource.setAuthenticator(getAuthenticator()); } if (getLoginTimeout() != null) { try { - basicDataSource.setLoginTimeout(getLoginTimeout()); + dataSource.setLoginTimeout(getLoginTimeout()); } catch (SQLException e) { throw new RuntimeException("Failed to setLoginTimeout"); } } - return basicDataSource; + return dataSource; } return getDataSource(); } diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBasicDataSource.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBasicDataSource.java index 5fc694fb9d0c..4cd2e223cd44 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBasicDataSource.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBasicDataSource.java @@ -36,10 +36,12 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; -import net.snowflake.client.jdbc.SnowflakeBasicDataSource; +import net.snowflake.client.internal.api.implementation.datasource.SnowflakeBasicDataSource; /** - * Fake implementation of {@link net.snowflake.client.jdbc.SnowflakeBasicDataSource} used in tests. + * Fake implementation of {@link + * net.snowflake.client.internal.api.implementation.datasource.SnowflakeBasicDataSource} used in + * tests. */ public class FakeSnowflakeBasicDataSource extends SnowflakeBasicDataSource implements Serializable { @Override diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBatchServiceImpl.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBatchServiceImpl.java index d4c4523845bc..79a3900f3a2c 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBatchServiceImpl.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBatchServiceImpl.java @@ -24,7 +24,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import net.snowflake.client.jdbc.SnowflakeSQLException; +import net.snowflake.client.api.exception.SnowflakeSQLException; import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema; import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition; import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition; diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java index 338a316b1b9b..e2ff2fbf4db5 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java @@ -23,7 +23,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import net.snowflake.client.jdbc.SnowflakeSQLException; +import net.snowflake.client.api.exception.SnowflakeSQLException; /** Fake implementation of Snowflake warehouse used in test code. */ public class FakeSnowflakeDatabase implements Serializable { diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java index 37d04ed5926a..ba22dc51eb44 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java @@ -27,7 +27,7 @@ import java.util.Arrays; import java.util.stream.Collectors; import javax.sql.DataSource; -import net.snowflake.client.jdbc.SnowflakeBasicDataSource; +import net.snowflake.client.internal.api.implementation.datasource.SnowflakeBasicDataSource; import org.apache.beam.sdk.io.snowflake.SnowflakeIO; import org.apache.beam.sdk.io.snowflake.test.TestUtils; import org.junit.Before; diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SchemaDispositionTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SchemaDispositionTest.java index 4073ac8b59e2..c1758b6f0e5c 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SchemaDispositionTest.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SchemaDispositionTest.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.LongStream; -import net.snowflake.client.jdbc.SnowflakeSQLException; +import net.snowflake.client.api.exception.SnowflakeSQLException; import org.apache.beam.sdk.io.snowflake.SnowflakeIO; import org.apache.beam.sdk.io.snowflake.data.SnowflakeColumn; import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema; diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SnowflakeIOWriteTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SnowflakeIOWriteTest.java index 496037388875..d042ec00be38 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SnowflakeIOWriteTest.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SnowflakeIOWriteTest.java @@ -24,7 +24,7 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.LongStream; -import net.snowflake.client.jdbc.SnowflakeSQLException; +import net.snowflake.client.api.exception.SnowflakeSQLException; import org.apache.beam.sdk.io.snowflake.SnowflakeIO; import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices; import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource; diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java index 780aa26356a7..06dd5fe40e53 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java @@ -30,7 +30,7 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.LongStream; -import net.snowflake.client.jdbc.SnowflakeSQLException; +import net.snowflake.client.api.exception.SnowflakeSQLException; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.snowflake.SnowflakeIO; import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices; From a6bf3f8ebeb0a3e6bbf57ff008e7e70c6ca1d767 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sylvain=20Veyri=C3=A9?= Date: Wed, 18 Mar 2026 11:12:37 +0100 Subject: [PATCH 2/2] SnowflakeIO: add account parameter --- .../org/apache/beam/sdk/io/snowflake/SnowflakeIO.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java index 4ebec3944423..dffa65db4ba5 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java @@ -1405,6 +1405,9 @@ public abstract static class DataSourceConfiguration implements Serializable { @Nullable public abstract ValueProvider getPassword(); + @Nullable + public abstract ValueProvider getAccount(); + @Nullable public abstract PrivateKey getPrivateKey(); @@ -1457,6 +1460,8 @@ abstract static class Builder { abstract Builder setPassword(ValueProvider password); + abstract Builder setAccount(ValueProvider account); + abstract Builder setPrivateKey(PrivateKey privateKey); abstract Builder setRawPrivateKey(ValueProvider rawPrivateKey); @@ -1843,6 +1848,9 @@ public DataSource buildDatasource() { throw new RuntimeException("Missing credentials values. Please check your credentials"); } + if (isNotEmpty(getAccount())) { + dataSource.setAccount(getAccount().get()); + } if (isNotEmpty(getDatabase())) { dataSource.setDatabaseName(getDatabase().get()); }