From 8d8c25fc57154d7331e0635b1e7b3721b33011f4 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Fri, 27 Mar 2026 18:10:40 +0300 Subject: [PATCH] IGNITE-28050 Use message serializer for start continuous routine messages --- .../internal/GridEventConsumeHandler.java | 7 +- .../internal/GridMessageListenHandler.java | 7 +- .../discovery/DiscoveryMessageFactory.java | 12 + .../IncompleteDeserializationException.java | 53 ---- .../CachePartitionPartialCountersMap.java | 8 +- .../CacheContinuousQueryHandler.java | 34 ++- .../continuous/AbstractContinuousMessage.java | 3 +- .../continuous/GridContinuousHandler.java | 7 +- .../continuous/GridContinuousProcessor.java | 197 +++++---------- .../continuous/StartRequestData.java | 239 +++++++----------- .../continuous/StartRequestDataV2.java | 164 ------------ .../StartRoutineAckDiscoveryMessage.java | 35 ++- .../StartRoutineDiscoveryMessage.java | 86 ++----- .../StartRoutineDiscoveryMessageV2.java | 23 +- .../StopRoutineAckDiscoveryMessage.java | 4 +- .../StopRoutineDiscoveryMessage.java | 4 +- .../TcpDiscoveryCustomEventMessage.java | 17 +- .../resources/META-INF/classnames.properties | 2 - ...ncompleteDeserializationExceptionTest.java | 115 --------- .../testsuites/IgniteBasicTestSuite2.java | 3 - 20 files changed, 257 insertions(+), 763 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationException.java delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java delete mode 100644 modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationExceptionTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 4b1ad612add71..1ac2ce2bc9227 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -49,7 +49,6 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P2; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; @@ -142,13 +141,13 @@ public GridEventConsumeHandler() { } /** {@inheritDoc} */ - @Override public void updateCounters(AffinityTopologyVersion topVer, Map>> cntrsPerNode, - Map> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map> cntrsPerNode, + Map cntrs) { // No-op. } /** {@inheritDoc} */ - @Override public Map> updateCounters() { + @Override public Map updateCounters() { return Collections.emptyMap(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index bcd35ed249e5b..e59873f4b449f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -36,7 +36,6 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPeerDeployAware; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; @@ -117,13 +116,13 @@ public GridMessageListenHandler(@Nullable Object topic, IgniteBiPredicate>> cntrsPerNode, - Map> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map> cntrsPerNode, + Map cntrs) { // No-op. } /** {@inheritDoc} */ - @Override public Map> updateCounters() { + @Override public Map updateCounters() { return Collections.emptyMap(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 4960fb374c83a..7c2a991f18e6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -95,6 +95,14 @@ import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessageSerializer; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessageMarshallableSerializer; +import org.apache.ignite.internal.processors.continuous.StartRequestData; +import org.apache.ignite.internal.processors.continuous.StartRequestDataSerializer; +import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; +import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessageSerializer; +import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage; +import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageSerializer; +import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2; +import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2Serializer; import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage; import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessageSerializer; import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage; @@ -245,6 +253,7 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) { factory.register(-200, TcpDiscoveryCollectionMessage::new, new TcpDiscoveryCollectionMessageMarshallableSerializer(marsh, clsLdr)); + factory.register(-118, StartRequestData::new, new StartRequestDataSerializer()); factory.register(-117, TcpDiscoveryNode::new, new TcpDiscoveryNodeMarshallableSerializer(marsh, clsLdr)); factory.register(-116, IgniteProductVersion::new, new IgniteProductVersionSerializer()); factory.register(-115, SchemaAlterTableAddColumnOperation::new, new SchemaAlterTableAddColumnOperationSerializer()); @@ -367,5 +376,8 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) { new ServiceDeploymentRequestMarshallableSerializer(marsh, clsLdr)); factory.register(538, ServiceUndeploymentRequest::new, new ServiceUndeploymentRequestSerializer()); factory.register(539, ExchangeFailureMessage::new, new ExchangeFailureMessageSerializer()); + factory.register(540, StartRoutineDiscoveryMessage::new, new StartRoutineDiscoveryMessageSerializer()); + factory.register(541, StartRoutineAckDiscoveryMessage::new, new StartRoutineAckDiscoveryMessageSerializer()); + factory.register(542, StartRoutineDiscoveryMessageV2::new, new StartRoutineDiscoveryMessageV2Serializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationException.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationException.java deleted file mode 100644 index 5a440cefa1cca..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationException.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.discovery; - -import org.jetbrains.annotations.NotNull; - -/** - * Exception which can be used to access a message which failed to be deserialized completely using Java serialization. - * Throwed from deserialization methods it can be caught by a caller. - *

- * Should be {@link RuntimeException} because of limitations of Java serialization mechanisms. - *

- * Catching {@link ClassNotFoundException} inside deserialization methods cannot do the same trick because - * Java deserialization remembers such exception internally and will rethrow it anyway upon returing to a user. - */ -public class IncompleteDeserializationException extends RuntimeException { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final DiscoveryCustomMessage m; - - /** - * @param m Message. - */ - public IncompleteDeserializationException(@NotNull DiscoveryCustomMessage m) { - super(null, null, false, false); - - this.m = m; - } - - /** - * @return Message. - */ - @NotNull public DiscoveryCustomMessage message() { - return m; - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java index de0f3ac0d8597..7a723fbb78068 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java @@ -22,7 +22,6 @@ import java.util.Collections; import java.util.Map; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; @@ -203,15 +202,14 @@ public long updateCounterAt(int idx) { * @param cntrsMap Partial local counters map. * @return Partition ID to partition counters map. */ - public static Map> toCountersMap(CachePartitionPartialCountersMap cntrsMap) { + public static Map toCountersMap(CachePartitionPartialCountersMap cntrsMap) { if (cntrsMap.size() == 0) return Collections.emptyMap(); - Map> res = U.newHashMap(cntrsMap.size()); + Map res = U.newHashMap(cntrsMap.size()); for (int idx = 0; idx < cntrsMap.size(); idx++) - res.put(cntrsMap.partitionAt(idx), - new T2<>(cntrsMap.initialUpdateCounterAt(idx), cntrsMap.updateCounterAt(idx))); + res.put(cntrsMap.partitionAt(idx), cntrsMap.updateCounterAt(idx)); return res; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index ee131f4db3ebb..2c5ce3ba634a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -75,7 +75,6 @@ import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -200,10 +199,10 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler private transient int cacheId; /** */ - private transient volatile Map> initUpdCntrs; + private transient volatile Map initUpdCntrs; /** */ - private transient volatile Map>> initUpdCntrsPerNode; + private transient volatile Map> initUpdCntrsPerNode; /** */ private transient volatile AffinityTopologyVersion initTopVer; @@ -224,7 +223,7 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler private transient UUID routineId; /** Local update counters values on listener start. Used for skipping events fired before the listener start. */ - private transient volatile Map> locInitUpdCntrs; + private transient volatile Map locInitUpdCntrs; /** */ private transient GridKernalContext ctx; @@ -361,15 +360,15 @@ public void keepBinary(boolean keepBinary) { } /** {@inheritDoc} */ - @Override public void updateCounters(AffinityTopologyVersion topVer, Map>> cntrsPerNode, - Map> cntrs) { - this.initUpdCntrsPerNode = cntrsPerNode; - this.initUpdCntrs = cntrs; - this.initTopVer = topVer; + @Override public void updateCounters(AffinityTopologyVersion topVer, Map> cntrsPerNode, + Map cntrs) { + initUpdCntrsPerNode = cntrsPerNode; + initUpdCntrs = cntrs; + initTopVer = topVer; } /** {@inheritDoc} */ - @Override public Map> updateCounters() { + @Override public Map updateCounters() { return locInitUpdCntrs; } @@ -1163,9 +1162,9 @@ private String taskName() { CacheContinuousQueryPartitionRecovery rec = rcvs.get(partId); if (rec == null) { - T2 partCntrs = null; + Long partCntr = null; - Map>> initUpdCntrsPerNode = this.initUpdCntrsPerNode; + Map> initUpdCntrsPerNode = this.initUpdCntrsPerNode; if (initUpdCntrsPerNode != null) { GridCacheContext cctx = cacheContext(ctx); @@ -1173,22 +1172,21 @@ private String taskName() { GridCacheAffinityManager aff = cctx.affinity(); for (ClusterNode node : aff.nodesByPartition(partId, topVer)) { - Map> map = initUpdCntrsPerNode.get(node.id()); + Map map = initUpdCntrsPerNode.get(node.id()); if (map != null) { - partCntrs = map.get(partId); + partCntr = map.get(partId); break; } } } else if (initUpdCntrs != null) - partCntrs = initUpdCntrs.get(partId); + partCntr = initUpdCntrs.get(partId); - T2 partCntrs0 = partCntrs; + Long partCntr0 = partCntr; CacheContinuousQueryPartitionRecovery oldRec = rcvs.computeIfAbsent(partId, k -> - new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer, - partCntrs0 != null ? partCntrs0.get2() : null)); + new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer, partCntr0)); if (oldRec != null) rec = oldRec; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java index 64d3f98fb1c82..f3e4dc5228d89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java @@ -21,11 +21,12 @@ import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; /** * */ -public abstract class AbstractContinuousMessage implements DiscoveryCustomMessage { +public abstract class AbstractContinuousMessage implements Message, DiscoveryCustomMessage { /** */ private static final long serialVersionUID = 2781778657738703012L; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index f7482c78ebfa7..b1a3812f61217 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -24,7 +24,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.util.typedef.T2; import org.jetbrains.annotations.Nullable; /** @@ -163,11 +162,11 @@ public default void flushOnNodeLeft() { * @param cntrs Init state for partition counters. * @param topVer Topology version. */ - public void updateCounters(AffinityTopologyVersion topVer, Map>> cntrsPerNode, - Map> cntrs); + public void updateCounters(AffinityTopologyVersion topVer, Map> cntrsPerNode, + Map cntrs); /** * @return Init state for partition counters. */ - public Map> updateCounters(); + public Map updateCounters(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 521b881c82787..1786bdf400317 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -59,7 +59,6 @@ import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; -import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.discovery.CustomEventListener; import org.apache.ignite.internal.managers.discovery.DiscoCache; @@ -83,7 +82,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -976,30 +975,26 @@ private AbstractContinuousMessage createStartMessage(UUID routineId, dep = new GridDeploymentInfoBean(dep0); } - - // Handle peer deployment for other handler-specific objects. - hnd.p2pMarshal(ctx); } - if (discoProtoVer == 1) { - StartRequestData reqData = new StartRequestData( - nodeFilter, - hnd, - bufSize, - interval, - autoUnsubscribe); + StartRequestData reqData = new StartRequestData(nodeFilter, + hnd, + bufSize, + interval, + autoUnsubscribe, + hnd.keepBinary()); - if (clsName != null) { - reqData.className(clsName); - reqData.deploymentInfo(dep); + if (clsName != null) { + reqData.className(clsName); + reqData.deploymentInfo(dep); + } - reqData.p2pMarshal(marsh); - } + reqData.prepareMarshal(ctx); + if (discoProtoVer == 1) { StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage( routineId, - reqData, - reqData.handler().keepBinary()); + reqData); if (hnd.updateCounters() != null) msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters()); @@ -1009,24 +1004,9 @@ private AbstractContinuousMessage createStartMessage(UUID routineId, else { assert discoProtoVer == 2 : discoProtoVer; - byte[] nodeFilterBytes = nodeFilter != null ? U.marshal(marsh, nodeFilter) : null; - byte[] hndBytes = U.marshal(marsh, hnd); - - StartRequestDataV2 reqData = new StartRequestDataV2(nodeFilterBytes, - hndBytes, - bufSize, - interval, - autoUnsubscribe); - - if (clsName != null) { - reqData.className(clsName); - reqData.deploymentInfo(dep); - } - return new StartRoutineDiscoveryMessageV2( routineId, - reqData, - hnd.keepBinary()); + reqData); } } @@ -1362,7 +1342,7 @@ private void processStartAckRequest(AffinityTopologyVersion topVer, if (fut != null) { fut.onAllRemoteRegistered( topVer, - msg.errs(), + msg.errors(), msg.updateCountersPerNode(), msg.updateCounters()); } @@ -1378,51 +1358,31 @@ private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage UUID routineId = req.routineId(); - if (req.deserializationException() != null && checkNodeFilter(req)) { - IgniteCheckedException err = new IgniteCheckedException(req.deserializationException()); - - req.addError(node.id(), err); - - U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', err); - - return; - } - StartRequestData data = req.startRequestData(); - GridContinuousHandler hnd = data.handler(); - - if (req.keepBinary()) { - assert hnd instanceof CacheContinuousQueryHandler; - - ((CacheContinuousQueryHandler)hnd).keepBinary(true); - } - IgniteCheckedException err = null; try { - if (ctx.config().isPeerClassLoadingEnabled()) { - String clsName = data.className(); - - if (clsName != null) { - GridDeploymentInfo depInfo = data.deploymentInfo(); - - GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, - depInfo.userVersion(), node.id(), depInfo.classLoaderId(), depInfo.participants(), null); + data.finishUnmarshal(ctx, node.id()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal start request data [nodeId=" + node.id() + + ", routineId=" + routineId + ']', e); - if (dep == null) - throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); + // Tolerate missing classes exceptions (e.g. remote filter class). + // We need this means because CQ registration process assumes that an "ack message" will be sent. + if (X.hasCause(e, ClassNotFoundException.class)) { + if (checkNodeFilter(req)) + req.addError(node.id(), e); - data.p2pUnmarshal(marsh, U.resolveClassLoader(dep.classLoader(), ctx.config())); - } + return; } - } - catch (IgniteCheckedException e) { - err = e; - U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); + err = e; } + GridContinuousHandler hnd = data.handler(); + if (node.isClient()) { Map clientRoutineMap = clientInfos.get(node.id()); @@ -1435,7 +1395,7 @@ private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage } clientRoutineMap.put(routineId, new LocalRoutineInfo(node.id(), - data.projectionPredicate(), + data.nodeFilter(), hnd, data.bufferSize(), data.interval(), @@ -1444,7 +1404,7 @@ private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage if (err == null) { try { - IgnitePredicate prjPred = data.projectionPredicate(); + IgnitePredicate prjPred = data.nodeFilter(); if (prjPred != null) ctx.resource().injectGeneric(prjPred); @@ -1489,10 +1449,10 @@ private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage /** */ private boolean checkNodeFilter(StartRoutineDiscoveryMessage req) { StartRequestData reqData = req.startRequestData(); - IgnitePredicate prjPred; + IgnitePredicate nodeFilter; - return reqData == null || (prjPred = reqData.projectionPredicate()) == null - || prjPred.apply(ctx.discovery().localNode()); + return reqData == null || (nodeFilter = reqData.nodeFilter()) == null + || nodeFilter.apply(ctx.discovery().localNode()); } /** @@ -1514,12 +1474,12 @@ private void processRoutineStartResultMessage(UUID sndId, ContinuousRoutineStart private void processStartRequestV2(final AffinityTopologyVersion topVer, final ClusterNode snd, final StartRoutineDiscoveryMessageV2 msg) { - StartRequestDataV2 reqData = msg.startRequestData(); + StartRequestData reqData = msg.startRequestData(); ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(), msg.routineId(), - reqData.handlerBytes(), - reqData.nodeFilterBytes(), + reqData.hndBytes, + reqData.nodeFilterBytes, reqData.bufferSize(), reqData.interval(), reqData.autoUnsubscribe()); @@ -1540,73 +1500,42 @@ private void processStartRequestV2(final AffinityTopologyVersion topVer, return; } - StartRequestDataV2 reqData = msg.startRequestData(); + StartRequestData reqData = msg.startRequestData(); Exception err = null; - IgnitePredicate nodeFilter = null; - - CachePartitionPartialCountersMap cntrsMap = null; + try { + reqData.finishUnmarshal(ctx, snd.id()); + } + catch (IgniteCheckedException e) { + err = e; - if (reqData.nodeFilterBytes() != null) { - try { - if (ctx.config().isPeerClassLoadingEnabled() && reqData.className() != null) { - String clsName = reqData.className(); - GridDeploymentInfo depInfo = reqData.deploymentInfo(); - - GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), - clsName, - clsName, - depInfo.userVersion(), - snd.id(), - depInfo.classLoaderId(), - depInfo.participants(), - null); - - if (dep == null) { - throw new IgniteDeploymentCheckedException("Failed to obtain deployment " + - "for class: " + clsName); - } + U.error(log, "Failed to unmarshal continuous request data [" + + "routineId=" + msg.routineId + + ", srcNodeId=" + snd.id() + ']', e); + } - nodeFilter = U.unmarshal(marsh, - reqData.nodeFilterBytes(), - U.resolveClassLoader(dep.classLoader(), ctx.config())); - } - else { - nodeFilter = U.unmarshal(marsh, - reqData.nodeFilterBytes(), - U.resolveClassLoader(ctx.config())); - } + IgnitePredicate nodeFilter = reqData.nodeFilter(); - if (nodeFilter != null) - ctx.resource().injectGeneric(nodeFilter); + if (nodeFilter != null) { + try { + ctx.resource().injectGeneric(nodeFilter); } - catch (Exception e) { - err = e; - - U.error(log, "Failed to unmarshal continuous routine filter [" + + catch (IgniteCheckedException e) { + U.error(log, "Failed to inject generic into continuous routine filter [" + "routineId=" + msg.routineId + ", srcNodeId=" + snd.id() + ']', e); } } + CachePartitionPartialCountersMap cntrsMap = null; + boolean register = err == null && (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())); if (register) { try { - GridContinuousHandler hnd = U.unmarshal(marsh, - reqData.handlerBytes(), - U.resolveClassLoader(ctx.config())); - - if (ctx.config().isPeerClassLoadingEnabled()) - hnd.p2pUnmarshal(snd.id(), ctx); - - if (msg.keepBinary()) { - assert hnd instanceof CacheContinuousQueryHandler : hnd; - - ((CacheContinuousQueryHandler)hnd).keepBinary(true); - } + GridContinuousHandler hnd = reqData.handler(); registerHandler(snd.id(), msg.routineId, @@ -2544,7 +2473,7 @@ private class StartFuture extends GridFutureAdapter { resCollect = new DiscoveryMessageResultsCollector(ctx) { @Override protected RoutineRegisterResults createResult(Map> rcvd) { Map errs = null; - Map>> cntrsPerNode = null; + Map> cntrsPerNode = null; for (Map.Entry> entry : rcvd.entrySet()) { ContinuousRoutineStartResultMessage msg = entry.getValue().message(); @@ -2571,7 +2500,7 @@ private class StartFuture extends GridFutureAdapter { if (cntrsPerNode == null) cntrsPerNode = new HashMap<>(); - cntrsPerNode.put(entry.getKey(), CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); + cntrsPerNode.put(entry.getKey(), toCountersMap(cntrsMap)); } } } @@ -2598,8 +2527,8 @@ private class StartFuture extends GridFutureAdapter { private void onAllRemoteRegistered( AffinityTopologyVersion topVer, @Nullable Map errs, - Map>> cntrsPerNode, - Map> cntrs) { + Map> cntrsPerNode, + Map cntrs) { try { if (errs == null || errs.isEmpty()) { LocalRoutineInfo routine = locInfos.get(routineId); @@ -2692,7 +2621,7 @@ private static class RoutineRegisterResults { private final Map errs; /** */ - private final Map>> cntrsPerNode; + private final Map> cntrsPerNode; /** * @param topVer Topology version. @@ -2701,7 +2630,7 @@ private static class RoutineRegisterResults { */ RoutineRegisterResults(AffinityTopologyVersion topVer, Map errs, - Map>> cntrsPerNode) { + Map> cntrsPerNode) { this.topVer = topVer; this.errs = errs; this.cntrsPerNode = cntrsPerNode; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java index a3049366dfcc5..99c39201500af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,134 +17,96 @@ package org.apache.ignite.internal.processors.continuous; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; +import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteDeploymentCheckedException; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.deployment.GridDeployment; +import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.marshaller.Marshaller; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Start request data. */ -class StartRequestData implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Projection predicate. */ - private IgnitePredicate prjPred; +public class StartRequestData implements Message { + /** Node filter. */ + private IgnitePredicate nodeFilter; - /** Serialized projection predicate. */ - private byte[] prjPredBytes; + /** Serialized node filter. */ + @Order(0) + byte[] nodeFilterBytes; /** Deployment class name. */ - private String clsName; + @Order(1) + String clsName; /** Deployment info. */ - private GridDeploymentInfo depInfo; + @Order(2) + GridDeploymentInfoBean depInfo; /** Handler. */ private GridContinuousHandler hnd; + /** Serialized handler. */ + @Order(3) + byte[] hndBytes; + /** Buffer size. */ - private int bufSize; + @Order(4) + int bufSize; /** Time interval. */ - private long interval; + @Order(5) + long interval; /** Automatic unsubscribe flag. */ - private boolean autoUnsubscribe; + @Order(6) + boolean autoUnsubscribe; - /** - * Required by {@link java.io.Externalizable}. - */ - public StartRequestData() { - // No-op. - } + /** Keep binary flag. */ + @Order(7) + boolean keepBinary; + + /** */ + public StartRequestData() {} /** - * @param prjPred Serialized projection predicate. + * @param nodeFilter Node filter. * @param hnd Handler. * @param bufSize Buffer size. * @param interval Time interval. * @param autoUnsubscribe Automatic unsubscribe flag. */ - StartRequestData(@Nullable IgnitePredicate prjPred, GridContinuousHandler hnd, - int bufSize, long interval, boolean autoUnsubscribe) { + public StartRequestData( + IgnitePredicate nodeFilter, + GridContinuousHandler hnd, + int bufSize, + long interval, + boolean autoUnsubscribe, + boolean keepBinary) { assert hnd != null; assert bufSize > 0; assert interval >= 0; - this.prjPred = prjPred; + this.nodeFilter = nodeFilter; this.hnd = hnd; this.bufSize = bufSize; this.interval = interval; this.autoUnsubscribe = autoUnsubscribe; + this.keepBinary = keepBinary; } /** - * @param marsh Marshaller. - * @throws org.apache.ignite.IgniteCheckedException In case of error. + * @return Node filter. */ - void p2pMarshal(Marshaller marsh) throws IgniteCheckedException { - assert marsh != null; - - prjPredBytes = U.marshal(marsh, prjPred); - } - - /** - * @param marsh Marshaller. - * @param ldr Class loader. - * @throws org.apache.ignite.IgniteCheckedException In case of error. - */ - void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { - assert marsh != null; - - assert prjPred == null; - assert prjPredBytes != null; - - prjPred = U.unmarshal(marsh, prjPredBytes, ldr); - } - - /** - * @return Projection predicate. - */ - public IgnitePredicate projectionPredicate() { - return prjPred; - } - - /** - * @param prjPred New projection predicate. - */ - public void projectionPredicate(IgnitePredicate prjPred) { - this.prjPred = prjPred; - } - - /** - * @return Serialized projection predicate. - */ - public byte[] projectionPredicateBytes() { - return prjPredBytes; - } - - /** - * @param prjPredBytes New serialized projection predicate. - */ - public void projectionPredicateBytes(byte[] prjPredBytes) { - this.prjPredBytes = prjPredBytes; - } - - /** - * @return Deployment class name. - */ - public String className() { - return clsName; + public IgnitePredicate nodeFilter() { + return nodeFilter; } /** @@ -154,17 +116,10 @@ public void className(String clsName) { this.clsName = clsName; } - /** - * @return Deployment info. - */ - public GridDeploymentInfo deploymentInfo() { - return depInfo; - } - /** * @param depInfo New deployment info. */ - public void deploymentInfo(GridDeploymentInfo depInfo) { + public void deploymentInfo(GridDeploymentInfoBean depInfo) { this.depInfo = depInfo; } @@ -175,13 +130,6 @@ public GridContinuousHandler handler() { return hnd; } - /** - * @param hnd New handler. - */ - public void handler(GridContinuousHandler hnd) { - this.hnd = hnd; - } - /** * @return Buffer size. */ @@ -189,13 +137,6 @@ public int bufferSize() { return bufSize; } - /** - * @param bufSize New buffer size. - */ - public void bufferSize(int bufSize) { - this.bufSize = bufSize; - } - /** * @return Time interval. */ @@ -225,45 +166,61 @@ public void autoUnsubscribe(boolean autoUnsubscribe) { } /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - boolean b = prjPredBytes != null; - - out.writeBoolean(b); + @Override public String toString() { + return S.toString(StartRequestData.class, this); + } - if (b) { - U.writeByteArray(out, prjPredBytes); - U.writeString(out, clsName); - out.writeObject(depInfo); + /** */ + public void prepareMarshal(GridKernalContext ctx) throws IgniteCheckedException { + if (hnd != null) { + if (ctx.config().isPeerClassLoadingEnabled()) { + // Handle peer deployment for other handler-specific objects. + hnd.p2pMarshal(ctx); + } + + hndBytes = U.marshal(ctx.marshaller(), hnd); } - else - out.writeObject(prjPred); - out.writeObject(hnd); - out.writeInt(bufSize); - out.writeLong(interval); - out.writeBoolean(autoUnsubscribe); + if (nodeFilter != null) + nodeFilterBytes = U.marshal(ctx.marshaller(), nodeFilter); } - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - boolean b = in.readBoolean(); - - if (b) { - prjPredBytes = U.readByteArray(in); - clsName = U.readString(in); - depInfo = (GridDeploymentInfo)in.readObject(); + /** */ + public void finishUnmarshal(GridKernalContext ctx, UUID sndId) throws IgniteCheckedException { + if (ctx.config().isPeerClassLoadingEnabled() && clsName != null) { + GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), + clsName, + clsName, + depInfo.userVersion(), + sndId, + depInfo.classLoaderId(), + depInfo.participants(), + null); + + if (dep == null) + throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); + + nodeFilter = U.unmarshal(ctx.marshaller(), + nodeFilterBytes, + U.resolveClassLoader(dep.classLoader(), ctx.config())); + } + else { + nodeFilter = U.unmarshal(ctx.marshaller(), + nodeFilterBytes, + U.resolveClassLoader(ctx.config())); } - else - prjPred = (IgnitePredicate)in.readObject(); - hnd = (GridContinuousHandler)in.readObject(); - bufSize = in.readInt(); - interval = in.readLong(); - autoUnsubscribe = in.readBoolean(); - } + if (hndBytes != null) { + hnd = U.unmarshal(ctx.marshaller(), hndBytes, U.resolveClassLoader(ctx.config())); - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StartRequestData.class, this); + if (ctx.config().isPeerClassLoadingEnabled()) + hnd.p2pUnmarshal(sndId, ctx); + + if (keepBinary) { + assert hnd instanceof CacheContinuousQueryHandler : hnd; + + ((CacheContinuousQueryHandler)hnd).keepBinary(true); + } + } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java deleted file mode 100644 index 50e197143b76d..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.continuous; - -import java.io.Serializable; -import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Start request data. - */ -class StartRequestDataV2 implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Serialized node filter. */ - private byte[] nodeFilterBytes; - - /** Deployment class name. */ - private String clsName; - - /** Deployment info. */ - private GridDeploymentInfo depInfo; - - /** Serialized handler. */ - private byte[] hndBytes; - - /** Buffer size. */ - private int bufSize; - - /** Time interval. */ - private long interval; - - /** Automatic unsubscribe flag. */ - private boolean autoUnsubscribe; - - /** - * @param nodeFilterBytes Serialized node filter. - * @param hndBytes Serialized handler. - * @param bufSize Buffer size. - * @param interval Time interval. - * @param autoUnsubscribe Automatic unsubscribe flag. - */ - StartRequestDataV2( - byte[] nodeFilterBytes, - byte[] hndBytes, - int bufSize, - long interval, - boolean autoUnsubscribe) { - assert hndBytes != null; - assert bufSize > 0; - assert interval >= 0; - - this.nodeFilterBytes = nodeFilterBytes; - this.hndBytes = hndBytes; - this.bufSize = bufSize; - this.interval = interval; - this.autoUnsubscribe = autoUnsubscribe; - } - - /** - * @return Serialized node filter. - */ - public byte[] nodeFilterBytes() { - return nodeFilterBytes; - } - - /** - * @return Deployment class name. - */ - public String className() { - return clsName; - } - - /** - * @param clsName New deployment class name. - */ - public void className(String clsName) { - this.clsName = clsName; - } - - /** - * @return Deployment info. - */ - public GridDeploymentInfo deploymentInfo() { - return depInfo; - } - - /** - * @param depInfo New deployment info. - */ - public void deploymentInfo(GridDeploymentInfo depInfo) { - this.depInfo = depInfo; - } - - /** - * @return Handler. - */ - public byte[] handlerBytes() { - return hndBytes; - } - - /** - * @return Buffer size. - */ - public int bufferSize() { - return bufSize; - } - - /** - * @param bufSize New buffer size. - */ - public void bufferSize(int bufSize) { - this.bufSize = bufSize; - } - - /** - * @return Time interval. - */ - public long interval() { - return interval; - } - - /** - * @param interval New time interval. - */ - public void interval(long interval) { - this.interval = interval; - } - - /** - * @return Automatic unsubscribe flag. - */ - public boolean autoUnsubscribe() { - return autoUnsubscribe; - } - - /** - * @param autoUnsubscribe New automatic unsubscribe flag. - */ - public void autoUnsubscribe(boolean autoUnsubscribe) { - this.autoUnsubscribe = autoUnsubscribe; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StartRequestDataV2.class, this); - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java index 4063e05c61820..ab19f9bc61538 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java @@ -20,10 +20,11 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; @@ -35,15 +36,18 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { private static final long serialVersionUID = 0L; /** */ - private final Map errs; + @Order(0) + Map errs; /** */ @GridToStringExclude - private final Map> updateCntrs; + @Order(1) + Map updateCntrs; /** */ @GridToStringExclude - private final Map>> updateCntrsPerNode; + @Order(2) + Map> updateCntrsPerNode; /** * @param routineId Routine id. @@ -52,16 +56,19 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { * @param cntrsPerNode Partition counters per node. */ public StartRoutineAckDiscoveryMessage(UUID routineId, - Map errs, - Map> cntrs, - Map>> cntrsPerNode) { + Map errs, + Map cntrs, + Map> cntrsPerNode) { super(routineId); this.errs = new HashMap<>(errs); - this.updateCntrs = cntrs; - this.updateCntrsPerNode = cntrsPerNode; + updateCntrs = cntrs; + updateCntrsPerNode = cntrsPerNode; } + /** */ + public StartRoutineAckDiscoveryMessage() {} + /** {@inheritDoc} */ @Nullable @Override public DiscoveryCustomMessage ackMessage() { return null; @@ -70,22 +77,22 @@ public StartRoutineAckDiscoveryMessage(UUID routineId, /** * @return Update counters for partitions. */ - public Map> updateCounters() { + public Map updateCounters() { return updateCntrs; } /** * @return Update counters for partitions per each node. */ - public Map>> updateCountersPerNode() { + public Map> updateCountersPerNode() { return updateCntrsPerNode; } /** * @return Errs. */ - public Map errs() { - return errs; + public Map errors() { + return F.viewReadOnly(errs, m -> ErrorMessage.error(m)); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index 4bca717bd4dd3..cf2f639aa8dd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -17,18 +17,14 @@ package org.apache.ignite.internal.processors.continuous; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; /** * Discovery message used for Continuous Query registration. @@ -38,35 +34,34 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { private static final long serialVersionUID = 0L; /** */ - private final StartRequestData startReqData; + @Order(0) + StartRequestData startReqData; /** */ - // Initilized here as well to preserve compatibility with previous versions - private Map errs = new HashMap<>(); + @Order(1) + Map errs = new HashMap<>(); /** */ - private Map> updateCntrs; + @Order(2) + Map updateCntrs; /** */ - private Map>> updateCntrsPerNode; - - /** Keep binary flag. */ - private boolean keepBinary; - - /** */ - private transient ClassNotFoundException deserEx; + @Order(3) + Map> updateCntrsPerNode; /** * @param routineId Routine id. * @param startReqData Start request data. */ - public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData, boolean keepBinary) { + public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) { super(routineId); this.startReqData = startReqData; - this.keepBinary = keepBinary; } + /** */ + public StartRoutineDiscoveryMessage() {} + /** * @return Start request data. */ @@ -82,21 +77,21 @@ public void addError(UUID nodeId, IgniteCheckedException e) { if (errs == null) errs = new HashMap<>(); - errs.put(nodeId, e); + errs.put(nodeId, new ErrorMessage(e)); } /** * @param cntrs Update counters. */ - private void addUpdateCounters(Map> cntrs) { + private void addUpdateCounters(Map cntrs) { if (updateCntrs == null) updateCntrs = new HashMap<>(); - for (Map.Entry> e : cntrs.entrySet()) { - T2 cntr0 = updateCntrs.get(e.getKey()); - T2 cntr1 = e.getValue(); + for (Map.Entry e : cntrs.entrySet()) { + Long cntr0 = updateCntrs.get(e.getKey()); + Long cntr1 = e.getValue(); - if (cntr0 == null || cntr1.get2() > cntr0.get2()) + if (cntr0 == null || cntr1 > cntr0) updateCntrs.put(e.getKey(), cntr1); } } @@ -105,31 +100,17 @@ private void addUpdateCounters(Map> cntrs) { * @param nodeId Local node ID. * @param cntrs Update counters. */ - public void addUpdateCounters(UUID nodeId, Map> cntrs) { + public void addUpdateCounters(UUID nodeId, Map cntrs) { addUpdateCounters(cntrs); if (updateCntrsPerNode == null) updateCntrsPerNode = new HashMap<>(); - Map> old = updateCntrsPerNode.put(nodeId, cntrs); + Map old = updateCntrsPerNode.put(nodeId, cntrs); assert old == null : old; } - /** - * @return Errs. - */ - public Map errs() { - return errs != null ? errs : Collections.emptyMap(); - } - - /** - * @return {@code True} if keep binary flag was set on continuous handler. - */ - public boolean keepBinary() { - return keepBinary; - } - /** {@inheritDoc} */ @Override public boolean isMutable() { return true; @@ -137,28 +118,7 @@ public boolean keepBinary() { /** {@inheritDoc} */ @Override public DiscoveryCustomMessage ackMessage() { - return new StartRoutineAckDiscoveryMessage(routineId, errs(), updateCntrs, updateCntrsPerNode); - } - - /** */ - private void readObject(ObjectInputStream in) throws IOException { - // Override default serialization in order to tolerate missing classes exceptions (e.g. remote filter class). - // We need this means because CQ registration process assumes that an "ack message" will be sent. - try { - in.defaultReadObject(); - } - catch (ClassNotFoundException e) { - deserEx = e; - - throw new IncompleteDeserializationException(this); - } - } - - /** - * @return Exception occurred during deserialization. - */ - @Nullable public ClassNotFoundException deserializationException() { - return deserEx; + return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java index 275765da73050..856f1b533eb56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.continuous; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; @@ -29,41 +30,29 @@ public class StartRoutineDiscoveryMessageV2 extends AbstractContinuousMessage { private static final long serialVersionUID = 0L; /** */ - private static final int KEEP_BINARY_FLAG = 0x01; + @Order(0) + StartRequestData startReqData; /** */ - private final StartRequestDataV2 startReqData; - - /** Flags. */ - private int flags; + public StartRoutineDiscoveryMessageV2() {} /** * @param routineId Routine id. * @param startReqData Start request data. - * @param keepBinary Keep binary flag. */ - StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestDataV2 startReqData, boolean keepBinary) { + StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestData startReqData) { super(routineId); this.startReqData = startReqData; - - if (keepBinary) - flags |= KEEP_BINARY_FLAG; } /** * @return Start request data. */ - public StartRequestDataV2 startRequestData() { + public StartRequestData startRequestData() { return startReqData; } - /** - * @return {@code True} if keep binary flag was set on continuous handler. - */ - public boolean keepBinary() { - return (flags & KEEP_BINARY_FLAG) != 0; - } /** {@inheritDoc} */ @Override public DiscoveryCustomMessage ackMessage() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java index 4710f493609a8..26263c4d62996 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java @@ -20,13 +20,12 @@ import java.util.UUID; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** * */ -public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage implements Message { +public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage { /** */ private static final long serialVersionUID = 0L; @@ -51,5 +50,4 @@ public StopRoutineAckDiscoveryMessage(UUID routineId) { @Override public String toString() { return S.toString(StopRoutineAckDiscoveryMessage.class, this, "routineId", routineId()); } - } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java index 39878b81514d8..45589a86a42c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java @@ -20,13 +20,12 @@ import java.util.UUID; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** * */ -public class StopRoutineDiscoveryMessage extends AbstractContinuousMessage implements Message { +public class StopRoutineDiscoveryMessage extends AbstractContinuousMessage { /** */ private static final long serialVersionUID = 0L; @@ -51,5 +50,4 @@ public StopRoutineDiscoveryMessage(UUID routineId) { @Override public String toString() { return S.toString(StopRoutineDiscoveryMessage.class, this, "routineId", routineId()); } - } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index 381a80995c3ef..4d37c8c145365 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -22,7 +22,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; -import org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; @@ -128,20 +127,8 @@ public DiscoverySpiCustomMessage message() { if (serMsg != null) msg = (DiscoverySpiCustomMessage)serMsg; else { - try { - if (msgBytes != null) - msg = U.unmarshal(marsh, msgBytes, ldr); - } - catch (IgniteCheckedException e) { - // Try to resurrect a message in a case of deserialization failure - if (e.getCause() instanceof IncompleteDeserializationException) { - msg = ((IncompleteDeserializationException)e.getCause()).message(); - - return; - } - - throw e; - } + if (msgBytes != null) + msg = U.unmarshal(marsh, msgBytes, ldr); } } diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 5268663d0da9d..955e96644f5bf 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -728,7 +728,6 @@ org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$1 org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$3$1 org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$6 org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$7 -org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper org.apache.ignite.internal.managers.encryption.CacheGroupEncryptionKeys$TrackedWalSegment org.apache.ignite.internal.managers.encryption.CacheGroupPageScanner$1 @@ -1523,7 +1522,6 @@ org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$Discove org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryDataItem org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$LocalRoutineInfo org.apache.ignite.internal.processors.continuous.StartRequestData -org.apache.ignite.internal.processors.continuous.StartRequestDataV2 org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2 diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationExceptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationExceptionTest.java deleted file mode 100644 index e79af4f287aa4..0000000000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationExceptionTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.discovery; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.UUID; -import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; -import org.junit.Test; - -/** */ -public class IncompleteDeserializationExceptionTest extends GridCommonAbstractTest { - /** */ - private Path tmpDir; - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - tmpDir = Files.createTempDirectory(UUID.randomUUID().toString()); - - Files.createDirectories(tmpDir); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - Files.delete(tmpDir); - - super.afterTest(); - } - - /** */ - @Test - public void testMissingClassDeserialization() throws Exception { - try (ObjectInputStream in = new ObjectInputStream(getClass().getResourceAsStream("Wrapper.ser"))) { - in.readObject(); - - fail("Exception is expected"); - } - catch (IncompleteDeserializationException e) { - Wrapper wrp = (Wrapper)e.message(); - - assertNotNull(wrp); - assertEquals(42, wrp.i); - assertNull(wrp.o); - } - } - - /** */ - public static class Wrapper implements DiscoveryCustomMessage { - /** */ - private static final long serialVersionUID = 0; - - /** */ - private final int i; - - /** */ - private final Object o; - - /** */ - private Wrapper(int i, Object o) { - this.i = i; - this.o = o; - } - - /** */ - private void readObject(ObjectInputStream in) throws IOException { - try { - in.defaultReadObject(); - } - catch (ClassNotFoundException e) { - throw new IncompleteDeserializationException(this); - } - } - - /** {@inheritDoc} */ - @Override public IgniteUuid id() { - return null; - } - - /** {@inheritDoc} */ - @Override public @Nullable DiscoveryCustomMessage ackMessage() { - return null; - } - } - - // Commented lines were used to prepare serialized object -// public static void main(String[] args) throws IOException { -// try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream("Wrapper.ser"))) { -// out.writeObject(new Wrapper(42, new ForeignClass())); -// } -// } -// -// public static class ForeignClass implements Serializable { -// } -} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java index eac3c20f8e577..cdff17bb2f953 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java @@ -37,7 +37,6 @@ import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesTest; import org.apache.ignite.internal.managers.IgniteDiagnosticPartitionReleaseFutureLimitTest; import org.apache.ignite.internal.managers.communication.GridIoManagerFileTransmissionSelfTest; -import org.apache.ignite.internal.managers.discovery.IncompleteDeserializationExceptionTest; import org.apache.ignite.internal.metric.MetricConfigurationTest; import org.apache.ignite.internal.metric.MetricsClusterActivationTest; import org.apache.ignite.internal.metric.PeriodicHistogramMetricImplTest; @@ -205,8 +204,6 @@ ClassPathContentLoggingTest.class, - IncompleteDeserializationExceptionTest.class, - GridIoManagerFileTransmissionSelfTest.class, IgniteStandardMXBeanTest.class,