diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 82513929b6971..df0b31090f54b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -557,7 +557,7 @@ private String buildClusterStatusFailureMessage( .append(processStatusPassed) .append(", expectedNodeSize=") .append( - configNodeWrapperList.size() + dataNodeWrapperList.size() + aiNodeWrapperList.size()) + configNodeWrapperList.size() + dataNodeWrapperList.size() + extraNodeWrappers.size()) .append(", actualNodeSize=") .append(actualNodeSize); if (showClusterStatus != null) { @@ -592,7 +592,7 @@ private List getClusterLogDirs() { final List allNodeWrappers = new ArrayList<>(); allNodeWrappers.addAll(configNodeWrapperList); allNodeWrappers.addAll(dataNodeWrapperList); - allNodeWrappers.addAll(aiNodeWrapperList); + allNodeWrappers.addAll(extraNodeWrappers); return allNodeWrappers.stream() .map(AbstractNodeWrapper::getLogDirPath) .distinct() diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index f662fa5871dfb..69a69c76db71a 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -216,6 +216,9 @@ public final class ConfigNodeMessages { "Failed to drop trigger [%s], this trigger has not been created"; public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED = "Failed to drop UDF [%s], this UDF has not been created"; + public static final String + FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST = + "Failed to enrich pipe %s with root user for compatibility because root user %s does not exist."; public static final String FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE = "Failed to fetch schemaengine black list on DataNode {}, {}"; public static final String FAILED_TO_GET_FIELD = "Failed to get field {}"; diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index 8bffa1b08311a..6bf3da0e68b19 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -212,6 +212,9 @@ public final class ConfigNodeMessages { "Failed to drop trigger [%s], this trigger has not been created"; public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED = "Failed to drop UDF [%s], this UDF has not been created"; + public static final String + FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST = + "Failed to enrich pipe %s with root user for compatibility because root user %s does not exist."; public static final String FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE = "Failed to fetch schemaengine black list on DataNode {}, {}"; public static final String FAILED_TO_GET_FIELD = "Failed to get field {}"; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 37dd66b152729..f13b3f7b005f5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -375,7 +375,8 @@ public ConfigManager() throws IOException { TriggerInfo triggerInfo = new TriggerInfo(); CQInfo cqInfo = new CQInfo(); ExternalServiceInfo externalServiceInfo = new ExternalServiceInfo(); - PipeInfo pipeInfo = new PipeInfo(); + this.permissionManager = createPermissionManager(authorInfo); + PipeInfo pipeInfo = new PipeInfo(userName -> this.permissionManager.login4Pipe(userName, null)); QuotaInfo quotaInfo = new QuotaInfo(); TTLInfo ttlInfo = new TTLInfo(); SubscriptionInfo subscriptionInfo = new SubscriptionInfo(); @@ -409,7 +410,6 @@ public ConfigManager() throws IOException { new ClusterSchemaQuotaStatistics( COMMON_CONF.getSeriesLimitThreshold(), COMMON_CONF.getDeviceLimitThreshold())); this.partitionManager = new PartitionManager(this, partitionInfo); - this.permissionManager = createPermissionManager(authorInfo); this.procedureManager = createProcedureManager(procedureInfo); this.externalServiceManager = new ExternalServiceManager(this); this.udfManager = new UDFManager(this, udfInfo); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index ed23fda8a6f8f..4a70ace8ca19e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -801,6 +801,7 @@ public boolean loadSnapshot(final File latestSnapshotRootDir) { } }); if (result.get()) { + pipeInfo.getPipeTaskInfo().enrichPipeMetasWithRootUserForCompatibility(); LOGGER.info( ConfigNodeMessages.CONFIGNODESNAPSHOT_LOAD_SNAPSHOT_SUCCESS_LATESTSNAPSHOTROOTDIR, latestSnapshotRootDir); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java index e353398cd9de1..bef26386afd7c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.function.Function; public class PipeInfo implements SnapshotProcessor { @@ -58,8 +59,13 @@ public class PipeInfo implements SnapshotProcessor { private final PipeTaskInfo pipeTaskInfo; public PipeInfo() throws IOException { + this(null); + } + + public PipeInfo(final Function pipeUserCurrentPasswordProvider) + throws IOException { pipePluginInfo = new PipePluginInfo(); - pipeTaskInfo = new PipeTaskInfo(); + pipeTaskInfo = new PipeTaskInfo(pipeUserCurrentPasswordProvider); } public PipePluginInfo getPipePluginInfo() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 3c2e331b0a164..247f803152bd9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; @@ -56,7 +57,6 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure; import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; -import org.apache.iotdb.confignode.service.ConfigNode; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -76,9 +76,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -95,10 +97,17 @@ public class PipeTaskInfo implements SnapshotProcessor { // Pure in-memory object, not involved in snapshot serialization and deserialization. private final PipeTaskInfoVersion pipeTaskInfoVersion; + // Accepts a username and returns its current stored password for pipe authentication. + private final Function pipeUserCurrentPasswordProvider; public PipeTaskInfo() { + this(null); + } + + public PipeTaskInfo(final Function pipeUserCurrentPasswordProvider) { this.pipeMetaKeeper = new PipeMetaKeeper(); this.pipeTaskInfoVersion = new PipeTaskInfoVersion(); + this.pipeUserCurrentPasswordProvider = pipeUserCurrentPasswordProvider; } /////////////////////////////// Lock /////////////////////////////// @@ -445,6 +454,7 @@ private void validatePipePluginUsageByPipeInternal(String pluginName) { public TSStatus createPipe(final CreatePipePlanV2 plan) { acquireWriteLock(); try { + enrichPipeMetaWithRootUserForCompatibility(plan.getPipeStaticMeta()); pipeMetaKeeper.addPipeMeta(new PipeMeta(plan.getPipeStaticMeta(), plan.getPipeRuntimeMeta())); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } finally { @@ -502,6 +512,7 @@ public TSStatus operateMultiplePipes(final OperateMultiplePipesPlanV2 plan) { public TSStatus alterPipe(final AlterPipePlanV2 plan) { acquireWriteLock(); try { + enrichPipeMetaWithRootUserForCompatibility(plan.getPipeStaticMeta()); final PipeTemporaryMeta temporaryMeta = pipeMetaKeeper.getPipeMeta(plan.getPipeStaticMeta().getPipeName()).getTemporaryMeta(); pipeMetaKeeper.removePipeMeta(plan.getPipeStaticMeta().getPipeName()); @@ -719,6 +730,7 @@ private TSStatus handleMetaChangesInternal(final PipeHandleMetaChangePlan plan) plan.getPipeMetaList() .forEach( pipeMeta -> { + enrichPipeMetaWithRootUserForCompatibility(pipeMeta.getStaticMeta()); pipeMetaKeeper.addPipeMeta(pipeMeta); logger.ifPresent(l -> l.debug(ConfigNodeMessages.RECORDING_PIPE_META, pipeMeta)); }); @@ -998,6 +1010,47 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException { } } + public void enrichPipeMetasWithRootUserForCompatibility() { + acquireWriteLock(); + try { + pipeMetaKeeper + .getPipeMetaList() + .forEach( + pipeMeta -> enrichPipeMetaWithRootUserForCompatibility(pipeMeta.getStaticMeta())); + } finally { + releaseWriteLock(); + } + } + + private void enrichPipeMetaWithRootUserForCompatibility(final PipeStaticMeta pipeStaticMeta) { + if (pipeUserCurrentPasswordProvider == null) { + return; + } + final boolean shouldEnrichSource = pipeStaticMeta.mayNeedCompatibleRootUserForIoTDBSource(); + final boolean shouldEnrichSink = pipeStaticMeta.mayNeedCompatibleRootUserForWriteBackSink(); + if (!shouldEnrichSource && !shouldEnrichSink) { + return; + } + + final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); + final String password = pipeUserCurrentPasswordProvider.apply(rootUserName); + if (Objects.isNull(password)) { + throw new PipeException( + String.format( + ConfigNodeMessages + .FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST, + pipeStaticMeta.getPipeName(), + rootUserName)); + } + + if (shouldEnrichSource) { + pipeStaticMeta.enrichSourceWithRootUserForCompatibility(rootUserName, password); + } + if (shouldEnrichSink) { + pipeStaticMeta.enrichWriteBackSinkWithRootUserForCompatibility(rootUserName, password); + } + } + private void normalizeRecoveredConsensusPipeStatus() { final List restartedConsensusPipes = new ArrayList<>(); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java index 762c3bf045c5b..7b78f59253dc8 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java @@ -20,11 +20,15 @@ package org.apache.iotdb.confignode.persistence.pipe; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; +import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; @@ -88,6 +92,145 @@ public void testRecordDataNodePushPipeMetaExceptionsKeepsUserStoppedPipeOutOfAut Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get()); } + @Test + public void testEnrichOldUserPipeWithRootUserForCompatibility() { + final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); + final String rootPassword = "root-current-password"; + pipeTaskInfo = new PipeTaskInfo(username -> rootPassword); + + createPipe("oldPipe", PipeStatus.STOPPED); + + final Map sourceAttributes = + pipeTaskInfo + .getPipeMetaByPipeName("oldPipe") + .getStaticMeta() + .getSourceParameters() + .getAttribute(); + Assert.assertEquals( + String.valueOf(IoTDBConstant.SUPER_USER_ID), + sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USER_ID)); + Assert.assertEquals( + rootUserName, sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)); + Assert.assertEquals( + rootPassword, sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + Assert.assertFalse( + pipeTaskInfo + .getPipeMetaByPipeName("oldPipe") + .getStaticMeta() + .getSinkParameters() + .getAttribute() + .containsKey(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY)); + } + + @Test + public void testDoNotOverwritePipeWithUserForCompatibility() { + pipeTaskInfo = new PipeTaskInfo(username -> "root-current-password"); + + createPipeWithSourceAttributes( + "newPipe", + new HashMap() { + { + put("extractor", "iotdb-source"); + put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "user"); + put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, "user-password"); + } + }); + + final Map sourceAttributes = + pipeTaskInfo + .getPipeMetaByPipeName("newPipe") + .getStaticMeta() + .getSourceParameters() + .getAttribute(); + Assert.assertEquals("user", sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)); + Assert.assertEquals( + "user-password", sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + } + + @Test + public void testDoNotEnrichSystemPipeForCompatibility() { + pipeTaskInfo = new PipeTaskInfo(username -> "root-current-password"); + + createPipeWithSourceAttributes( + PipeStaticMeta.generateSubscriptionPipeName("topic", "group"), + new HashMap() { + { + put("extractor", "iotdb-source"); + } + }); + + final Map sourceAttributes = + pipeTaskInfo + .getPipeMetaByPipeName(PipeStaticMeta.generateSubscriptionPipeName("topic", "group")) + .getStaticMeta() + .getSourceParameters() + .getAttribute(); + Assert.assertFalse(sourceAttributes.containsKey(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)); + Assert.assertFalse(sourceAttributes.containsKey(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + } + + @Test + public void testEnrichOldWriteBackSinkWithRootUserForCompatibility() { + final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); + final String rootPassword = "root-current-password"; + pipeTaskInfo = new PipeTaskInfo(username -> rootPassword); + + createPipeWithAttributes( + "oldWriteBackPipe", + new HashMap() { + { + put("extractor", "iotdb-source"); + put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "source-user"); + put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, "source-password"); + } + }, + new HashMap() { + { + put("connector", "write-back-sink"); + } + }); + + final Map sinkAttributes = + pipeTaskInfo + .getPipeMetaByPipeName("oldWriteBackPipe") + .getStaticMeta() + .getSinkParameters() + .getAttribute(); + Assert.assertEquals( + String.valueOf(IoTDBConstant.SUPER_USER_ID), + sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_USER_ID)); + Assert.assertEquals(rootUserName, sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY)); + Assert.assertEquals(rootPassword, sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY)); + } + + @Test + public void testEnrichLoadedPipeMetasWithRootUserForCompatibility() { + final String rootPassword = "root-current-password"; + pipeTaskInfo = new PipeTaskInfo(username -> rootPassword); + + createPipeWithSourceAttributes( + "loadedPipe", + new HashMap() { + { + put("extractor", "iotdb-source"); + } + }); + final Map sourceAttributes = + pipeTaskInfo + .getPipeMetaByPipeName("loadedPipe") + .getStaticMeta() + .getSourceParameters() + .getAttribute(); + sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_USER_ID); + sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY); + sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY); + + pipeTaskInfo.enrichPipeMetasWithRootUserForCompatibility(); + + Assert.assertEquals( + rootPassword, sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + } + private Map createErrorRespMap(final String pipeName) { final TPushPipeMetaRespExceptionMessage exceptionMessage = new TPushPipeMetaRespExceptionMessage( @@ -101,11 +244,27 @@ private Map createErrorRespMap(final String pipeName private void createPipe(final String pipeName, final PipeStatus initialStatus) { final Map extractorAttributes = new HashMap<>(); - final Map processorAttributes = new HashMap<>(); - final Map connectorAttributes = new HashMap<>(); extractorAttributes.put("extractor", "iotdb-source"); - processorAttributes.put("processor", "do-nothing-processor"); + createPipeWithSourceAttributes(pipeName, extractorAttributes); + + if (PipeStatus.RUNNING.equals(initialStatus)) { + pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING)); + } + } + + private void createPipeWithSourceAttributes( + final String pipeName, final Map extractorAttributes) { + final Map connectorAttributes = new HashMap<>(); connectorAttributes.put("connector", "iotdb-thrift-sink"); + createPipeWithAttributes(pipeName, extractorAttributes, connectorAttributes); + } + + private void createPipeWithAttributes( + final String pipeName, + final Map extractorAttributes, + final Map connectorAttributes) { + final Map processorAttributes = new HashMap<>(); + processorAttributes.put("processor", "do-nothing-processor"); final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID); final ConcurrentMap pipeTasks = new ConcurrentHashMap<>(); @@ -120,9 +279,5 @@ private void createPipe(final String pipeName, final PipeStatus initialStatus) { connectorAttributes); final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks); pipeTaskInfo.createPipe(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta)); - - if (PipeStatus.RUNNING.equals(initialStatus)) { - pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING)); - } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/ConfigurationFileUtilsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/ConfigurationFileUtilsTest.java index a21b8fc58c21b..7223ad7dcd905 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/ConfigurationFileUtilsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/ConfigurationFileUtilsTest.java @@ -36,6 +36,7 @@ import java.nio.file.Files; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java index 4b57cd64e43b2..9855552113c67 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java @@ -19,7 +19,9 @@ package org.apache.iotdb.commons.pipe.agent.task.meta; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility; import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils; @@ -97,6 +99,64 @@ public boolean isSourceExternal() { .toLowerCase()); } + public boolean mayNeedCompatibleRootUserForIoTDBSource() { + final String pluginName = + sourceParameters + .getStringOrDefault( + Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, PipeSourceConstant.SOURCE_KEY), + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + .toLowerCase(); + + return PipeType.USER.equals(getPipeType()) + && (pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + || pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())) + && !sourceParameters.hasAnyAttributes( + PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY, + PipeSourceConstant.SOURCE_IOTDB_USER_KEY, + PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY, + PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, + PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY, + PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY); + } + + public boolean mayNeedCompatibleRootUserForWriteBackSink() { + final String pluginName = + sinkParameters + .getStringOrDefault( + Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY), + BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName()) + .toLowerCase(); + + return PipeType.USER.equals(getPipeType()) + && (pluginName.equals(BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName()) + || pluginName.equals(BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName())) + && !sinkParameters.hasAnyAttributes( + PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY, + PipeSinkConstant.SINK_IOTDB_USER_KEY, + PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY, + PipeSinkConstant.SINK_IOTDB_USERNAME_KEY, + PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY, + PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY); + } + + public void enrichSourceWithRootUserForCompatibility( + final String rootUserName, final String password) { + sourceParameters + .getAttribute() + .put(PipeSourceConstant.SOURCE_IOTDB_USER_ID, String.valueOf(IoTDBConstant.SUPER_USER_ID)); + sourceParameters.getAttribute().put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, rootUserName); + sourceParameters.getAttribute().put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, password); + } + + public void enrichWriteBackSinkWithRootUserForCompatibility( + final String rootUserName, final String password) { + sinkParameters + .getAttribute() + .put(PipeSinkConstant.SINK_IOTDB_USER_ID, String.valueOf(IoTDBConstant.SUPER_USER_ID)); + sinkParameters.getAttribute().put(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY, rootUserName); + sinkParameters.getAttribute().put(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY, password); + } + public ByteBuffer serialize() throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);