Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 31 additions & 26 deletions driver/src/main/java/oracle/nosql/driver/ops/PreparedStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package oracle.nosql.driver.ops;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -35,11 +36,14 @@ public class PreparedStatement {
private final String querySchema;

/*
* The serialized PreparedStatement created at the backend store. It is
* opaque for the driver. It is received from the proxy and sent back to
* the proxy every time a new batch of results is needed.
* The serialized PreparedStatements created at the backend store. There is
* one of them for each union branch (or a single one if the query has no
* UNION). They are opaque for the driver. They are received from the proxy
* and one of them (the one corresponding to the current UNION branch that
* is being executed) is sent back to the proxy every time a new batch of
* results is needed.
*/
private final byte[] proxyStatement;
private final ArrayList<byte[]> proxyStatements;

/*
* The part of the query plan that must be executed at the driver. It is
Expand Down Expand Up @@ -79,15 +83,17 @@ public class PreparedStatement {


/*
* The namespace returned from a prepared query result, if any.
* The namespaces returned from a prepared query result. One for each
* UNION branch.
*/
private final String namespace;
private final ArrayList<String> namespaces;


/*
* The table name returned from a prepared query result, if any.
* The top-table names returned from a prepared query result. One for
* each UNION branch.
*/
private final String tableName;
private final ArrayList<String> topTableNames;

/*
* the operation code for the query.
Expand Down Expand Up @@ -128,32 +134,31 @@ public PreparedStatement(
String sqlText,
String queryPlan,
String querySchema,
byte[] proxyStatement,
ArrayList<byte[]> proxyStatements,
PlanIter driverPlan,
int numIterators,
int numRegisters,
Map<String, Integer> externalVars,
String namespace,
String tableName,
ArrayList<String> namespaces,
ArrayList<String> tableNames,
byte operation,
int maxParallelism) {

/* 10 is arbitrary. TODO: put magic number in it for validation? */
if (proxyStatement == null || proxyStatement.length < 10) {
if (proxyStatements.isEmpty()) {
throw new IllegalArgumentException(
"Invalid prepared query, cannot be null");
"Invalid prepared query: no proxy-side query");
}

this.sqlText = sqlText;
this.queryPlan = queryPlan;
this.querySchema = querySchema;
this.proxyStatement = proxyStatement;
this.proxyStatements = proxyStatements;
this.driverQueryPlan = driverPlan;
this.numIterators = numIterators;
this.numRegisters = numRegisters;
this.variables = externalVars;
this.namespace = namespace;
this.tableName = tableName;
this.namespaces = namespaces;
this.topTableNames = tableNames;
this.operation = operation;
this.maxParallelism = maxParallelism;
}
Expand All @@ -170,13 +175,13 @@ public PreparedStatement copyStatement() {
return new PreparedStatement(sqlText,
queryPlan,
querySchema,
proxyStatement,
proxyStatements,
driverQueryPlan,
numIterators,
numRegisters,
variables,
namespace,
tableName,
namespaces,
topTableNames,
operation,
maxParallelism);
}
Expand Down Expand Up @@ -313,8 +318,8 @@ public PreparedStatement setVariable(int pos, FieldValue value) {
* @return the serialized query
* @hidden
*/
public final byte[] getStatement() {
return proxyStatement;
public final byte[] getProxyStatement(int branch) {
return proxyStatements.get(branch);
}

/**
Expand Down Expand Up @@ -391,17 +396,17 @@ public int numIterators() {
* @return namespace from prepared statement, if any
* @hidden
*/
public String getNamespace() {
return namespace;
public String getNamespace(int branch) {
return namespaces.get(branch);
}

/**
* Internal use only
* @return table name from prepared statement, if any
* @hidden
*/
public String getTableName() {
return tableName;
public String getTopTableName(int branch) {
return topTableNames.get(branch);
}

/**
Expand Down
39 changes: 28 additions & 11 deletions driver/src/main/java/oracle/nosql/driver/ops/QueryRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,34 @@ public QueryRequest setNamespace(String namespace) {
return this;
}

@Override
public String getNamespace() {
if (namespace != null) {
return namespace;
}
if (preparedStatement == null) {
return null;
}
if (driver == null) {
return preparedStatement.getNamespace(0);
}
return preparedStatement.getNamespace(driver.getUnionBranch());
}

/**
* @hidden
*/
@Override
public String getTableName() {
if (preparedStatement == null) {
return null;
}
if (driver == null) {
return preparedStatement.getTopTableName(0);
}
return preparedStatement.getTopTableName(driver.getUnionBranch());
}

/**
* Returns the timeout to use for the operation, in milliseconds. A value
* of 0 indicates that the timeout has not been set.
Expand Down Expand Up @@ -1156,17 +1184,6 @@ public void validate() {
}
}

/**
* @hidden
*/
@Override
public String getTableName() {
if (preparedStatement == null) {
return null;
}
return preparedStatement.getTableName();
}

/**
* @hidden
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static oracle.nosql.driver.http.Client.trace;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -118,12 +119,15 @@ static PreparedStatement deserializeInternal(
*/
int savedOffset = in.getOffset();
in.skip(37); // 4 + 32 + 1
String namespace = readString(in);
String tableName = readString(in);
ArrayList<String> namespaces = new ArrayList<>();
namespaces.add(readString(in));
ArrayList<String> tableNames = new ArrayList<>();
tableNames.add(readString(in));
byte operation = in.readByte();
in.setOffset(savedOffset);

byte[] proxyStatement = readByteArrayWithInt(in);
ArrayList<byte[]> proxyStatements = new ArrayList<>();
proxyStatements.add(readByteArrayWithInt(in));

int numIterators = 0;
int numRegisters = 0;
Expand Down Expand Up @@ -160,13 +164,13 @@ static PreparedStatement deserializeInternal(
new PreparedStatement(sqlText,
queryPlan,
null, // query schema
proxyStatement,
proxyStatements,
driverPlan,
numIterators,
numRegisters,
externalVars,
namespace,
tableName,
namespaces,
tableNames,
operation,
0); /* no parallelism available */

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void serialize(Request request,
if (queryRq.isPrepared()) {

PreparedStatement ps = queryRq.getPreparedStatement();
writeByteArrayWithInt(out, ps.getStatement());
writeByteArrayWithInt(out, ps.getProxyStatement(0));

if (ps.getVariables() != null) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class NsonProtocol {
public static String PREPARED_QUERY = "pq";
public static String PREPARED_STATEMENT = "ps";
public static String QUERY = "q";
public static String QUERY_BRANCHES = "qb";
public static String QUERY_NAME = "qn";
public static String QUERY_OPERATION_NUM = "on";
public static String QUERY_VERSION = "qv";
Expand Down Expand Up @@ -251,6 +252,7 @@ public class NsonProtocol {
{PREPARED_QUERY,"PREPARED_QUERY"},
{PREPARED_STATEMENT,"PREPARED_STATEMENT"},
{QUERY,"QUERY"},
{QUERY_BRANCHES,"QUERY_BRANCHES"},
{QUERY_NAME,"QUERY_NAME"},
{QUERY_OPERATION_NUM,"QUERY_OPERATION_NUM"},
{QUERY_VERSION,"QUERY_VERSION"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,10 +872,13 @@ public void serialize(Request request,
writeMapField(ns, QUERY_VERSION, (int)queryVersion);
boolean isPrepared = rq.isPrepared();
if (isPrepared) {
QueryDriver driver = rq.getDriver();
int unionBranch = (driver != null ? driver.getUnionBranch() : 0);
byte[] proxyPlan = rq.getPreparedStatement().
getProxyStatement(unionBranch);
writeMapField(ns, IS_PREPARED, isPrepared);
writeMapField(ns, IS_SIMPLE_QUERY, rq.isSimpleQuery());
writeMapField(ns, PREPARED_QUERY,
rq.getPreparedStatement().getStatement());
writeMapField(ns, PREPARED_QUERY, proxyPlan);
/*
* validation of parallel ops is handled in
* QueryRequest.validate
Expand Down Expand Up @@ -1008,13 +1011,12 @@ private static void deserializePrepareOrQuery(
}
boolean isPreparedRequest = (prep != null);

byte[] proxyPreparedQuery = null;
ArrayList<String> namespaces = new ArrayList<>();
ArrayList<String> tableNames = new ArrayList<>();
ArrayList<byte[]> proxyPreparedQueries = new ArrayList<>();

DriverPlanInfo dpi = null;

String queryPlan = null;
String tableName = null;
String namespace = null;
String querySchema = null;
byte operation = 0;
int proxyTopoSeqNum = -1; /* QUERY_V3 and earlier */
Expand Down Expand Up @@ -1046,20 +1048,44 @@ private static void deserializePrepareOrQuery(
readPhase1Results(arr, qres);

} else if (name.equals(PREPARED_QUERY)) {
proxyPreparedQuery = Nson.readNsonBinary(in);
proxyPreparedQueries.add(Nson.readNsonBinary(in));

} else if (name.equals(QUERY_BRANCHES)) {
readType(in, Nson.TYPE_ARRAY);
in.readInt(); /* length of array in bytes */
int numBranches = in.readInt(); /* number of array elements */

for (int i = 0; i < numBranches; ++i) {
MapWalker walker2 = getMapWalker(in);
while (walker2.hasNext()) {
walker2.next();
String name2 = walker2.getCurrentName();
if (name2.equals(NAMESPACE)) {
namespaces.add(Nson.readNsonString(in));
} else if (name2.equals(TABLE_NAME)) {
tableNames.add(Nson.readNsonString(in));
} else if (name2.equals(PREPARED_QUERY)) {
proxyPreparedQueries.add(Nson.readNsonBinary(in));
} else {
throw new IOException(
"Unexpected field in query branch: " +
name2);
}
}
}

} else if (name.equals(DRIVER_QUERY_PLAN)) {
dpi = getDriverPlanInfo(Nson.readNsonBinary(in),
serialVersion);
queryVersion);

} else if (name.equals(REACHED_LIMIT) && qres != null) {
qres.setReachedLimit(Nson.readNsonBoolean(in));

} else if (name.equals(TABLE_NAME)) {
tableName = Nson.readNsonString(in);
tableNames.add(Nson.readNsonString(in));

} else if (name.equals(NAMESPACE)) {
namespace = Nson.readNsonString(in);
namespaces.add(Nson.readNsonString(in));

} else if (name.equals(QUERY_PLAN_STRING)) {
queryPlan = Nson.readNsonString(in);
Expand Down Expand Up @@ -1136,13 +1162,13 @@ private static void deserializePrepareOrQuery(
prep = new PreparedStatement(statement,
queryPlan,
querySchema,
proxyPreparedQuery,
proxyPreparedQueries,
(dpi!=null)?dpi.driverQueryPlan:null,
(dpi!=null)?dpi.numIterators:0,
(dpi!=null)?dpi.numRegisters:0,
(dpi!=null)?dpi.externalVars:null,
namespace,
tableName,
namespaces,
tableNames,
operation,
maxParallelism);
if (pres != null) {
Expand Down Expand Up @@ -1241,15 +1267,15 @@ private static void readPhase1Results(byte[] arr, QueryResult result)
}

private static DriverPlanInfo getDriverPlanInfo(byte[] arr,
short serialVersion)
short queryVersion)
throws IOException {
if (arr == null || arr.length == 0) {
return null;
}
ByteBuf buf = Unpooled.wrappedBuffer(arr);
ByteInputStream bis = new NettyByteInputStream(buf);
DriverPlanInfo dpi = new DriverPlanInfo();
dpi.driverQueryPlan = PlanIter.deserializeIter(bis, serialVersion);
dpi.driverQueryPlan = PlanIter.deserializeIter(bis, queryVersion);
if (dpi.driverQueryPlan == null) {
return null;
}
Expand Down
Loading