Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,59 @@ try (UnifiedQueryContext context = UnifiedQueryContext.builder()
}
```

## Profiling

The unified query API supports the same [profiling capability](../docs/user/ppl/interfaces/endpoint.md#profile-experimental) as the PPL REST endpoint. When enabled, each unified query component automatically collects per-phase timing metrics. For code outside unified query components (e.g., `PreparedStatement.executeQuery()` or response formatting), `context.measure()` records custom phases into the same profile.

```java
try (UnifiedQueryContext context = UnifiedQueryContext.builder()
.language(QueryType.PPL)
.catalog("catalog", schema)
.defaultNamespace("catalog")
.profiling(true)
.build()) {

// Auto-profiled: ANALYZE
RelNode plan = new UnifiedQueryPlanner(context).plan(query);

// Auto-profiled: OPTIMIZE
PreparedStatement stmt = new UnifiedQueryCompiler(context).compile(plan);

// User-profiled via measure()
ResultSet rs = context.measure(MetricName.EXECUTE, stmt::executeQuery);
String json = context.measure(MetricName.FORMAT, () -> formatter.format(result));

// Retrieve profile snapshot
QueryProfile profile = context.getProfile();
}
```

The returned `QueryProfile` follows the same JSON structure as the REST API:

```json
{
"summary": {
"total_time_ms": 33.34
},
"phases": {
"analyze": { "time_ms": 8.68 },
"optimize": { "time_ms": 18.2 },
"execute": { "time_ms": 4.87 },
"format": { "time_ms": 0.05 }
},
"plan": {
"node": "EnumerableCalc",
"time_ms": 4.82,
"rows": 2,
"children": [
{ "node": "CalciteEnumerableIndexScan", "time_ms": 4.12, "rows": 2 }
]
}
}
```

When profiling is disabled (the default), all components execute with zero overhead.

## Development & Testing

A set of unit tests is provided to validate planner behavior.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import lombok.Value;
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.jdbc.CalciteSchema;
Expand All @@ -28,6 +30,10 @@
import org.opensearch.sql.calcite.SysLimit;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.monitor.profile.MetricName;
import org.opensearch.sql.monitor.profile.ProfileMetric;
import org.opensearch.sql.monitor.profile.QueryProfile;
import org.opensearch.sql.monitor.profile.QueryProfiling;

/**
* A reusable abstraction shared across unified query components (planner, compiler, etc.). This
Expand All @@ -43,13 +49,43 @@ public class UnifiedQueryContext implements AutoCloseable {
/** Settings containing execution limits and feature flags used by parsers and planners. */
Settings settings;

/**
* Returns the profiling result. Call after query execution to retrieve collected metrics. Returns
* empty if profiling was not enabled.
*/
public Optional<QueryProfile> getProfile() {
return Optional.ofNullable(QueryProfiling.current().finish());
}

/**
* Measures the execution time of the given action and records it as a profiling metric. When
* profiling is disabled, the action executes with no overhead. Use this for phases outside
* unified query components (e.g., execution, formatting).
*
* @param <T> the return type of the action
* @param metricName the metric to record
* @param action the action to measure
* @return the result of the action
* @throws Exception if the action throws
*/
public <T> T measure(MetricName metricName, Callable<T> action) throws Exception {
ProfileMetric metric = QueryProfiling.current().getOrCreateMetric(metricName);
long start = System.nanoTime();
try {
return action.call();
} finally {
metric.set(System.nanoTime() - start);
}
}

/**
* Closes the underlying resource managed by this context.
*
* @throws Exception if an error occurs while closing the connection
*/
@Override
public void close() throws Exception {
QueryProfiling.clear();
if (planContext != null && planContext.connection != null) {
planContext.connection.close();
}
Expand All @@ -66,6 +102,7 @@ public static class Builder {
private final Map<String, Schema> catalogs = new HashMap<>();
private String defaultNamespace;
private boolean cacheMetadata = false;
private boolean profiling = false;

/**
* Setting values with defaults from SysLimit.DEFAULT. Only includes planning-required settings
Expand Down Expand Up @@ -125,6 +162,18 @@ public Builder cacheMetadata(boolean cache) {
return this;
}

/**
* Enables or disables query profiling. When enabled, profiling metrics are collected during
* query planning and execution, retrievable via {@link UnifiedQueryContext#getProfile()}.
*
* @param enabled whether to enable profiling
* @return this builder instance
*/
public Builder profiling(boolean enabled) {
this.profiling = enabled;
return this;
}

/**
* Sets a specific setting value by name.
*
Expand Down Expand Up @@ -152,6 +201,7 @@ public UnifiedQueryContext build() {
CalcitePlanContext planContext =
CalcitePlanContext.create(
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
QueryProfiling.activate(profiling);
return new UnifiedQueryContext(planContext, settings);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.api;

import static org.opensearch.sql.monitor.profile.MetricName.ANALYZE;

import lombok.RequiredArgsConstructor;
import org.antlr.v4.runtime.tree.ParseTree;
import org.apache.calcite.rel.RelCollation;
Expand Down Expand Up @@ -36,12 +38,16 @@ public class UnifiedQueryPlanner {
/** Planning strategy selected at construction time based on query type. */
private final PlanningStrategy strategy;

/** Unified query context for profiling support. */
private final UnifiedQueryContext context;

/**
* Constructs a UnifiedQueryPlanner with a unified query context.
*
* @param context the unified query context containing CalcitePlanContext
*/
public UnifiedQueryPlanner(UnifiedQueryContext context) {
this.context = context;
this.strategy =
context.getPlanContext().queryType == QueryType.SQL
? new CalciteNativeStrategy(context)
Expand All @@ -57,7 +63,7 @@ public UnifiedQueryPlanner(UnifiedQueryContext context) {
*/
public RelNode plan(String query) {
try {
return strategy.plan(query);
return context.measure(ANALYZE, () -> strategy.plan(query));
} catch (SyntaxCheckException e) {
// Re-throw syntax error without wrapping
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.api.compiler;

import static org.opensearch.sql.monitor.profile.MetricName.OPTIMIZE;

import java.sql.Connection;
import java.sql.PreparedStatement;
import lombok.NonNull;
Expand Down Expand Up @@ -46,26 +48,29 @@ public UnifiedQueryCompiler(UnifiedQueryContext context) {
*/
public PreparedStatement compile(@NonNull RelNode plan) {
try {
// Apply shuttle to convert LogicalTableScan to BindableTableScan
final RelHomogeneousShuttle shuttle =
new RelHomogeneousShuttle() {
@Override
public RelNode visit(TableScan scan) {
final RelOptTable table = scan.getTable();
if (scan instanceof LogicalTableScan
&& Bindables.BindableTableScan.canHandle(table)) {
return Bindables.BindableTableScan.create(scan.getCluster(), table);
}
return super.visit(scan);
}
};
RelNode transformedPlan = plan.accept(shuttle);

Connection connection = context.getPlanContext().connection;
final RelRunner runner = connection.unwrap(RelRunner.class);
return runner.prepareStatement(transformedPlan);
return context.measure(OPTIMIZE, () -> doCompile(plan));
} catch (Exception e) {
throw new IllegalStateException("Failed to compile logical plan", e);
}
}

private PreparedStatement doCompile(RelNode plan) throws Exception {
// Apply shuttle to convert LogicalTableScan to BindableTableScan
final RelHomogeneousShuttle shuttle =
new RelHomogeneousShuttle() {
@Override
public RelNode visit(TableScan scan) {
final RelOptTable table = scan.getTable();
if (scan instanceof LogicalTableScan && Bindables.BindableTableScan.canHandle(table)) {
return Bindables.BindableTableScan.create(scan.getCluster(), table);
}
return super.visit(scan);
}
};
RelNode transformedPlan = plan.accept(shuttle);

Connection connection = context.getPlanContext().connection;
final RelRunner runner = connection.unwrap(RelRunner.class);
return runner.prepareStatement(transformedPlan);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.api;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.opensearch.sql.monitor.profile.MetricName.EXECUTE;

import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import org.apache.calcite.rel.RelNode;
import org.junit.Test;
import org.opensearch.sql.api.compiler.UnifiedQueryCompiler;
import org.opensearch.sql.monitor.profile.QueryProfile;
import org.opensearch.sql.monitor.profile.QueryProfiling;

/** Tests for profiling across unified query components and the measure() API. */
public class UnifiedQueryProfilingTest extends UnifiedQueryTestBase {

@Override
protected UnifiedQueryContext.Builder contextBuilder() {
return super.contextBuilder().profiling(true);
}

@Test
public void testProfilingEnabled() {
assertTrue(QueryProfiling.current().isEnabled());
}

@Test
public void testProfilingDisabledByDefault() throws Exception {
try (UnifiedQueryContext ctx = super.contextBuilder().build()) {
assertFalse(QueryProfiling.current().isEnabled());
}
}

@Test
public void testGetProfileReturnsEmptyWhenDisabled() throws Exception {
try (UnifiedQueryContext ctx = super.contextBuilder().build()) {
assertFalse(ctx.getProfile().isPresent());
}
}

@Test
public void testMeasureExecutesWhenProfilingDisabled() throws Exception {
try (UnifiedQueryContext ctx = super.contextBuilder().build()) {
assertEquals("done", ctx.measure(EXECUTE, () -> "done"));
assertFalse(ctx.getProfile().isPresent());
}
}

@Test
public void testPlannerAutoProfilesAnalyzePhase() {
planner.plan("source = catalog.employees");
assertTrue(context.getProfile().get().getPhases().get("analyze").getTimeMillis() >= 0);
}

@Test
public void testCompilerAutoProfilesOptimizePhase() {
RelNode plan = planner.plan("source = catalog.employees");
new UnifiedQueryCompiler(context).compile(plan);
assertTrue(context.getProfile().get().getPhases().get("optimize").getTimeMillis() >= 0);
}

@Test
public void testMeasureRecordsMetric() throws Exception {
assertEquals("done", context.measure(EXECUTE, () -> "done"));
assertTrue(context.getProfile().get().getPhases().get("execute").getTimeMillis() >= 0);
}

@Test
public void testFullPipelineProfiling() throws Exception {
RelNode plan = planner.plan("source = catalog.employees");
PreparedStatement stmt = new UnifiedQueryCompiler(context).compile(plan);
ResultSet rs = context.measure(EXECUTE, stmt::executeQuery);

QueryProfile profile = context.getProfile().get();
assertTrue(profile.getSummary().getTotalTimeMillis() >= 0);
assertTrue(profile.getPhases().get("analyze").getTimeMillis() >= 0);
assertTrue(profile.getPhases().get("optimize").getTimeMillis() >= 0);
assertTrue(profile.getPhases().get("execute").getTimeMillis() >= 0);
assertNotNull(profile.getPlan());
}

@Test
public void testProfilingClearedAfterClose() throws Exception {
assertTrue(QueryProfiling.current().isEnabled());
context.close();
assertFalse(QueryProfiling.current().isEnabled());
}

@Test
public void testMeasurePropagatesException() {
assertThrows(
IOException.class,
() ->
context.measure(
EXECUTE,
() -> {
throw new IOException("test error");
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected Map<String, Table> getTableMap() {
}
};

context = buildContext(queryType());
context = contextBuilder().build();
planner = new UnifiedQueryPlanner(context);
}

Expand All @@ -69,12 +69,12 @@ protected QueryType queryType() {
return QueryType.PPL;
}

/** Builds a UnifiedQueryContext with the test schema for the given query type. */
protected UnifiedQueryContext buildContext(QueryType queryType) {
return UnifiedQueryContext.builder()
.language(queryType)
.catalog(DEFAULT_CATALOG, testSchema)
.build();
/**
* Creates a pre-configured context builder with test schema. Subclasses can override to customize
* context configuration (e.g., enable profiling).
*/
protected UnifiedQueryContext.Builder contextBuilder() {
return UnifiedQueryContext.builder().language(queryType()).catalog(DEFAULT_CATALOG, testSchema);
}

@After
Expand Down
Loading