Skip to content

Add query routing and execution handoff for Parquet-backed indices#5267

Merged
ahkcs merged 11 commits intoopensearch-project:feature/mustang-ppl-integrationfrom
ahkcs:pr/mustang-query-routing
Mar 26, 2026
Merged

Add query routing and execution handoff for Parquet-backed indices#5267
ahkcs merged 11 commits intoopensearch-project:feature/mustang-ppl-integrationfrom
ahkcs:pr/mustang-query-routing

Conversation

@ahkcs
Copy link
Collaborator

@ahkcs ahkcs commented Mar 25, 2026

Summary

Implements query routing and execution handoff for Project Mustang's unified query pipeline (#5247).

This PR covers Step 2 and Step 3 (partial) from the issue:

Step 2: Query routing and execution handoff [Done]

Add RestUnifiedQueryAction and AnalyticsExecutionEngine that detect non-Lucene indices, route PPL queries through UnifiedQueryPlanner.plan() -> QueryPlanExecutor.execute(), and schedule execution on sql-worker thread pool with security context propagation.

  • Index detection: StubIndexDetector.isAnalyticsIndex() extracts index name from PPL query via regex, checks for parquet_ prefix. Will be replaced by UnifiedQueryParser and index settings.
  • Query routing: TransportPPLQueryAction.doExecute() routes parquet indices to RestUnifiedQueryAction, everything else to existing Lucene path unchanged. Routing happens after PPL enabled check, metrics, request ID, and profiling setup.
  • Planning: UnifiedQueryPlanner.plan() parses PPL and generates Calcite RelNode against a stub schema. Query size limit added to plan as LogicalSystemLimit, consistent with PPL V3 (QueryService.convertToCalcitePlan).
  • Execution: AnalyticsExecutionEngine calls QueryPlanExecutor.execute(relNode), converts Iterable<Object[]> to QueryResponse with type mapping via OpenSearchTypeFactory.
  • Thread scheduling: Runs on sql-worker thread pool with security context propagation via withCurrentContext().
  • Profiling: Uses clearingListener from wrapWithProfilingClear() to ensure profiling thread-local state is cleaned up.
  • Stub executor: StubQueryPlanExecutor returns canned data for parquet_logs and parquet_metrics tables.

Step 3: Response formatting [Done] (explain support deferred to next PR)

Format Iterable<Object[]> results via existing JdbcResponseFormatter.

  • Response formatting: QueryResponse -> QueryResult with query-type-aware LangSpec (PPL_SPEC or SQL_SPEC) -> JdbcResponseFormatter produces standard JDBC JSON (schema, datarows, total, size, status).
  • Metrics: Query-type-aware success/failure metrics (PPL_REQ_TOTAL vs REQ_TOTAL, PPL_FAILED_REQ_COUNT_* vs FAILED_REQ_COUNT_*).

Step 4: Error handling (basic) [Done]

Client vs server error classification, request/failure metrics.

  • Error classification: Matches RestPPLQueryAction.isClientError() list minus NPE (NPE is a server bug in the analytics path). Will sync with Initial implementation of report-builder interface #5266.
  • Error formatting: Uses ErrorMessageFactory.createErrorMessage() for standard error response format.
  • Latency logging: [unified] Planning completed in Xms, [unified] Execution completed in Xms, N rows returned.

Not in this PR (next PR / blocked)

  • Step 1 (plugin wiring): Blocked -- analytics-engine JAR not available yet
  • Step 3 (explain): Will be in next PR
  • Step 5 (full integration tests with real analytics engine): Blocked -- will use stub for now

New Files

File Purpose
core/.../analytics/QueryPlanExecutor.java @FunctionalInterface -- contract for analytics engine execution
core/.../analytics/AnalyticsExecutionEngine.java ExecutionEngine impl -- converts raw rows to QueryResponse
plugin/.../rest/RestUnifiedQueryAction.java Orchestrates analytics query path: planning, execution, response formatting
plugin/.../rest/analytics/stub/StubQueryPlanExecutor.java Stub executor -- canned data for testing
plugin/.../rest/analytics/stub/StubSchemaProvider.java Stub schema -- hardcoded Calcite table definitions
plugin/.../rest/analytics/stub/StubIndexDetector.java Stub routing -- parquet_ prefix detection
integ-test/.../ppl/AnalyticsPPLIT.java Integration tests -- full pipeline verification

Modified Files

File Change
plugin/.../transport/TransportPPLQueryAction.java Added analytics routing after profiling setup, uses clearingListener
plugin/build.gradle Added :api dependency for UnifiedQueryPlanner

Test plan

  • 9 unit tests -- AnalyticsExecutionEngine (type mapping, row conversion, nulls, error propagation, PhysicalPlan rejection)
  • 2 unit tests -- RestUnifiedQueryAction (routing detection)
  • 9 integration tests -- AnalyticsPPLIT (full pipeline: schema+data verification with verifySchema/verifyDataRows, response format, projection, syntax error -> 400, Lucene regression)

@ahkcs ahkcs changed the title [Mustang] Add query routing and execution handoff for Parquet-backed indices Add query routing and execution handoff for Parquet-backed indices Mar 25, 2026
@github-actions
Copy link
Contributor

github-actions bot commented Mar 25, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 0ea384a.

PathLineSeverityDescription
plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java130mediumQueries targeting 'parquet_*' indices are silently routed to the analytics engine via StubQueryPlanExecutor before normal PPL service processing (security checks, profiling, metrics). If the analytics path lacks equivalent authorization controls, this routing bypass could allow unauthenticated or unprivileged access to the analytics engine for any user who can craft a query targeting a parquet_ index. Warrants review to confirm the analytics path enforces the same access controls as the PPL service path.
plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java264lowNullPointerException is classified as a client error (HTTP 400) in isClientError(). NPEs are internal server bugs, not client mistakes. Misclassifying them as 4xx causes server-side errors to be reported as BAD_REQUEST, which could suppress monitoring/alerting for server faults and mislead operators about the source of failures.
plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubQueryPlanExecutor.java1lowA stub executor with hardcoded production data (real-looking IP addresses, timestamps, metrics) is wired directly into TransportPPLQueryAction for all parquet_ index queries. While clearly marked as temporary, shipping stub implementations in production code that return fabricated results is anomalous and could produce misleading query results in a deployed cluster if the stub is not replaced before release.

The table above displays the top 10 most important findings.

Total: 3 | Critical: 0 | High: 0 | Medium: 1 | Low: 2


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

@ahkcs ahkcs added PPL Piped processing language enhancement New feature or request labels Mar 25, 2026
@ahkcs ahkcs force-pushed the pr/mustang-query-routing branch from a924eda to f050fb9 Compare March 25, 2026 21:07
…indices (opensearch-project#5247)

Implements the query routing and AnalyticsExecutionEngine for Project
Mustang's unified query pipeline. PPL queries targeting parquet_ prefixed
indices are routed through UnifiedQueryPlanner and executed via a stub
QueryPlanExecutor, with results formatted through the existing JDBC
response pipeline.

New files:
- QueryPlanExecutor: @FunctionalInterface contract for analytics engine
- AnalyticsExecutionEngine: converts Iterable<Object[]> to QueryResponse
  with type mapping and query size limit enforcement
- RestUnifiedQueryAction: orchestrates schema building, planning,
  execution on sql-worker thread pool, with client/server error
  classification and metrics
- StubQueryPlanExecutor: canned data for parquet_logs and parquet_metrics
  tables for development and testing

Modified files:
- RestPPLQueryAction: routing branch for parquet_ indices
- SQLPlugin: passes ClusterService and NodeClient to RestPPLQueryAction
- plugin/build.gradle: adds :api dependency for UnifiedQueryPlanner

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
@ahkcs ahkcs force-pushed the pr/mustang-query-routing branch from f050fb9 to a16078c Compare March 25, 2026 21:09
@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

1 similar comment
@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

Add missing exception types to isClientError(): IndexNotFoundException,
ExpressionEvaluationException, QueryEngineException,
DataSourceClientException, IllegalAccessException. Matches the full
list in RestPPLQueryAction.isClientError().

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

Extract StubSchemaProvider, StubQueryPlanExecutor, and StubIndexDetector
into plugin/.../rest/analytics/stub/ package to clearly separate
temporary stub code from production code. RestUnifiedQueryAction now
delegates to these stub classes.

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

Replace hand-crafted JSON error response with
ErrorMessageFactory.createErrorMessage(), matching the standard error
format used in RestPPLQueryAction.reportError().

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

Use QueryType to select the correct metrics (PPL_REQ_TOTAL vs REQ_TOTAL,
PPL_FAILED_REQ_COUNT_* vs FAILED_REQ_COUNT_*) and LangSpec (PPL_SPEC
vs SQL_SPEC) so this class can serve both PPL and SQL queries when
unified SQL support is added.

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

Move the analytics index routing check from RestPPLQueryAction into
TransportPPLQueryAction.doExecute(). This ensures the analytics path
gets the same PPL enabled check, metrics, request ID, and inter-plugin
transport support as the existing Lucene path. RestPPLQueryAction and
SQLPlugin are reverted to their original state.

Added executeViaTransport() to RestUnifiedQueryAction which returns
results via ActionListener<TransportPPLQueryResponse> instead of
RestChannel, integrating properly with the transport action pattern.

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

Add LogicalSystemLimit to the RelNode plan before passing it to the
analytics engine, consistent with PPL V3 (QueryService.convertToCalcitePlan).
This ensures the analytics engine enforces the limit during execution
rather than returning all rows for post-processing truncation.

Remove post-processing querySizeLimit truncation from
AnalyticsExecutionEngine -- the limit is now part of the plan the
executor receives.

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

@ahkcs ahkcs marked this pull request as ready for review March 25, 2026 22:17
@qianheng-aws qianheng-aws deleted the branch opensearch-project:feature/mustang-ppl-integration March 26, 2026 09:24
@ahkcs ahkcs reopened this Mar 26, 2026
@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

* @param channel the REST channel for sending the response
*/
public void execute(String query, QueryType queryType, RestChannel channel) {
client
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security context (user identity, permissions, audit trail) is not propagated when scheduling on the sql-worker thread pool. The existing PPL path uses OpenSearchQueryManager.withCurrentContext() to capture and restore ThreadContext on the worker thread. The analytics path calls client.threadPool().schedule() with a bare lambda, losing all security context. Queries may execute with wrong permissions. Wrap the lambda in both execute() and executeViaTransport() with a context-propagating wrapper equivalent to withCurrentContext().

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Wrapped both execute() and executeViaTransport() with withCurrentContext() to capture and restore ThreadContext on the worker thread. Follows the same pattern as OpenSearchQueryManager.withCurrentContext().

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated again to remove the duplicate REST execute() path

Comment on lines +126 to +131
// Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices
if (RestUnifiedQueryAction.isAnalyticsIndex(transformedRequest.getRequest())) {
unifiedQueryHandler.executeViaTransport(
transformedRequest.getRequest(), QueryType.PPL, transformedRequest, listener);
return;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Early return after analytics routing bypasses QueryContext.setProfile() (line 134) and the wrapWithProfilingClear(listener) cleanup wrapper (line 135). This leaks profiling thread-local state across requests. Either move the routing check after profiling setup, or duplicate setProfile()/profiling-clear in the analytics path.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Moved the routing check after QueryContext.setProfile() and wrapWithProfilingClear(listener), and now pass clearingListener to executeViaTransport() instead of raw listener.

Wrap scheduled lambdas in both execute() and executeViaTransport() with
withCurrentContext() to capture and restore ThreadContext (user identity,
permissions, audit trail) on the worker thread. Follows the same pattern
as OpenSearchQueryManager.withCurrentContext().

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

Move the analytics index routing check after QueryContext.setProfile()
and wrapWithProfilingClear(listener). Use clearingListener instead of
raw listener so profiling thread-local state is properly cleaned up
after analytics path execution.

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

NPE in the analytics path is a server bug (null schema field, missing
row), not bad user input. Removed from client error list. Will sync
this classification with RestPPLQueryAction updates in opensearch-project#5266.

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

@ahkcs ahkcs requested a review from penghuo March 26, 2026 21:22
Remove execute(query, queryType, channel), doExecute(), createQueryListener(channel),
recordSuccessMetric(), recordFailureMetric(), reportError(), and related
REST imports. Since routing now goes through TransportPPLQueryAction,
the REST-specific path was unused. Renamed executeViaTransport() to
execute() as the sole entry point.

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Contributor

Failed to generate code suggestions for PR

@ahkcs ahkcs merged commit 23227fd into opensearch-project:feature/mustang-ppl-integration Mar 26, 2026
37 checks passed
@ahkcs ahkcs deleted the pr/mustang-query-routing branch March 26, 2026 23:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request PPL Piped processing language

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants