Skip to content

Commit c3de69f

Browse files
committed
[Mustang] Add query routing and execution handoff for Parquet-backed indices (opensearch-project#5247)
Implements the query routing and AnalyticsExecutionEngine for Project Mustang's unified query pipeline. PPL queries targeting parquet_ prefixed indices are routed through UnifiedQueryPlanner and executed via a stub QueryPlanExecutor, with results formatted through the existing JDBC response pipeline. New files: - QueryPlanExecutor: @FunctionalInterface contract for analytics engine - AnalyticsExecutionEngine: converts Iterable<Object[]> to QueryResponse with type mapping and query size limit enforcement - RestUnifiedQueryAction: orchestrates schema building, planning, execution on sql-worker thread pool, with client/server error classification and metrics - StubQueryPlanExecutor: canned data for parquet_logs and parquet_metrics tables for development and testing Modified files: - RestPPLQueryAction: routing branch for parquet_ indices - SQLPlugin: passes ClusterService and NodeClient to RestPPLQueryAction - plugin/build.gradle: adds :api dependency for UnifiedQueryPlanner Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 513e1b2 commit c3de69f

10 files changed

Lines changed: 1072 additions & 2 deletions

File tree

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.executor.analytics;
7+
8+
import java.util.ArrayList;
9+
import java.util.LinkedHashMap;
10+
import java.util.List;
11+
import java.util.Map;
12+
import org.apache.calcite.rel.RelNode;
13+
import org.apache.calcite.rel.type.RelDataType;
14+
import org.apache.calcite.rel.type.RelDataTypeField;
15+
import org.opensearch.sql.calcite.CalcitePlanContext;
16+
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
17+
import org.opensearch.sql.common.response.ResponseListener;
18+
import org.opensearch.sql.data.model.ExprTupleValue;
19+
import org.opensearch.sql.data.model.ExprValue;
20+
import org.opensearch.sql.data.model.ExprValueUtils;
21+
import org.opensearch.sql.data.type.ExprType;
22+
import org.opensearch.sql.executor.ExecutionContext;
23+
import org.opensearch.sql.executor.ExecutionEngine;
24+
import org.opensearch.sql.executor.pagination.Cursor;
25+
import org.opensearch.sql.planner.physical.PhysicalPlan;
26+
27+
/**
28+
* Execution engine adapter for the analytics engine (Project Mustang).
29+
*
30+
* <p>Bridges the analytics engine's {@link QueryPlanExecutor} with the SQL plugin's {@link
31+
* ExecutionEngine} response pipeline. Takes a Calcite {@link RelNode}, delegates execution to the
32+
* analytics engine, and converts the raw results into {@link QueryResponse}.
33+
*/
34+
public class AnalyticsExecutionEngine implements ExecutionEngine {
35+
36+
private final QueryPlanExecutor planExecutor;
37+
38+
public AnalyticsExecutionEngine(QueryPlanExecutor planExecutor) {
39+
this.planExecutor = planExecutor;
40+
}
41+
42+
/** Not supported. Analytics queries use the RelNode path exclusively. */
43+
@Override
44+
public void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener) {
45+
listener.onFailure(
46+
new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
47+
}
48+
49+
/** Not supported. Analytics queries use the RelNode path exclusively. */
50+
@Override
51+
public void execute(
52+
PhysicalPlan plan, ExecutionContext context, ResponseListener<QueryResponse> listener) {
53+
listener.onFailure(
54+
new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
55+
}
56+
57+
/** Not supported. Analytics queries use the RelNode path exclusively. */
58+
@Override
59+
public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener) {
60+
listener.onFailure(
61+
new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
62+
}
63+
64+
@Override
65+
public void execute(
66+
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
67+
try {
68+
Integer querySizeLimit = context.sysLimit.querySizeLimit();
69+
Iterable<Object[]> rows = planExecutor.execute(plan, null);
70+
71+
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
72+
List<ExprValue> results = convertRows(rows, fields, querySizeLimit);
73+
Schema schema = buildSchema(fields);
74+
75+
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
76+
} catch (Exception e) {
77+
listener.onFailure(e);
78+
}
79+
}
80+
81+
private List<ExprValue> convertRows(
82+
Iterable<Object[]> rows, List<RelDataTypeField> fields, Integer querySizeLimit) {
83+
List<ExprValue> results = new ArrayList<>();
84+
for (Object[] row : rows) {
85+
if (querySizeLimit != null && results.size() >= querySizeLimit) {
86+
break;
87+
}
88+
Map<String, ExprValue> valueMap = new LinkedHashMap<>();
89+
for (int i = 0; i < fields.size(); i++) {
90+
String columnName = fields.get(i).getName();
91+
Object value = (i < row.length) ? row[i] : null;
92+
valueMap.put(columnName, ExprValueUtils.fromObjectValue(value));
93+
}
94+
results.add(ExprTupleValue.fromExprValueMap(valueMap));
95+
}
96+
return results;
97+
}
98+
99+
private Schema buildSchema(List<RelDataTypeField> fields) {
100+
List<Schema.Column> columns = new ArrayList<>();
101+
for (RelDataTypeField field : fields) {
102+
ExprType exprType = convertType(field.getType());
103+
columns.add(new Schema.Column(field.getName(), null, exprType));
104+
}
105+
return new Schema(columns);
106+
}
107+
108+
private ExprType convertType(RelDataType type) {
109+
try {
110+
return OpenSearchTypeFactory.convertRelDataTypeToExprType(type);
111+
} catch (IllegalArgumentException e) {
112+
return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN;
113+
}
114+
}
115+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.executor.analytics;
7+
8+
import org.apache.calcite.rel.RelNode;
9+
10+
/**
11+
* Executes a Calcite {@link RelNode} logical plan against the analytics engine.
12+
*
13+
* <p>This is a local equivalent of {@code org.opensearch.analytics.exec.QueryPlanExecutor} from the
14+
* analytics-framework library. It will be replaced by the upstream interface once the
15+
* analytics-framework JAR is published.
16+
*
17+
* @see <a
18+
* href="https://github.com/opensearch-project/OpenSearch/blob/9142d0e789c6a6c4708f1bc015745ed55202eefe/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/exec/QueryPlanExecutor.java">Upstream
19+
* QueryPlanExecutor</a>
20+
*/
21+
@FunctionalInterface
22+
public interface QueryPlanExecutor {
23+
24+
/**
25+
* Executes the given logical plan and returns result rows.
26+
*
27+
* @param plan the Calcite RelNode subtree to execute
28+
* @param context execution context (opaque to avoid server dependency)
29+
* @return rows produced by the engine
30+
*/
31+
Iterable<Object[]> execute(RelNode plan, Object context);
32+
}

0 commit comments

Comments
 (0)