
Add query routing and execution handoff for Parquet-backed indices #5264

Merged: ahkcs merged 1 commit into opensearch-project:feature/mustang-ppl-integration from ahkcs:pr/mustang-query-routing on Mar 25, 2026

@ahkcs (Collaborator) commented Mar 24, 2026

Summary

Implements query routing and execution handoff for Project Mustang's unified query pipeline (#5247).

This PR covers Step 2, Step 3 (partial), and Step 4 (basic) from the issue:

Step 2: Query routing and execution handoff

Add RestUnifiedQueryAction and AnalyticsExecutionEngine that detect non-Lucene indices, route PPL queries through UnifiedQueryPlanner.plan() -> QueryPlanExecutor.execute(), and schedule execution on sql-worker thread pool with security context propagation.

  • Index detection: isAnalyticsIndex() extracts index name from PPL query via regex, checks for parquet_ prefix. Production will use index settings (e.g., index.storage_type).
  • Query routing: RestPPLQueryAction.prepareRequest() branches -- parquet indices go to RestUnifiedQueryAction, everything else goes to existing Lucene path unchanged.
  • Planning: UnifiedQueryPlanner.plan() parses PPL and generates Calcite RelNode against a stub schema (will be replaced by EngineContext.getSchema() from the analytics engine).
  • Execution: AnalyticsExecutionEngine calls QueryPlanExecutor.execute(relNode), converts Iterable<Object[]> to QueryResponse with type mapping via OpenSearchTypeFactory and query size limit enforcement.
  • Thread scheduling: Runs on sql-worker thread pool via client.threadPool().schedule().
  • Stub executor: StubQueryPlanExecutor returns canned data for parquet_logs and parquet_metrics tables for development and testing.
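For reviewers, the execution handoff described above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the code in this PR: `PlanExecutor` is a hypothetical stand-in for `QueryPlanExecutor` (plan and context are typed as `Object` so the snippet compiles without Calcite), and `toRows` is a hypothetical helper that mimics turning `Iterable<Object[]>` into named rows while enforcing a size limit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ExecutionHandoffSketch {

  /** Hypothetical stand-in for QueryPlanExecutor; the real interface takes a Calcite RelNode. */
  @FunctionalInterface
  interface PlanExecutor {
    Iterable<Object[]> execute(Object plan, Object context);
  }

  /** Hypothetical helper: pairs each raw row with column names and stops at the size limit. */
  static List<Map<String, Object>> toRows(
      Iterable<Object[]> raw, List<String> columns, int limit) {
    List<Map<String, Object>> rows = new ArrayList<>();
    for (Object[] values : raw) {
      if (rows.size() >= limit) {
        break; // size limit reached, ignore remaining rows
      }
      Map<String, Object> row = new LinkedHashMap<>();
      for (int i = 0; i < columns.size(); i++) {
        // Missing trailing values are treated as nulls
        row.put(columns.get(i), i < values.length ? values[i] : null);
      }
      rows.add(row);
    }
    return rows;
  }
}
```

A stub executor can then be a lambda that returns canned rows, which is roughly the role `StubQueryPlanExecutor` plays in this PR.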

Step 3: Response formatting (explain support deferred to next PR)

Format Iterable<Object[]> results via the existing JdbcResponseFormatter. Returning the logical RelNode plan via _plugins/_ppl/_explain is deferred to the next PR.

  • Response formatting: QueryResponse -> QueryResult with PPL_SPEC -> JdbcResponseFormatter produces standard JDBC JSON (schema, datarows, total, size, status). Same format as existing PPL responses.
  • Explain support: Deferred to next PR.

Step 4: Error handling (basic)

Client vs server error classification, request/failure metrics.

  • Error classification: SyntaxCheckException/SemanticCheckException -> HTTP 400 + PPL_FAILED_REQ_COUNT_CUS; engine failures -> HTTP 500 + PPL_FAILED_REQ_COUNT_SYS.
  • Latency logging: [unified] Planning completed in Xms, [unified] Execution completed in Xms, N rows returned.
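The classification rule above can be sketched as follows. This is a hedged illustration only: the two exception classes are local stand-ins for the plugin's `SyntaxCheckException` and `SemanticCheckException`, the metric names are plain strings rather than the real `MetricName` enum, and `IllegalArgumentException` is included because it appears in the `isClientError` snippet quoted later in this thread. `NullPointerException` is deliberately left on the server side, as the automated review suggests.

```java
final class ErrorClassificationSketch {

  /** Stand-in for the plugin's SyntaxCheckException so the sketch compiles on its own. */
  static class SyntaxCheckException extends RuntimeException {
    SyntaxCheckException(String message) {
      super(message);
    }
  }

  /** Stand-in for the plugin's SemanticCheckException. */
  static class SemanticCheckException extends RuntimeException {
    SemanticCheckException(String message) {
      super(message);
    }
  }

  /** Bad queries are client errors; everything else counts as an engine failure. */
  static boolean isClientError(Exception e) {
    return e instanceof SyntaxCheckException
        || e instanceof SemanticCheckException
        || e instanceof IllegalArgumentException;
  }

  static int httpStatus(Exception e) {
    return isClientError(e) ? 400 : 500;
  }

  /** Metric names are the ones listed in the PR description, used here as plain strings. */
  static String failureMetric(Exception e) {
    return isClientError(e) ? "PPL_FAILED_REQ_COUNT_CUS" : "PPL_FAILED_REQ_COUNT_SYS";
  }
}
```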

Not in this PR (next PR / blocked)

  • Step 1 (plugin wiring): Blocked -- analytics-engine JAR not available yet
  • Step 3 (explain): Will be in next PR
  • Step 5 (full integration tests with real analytics engine): Blocked -- will use stub for now

New Files

| File | Purpose |
|---|---|
| core/.../analytics/QueryPlanExecutor.java | @FunctionalInterface -- contract for analytics engine execution |
| core/.../analytics/AnalyticsExecutionEngine.java | ExecutionEngine impl -- converts raw rows to QueryResponse |
| plugin/.../rest/RestUnifiedQueryAction.java | Orchestrates analytics query path end-to-end |
| plugin/.../rest/StubQueryPlanExecutor.java | Mock data for development/testing |
| integ-test/.../ppl/AnalyticsPPLIT.java | Integration tests -- full pipeline verification |

Modified Files

| File | Change |
|---|---|
| plugin/.../rest/RestPPLQueryAction.java | Added routing branch for parquet_ indices |
| plugin/.../SQLPlugin.java | Passes ClusterService + NodeClient to RestPPLQueryAction |
| plugin/build.gradle | Added :api dependency for UnifiedQueryPlanner |

Test plan

  • 10 unit tests -- AnalyticsExecutionEngine (type mapping, row conversion, size limit, nulls, error propagation, PhysicalPlan rejection)
  • 10 unit tests -- RestUnifiedQueryAction (index extraction regex, routing detection for various PPL patterns)
  • 9 integration tests -- AnalyticsPPLIT (full pipeline: schema+data verification with verifySchema/verifyDataRows, response format, projection, syntax error -> 400, Lucene regression)

@ahkcs changed the title from "[Mustang] Add query routing and execution handoff for Parquet-backed indices" to "Add query routing and execution handoff for Parquet-backed indices" on Mar 24, 2026
@ahkcs added the PPL (Piped processing language) and enhancement (New feature or request) labels on Mar 24, 2026
github-actions bot (Contributor) commented Mar 24, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit a924eda.

| Path | Line | Severity | Description |
|---|---|---|---|
| plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java | 1 | high | StubQueryPlanExecutor is placed in production source (plugin/src/main/java), not in test directories. It returns hardcoded fake data (IPs, log messages, metrics) for any query targeting 'parquet_*' indices. Combined with the routing logic in RestPPLQueryAction, this stub will be deployed to production and serve fabricated data to real users silently, with no indication the results are synthetic. |
| plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java | 95 | high | Queries matching 'parquet_*' index names are silently rerouted to RestUnifiedQueryAction before the normal PPL execution path (nodeClient.execute(PPLQueryAction.INSTANCE, ...)). The normal path includes OpenSearch's transport-layer authorization and access control checks. The new analytics path does not demonstrate equivalent ACL enforcement, creating a potential authorization bypass for any user who can construct a PPL query targeting a parquet_-prefixed index. |
| plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java | 247 | medium | The reportError method constructs a JSON error response via string concatenation with only partial escaping (backslash, double-quote, newline, carriage-return). Characters such as null bytes, Unicode control characters, or other special sequences in exception messages could produce malformed or injected JSON responses sent directly to REST clients. |
| plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java | 226 | medium | NullPointerException is classified as a client error in isClientError(), causing it to return HTTP 400 Bad Request and increment the client-fault metric. This misclassification could mask server-side null dereference bugs as user errors, suppressing alerts and making server-side defects harder to detect or investigate in production monitoring. |

The table above displays the top 10 most important findings.

Total: 4 | Critical: 0 | High: 2 | Medium: 2 | Low: 0


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@ahkcs ahkcs force-pushed the pr/mustang-query-routing branch from 98f58c0 to 2860604 Compare March 24, 2026 19:17
github-actions bot (Contributor) commented Mar 24, 2026

PR Reviewer Guide 🔍

(Review updated until commit c3de69f)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 Multiple PR themes

Sub-PR theme: Core analytics execution engine and QueryPlanExecutor interface

Relevant files:

  • core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java
  • core/src/main/java/org/opensearch/sql/executor/analytics/QueryPlanExecutor.java
  • core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java

Sub-PR theme: PPL query routing to analytics engine with stub executor and integration tests

Relevant files:

  • plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java
  • plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java
  • plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java
  • plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
  • plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java
  • integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java

⚡ Recommended focus areas for review

JSON Injection

The reportError method manually constructs a JSON string by embedding e.getClass().getSimpleName() directly without sanitization. A class name could theoretically contain characters that break JSON structure. More critically, the manual escaping of reason is incomplete — it does not escape tab characters (\t) or other control characters (U+0000–U+001F), which are invalid unescaped in JSON strings and can cause JSON parse errors or injection in some clients.

private static void reportError(RestChannel channel, Exception e) {
  RestStatus status =
      isClientError(e) ? RestStatus.BAD_REQUEST : RestStatus.INTERNAL_SERVER_ERROR;
  String reason = e.getMessage() != null ? e.getMessage() : "Unknown error";
  // Escape characters that would break JSON
  reason =
      reason.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "");
  channel.sendResponse(
      new BytesRestResponse(
          status,
          "application/json; charset=UTF-8",
          "{\"error\":{\"type\":\""
              + e.getClass().getSimpleName()
              + "\",\"reason\":\""
              + reason
              + "\"},\"status\":"
              + status.getStatus()
              + "}"));
}
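To make the escaping concern concrete, here is a hedged sketch of what complete string escaping looks like under RFC 8259, which requires escaping the quotation mark, the reverse solidus, and every control character from U+0000 through U+001F. `JsonEscapeSketch`, `escapeJson`, and `errorBody` are hypothetical names used only for illustration. The review thread further down resolves this differently, by reusing `ErrorMessageFactory`, which avoids hand-built JSON altogether.

```java
final class JsonEscapeSketch {

  /** Escapes a string so it can be embedded between double quotes in a JSON document. */
  static String escapeJson(String s) {
    StringBuilder sb = new StringBuilder(s.length() + 16);
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      switch (c) {
        case '"':
          sb.append("\\\"");
          break;
        case '\\':
          sb.append("\\\\");
          break;
        case '\n':
          sb.append("\\n");
          break;
        case '\r':
          sb.append("\\r");
          break;
        case '\t':
          sb.append("\\t");
          break;
        case '\b':
          sb.append("\\b");
          break;
        case '\f':
          sb.append("\\f");
          break;
        default:
          if (c < 0x20) {
            // Remaining control characters use the six-character \ u XXXX form
            sb.append('\\').append('u').append(String.format("%04x", (int) c));
          } else {
            sb.append(c);
          }
      }
    }
    return sb.toString();
  }

  /** Builds the same error shape as reportError, with both fields escaped. */
  static String errorBody(String type, String reason, int status) {
    return "{\"error\":{\"type\":\""
        + escapeJson(type)
        + "\",\"reason\":\""
        + escapeJson(reason)
        + "\"},\"status\":"
        + status
        + "}";
  }
}
```

In practice a JSON library or an existing error formatter is the safer choice; the sketch only shows which characters the original `replace` chain misses.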
Wrong Error Classification

NullPointerException is classified as a client error in isClientError. NPEs are almost always programming bugs (server errors), not malformed user input. Classifying them as client errors (HTTP 400) will mislead users and suppress server-error metrics (PPL_FAILED_REQ_COUNT_SYS), making bugs harder to detect.

private static boolean isClientError(Exception e) {
  return e instanceof SyntaxCheckException
      || e instanceof SemanticCheckException
      || e instanceof IllegalArgumentException
      || e instanceof NullPointerException;
}
Routing False Positive

isAnalyticsIndex checks only the last dot-separated segment for the parquet_ prefix. A query like source = parquet_catalog.regular_table would extract regular_table as the table name and correctly return false, but source = regular_catalog.parquet_logs would route to analytics. This is likely intended, but source = parquet_logs.something (where parquet_logs is a catalog, not a table) would also be routed to analytics incorrectly. The regex also matches source appearing in filter expressions (e.g., source = x | where source = parquet_logs), potentially misidentifying the index.

public static boolean isAnalyticsIndex(String query) {
  if (query == null) {
    return false;
  }
  String indexName = extractIndexName(query);
  if (indexName == null) {
    return false;
  }
  // Handle qualified names like "catalog.parquet_logs" — check the last segment
  int lastDot = indexName.lastIndexOf('.');
  String tableName = lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
  return tableName.startsWith("parquet_");
}
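One way to avoid the filter-expression false positive is to anchor the pattern at the start of the query, so that only the leading source clause is considered. The sketch below is an assumption-laden illustration: `extractIndexName` is not shown in this PR page, so the regex here is hypothetical and covers only simple `source = name` and `search source = name` forms, not the full PPL grammar. It keeps the last-segment rule from the snippet above, so the catalog ambiguity remains.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class IndexRoutingSketch {

  // Hypothetical pattern: matches a source clause only at the very start of the query
  private static final Pattern LEADING_SOURCE =
      Pattern.compile(
          "^\\s*(?:search\\s+)?source\\s*=\\s*`?([A-Za-z0-9_.*-]+)`?",
          Pattern.CASE_INSENSITIVE);

  /** Returns the index named by the leading source clause, or null if there is none. */
  static String extractIndexName(String query) {
    Matcher m = LEADING_SOURCE.matcher(query);
    return m.find() ? m.group(1) : null;
  }

  /** Same prefix check as the PR snippet, applied to the last dot-separated segment. */
  static boolean isAnalyticsIndex(String query) {
    if (query == null) {
      return false;
    }
    String indexName = extractIndexName(query);
    if (indexName == null) {
      return false;
    }
    int lastDot = indexName.lastIndexOf('.');
    String tableName = lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
    return tableName.startsWith("parquet_");
  }
}
```

With this anchoring, a query such as `source = logs | where source = parquet_logs` resolves to `logs` and stays on the Lucene path.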
Null Context Passed

planExecutor.execute(plan, null) always passes null as the execution context. The QueryPlanExecutor interface documents the context parameter as "opaque", but passing null unconditionally means the real analytics engine (when substituted) will receive no context, likely causing a NullPointerException or silent misbehavior. The CalcitePlanContext is available and should be passed or wrapped.

Iterable<Object[]> rows = planExecutor.execute(plan, null);

github-actions bot (Contributor) commented Mar 24, 2026

PR Code Suggestions ✨

Latest suggestions up to c3de69f

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Remove NullPointerException from client error classification

NullPointerException is a programming error (server bug), not a client error.
Classifying it as a client error will return a misleading 400 BAD_REQUEST to the
user when the server has a null pointer bug. Remove it from the client error
classification.

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java [227-232]

 private static boolean isClientError(Exception e) {
     return e instanceof SyntaxCheckException
         || e instanceof SemanticCheckException
-        || e instanceof IllegalArgumentException
-        || e instanceof NullPointerException;
+        || e instanceof IllegalArgumentException;
 }
Suggestion importance[1-10]: 7

__

Why: NullPointerException is a server-side programming error and should not be classified as a client error (400 BAD_REQUEST). This misclassification would mislead users and hide server bugs.

Medium
Security
Use proper JSON serialization for error responses

The manual JSON escaping is incomplete and fragile — it misses tab characters (\t),
control characters, and other special characters that could break JSON or enable
injection. Use a proper JSON serialization approach (e.g., org.json.JSONObject or
Jackson) to build the error response safely.

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java [249-262]

-reason =
-    reason.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "");
+org.json.JSONObject errorJson = new org.json.JSONObject();
+org.json.JSONObject errorDetail = new org.json.JSONObject();
+errorDetail.put("type", e.getClass().getSimpleName());
+errorDetail.put("reason", reason);
+errorJson.put("error", errorDetail);
+errorJson.put("status", status.getStatus());
 channel.sendResponse(
     new BytesRestResponse(
         status,
         "application/json; charset=UTF-8",
-        "{\"error\":{\"type\":\""
-            + e.getClass().getSimpleName()
-            + "\",\"reason\":\""
-            + reason
-            + "\"},\"status\":"
-            + status.getStatus()
-            + "}"));
+        errorJson.toString()));
Suggestion importance[1-10]: 6

__

Why: The manual JSON escaping is incomplete and could miss control characters or enable injection. Using a proper JSON library like org.json.JSONObject would be safer and more maintainable.

Low
General
Pass execution context instead of null to plan executor

Passing null as the execution context to planExecutor.execute() is fragile and may
cause NullPointerException in the real executor implementation. The context
parameter from CalcitePlanContext should be passed or a proper empty/default context
object should be used instead of null.

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java [69]

-Iterable<Object[]> rows = planExecutor.execute(plan, null);
+Iterable<Object[]> rows = planExecutor.execute(plan, context);
Suggestion importance[1-10]: 5

__

Why: Passing null as the execution context is fragile and could cause NullPointerException in real executor implementations. However, the CalcitePlanContext is not the same type as the opaque Object context parameter in QueryPlanExecutor, so passing context directly may not be semantically correct either.

Low
Fix substring matching order to avoid false positives

The order of checks matters here: planStr.contains("parquet_logs") will also match a
plan string that contains "parquet_logs" as a substring of "parquet_metrics_logs" or
similar, but more critically, "parquet_logs" check must come before
"parquet_metrics" since "parquet_metrics" does not contain "parquet_logs" — however
the reverse is not true if table names overlap. The current order is actually safe
for these two specific names, but the parquet_logs check should use a more specific
match to avoid false positives with future table names that might contain
parquet_logs as a substring.

plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java [49-59]

 private String extractTableName(RelNode plan) {
-    // Use RelOptUtil.toString to get the full plan tree including child nodes
     String planStr = RelOptUtil.toString(plan);
+    if (planStr.contains("parquet_metrics")) {
+      return "parquet_metrics";
+    }
     if (planStr.contains("parquet_logs")) {
       return "parquet_logs";
-    }
-    if (planStr.contains("parquet_metrics")) {
-      return "parquet_metrics";
     }
     return null;
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion to check parquet_metrics before parquet_logs is a minor defensive improvement for a stub class, but the current order is already safe for the two specific table names used. The impact is low since this is a temporary stub.

Low
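Reordering the two `contains` checks does not remove the substring problem the suggestion describes. A hedged alternative is to match whole identifiers only. The sketch below assumes the plan string looks like Calcite's usual `LogicalTableScan(table=[[schema, table]])` rendering and operates on the string directly; `TableNameMatchSketch` and `KNOWN_TABLES` are hypothetical names, not part of the stub in this PR.

```java
import java.util.List;
import java.util.regex.Pattern;

final class TableNameMatchSketch {

  // Tables the stub knows about, per the PR description
  private static final List<String> KNOWN_TABLES = List.of("parquet_logs", "parquet_metrics");

  /** Returns the first known table that appears as a whole identifier, or null. */
  static String extractTableName(String planStr) {
    for (String table : KNOWN_TABLES) {
      // Reject matches that are embedded in a longer identifier on either side
      Pattern whole =
          Pattern.compile("(?<![A-Za-z0-9_])" + Pattern.quote(table) + "(?![A-Za-z0-9_])");
      if (whole.matcher(planStr).find()) {
        return table;
      }
    }
    return null;
  }
}
```

Because each name must stand alone, the order of the checks no longer matters, and a future table such as `parquet_logs_archive` would not be mistaken for `parquet_logs`.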

Previous suggestions

Suggestions up to commit f84e2d7
Category | Suggestion | Impact
Possible issue
Remove NullPointerException from client error classification

NullPointerException is a programming error (server bug), not a client error.
Including it in isClientError will cause server-side bugs to be reported as 400 Bad
Request and increment the client error metric instead of the server error metric,
masking real issues. Remove NullPointerException from this check.

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java [226-231]

 private static boolean isClientError(Exception e) {
     return e instanceof SyntaxCheckException
         || e instanceof SemanticCheckException
-        || e instanceof IllegalArgumentException
-        || e instanceof NullPointerException;
+        || e instanceof IllegalArgumentException;
 }
Suggestion importance[1-10]: 7

__

Why: NullPointerException is a programming/server error, not a client error. Classifying it as a client error would return 400 status codes and increment the wrong metric, masking real bugs. This is a valid correctness concern.

Medium
Pass execution context instead of null to plan executor

Passing null as the execution context to planExecutor.execute is fragile — if the
QueryPlanExecutor implementation expects a non-null context, this will cause a
NullPointerException at runtime. The CalcitePlanContext is already available in
scope; consider passing it or a derived context object instead of null.

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java [69]

-Iterable<Object[]> rows = planExecutor.execute(plan, null);
+Iterable<Object[]> rows = planExecutor.execute(plan, context);
Suggestion importance[1-10]: 5

__

Why: Passing null as the context is intentional per the QueryPlanExecutor interface design (context is "opaque to avoid server dependency"), but passing the actual context object could cause issues if the real executor doesn't expect a CalcitePlanContext. The suggestion is reasonable but may not be correct given the interface's design intent.

Low
Security
Use safe JSON serialization for error responses

Manual JSON string construction via concatenation is fragile and incomplete — it
doesn't escape tab characters (\t), Unicode control characters, or other special
characters that could break JSON parsing or enable injection. Use a proper JSON
serialization approach (e.g., org.json.JSONObject) to safely build the error
response.

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java [248-261]

-reason =
-    reason.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "");
+org.json.JSONObject errorObj = new org.json.JSONObject();
+org.json.JSONObject errorDetail = new org.json.JSONObject();
+errorDetail.put("type", e.getClass().getSimpleName());
+errorDetail.put("reason", reason);
+errorObj.put("error", errorDetail);
+errorObj.put("status", status.getStatus());
 channel.sendResponse(
     new BytesRestResponse(
         status,
         "application/json; charset=UTF-8",
-        "{\"error\":{\"type\":\""
-            + e.getClass().getSimpleName()
-            + "\",\"reason\":\""
-            + reason
-            + "\"},\"status\":"
-            + status.getStatus()
-            + "}"));
+        errorObj.toString()));
Suggestion importance[1-10]: 6

__

Why: Manual JSON string construction is fragile and incomplete — it doesn't handle all special characters. Using a proper JSON library like org.json.JSONObject would be safer and more maintainable.

Low
General
Remove unused constructor parameter to prevent confusion

The clusterService parameter is accepted but never used in the constructor body. If
it is intended for future use (e.g., checking index settings for routing), this is
fine, but if it was meant to be passed to RestUnifiedQueryAction or used for routing
decisions, it is silently dropped, which is a bug. Either use it or remove it from
the signature to avoid confusion.

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java [53-56]

-public RestPPLQueryAction(ClusterService clusterService, NodeClient client) {
+public RestPPLQueryAction(NodeClient client) {
     super();
     this.unifiedQueryHandler = new RestUnifiedQueryAction(client, new StubQueryPlanExecutor());
 }
Suggestion importance[1-10]: 4

__

Why: The clusterService parameter is accepted but never used, which is confusing. However, the comment in the PR suggests it may be intended for future use (checking index settings), so removing it could be premature. The suggestion has moderate relevance.

Low
Suggestions up to commit 2860604
Category | Suggestion | Impact
Possible issue
Remove NullPointerException from client error classification

NullPointerException is a programming error (server bug), not a client error.
Including it as a client error will mask real bugs by returning HTTP 400 instead of
500, making them harder to diagnose. Remove it from the client error classification.

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java [226-231]

 private static boolean isClientError(Exception e) {
     return e instanceof SyntaxCheckException
         || e instanceof SemanticCheckException
-        || e instanceof IllegalArgumentException
-        || e instanceof NullPointerException;
+        || e instanceof IllegalArgumentException;
   }
Suggestion importance[1-10]: 7

__

Why: NullPointerException is a programming error (server bug) and should not be classified as a client error (HTTP 400). This would mask real bugs and make them harder to diagnose. The fix is straightforward and correct.

Medium
General
Use proper JSON serialization for error responses

The manual JSON escaping is incomplete and fragile — it doesn't handle tab
characters (\t), Unicode control characters, or other special characters that would
produce invalid JSON. Use a proper JSON serialization approach (e.g., Jackson
ObjectMapper or a JSONObject) to build the error response to ensure the output is
always valid JSON.

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java [248-261]

-reason =
-        reason.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "");
-    channel.sendResponse(
-        new BytesRestResponse(
-            status,
-            "application/json; charset=UTF-8",
-            "{\"error\":{\"type\":\""
-                + e.getClass().getSimpleName()
-                + "\",\"reason\":\""
-                + reason
-                + "\"},\"status\":"
-                + status.getStatus()
-                + "}"));
+org.json.JSONObject errorJson = new org.json.JSONObject();
+org.json.JSONObject errorDetail = new org.json.JSONObject();
+errorDetail.put("type", e.getClass().getSimpleName());
+errorDetail.put("reason", e.getMessage() != null ? e.getMessage() : "Unknown error");
+errorJson.put("error", errorDetail);
+errorJson.put("status", status.getStatus());
+channel.sendResponse(
+    new BytesRestResponse(
+        status,
+        "application/json; charset=UTF-8",
+        errorJson.toString()));
Suggestion importance[1-10]: 6

__

Why: The manual JSON escaping is fragile and incomplete (missing tab, Unicode control characters, etc.), which could produce invalid JSON. Using a proper JSON library like org.json.JSONObject would be more robust and maintainable.

Low
Remove unused constructor parameter

The clusterService parameter is accepted but never used in the constructor body.
This is misleading and suggests incomplete implementation. Either use it (e.g., to
look up index settings for routing decisions) or remove it from the signature to
avoid confusion.

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java [53-56]

-public RestPPLQueryAction(ClusterService clusterService, NodeClient client) {
+public RestPPLQueryAction(NodeClient client) {
     super();
     this.unifiedQueryHandler = new RestUnifiedQueryAction(client, new StubQueryPlanExecutor());
   }
Suggestion importance[1-10]: 5

__

Why: The clusterService parameter is accepted but never used in the constructor body, which is misleading. However, removing it would require updating the call site in SQLPlugin.java, and it may be intentionally kept for future use.

Low
Fix table name extraction ordering to avoid substring false matches

The order of checks matters here: "parquet_logs" is checked before
"parquet_metrics", but since "parquet_metrics" does not contain "parquet_logs" this
is fine currently. However, "parquet_logs" is a substring of any future table like
"parquet_logs_archive", which would incorrectly match. Check for "parquet_metrics"
first (more specific) and use exact matching or word-boundary checks to avoid false
positives.

plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java [49-59]

 private String extractTableName(RelNode plan) {
-    // Use RelOptUtil.toString to get the full plan tree including child nodes
     String planStr = RelOptUtil.toString(plan);
+    if (planStr.contains("parquet_metrics")) {
+      return "parquet_metrics";
+    }
     if (planStr.contains("parquet_logs")) {
       return "parquet_logs";
-    }
-    if (planStr.contains("parquet_metrics")) {
-      return "parquet_metrics";
     }
     return null;
   }
Suggestion importance[1-10]: 3

__

Why: The suggestion addresses a potential future issue where "parquet_logs" could match table names like "parquet_logs_archive", but this is a stub class that will be replaced by the real analytics engine. The impact is low given the temporary nature of this code.

Low

@ahkcs ahkcs force-pushed the pr/mustang-query-routing branch from 2860604 to f84e2d7 Compare March 24, 2026 20:09
github-actions bot (Contributor)

Persistent review updated to latest commit f84e2d7

@ahkcs ahkcs force-pushed the pr/mustang-query-routing branch from f84e2d7 to c3de69f Compare March 24, 2026 20:15
github-actions bot (Contributor)

Persistent review updated to latest commit c3de69f

@ahkcs ahkcs force-pushed the pr/mustang-query-routing branch from c3de69f to ea8047f Compare March 24, 2026 20:28
…indices (opensearch-project#5247)

Implements the query routing and AnalyticsExecutionEngine for Project
Mustang's unified query pipeline. PPL queries targeting parquet_ prefixed
indices are routed through UnifiedQueryPlanner and executed via a stub
QueryPlanExecutor, with results formatted through the existing JDBC
response pipeline.

New files:
- QueryPlanExecutor: @FunctionalInterface contract for analytics engine
- AnalyticsExecutionEngine: converts Iterable<Object[]> to QueryResponse
  with type mapping and query size limit enforcement
- RestUnifiedQueryAction: orchestrates schema building, planning,
  execution on sql-worker thread pool, with client/server error
  classification and metrics
- StubQueryPlanExecutor: canned data for parquet_logs and parquet_metrics
  tables for development and testing

Modified files:
- RestPPLQueryAction: routing branch for parquet_ indices
- SQLPlugin: passes ClusterService and NodeClient to RestPPLQueryAction
- plugin/build.gradle: adds :api dependency for UnifiedQueryPlanner

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
@ahkcs ahkcs force-pushed the pr/mustang-query-routing branch from ea8047f to a924eda Compare March 24, 2026 20:31

// --- isAnalyticsIndex ---

@Test
Collaborator

FYI, I will add a generic UnifiedQueryParser for this extraction. You can probably simplify the test below, and also think about the test scope between this and AnalyticsExecutionEngineTest.

Collaborator Author @ahkcs, Mar 25, 2026

FYI, PR updates have now been moved to #5267 due to an accidental merge.
Thanks for the heads-up on UnifiedQueryParser. I've simplified the tests to just 2 routing behavior tests (parquet index routes to analytics, non-parquet routes to Lucene) and removed all the extractIndexName tests, since that logic will be replaced.

}

/** Classify whether the exception is a client error (bad query) or server error (engine bug). */
private static boolean isClientError(Exception e) {
Collaborator

I recall we have a long list in current SQL action. Could you double check?

Collaborator Author

Good catch. Updated isClientError() to match the full list from RestPPLQueryAction.

* "parquet_*" table. Will be replaced by EngineContext.getSchema() when the analytics engine is
* ready.
*/
private static AbstractSchema buildStubSchema() {
Collaborator

Could you make this a stub class and move it with StubQueryPlanExecutor (and maybe your temporary extraction logic together?) into a sub package? I'm just thinking this makes clear what we're stubbing.

Collaborator Author

Moved all stub code into the plugin/.../rest/analytics/stub/ sub-package:

  • StubIndexDetector (temporary index extraction + parquet_ prefix check)
  • StubSchemaProvider (hardcoded Calcite schema)
  • StubQueryPlanExecutor (canned data)

RestUnifiedQueryAction now delegates to these.

new BytesRestResponse(
status,
"application/json; charset=UTF-8",
"{\"error\":{\"type\":\""
Collaborator

I forget: is this the standard way we do it in the SQL/PPL REST actions today?

Collaborator Author @ahkcs, Mar 25, 2026

Good call. Replaced the hand-crafted JSON with ErrorMessageFactory.createErrorMessage(e, status.getStatus()).toString() -- same pattern as RestPPLQueryAction.reportError().

private static void recordFailureMetric(Exception e) {
if (isClientError(e)) {
LOG.warn("[unified] Client error in query execution", e);
Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_CUS).increment();
Collaborator

Please see how we can make this flexible because we're going to enable unified SQL in this same class soon. Same for other code if PPL-specific.

Collaborator Author

Made metrics and LangSpec query-type-aware. Now uses QueryType to select:

  • Metrics: PPL_REQ_TOTAL / PPL_FAILED_REQ_COUNT_* for PPL, REQ_TOTAL / FAILED_REQ_COUNT_* for SQL
  • Response formatting: PPL_SPEC for PPL, LangSpec.SQL_SPEC for SQL
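A minimal sketch of the query-type-aware selection described in this reply might look like the following. It is illustrative only: the real code selects `MetricName` enum constants, whereas this snippet uses plain strings, and the `_CUS` and `_SYS` suffixes for the SQL metrics are an assumption inferred from the `FAILED_REQ_COUNT_*` wildcard above.

```java
final class QueryTypeMetricsSketch {

  /** Local stand-in for the plugin's QueryType. */
  enum QueryType {
    PPL,
    SQL
  }

  /** Total-request metric name for the given query type. */
  static String totalMetric(QueryType type) {
    return type == QueryType.PPL ? "PPL_REQ_TOTAL" : "REQ_TOTAL";
  }

  /** Failure metric name; client errors map to _CUS and server errors to _SYS (assumed for SQL). */
  static String failureMetric(QueryType type, boolean clientError) {
    String prefix = type == QueryType.PPL ? "PPL_" : "";
    return prefix + "FAILED_REQ_COUNT_" + (clientError ? "CUS" : "SYS");
  }
}
```

Keeping the selection in one place means enabling unified SQL later only needs a new `QueryType` value to be passed in, rather than PPL-specific constants scattered through the action.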

TransportPPLQueryRequest transportPPLQueryRequest =
new TransportPPLQueryRequest(PPLQueryRequestFactory.getPPLRequest(request));

// Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices
Collaborator

Could you double check whether we need to do this here or after the transport action below? Maybe see why we added PPL transport action previously.

Collaborator Author

Moved routing from RestPPLQueryAction to TransportPPLQueryAction.doExecute().
RestPPLQueryAction and SQLPlugin are reverted to their original state.

public void execute(
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
try {
Integer querySizeLimit = context.sysLimit.querySizeLimit();
Collaborator

Could you check how PPL V3 handles this today? If we want to be consistent, I think this needs to be part of the RelNode plan we pass to QueryPlanExecutor instead of post-processing. Otherwise we may pull lots of data back.

Collaborator Author

Good catch. Moved the query size limit into the RelNode plan using LogicalSystemLimit.create(), same pattern as QueryService.convertToCalcitePlan(). The limit is now part of the plan the analytics engine receives, so it can enforce it during execution instead of us pulling all rows back first.

Removed the post-processing truncation from AnalyticsExecutionEngine.

@ahkcs ahkcs merged commit a924eda into opensearch-project:feature/mustang-ppl-integration Mar 25, 2026
34 of 36 checks passed

Labels: enhancement (New feature or request), PPL (Piped processing language)


2 participants