Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Locale;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.sql.SqlExplainLevel;

@RequiredArgsConstructor
public enum ExplainMode {
Expand All @@ -26,4 +27,13 @@ public static ExplainMode of(String mode) {
return ExplainMode.STANDARD;
}
}

/**
 * Map this explain mode onto the Calcite {@link SqlExplainLevel} consumed by
 * {@code RelOptUtil.toString()}.
 *
 * @return NO_ATTRIBUTES for SIMPLE, ALL_ATTRIBUTES for COST, and the default
 *     EXPPLAN_ATTRIBUTES for every other mode (including STANDARD)
 */
public SqlExplainLevel toExplainLevel() {
  if (this == SIMPLE) {
    return SqlExplainLevel.NO_ATTRIBUTES;
  }
  if (this == COST) {
    return SqlExplainLevel.ALL_ATTRIBUTES;
  }
  // STANDARD (and any future mode) renders the ordinary attribute set.
  return SqlExplainLevel.EXPPLAN_ATTRIBUTES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.opensearch.sql.ast.statement.ExplainMode;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
import org.opensearch.sql.common.response.ResponseListener;
Expand Down Expand Up @@ -77,6 +79,20 @@ public void execute(
}
}

@Override
public void explain(
    RelNode plan,
    ExplainMode mode,
    CalcitePlanContext context,
    ResponseListener<ExplainResponse> listener) {
  try {
    // Render the Calcite logical plan as text at the verbosity the mode requests.
    // Physical/extended plans are not produced on this path, hence the nulls.
    final String logicalPlan = RelOptUtil.toString(plan, mode.toExplainLevel());
    final ExplainResponseNodeV2 node = new ExplainResponseNodeV2(logicalPlan, null, null);
    listener.onResponse(new ExplainResponse(node));
  } catch (Exception e) {
    // Any rendering failure is reported to the caller rather than thrown.
    listener.onFailure(e);
  }
}

private List<ExprValue> convertRows(Iterable<Object[]> rows, List<RelDataTypeField> fields) {
List<ExprValue> results = new ArrayList<>();
for (Object[] row : rows) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ppl;

import static org.opensearch.sql.util.MatcherUtils.assertYamlEqualsIgnoreId;

import java.io.IOException;
import org.junit.Test;

/**
* Explain integration tests for queries routed through the analytics engine path (Project Mustang).
* Validates that PPL queries targeting "parquet_*" indices produce correct logical plans via the
* _plugins/_ppl/_explain endpoint.
*
* <p>Expected output files are in resources/expectedOutput/analytics/. Each test compares the
* explain YAML output against its expected file, following the same pattern as CalciteExplainIT.
*
* <p>Since the analytics engine is not yet available, physical and extended plans are null. Only
* the logical plan (Calcite RelNode tree) is verified.
*/
public class AnalyticsExplainIT extends PPLIntegTestCase {

  @Override
  protected void init() throws Exception {
    // Stub schema and data are hardcoded, so there is no index to load.
  }

  /**
   * Explain {@code query} via the _plugins/_ppl/_explain endpoint and compare the YAML
   * output against the expected file under resources/expectedOutput/analytics/.
   */
  private void verifyExplain(String expectedFile, String query) throws IOException {
    String expected = loadFromFile("expectedOutput/analytics/" + expectedFile);
    assertYamlEqualsIgnoreId(expected, explainQueryYaml(query));
  }

  @Test
  public void testExplainSimpleScan() throws IOException {
    verifyExplain("explain_simple_scan.yaml", "source = opensearch.parquet_logs");
  }

  @Test
  public void testExplainProject() throws IOException {
    verifyExplain(
        "explain_project.yaml", "source = opensearch.parquet_logs | fields ts, message");
  }

  @Test
  public void testExplainFilterAndProject() throws IOException {
    verifyExplain(
        "explain_filter_project.yaml",
        "source = opensearch.parquet_logs | where status = 200 | fields ts, message");
  }

  @Test
  public void testExplainAggregation() throws IOException {
    verifyExplain(
        "explain_aggregation.yaml", "source = opensearch.parquet_logs | stats count() by status");
  }

  @Test
  public void testExplainSort() throws IOException {
    verifyExplain("explain_sort.yaml", "source = opensearch.parquet_logs | sort ts");
  }

  @Test
  public void testExplainEval() throws IOException {
    verifyExplain(
        "explain_eval.yaml", "source = opensearch.parquet_logs | eval error = status = 500");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(count()=[$1], status=[$0])
LogicalAggregate(group=[{0}], count()=[COUNT()])
LogicalProject(status=[$1])
LogicalTableScan(table=[[opensearch, parquet_logs]])
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(ts=[$0], status=[$1], message=[$2], ip_addr=[$3], error=[=($1, 500)])
LogicalTableScan(table=[[opensearch, parquet_logs]])
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(ts=[$0], message=[$2])
LogicalFilter(condition=[=($1, 200)])
LogicalTableScan(table=[[opensearch, parquet_logs]])
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(ts=[$0], message=[$2])
LogicalTableScan(table=[[opensearch, parquet_logs]])
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalTableScan(table=[[opensearch, parquet_logs]])
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
calcite:
logical: |
LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])
LogicalTableScan(table=[[opensearch, parquet_logs]])
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.plugin.rest;

import static org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
import static org.opensearch.sql.lang.PPLLangSpec.PPL_SPEC;
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;
Expand Down Expand Up @@ -72,6 +73,7 @@ public static boolean isAnalyticsIndex(String query) {
* @param pplRequest the original PPL request
* @param listener the transport action listener
*/
/** Execute a query through the unified query pipeline on the sql-worker thread pool. */
public void execute(
String query,
QueryType queryType,
Expand All @@ -85,6 +87,24 @@ public void execute(
SQL_WORKER_THREAD_POOL_NAME);
}

/**
 * Explain a query through the unified query pipeline on the sql-worker thread pool. The
 * resulting ExplainResponse is delivered via {@code listener} so the caller
 * (TransportPPLQueryAction) can format it with its own createExplainResponseListener,
 * reusing the existing format-aware logic.
 *
 * @param query the raw query text to plan and explain
 * @param queryType the query language (e.g. PPL)
 * @param pplRequest the original PPL request, carrying the explain mode
 * @param listener receives the ExplainResponse or any failure
 */
public void explain(
    String query,
    QueryType queryType,
    PPLQueryRequest pplRequest,
    ResponseListener<ExplainResponse> listener) {
  // Capture the current thread context, then hop onto the sql-worker pool immediately.
  final var work = withCurrentContext(() -> doExplain(query, queryType, pplRequest, listener));
  client.threadPool().schedule(work, new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);
}

private void doExecute(
String query,
QueryType queryType,
Expand All @@ -104,8 +124,6 @@ private void doExecute(
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
RelNode plan = planner.plan(query);

// Add query size limit to the plan so the analytics engine can enforce it
// during execution, consistent with PPL V3 (see QueryService.convertToCalcitePlan)
CalcitePlanContext planContext = context.getPlanContext();
plan = addQuerySizeLimit(plan, planContext);

Expand All @@ -123,6 +141,41 @@ private void doExecute(
}
}

/**
 * Plan {@code query} against the stub schema, apply the system query size limit, and hand
 * the resulting logical plan to the analytics engine's explain path. Every failure along
 * the way (schema build, planning, explain) is routed to {@code listener#onFailure}.
 */
private void doExplain(
    String query,
    QueryType queryType,
    PPLQueryRequest pplRequest,
    ResponseListener<ExplainResponse> listener) {
  final long startNanos = System.nanoTime();
  try {
    final AbstractSchema stubSchema = StubSchemaProvider.buildSchema();

    try (UnifiedQueryContext context =
        UnifiedQueryContext.builder()
            .language(queryType)
            .catalog(SCHEMA_NAME, stubSchema)
            .defaultNamespace(SCHEMA_NAME)
            .build()) {

      // Parse and optimize the query into a Calcite logical plan.
      RelNode logicalPlan = new UnifiedQueryPlanner(context).plan(query);

      // Mirror the execute path: bake the system query size limit into the plan.
      CalcitePlanContext planContext = context.getPlanContext();
      logicalPlan = addQuerySizeLimit(logicalPlan, planContext);

      LOG.info(
          "[unified] Planning completed in {}ms for {} query",
          (System.nanoTime() - startNanos) / 1_000_000,
          queryType);

      // The engine renders the plan at the verbosity the request's explain mode asks for.
      analyticsEngine.explain(logicalPlan, pplRequest.mode(), planContext, listener);
    }
  } catch (Exception e) {
    listener.onFailure(e);
  }
}

/**
* Add a system-level query size limit to the plan. This ensures the analytics engine enforces the
* limit during execution rather than returning all rows for post-processing truncation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,16 @@ protected void doExecute(

// Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices
if (RestUnifiedQueryAction.isAnalyticsIndex(transformedRequest.getRequest())) {
unifiedQueryHandler.execute(
transformedRequest.getRequest(), QueryType.PPL, transformedRequest, clearingListener);
if (transformedRequest.isExplainRequest()) {
unifiedQueryHandler.explain(
transformedRequest.getRequest(),
QueryType.PPL,
transformedRequest,
createExplainResponseListener(transformedRequest, clearingListener));
} else {
unifiedQueryHandler.execute(
transformedRequest.getRequest(), QueryType.PPL, transformedRequest, clearingListener);
}
return;
}

Expand Down
Loading