From 0565eb2de511556fcba7e51758c7ec0c89672ff0 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Mon, 6 Apr 2026 17:13:13 +0500 Subject: [PATCH] IGNITE-28259 Remove IgniteDhtPartitionHistorySuppliersMap --- .../communication/GridIoMessageFactory.java | 4 - .../GridCachePartitionExchangeManager.java | 6 +- .../GridDhtPartitionsExchangeFuture.java | 48 +++++-- .../GridDhtPartitionsFullMessage.java | 9 +- ...IgniteDhtPartitionHistorySuppliersMap.java | 119 ------------------ .../resources/META-INF/classnames.properties | 1 - .../communication/CompressedMessageTest.java | 9 +- 7 files changed, 48 insertions(+), 148 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 6eff060a04618..221eee331e473 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -189,8 +189,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPairSerializer; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMapSerializer; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMapSerializer; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessageSerializer; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; @@ -547,8 +545,6 @@ public GridIoMessageFactory(Marshaller marsh, ClassLoader clsLdr) { factory.register(506, CachePartitionFullCountersMap::new, new CachePartitionFullCountersMapSerializer()); factory.register(508, GroupPartitionIdPair::new, new GroupPartitionIdPairSerializer()); - factory.register(510, IgniteDhtPartitionHistorySuppliersMap::new, - new IgniteDhtPartitionHistorySuppliersMapSerializer()); factory.register(517, GridPartitionStateMap::new, new GridPartitionStateMapSerializer()); factory.register(518, GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer()); factory.register(519, GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index c06eeaad3935d..b805b01c5fb8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -87,7 +87,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask; @@ -1348,7 +1348,7 @@ private void sendAllPartitions( public GridDhtPartitionsFullMessage createPartitionsFullMessage( @Nullable final GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer, - @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, + @Nullable Map> partHistSuppliers, @Nullable Map>> partsToReload ) { Collection grps = cctx.cache().cacheGroups(); @@ -1369,7 +1369,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( public GridDhtPartitionsFullMessage createPartitionsFullMessage( @Nullable final GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer, - @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, + @Nullable Map> partHistSuppliers, @Nullable Map>> partsToReload, Collection grps ) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index c269af4923eb7..7243fc9fe4871 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -324,7 +324,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ @GridToStringExclude - private final IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); + private Map> partHistSuppliers = new HashMap<>(); + + /** */ + @GridToStringExclude + private final Object suppliersMux = new Object(); /** Set of nodes that cannot be used for wal rebalancing due to some reason. */ private final Set exclusionsFromHistoricalRebalance = ConcurrentHashMap.newKeySet(); @@ -587,11 +591,23 @@ public void affinityChangeMessage(CacheAffinityChangeMessage affChangeMsg) { * @return List of IDs of history supplier nodes or empty list if these doesn't exist. */ public List partitionHistorySupplier(int grpId, int partId, long cntrSince) { - List histSuppliers = partHistSuppliers.getSupplier(grpId, partId, cntrSince); + synchronized (suppliersMux) { + if (partHistSuppliers == null) + return Collections.emptyList(); + + List histSuppliers = new ArrayList<>(); + + for (Map.Entry> e : partHistSuppliers.entrySet()) { + Long historyCounter = e.getValue().get(new GroupPartitionIdPair(grpId, partId)); + + if (historyCounter != null && historyCounter <= cntrSince) + histSuppliers.add(e.getKey()); + } - histSuppliers.removeIf(exclusionsFromHistoricalRebalance::contains); + histSuppliers.removeIf(exclusionsFromHistoricalRebalance::contains); - return histSuppliers; + return histSuppliers; + } } /** @@ -2436,7 +2452,11 @@ private String exchangeTimingsLogMessage(String header, List timings) { // Create and destroy caches and cache proxies. cctx.cache().onExchangeDone(this, err); - Map locReserved = partHistSuppliers.getReservations(cctx.localNodeId()); + Map locReserved; + + synchronized (suppliersMux) { + locReserved = partHistSuppliers == null ? null : partHistSuppliers.get(cctx.localNodeId()); + } if (locReserved != null) { boolean success = cctx.database().reserveHistoryForPreloading(locReserved); @@ -3544,7 +3564,14 @@ private void findCounterForReservation( break; if (preferWalRebalance || maxOwnerCntr - ceilingMinReserved < ownerSize) { - partHistSuppliers.put(ownerId, grpId, p, ceilingMinReserved); + synchronized (suppliersMux) { + if (partHistSuppliers == null) + partHistSuppliers = new HashMap<>(); + + Map nodeMap = partHistSuppliers.computeIfAbsent(ownerId, k -> new HashMap<>()); + + nodeMap.put(new GroupPartitionIdPair(grpId, p), ceilingMinReserved); + } haveHistory.add(p); @@ -3653,7 +3680,7 @@ private void onAllReceived(@Nullable Collection sndResNodes) { assert crd.isLocal(); - assert partHistSuppliers.isEmpty() : partHistSuppliers; + assert F.isEmpty(partHistSuppliers) : partHistSuppliers; if (!exchCtx.mergeExchanges() && !crd.equals(events().discoveryCache().serverNodes().get(0))) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { @@ -4633,10 +4660,11 @@ else if (forceAffReassignment) private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPartitionsFullMessage msg) { cctx.versions().onExchange(msg.lastVersion().order()); - assert partHistSuppliers.isEmpty(); + assert F.isEmpty(partHistSuppliers); - partHistSuppliers.putAll(msg.partitionHistorySuppliers() != null ? msg.partitionHistorySuppliers() : - IgniteDhtPartitionHistorySuppliersMap.empty()); + synchronized (suppliersMux) { + partHistSuppliers = msg.partitionHistorySuppliers() == null ? null : msg.partitionHistorySuppliers(); + } // Reserve at least 2 threads for system operations. int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 84716df72f225..535ed0ac240f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -76,7 +76,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @Order(3) @Compress @GridToStringInclude - IgniteDhtPartitionHistorySuppliersMap partHistSuppliers; + Map> partHistSuppliers; /** Partitions that must be cleared and re-loaded. */ @Order(4) @@ -143,7 +143,7 @@ public GridDhtPartitionsFullMessage() { public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer, @NotNull AffinityTopologyVersion topVer, - @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, + @Nullable Map> partHistSuppliers, @Nullable Map>> partsToReload) { super(id, lastVer); @@ -326,7 +326,7 @@ public CachePartitionFullCountersMap partitionUpdateCounters(int grpId) { /** * @return Partitions history suppliers. */ - public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() { + public Map> partitionHistorySuppliers() { return partHistSuppliers; } @@ -444,9 +444,6 @@ public void topologyVersion(AffinityTopologyVersion topVer) { if (parts == null) parts = new HashMap<>(); - if (partHistSuppliers == null) - partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); - errs = errMsgs == null ? null : F.viewReadOnly(errMsgs, e -> e.error()); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java deleted file mode 100644 index 77e7dcb1f061e..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.jetbrains.annotations.Nullable; - -/** - * - */ -public class IgniteDhtPartitionHistorySuppliersMap implements Message { - /** */ - private static final IgniteDhtPartitionHistorySuppliersMap EMPTY = new IgniteDhtPartitionHistorySuppliersMap(); - - /** */ - @Order(0) - Map> map; - - /** - * @return Empty map. - */ - public static IgniteDhtPartitionHistorySuppliersMap empty() { - return EMPTY; - } - - /** - * @param grpId Group ID. - * @param partId Partition ID. - * @param cntrSince Partition update counter since history supplying is requested. - * @return List of supplier UUIDs or empty list if haven't these. - */ - public synchronized List getSupplier(int grpId, int partId, long cntrSince) { - if (map == null) - return Collections.emptyList(); - - List suppliers = new ArrayList<>(); - - for (Map.Entry> e : map.entrySet()) { - UUID supplierNode = e.getKey(); - - Long historyCounter = e.getValue().get(new GroupPartitionIdPair(grpId, partId)); - - if (historyCounter != null && historyCounter <= cntrSince) - suppliers.add(supplierNode); - } - - return suppliers; - } - - /** - * @param nodeId Node ID to check. - * @return Reservations for the given node. - */ - @Nullable public synchronized Map getReservations(UUID nodeId) { - if (map == null) - return null; - - return map.get(nodeId); - } - - /** - * @param nodeId Node ID. - * @param grpId Cache group ID. - * @param partId Partition ID. - * @param cntr Partition counter. - */ - public synchronized void put(UUID nodeId, int grpId, int partId, long cntr) { - if (map == null) - map = new HashMap<>(); - - Map nodeMap = map.computeIfAbsent(nodeId, k -> new HashMap<>()); - - nodeMap.put(new GroupPartitionIdPair(grpId, partId), cntr); - } - - /** - * @return {@code True} if empty. - */ - public synchronized boolean isEmpty() { - return map == null || map.isEmpty(); - } - - /** - * @param that Other map to put. - */ - public synchronized void putAll(IgniteDhtPartitionHistorySuppliersMap that) { - map = that.map; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteDhtPartitionHistorySuppliersMap.class, this); - } - -} diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 50848832fe66d..8f0610e05c63e 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1192,7 +1192,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPre org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader$2 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap -org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java index 8b76d8e2f43d5..112311c240c42 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java @@ -29,7 +29,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; @@ -107,13 +106,13 @@ public void testWriteReadHugeMessage() { /** */ private GridDhtPartitionsFullMessage fullMessage() { - IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); + Map> partHistSuppliers = new HashMap<>(); Map>> partsToReload = new HashMap<>(); for (int i = 0; i < 500; i++) { UUID uuid = UUID.randomUUID(); - partHistSuppliers.put(uuid, i, i + 1, i + 2); + partHistSuppliers.put(uuid, Map.of(new GroupPartitionIdPair(i, i + 1), i + 2L)); partsToReload.put(uuid, Map.of(i, Set.of(i + 1))); } @@ -122,8 +121,8 @@ private GridDhtPartitionsFullMessage fullMessage() { /** */ private void assertEqualsFullMsg(GridDhtPartitionsFullMessage expected, GridDhtPartitionsFullMessage actual) { - Map> expHistSuppliers = U.field(expected.partitionHistorySuppliers(), "map"); - Map> actHistSuppliers = U.field(actual.partitionHistorySuppliers(), "map"); + Map> expHistSuppliers = expected.partitionHistorySuppliers(); + Map> actHistSuppliers = actual.partitionHistorySuppliers(); assertEquals(expHistSuppliers.size(), actHistSuppliers.size());