Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
Expand Down Expand Up @@ -142,13 +141,13 @@ public GridEventConsumeHandler() {
}

/** {@inheritDoc} */
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
Map<Integer, T2<Long, Long>> cntrs) {
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
Map<Integer, Long> cntrs) {
// No-op.
}

/** {@inheritDoc} */
@Override public Map<Integer, T2<Long, Long>> updateCounters() {
@Override public Map<Integer, Long> updateCounters() {
return Collections.emptyMap();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
Expand Down Expand Up @@ -117,13 +116,13 @@ public GridMessageListenHandler(@Nullable Object topic, IgniteBiPredicate<UUID,
}

/** {@inheritDoc} */
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
Map<Integer, T2<Long, Long>> cntrs) {
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
Map<Integer, Long> cntrs) {
// No-op.
}

/** {@inheritDoc} */
@Override public Map<Integer, T2<Long, Long>> updateCounters() {
@Override public Map<Integer, Long> updateCounters() {
return Collections.emptyMap();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessageSerializer;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessageMarshallableSerializer;
import org.apache.ignite.internal.processors.continuous.StartRequestData;
import org.apache.ignite.internal.processors.continuous.StartRequestDataSerializer;
import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessageSerializer;
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageSerializer;
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2Serializer;
import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage;
import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessageSerializer;
import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
Expand Down Expand Up @@ -245,6 +253,7 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
factory.register(-200, TcpDiscoveryCollectionMessage::new,
new TcpDiscoveryCollectionMessageMarshallableSerializer(marsh, clsLdr));

factory.register(-118, StartRequestData::new, new StartRequestDataSerializer());
factory.register(-117, TcpDiscoveryNode::new, new TcpDiscoveryNodeMarshallableSerializer(marsh, clsLdr));
factory.register(-116, IgniteProductVersion::new, new IgniteProductVersionSerializer());
factory.register(-115, SchemaAlterTableAddColumnOperation::new, new SchemaAlterTableAddColumnOperationSerializer());
Expand Down Expand Up @@ -367,5 +376,8 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
new ServiceDeploymentRequestMarshallableSerializer(marsh, clsLdr));
factory.register(538, ServiceUndeploymentRequest::new, new ServiceUndeploymentRequestSerializer());
factory.register(539, ExchangeFailureMessage::new, new ExchangeFailureMessageSerializer());
factory.register(540, StartRoutineDiscoveryMessage::new, new StartRoutineDiscoveryMessageSerializer());
factory.register(541, StartRoutineAckDiscoveryMessage::new, new StartRoutineAckDiscoveryMessageSerializer());
factory.register(542, StartRoutineDiscoveryMessageV2::new, new StartRoutineDiscoveryMessageV2Serializer());
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Collections;
import java.util.Map;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;

Expand Down Expand Up @@ -203,15 +202,14 @@ public long updateCounterAt(int idx) {
* @param cntrsMap Partial local counters map.
* @return Partition ID to partition counters map.
*/
public static Map<Integer, T2<Long, Long>> toCountersMap(CachePartitionPartialCountersMap cntrsMap) {
public static Map<Integer, Long> toCountersMap(CachePartitionPartialCountersMap cntrsMap) {
if (cntrsMap.size() == 0)
return Collections.emptyMap();

Map<Integer, T2<Long, Long>> res = U.newHashMap(cntrsMap.size());
Map<Integer, Long> res = U.newHashMap(cntrsMap.size());

for (int idx = 0; idx < cntrsMap.size(); idx++)
res.put(cntrsMap.partitionAt(idx),
new T2<>(cntrsMap.initialUpdateCounterAt(idx), cntrsMap.updateCounterAt(idx)));
res.put(cntrsMap.partitionAt(idx), cntrsMap.updateCounterAt(idx));

return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand Down Expand Up @@ -200,10 +199,10 @@
private transient int cacheId;

/** */
private transient volatile Map<Integer, T2<Long, Long>> initUpdCntrs;
private transient volatile Map<Integer, Long> initUpdCntrs;

Check warning on line 202 in modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use a thread-safe type; adding "volatile" is not enough to make this field thread-safe.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ0v7pe5hxXDQFXrUbhw&open=AZ0v7pe5hxXDQFXrUbhw&pullRequest=12952

/** */
private transient volatile Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode;
private transient volatile Map<UUID, Map<Integer, Long>> initUpdCntrsPerNode;

Check warning on line 205 in modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use a thread-safe type; adding "volatile" is not enough to make this field thread-safe.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ0v7pe5hxXDQFXrUbhx&open=AZ0v7pe5hxXDQFXrUbhx&pullRequest=12952

/** */
private transient volatile AffinityTopologyVersion initTopVer;
Expand All @@ -224,7 +223,7 @@
private transient UUID routineId;

/** Local update counters values on listener start. Used for skipping events fired before the listener start. */
private transient volatile Map<Integer, T2<Long, Long>> locInitUpdCntrs;
private transient volatile Map<Integer, Long> locInitUpdCntrs;

Check warning on line 226 in modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use a thread-safe type; adding "volatile" is not enough to make this field thread-safe.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ0v7pe5hxXDQFXrUbhy&open=AZ0v7pe5hxXDQFXrUbhy&pullRequest=12952

/** */
private transient GridKernalContext ctx;
Expand Down Expand Up @@ -361,15 +360,15 @@
}

/** {@inheritDoc} */
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
Map<Integer, T2<Long, Long>> cntrs) {
this.initUpdCntrsPerNode = cntrsPerNode;
this.initUpdCntrs = cntrs;
this.initTopVer = topVer;
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
Map<Integer, Long> cntrs) {
initUpdCntrsPerNode = cntrsPerNode;
initUpdCntrs = cntrs;
initTopVer = topVer;
}

/** {@inheritDoc} */
@Override public Map<Integer, T2<Long, Long>> updateCounters() {
@Override public Map<Integer, Long> updateCounters() {
return locInitUpdCntrs;
}

Expand Down Expand Up @@ -1163,32 +1162,31 @@
CacheContinuousQueryPartitionRecovery rec = rcvs.get(partId);

if (rec == null) {
T2<Long, Long> partCntrs = null;
Long partCntr = null;

Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode = this.initUpdCntrsPerNode;
Map<UUID, Map<Integer, Long>> initUpdCntrsPerNode = this.initUpdCntrsPerNode;

Check warning on line 1167 in modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename "initUpdCntrsPerNode" which hides the field declared at line 205.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ0v7pe5hxXDQFXrUbhz&open=AZ0v7pe5hxXDQFXrUbhz&pullRequest=12952

if (initUpdCntrsPerNode != null) {
GridCacheContext<K, V> cctx = cacheContext(ctx);

GridCacheAffinityManager aff = cctx.affinity();

for (ClusterNode node : aff.nodesByPartition(partId, topVer)) {
Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
Map<Integer, Long> map = initUpdCntrsPerNode.get(node.id());

if (map != null) {
partCntrs = map.get(partId);
partCntr = map.get(partId);

break;
}
}
}
else if (initUpdCntrs != null)
partCntrs = initUpdCntrs.get(partId);
partCntr = initUpdCntrs.get(partId);

T2<Long, Long> partCntrs0 = partCntrs;
Long partCntr0 = partCntr;
CacheContinuousQueryPartitionRecovery oldRec = rcvs.computeIfAbsent(partId, k ->
new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
partCntrs0 != null ? partCntrs0.get2() : null));
new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer, partCntr0));

if (oldRec != null)
rec = oldRec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;

/**
*
*/
public abstract class AbstractContinuousMessage implements DiscoveryCustomMessage {
public abstract class AbstractContinuousMessage implements Message, DiscoveryCustomMessage {
/** */
private static final long serialVersionUID = 2781778657738703012L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.T2;
import org.jetbrains.annotations.Nullable;

/**
Expand Down Expand Up @@ -163,11 +162,11 @@ public default void flushOnNodeLeft() {
* @param cntrs Init state for partition counters.
* @param topVer Topology version.
*/
public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
Map<Integer, T2<Long, Long>> cntrs);
public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
Map<Integer, Long> cntrs);

/**
* @return Init state for partition counters.
*/
public Map<Integer, T2<Long, Long>> updateCounters();
public Map<Integer, Long> updateCounters();
}
Loading
Loading