From e4887678fcca10f3107ac6f38d9e1b79e3e2b2af Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Thu, 26 Mar 2026 11:39:49 +0300 Subject: [PATCH 1/4] WIP --- .../continuous/GridContinuousProcessor.java | 710 ++---------------- .../StartRoutineDiscoveryMessage.java | 116 +-- .../StartRoutineDiscoveryMessageV2.java | 41 +- 3 files changed, 64 insertions(+), 803 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 521b881c82787..fe3e72a696245 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -17,12 +17,7 @@ package org.apache.ignite.internal.processors.continuous; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -34,15 +29,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; @@ -73,14 +65,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; -import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.systemview.ContinuousQueryViewWalker; import org.apache.ignite.internal.thread.OomExceptionHandler; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.lang.gridfunc.ReadOnlyCollectionView2X; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; @@ -106,7 +96,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC; -import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.toCountersMap; @@ -166,15 +155,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** */ private boolean processorStopped; - /** Query sequence number for message topic. */ - private final AtomicLong seq = new AtomicLong(); - /** */ private ContinuousRoutinesInfo routinesInfo; - /** */ - private int discoProtoVer; - /** * @param ctx Kernal context. */ @@ -189,10 +172,7 @@ public GridContinuousProcessor(GridKernalContext ctx) { new ReadOnlyCollectionView2X<>(rmtInfos.entrySet(), locInfos.entrySet()), e -> new ContinuousQueryView(e.getKey(), e.getValue())); - discoProtoVer = ctx.discovery().mutableCustomMessages() ? 1 : 2; - - if (discoProtoVer == 2) - routinesInfo = new ContinuousRoutinesInfo(); + routinesInfo = new ContinuousRoutinesInfo(); retryDelay = ctx.config().getNetworkSendRetryDelay(); retryCnt = ctx.config().getNetworkSendRetryCount(); @@ -212,26 +192,11 @@ public GridContinuousProcessor(GridKernalContext ctx) { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StartRoutineDiscoveryMessage msg) { - assert discoProtoVer == 1 : discoProtoVer; - - if (ctx.isStopping()) - return; - - processStartRequest(snd, msg); - } - }); - - ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessageV2.class, - new CustomEventListener() { - @Override public void onCustomEvent(AffinityTopologyVersion topVer, - ClusterNode snd, - StartRoutineDiscoveryMessageV2 msg) { - assert discoProtoVer == 2 : discoProtoVer; if (ctx.isStopping()) return; - processStartRequestV2(topVer, snd, msg); + processStartRequest(topVer, snd, msg); } }); @@ -252,8 +217,7 @@ public GridContinuousProcessor(GridKernalContext ctx) { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StopRoutineDiscoveryMessage msg) { - if (discoProtoVer == 2) - routinesInfo.removeRoutine(msg.routineId); + routinesInfo.removeRoutine(msg.routineId); if (ctx.isStopping()) return; @@ -409,106 +373,12 @@ public void unlockStopping() { /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - if (discoProtoVer == 2) { - routinesInfo.collectJoiningNodeData(dataBag); - - return; - } - - Serializable data = getDiscoveryData(dataBag.joiningNodeId()); - - if (data != null) - dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), data); + routinesInfo.collectJoiningNodeData(dataBag); } /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - if (discoProtoVer == 2) { - routinesInfo.collectGridNodeData(dataBag); - - return; - } - - Serializable data = getDiscoveryData(dataBag.joiningNodeId()); - - if (data != null) - dataBag.addNodeSpecificData(CONTINUOUS_PROC.ordinal(), data); - } - - /** - * @param joiningNodeId Joining node id. - */ - private Serializable getDiscoveryData(UUID joiningNodeId) { - if (log.isDebugEnabled()) { - log.debug("collectDiscoveryData [node=" + joiningNodeId + - ", loc=" + ctx.localNodeId() + - ", locInfos=" + locInfos + - ", clientInfos=" + clientInfos + - ']'); - } - - if (!joiningNodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) { - Map> clientInfos0 = copyClientInfos(clientInfos); - - if (joiningNodeId.equals(ctx.localNodeId()) && ctx.discovery().localNode().isClient()) { - Map infos = copyLocalInfos(locInfos); - - clientInfos0.put(ctx.localNodeId(), infos); - } - - DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0); - - // Collect listeners information (will be sent to joining node during discovery process). - for (Map.Entry e : locInfos.entrySet()) { - UUID routineId = e.getKey(); - LocalRoutineInfo info = e.getValue(); - - assert !ctx.config().isPeerClassLoadingEnabled() || - !(info.hnd instanceof CacheContinuousQueryHandler) || - ((CacheContinuousQueryHandler)info.hnd).isMarshalled(); - - data.addItem(new DiscoveryDataItem(routineId, - info.prjPred, - info.hnd, - info.bufSize, - info.interval, - info.autoUnsubscribe)); - } - - return data; - } - - return null; - } - - /** - * @param clientInfos Client infos. - */ - private Map> copyClientInfos(Map> clientInfos) { - Map> res = U.newHashMap(clientInfos.size()); - - for (Map.Entry> e : clientInfos.entrySet()) { - Map cp = U.newHashMap(e.getValue().size()); - - for (Map.Entry e0 : e.getValue().entrySet()) - cp.put(e0.getKey(), e0.getValue()); - - res.put(e.getKey(), cp); - } - - return res; - } - - /** - * @param locInfos Locale infos. - */ - private Map copyLocalInfos(Map locInfos) { - Map res = U.newHashMap(locInfos.size()); - - for (Map.Entry e : locInfos.entrySet()) - res.put(e.getKey(), e.getValue()); - - return res; + routinesInfo.collectGridNodeData(dataBag); } /** {@inheritDoc} */ @@ -520,91 +390,31 @@ private Map copyLocalInfos(Map l ']'); } - if (discoProtoVer == 2) { - if (data.hasJoiningNodeData()) { - ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData) - data.joiningNodeData(); + if (data.hasJoiningNodeData()) { + ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData) + data.joiningNodeData(); - for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) { - routinesInfo.addRoutineInfo(routineInfo); + for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) { + routinesInfo.addRoutineInfo(routineInfo); - onDiscoveryDataReceivedV2(routineInfo); - } + onDiscoveryDataReceivedV2(routineInfo); } } - else { - if (data.hasJoiningNodeData()) - onDiscoveryDataReceivedV1((DiscoveryData)data.joiningNodeData()); - } } /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - if (discoProtoVer == 2) { - if (data.commonData() != null) { - ContinuousRoutinesCommonDiscoveryData commonData = - (ContinuousRoutinesCommonDiscoveryData)data.commonData(); + if (data.commonData() != null) { + ContinuousRoutinesCommonDiscoveryData commonData = + (ContinuousRoutinesCommonDiscoveryData)data.commonData(); - for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { - if (routinesInfo.routineExists(routineInfo.routineId)) - continue; + for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { + if (routinesInfo.routineExists(routineInfo.routineId)) + continue; - routinesInfo.addRoutineInfo(routineInfo); + routinesInfo.addRoutineInfo(routineInfo); - onDiscoveryDataReceivedV2(routineInfo); - } - } - } - else { - Map nodeSpecData = data.nodeSpecificData(); - - if (nodeSpecData != null) { - for (Map.Entry e : nodeSpecData.entrySet()) - onDiscoveryDataReceivedV1((DiscoveryData)e.getValue()); - } - } - } - - /** - * Processes data received in a discovery message. - * Used with protocol version 1. - * - * @param data received discovery data. - */ - private void onDiscoveryDataReceivedV1(DiscoveryData data) { - if (data != null) { - for (DiscoveryDataItem item : data.items) { - if (!locInfos.containsKey(item.routineId)) { - registerHandlerOnJoin(data.nodeId, item.routineId, item.prjPred, - item.hnd, item.bufSize, item.interval, item.autoUnsubscribe); - } - - if (!item.autoUnsubscribe) { - locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(data.nodeId, - item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe)); - } - } - - // Process CQs started on clients. - for (Map.Entry> entry : data.clientInfos.entrySet()) { - UUID clientNodeId = entry.getKey(); - - if (!ctx.localNodeId().equals(clientNodeId)) { - Map clientRoutineMap = entry.getValue(); - - for (Map.Entry e : clientRoutineMap.entrySet()) { - UUID routineId = e.getKey(); - LocalRoutineInfo info = e.getValue(); - - registerHandlerOnJoin(clientNodeId, routineId, info.prjPred, - info.hnd, info.bufSize, info.interval, info.autoUnsubscribe); - } - } - - Map map = - clientInfos.computeIfAbsent(clientNodeId, k -> new HashMap<>()); - - map.putAll(entry.getValue()); + onDiscoveryDataReceivedV2(routineInfo); } } } @@ -756,95 +566,6 @@ public void onCacheStop(GridCacheContext ctx) { } } - /** - * Registers routine info to be sent in discovery data during this node join - * (to be used for internal queries started from client nodes). - * - * Peer class loading is not applied to static routines. - * - * @param cacheName Cache name. - * @param locLsnr Local listener. - * @param rmtFilter Remote filter. - * @param prjPred Projection predicate. - * @return Routine ID. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - public UUID registerStaticRoutine( - String cacheName, - CacheEntryUpdatedListener locLsnr, - CacheEntryEventSerializableFilter rmtFilter, - @Nullable IgnitePredicate prjPred) throws IgniteCheckedException { - String topicPrefix = "CONTINUOUS_QUERY_STATIC" + "_" + cacheName; - - CacheContinuousQueryHandler hnd = new CacheContinuousQueryHandler( - cacheName, - TOPIC_CACHE.topic(topicPrefix, ctx.localNodeId(), seq.incrementAndGet()), - locLsnr, - rmtFilter, - true, - false, - true, - false); - - hnd.internal(true); - - final UUID routineId = UUID.randomUUID(); - - LocalRoutineInfo routineInfo = new LocalRoutineInfo(ctx.localNodeId(), prjPred, hnd, 1, 0, true); - - if (discoProtoVer == 2) { - routinesInfo.addRoutineInfo(createRoutineInfo( - ctx.localNodeId(), - routineId, - hnd, - prjPred, - routineInfo.bufSize, - routineInfo.interval, - routineInfo.autoUnsubscribe)); - } - - locInfos.put(routineId, routineInfo); - - registerMessageListener(hnd); - - return routineId; - } - - /** - * @param srcNodeId Source node ID. - * @param routineId Routine ID. - * @param hnd Handler. - * @param nodeFilter Node filter. - * @param bufSize Handler buffer size. - * @param interval Time interval. - * @param autoUnsubscribe Auto unsubscribe flag. - * @return Routine info instance. - * @throws IgniteCheckedException If failed. - */ - private ContinuousRoutineInfo createRoutineInfo( - UUID srcNodeId, - UUID routineId, - GridContinuousHandler hnd, - @Nullable IgnitePredicate nodeFilter, - int bufSize, - long interval, - boolean autoUnsubscribe) - throws IgniteCheckedException { - byte[] hndBytes = marsh.marshal(hnd); - - byte[] filterBytes = nodeFilter != null ? marsh.marshal(nodeFilter) : null; - - return new ContinuousRoutineInfo( - srcNodeId, - routineId, - hndBytes, - filterBytes, - bufSize, - interval, - autoUnsubscribe); - } - /** * @param hnd Handler. * @param bufSize Buffer size. @@ -875,7 +596,7 @@ public IgniteInternalFuture startRoutine(GridContinuousHandler hnd, // Register routine locally. locInfos.put(routineId, - new LocalRoutineInfo(ctx.localNodeId(), prjPred, hnd, bufSize, interval, autoUnsubscribe)); + new LocalRoutineInfo(ctx.localNodeId(), hnd, bufSize, interval, autoUnsubscribe)); if (locOnly) { try { @@ -981,53 +702,25 @@ private AbstractContinuousMessage createStartMessage(UUID routineId, hnd.p2pMarshal(ctx); } - if (discoProtoVer == 1) { - StartRequestData reqData = new StartRequestData( - nodeFilter, - hnd, - bufSize, - interval, - autoUnsubscribe); + byte[] nodeFilterBytes = nodeFilter != null ? U.marshal(marsh, nodeFilter) : null; + byte[] hndBytes = U.marshal(marsh, hnd); - if (clsName != null) { - reqData.className(clsName); - reqData.deploymentInfo(dep); - - reqData.p2pMarshal(marsh); - } - - StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage( - routineId, - reqData, - reqData.handler().keepBinary()); - - if (hnd.updateCounters() != null) - msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters()); + StartRequestDataV2 reqData = new StartRequestDataV2(nodeFilterBytes, + hndBytes, + bufSize, + interval, + autoUnsubscribe); - return msg; + if (clsName != null) { + reqData.className(clsName); + reqData.deploymentInfo(dep); } - else { - assert discoProtoVer == 2 : discoProtoVer; - - byte[] nodeFilterBytes = nodeFilter != null ? U.marshal(marsh, nodeFilter) : null; - byte[] hndBytes = U.marshal(marsh, hnd); - StartRequestDataV2 reqData = new StartRequestDataV2(nodeFilterBytes, - hndBytes, - bufSize, - interval, - autoUnsubscribe); - if (clsName != null) { - reqData.className(clsName); - reqData.deploymentInfo(dep); - } - - return new StartRoutineDiscoveryMessageV2( - routineId, - reqData, - hnd.keepBinary()); - } + return new StartRoutineDiscoveryMessage( + routineId, + reqData, + hnd.keepBinary()); } /** @@ -1076,7 +769,7 @@ public IgniteInternalFuture stopRoutine(UUID routineId) { // Only one thread will stop routine with provided ID. if (fut == null) { - StopFuture old = stopFuts.putIfAbsent(routineId, fut = new StopFuture(ctx)); + StopFuture old = stopFuts.putIfAbsent(routineId, fut = new StopFuture()); if (old != null) fut = old; @@ -1097,7 +790,7 @@ public IgniteInternalFuture stopRoutine(UUID routineId) { unregisterHandler(routineId, routine.hnd, true); } - if (!stop && discoProtoVer == 2) + if (!stop) stop = routinesInfo.routineExists(routineId); // Finish if routine is not found (wrong ID is provided). @@ -1285,8 +978,7 @@ public void addNotification(UUID nodeId, clientInfos.clear(); - if (discoProtoVer == 2) - routinesInfo.onClientDisconnected(locInfos.keySet()); + routinesInfo.onClientDisconnected(locInfos.keySet()); if (log.isDebugEnabled()) { log.debug("after onDisconnected [rmtInfos=" + rmtInfos + @@ -1368,133 +1060,6 @@ private void processStartAckRequest(AffinityTopologyVersion topVer, } } - /** - * @param node Sender. - * @param req Start request. - */ - private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage req) { - if (node.id().equals(ctx.localNodeId())) - return; - - UUID routineId = req.routineId(); - - if (req.deserializationException() != null && checkNodeFilter(req)) { - IgniteCheckedException err = new IgniteCheckedException(req.deserializationException()); - - req.addError(node.id(), err); - - U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', err); - - return; - } - - StartRequestData data = req.startRequestData(); - - GridContinuousHandler hnd = data.handler(); - - if (req.keepBinary()) { - assert hnd instanceof CacheContinuousQueryHandler; - - ((CacheContinuousQueryHandler)hnd).keepBinary(true); - } - - IgniteCheckedException err = null; - - try { - if (ctx.config().isPeerClassLoadingEnabled()) { - String clsName = data.className(); - - if (clsName != null) { - GridDeploymentInfo depInfo = data.deploymentInfo(); - - GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, - depInfo.userVersion(), node.id(), depInfo.classLoaderId(), depInfo.participants(), null); - - if (dep == null) - throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); - - data.p2pUnmarshal(marsh, U.resolveClassLoader(dep.classLoader(), ctx.config())); - } - } - } - catch (IgniteCheckedException e) { - err = e; - - U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); - } - - if (node.isClient()) { - Map clientRoutineMap = clientInfos.get(node.id()); - - if (clientRoutineMap == null) { - clientRoutineMap = new HashMap<>(); - - Map old = clientInfos.put(node.id(), clientRoutineMap); - - assert old == null; - } - - clientRoutineMap.put(routineId, new LocalRoutineInfo(node.id(), - data.projectionPredicate(), - hnd, - data.bufferSize(), - data.interval(), - data.autoUnsubscribe())); - } - - if (err == null) { - try { - IgnitePredicate prjPred = data.projectionPredicate(); - - if (prjPred != null) - ctx.resource().injectGeneric(prjPred); - - if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) && - !locInfos.containsKey(routineId)) { - if (ctx.config().isPeerClassLoadingEnabled()) - hnd.p2pUnmarshal(node.id(), ctx); - - registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(), - data.autoUnsubscribe(), false); - - // Load partition counters. - if (err == null && hnd.isQuery()) { - GridCacheProcessor proc = ctx.cache(); - - if (proc != null) { - GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName()); - - if (cache != null && cache.context().userCache()) - req.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters()); - } - } - } - - if (!data.autoUnsubscribe()) - // Register routine locally. - locInfos.putIfAbsent(routineId, new LocalRoutineInfo( - node.id(), prjPred, hnd, data.bufferSize(), data.interval(), data.autoUnsubscribe())); - } - catch (IgniteCheckedException e) { - err = e; - - U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); - } - } - - if (err != null) - req.addError(ctx.localNodeId(), err); - } - - /** */ - private boolean checkNodeFilter(StartRoutineDiscoveryMessage req) { - StartRequestData reqData = req.startRequestData(); - IgnitePredicate prjPred; - - return reqData == null || (prjPred = reqData.projectionPredicate()) == null - || prjPred.apply(ctx.discovery().localNode()); - } - /** * @param sndId Sender node ID. * @param msg Message. @@ -1511,9 +1076,9 @@ private void processRoutineStartResultMessage(UUID sndId, ContinuousRoutineStart * @param snd Sender. * @param msg Start request. */ - private void processStartRequestV2(final AffinityTopologyVersion topVer, + private void processStartRequest(final AffinityTopologyVersion topVer, final ClusterNode snd, - final StartRoutineDiscoveryMessageV2 msg) { + final StartRoutineDiscoveryMessage msg) { StartRequestDataV2 reqData = msg.startRequestData(); ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(), @@ -1984,12 +1549,10 @@ private class DiscoveryListener implements GridLocalEventListener, HighPriorityL UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); - if (discoProtoVer == 2) { - routinesInfo.onNodeFail(nodeId); + routinesInfo.onNodeFail(nodeId); - for (StartFuture fut : startFuts.values()) - fut.onNodeFail(nodeId); - } + for (StartFuture fut : startFuts.values()) + fut.onNodeFail(nodeId); clientInfos.remove(nodeId); @@ -2062,9 +1625,6 @@ public static class LocalRoutineInfo implements Serializable, RoutineInfo { /** Source node id. */ private final UUID nodeId; - /** Projection predicate. */ - private final IgnitePredicate prjPred; - /** Continuous routine handler. */ private final GridContinuousHandler hnd; @@ -2079,7 +1639,6 @@ public static class LocalRoutineInfo implements Serializable, RoutineInfo { /** * @param nodeId Node id. - * @param prjPred Projection predicate. * @param hnd Continuous routine handler. * @param bufSize Buffer size. * @param interval Interval. @@ -2087,7 +1646,6 @@ public static class LocalRoutineInfo implements Serializable, RoutineInfo { */ LocalRoutineInfo( UUID nodeId, - @Nullable IgnitePredicate prjPred, GridContinuousHandler hnd, int bufSize, long interval, @@ -2098,7 +1656,6 @@ public static class LocalRoutineInfo implements Serializable, RoutineInfo { assert interval >= 0; this.nodeId = nodeId; - this.prjPred = prjPred; this.hnd = hnd; this.bufSize = bufSize; this.interval = interval; @@ -2367,157 +1924,6 @@ IgniteBiTuple checkInterval() { } } - /** - * Discovery data. - */ - private static class DiscoveryData implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Node ID. */ - private UUID nodeId; - - /** Items. */ - @GridToStringInclude - private Collection items; - - /** */ - private Map> clientInfos; - - /** - * Required by {@link Externalizable}. - */ - public DiscoveryData() { - // No-op. - } - - /** - * @param nodeId Node ID. - * @param clientInfos Client information. - */ - DiscoveryData(UUID nodeId, Map> clientInfos) { - assert nodeId != null; - - this.nodeId = nodeId; - - this.clientInfos = clientInfos; - - items = new ArrayList<>(); - } - - /** - * @param item Item. - */ - public void addItem(DiscoveryDataItem item) { - items.add(item); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, nodeId); - U.writeCollection(out, items); - U.writeMap(out, clientInfos); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - nodeId = U.readUuid(in); - items = U.readCollection(in); - clientInfos = U.readMap(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DiscoveryData.class, this); - } - } - - /** - * Discovery data item. - */ - private static class DiscoveryDataItem implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Consume ID. */ - private UUID routineId; - - /** Projection predicate. */ - private IgnitePredicate prjPred; - - /** Handler. */ - private GridContinuousHandler hnd; - - /** Buffer size. */ - private int bufSize; - - /** Time interval. */ - private long interval; - - /** Automatic unsubscribe flag. */ - private boolean autoUnsubscribe; - - /** - * Required by {@link Externalizable}. - */ - public DiscoveryDataItem() { - // No-op. - } - - /** - * @param routineId Consume ID. - * @param prjPred Projection predicate. - * @param hnd Handler. - * @param bufSize Buffer size. - * @param interval Time interval. - * @param autoUnsubscribe Automatic unsubscribe flag. - */ - DiscoveryDataItem(UUID routineId, - @Nullable IgnitePredicate prjPred, - GridContinuousHandler hnd, - int bufSize, - long interval, - boolean autoUnsubscribe - ) { - assert routineId != null; - assert hnd != null; - assert bufSize > 0; - assert interval >= 0; - - this.routineId = routineId; - this.prjPred = prjPred; - this.hnd = hnd; - this.bufSize = bufSize; - this.interval = interval; - this.autoUnsubscribe = autoUnsubscribe; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, routineId); - out.writeObject(prjPred); - out.writeObject(hnd); - out.writeInt(bufSize); - out.writeLong(interval); - out.writeBoolean(autoUnsubscribe); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - routineId = U.readUuid(in); - prjPred = (IgnitePredicate)in.readObject(); - hnd = (GridContinuousHandler)in.readObject(); - bufSize = in.readInt(); - interval = in.readLong(); - autoUnsubscribe = in.readBoolean(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DiscoveryDataItem.class, this); - } - } - /** * Future for start routine. */ @@ -2712,38 +2118,6 @@ private static class RoutineRegisterResults { * Future for stop routine. */ private static class StopFuture extends GridFutureAdapter { - /** Timeout object. */ - private volatile GridTimeoutObject timeoutObj; - - /** */ - private GridKernalContext ctx; - - /** - * @param ctx Kernal context. - */ - StopFuture(GridKernalContext ctx) { - this.ctx = ctx; - } - - /** - * @param timeoutObj Timeout object. - */ - public void addTimeoutObject(GridTimeoutObject timeoutObj) { - assert timeoutObj != null; - - this.timeoutObj = timeoutObj; - - ctx.timeout().addTimeoutObject(timeoutObj); - } - - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { - if (timeoutObj != null) - ctx.timeout().removeTimeoutObject(timeoutObj); - - return super.onDone(res, err); - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(StopFuture.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index 4bca717bd4dd3..beae46ad9faf9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -17,148 +17,64 @@ package org.apache.ignite.internal.processors.continuous; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; /** - * Discovery message used for Continuous Query registration. + * */ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** */ private static final long serialVersionUID = 0L; /** */ - private final StartRequestData startReqData; - - /** */ - // Initilized here as well to preserve compatibility with previous versions - private Map errs = new HashMap<>(); - - /** */ - private Map> updateCntrs; + private static final int KEEP_BINARY_FLAG = 0x01; /** */ - private Map>> updateCntrsPerNode; + StartRequestDataV2 startReqData; - /** Keep binary flag. */ - private boolean keepBinary; + /** Flags. */ + private int flags; /** */ - private transient ClassNotFoundException deserEx; + public StartRoutineDiscoveryMessage() {} /** * @param routineId Routine id. * @param startReqData Start request data. + * @param keepBinary Keep binary flag. */ - public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData, boolean keepBinary) { + StartRoutineDiscoveryMessage(UUID routineId, StartRequestDataV2 startReqData, boolean keepBinary) { super(routineId); this.startReqData = startReqData; - this.keepBinary = keepBinary; + + if (keepBinary) + flags |= KEEP_BINARY_FLAG; } /** * @return Start request data. */ - public StartRequestData startRequestData() { + public StartRequestDataV2 startRequestData() { return startReqData; } - /** - * @param nodeId Node id. - * @param e Exception. - */ - public void addError(UUID nodeId, IgniteCheckedException e) { - if (errs == null) - errs = new HashMap<>(); - - errs.put(nodeId, e); - } - - /** - * @param cntrs Update counters. - */ - private void addUpdateCounters(Map> cntrs) { - if (updateCntrs == null) - updateCntrs = new HashMap<>(); - - for (Map.Entry> e : cntrs.entrySet()) { - T2 cntr0 = updateCntrs.get(e.getKey()); - T2 cntr1 = e.getValue(); - - if (cntr0 == null || cntr1.get2() > cntr0.get2()) - updateCntrs.put(e.getKey(), cntr1); - } - } - - /** - * @param nodeId Local node ID. - * @param cntrs Update counters. - */ - public void addUpdateCounters(UUID nodeId, Map> cntrs) { - addUpdateCounters(cntrs); - - if (updateCntrsPerNode == null) - updateCntrsPerNode = new HashMap<>(); - - Map> old = updateCntrsPerNode.put(nodeId, cntrs); - - assert old == null : old; - } - - /** - * @return Errs. - */ - public Map errs() { - return errs != null ? errs : Collections.emptyMap(); - } - /** * @return {@code True} if keep binary flag was set on continuous handler. */ public boolean keepBinary() { - return keepBinary; - } - - /** {@inheritDoc} */ - @Override public boolean isMutable() { - return true; + return (flags & KEEP_BINARY_FLAG) != 0; } /** {@inheritDoc} */ @Override public DiscoveryCustomMessage ackMessage() { - return new StartRoutineAckDiscoveryMessage(routineId, errs(), updateCntrs, updateCntrsPerNode); + return null; } - /** */ - private void readObject(ObjectInputStream in) throws IOException { - // Override default serialization in order to tolerate missing classes exceptions (e.g. remote filter class). - // We need this means because CQ registration process assumes that an "ack message" will be sent. - try { - in.defaultReadObject(); - } - catch (ClassNotFoundException e) { - deserEx = e; - - throw new IncompleteDeserializationException(this); - } - } - - /** - * @return Exception occurred during deserialization. - */ - @Nullable public ClassNotFoundException deserializationException() { - return deserEx; + @Override public boolean isMutable() { + return super.isMutable(); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java index 275765da73050..ce91ff7d5b46d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java @@ -18,56 +18,27 @@ package org.apache.ignite.internal.processors.continuous; import java.util.UUID; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; /** * */ -public class StartRoutineDiscoveryMessageV2 extends AbstractContinuousMessage { +public class StartRoutineDiscoveryMessageV2 extends StartRoutineDiscoveryMessage { /** */ private static final long serialVersionUID = 0L; - /** */ - private static final int KEEP_BINARY_FLAG = 0x01; - - /** */ - private final StartRequestDataV2 startReqData; - - /** Flags. */ - private int flags; - /** - * @param routineId Routine id. + * @param routineId Routine id. * @param startReqData Start request data. - * @param keepBinary Keep binary flag. + * @param keepBinary Keep binary flag. */ StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestDataV2 startReqData, boolean keepBinary) { - super(routineId); - - this.startReqData = startReqData; - - if (keepBinary) - flags |= KEEP_BINARY_FLAG; - } - - /** - * @return Start request data. - */ - public StartRequestDataV2 startRequestData() { - return startReqData; - } - - /** - * @return {@code True} if keep binary flag was set on continuous handler. - */ - public boolean keepBinary() { - return (flags & KEEP_BINARY_FLAG) != 0; + super(routineId, startReqData, keepBinary); } /** {@inheritDoc} */ - @Override public DiscoveryCustomMessage ackMessage() { - return null; + @Override public boolean isMutable() { + return true; } /** {@inheritDoc} */ From 7782eb01b304ad12fbaec80df7479fd63cf39c3c Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Thu, 26 Mar 2026 12:04:16 +0300 Subject: [PATCH 2/4] WIP --- .../processors/continuous/StartRoutineDiscoveryMessage.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index beae46ad9faf9..7a78491efcd9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -73,10 +73,6 @@ public boolean keepBinary() { return null; } - @Override public boolean isMutable() { - return super.isMutable(); - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(StartRoutineDiscoveryMessage.class, this, "routineId", routineId()); From 0d72f00e071978a8fb6c342b0c14ee71861bcdc1 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Thu, 26 Mar 2026 12:05:56 +0300 Subject: [PATCH 3/4] WIP --- .../StartRoutineDiscoveryMessageV2.java | 48 ------------------- .../resources/META-INF/classnames.properties | 1 - .../messaging/GridMessagingSelfTest.java | 5 +- 3 files changed, 2 insertions(+), 52 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java deleted file mode 100644 index ce91ff7d5b46d..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.continuous; - -import java.util.UUID; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * - */ -public class StartRoutineDiscoveryMessageV2 extends StartRoutineDiscoveryMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param routineId Routine id. - * @param startReqData Start request data. - * @param keepBinary Keep binary flag. - */ - StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestDataV2 startReqData, boolean keepBinary) { - super(routineId, startReqData, keepBinary); - } - - /** {@inheritDoc} */ - @Override public boolean isMutable() { - return true; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StartRoutineDiscoveryMessageV2.class, this, "routineId", routineId()); - } -} diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 03bdf2e6e8a4d..740bc6cc8c9ff 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1528,7 +1528,6 @@ org.apache.ignite.internal.processors.continuous.StartRequestData org.apache.ignite.internal.processors.continuous.StartRequestDataV2 org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage -org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2 org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$3 diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java index 6a7760ec5584c..5e2e4c9cdda25 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java @@ -40,7 +40,6 @@ import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.internal.DiscoverySpiTestListener; import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage; -import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2; import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.typedef.P2; @@ -1051,7 +1050,7 @@ public void testAsyncOld() throws Exception { } }, IllegalStateException.class, null); - lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class); + lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessage.class); final String topic = "topic"; @@ -1149,7 +1148,7 @@ public void testAsync() throws Exception { discoSpi.setInternalListener(lsnr); - lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class); + lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessage.class); final String topic = "topic"; From d3e6d4ee8b6d236fd9942a9906685ac60c46e48f Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Thu, 26 Mar 2026 12:14:10 +0300 Subject: [PATCH 4/4] WIP --- .../processors/continuous/GridContinuousProcessor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index fe3e72a696245..d2e0d5e25dc42 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -716,8 +716,7 @@ private AbstractContinuousMessage createStartMessage(UUID routineId, reqData.deploymentInfo(dep); } - - return new StartRoutineDiscoveryMessage( + return new StartRoutineDiscoveryMessage( routineId, reqData, hnd.keepBinary());