Add query routing and execution handoff for Parquet-backed indices #5264
…indices (opensearch-project#5247)

Implements the query routing and AnalyticsExecutionEngine for Project Mustang's unified query pipeline. PPL queries targeting parquet_ prefixed indices are routed through UnifiedQueryPlanner and executed via a stub QueryPlanExecutor, with results formatted through the existing JDBC response pipeline.

New files:
- QueryPlanExecutor: @FunctionalInterface contract for analytics engine
- AnalyticsExecutionEngine: converts Iterable<Object[]> to QueryResponse with type mapping and query size limit enforcement
- RestUnifiedQueryAction: orchestrates schema building, planning, execution on sql-worker thread pool, with client/server error classification and metrics
- StubQueryPlanExecutor: canned data for parquet_logs and parquet_metrics tables for development and testing

Modified files:
- RestPPLQueryAction: routing branch for parquet_ indices
- SQLPlugin: passes ClusterService and NodeClient to RestPPLQueryAction
- plugin/build.gradle: adds :api dependency for UnifiedQueryPlanner

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
// --- isAnalyticsIndex ---

@Test
FYI, I will add a generic UnifiedQueryParser for this extraction. You could probably simplify the tests below, and also think about how the test scope is split between this class and AnalyticsExecutionEngineTest.
FYI, PR updates have now been moved to #5267 due to an accidental merge.
Thanks for the heads up on UnifiedQueryParser. I've simplified the tests to just 2 routing behavior tests (parquet index routes to analytics, non-parquet routes to Lucene). Removed all the extractIndexName tests since that logic will be replaced.
}

/** Classify whether the exception is a client error (bad query) or server error (engine bug). */
private static boolean isClientError(Exception e) {
I recall we have a longer list in the current SQL action. Could you double check?
Good catch. Updated isClientError() to match the full list from RestPPLQueryAction.
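For readers following the thread, here is a minimal sketch of the client/server split being discussed. It is not the code from this PR: the two exception classes are local stand-ins so the example compiles on its own, the IllegalArgumentException entry is an assumed illustration, and the full list in RestPPLQueryAction is longer than what is shown.

```java
// Hypothetical sketch of client vs. server error classification.
final class ErrorClassificationSketch {
  // Local stand-ins so the sketch is self-contained; the plugin defines its own classes.
  static class SyntaxCheckException extends RuntimeException {
    SyntaxCheckException(String message) {
      super(message);
    }
  }

  static class SemanticCheckException extends RuntimeException {
    SemanticCheckException(String message) {
      super(message);
    }
  }

  /** Returns true when the failure is attributable to the query, not the engine. */
  static boolean isClientError(Exception e) {
    return e instanceof SyntaxCheckException
        || e instanceof SemanticCheckException
        || e instanceof IllegalArgumentException; // assumed entry, for illustration only
  }

  /** Client errors map to HTTP 400, everything else to HTTP 500. */
  static int httpStatus(Exception e) {
    return isClientError(e) ? 400 : 500;
  }
}
```

Keeping the classification in one predicate means the HTTP status and the failure metric can both be derived from the same decision.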
 * "parquet_*" table. Will be replaced by EngineContext.getSchema() when the analytics engine is
 * ready.
 */
private static AbstractSchema buildStubSchema() {
Could you make this a stub class and move it, together with StubQueryPlanExecutor (and maybe your temporary extraction logic too), into a sub-package? I'm just thinking this makes clear what we're stubbing.
Moved all stub code into the plugin/.../rest/analytics/stub/ sub-package:
- StubIndexDetector (temporary index extraction + parquet_ prefix check)
- StubSchemaProvider (hardcoded Calcite schema)
- StubQueryPlanExecutor (canned data)
RestUnifiedQueryAction now delegates to these.
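As a rough picture of what a temporary detector of this kind might do, the sketch below extracts the index name with a regex and checks the parquet_ prefix. The class name, the regex, and the assumption that the index follows `source=` (optionally preceded by `search`) are all illustrative and not taken from the PR.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for a stub index detector; names and regex are assumptions.
final class IndexDetectorSketch {
  // Matches "source = <name>" at the start of the query, optionally preceded by "search".
  private static final Pattern SOURCE =
      Pattern.compile(
          "^\\s*(?:search\\s+)?source\\s*=\\s*`?([\\w.*-]+)`?", Pattern.CASE_INSENSITIVE);

  /** Returns the first index name after "source=", or empty if the query has none. */
  static Optional<String> extractIndexName(String ppl) {
    if (ppl == null) {
      return Optional.empty();
    }
    Matcher m = SOURCE.matcher(ppl);
    return m.find() ? Optional.of(m.group(1)) : Optional.empty();
  }

  /** Treats any index whose name starts with "parquet_" as analytics-backed. */
  static boolean isAnalyticsIndex(String ppl) {
    return extractIndexName(ppl).map(name -> name.startsWith("parquet_")).orElse(false);
  }
}
```

A regex like this only sees the leading source clause, which is one reason the thread treats it as temporary until a proper parser or index setting is available.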
new BytesRestResponse(
    status,
    "application/json; charset=UTF-8",
    "{\"error\":{\"type\":\""
I forget: is this the standard way we build error responses in the SQL/PPL REST actions today?
Good call. Replaced the hand-crafted JSON with ErrorMessageFactory.createErrorMessage(e, status.getStatus()).toString() -- same pattern as RestPPLQueryAction.reportError().
private static void recordFailureMetric(Exception e) {
  if (isClientError(e)) {
    LOG.warn("[unified] Client error in query execution", e);
    Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_CUS).increment();
Please see how we can make this flexible because we're going to enable unified SQL in this same class soon. Same for other code if PPL-specific.
Made metrics and LangSpec query-type-aware. Now uses QueryType to select:
- Metrics: PPL_REQ_TOTAL / PPL_FAILED_REQ_COUNT_* for PPL, REQ_TOTAL / FAILED_REQ_COUNT_* for SQL
- Response formatting: PPL_SPEC for PPL, LangSpec.SQL_SPEC for SQL
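The selection described above could look roughly like the following sketch. Metric names are plain strings here so the example stands alone; the _CUS/_SYS expansion of the SQL names is inferred from the FAILED_REQ_COUNT_* pattern in the reply, and the class and method names are hypothetical.

```java
// Hypothetical sketch of query-type-aware metric selection.
final class QueryTypeMetricsSketch {
  enum QueryType {
    PPL,
    SQL
  }

  /** Name of the total-requests counter for the given query type. */
  static String totalMetric(QueryType type) {
    return type == QueryType.PPL ? "PPL_REQ_TOTAL" : "REQ_TOTAL";
  }

  /** Name of the failure counter: CUS for client errors, SYS for server errors. */
  static String failureMetric(QueryType type, boolean clientError) {
    String prefix = type == QueryType.PPL ? "PPL_FAILED_REQ_COUNT_" : "FAILED_REQ_COUNT_";
    return prefix + (clientError ? "CUS" : "SYS");
  }
}
```

With the query type passed in, the same action class can serve PPL now and SQL later without a second copy of the metric logic.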
TransportPPLQueryRequest transportPPLQueryRequest =
    new TransportPPLQueryRequest(PPLQueryRequestFactory.getPPLRequest(request));

// Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices
Could you double check whether we need to do this here or after the transport action below? Maybe see why we added PPL transport action previously.
Moved routing from RestPPLQueryAction to TransportPPLQueryAction.doExecute().
RestPPLQueryAction and SQLPlugin are reverted to their original state.
public void execute(
    RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
  try {
    Integer querySizeLimit = context.sysLimit.querySizeLimit();
Could you check how PPL V3 handles this today? I'm thinking that, if we want to be consistent, this needs to be part of the RelNode plan we pass to QueryPlanExecutor instead of post-processing. Otherwise we may pull lots of data back.
Good catch. Moved the query size limit into the RelNode plan using LogicalSystemLimit.create(), same pattern as QueryService.convertToCalcitePlan(). The limit is now part of the plan the analytics engine receives, so it can enforce it during execution instead of us pulling all rows back first.
Removed the post-processing truncation from AnalyticsExecutionEngine.
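To illustrate why the placement of the limit matters, here is a toy sketch that counts how many rows a pretend engine produces in each approach. It does not use Calcite or LogicalSystemLimit; the "engine" is a simple loop and every name in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Toy comparison of limit-in-plan vs. post-processing truncation.
final class LimitPushdownSketch {
  /** Pretend engine: produces up to `total` rows, honoring `planLimit` when it is non-null. */
  static List<Object[]> execute(int total, Integer planLimit, AtomicInteger produced) {
    int n = planLimit == null ? total : Math.min(total, planLimit);
    List<Object[]> rows = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      produced.incrementAndGet(); // count every row the engine had to materialize
      rows.add(new Object[] {i});
    }
    return rows;
  }

  /** Post-processing: the engine materializes everything, then the caller truncates. */
  static List<Object[]> truncateAfter(int total, int limit, AtomicInteger produced) {
    List<Object[]> all = execute(total, null, produced);
    return all.subList(0, Math.min(limit, all.size()));
  }
}
```

Both paths return the same number of rows to the caller, but only the limit-in-plan path lets the engine stop early.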
a924eda into opensearch-project:feature/mustang-ppl-integration
Summary
Implements query routing and execution handoff for Project Mustang's unified query pipeline (#5247).
This PR covers Step 2 and Step 3 (partial) from the issue:
Step 2: Query routing and execution handoff
- isAnalyticsIndex() extracts the index name from the PPL query via regex and checks for the parquet_ prefix. Production will use index settings (e.g., index.storage_type).
- RestPPLQueryAction.prepareRequest() branches -- parquet indices go to RestUnifiedQueryAction, everything else goes to the existing Lucene path unchanged.
- UnifiedQueryPlanner.plan() parses PPL and generates a Calcite RelNode against a stub schema (will be replaced by EngineContext.getSchema() from the analytics engine).
- AnalyticsExecutionEngine calls QueryPlanExecutor.execute(relNode), converts Iterable<Object[]> to QueryResponse with type mapping via OpenSearchTypeFactory and query size limit enforcement.
- Execution runs on the sql-worker thread pool via client.threadPool().schedule().
- StubQueryPlanExecutor returns canned data for parquet_logs and parquet_metrics tables for development and testing.

Step 3: Response formatting (explain support deferred to next PR)
- QueryResponse -> QueryResult with PPL_SPEC -> JdbcResponseFormatter produces standard JDBC JSON (schema, datarows, total, size, status). Same format as existing PPL responses.

Step 4: Error handling (basic)
- SyntaxCheckException / SemanticCheckException -> HTTP 400 + PPL_FAILED_REQ_COUNT_CUS; engine failures -> HTTP 500 + PPL_FAILED_REQ_COUNT_SYS.
- Logging: [unified] Planning completed in Xms, [unified] Execution completed in Xms, N rows returned.

Not in this PR (next PR / blocked)
New Files
- core/.../analytics/QueryPlanExecutor.java: @FunctionalInterface -- contract for analytics engine execution
- core/.../analytics/AnalyticsExecutionEngine.java: ExecutionEngine impl -- converts raw rows to QueryResponse
- plugin/.../rest/RestUnifiedQueryAction.java
- plugin/.../rest/StubQueryPlanExecutor.java
- integ-test/.../ppl/AnalyticsPPLIT.java

Modified Files
- plugin/.../rest/RestPPLQueryAction.java: routing branch for parquet_ indices
- plugin/.../SQLPlugin.java: passes ClusterService + NodeClient to RestPPLQueryAction
- plugin/build.gradle: adds :api dependency for UnifiedQueryPlanner

Test plan
- AnalyticsExecutionEngine (type mapping, row conversion, size limit, nulls, error propagation, PhysicalPlan rejection)
- RestUnifiedQueryAction (index extraction regex, routing detection for various PPL patterns)
- AnalyticsPPLIT (full pipeline: schema + data verification with verifySchema / verifyDataRows, response format, projection, syntax error -> 400, Lucene regression)
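The row conversion and null handling named in the test plan can be pictured with a small, self-contained sketch. It is only an approximation under stated assumptions: it maps raw Object[] rows onto column names using plain collections, whereas the PR builds a QueryResponse with types from OpenSearchTypeFactory. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: turn positional rows into name -> value maps, keeping nulls.
final class RowConversionSketch {
  static List<Map<String, Object>> toNamedRows(List<String> columns, Iterable<Object[]> rows) {
    List<Map<String, Object>> out = new ArrayList<>();
    for (Object[] row : rows) {
      if (row.length != columns.size()) {
        // Surface malformed engine output instead of silently padding or dropping values.
        throw new IllegalStateException(
            "Expected " + columns.size() + " values but got " + row.length);
      }
      // LinkedHashMap keeps column order and permits null values.
      Map<String, Object> named = new LinkedHashMap<>();
      for (int i = 0; i < row.length; i++) {
        named.put(columns.get(i), row[i]);
      }
      out.add(named);
    }
    return out;
  }
}
```

A null cell stays present as a key with a null value, which is the behavior a "nulls" test case would typically check.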