diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java index 614418b2e369f..ee48f1abdc076 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java @@ -58,11 +58,6 @@ default boolean readFrom(ByteBuffer buf, MessageReader reader) { throw new UnsupportedOperationException(); } - /** - * Gets message type. - * - * @return Message type. - */ /** * Gets message type. * @@ -72,9 +67,8 @@ default short directType() { var clazz = getClass(); Short type = REGISTRATIONS.get(clazz); - if (type == null) { + if (type == null) throw new IgniteException("No registration for class " + clazz.getSimpleName()); - } return type; } @@ -91,8 +85,7 @@ default void registerAsDirectType(short directType) { var clazz = getClass(); var type = REGISTRATIONS.putIfAbsent(clazz, directType); - if ((type != null) && (type != directType)) { + if ((type != null) && (type != directType)) throw new IgniteException(clazz.getSimpleName() + " is already registered for direct type " + type); - } } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java index 84cc53e433560..e8cb7db65409c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java @@ -31,5 +31,4 @@ public class HandshakeWaitMessage implements Message { @Override public String toString() { return S.toString(HandshakeWaitMessage.class, this); } - } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java index c7d11d53ae7ac..c3cbdae64e9c7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java @@ -23,18 +23,17 @@ import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.MessagesPluginProvider; import org.apache.ignite.spi.discovery.tcp.BlockTcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.DummyCustomDiscoveryMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.LogListener; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; import org.junit.Test; import static org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS; @@ -58,6 +57,8 @@ public class OutboundIoMessageQueueSizeTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(new BlockTcpDiscoverySpi().setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder())); cfg.setGridLogger(log); + cfg.setPluginProviders(new MessagesPluginProvider(DummyCustomDiscoveryMessage.class)); + return cfg; } @@ -124,7 +125,7 @@ public void testDiscoveryMsgQueue() throws Exception { metric.reset(); // Reset value accumulated before discovery SPI startup. - srv0.context().discovery().sendCustomEvent(new DummyCustomDiscoveryMessage(IgniteUuid.randomUuid())); + srv0.context().discovery().sendCustomEvent(new DummyCustomDiscoveryMessage()); // Assume our message can be added to queue concurrently with other messages // (for example, with metrics update message). @@ -142,7 +143,7 @@ public void testDiscoveryMsgQueue() throws Exception { try { for (int i = 0; i <= MSG_LIMIT; i++) - srv0.context().discovery().sendCustomEvent(new DummyCustomDiscoveryMessage(IgniteUuid.randomUuid())); + srv0.context().discovery().sendCustomEvent(new DummyCustomDiscoveryMessage()); assertTrue(metric.value() >= MSG_LIMIT); } @@ -150,27 +151,4 @@ public void testDiscoveryMsgQueue() throws Exception { latch.countDown(); } } - - /** */ - private static class DummyCustomDiscoveryMessage implements DiscoveryCustomMessage { - /** */ - private final IgniteUuid id; - - /** - * @param id Message id. - */ - DummyCustomDiscoveryMessage(IgniteUuid id) { - this.id = id; - } - - /** {@inheritDoc} */ - @Override public IgniteUuid id() { - return id; - } - - /** {@inheritDoc} */ - @Nullable @Override public DiscoveryCustomMessage ackMessage() { - return null; - } - } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java index 8b8e91e3fbaa7..19b0a4a3f750c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java @@ -99,6 +99,7 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -911,7 +912,7 @@ protected static BlockingCustomMessageDiscoverySpi discoSpi(IgniteEx ignite) { } /** */ - protected static class BlockingCustomMessageDiscoverySpi extends TcpDiscoverySpi { + protected static class BlockingCustomMessageDiscoverySpi extends TestTcpDiscoverySpi { /** List of messages which have been blocked. */ private final List blocked = new CopyOnWriteArrayList<>(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java index cd54f29839d47..7b679de89ad4f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java @@ -36,8 +36,7 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree; -import org.apache.ignite.internal.util.distributed.MessagesPluginProvider; -import org.apache.ignite.internal.util.distributed.MessagesPluginProvider.MessagesInjectedTcpDiscoverySpi; +import org.apache.ignite.internal.util.distributed.TestIntegerMessage; import org.apache.ignite.internal.util.distributed.TestUuidMessage; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; @@ -48,6 +47,7 @@ import org.apache.ignite.plugin.PluginContext; import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.MessagesPluginProvider; import org.apache.ignite.testframework.GridTestUtils; import org.jetbrains.annotations.Nullable; import org.junit.Test; @@ -76,8 +76,10 @@ public class IgniteClusterSnapshotHandlerTest extends IgniteClusterSnapshotResto /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { return super.getConfiguration(igniteInstanceName) - .setPluginProviders(pluginProvider, new MessagesPluginProvider()) - .setDiscoverySpi(new MessagesInjectedTcpDiscoverySpi()); + .setPluginProviders( + pluginProvider, + new MessagesPluginProvider(TestIntegerMessage.class, TestUuidMessage.class) + ); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java index 43cb4a6dcae25..a900dc423b204 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java @@ -34,10 +34,9 @@ import org.apache.ignite.failure.StopNodeOrHaltFailureHandler; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.events.DiscoveryCustomEvent; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.MessagesPluginProvider; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -87,6 +86,8 @@ public class NodeSecurityContextPropagationTest extends GridCommonAbstractTest { .setIpFinder(new TcpDiscoveryVmIpFinder() .setAddresses(Collections.singleton("127.0.0.1:47500"))); + cfg.setPluginProviders(new MessagesPluginProvider(TestDiscoveryMessage.class, TestDiscoveryAcknowledgeMessage.class)); + return cfg; } @@ -218,30 +219,6 @@ private Object discoveryRingMessageWorker(IgniteEx ignite) { return U.field(impl, "msgWorker"); } - /** */ - public static class TestDiscoveryMessage extends AbstractTestDiscoveryMessage { - /** {@inheritDoc} */ - @Override public @Nullable DiscoveryCustomMessage ackMessage() { - return new TestDiscoveryAcknowledgeMessage(); - } - } - - /** */ - public static class TestDiscoveryAcknowledgeMessage extends AbstractTestDiscoveryMessage { } - - /** */ - public abstract static class AbstractTestDiscoveryMessage implements DiscoveryCustomMessage { - /** {@inheritDoc} */ - @Override public IgniteUuid id() { - return IgniteUuid.randomUuid(); - } - - /** {@inheritDoc} */ - @Override public @Nullable DiscoveryCustomMessage ackMessage() { - return null; - } - } - /** */ public static class BlockingDequeWrapper implements BlockingDeque { /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/TestDiscoveryAcknowledgeMessage.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/TestDiscoveryAcknowledgeMessage.java new file mode 100644 index 0000000000000..7c8a5f02a042c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/TestDiscoveryAcknowledgeMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.security; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.jetbrains.annotations.Nullable; + +/** */ +public class TestDiscoveryAcknowledgeMessage implements DiscoveryCustomMessage, Message { + /** */ + @Order(0) + IgniteUuid id = IgniteUuid.randomUuid(); + + /** Constructor for {@link MessageFactory}. */ + public TestDiscoveryAcknowledgeMessage() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Override public @Nullable DiscoveryCustomMessage ackMessage() { + return null; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/TestDiscoveryMessage.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/TestDiscoveryMessage.java new file mode 100644 index 0000000000000..b80a6a01e656d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/TestDiscoveryMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.security; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.jetbrains.annotations.Nullable; + +/** */ +public class TestDiscoveryMessage implements DiscoveryCustomMessage, Message { + /** */ + @Order(0) + IgniteUuid id = IgniteUuid.randomUuid(); + + /** Constructor for {@link MessageFactory}. */ + public TestDiscoveryMessage() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Override public @Nullable DiscoveryCustomMessage ackMessage() { + return new TestDiscoveryAcknowledgeMessage(); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java index 5fc16e6534baa..bfbc4128accc0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java @@ -28,10 +28,10 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.util.distributed.MessagesPluginProvider.MessagesInjectedTcpDiscoverySpi; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.MessagesPluginProvider; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -70,9 +70,7 @@ public class DistributedProcessClientAwaitTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(instanceName); cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); - - cfg.setPluginProviders(new MessagesPluginProvider()); - cfg.setDiscoverySpi(new MessagesInjectedTcpDiscoverySpi()); + cfg.setPluginProviders(new MessagesPluginProvider(TestIntegerMessage.class, TestUuidMessage.class)); return cfg; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java index e907d130bc0ac..19d3c1621cbee 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java @@ -28,8 +28,8 @@ import org.apache.ignite.failure.FailureHandler; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.distributed.MessagesPluginProvider.MessagesInjectedTcpDiscoverySpi; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.spi.MessagesPluginProvider; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -86,8 +86,7 @@ public class DistributedProcessCoordinatorLeftTest extends GridCommonAbstractTes } }); - cfg.setPluginProviders(new MessagesPluginProvider()); - cfg.setDiscoverySpi(new MessagesInjectedTcpDiscoverySpi()); + cfg.setPluginProviders(new MessagesPluginProvider(TestIntegerMessage.class, TestUuidMessage.class)); return cfg; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/MessagesPluginProvider.java b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/MessagesPluginProvider.java deleted file mode 100644 index 7e5b53492dc4e..0000000000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/MessagesPluginProvider.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.distributed; - -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; -import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.AbstractTestPluginProvider; -import org.apache.ignite.plugin.ExtensionRegistry; -import org.apache.ignite.plugin.PluginContext; -import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.testframework.GridTestUtils; - -import static org.apache.ignite.marshaller.Marshallers.jdk; -import static org.junit.Assert.assertTrue; - -/** */ -public class MessagesPluginProvider extends AbstractTestPluginProvider { - /** */ - private static final MessageFactoryProvider FACTORY_PROVIDER = f -> { - f.register(10_000, TestIntegerMessage::new, new TestIntegerMessageSerializer()); - f.register(10_001, TestUuidMessage::new, new TestUuidMessageSerializer()); - }; - - /** {@inheritDoc} */ - @Override public String name() { - return "Distributed process test messages plugin"; - } - - /** {@inheritDoc} */ - @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { - registry.registerExtension(MessageFactoryProvider.class, FACTORY_PROVIDER); - - // Register messages into the DiscoverySpi. - assertTrue(ctx.igniteConfiguration().getDiscoverySpi() instanceof MessagesInjectedTcpDiscoverySpi); - } - - /** */ - public static class MessagesInjectedTcpDiscoverySpi extends TcpDiscoverySpi { - /** {@inheritDoc} */ - @Override protected void initLocalNode(int srvPort, boolean addExtAddrAttr) { - GridTestUtils.setFieldValue(this, TcpDiscoverySpi.class, "msgFactory", new IgniteMessageFactoryImpl( - new MessageFactoryProvider[] { new DiscoveryMessageFactory(jdk(), U.gridClassLoader()), FACTORY_PROVIDER})); - - super.initLocalNode(srvPort, addExtAddrAttr); - } - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java index 6f148b3c98ff0..1fd753d6bb703 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java @@ -41,5 +41,4 @@ public TestIntegerMessage(int val) { public int value() { return val; } - } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java b/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java new file mode 100644 index 0000000000000..5134383656e71 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi; + +import java.util.function.Supplier; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; + +/** + * Plugin provider for registering test messages in the communication and discovery protocols. + */ +public class MessagesPluginProvider extends AbstractTestPluginProvider { + /** */ + private final MessageFactoryProvider msgFactoryProvider; + + /** */ + @SafeVarargs + public MessagesPluginProvider(Class... msgs) { + msgFactoryProvider = f -> { + short directType = 10_000; + + for (Class msg : msgs) { + Supplier msgSupp = () -> { + try { + return U.newInstance(msg); + } + catch (IgniteCheckedException e) { + throw new RuntimeException(e); + } + }; + + f.register(directType, msgSupp, loadSerializer(msg)); + + directType++; + } + }; + } + + /** {@inheritDoc} */ + @Override public String name() { + return "Test messages plugin"; + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { + // Register messages into the communication protocol. + registry.registerExtension(MessageFactoryProvider.class, msgFactoryProvider); + } + + /** {@inheritDoc} */ + @Override public void start(PluginContext ctx) throws IgniteCheckedException { + // Register messages into the discovery protocol. + TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ctx.igniteConfiguration().getDiscoverySpi(); + + discoSpi.messageFactory(msgFactoryProvider); + } + + /** */ + private MessageSerializer loadSerializer(Class msgCls) { + try { + Class serCls = U.gridClassLoader() + .loadClass(msgCls.getPackage().getName() + "." + msgCls.getSimpleName() + "Serializer"); + + return (MessageSerializer)U.newInstance(serCls); + } + catch (Exception e) { + throw new RuntimeException("Unable to find serializer for message: " + msgCls, e); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java index a4025d614a152..c222840f15c5e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java @@ -21,13 +21,11 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.managers.discovery.CustomEventListener; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.MessagesPluginProvider; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; import org.junit.Test; /** @@ -118,24 +116,21 @@ private IgniteConfiguration configuration(int nodeIdx) throws Exception { cfg.setDiscoverySpi(testSpi); + cfg.setPluginProviders(new MessagesPluginProvider(MessageForServer.class)); + return cfg; } - /** - * - */ - private class TestDiscoverySpi extends TcpDiscoverySpi { + /** */ + private class TestDiscoverySpi extends TestTcpDiscoverySpi { /** Test exchange. */ private TestDiscoveryDataExchange testEx = new TestDiscoveryDataExchange(); - /** - * - */ + /** */ public TestDiscoverySpi() { exchange = testEx; } - /** {@inheritDoc} */ @Override public void setDataExchange(DiscoverySpiDataExchange exchange) { testEx.setExchange(exchange); @@ -171,25 +166,4 @@ public void setExchange(DiscoverySpiDataExchange ex) { this.ex = ex; } } - - /** - * - */ - private static class MessageForServer implements DiscoveryServerOnlyCustomMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final IgniteUuid id = IgniteUuid.randomUuid(); - - /** {@inheritDoc} */ - @Override public IgniteUuid id() { - return id; - } - - /** {@inheritDoc} */ - @Nullable @Override public DiscoveryCustomMessage ackMessage() { - return null; - } - } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/MessageForServer.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/MessageForServer.java new file mode 100644 index 0000000000000..2495290d16560 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/MessageForServer.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.jetbrains.annotations.Nullable; + +/** */ +public class MessageForServer implements DiscoveryServerOnlyCustomMessage, Message { + /** */ + @Order(0) + IgniteUuid id = IgniteUuid.randomUuid(); + + /** Constructor for {@link MessageFactory}. */ + public MessageForServer() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DummyCustomDiscoveryMessage.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DummyCustomDiscoveryMessage.java new file mode 100644 index 0000000000000..8eeae66fac4e5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DummyCustomDiscoveryMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** */ +public class DummyCustomDiscoveryMessage implements DiscoveryCustomMessage, Message { + /** */ + @Order(0) + IgniteUuid id = IgniteUuid.randomUuid(); + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public DummyCustomDiscoveryMessage() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java index 30f7ad478f47c..ff37efc0b669c 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java @@ -24,13 +24,11 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.MessagesPluginProvider; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; import org.junit.Test; /** @@ -67,11 +65,13 @@ else if (igniteInstanceName.startsWith("listener")) else if (igniteInstanceName.startsWith("receiver")) disco = new DyingThreadDiscoverySpi(); else - disco = new TcpDiscoverySpi(); + disco = new TestTcpDiscoverySpi(); disco.setIpFinder(sharedStaticIpFinder); cfg.setDiscoverySpi(disco); + cfg.setPluginProviders(new MessagesPluginProvider(DummyCustomDiscoveryMessage.class)); + return cfg; } @@ -103,7 +103,7 @@ public void testPendingMessagesOverflow() throws Exception { receivedEnsuredMsgs.clear(); // Initial custom message will travel across the ring and will be discarded. - sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid()); + sendDummyCustomMessage(coordDisco); assertTrue("Sent: " + sentEnsuredMsgs + "; received: " + receivedEnsuredMsgs, GridTestUtils.waitForCondition(() -> { @@ -120,7 +120,7 @@ public void testPendingMessagesOverflow() throws Exception { int msgsNum = 2000; for (int i = 0; i < msgsNum; i++) - sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid()); + sendDummyCustomMessage(coordDisco); mediator.close(); victim.close(); @@ -149,7 +149,7 @@ public void testCustomMessageInSingletonCluster() throws Exception { }); // Custom message on a singleton cluster shouldn't break consistency of PendingMessages. - sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid()); + sendDummyCustomMessage(coordDisco); // Victim doesn't send acknowledges, so we need an intermediate node to accept messages, // so the coordinator could mark them as pending. @@ -172,7 +172,7 @@ public void testCustomMessageInSingletonCluster() throws Exception { int msgsNum = 100; for (int i = 0; i < msgsNum; i++) - sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid()); + sendDummyCustomMessage(coordDisco); mediator.close(); victim.close(); @@ -229,18 +229,15 @@ public void testDeliveryAllFailedMessagesInCorrectOrder() throws Exception { assertTrue("Sent: " + sentEnsuredMsgs + "; received: " + receivedEnsuredMsgs, delivered); } - /** - * @param disco Discovery SPI. - * @param id Message id. - */ - private void sendDummyCustomMessage(TcpDiscoverySpi disco, IgniteUuid id) { - disco.sendCustomEvent(new DummyCustomDiscoveryMessage(id)); + /** @param disco Discovery SPI. */ + private void sendDummyCustomMessage(TcpDiscoverySpi disco) { + disco.sendCustomEvent(new DummyCustomDiscoveryMessage()); } /** * Discovery SPI, that makes a thread to die when {@code blockMsgs} is set to {@code true}. */ - private class DyingThreadDiscoverySpi extends TcpDiscoverySpi { + private class DyingThreadDiscoverySpi extends TestTcpDiscoverySpi { /** {@inheritDoc} */ @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { if (blockMsgs) @@ -251,7 +248,7 @@ private class DyingThreadDiscoverySpi extends TcpDiscoverySpi { /** * Discovery SPI, that makes a node stop sending messages when {@code blockMsgs} is set to {@code true}. */ - private class DyingDiscoverySpi extends TcpDiscoverySpi { + private class DyingDiscoverySpi extends TestTcpDiscoverySpi { /** {@inheritDoc} */ @Override protected void writeToSocket( Socket sock, @@ -285,36 +282,11 @@ private class DyingDiscoverySpi extends TcpDiscoverySpi { /** * */ - private class ListeningDiscoverySpi extends TcpDiscoverySpi { + private class ListeningDiscoverySpi extends TestTcpDiscoverySpi { /** {@inheritDoc} */ @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { if (ensured(msg)) receivedEnsuredMsgs.add(msg); } } - - /** - * - */ - private static class DummyCustomDiscoveryMessage implements DiscoveryCustomMessage { - /** */ - private final IgniteUuid id; - - /** - * @param id Message id. - */ - DummyCustomDiscoveryMessage(IgniteUuid id) { - this.id = id; - } - - /** {@inheritDoc} */ - @Override public IgniteUuid id() { - return id; - } - - /** {@inheritDoc} */ - @Nullable @Override public DiscoveryCustomMessage ackMessage() { - return null; - } - } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java index e3038bf5dc6a7..d7bc21a637ac3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java @@ -20,16 +20,23 @@ import java.io.IOException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.GridTestUtils.DiscoveryHook; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.marshaller.Marshallers.jdk; import static org.apache.ignite.testframework.GridTestUtils.DiscoverySpiListenerWrapper.wrap; /** @@ -45,6 +52,9 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi implements IgniteDiscov /** */ private IgniteDiscoverySpiInternalListener internalLsnr; + /** */ + private MessageFactory msgFactory; + /** {@inheritDoc} */ @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { @@ -100,4 +110,32 @@ public void discoveryHook(DiscoveryHook discoHook) { this.discoHook = discoHook; } + + /** + * Sets test discovery messages factory provider. Note that {@link DiscoveryHook} must be set before SPI start. + * Otherwise, this method call will take no effect. + * + * @param msgFactoryProvider Discovery messages factory provider. + */ + public void messageFactory(MessageFactoryProvider msgFactoryProvider) { + assert !started(); + + this.msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[] { + new DiscoveryMessageFactory(jdk(), U.resolveClassLoader(ignite().configuration())), + msgFactoryProvider + }); + } + + /** {@inheritDoc} */ + @Override protected void initLocalNode(int srvPort, boolean addExtAddrAttr) { + if (msgFactory != null) + GridTestUtils.setFieldValue(this, TcpDiscoverySpi.class, "msgFactory", msgFactory); + + super.initLocalNode(srvPort, addExtAddrAttr); + } + + /** {@inheritDoc} */ + @Override public MessageFactory messageFactory() { + return msgFactory != null ? msgFactory : super.messageFactory(); + } }