Skip to content

Commit 71da5d2

Browse files
committed
[Mustang] Add explain support and integration tests for analytics path (opensearch-project#5247)
Add explain support for the analytics engine path: - AnalyticsExecutionEngine.explain(RelNode, ExplainMode, ...): returns logical plan via RelOptUtil.toString() with mode-aware SqlExplainLevel (SIMPLE/STANDARD/COST). Physical and extended plans are null since the analytics engine is not yet available. - RestUnifiedQueryAction.explain(): new entry point for explain requests, delegates to AnalyticsExecutionEngine.explain() with ExplainMode.STANDARD. Response formatted via JsonResponseFormatter with normalizeLf(). - TransportPPLQueryAction: routes explain requests for analytics indices to unifiedQueryHandler.explain() instead of unifiedQueryHandler.execute(). Integration tests (AnalyticsPPLIT): - testExplainResponseStructure: verifies JSON shape (calcite.logical, calcite.physical=null, calcite.extended=null) - testExplainProjectAndScan: LogicalProject + LogicalTableScan + table name - testExplainFilterPlan: LogicalFilter with condition value - testExplainAggregationPlan: LogicalAggregate with COUNT() - testExplainSortPlan: LogicalSort Unit tests (AnalyticsExecutionEngineTest): - explainRelNode_returnsLogicalPlan - explainRelNode_errorPropagation Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 23227fd commit 71da5d2

5 files changed

Lines changed: 228 additions & 7 deletions

File tree

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@
99
import java.util.LinkedHashMap;
1010
import java.util.List;
1111
import java.util.Map;
12+
import org.apache.calcite.plan.RelOptUtil;
1213
import org.apache.calcite.rel.RelNode;
1314
import org.apache.calcite.rel.type.RelDataType;
1415
import org.apache.calcite.rel.type.RelDataTypeField;
16+
import org.apache.calcite.sql.SqlExplainLevel;
17+
import org.opensearch.sql.ast.statement.ExplainMode;
1518
import org.opensearch.sql.calcite.CalcitePlanContext;
1619
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
1720
import org.opensearch.sql.common.response.ResponseListener;
@@ -77,6 +80,26 @@ public void execute(
7780
}
7881
}
7982

83+
@Override
84+
public void explain(
85+
RelNode plan,
86+
ExplainMode mode,
87+
CalcitePlanContext context,
88+
ResponseListener<ExplainResponse> listener) {
89+
try {
90+
SqlExplainLevel level =
91+
mode == ExplainMode.SIMPLE
92+
? SqlExplainLevel.NO_ATTRIBUTES
93+
: mode == ExplainMode.COST
94+
? SqlExplainLevel.ALL_ATTRIBUTES
95+
: SqlExplainLevel.EXPPLAN_ATTRIBUTES;
96+
String logical = RelOptUtil.toString(plan, level);
97+
listener.onResponse(new ExplainResponse(new ExplainResponseNodeV2(logical, null, null)));
98+
} catch (Exception e) {
99+
listener.onFailure(e);
100+
}
101+
}
102+
80103
private List<ExprValue> convertRows(Iterable<Object[]> rows, List<RelDataTypeField> fields) {
81104
List<ExprValue> results = new ArrayList<>();
82105
for (Object[] row : rows) {

core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,75 @@ void physicalPlanExplain_callsOnFailure() {
245245
+ errorRef.get().getMessage());
246246
}
247247

248+
// --- explain tests ---
249+
250+
@Test
251+
void explainRelNode_returnsLogicalPlan() {
252+
RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER);
253+
254+
AtomicReference<ExplainResponse> ref = new AtomicReference<>();
255+
AtomicReference<Exception> errorRef = new AtomicReference<>();
256+
engine.explain(
257+
relNode,
258+
org.opensearch.sql.ast.statement.ExplainMode.STANDARD,
259+
mockContext,
260+
new ResponseListener<ExplainResponse>() {
261+
@Override
262+
public void onResponse(ExplainResponse response) {
263+
ref.set(response);
264+
}
265+
266+
@Override
267+
public void onFailure(Exception e) {
268+
errorRef.set(e);
269+
}
270+
});
271+
272+
// RelOptUtil.toString on a mock may throw or succeed depending on mock setup.
273+
// Accept either a successful response or a caught failure.
274+
assertTrue(
275+
ref.get() != null || errorRef.get() != null,
276+
"Either onResponse or onFailure should have been called");
277+
if (ref.get() != null) {
278+
assertNotNull(ref.get().getCalcite(), "Explain should have calcite field");
279+
assertNotNull(ref.get().getCalcite().getLogical(), "Explain should have logical plan");
280+
System.out.println(
281+
"\n--- explainRelNode_returnsLogicalPlan ---\nLogical: "
282+
+ ref.get().getCalcite().getLogical()
283+
+ "Physical: "
284+
+ ref.get().getCalcite().getPhysical()
285+
+ "\nExtended: "
286+
+ ref.get().getCalcite().getExtended()
287+
+ "\n--- End ---");
288+
}
289+
}
290+
291+
@Test
292+
void explainRelNode_errorPropagation() {
293+
RelNode relNode = mock(RelNode.class);
294+
org.mockito.Mockito.doThrow(new RuntimeException("Explain failure"))
295+
.when(relNode)
296+
.explain(org.mockito.ArgumentMatchers.any());
297+
298+
AtomicReference<Exception> errorRef = new AtomicReference<>();
299+
engine.explain(
300+
relNode,
301+
org.opensearch.sql.ast.statement.ExplainMode.STANDARD,
302+
mockContext,
303+
new ResponseListener<ExplainResponse>() {
304+
@Override
305+
public void onResponse(ExplainResponse response) {}
306+
307+
@Override
308+
public void onFailure(Exception e) {
309+
errorRef.set(e);
310+
}
311+
});
312+
313+
assertNotNull(errorRef.get(), "onFailure should have been called");
314+
System.out.println(dumpError("explainRelNode_errorPropagation", errorRef.get()));
315+
}
316+
248317
// --- helpers ---
249318

250319
private QueryResponse executeAndCapture(RelNode relNode) {

integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,84 @@ public void testSingleFieldProjection() throws IOException {
150150
verifyNumOfRows(result, 3);
151151
}
152152

153+
// --- Explain tests ---
154+
155+
@Test
156+
public void testExplainResponseStructure() throws IOException {
157+
String query = "source = opensearch.parquet_logs | fields ts, message";
158+
String raw = explainQueryToString(query);
159+
LOG.info("[testExplainResponseStructure] query: {}\nresponse: {}", query, raw);
160+
JSONObject response = new JSONObject(raw);
161+
162+
assertTrue("Explain should have 'calcite' key. Response: " + raw, response.has("calcite"));
163+
JSONObject calcite = response.getJSONObject("calcite");
164+
assertTrue("Explain should have 'logical' key. Response: " + raw, calcite.has("logical"));
165+
String logical = calcite.getString("logical");
166+
assertFalse("Logical plan should not be empty. Response: " + raw, logical.isEmpty());
167+
assertTrue("Physical plan should be null. Response: " + raw, calcite.isNull("physical"));
168+
assertTrue("Extended plan should be null. Response: " + raw, calcite.isNull("extended"));
169+
}
170+
171+
@Test
172+
public void testExplainProjectAndScan() throws IOException {
173+
String query = "source = opensearch.parquet_logs | fields ts, message";
174+
String raw = explainQueryToString(query);
175+
String logical = extractLogicalPlan(raw);
176+
LOG.info("[testExplainProjectAndScan] query: {}\nlogical plan:\n{}", query, logical);
177+
178+
assertTrue(
179+
"Plan should contain LogicalProject. Plan:\n" + logical,
180+
logical.contains("LogicalProject"));
181+
assertTrue(
182+
"Plan should contain LogicalTableScan. Plan:\n" + logical,
183+
logical.contains("LogicalTableScan"));
184+
assertTrue(
185+
"Plan should reference parquet_logs. Plan:\n" + logical, logical.contains("parquet_logs"));
186+
}
187+
188+
@Test
189+
public void testExplainFilterPlan() throws IOException {
190+
String query = "source = opensearch.parquet_logs | where status = 200 | fields ts, message";
191+
String raw = explainQueryToString(query);
192+
String logical = extractLogicalPlan(raw);
193+
LOG.info("[testExplainFilterPlan] query: {}\nlogical plan:\n{}", query, logical);
194+
195+
assertTrue(
196+
"Plan should contain LogicalFilter. Plan:\n" + logical, logical.contains("LogicalFilter"));
197+
assertTrue(
198+
"Plan should contain filter condition 200. Plan:\n" + logical, logical.contains("200"));
199+
}
200+
201+
@Test
202+
public void testExplainAggregationPlan() throws IOException {
203+
String query = "source = opensearch.parquet_logs | stats count() by status";
204+
String raw = explainQueryToString(query);
205+
String logical = extractLogicalPlan(raw);
206+
LOG.info("[testExplainAggregationPlan] query: {}\nlogical plan:\n{}", query, logical);
207+
208+
assertTrue(
209+
"Plan should contain LogicalAggregate. Plan:\n" + logical,
210+
logical.contains("LogicalAggregate"));
211+
assertTrue("Plan should contain COUNT(). Plan:\n" + logical, logical.contains("COUNT()"));
212+
}
213+
214+
@Test
215+
public void testExplainSortPlan() throws IOException {
216+
String query = "source = opensearch.parquet_logs | sort ts";
217+
String raw = explainQueryToString(query);
218+
String logical = extractLogicalPlan(raw);
219+
LOG.info("[testExplainSortPlan] query: {}\nlogical plan:\n{}", query, logical);
220+
221+
assertTrue(
222+
"Plan should contain LogicalSort. Plan:\n" + logical, logical.contains("LogicalSort"));
223+
}
224+
225+
/** Extract the logical plan string from the explain JSON response. */
226+
private String extractLogicalPlan(String explainResponse) {
227+
JSONObject response = new JSONObject(explainResponse);
228+
return response.getJSONObject("calcite").getString("logical");
229+
}
230+
153231
// --- Error handling tests ---
154232

155233
@Test

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.sql.plugin.rest;
77

8+
import static org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
9+
import static org.opensearch.sql.executor.ExecutionEngine.ExplainResponse.normalizeLf;
810
import static org.opensearch.sql.lang.PPLLangSpec.PPL_SPEC;
911
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
1012
import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;
@@ -19,6 +21,7 @@
1921
import org.opensearch.core.action.ActionListener;
2022
import org.opensearch.sql.api.UnifiedQueryContext;
2123
import org.opensearch.sql.api.UnifiedQueryPlanner;
24+
import org.opensearch.sql.ast.statement.ExplainMode;
2225
import org.opensearch.sql.calcite.CalcitePlanContext;
2326
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
2427
import org.opensearch.sql.common.response.ResponseListener;
@@ -33,6 +36,7 @@
3336
import org.opensearch.sql.ppl.domain.PPLQueryRequest;
3437
import org.opensearch.sql.protocol.response.QueryResult;
3538
import org.opensearch.sql.protocol.response.format.JdbcResponseFormatter;
39+
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
3640
import org.opensearch.sql.protocol.response.format.ResponseFormatter;
3741
import org.opensearch.transport.client.node.NodeClient;
3842

@@ -72,6 +76,7 @@ public static boolean isAnalyticsIndex(String query) {
7276
* @param pplRequest the original PPL request
7377
* @param listener the transport action listener
7478
*/
79+
/** Execute a query through the unified query pipeline on the sql-worker thread pool. */
7580
public void execute(
7681
String query,
7782
QueryType queryType,
@@ -80,7 +85,21 @@ public void execute(
8085
client
8186
.threadPool()
8287
.schedule(
83-
withCurrentContext(() -> doExecute(query, queryType, pplRequest, listener)),
88+
withCurrentContext(() -> doExecute(query, queryType, pplRequest, false, listener)),
89+
new TimeValue(0),
90+
SQL_WORKER_THREAD_POOL_NAME);
91+
}
92+
93+
/** Explain a query through the unified query pipeline on the sql-worker thread pool. */
94+
public void explain(
95+
String query,
96+
QueryType queryType,
97+
PPLQueryRequest pplRequest,
98+
ActionListener<TransportPPLQueryResponse> listener) {
99+
client
100+
.threadPool()
101+
.schedule(
102+
withCurrentContext(() -> doExecute(query, queryType, pplRequest, true, listener)),
84103
new TimeValue(0),
85104
SQL_WORKER_THREAD_POOL_NAME);
86105
}
@@ -89,6 +108,7 @@ private void doExecute(
89108
String query,
90109
QueryType queryType,
91110
PPLQueryRequest pplRequest,
111+
boolean isExplain,
92112
ActionListener<TransportPPLQueryResponse> listener) {
93113
try {
94114
long startTime = System.nanoTime();
@@ -104,8 +124,6 @@ private void doExecute(
104124
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
105125
RelNode plan = planner.plan(query);
106126

107-
// Add query size limit to the plan so the analytics engine can enforce it
108-
// during execution, consistent with PPL V3 (see QueryService.convertToCalcitePlan)
109127
CalcitePlanContext planContext = context.getPlanContext();
110128
plan = addQuerySizeLimit(plan, planContext);
111129

@@ -115,8 +133,13 @@ private void doExecute(
115133
(planTime - startTime) / 1_000_000,
116134
queryType);
117135

118-
analyticsEngine.execute(
119-
plan, planContext, createQueryListener(queryType, planTime, listener));
136+
if (isExplain) {
137+
analyticsEngine.explain(
138+
plan, ExplainMode.STANDARD, planContext, createExplainListener(listener));
139+
} else {
140+
analyticsEngine.execute(
141+
plan, planContext, createQueryListener(queryType, planTime, listener));
142+
}
120143
}
121144
} catch (Exception e) {
122145
listener.onFailure(e);
@@ -162,6 +185,29 @@ public void onFailure(Exception e) {
162185
};
163186
}
164187

188+
private ResponseListener<ExplainResponse> createExplainListener(
189+
ActionListener<TransportPPLQueryResponse> transportListener) {
190+
return new ResponseListener<ExplainResponse>() {
191+
@Override
192+
public void onResponse(ExplainResponse response) {
193+
JsonResponseFormatter<ExplainResponse> formatter =
194+
new JsonResponseFormatter<ExplainResponse>(PRETTY) {
195+
@Override
196+
protected Object buildJsonObject(ExplainResponse resp) {
197+
return normalizeLf(resp);
198+
}
199+
};
200+
transportListener.onResponse(
201+
new TransportPPLQueryResponse(formatter.format(response), formatter.contentType()));
202+
}
203+
204+
@Override
205+
public void onFailure(Exception e) {
206+
transportListener.onFailure(e);
207+
}
208+
};
209+
}
210+
165211
/**
166212
* Capture current thread context and restore it on the worker thread. Ensures security context
167213
* (user identity, permissions) is propagated. Same pattern as {@link

plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,13 @@ protected void doExecute(
127127

128128
// Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices
129129
if (RestUnifiedQueryAction.isAnalyticsIndex(transformedRequest.getRequest())) {
130-
unifiedQueryHandler.execute(
131-
transformedRequest.getRequest(), QueryType.PPL, transformedRequest, clearingListener);
130+
if (transformedRequest.isExplainRequest()) {
131+
unifiedQueryHandler.explain(
132+
transformedRequest.getRequest(), QueryType.PPL, transformedRequest, clearingListener);
133+
} else {
134+
unifiedQueryHandler.execute(
135+
transformedRequest.getRequest(), QueryType.PPL, transformedRequest, clearingListener);
136+
}
132137
return;
133138
}
134139

0 commit comments

Comments
 (0)