Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ private String buildClusterStatusFailureMessage(
.append(processStatusPassed)
.append(", expectedNodeSize=")
.append(
configNodeWrapperList.size() + dataNodeWrapperList.size() + aiNodeWrapperList.size())
configNodeWrapperList.size() + dataNodeWrapperList.size() + extraNodeWrappers.size())
.append(", actualNodeSize=")
.append(actualNodeSize);
if (showClusterStatus != null) {
Expand Down Expand Up @@ -592,7 +592,7 @@ private List<String> getClusterLogDirs() {
final List<AbstractNodeWrapper> allNodeWrappers = new ArrayList<>();
allNodeWrappers.addAll(configNodeWrapperList);
allNodeWrappers.addAll(dataNodeWrapperList);
allNodeWrappers.addAll(aiNodeWrapperList);
allNodeWrappers.addAll(extraNodeWrappers);
return allNodeWrappers.stream()
.map(AbstractNodeWrapper::getLogDirPath)
.distinct()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ public final class ConfigNodeMessages {
"Failed to drop trigger [%s], this trigger has not been created";
public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED =
"Failed to drop UDF [%s], this UDF has not been created";
public static final String
FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST =
"Failed to enrich pipe %s with root user for compatibility because root user %s does not exist.";
public static final String FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE =
"Failed to fetch schemaengine black list on DataNode {}, {}";
public static final String FAILED_TO_GET_FIELD = "Failed to get field {}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ public final class ConfigNodeMessages {
"Failed to drop trigger [%s], this trigger has not been created";
public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED =
"Failed to drop UDF [%s], this UDF has not been created";
public static final String
FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST =
"Failed to enrich pipe %s with root user for compatibility because root user %s does not exist.";
public static final String FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE =
"Failed to fetch schemaengine black list on DataNode {}, {}";
public static final String FAILED_TO_GET_FIELD = "Failed to get field {}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ public ConfigManager() throws IOException {
TriggerInfo triggerInfo = new TriggerInfo();
CQInfo cqInfo = new CQInfo();
ExternalServiceInfo externalServiceInfo = new ExternalServiceInfo();
PipeInfo pipeInfo = new PipeInfo();
this.permissionManager = createPermissionManager(authorInfo);
PipeInfo pipeInfo = new PipeInfo(userName -> this.permissionManager.login4Pipe(userName, null));
QuotaInfo quotaInfo = new QuotaInfo();
TTLInfo ttlInfo = new TTLInfo();
SubscriptionInfo subscriptionInfo = new SubscriptionInfo();
Expand Down Expand Up @@ -409,7 +410,6 @@ public ConfigManager() throws IOException {
new ClusterSchemaQuotaStatistics(
COMMON_CONF.getSeriesLimitThreshold(), COMMON_CONF.getDeviceLimitThreshold()));
this.partitionManager = new PartitionManager(this, partitionInfo);
this.permissionManager = createPermissionManager(authorInfo);
this.procedureManager = createProcedureManager(procedureInfo);
this.externalServiceManager = new ExternalServiceManager(this);
this.udfManager = new UDFManager(this, udfInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,7 @@ public boolean loadSnapshot(final File latestSnapshotRootDir) {
}
});
if (result.get()) {
pipeInfo.getPipeTaskInfo().enrichPipeMetasWithRootUserForCompatibility();
LOGGER.info(
ConfigNodeMessages.CONFIGNODESNAPSHOT_LOAD_SNAPSHOT_SUCCESS_LATESTSNAPSHOTROOTDIR,
latestSnapshotRootDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

public class PipeInfo implements SnapshotProcessor {

Expand All @@ -58,8 +59,13 @@ public class PipeInfo implements SnapshotProcessor {
private final PipeTaskInfo pipeTaskInfo;

public PipeInfo() throws IOException {
this(null);
}

public PipeInfo(final Function<String, String> pipeUserCurrentPasswordProvider)
throws IOException {
pipePluginInfo = new PipePluginInfo();
pipeTaskInfo = new PipeTaskInfo();
pipeTaskInfo = new PipeTaskInfo(pipeUserCurrentPasswordProvider);
}

public PipePluginInfo getPipePluginInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
Expand Down Expand Up @@ -56,7 +57,6 @@
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.service.ConfigNode;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
Expand All @@ -76,9 +76,11 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand All @@ -95,10 +97,17 @@ public class PipeTaskInfo implements SnapshotProcessor {

// Pure in-memory object, not involved in snapshot serialization and deserialization.
private final PipeTaskInfoVersion pipeTaskInfoVersion;
// Accepts a username and returns its current stored password for pipe authentication.
private final Function<String, String> pipeUserCurrentPasswordProvider;

public PipeTaskInfo() {
this(null);
}

public PipeTaskInfo(final Function<String, String> pipeUserCurrentPasswordProvider) {
this.pipeMetaKeeper = new PipeMetaKeeper();
this.pipeTaskInfoVersion = new PipeTaskInfoVersion();
this.pipeUserCurrentPasswordProvider = pipeUserCurrentPasswordProvider;
}

/////////////////////////////// Lock ///////////////////////////////
Expand Down Expand Up @@ -445,6 +454,7 @@ private void validatePipePluginUsageByPipeInternal(String pluginName) {
public TSStatus createPipe(final CreatePipePlanV2 plan) {
acquireWriteLock();
try {
enrichPipeMetaWithRootUserForCompatibility(plan.getPipeStaticMeta());
pipeMetaKeeper.addPipeMeta(new PipeMeta(plan.getPipeStaticMeta(), plan.getPipeRuntimeMeta()));
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} finally {
Expand Down Expand Up @@ -502,6 +512,7 @@ public TSStatus operateMultiplePipes(final OperateMultiplePipesPlanV2 plan) {
public TSStatus alterPipe(final AlterPipePlanV2 plan) {
acquireWriteLock();
try {
enrichPipeMetaWithRootUserForCompatibility(plan.getPipeStaticMeta());
final PipeTemporaryMeta temporaryMeta =
pipeMetaKeeper.getPipeMeta(plan.getPipeStaticMeta().getPipeName()).getTemporaryMeta();
pipeMetaKeeper.removePipeMeta(plan.getPipeStaticMeta().getPipeName());
Expand Down Expand Up @@ -719,6 +730,7 @@ private TSStatus handleMetaChangesInternal(final PipeHandleMetaChangePlan plan)
plan.getPipeMetaList()
.forEach(
pipeMeta -> {
enrichPipeMetaWithRootUserForCompatibility(pipeMeta.getStaticMeta());
pipeMetaKeeper.addPipeMeta(pipeMeta);
logger.ifPresent(l -> l.debug(ConfigNodeMessages.RECORDING_PIPE_META, pipeMeta));
});
Expand Down Expand Up @@ -998,6 +1010,47 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException {
}
}

public void enrichPipeMetasWithRootUserForCompatibility() {
acquireWriteLock();
try {
pipeMetaKeeper
.getPipeMetaList()
.forEach(
pipeMeta -> enrichPipeMetaWithRootUserForCompatibility(pipeMeta.getStaticMeta()));
} finally {
releaseWriteLock();
}
}

private void enrichPipeMetaWithRootUserForCompatibility(final PipeStaticMeta pipeStaticMeta) {
if (pipeUserCurrentPasswordProvider == null) {
return;
}
final boolean shouldEnrichSource = pipeStaticMeta.mayNeedCompatibleRootUserForIoTDBSource();
final boolean shouldEnrichSink = pipeStaticMeta.mayNeedCompatibleRootUserForWriteBackSink();
if (!shouldEnrichSource && !shouldEnrichSink) {
return;
}

final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName();
final String password = pipeUserCurrentPasswordProvider.apply(rootUserName);
if (Objects.isNull(password)) {
throw new PipeException(
String.format(
ConfigNodeMessages
.FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST,
pipeStaticMeta.getPipeName(),
rootUserName));
}

if (shouldEnrichSource) {
pipeStaticMeta.enrichSourceWithRootUserForCompatibility(rootUserName, password);
}
if (shouldEnrichSink) {
pipeStaticMeta.enrichWriteBackSinkWithRootUserForCompatibility(rootUserName, password);
}
}

private void normalizeRecoveredConsensusPipeStatus() {
final List<String> restartedConsensusPipes = new ArrayList<>();

Expand Down
Loading
Loading