generated from amazon-archives/__template_Custom
-
Notifications
You must be signed in to change notification settings - Fork 188
Add query routing and execution handoff for Parquet-backed indices #5264
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
ahkcs
merged 1 commit into
opensearch-project:feature/mustang-ppl-integration
from
ahkcs:pr/mustang-query-routing
Mar 25, 2026
+1,172
−2
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
115 changes: 115 additions & 0 deletions
115
core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.executor.analytics; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.LinkedHashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import org.apache.calcite.rel.RelNode; | ||
| import org.apache.calcite.rel.type.RelDataType; | ||
| import org.apache.calcite.rel.type.RelDataTypeField; | ||
| import org.opensearch.sql.calcite.CalcitePlanContext; | ||
| import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; | ||
| import org.opensearch.sql.common.response.ResponseListener; | ||
| import org.opensearch.sql.data.model.ExprTupleValue; | ||
| import org.opensearch.sql.data.model.ExprValue; | ||
| import org.opensearch.sql.data.model.ExprValueUtils; | ||
| import org.opensearch.sql.data.type.ExprType; | ||
| import org.opensearch.sql.executor.ExecutionContext; | ||
| import org.opensearch.sql.executor.ExecutionEngine; | ||
| import org.opensearch.sql.executor.pagination.Cursor; | ||
| import org.opensearch.sql.planner.physical.PhysicalPlan; | ||
|
|
||
| /** | ||
| * Execution engine adapter for the analytics engine (Project Mustang). | ||
| * | ||
| * <p>Bridges the analytics engine's {@link QueryPlanExecutor} with the SQL plugin's {@link | ||
| * ExecutionEngine} response pipeline. Takes a Calcite {@link RelNode}, delegates execution to the | ||
| * analytics engine, and converts the raw results into {@link QueryResponse}. | ||
| */ | ||
| public class AnalyticsExecutionEngine implements ExecutionEngine { | ||
|
|
||
| private final QueryPlanExecutor planExecutor; | ||
|
|
||
| public AnalyticsExecutionEngine(QueryPlanExecutor planExecutor) { | ||
| this.planExecutor = planExecutor; | ||
| } | ||
|
|
||
| /** Not supported. Analytics queries use the RelNode path exclusively. */ | ||
| @Override | ||
| public void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener) { | ||
| listener.onFailure( | ||
| new UnsupportedOperationException("Analytics engine only supports RelNode execution")); | ||
| } | ||
|
|
||
| /** Not supported. Analytics queries use the RelNode path exclusively. */ | ||
| @Override | ||
| public void execute( | ||
| PhysicalPlan plan, ExecutionContext context, ResponseListener<QueryResponse> listener) { | ||
| listener.onFailure( | ||
| new UnsupportedOperationException("Analytics engine only supports RelNode execution")); | ||
| } | ||
|
|
||
| /** Not supported. Analytics queries use the RelNode path exclusively. */ | ||
| @Override | ||
| public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener) { | ||
| listener.onFailure( | ||
| new UnsupportedOperationException("Analytics engine only supports RelNode execution")); | ||
| } | ||
|
|
||
| @Override | ||
| public void execute( | ||
| RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) { | ||
| try { | ||
| Integer querySizeLimit = context.sysLimit.querySizeLimit(); | ||
| Iterable<Object[]> rows = planExecutor.execute(plan, null); | ||
|
|
||
| List<RelDataTypeField> fields = plan.getRowType().getFieldList(); | ||
| List<ExprValue> results = convertRows(rows, fields, querySizeLimit); | ||
| Schema schema = buildSchema(fields); | ||
|
|
||
| listener.onResponse(new QueryResponse(schema, results, Cursor.None)); | ||
| } catch (Exception e) { | ||
| listener.onFailure(e); | ||
| } | ||
| } | ||
|
|
||
| private List<ExprValue> convertRows( | ||
| Iterable<Object[]> rows, List<RelDataTypeField> fields, Integer querySizeLimit) { | ||
| List<ExprValue> results = new ArrayList<>(); | ||
| for (Object[] row : rows) { | ||
| if (querySizeLimit != null && results.size() >= querySizeLimit) { | ||
| break; | ||
| } | ||
| Map<String, ExprValue> valueMap = new LinkedHashMap<>(); | ||
| for (int i = 0; i < fields.size(); i++) { | ||
| String columnName = fields.get(i).getName(); | ||
| Object value = (i < row.length) ? row[i] : null; | ||
| valueMap.put(columnName, ExprValueUtils.fromObjectValue(value)); | ||
| } | ||
| results.add(ExprTupleValue.fromExprValueMap(valueMap)); | ||
| } | ||
| return results; | ||
| } | ||
|
|
||
| private Schema buildSchema(List<RelDataTypeField> fields) { | ||
| List<Schema.Column> columns = new ArrayList<>(); | ||
| for (RelDataTypeField field : fields) { | ||
| ExprType exprType = convertType(field.getType()); | ||
| columns.add(new Schema.Column(field.getName(), null, exprType)); | ||
| } | ||
| return new Schema(columns); | ||
| } | ||
|
|
||
| private ExprType convertType(RelDataType type) { | ||
| try { | ||
| return OpenSearchTypeFactory.convertRelDataTypeToExprType(type); | ||
| } catch (IllegalArgumentException e) { | ||
| return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN; | ||
| } | ||
| } | ||
| } | ||
32 changes: 32 additions & 0 deletions
32
core/src/main/java/org/opensearch/sql/executor/analytics/QueryPlanExecutor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.executor.analytics; | ||
|
|
||
| import org.apache.calcite.rel.RelNode; | ||
|
|
||
| /** | ||
| * Executes a Calcite {@link RelNode} logical plan against the analytics engine. | ||
| * | ||
| * <p>This is a local equivalent of {@code org.opensearch.analytics.exec.QueryPlanExecutor} from the | ||
| * analytics-framework library. It will be replaced by the upstream interface once the | ||
| * analytics-framework JAR is published. | ||
| * | ||
| * @see <a | ||
| * href="https://github.com/opensearch-project/OpenSearch/blob/9142d0e789c6a6c4708f1bc015745ed55202eefe/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/exec/QueryPlanExecutor.java">Upstream | ||
| * QueryPlanExecutor</a> | ||
| */ | ||
| @FunctionalInterface | ||
| public interface QueryPlanExecutor { | ||
|
|
||
| /** | ||
| * Executes the given logical plan and returns result rows. | ||
| * | ||
| * @param plan the Calcite RelNode subtree to execute | ||
| * @param context execution context (opaque to avoid server dependency) | ||
| * @return rows produced by the engine | ||
| */ | ||
| Iterable<Object[]> execute(RelNode plan, Object context); | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you check how PPL V3 handle this today? I'm thinking if we want to be consistent, this needs to be part of the RelNode plan we pass to
QueryPlanExecutorinstead of post-processing? Otherwise we may pull lots of data back?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Moved the query size limit into the RelNode plan using
LogicalSystemLimit.create(), same pattern asQueryService.convertToCalcitePlan(). The limit is now part of the plan the analytics engine receives, so it can enforce it during execution instead of us pulling all rows back first.Removed the post-processing truncation from
AnalyticsExecutionEngine