diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java new file mode 100644 index 00000000000..4159fa4845d --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.analytics; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.executor.ExecutionContext; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.pagination.Cursor; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +/** + * Execution engine adapter for the analytics engine (Project Mustang). + * + *
<p>
Bridges the analytics engine's {@link QueryPlanExecutor} with the SQL plugin's {@link + * ExecutionEngine} response pipeline. Takes a Calcite {@link RelNode}, delegates execution to the + * analytics engine, and converts the raw results into {@link QueryResponse}. + */ +public class AnalyticsExecutionEngine implements ExecutionEngine { + + private final QueryPlanExecutor planExecutor; + + public AnalyticsExecutionEngine(QueryPlanExecutor planExecutor) { + this.planExecutor = planExecutor; + } + + /** Not supported. Analytics queries use the RelNode path exclusively. */ + @Override + public void execute(PhysicalPlan plan, ResponseListener listener) { + listener.onFailure( + new UnsupportedOperationException("Analytics engine only supports RelNode execution")); + } + + /** Not supported. Analytics queries use the RelNode path exclusively. */ + @Override + public void execute( + PhysicalPlan plan, ExecutionContext context, ResponseListener listener) { + listener.onFailure( + new UnsupportedOperationException("Analytics engine only supports RelNode execution")); + } + + /** Not supported. Analytics queries use the RelNode path exclusively. 
*/ + @Override + public void explain(PhysicalPlan plan, ResponseListener listener) { + listener.onFailure( + new UnsupportedOperationException("Analytics engine only supports RelNode execution")); + } + + @Override + public void execute( + RelNode plan, CalcitePlanContext context, ResponseListener listener) { + try { + Integer querySizeLimit = context.sysLimit.querySizeLimit(); + Iterable rows = planExecutor.execute(plan, null); + + List fields = plan.getRowType().getFieldList(); + List results = convertRows(rows, fields, querySizeLimit); + Schema schema = buildSchema(fields); + + listener.onResponse(new QueryResponse(schema, results, Cursor.None)); + } catch (Exception e) { + listener.onFailure(e); + } + } + + private List convertRows( + Iterable rows, List fields, Integer querySizeLimit) { + List results = new ArrayList<>(); + for (Object[] row : rows) { + if (querySizeLimit != null && results.size() >= querySizeLimit) { + break; + } + Map valueMap = new LinkedHashMap<>(); + for (int i = 0; i < fields.size(); i++) { + String columnName = fields.get(i).getName(); + Object value = (i < row.length) ? 
row[i] : null; + valueMap.put(columnName, ExprValueUtils.fromObjectValue(value)); + } + results.add(ExprTupleValue.fromExprValueMap(valueMap)); + } + return results; + } + + private Schema buildSchema(List fields) { + List columns = new ArrayList<>(); + for (RelDataTypeField field : fields) { + ExprType exprType = convertType(field.getType()); + columns.add(new Schema.Column(field.getName(), null, exprType)); + } + return new Schema(columns); + } + + private ExprType convertType(RelDataType type) { + try { + return OpenSearchTypeFactory.convertRelDataTypeToExprType(type); + } catch (IllegalArgumentException e) { + return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN; + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/QueryPlanExecutor.java b/core/src/main/java/org/opensearch/sql/executor/analytics/QueryPlanExecutor.java new file mode 100644 index 00000000000..fd322ca432a --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/analytics/QueryPlanExecutor.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.analytics; + +import org.apache.calcite.rel.RelNode; + +/** + * Executes a Calcite {@link RelNode} logical plan against the analytics engine. + * + *
<p>
This is a local equivalent of {@code org.opensearch.analytics.exec.QueryPlanExecutor} from the + * analytics-framework library. It will be replaced by the upstream interface once the + * analytics-framework JAR is published. + * + * @see Upstream + * QueryPlanExecutor + */ +@FunctionalInterface +public interface QueryPlanExecutor { + + /** + * Executes the given logical plan and returns result rows. + * + * @param plan the Calcite RelNode subtree to execute + * @param context execution context (opaque to avoid server dependency) + * @return rows produced by the engine + */ + Iterable execute(RelNode plan, Object context); +} diff --git a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java new file mode 100644 index 00000000000..04e65b1d383 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java @@ -0,0 +1,383 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.analytics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.jupiter.api.BeforeEach; 
+import org.junit.jupiter.api.Test; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.calcite.SysLimit; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; +import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +class AnalyticsExecutionEngineTest { + + private AnalyticsExecutionEngine engine; + private QueryPlanExecutor mockExecutor; + private CalcitePlanContext mockContext; + + @BeforeEach + void setUp() throws Exception { + mockExecutor = mock(QueryPlanExecutor.class); + engine = new AnalyticsExecutionEngine(mockExecutor); + mockContext = mock(CalcitePlanContext.class); + setSysLimit(mockContext, SysLimit.DEFAULT); + } + + /** Sets the public final sysLimit field on a mocked CalcitePlanContext. */ + private static void setSysLimit(CalcitePlanContext context, SysLimit sysLimit) throws Exception { + Field field = CalcitePlanContext.class.getDeclaredField("sysLimit"); + field.setAccessible(true); + field.set(context, sysLimit); + } + + @Test + void executeRelNode_basicTypesAndRows() { + RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER); + Iterable rows = Arrays.asList(new Object[] {"Alice", 30}, new Object[] {"Bob", 25}); + when(mockExecutor.execute(relNode, null)).thenReturn(rows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + // Schema: 2 columns [name:STRING, age:INTEGER] + assertEquals(2, response.getSchema().getColumns().size(), "Column count. 
" + dump); + assertEquals("name", response.getSchema().getColumns().get(0).getName(), dump); + assertEquals(ExprCoreType.STRING, response.getSchema().getColumns().get(0).getExprType(), dump); + assertEquals("age", response.getSchema().getColumns().get(1).getName(), dump); + assertEquals( + ExprCoreType.INTEGER, response.getSchema().getColumns().get(1).getExprType(), dump); + + // Rows: [{name=Alice, age=30}, {name=Bob, age=25}] + assertEquals(2, response.getResults().size(), "Row count. " + dump); + assertEquals( + "Alice", response.getResults().get(0).tupleValue().get("name").value(), "Row 0. " + dump); + assertEquals( + 30, response.getResults().get(0).tupleValue().get("age").value(), "Row 0. " + dump); + assertEquals( + "Bob", response.getResults().get(1).tupleValue().get("name").value(), "Row 1. " + dump); + assertEquals( + 25, response.getResults().get(1).tupleValue().get("age").value(), "Row 1. " + dump); + + // Cursor: None + assertEquals(org.opensearch.sql.executor.pagination.Cursor.None, response.getCursor(), dump); + } + + @Test + void executeRelNode_numericTypes() { + RelNode relNode = + mockRelNode( + "b", SqlTypeName.TINYINT, + "s", SqlTypeName.SMALLINT, + "i", SqlTypeName.INTEGER, + "l", SqlTypeName.BIGINT, + "f", SqlTypeName.FLOAT, + "d", SqlTypeName.DOUBLE); + Iterable rows = + Collections.singletonList(new Object[] {(byte) 1, (short) 2, 3, 4L, 5.0f, 6.0}); + when(mockExecutor.execute(relNode, null)).thenReturn(rows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals(ExprCoreType.BYTE, response.getSchema().getColumns().get(0).getExprType(), dump); + assertEquals(ExprCoreType.SHORT, response.getSchema().getColumns().get(1).getExprType(), dump); + assertEquals( + ExprCoreType.INTEGER, response.getSchema().getColumns().get(2).getExprType(), dump); + assertEquals(ExprCoreType.LONG, response.getSchema().getColumns().get(3).getExprType(), dump); + assertEquals(ExprCoreType.FLOAT, 
response.getSchema().getColumns().get(4).getExprType(), dump); + assertEquals(ExprCoreType.DOUBLE, response.getSchema().getColumns().get(5).getExprType(), dump); + + // Verify actual values + assertEquals( + (byte) 1, + response.getResults().get(0).tupleValue().get("b").value(), + "byte value. " + dump); + assertEquals( + (short) 2, + response.getResults().get(0).tupleValue().get("s").value(), + "short value. " + dump); + assertEquals( + 3, response.getResults().get(0).tupleValue().get("i").value(), "int value. " + dump); + assertEquals( + 4L, response.getResults().get(0).tupleValue().get("l").value(), "long value. " + dump); + assertEquals( + 5.0f, response.getResults().get(0).tupleValue().get("f").value(), "float value. " + dump); + assertEquals( + 6.0, response.getResults().get(0).tupleValue().get("d").value(), "double value. " + dump); + } + + @Test + void executeRelNode_temporalTypes() { + RelNode relNode = + mockRelNode("dt", SqlTypeName.DATE, "tm", SqlTypeName.TIME, "ts", SqlTypeName.TIMESTAMP); + Iterable emptyRows = Collections.emptyList(); + when(mockExecutor.execute(relNode, null)).thenReturn(emptyRows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals(ExprCoreType.DATE, response.getSchema().getColumns().get(0).getExprType(), dump); + assertEquals(ExprCoreType.TIME, response.getSchema().getColumns().get(1).getExprType(), dump); + assertEquals( + ExprCoreType.TIMESTAMP, response.getSchema().getColumns().get(2).getExprType(), dump); + assertEquals(0, response.getResults().size(), "Should have 0 rows. 
" + dump); + } + + @Test + void executeRelNode_querySizeLimit() throws Exception { + RelNode relNode = mockRelNode("id", SqlTypeName.INTEGER); + List manyRows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + manyRows.add(new Object[] {i}); + } + when(mockExecutor.execute(relNode, null)).thenReturn(manyRows); + setSysLimit(mockContext, new SysLimit(10, 10000, 50000)); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals( + 10, + response.getResults().size(), + "Should truncate to querySizeLimit=10, got " + response.getResults().size() + ". " + dump); + } + + @Test + void executeRelNode_emptyResults() { + RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR); + Iterable emptyRows = Collections.emptyList(); + when(mockExecutor.execute(relNode, null)).thenReturn(emptyRows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals(1, response.getSchema().getColumns().size(), "Schema column count. " + dump); + assertEquals(0, response.getResults().size(), "Row count should be 0. " + dump); + } + + @Test + void executeRelNode_nullValues() { + RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER); + Iterable rows = Collections.singletonList(new Object[] {null, null}); + when(mockExecutor.execute(relNode, null)).thenReturn(rows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals(1, response.getResults().size(), "Row count. " + dump); + assertTrue( + response.getResults().get(0).tupleValue().get("name").isNull(), + "name should be null. " + dump); + assertTrue( + response.getResults().get(0).tupleValue().get("age").isNull(), + "age should be null. 
" + dump); + } + + @Test + void executeRelNode_errorPropagation() { + RelNode relNode = mockRelNode("id", SqlTypeName.INTEGER); + when(mockExecutor.execute(relNode, null)).thenThrow(new RuntimeException("Engine failure")); + + Exception error = executeAndCaptureError(relNode); + System.out.println(dumpError("executeRelNode_errorPropagation", error)); + + assertEquals( + "Engine failure", + error.getMessage(), + "Exception type: " + error.getClass().getSimpleName() + ", message: " + error.getMessage()); + } + + @Test + void physicalPlanExecute_callsOnFailure() { + PhysicalPlan physicalPlan = mock(PhysicalPlan.class); + AtomicReference errorRef = new AtomicReference<>(); + engine.execute(physicalPlan, failureListener(errorRef)); + + assertNotNull(errorRef.get(), "onFailure should have been called"); + System.out.println(dumpError("physicalPlanExecute_callsOnFailure", errorRef.get())); + assertTrue( + errorRef.get() instanceof UnsupportedOperationException, + "Expected UnsupportedOperationException, got: " + + errorRef.get().getClass().getSimpleName() + + " - " + + errorRef.get().getMessage()); + } + + @Test + void physicalPlanExecuteWithContext_callsOnFailure() { + PhysicalPlan physicalPlan = mock(PhysicalPlan.class); + AtomicReference errorRef = new AtomicReference<>(); + engine.execute( + physicalPlan, + org.opensearch.sql.executor.ExecutionContext.emptyExecutionContext(), + failureListener(errorRef)); + + assertNotNull(errorRef.get(), "onFailure should have been called"); + System.out.println(dumpError("physicalPlanExecuteWithContext_callsOnFailure", errorRef.get())); + assertTrue( + errorRef.get() instanceof UnsupportedOperationException, + "Expected UnsupportedOperationException, got: " + + errorRef.get().getClass().getSimpleName() + + " - " + + errorRef.get().getMessage()); + } + + @Test + void physicalPlanExplain_callsOnFailure() { + PhysicalPlan physicalPlan = mock(PhysicalPlan.class); + AtomicReference errorRef = new AtomicReference<>(); + 
engine.explain(physicalPlan, explainFailureListener(errorRef)); + + assertNotNull(errorRef.get(), "onFailure should have been called"); + System.out.println(dumpError("physicalPlanExplain_callsOnFailure", errorRef.get())); + assertTrue( + errorRef.get() instanceof UnsupportedOperationException, + "Expected UnsupportedOperationException, got: " + + errorRef.get().getClass().getSimpleName() + + " - " + + errorRef.get().getMessage()); + } + + // --- helpers --- + + private QueryResponse executeAndCapture(RelNode relNode) { + AtomicReference ref = new AtomicReference<>(); + engine.execute(relNode, mockContext, captureListener(ref)); + assertNotNull(ref.get(), "QueryResponse should not be null"); + // Always print the full response so test output shows exact results + System.out.println(dumpResponse(ref.get())); + return ref.get(); + } + + private Exception executeAndCaptureError(RelNode relNode) { + AtomicReference ref = new AtomicReference<>(); + engine.execute( + relNode, + mockContext, + new ResponseListener() { + @Override + public void onResponse(QueryResponse response) {} + + @Override + public void onFailure(Exception e) { + ref.set(e); + } + }); + assertNotNull(ref.get(), "onFailure should have been called"); + return ref.get(); + } + + private ResponseListener failureListener(AtomicReference ref) { + return new ResponseListener() { + @Override + public void onResponse(QueryResponse response) {} + + @Override + public void onFailure(Exception e) { + ref.set(e); + } + }; + } + + private ResponseListener explainFailureListener(AtomicReference ref) { + return new ResponseListener() { + @Override + public void onResponse(ExplainResponse response) {} + + @Override + public void onFailure(Exception e) { + ref.set(e); + } + }; + } + + private String dumpError(String testName, Exception e) { + return "\n--- " + + testName + + " ---\n" + + "Exception: " + + e.getClass().getSimpleName() + + "\n" + + "Message: " + + e.getMessage() + + "\n--- End ---"; + } + + /** Dumps 
the full QueryResponse into a readable string for test output and assertion messages. */ + private String dumpResponse(QueryResponse response) { + StringBuilder sb = new StringBuilder(); + sb.append("\n--- QueryResponse ---\n"); + + sb.append("Schema: ["); + sb.append( + response.getSchema().getColumns().stream() + .map(c -> c.getName() + ":" + c.getExprType().typeName()) + .collect(Collectors.joining(", "))); + sb.append("]\n"); + + sb.append("Rows (").append(response.getResults().size()).append("):\n"); + for (int i = 0; i < response.getResults().size(); i++) { + sb.append(" [").append(i).append("] "); + sb.append(response.getResults().get(i).tupleValue()); + sb.append("\n"); + } + + sb.append("Cursor: ").append(response.getCursor()).append("\n"); + sb.append("--- End ---"); + return sb.toString(); + } + + private RelNode mockRelNode(Object... nameTypePairs) { + SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataTypeFactory.Builder builder = typeFactory.builder(); + for (int i = 0; i < nameTypePairs.length; i += 2) { + String name = (String) nameTypePairs[i]; + SqlTypeName typeName = (SqlTypeName) nameTypePairs[i + 1]; + builder.add(name, typeName); + } + RelDataType rowType = builder.build(); + + RelNode relNode = mock(RelNode.class); + when(relNode.getRowType()).thenReturn(rowType); + return relNode; + } + + private ResponseListener captureListener(AtomicReference ref) { + return new ResponseListener() { + @Override + public void onResponse(QueryResponse response) { + ref.set(response); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected failure", e); + } + }; + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java new file mode 100644 index 00000000000..e7788ae5c67 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java @@ -0,0 +1,208 @@ +/* + 
* Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl; + +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; +import org.junit.Test; +import org.opensearch.client.ResponseException; + +/** + * Integration tests for PPL queries routed through the analytics engine path (Project Mustang). + * Queries targeting "parquet_*" indices are routed to {@code RestUnifiedQueryAction} which uses + * {@code AnalyticsExecutionEngine} with a stub {@code QueryPlanExecutor}. + * + *
<p>
These tests validate the full pipeline: REST request -> routing -> planning via + * UnifiedQueryPlanner -> execution via AnalyticsExecutionEngine -> response formatting. + * + *
<p>
The stub executor always returns the full table rows regardless of the logical plan. After + * projection (| fields), the execution engine maps row values by position -- so projected columns + * get the values from the corresponding positions in the full row, not the actual projected column. + * This is expected behavior for a stub; the real analytics engine will evaluate the plan correctly. + */ +public class AnalyticsPPLIT extends PPLIntegTestCase { + + private static final Logger LOG = LogManager.getLogger(AnalyticsPPLIT.class); + + @Override + protected void init() throws Exception { + // No index loading needed -- stub schema and data are hardcoded + // in RestUnifiedQueryAction and StubQueryPlanExecutor + } + + // --- Full table scan tests with schema + data verification --- + + @Test + public void testBasicQuerySchemaAndData() throws IOException { + String query = "source = opensearch.parquet_logs"; + JSONObject result = executeQuery(query); + LOG.info("[testBasicQuerySchemaAndData] query: {}\nresponse: {}", query, result.toString(2)); + + verifySchema( + result, + schema("ts", "timestamp"), + schema("status", "integer"), + schema("message", "keyword"), + schema("ip_addr", "keyword")); + verifyNumOfRows(result, 3); + verifyDataRows( + result, + rows("2024-01-15 10:30:00", 200, "Request completed", "192.168.1.1"), + rows("2024-01-15 10:31:00", 200, "Health check OK", "192.168.1.2"), + rows("2024-01-15 10:32:00", 500, "Internal server error", "192.168.1.3")); + } + + @Test + public void testParquetMetricsSchemaAndData() throws IOException { + String query = "source = opensearch.parquet_metrics"; + JSONObject result = executeQuery(query); + LOG.info( + "[testParquetMetricsSchemaAndData] query: {}\nresponse: {}", query, result.toString(2)); + + verifySchema( + result, + schema("ts", "timestamp"), + schema("cpu", "double"), + schema("memory", "double"), + schema("host", "keyword")); + verifyNumOfRows(result, 2); + verifyDataRows( + result, + rows("2024-01-15 
10:30:00", 75.5, 8192.5, "host-1"), + rows("2024-01-15 10:31:00", 82.3, 7680.5, "host-2")); + } + + // --- Response format validation --- + + @Test + public void testResponseFormatHasRequiredFields() throws IOException { + String query = "source = opensearch.parquet_logs"; + JSONObject result = executeQuery(query); + LOG.info( + "[testResponseFormatHasRequiredFields] query: {}\nresponse: {}", query, result.toString(2)); + + String msg = "Full response: " + result.toString(2); + assertTrue("Response missing 'schema'. " + msg, result.has("schema")); + assertTrue("Response missing 'datarows'. " + msg, result.has("datarows")); + assertTrue("Response missing 'total'. " + msg, result.has("total")); + assertTrue("Response missing 'size'. " + msg, result.has("size")); + assertTrue("Response missing 'status'. " + msg, result.has("status")); + assertEquals( + "Expected status 200 but got " + result.getInt("status") + ". " + msg, + 200, + result.getInt("status")); + } + + @Test + public void testTotalAndSizeMatchRowCount() throws IOException { + String query = "source = opensearch.parquet_logs"; + JSONObject result = executeQuery(query); + LOG.info("[testTotalAndSizeMatchRowCount] query: {}\nresponse: {}", query, result.toString(2)); + + int rowCount = result.getJSONArray("datarows").length(); + assertEquals( + String.format( + "total should match row count. rows=%d, total=%d, size=%d. Response: %s", + rowCount, result.getInt("total"), result.getInt("size"), result.toString(2)), + rowCount, + result.getInt("total")); + assertEquals( + String.format( + "size should match row count. rows=%d, size=%d. 
Response: %s", + rowCount, result.getInt("size"), result.toString(2)), + rowCount, + result.getInt("size")); + } + + // --- Projection tests (schema verification -- stub doesn't evaluate projections) --- + + @Test + public void testFieldsProjectionChangesSchema() throws IOException { + String query = "source = opensearch.parquet_logs | fields ts, message"; + JSONObject result = executeQuery(query); + LOG.info( + "[testFieldsProjectionChangesSchema] query: {}\nresponse: {}", query, result.toString(2)); + + verifySchema(result, schema("ts", "timestamp"), schema("message", "keyword")); + verifyNumOfRows(result, 3); + } + + @Test + public void testSingleFieldProjection() throws IOException { + String query = "source = opensearch.parquet_logs | fields status"; + JSONObject result = executeQuery(query); + LOG.info("[testSingleFieldProjection] query: {}\nresponse: {}", query, result.toString(2)); + + verifySchema(result, schema("status", "integer")); + verifyNumOfRows(result, 3); + } + + // --- Error handling tests --- + + @Test + public void testSyntaxErrorReturnsClientError() throws IOException { + String query = "source = opensearch.parquet_logs | invalid_command"; + ResponseException e = assertThrows(ResponseException.class, () -> executeQuery(query)); + int statusCode = e.getResponse().getStatusLine().getStatusCode(); + String responseBody = getResponseBody(e.getResponse(), true); + LOG.info( + "[testSyntaxErrorReturnsClientError] query: {}\nstatus: {}\nresponse: {}", + query, + statusCode, + responseBody); + + assertTrue( + String.format( + "Syntax error should return 4xx, got %d. 
Response: %s", statusCode, responseBody), + statusCode >= 400 && statusCode < 500); + } + + // --- Regression tests --- + + @Test + public void testNonParquetQueryStillWorks() throws IOException { + loadIndex(Index.ACCOUNT); + String query = String.format("source=%s | head 1 | fields firstname", TEST_INDEX_ACCOUNT); + JSONObject result = executeQuery(query); + LOG.info("[testNonParquetQueryStillWorks] query: {}\nresponse: {}", query, result.toString(2)); + + assertNotNull("Non-parquet query returned null. Query: " + query, result); + assertTrue( + "Non-parquet query missing 'datarows'. Response: " + result.toString(2), + result.has("datarows")); + int rowCount = result.getJSONArray("datarows").length(); + assertTrue( + String.format( + "Non-parquet query returned 0 rows. Expected > 0. Response: %s", result.toString(2)), + rowCount > 0); + } + + @Test + public void testNonParquetAggregationStillWorks() throws IOException { + loadIndex(Index.ACCOUNT); + String query = String.format("source=%s | stats count()", TEST_INDEX_ACCOUNT); + JSONObject result = executeQuery(query); + LOG.info( + "[testNonParquetAggregationStillWorks] query: {}\nresponse: {}", query, result.toString(2)); + + int total = result.getInt("total"); + assertTrue( + String.format( + "Non-parquet aggregation returned total=%d, expected > 0. 
Response: %s", + total, result.toString(2)), + total > 0); + } +} diff --git a/plugin/build.gradle b/plugin/build.gradle index 340787fa01f..7b757759e13 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -160,6 +160,7 @@ dependencies { api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson_annotations}" api project(":ppl") + api project(':api') api project(':legacy') api project(':opensearch') api project(':prometheus') diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index edffd65f6bf..016d53dc8de 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -163,7 +163,7 @@ public List getRestHandlers( Metrics.getInstance().registerDefaultMetrics(); return Arrays.asList( - new RestPPLQueryAction(), + new RestPPLQueryAction(clusterService, this.client), new RestPPLGrammarAction(), new RestSqlAction(settings, injector), new RestSqlStatsAction(settings, restController), diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java index ffdd90504f7..4470a165075 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java @@ -17,6 +17,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchException; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.index.IndexNotFoundException; @@ -29,6 +30,7 @@ import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.exception.QueryEngineException; import org.opensearch.sql.exception.SemanticCheckException; 
+import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.opensearch.response.error.ErrorMessageFactory; @@ -44,9 +46,13 @@ public class RestPPLQueryAction extends BaseRestHandler { private static final Logger LOG = LogManager.getLogger(); + /** Unified query handler for Parquet-backed indices (Analytics engine path). */ + private final RestUnifiedQueryAction unifiedQueryHandler; + /** Constructor of RestPPLQueryAction. */ - public RestPPLQueryAction() { + public RestPPLQueryAction(ClusterService clusterService, NodeClient client) { super(); + this.unifiedQueryHandler = new RestUnifiedQueryAction(client, new StubQueryPlanExecutor()); } private static boolean isClientError(Exception e) { @@ -86,6 +92,12 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod TransportPPLQueryRequest transportPPLQueryRequest = new TransportPPLQueryRequest(PPLQueryRequestFactory.getPPLRequest(request)); + // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices + String pplQuery = transportPPLQueryRequest.toPPLQueryRequest().getRequest(); + if (RestUnifiedQueryAction.isAnalyticsIndex(pplQuery)) { + return channel -> unifiedQueryHandler.execute(pplQuery, QueryType.PPL, channel); + } + return channel -> nodeClient.execute( PPLQueryAction.INSTANCE, diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java new file mode 100644 index 00000000000..113a9614975 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -0,0 +1,263 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.rest; + +import static org.opensearch.core.rest.RestStatus.OK; +import static 
org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import static org.opensearch.sql.lang.PPLLangSpec.PPL_SPEC; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; +import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.sql.api.UnifiedQueryContext; +import org.opensearch.sql.api.UnifiedQueryPlanner; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.common.antlr.SyntaxCheckException; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.exception.SemanticCheckException; +import org.opensearch.sql.executor.QueryType; +import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine; +import org.opensearch.sql.executor.analytics.QueryPlanExecutor; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.protocol.response.QueryResult; +import org.opensearch.sql.protocol.response.format.JdbcResponseFormatter; +import org.opensearch.sql.protocol.response.format.ResponseFormatter; +import org.opensearch.transport.client.node.NodeClient; + +/** + * Handles queries routed to the Analytics engine via the unified query 
pipeline. Parses PPL queries + * using {@link UnifiedQueryPlanner} to generate a Calcite {@link RelNode}, then delegates to {@link + * AnalyticsExecutionEngine} for execution. + */ +public class RestUnifiedQueryAction { + + private static final Logger LOG = LogManager.getLogger(RestUnifiedQueryAction.class); + private static final String SCHEMA_NAME = "opensearch"; + + /** + * Pattern to extract index name from PPL source clause. Matches: source = index, source=index, + * source = `index`, source = catalog.index + */ + private static final Pattern SOURCE_PATTERN = + Pattern.compile( + "source\\s*=\\s*`?([a-zA-Z0-9_.*]+(?:\\.[a-zA-Z0-9_.*]+)*)`?", Pattern.CASE_INSENSITIVE); + + private final AnalyticsExecutionEngine analyticsEngine; + private final NodeClient client; + + public RestUnifiedQueryAction(NodeClient client, QueryPlanExecutor planExecutor) { + this.client = client; + this.analyticsEngine = new AnalyticsExecutionEngine(planExecutor); + } + + /** + * Check if the query targets an analytics engine index (e.g., Parquet-backed). Currently uses a + * prefix convention ("parquet_"). In production, this will check index settings such as + * index.storage_type. + */ + public static boolean isAnalyticsIndex(String query) { + if (query == null) { + return false; + } + String indexName = extractIndexName(query); + if (indexName == null) { + return false; + } + // Handle qualified names like "catalog.parquet_logs" — check the last segment + int lastDot = indexName.lastIndexOf('.'); + String tableName = lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName; + return tableName.startsWith("parquet_"); + } + + /** + * Extract the source index name from a PPL query string. 
+ * + * @param query the PPL query string + * @return the index name, or null if not found + */ + static String extractIndexName(String query) { + Matcher matcher = SOURCE_PATTERN.matcher(query); + if (matcher.find()) { + return matcher.group(1); + } + return null; + } + + /** + * Execute a query through the unified query pipeline on the sql-worker thread pool. + * + * @param query the PPL query string + * @param queryType SQL or PPL + * @param channel the REST channel for sending the response + */ + public void execute(String query, QueryType queryType, RestChannel channel) { + client + .threadPool() + .schedule( + () -> doExecute(query, queryType, channel), + new TimeValue(0), + SQL_WORKER_THREAD_POOL_NAME); + } + + private void doExecute(String query, QueryType queryType, RestChannel channel) { + try { + long startTime = System.nanoTime(); + + // TODO: Replace stub schema with EngineContext.getSchema() when analytics engine is ready + AbstractSchema schema = buildStubSchema(); + + try (UnifiedQueryContext context = + UnifiedQueryContext.builder() + .language(queryType) + .catalog(SCHEMA_NAME, schema) + .defaultNamespace(SCHEMA_NAME) + .build()) { + + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); + RelNode plan = planner.plan(query); + long planTime = System.nanoTime(); + LOG.info( + "[unified] Planning completed in {}ms for {} query", + (planTime - startTime) / 1_000_000, + queryType); + + CalcitePlanContext planContext = context.getPlanContext(); + analyticsEngine.execute(plan, planContext, createQueryListener(channel, planTime)); + } + } catch (Exception e) { + recordFailureMetric(e); + reportError(channel, e); + } + } + + private ResponseListener createQueryListener( + RestChannel channel, long planEndTime) { + ResponseFormatter formatter = new JdbcResponseFormatter(PRETTY); + return new ResponseListener() { + @Override + public void onResponse(QueryResponse response) { + long execTime = System.nanoTime(); + LOG.info( + "[unified] Execution 
completed in {}ms, {} rows returned", + (execTime - planEndTime) / 1_000_000, + response.getResults().size()); + Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_TOTAL).increment(); + Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_COUNT_TOTAL).increment(); + String result = + formatter.format( + new QueryResult( + response.getSchema(), response.getResults(), response.getCursor(), PPL_SPEC)); + channel.sendResponse(new BytesRestResponse(OK, formatter.contentType(), result)); + } + + @Override + public void onFailure(Exception e) { + recordFailureMetric(e); + reportError(channel, e); + } + }; + } + + /** + * Stub schema for development and testing. Returns a hardcoded table definition for any + * "parquet_*" table. Will be replaced by EngineContext.getSchema() when the analytics engine is + * ready. + */ + private static AbstractSchema buildStubSchema() { + return new AbstractSchema() { + @Override + protected Map getTableMap() { + return Map.of( + "parquet_logs", buildStubTable(), + "parquet_metrics", buildStubMetricsTable()); + } + }; + } + + private static Table buildStubTable() { + return new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return typeFactory + .builder() + .add("ts", SqlTypeName.TIMESTAMP) + .add("status", SqlTypeName.INTEGER) + .add("message", SqlTypeName.VARCHAR) + .add("ip_addr", SqlTypeName.VARCHAR) + .build(); + } + }; + } + + private static Table buildStubMetricsTable() { + return new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return typeFactory + .builder() + .add("ts", SqlTypeName.TIMESTAMP) + .add("cpu", SqlTypeName.DOUBLE) + .add("memory", SqlTypeName.DOUBLE) + .add("host", SqlTypeName.VARCHAR) + .build(); + } + }; + } + + /** Classify whether the exception is a client error (bad query) or server error (engine bug). 
*/ + private static boolean isClientError(Exception e) { + return e instanceof SyntaxCheckException + || e instanceof SemanticCheckException + || e instanceof IllegalArgumentException + || e instanceof NullPointerException; + } + + private static void recordFailureMetric(Exception e) { + if (isClientError(e)) { + LOG.warn("[unified] Client error in query execution", e); + Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_CUS).increment(); + } else { + LOG.error("[unified] Server error in query execution", e); + Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_SYS).increment(); + } + } + + private static void reportError(RestChannel channel, Exception e) { + RestStatus status = + isClientError(e) ? RestStatus.BAD_REQUEST : RestStatus.INTERNAL_SERVER_ERROR; + String reason = e.getMessage() != null ? e.getMessage() : "Unknown error"; + // Escape characters that would break JSON + reason = + reason.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", ""); + channel.sendResponse( + new BytesRestResponse( + status, + "application/json; charset=UTF-8", + "{\"error\":{\"type\":\"" + + e.getClass().getSimpleName() + + "\",\"reason\":\"" + + reason + + "\"},\"status\":" + + status.getStatus() + + "}")); + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java new file mode 100644 index 00000000000..fdd15eb7c17 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.rest; + +import java.time.Instant; +import java.util.List; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.opensearch.sql.executor.analytics.QueryPlanExecutor; + +/** + * Stub implementation of 
{@link QueryPlanExecutor} for development and testing. Returns canned data
 * so the full pipeline (routing → planning → execution → response formatting) can be validated
 * without the analytics engine.
 *
 * <p>Will be replaced by the real analytics engine implementation when available.
 */
public class StubQueryPlanExecutor implements QueryPlanExecutor {

  /**
   * Return canned rows matching the stub schemas declared in RestUnifiedQueryAction.
   * Column order must match the schema: parquet_logs is (ts, status, message, ip_addr);
   * parquet_metrics is (ts, cpu, memory, host). Unknown tables yield an empty result.
   *
   * <p>NOTE(review): assumes {@code QueryPlanExecutor} declares an {@code Iterable<Object[]>}
   * result — confirm against the interface (the raw type was removed here to avoid unchecked
   * usage at call sites).
   */
  @Override
  public Iterable<Object[]> execute(RelNode plan, Object context) {
    String tableName = extractTableName(plan);
    // extractTableName only ever returns the exact names below or null, so equals() is safe.
    if ("parquet_logs".equals(tableName)) {
      return List.of(
          new Object[] {
            Instant.parse("2024-01-15T10:30:00Z"), 200, "Request completed", "192.168.1.1"
          },
          new Object[] {
            Instant.parse("2024-01-15T10:31:00Z"), 200, "Health check OK", "192.168.1.2"
          },
          new Object[] {
            Instant.parse("2024-01-15T10:32:00Z"), 500, "Internal server error", "192.168.1.3"
          });
    }
    if ("parquet_metrics".equals(tableName)) {
      return List.of(
          new Object[] {Instant.parse("2024-01-15T10:30:00Z"), 75.5, 8192.5, "host-1"},
          new Object[] {Instant.parse("2024-01-15T10:31:00Z"), 82.3, 7680.5, "host-2"});
    }
    return List.of();
  }

  /**
   * Best-effort table-name sniffing: render the whole plan tree (including child nodes) with
   * RelOptUtil.toString and look for a known stub table name.
   */
  private String extractTableName(RelNode plan) {
    String planStr = RelOptUtil.toString(plan);
    if (planStr.contains("parquet_logs")) {
      return "parquet_logs";
    }
    if (planStr.contains("parquet_metrics")) {
      return "parquet_metrics";
    }
    return null;
  }
}

// ===== New file: plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java =====
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.plugin.rest;
+ +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class RestUnifiedQueryActionTest { + + // --- isAnalyticsIndex --- + + @Test + public void isAnalyticsIndex_parquetPrefix() { + assertTrue(RestUnifiedQueryAction.isAnalyticsIndex("source = parquet_logs | fields ts")); + } + + @Test + public void isAnalyticsIndex_parquetPrefixNoSpaces() { + assertTrue(RestUnifiedQueryAction.isAnalyticsIndex("source=parquet_logs | fields ts")); + } + + @Test + public void isAnalyticsIndex_parquetPrefixWithCatalog() { + assertTrue( + RestUnifiedQueryAction.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts")); + } + + @Test + public void isAnalyticsIndex_nonParquetIndex() { + assertFalse(RestUnifiedQueryAction.isAnalyticsIndex("source = my_logs | fields ts")); + } + + @Test + public void isAnalyticsIndex_nullQuery() { + assertFalse(RestUnifiedQueryAction.isAnalyticsIndex(null)); + } + + @Test + public void isAnalyticsIndex_emptyQuery() { + assertFalse(RestUnifiedQueryAction.isAnalyticsIndex("")); + } + + // --- extractIndexName --- + + @Test + public void extractIndexName_simpleSource() { + assertEquals("my_logs", RestUnifiedQueryAction.extractIndexName("source = my_logs")); + } + + @Test + public void extractIndexName_noSpaces() { + assertEquals("my_logs", RestUnifiedQueryAction.extractIndexName("source=my_logs")); + } + + @Test + public void extractIndexName_withPipe() { + assertEquals( + "my_logs", + RestUnifiedQueryAction.extractIndexName("source = my_logs | where status = 200")); + } + + @Test + public void extractIndexName_backticks() { + assertEquals( + "my_logs", RestUnifiedQueryAction.extractIndexName("source = `my_logs` | fields ts")); + } + + @Test + public void extractIndexName_qualifiedName() { + assertEquals( + "opensearch.my_logs", + RestUnifiedQueryAction.extractIndexName("source = 
opensearch.my_logs | fields ts")); + } + + @Test + public void extractIndexName_caseInsensitive() { + assertEquals("my_logs", RestUnifiedQueryAction.extractIndexName("SOURCE = my_logs")); + } + + @Test + public void extractIndexName_noSourceClause() { + assertNull(RestUnifiedQueryAction.extractIndexName("describe my_logs")); + } + + @Test + public void extractIndexName_wildcardIndex() { + assertEquals("parquet_*", RestUnifiedQueryAction.extractIndexName("source = parquet_*")); + } +}