Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.analytics;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.ExecutionContext;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.pagination.Cursor;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
* Execution engine adapter for the analytics engine (Project Mustang).
*
* <p>Bridges the analytics engine's {@link QueryPlanExecutor} with the SQL plugin's {@link
* ExecutionEngine} response pipeline. Takes a Calcite {@link RelNode}, delegates execution to the
* analytics engine, and converts the raw results into {@link QueryResponse}.
*/
public class AnalyticsExecutionEngine implements ExecutionEngine {

private final QueryPlanExecutor planExecutor;

public AnalyticsExecutionEngine(QueryPlanExecutor planExecutor) {
this.planExecutor = planExecutor;
}

/** Not supported. Analytics queries use the RelNode path exclusively. */
@Override
public void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener) {
listener.onFailure(
new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
}

/** Not supported. Analytics queries use the RelNode path exclusively. */
@Override
public void execute(
PhysicalPlan plan, ExecutionContext context, ResponseListener<QueryResponse> listener) {
listener.onFailure(
new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
}

/** Not supported. Analytics queries use the RelNode path exclusively. */
@Override
public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener) {
listener.onFailure(
new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
}

@Override
public void execute(
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
try {
Integer querySizeLimit = context.sysLimit.querySizeLimit();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you check how PPL V3 handle this today? I'm thinking if we want to be consistent, this needs to be part of the RelNode plan we pass to QueryPlanExecutor instead of post-processing? Otherwise we may pull lots of data back?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Moved the query size limit into the RelNode plan using LogicalSystemLimit.create(), same pattern as QueryService.convertToCalcitePlan(). The limit is now part of the plan the analytics engine receives, so it can enforce it during execution instead of us pulling all rows back first.

Removed the post-processing truncation from AnalyticsExecutionEngine

Iterable<Object[]> rows = planExecutor.execute(plan, null);

List<RelDataTypeField> fields = plan.getRowType().getFieldList();
List<ExprValue> results = convertRows(rows, fields, querySizeLimit);
Schema schema = buildSchema(fields);

listener.onResponse(new QueryResponse(schema, results, Cursor.None));
} catch (Exception e) {
listener.onFailure(e);
}
}

private List<ExprValue> convertRows(
Iterable<Object[]> rows, List<RelDataTypeField> fields, Integer querySizeLimit) {
List<ExprValue> results = new ArrayList<>();
for (Object[] row : rows) {
if (querySizeLimit != null && results.size() >= querySizeLimit) {
break;
}
Map<String, ExprValue> valueMap = new LinkedHashMap<>();
for (int i = 0; i < fields.size(); i++) {
String columnName = fields.get(i).getName();
Object value = (i < row.length) ? row[i] : null;
valueMap.put(columnName, ExprValueUtils.fromObjectValue(value));
}
results.add(ExprTupleValue.fromExprValueMap(valueMap));
}
return results;
}

private Schema buildSchema(List<RelDataTypeField> fields) {
List<Schema.Column> columns = new ArrayList<>();
for (RelDataTypeField field : fields) {
ExprType exprType = convertType(field.getType());
columns.add(new Schema.Column(field.getName(), null, exprType));
}
return new Schema(columns);
}

private ExprType convertType(RelDataType type) {
try {
return OpenSearchTypeFactory.convertRelDataTypeToExprType(type);
} catch (IllegalArgumentException e) {
return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.analytics;

import org.apache.calcite.rel.RelNode;

/**
* Executes a Calcite {@link RelNode} logical plan against the analytics engine.
*
* <p>This is a local equivalent of {@code org.opensearch.analytics.exec.QueryPlanExecutor} from the
* analytics-framework library. It will be replaced by the upstream interface once the
* analytics-framework JAR is published.
*
* @see <a
* href="https://github.com/opensearch-project/OpenSearch/blob/9142d0e789c6a6c4708f1bc015745ed55202eefe/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/exec/QueryPlanExecutor.java">Upstream
* QueryPlanExecutor</a>
*/
@FunctionalInterface
public interface QueryPlanExecutor {

/**
* Executes the given logical plan and returns result rows.
*
* @param plan the Calcite RelNode subtree to execute
* @param context execution context (opaque to avoid server dependency)
* @return rows produced by the engine
*/
Iterable<Object[]> execute(RelNode plan, Object context);
}
Loading
Loading