Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMapSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMapSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMapSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessageSerializer;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
Expand Down Expand Up @@ -551,8 +549,6 @@ public GridIoMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
factory.register(508, GroupPartitionIdPair::new, new GroupPartitionIdPairSerializer());
factory.register(510, IgniteDhtPartitionHistorySuppliersMap::new,
new IgniteDhtPartitionHistorySuppliersMapSerializer());
factory.register(513, IgniteDhtPartitionsToReloadMap::new,
new IgniteDhtPartitionsToReloadMapSerializer());
factory.register(517, GridPartitionStateMap::new, new GridPartitionStateMapSerializer());
factory.register(518, GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer());
factory.register(519, GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
Expand Down Expand Up @@ -1350,7 +1349,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
@Nullable final GridDhtPartitionExchangeId exchId,
@Nullable GridCacheVersion lastVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@Nullable IgniteDhtPartitionsToReloadMap partsToReload
@Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload
) {
Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();

Expand All @@ -1371,7 +1370,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
@Nullable final GridDhtPartitionExchangeId exchId,
@Nullable GridCacheVersion lastVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@Nullable IgniteDhtPartitionsToReloadMap partsToReload,
@Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload,
Collection<CacheGroupContext> grps
) {
AffinityTopologyVersion ver = exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@

/** */
@GridToStringExclude
private final IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap();
private final Map<UUID, Map<Integer, Set<Integer>>> partsToReload = new HashMap<>();

/** */
private final AtomicBoolean done = new AtomicBoolean();
Expand Down Expand Up @@ -3425,8 +3425,15 @@
UUID nodeId = e.getKey();
Set<Integer> parts = e.getValue();

for (int part : parts)
partsToReload.put(nodeId, top.groupId(), part);
for (int part : parts) {
synchronized (partsToReload) {
Map<Integer, Set<Integer>> nodeMap = partsToReload.computeIfAbsent(nodeId, k -> new HashMap<>());

Set<Integer> partsToReload = nodeMap.computeIfAbsent(top.groupId(), k -> new HashSet<>());

Check warning on line 3432 in modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename "partsToReload" which hides the field declared at line 342.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ1TKAMx-tMJpjjAbwFM&open=AZ1TKAMx-tMJpjjAbwFM&pullRequest=12957

partsToReload.add(part);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@Order(4)
@Compress
@GridToStringInclude
IgniteDhtPartitionsToReloadMap partsToReload;
Map<UUID, Map<Integer, Set<Integer>>> partsToReload;

/** Partition sizes. */
@Order(5)
Expand Down Expand Up @@ -144,7 +144,7 @@ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
@Nullable GridCacheVersion lastVer,
@NotNull AffinityTopologyVersion topVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@Nullable IgniteDhtPartitionsToReloadMap partsToReload) {
@Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload) {
super(id, lastVer);

assert id == null || topVer.equals(id.topologyVersion());
Expand Down Expand Up @@ -337,7 +337,11 @@ public Set<Integer> partsToReload(UUID nodeId, int grpId) {
if (partsToReload == null)
return Collections.emptySet();

return partsToReload.get(nodeId, grpId);
synchronized (partsToReload) {
Map<Integer, Set<Integer>> nodeMap = partsToReload.get(nodeId);

return nodeMap == null ? Collections.emptySet() : (Set<Integer>)F.emptyIfNull(nodeMap.get(grpId));
}
}

/**
Expand Down Expand Up @@ -443,9 +447,6 @@ public void topologyVersion(AffinityTopologyVersion topVer) {
if (partHistSuppliers == null)
partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();

if (partsToReload == null)
partsToReload = new IgniteDhtPartitionsToReloadMap();

errs = errMsgs == null ? null : F.viewReadOnly(errMsgs, e -> e.error());
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPre
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.managers.communication;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand All @@ -29,7 +30,6 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
Expand Down Expand Up @@ -108,13 +108,13 @@ public void testWriteReadHugeMessage() {
/** */
private GridDhtPartitionsFullMessage fullMessage() {
IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap();
Map<UUID, Map<Integer, Set<Integer>>> partsToReload = new HashMap<>();

for (int i = 0; i < 500; i++) {
UUID uuid = UUID.randomUUID();

partHistSuppliers.put(uuid, i, i + 1, i + 2);
partsToReload.put(uuid, i, i + 1);
partsToReload.put(uuid, Map.of(i, Set.of(i + 1)));
}

return new GridDhtPartitionsFullMessage(null, null, new AffinityTopologyVersion(0), partHistSuppliers, partsToReload);
Expand All @@ -130,8 +130,8 @@ private void assertEqualsFullMsg(GridDhtPartitionsFullMessage expected, GridDhtP
for (Map.Entry<UUID, Map<GroupPartitionIdPair, Long>> entry : expHistSuppliers.entrySet())
assertEquals(entry.getValue(), actHistSuppliers.get(entry.getKey()));

Map<UUID, Map<Integer, Set<Integer>>> expPartsToReload = U.field((Object)U.field(expected, "partsToReload"), "map");
Map<UUID, Map<Integer, Set<Integer>>> actPartsToReload = U.field((Object)U.field(actual, "partsToReload"), "map");
Map<UUID, Map<Integer, Set<Integer>>> expPartsToReload = U.field(expected, "partsToReload");
Map<UUID, Map<Integer, Set<Integer>>> actPartsToReload = U.field(actual, "partsToReload");

assertEquals(expPartsToReload.size(), actPartsToReload.size());

Expand Down
Loading