Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions java/driver/jni/src/main/cpp/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,30 @@ Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_connectionSetOptionString(
}
}

JNIEXPORT void JNICALL
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_connectionCommit(
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
struct AdbcError error = ADBC_ERROR_INIT;
auto* conn = reinterpret_cast<struct AdbcConnection*>(static_cast<uintptr_t>(handle));
try {
CHECK_ADBC_ERROR(AdbcConnectionCommit(conn, &error), error);
} catch (const AdbcException& e) {
e.ThrowJavaException(env);
}
}

JNIEXPORT void JNICALL
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_connectionRollback(
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
struct AdbcError error = ADBC_ERROR_INIT;
auto* conn = reinterpret_cast<struct AdbcConnection*>(static_cast<uintptr_t>(handle));
try {
CHECK_ADBC_ERROR(AdbcConnectionRollback(conn, &error), error);
} catch (const AdbcException& e) {
e.ThrowJavaException(env);
}
}

JNIEXPORT jbyteArray JNICALL
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_databaseGetOptionBytes(
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle, jstring key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.core.BulkIngestMode;
import org.apache.arrow.adbc.core.IsolationLevel;
import org.apache.arrow.adbc.core.TypedKey;
import org.apache.arrow.adbc.driver.jni.impl.JniLoader;
import org.apache.arrow.adbc.driver.jni.impl.NativeConnectionHandle;
Expand Down Expand Up @@ -117,6 +118,87 @@ public ArrowReader getTableTypes() throws AdbcException {
return JniLoader.INSTANCE.connectionGetTableTypes(handle).importStream(allocator);
}

@Override
public boolean getReadOnly() throws AdbcException {
return getOption(JniDriver.READONLY);
}

@Override
public void setReadOnly(boolean isReadOnly) throws AdbcException {
setOption(JniDriver.READONLY, isReadOnly);
}

@Override
public boolean getAutoCommit() throws AdbcException {
return getOption(JniDriver.AUTOCOMMIT);
}

@Override
public void setAutoCommit(boolean enableAutoCommit) throws AdbcException {
setOption(JniDriver.AUTOCOMMIT, enableAutoCommit);
}

@Override
public IsolationLevel getIsolationLevel() throws AdbcException {
String level = getOption(JniDriver.ISOLATION_LEVEL);
if (level == null) {
return null;
}
switch (level) {
case JniDriver.ISOLATION_LEVEL_READ_UNCOMMITTED:
return IsolationLevel.READ_UNCOMMITTED;
case JniDriver.ISOLATION_LEVEL_READ_COMMITTED:
return IsolationLevel.READ_COMMITTED;
case JniDriver.ISOLATION_LEVEL_REPEATABLE_READ:
return IsolationLevel.REPEATABLE_READ;
case JniDriver.ISOLATION_LEVEL_SNAPSHOT:
return IsolationLevel.SNAPSHOT;
case JniDriver.ISOLATION_LEVEL_SERIALIZABLE:
return IsolationLevel.SERIALIZABLE;
default:
throw AdbcException.invalidArgument("[jni] invalid isolation level value: " + level);
}
}

@Override
public void setIsolationLevel(IsolationLevel level) throws AdbcException {
if (level == null) {
setOption(JniDriver.ISOLATION_LEVEL, (String) null);
} else {
String levelValue;
switch (level) {
case READ_UNCOMMITTED:
levelValue = JniDriver.ISOLATION_LEVEL_READ_UNCOMMITTED;
break;
case READ_COMMITTED:
levelValue = JniDriver.ISOLATION_LEVEL_READ_COMMITTED;
break;
case REPEATABLE_READ:
levelValue = JniDriver.ISOLATION_LEVEL_REPEATABLE_READ;
break;
case SNAPSHOT:
levelValue = JniDriver.ISOLATION_LEVEL_SNAPSHOT;
break;
case SERIALIZABLE:
levelValue = JniDriver.ISOLATION_LEVEL_SERIALIZABLE;
break;
default:
throw new IllegalArgumentException("Unknown isolation level: " + level);
}
setOption(JniDriver.ISOLATION_LEVEL, levelValue);
}
}

@Override
public void commit() throws AdbcException {
JniLoader.INSTANCE.connectionCommit(handle);
}

@Override
public void rollback() throws AdbcException {
JniLoader.INSTANCE.connectionRollback(handle);
}

@Override
public void close() {
handle.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ public class JniDriver implements AdbcDriver {
public static final TypedKey<String> PARAM_PROFILE_SEARCH_PATH =
new TypedKey<>("jni.additional_profile_search_path_list", String.class);

static final TypedKey<Boolean> AUTOCOMMIT =
new TypedKey<>("adbc.connection.autocommit", Boolean.class);
static final TypedKey<String> ISOLATION_LEVEL =
new TypedKey<>("adbc.connection.transaction.isolation_level", String.class);
static final TypedKey<Boolean> READONLY =
new TypedKey<>("adbc.connection.readonly", Boolean.class);
static final String ISOLATION_LEVEL_READ_UNCOMMITTED =
"adbc.connection.transaction.isolation.read_uncommitted";
static final String ISOLATION_LEVEL_READ_COMMITTED =
"adbc.connection.transaction.isolation.read_committed";
static final String ISOLATION_LEVEL_REPEATABLE_READ =
"adbc.connection.transaction.isolation.repeatable_read";
static final String ISOLATION_LEVEL_SNAPSHOT = "adbc.connection.transaction.isolation.snapshot";
static final String ISOLATION_LEVEL_SERIALIZABLE =
"adbc.connection.transaction.isolation.serializable";

private final BufferAllocator allocator;

public JniDriver(BufferAllocator allocator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,14 @@ public NativeQueryResult connectionGetTableTypes(NativeConnectionHandle connecti
return NativeAdbc.connectionGetTableTypes(connection.getConnectionHandle());
}

public void connectionCommit(NativeConnectionHandle connection) throws AdbcException {
NativeAdbc.connectionCommit(connection.getConnectionHandle());
}

public void connectionRollback(NativeConnectionHandle connection) throws AdbcException {
NativeAdbc.connectionRollback(connection.getConnectionHandle());
}

public byte[] connectionGetOptionBytes(NativeConnectionHandle handle, String key)
throws AdbcException {
return NativeAdbc.connectionGetOptionBytes(handle.getConnectionHandle(), key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ static native NativeSchemaResult connectionGetTableSchema(

static native NativeQueryResult connectionGetTableTypes(long handle) throws AdbcException;

static native void connectionCommit(long handle) throws AdbcException;

static native void connectionRollback(long handle) throws AdbcException;

static native byte[] connectionGetOptionBytes(long handle, String key) throws AdbcException;

static native double connectionGetOptionDouble(long handle, String key) throws AdbcException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,60 @@ void bulkIngest() throws Exception {
}
}

@Test
void commit() throws Exception {
try (final BufferAllocator allocator = new RootAllocator()) {
JniDriver driver = new JniDriver(allocator);
Map<String, Object> parameters = new HashMap<>();
JniDriver.PARAM_DRIVER.set(parameters, "adbc_driver_sqlite");
try (final AdbcDatabase db = driver.open(parameters);
final AdbcConnection conn = db.connect()) {
try (final AdbcStatement stmt = conn.createStatement()) {
stmt.setSqlQuery("DROP TABLE IF EXISTS foobar");
stmt.executeUpdate();
}

assertThat(conn.getAutoCommit()).isTrue();
// not supported by sqlite driver
// assertThat(conn.getIsolationLevel()).isEqualTo(IsolationLevel.SERIALIZABLE);
conn.setAutoCommit(false);
assertThat(conn.getAutoCommit()).isFalse();

try (final AdbcStatement stmt = conn.createStatement()) {
stmt.setSqlQuery("CREATE TABLE foobar (v)");
stmt.executeUpdate();

stmt.setSqlQuery("SELECT * FROM foobar");
try (AdbcStatement.QueryResult result = stmt.executeQuery()) {
result.getReader().loadNextBatch();
}
}

conn.rollback();

try (final AdbcStatement stmt = conn.createStatement()) {
stmt.setSqlQuery("SELECT * FROM foobar");
AdbcException e = assertThrows(AdbcException.class, stmt::executeQuery);
assertThat(e).hasMessageContaining("no such table: foobar");
}

try (final AdbcStatement stmt = conn.createStatement()) {
stmt.setSqlQuery("CREATE TABLE foobar (v)");
stmt.executeUpdate();
}

conn.commit();

try (final AdbcStatement stmt = conn.createStatement()) {
stmt.setSqlQuery("SELECT * FROM foobar");
try (AdbcStatement.QueryResult result = stmt.executeQuery()) {
result.getReader().loadNextBatch();
}
}
}
}
}

@Test
void getSetOption() throws Exception {
TypedKey<Integer> batchRowsInt = new TypedKey<>("adbc.sqlite.query.batch_rows", Integer.class);
Expand Down
Loading