Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,21 @@ public abstract class ItemsFromStdin implements Iterable<String> {
": one or more, separated by spaces. To read from stdin, specify '-' and supply one item per line.";

private List<String> items = new ArrayList<>();
private boolean readFromStdin = false;

protected void setItems(List<String> arguments) {
items = readItemsFromStdinIfNeeded(arguments);
readFromStdin = arguments != null && !arguments.isEmpty() &&
"-".equals(arguments.iterator().next());

if (readFromStdin) {
items = readItemsFromStdin();
} else {
items = arguments == null ? new ArrayList<>() : arguments;
}
}

public boolean isReadFromStdin() {
return readFromStdin;
}

public List<String> getItems() {
Expand All @@ -52,11 +64,7 @@ public int size() {
return items.size();
}

private static List<String> readItemsFromStdinIfNeeded(List<String> parameters) {
if (parameters.isEmpty() || !"-".equals(parameters.iterator().next())) {
return parameters;
}

private static List<String> readItemsFromStdin() {
List<String> items = new ArrayList<>();
Scanner scanner = new Scanner(System.in, StandardCharsets.UTF_8.name());
while (scanner.hasNextLine()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.lang.Math.max;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Clock;
import java.time.Instant;
Expand Down Expand Up @@ -87,6 +88,7 @@ public final class ContainerInfo implements Comparable<ContainerInfo> {
private long sequenceId;
// Health state of the container (determined by ReplicationManager)
private ContainerHealthState healthState;
private boolean suppressed;

private ContainerInfo(Builder b) {
containerID = ContainerID.valueOf(b.containerID);
Expand All @@ -102,6 +104,7 @@ private ContainerInfo(Builder b) {
replicationConfig = b.replicationConfig;
clock = b.clock;
healthState = b.healthState != null ? b.healthState : ContainerHealthState.HEALTHY;
suppressed = b.suppressed;
}

public static Codec<ContainerInfo> getCodec() {
Expand All @@ -123,6 +126,10 @@ public static ContainerInfo fromProtobuf(HddsProtos.ContainerInfoProto info) {
.setReplicationConfig(config)
.setSequenceId(info.getSequenceId());

if (info.hasSuppressed()) {
builder.setSuppressed(info.getSuppressed());
}

if (info.hasPipelineID()) {
builder.setPipelineID(PipelineID.getFromProtobuf(info.getPipelineID()));
}
Expand Down Expand Up @@ -263,6 +270,26 @@ public void setHealthState(ContainerHealthState newHealthState) {
this.healthState = newHealthState;
}

/**
* Check if container is suppressed.
* Only included in JSON output when true.
*
* @return boolean
*/
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean isSuppressed() {
return suppressed;
}

/**
* Set the boolean for suppressed.
*
* @param suppressed checks if container is suppressed or not
*/
public void setSuppressed(boolean suppressed) {
this.suppressed = suppressed;
}

@JsonIgnore
public HddsProtos.ContainerInfoProto getProtobuf() {
HddsProtos.ContainerInfoProto.Builder builder =
Expand All @@ -288,6 +315,10 @@ public HddsProtos.ContainerInfoProto getProtobuf() {
builder.setPipelineID(getPipelineID().getProtobuf());
}

if (suppressed) {
builder.setSuppressed(true);
}

return builder.build();
}

Expand Down Expand Up @@ -390,6 +421,7 @@ public static class Builder {
private PipelineID pipelineID;
private ReplicationConfig replicationConfig;
private ContainerHealthState healthState;
private boolean suppressed;

public Builder setPipelineID(PipelineID pipelineId) {
this.pipelineID = pipelineId;
Expand Down Expand Up @@ -447,6 +479,11 @@ public Builder setHealthState(ContainerHealthState healthState) {
return this;
}

public Builder setSuppressed(boolean suppressed) {
this.suppressed = suppressed;
return this;
}

/**
* Also resets {@code stateEnterTime}, so make sure to set clock first.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,25 @@ ContainerListResult listContainer(long startContainerID, int count,
ReplicationConfig replicationConfig)
throws IOException;

/**
* Lists a range of containers and get their info.
*
* @param startContainerID start containerID.
* @param count count must be {@literal >} 0.
* @param state Container of this state will be returned.
* @param replicationConfig container replication Config.
* @param suppressed container to be suppressed/unsuppressed from report
* @return a list of containers capped by max count allowed
* in "ozone.scm.container.list.max.count" and total number of containers.
* @throws IOException
*/
ContainerListResult listContainer(long startContainerID, int count,
HddsProtos.LifeCycleState state,
HddsProtos.ReplicationType replicationType,
ReplicationConfig replicationConfig,
Boolean suppressed)
throws IOException;

/**
* Read meta data from an existing container.
* @param containerID - ID of the container.
Expand Down Expand Up @@ -465,4 +484,14 @@ DecommissionScmResponseProto decommissionScm(
*/
void reconcileContainer(long containerID) throws IOException;

/**
* Suppress or unsuppress containers from reports.
* Suppressed containers are excluded from replication manager reports
* regardless of their health state.
*
* @param containerIds container IDs to suppress or unsuppress
* @param suppress true to suppress, false to unsuppress
* @throws IOException
*/
List<Long> suppressContainers(List<Long> containerIds, boolean suppress) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,30 @@ ContainerListResult listContainer(long startContainerID,
HddsProtos.ReplicationType replicationType,
ReplicationConfig replicationConfig) throws IOException;

/**
* Ask SCM for a list of containers with a range of container ID, state
* and replication config, and the limit of count.
* The containers are returned from startID (exclusive), and
* filtered by state and replication config. The returned list is limited to
* count entries.
*
* @param startContainerID start container ID.
* @param count count, if count {@literal <} 0, the max size is unlimited.(
* Usually the count will be replace with a very big
* value instead of being unlimited in case the db is very big)
* @param state Container with this state will be returned.
* @param replicationConfig Replication config for the containers
* @param suppressed container to be suppressed/unsuppressed from report
* @return a list of containers capped by max count allowed
* in "ozone.scm.container.list.max.count" and total number of containers.
* @throws IOException
*/
ContainerListResult listContainer(long startContainerID,
int count, HddsProtos.LifeCycleState state,
HddsProtos.ReplicationType replicationType,
ReplicationConfig replicationConfig,
Boolean suppressed) throws IOException;

/**
* Deletes a container in SCM.
*
Expand Down Expand Up @@ -521,4 +545,13 @@ DecommissionScmResponseProto decommissionScm(
* @throws IOException On error
*/
void reconcileContainer(long containerID) throws IOException;

/**
* Suppress or unsuppress containers from reports.
*
* @param containerIds container IDs to suppress or unsuppress
* @param suppress true to suppress, false to unsuppress
* @throws IOException
*/
List<Long> suppressContainers(List<Long> containerIds, boolean suppress) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SuppressContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SuppressContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.ScmInfo;
Expand Down Expand Up @@ -445,6 +447,16 @@ public ContainerListResult listContainer(long startContainerID, int count,
HddsProtos.ReplicationType replicationType,
ReplicationConfig replicationConfig)
throws IOException {
return listContainer(startContainerID, count, state, replicationType, replicationConfig, null);
}

@Override
public ContainerListResult listContainer(long startContainerID, int count,
HddsProtos.LifeCycleState state,
HddsProtos.ReplicationType replicationType,
ReplicationConfig replicationConfig,
Boolean suppressed)
throws IOException {
Preconditions.checkState(startContainerID >= 0,
"Container ID cannot be negative.");
Preconditions.checkState(count > 0,
Expand All @@ -454,6 +466,9 @@ public ContainerListResult listContainer(long startContainerID, int count,
builder.setStartContainerID(startContainerID);
builder.setCount(count);
builder.setTraceID(TracingUtil.exportCurrentSpan());
if (suppressed != null) {
builder.setSuppressed(suppressed);
}
if (state != null) {
builder.setState(state);
}
Expand Down Expand Up @@ -1312,6 +1327,19 @@ public void reconcileContainer(long containerID) throws IOException {
submitRequest(Type.ReconcileContainer, builder -> builder.setReconcileContainerRequest(request));
}

@Override
public List<Long> suppressContainers(List<Long> containerIds, boolean suppress)
throws IOException {
SuppressContainerRequestProto request = SuppressContainerRequestProto.newBuilder()
.addAllContainerIDs(containerIds)
.setSuppress(suppress)
.build();
SuppressContainerResponseProto response =
submitRequest(Type.SuppressContainer, builder -> builder.setSuppressContainerRequest(request))
.getSuppressContainerResponse();
return response.getFailedContainerIDsList();
}

/**
* Holder class to store the target SCM node ID for routing requests.
* This allows requests to be directed to specific SCM nodes in an HA cluster.
Expand Down
14 changes: 14 additions & 0 deletions hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ message ScmContainerLocationRequest {
optional ReconcileContainerRequestProto reconcileContainerRequest = 49;
optional GetDeletedBlocksTxnSummaryRequestProto getDeletedBlocksTxnSummaryRequest = 50;
optional SCMListContainerIDsRequestProto scmListContainerIDsRequest = 51;
optional SuppressContainerRequestProto suppressContainerRequest = 52;
}

message ScmContainerLocationResponse {
Expand Down Expand Up @@ -147,6 +148,7 @@ message ScmContainerLocationResponse {
optional ReconcileContainerResponseProto reconcileContainerResponse = 49;
optional GetDeletedBlocksTxnSummaryResponseProto getDeletedBlocksTxnSummaryResponse = 50;
optional SCMListContainerIDsResponseProto scmListContainerIDsResponse = 51;
optional SuppressContainerResponseProto suppressContainerResponse = 52;

enum Status {
OK = 1;
Expand Down Expand Up @@ -205,6 +207,7 @@ enum Type {
ReconcileContainer = 45;
GetDeletedBlocksTransactionSummary = 46;
ListContainerIDs = 47;
SuppressContainer = 48;
}

/**
Expand Down Expand Up @@ -313,6 +316,7 @@ message SCMListContainerRequestProto {
optional ReplicationFactor factor = 5;
optional ReplicationType type = 6;
optional ECReplicationConfig ecReplicationConfig = 7;
optional bool suppressed = 8;
}

message SCMListContainerResponseProto {
Expand Down Expand Up @@ -711,6 +715,16 @@ message ReconcileContainerRequestProto {
message ReconcileContainerResponseProto {
}

message SuppressContainerRequestProto {
repeated int64 containerIDs = 1;
optional bool suppress = 2;
}

message SuppressContainerResponseProto {
// Container IDs that failed to suppress or unsuppress. Empty if all succeeded.
repeated int64 failedContainerIDs = 1;
}

/**
* Protocol used from an HDFS node to StorageContainerManager. See the request
* and response messages for details of the RPC calls.
Expand Down
1 change: 1 addition & 0 deletions hadoop-hdds/interface-client/src/main/proto/hdds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ message ContainerInfoProto {
optional ReplicationFactor replicationFactor = 10;
required ReplicationType replicationType = 11;
optional ECReplicationConfig ecReplicationConfig = 12;
optional bool suppressed = 13;
}

message ContainerWithPipeline {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
Expand Down Expand Up @@ -238,4 +239,14 @@ void deleteContainer(ContainerID containerID)
* @return containerStateManger
*/
ContainerStateManager getContainerStateManager();

/**
* Update container info in the container manager.
* This is used for updating container metadata like ackMissing flag.
*
* @param containerInfo Updated container info proto
* @throws IOException
*/
void updateContainerInfo(ContainerID containerID, ContainerInfoProto containerInfo)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,21 @@ public void updateContainerState(final ContainerID cid,
}
}

@Override
public void updateContainerInfo(final ContainerID cid, ContainerInfoProto containerInfo)
throws IOException {
lock.lock();
try {
if (containerExist(cid)) {
containerStateManager.updateContainerInfo(containerInfo);
} else {
throw new ContainerNotFoundException(cid);
}
} finally {
lock.unlock();
}
}

@Override
public void transitionDeletingOrDeletedToTargetState(ContainerID containerID, LifeCycleState targetState)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,14 @@ void reinitialize(Table<ContainerID, ContainerInfo> containerStore)
default RequestType getType() {
return RequestType.CONTAINER;
}

/**
* Update container info.
*
* @param containerInfo Updated container info proto
* @throws IOException
*/
@Replicate
void updateContainerInfo(HddsProtos.ContainerInfoProto containerInfo)
throws IOException;
}
Loading