Add query routing and execution handoff for Parquet-backed indices#5267
Conversation
PR Code Analyzer ❗AI-powered 'Code-Diff-Analyzer' found issues on commit 0ea384a.
The table above displays the top 10 most important findings. Pull Requests Author(s): Please update your Pull Request according to the report above. Repository Maintainer(s): You can Thanks. |
|
Failed to generate code suggestions for PR |
a924eda to
f050fb9
Compare
…indices (opensearch-project#5247) Implements the query routing and AnalyticsExecutionEngine for Project Mustang's unified query pipeline. PPL queries targeting parquet_ prefixed indices are routed through UnifiedQueryPlanner and executed via a stub QueryPlanExecutor, with results formatted through the existing JDBC response pipeline. New files: - QueryPlanExecutor: @FunctionalInterface contract for analytics engine - AnalyticsExecutionEngine: converts Iterable<Object[]> to QueryResponse with type mapping and query size limit enforcement - RestUnifiedQueryAction: orchestrates schema building, planning, execution on sql-worker thread pool, with client/server error classification and metrics - StubQueryPlanExecutor: canned data for parquet_logs and parquet_metrics tables for development and testing Modified files: - RestPPLQueryAction: routing branch for parquet_ indices - SQLPlugin: passes ClusterService and NodeClient to RestPPLQueryAction - plugin/build.gradle: adds :api dependency for UnifiedQueryPlanner Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
f050fb9 to
a16078c
Compare
|
Failed to generate code suggestions for PR |
1 similar comment
|
Failed to generate code suggestions for PR |
Add missing exception types to isClientError(): IndexNotFoundException, ExpressionEvaluationException, QueryEngineException, DataSourceClientException, IllegalAccessException. Matches the full list in RestPPLQueryAction.isClientError(). Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
|
Failed to generate code suggestions for PR |
Extract StubSchemaProvider, StubQueryPlanExecutor, and StubIndexDetector into plugin/.../rest/analytics/stub/ package to clearly separate temporary stub code from production code. RestUnifiedQueryAction now delegates to these stub classes. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
|
Failed to generate code suggestions for PR |
Replace hand-crafted JSON error response with ErrorMessageFactory.createErrorMessage(), matching the standard error format used in RestPPLQueryAction.reportError(). Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
|
Failed to generate code suggestions for PR |
Use QueryType to select the correct metrics (PPL_REQ_TOTAL vs REQ_TOTAL, PPL_FAILED_REQ_COUNT_* vs FAILED_REQ_COUNT_*) and LangSpec (PPL_SPEC vs SQL_SPEC) so this class can serve both PPL and SQL queries when unified SQL support is added. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
|
Failed to generate code suggestions for PR |
Move the analytics index routing check from RestPPLQueryAction into TransportPPLQueryAction.doExecute(). This ensures the analytics path gets the same PPL enabled check, metrics, request ID, and inter-plugin transport support as the existing Lucene path. RestPPLQueryAction and SQLPlugin are reverted to their original state. Added executeViaTransport() to RestUnifiedQueryAction which returns results via ActionListener<TransportPPLQueryResponse> instead of RestChannel, integrating properly with the transport action pattern. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
|
Failed to generate code suggestions for PR |
Add LogicalSystemLimit to the RelNode plan before passing it to the analytics engine, consistent with PPL V3 (QueryService.convertToCalcitePlan). This ensures the analytics engine enforces the limit during execution rather than returning all rows for post-processing truncation. Remove post-processing querySizeLimit truncation from AnalyticsExecutionEngine -- the limit is now part of the plan the executor receives. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
|
Failed to generate code suggestions for PR |
|
Failed to generate code suggestions for PR |
| * @param channel the REST channel for sending the response | ||
| */ | ||
| public void execute(String query, QueryType queryType, RestChannel channel) { | ||
| client |
There was a problem hiding this comment.
Security context (user identity, permissions, audit trail) is not propagated when scheduling on the sql-worker thread pool. The existing PPL path uses OpenSearchQueryManager.withCurrentContext() to capture and restore ThreadContext on the worker thread. The analytics path calls client.threadPool().schedule() with a bare lambda, losing all security context. Queries may execute with wrong permissions. Wrap the lambda in both execute() and executeViaTransport() with a context-propagating wrapper equivalent to withCurrentContext().
There was a problem hiding this comment.
Good catch. Wrapped both execute() and executeViaTransport() with withCurrentContext() to capture and restore ThreadContext on the worker thread. Follows the same pattern as OpenSearchQueryManager.withCurrentContext().
There was a problem hiding this comment.
Updated again to remove the duplicate REST execute() path
| // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices | ||
| if (RestUnifiedQueryAction.isAnalyticsIndex(transformedRequest.getRequest())) { | ||
| unifiedQueryHandler.executeViaTransport( | ||
| transformedRequest.getRequest(), QueryType.PPL, transformedRequest, listener); | ||
| return; | ||
| } |
There was a problem hiding this comment.
Early return after analytics routing bypasses QueryContext.setProfile() (line 134) and the wrapWithProfilingClear(listener) cleanup wrapper (line 135). This leaks profiling thread-local state across requests. Either move the routing check after profiling setup, or duplicate setProfile()/profiling-clear in the analytics path.
There was a problem hiding this comment.
Good catch. Moved the routing check after QueryContext.setProfile() and wrapWithProfilingClear(listener), and now pass clearingListener to executeViaTransport() instead of raw listener.
plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java
Outdated
Show resolved
Hide resolved
Wrap scheduled lambdas in both execute() and executeViaTransport() with withCurrentContext() to capture and restore ThreadContext (user identity, permissions, audit trail) on the worker thread. Follows the same pattern as OpenSearchQueryManager.withCurrentContext(). Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
|
Failed to generate code suggestions for PR |
Move the analytics index routing check after QueryContext.setProfile() and wrapWithProfilingClear(listener). Use clearingListener instead of raw listener so profiling thread-local state is properly cleaned up after analytics path execution. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
|
Failed to generate code suggestions for PR |
NPE in the analytics path is a server bug (null schema field, missing row), not bad user input. Removed from client error list. Will sync this classification with RestPPLQueryAction updates in opensearch-project#5266. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
|
Failed to generate code suggestions for PR |
Remove execute(query, queryType, channel), doExecute(), createQueryListener(channel), recordSuccessMetric(), recordFailureMetric(), reportError(), and related REST imports. Since routing now goes through TransportPPLQueryAction, the REST-specific path was unused. Renamed executeViaTransport() to execute() as the sole entry point. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
|
Failed to generate code suggestions for PR |
23227fd
into
opensearch-project:feature/mustang-ppl-integration
Summary
Implements query routing and execution handoff for Project Mustang's unified query pipeline (#5247).
This PR covers Step 2 and Step 3 (partial) from the issue:
Step 2: Query routing and execution handoff [Done]
StubIndexDetector.isAnalyticsIndex()extracts index name from PPL query via regex, checks forparquet_prefix. Will be replaced byUnifiedQueryParserand index settings.TransportPPLQueryAction.doExecute()routes parquet indices toRestUnifiedQueryAction, everything else to existing Lucene path unchanged. Routing happens after PPL enabled check, metrics, request ID, and profiling setup.UnifiedQueryPlanner.plan()parses PPL and generates CalciteRelNodeagainst a stub schema. Query size limit added to plan asLogicalSystemLimit, consistent with PPL V3 (QueryService.convertToCalcitePlan).AnalyticsExecutionEnginecallsQueryPlanExecutor.execute(relNode), convertsIterable<Object[]>toQueryResponsewith type mapping viaOpenSearchTypeFactory.sql-workerthread pool with security context propagation viawithCurrentContext().clearingListenerfromwrapWithProfilingClear()to ensure profiling thread-local state is cleaned up.StubQueryPlanExecutorreturns canned data forparquet_logsandparquet_metricstables.Step 3: Response formatting [Done] (explain support deferred to next PR)
QueryResponse->QueryResultwith query-type-awareLangSpec(PPL_SPEC or SQL_SPEC) ->JdbcResponseFormatterproduces standard JDBC JSON (schema,datarows,total,size,status).Step 4: Error handling (basic) [Done]
RestPPLQueryAction.isClientError()list minus NPE (NPE is a server bug in the analytics path). Will sync with Initial implementation of report-builder interface #5266.ErrorMessageFactory.createErrorMessage()for standard error response format.[unified] Planning completed in Xms,[unified] Execution completed in Xms, N rows returned.Not in this PR (next PR / blocked)
New Files
core/.../analytics/QueryPlanExecutor.java@FunctionalInterface-- contract for analytics engine executioncore/.../analytics/AnalyticsExecutionEngine.javaExecutionEngineimpl -- converts raw rows toQueryResponseplugin/.../rest/RestUnifiedQueryAction.javaplugin/.../rest/analytics/stub/StubQueryPlanExecutor.javaplugin/.../rest/analytics/stub/StubSchemaProvider.javaplugin/.../rest/analytics/stub/StubIndexDetector.javainteg-test/.../ppl/AnalyticsPPLIT.javaModified Files
plugin/.../transport/TransportPPLQueryAction.javaplugin/build.gradle:apidependency forUnifiedQueryPlannerTest plan
AnalyticsExecutionEngine(type mapping, row conversion, nulls, error propagation, PhysicalPlan rejection)RestUnifiedQueryAction(routing detection)AnalyticsPPLIT(full pipeline: schema+data verification withverifySchema/verifyDataRows, response format, projection, syntax error -> 400, Lucene regression)