diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 5d95f03af0fe..e01dc9c12f8c 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -7,6 +7,7 @@ between `opentelemetry-exporter-zipkin-proto-http` (protobuf~=3.12) and `opentelemetry-proto` (protobuf>=5.0). * Fix missing `taskId` filter and incorrect `IN` clause parameter binding in `JDBCJFRDataQueryDAO` and `JDBCPprofDataQueryDAO`. * Remove deprecated `GroupBy.field_name` from BanyanDB `MeasureQuery` request building (Phase 1 of staged removal across repos). +* Push `taskId` filter down to the storage layer in `IAsyncProfilerTaskLogQueryDAO`, removing in-memory filtering from `AsyncProfilerQueryService`. #### UI diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/asyncprofiler/AsyncProfilerQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/asyncprofiler/AsyncProfilerQueryService.java index ce86f0fbc616..1057c4d3f31c 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/asyncprofiler/AsyncProfilerQueryService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/asyncprofiler/AsyncProfilerQueryService.java @@ -102,13 +102,7 @@ public AsyncProfilerStackTree queryJFRData(String taskId, List instanceI } public List queryAsyncProfilerTaskLogs(String taskId) throws IOException { - List taskLogList = getTaskLogQueryDAO().getTaskLogList(); - return findMatchedLogs(taskId, taskLogList); - } - - private List findMatchedLogs(final String taskID, final List allLogs) { - return allLogs.stream() - .filter(l -> Objects.equals(l.getId(), taskID)) + return getTaskLogQueryDAO().getTaskLogList(taskId).stream() .map(this::extendTaskLog) .collect(Collectors.toList()); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/asyncprofiler/IAsyncProfilerTaskLogQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/asyncprofiler/IAsyncProfilerTaskLogQueryDAO.java index 8cb595f6df83..ae2c095bd352 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/asyncprofiler/IAsyncProfilerTaskLogQueryDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/asyncprofiler/IAsyncProfilerTaskLogQueryDAO.java @@ -26,7 +26,11 @@ public interface IAsyncProfilerTaskLogQueryDAO extends DAO { /** - * search all task log list in appoint task id + * Search task logs by the given task id. + * + * @param taskId the task id to filter by, must not be null or blank + * @return the task logs associated with the given task id + * @throws IOException if the query fails */ - List getTaskLogList() throws IOException; + List getTaskLogList(String taskId) throws IOException; } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAsyncProfilerTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAsyncProfilerTaskLogQueryDAO.java index 68e783cb45d1..b1ab61dd2212 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAsyncProfilerTaskLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAsyncProfilerTaskLogQueryDAO.java @@ -53,11 +53,12 @@ public BanyanDBAsyncProfilerTaskLogQueryDAO(BanyanDBStorageClient client, int ta } @Override - public List getTaskLogList() throws IOException { + public List getTaskLogList(String taskId) throws IOException { StreamQueryResponse resp = query(false, AsyncProfilerTaskLogRecord.INDEX_NAME, TAGS, new QueryBuilder() { @Override public void apply(StreamQuery query) { + query.and(eq(AsyncProfilerTaskLogRecord.TASK_ID, taskId)); query.setLimit(BanyanDBAsyncProfilerTaskLogQueryDAO.this.queryMaxSize); } }); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AsyncProfilerTaskLogQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AsyncProfilerTaskLogQueryEsDAO.java index d06d66ba6a8f..871560834558 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AsyncProfilerTaskLogQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AsyncProfilerTaskLogQueryEsDAO.java @@ -47,13 +47,14 @@ public AsyncProfilerTaskLogQueryEsDAO(ElasticSearchClient client, int queryMaxSi } @Override - public List getTaskLogList() throws IOException { + public List getTaskLogList(String taskId) throws IOException { final String index = IndexController.LogicIndicesRegister.getPhysicalTableName( AsyncProfilerTaskLogRecord.INDEX_NAME); final BoolQueryBuilder query = Query.bool(); if (IndexController.LogicIndicesRegister.isMergedTable(AsyncProfilerTaskLogRecord.INDEX_NAME)) { query.must(Query.term(IndexController.LogicIndicesRegister.RECORD_TABLE_NAME, AsyncProfilerTaskLogRecord.INDEX_NAME)); } + query.must(Query.term(AsyncProfilerTaskLogRecord.TASK_ID, taskId)); final SearchBuilder search = Search.builder().query(query) .sort(AsyncProfilerTaskLogRecord.OPERATION_TIME, Sort.Order.DESC) diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCAsyncProfilerTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCAsyncProfilerTaskLogQueryDAO.java index e8f76d4c4c33..88bf096d9d7e 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCAsyncProfilerTaskLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCAsyncProfilerTaskLogQueryDAO.java @@ -42,11 +42,11 @@ public class JDBCAsyncProfilerTaskLogQueryDAO implements IAsyncProfilerTaskLogQu @Override @SneakyThrows - public List getTaskLogList() { + public List getTaskLogList(String taskId) { List tables = tableHelper.getTablesWithinTTL(AsyncProfilerTaskLogRecord.INDEX_NAME); final List results = new ArrayList(); for (String table : tables) { - SQLAndParameters sqlAndParameters = buildSQL(table); + SQLAndParameters sqlAndParameters = buildSQL(table, taskId); List logs = jdbcClient.executeQuery( sqlAndParameters.sql(), resultSet -> { @@ -62,12 +62,14 @@ public List getTaskLogList() { return results; } - private SQLAndParameters buildSQL(String table) { + private SQLAndParameters buildSQL(String table, String taskId) { StringBuilder sql = new StringBuilder(); List parameters = new ArrayList<>(2); sql.append("select * from ").append(table) .append(" where ").append(JDBCTableInstaller.TABLE_COLUMN).append(" = ?"); parameters.add(AsyncProfilerTaskLogRecord.INDEX_NAME); + sql.append(" and ").append(AsyncProfilerTaskLogRecord.TASK_ID).append(" = ?"); + parameters.add(taskId); sql.append(" order by ").append(AsyncProfilerTaskLogRecord.OPERATION_TIME).append(" desc"); return new SQLAndParameters(sql.toString(), parameters); }