Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ databaseAttributeClause
// Attribute keys accepted in a database attribute clause (CREATE/ALTER DATABASE WITH ...).
// TIME_PARTITION_ORIGIN is the per-database origin timestamp for time-partition alignment.
databaseAttributeKey
: TTL
| TIME_PARTITION_INTERVAL
| TIME_PARTITION_ORIGIN
| SCHEMA_REGION_GROUP_NUM
| DATA_REGION_GROUP_NUM
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,10 @@ TIME_PARTITION_INTERVAL
: T I M E '_' P A R T I T I O N '_' I N T E R V A L
;

// Case-insensitive keyword token: TIME_PARTITION_ORIGIN
// (built from the per-letter fragment rules, matching the file's keyword convention).
TIME_PARTITION_ORIGIN
: T I M E '_' P A R T I T I O N '_' O R I G I N
;

// Case-insensitive keyword token: SCHEMA_REGION_GROUP_NUM
SCHEMA_REGION_GROUP_NUM
: S C H E M A '_' R E G I O N '_' G R O U P '_' N U M
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
Expand Down Expand Up @@ -232,6 +233,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case SetTimePartitionInterval:
plan = new SetTimePartitionIntervalPlan();
break;
case SetTimePartitionOrigin:
plan = new SetTimePartitionOriginPlan();
break;
case AdjustMaxRegionGroupNum:
plan = new AdjustMaxRegionGroupNumPlan();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public enum ConfigPhysicalPlanType {
SetSchemaReplicationFactor((short) 202),
SetDataReplicationFactor((short) 203),
SetTimePartitionInterval((short) 204),
SetTimePartitionOrigin((short) 212),
AdjustMaxRegionGroupNum((short) 205),
DeleteDatabase((short) 206),
PreDeleteDatabase((short) 207),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.consensus.request.write.database;

import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

/**
 * Config physical plan that sets the time-partition origin of a single database.
 *
 * <p>The no-arg constructor creates an empty instance used by the deserialization path
 * ({@code ConfigPhysicalPlan.create}); fields are then populated via {@link #deserializeImpl}.
 */
public class SetTimePartitionOriginPlan extends ConfigPhysicalPlan {

  // Target database name ("storage group" in legacy terminology); null until set by the
  // two-arg constructor or deserializeImpl.
  private String storageGroup;

  // Origin timestamp from which this database's time partitions are aligned.
  private long timePartitionOrigin;

  /** Empty plan for the deserialization factory. */
  public SetTimePartitionOriginPlan() {
    super(ConfigPhysicalPlanType.SetTimePartitionOrigin);
  }

  /**
   * @param storageGroup target database name
   * @param timePartitionOrigin new time-partition origin timestamp
   */
  public SetTimePartitionOriginPlan(String storageGroup, long timePartitionOrigin) {
    this();
    this.storageGroup = storageGroup;
    this.timePartitionOrigin = timePartitionOrigin;
  }

  public String getDatabase() {
    return storageGroup;
  }

  public long getTimePartitionOrigin() {
    return timePartitionOrigin;
  }

  @Override
  protected void serializeImpl(DataOutputStream stream) throws IOException {
    // Plan-type marker first, so the factory can dispatch on it during deserialization.
    stream.writeShort(getType().getPlanType());

    BasicStructureSerDeUtil.write(storageGroup, stream);
    stream.writeLong(timePartitionOrigin);
  }

  @Override
  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
    storageGroup = BasicStructureSerDeUtil.readString(buffer);
    timePartitionOrigin = buffer.getLong();
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    SetTimePartitionOriginPlan that = (SetTimePartitionOriginPlan) o;
    // Objects.equals is null-safe: storageGroup is null on a freshly constructed
    // (not yet deserialized) plan, and plain .equals would throw NPE there.
    return timePartitionOrigin == that.timePartitionOrigin
        && Objects.equals(storageGroup, that.storageGroup);
  }

  @Override
  public int hashCode() {
    return Objects.hash(storageGroup, timePartitionOrigin);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.response.ainode.AINodeRegisterResp;
Expand Down Expand Up @@ -720,6 +721,16 @@ public TSStatus setTimePartitionInterval(
}
}

@Override
public TSStatus setTimePartitionOrigin(SetTimePartitionOriginPlan setTimePartitionOriginPlan) {
  // Only the leader may mutate cluster schema; bail out with the failure status otherwise.
  final TSStatus leaderStatus = confirmLeader();
  if (leaderStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
    return leaderStatus;
  }
  // Delegate the actual write to the cluster schema manager.
  return clusterSchemaManager.setTimePartitionOrigin(setTimePartitionOriginPlan);
}

@Override
public DataSet countMatchedDatabases(CountDatabasePlan countDatabasePlan) {
TSStatus status = confirmLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.cq.CQManager;
Expand Down Expand Up @@ -386,6 +387,8 @@ public interface IManager {

TSStatus setTimePartitionInterval(SetTimePartitionIntervalPlan configPhysicalPlan);

TSStatus setTimePartitionOrigin(SetTimePartitionOriginPlan configPhysicalPlan);

/**
* Count Databases.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.SetTableColumnCommentPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.SetTableCommentPlan;
Expand Down Expand Up @@ -470,6 +471,17 @@ public TSStatus setTimePartitionInterval(
}
}

/**
 * Persists the new time-partition origin through the consensus layer.
 *
 * @param setTimePartitionOriginPlan plan carrying the database name and origin timestamp
 * @return the consensus write status, or EXECUTE_STATEMENT_ERROR on a consensus failure
 */
public TSStatus setTimePartitionOrigin(SetTimePartitionOriginPlan setTimePartitionOriginPlan) {
  try {
    return getConsensusManager().write(setTimePartitionOriginPlan);
  } catch (ConsensusException e) {
    // Consensus write failed; report the failure to the caller instead of propagating.
    LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
    final TSStatus failure = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
    failure.setMessage(e.getMessage());
    return failure;
  }
}

/**
* Only leader use this interface. Adjust the maxSchemaRegionGroupNum and maxDataRegionGroupNum of
* each Database based on existing cluster resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
Expand Down Expand Up @@ -435,6 +436,8 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
case SetTimePartitionInterval:
return clusterSchemaInfo.setTimePartitionInterval(
(SetTimePartitionIntervalPlan) physicalPlan);
case SetTimePartitionOrigin:
return clusterSchemaInfo.setTimePartitionOrigin((SetTimePartitionOriginPlan) physicalPlan);
case CreateRegionGroups:
return partitionInfo.createRegionGroups((CreateRegionGroupsPlan) physicalPlan);
case OfferRegionMaintainTasks:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable() {
*/
public void autoCleanPartitionTable(long TTL, TTimePartitionSlot currentTimeSlot) {
long[] removedTimePartitionSlots =
dataPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot).stream()
dataPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot, databaseName).stream()
.map(TTimePartitionSlot::getStartTime)
.collect(Collectors.toList())
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ public TSStatus autoCleanPartitionTable(AutoCleanPartitionTablePlan plan) {
if (isDatabaseExisted(database) && 0 < ttl && ttl < Long.MAX_VALUE) {
databasePartitionTables
.get(database)
.autoCleanPartitionTable(ttl, plan.getCurrentTimeSlot());
.autoCleanPartitionTable(ttl, plan.getCurrentTimeSlotMap());
}
});
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.table.DescTablePlan;
Expand All @@ -53,6 +54,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.AlterColumnDataTypePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan;
Expand Down Expand Up @@ -196,6 +198,10 @@ public TSStatus createDatabase(final DatabaseSchemaPlan plan) {
final TDatabaseSchema databaseSchema = plan.getSchema();
final PartialPath partialPathName = getQualifiedDatabasePartialPath(databaseSchema.getName());

// Update TimePartitionUtils cache with database-specific time partition settings
TimePartitionUtils.updateDatabaseTimePartitionConfig(
databaseSchema.getName(), databaseSchema);

final ConfigMTree mTree = databaseSchema.isIsTableModel() ? tableModelMTree : treeModelMTree;
mTree.setStorageGroup(partialPathName);

Expand Down Expand Up @@ -280,6 +286,9 @@ public TSStatus alterDatabase(final DatabaseSchemaPlan plan) {
.getAsMNode()
.setDatabaseSchema(currentSchema);

// Update TimePartitionUtils cache with new time partition settings
TimePartitionUtils.updateDatabaseTimePartitionConfig(currentSchema.getName(), currentSchema);

result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final MetadataException e) {
LOGGER.error(ERROR_NAME, e);
Expand All @@ -305,6 +314,9 @@ public TSStatus deleteDatabase(final DeleteDatabasePlan plan) {
(isTableModel ? tableModelMTree : treeModelMTree)
.deleteDatabase(getQualifiedDatabasePartialPath(plan.getName()));

// Remove database-specific time partition configuration from cache
TimePartitionUtils.removeDatabaseTimePartitionConfig(plan.getName());

result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final MetadataException e) {
LOGGER.warn("Database not exist", e);
Expand Down Expand Up @@ -482,6 +494,32 @@ public TSStatus setTimePartitionInterval(final SetTimePartitionIntervalPlan plan
return result;
}

/**
 * Updates the time-partition origin stored in the in-memory database schema.
 *
 * @param plan plan carrying the database name and the new origin timestamp
 * @return SUCCESS_STATUS on update, DATABASE_NOT_EXIST if the database is absent or lookup fails
 */
public TSStatus setTimePartitionOrigin(final SetTimePartitionOriginPlan plan) {
  final TSStatus status = new TSStatus();
  databaseReadWriteLock.writeLock().lock();
  try {
    // Table-model and tree-model databases live in separate MTrees.
    final ConfigMTree tree =
        PathUtils.isTableModelDatabase(plan.getDatabase()) ? tableModelMTree : treeModelMTree;
    final PartialPath databasePath = getQualifiedDatabasePartialPath(plan.getDatabase());
    if (!tree.isDatabaseAlreadySet(databasePath)) {
      status.setCode(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode());
    } else {
      tree.getDatabaseNodeByDatabasePath(databasePath)
          .getAsMNode()
          .getDatabaseSchema()
          .setTimePartitionOrigin(plan.getTimePartitionOrigin());
      status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    }
  } catch (final MetadataException e) {
    LOGGER.error(ERROR_NAME, e);
    status.setCode(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()).setMessage(ERROR_NAME);
  } finally {
    databaseReadWriteLock.writeLock().unlock();
  }
  return status;
}

/**
* Adjust the maximum RegionGroup count of each Database.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ private Flow analyzeMissingPartitions(final ConfigNodeProcedureEnv env) {
}

if (localEarliestSlotStartTime
> TimePartitionUtils.getStartTimeByPartitionId(earliestTimeslot)) {
> TimePartitionUtils.getStartTimeByPartitionId(earliestTimeslot, database)) {
databasesWithLostDataPartition.add(database);
LOG.warn(
"[DataPartitionIntegrity] Database {} has lost timeslot {} in its data table partition, and this issue needs to be repaired",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.response.ainode.AINodeConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.ainode.AINodeRegisterResp;
Expand Down Expand Up @@ -194,6 +195,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionOriginReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowAINodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
Expand Down Expand Up @@ -446,19 +448,20 @@ public TSStatus alterDatabase(final TDatabaseSchema databaseSchema) {
"Failed to alter database. Doesn't support ALTER DataReplicationFactor yet.");
}

if (databaseSchema.isSetTimePartitionOrigin()) {
errorResp =
new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode())
.setMessage(
"Failed to alter database. Doesn't support ALTER TimePartitionOrigin yet.");
}

if (databaseSchema.isSetTimePartitionInterval()) {
errorResp =
new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode())
.setMessage(
"Failed to alter database. Doesn't support ALTER TimePartitionInterval yet.");
}
// Time partition settings are now supported for database-level configuration
// if (databaseSchema.isSetTimePartitionOrigin()) {
// errorResp =
// new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode())
// .setMessage(
// "Failed to alter database. Doesn't support ALTER TimePartitionOrigin yet.");
// }

// if (databaseSchema.isSetTimePartitionInterval()) {
// errorResp =
// new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode())
// .setMessage(
// "Failed to alter database. Doesn't support ALTER TimePartitionInterval yet.");
// }

if (errorResp != null) {
LOGGER.warn("Execute AlterDatabase: {} with result: {}", databaseSchema, errorResp);
Expand Down Expand Up @@ -513,6 +516,12 @@ public TSStatus setTimePartitionInterval(final TSetTimePartitionIntervalReq req)
new SetTimePartitionIntervalPlan(req.getDatabase(), req.getTimePartitionInterval()));
}

@Override
public TSStatus setTimePartitionOrigin(final TSetTimePartitionOriginReq req) throws TException {
  // Translate the thrift request into a consensus plan and hand it to the ConfigManager.
  final SetTimePartitionOriginPlan plan =
      new SetTimePartitionOriginPlan(req.getDatabase(), req.getTimePartitionOrigin());
  return configManager.setTimePartitionOrigin(plan);
}

@Override
public TCountDatabaseResp countMatchedDatabases(final TGetDatabaseReq req) {
final PathPatternTree scope =
Expand Down
Loading