From 03fac11923d93818bf2c8030a1be0df6492c90ad Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 16 Jun 2026 16:14:15 +0800 Subject: [PATCH] Fix aggregate write-back output database metadata (#17938) * Fix aggregate write-back output database metadata * Fixed i18n (cherry picked from commit 3f20717205688be0e3d98b7911bfc4284a1d95d3) --- .../event/common/row/PipeRowCollector.java | 28 ++++ .../tablet/PipeRawTabletEventConverter.java | 50 +++++++ .../tablet/PipeRawTabletInsertionEvent.java | 127 +++++++++++++++++- .../common/tablet/PipeTabletCollector.java | 28 ++++ .../aggregate/AggregateProcessor.java | 13 +- .../event/PipeTabletInsertionEventTest.java | 32 +++++ .../commons/partition/DataPartition.java | 16 ++- 7 files changed, 288 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java index c26b05f6756b3..5cd7b1518bbd1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java @@ -45,6 +45,30 @@ public PipeRowCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) { super(pipeTaskMeta, sourceEvent); } + public PipeRowCollector( + PipeTaskMeta pipeTaskMeta, + EnrichedEvent sourceEvent, + String sourceEventDataBase, + Boolean isTableModel) { + super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel); + } + + public PipeRowCollector( + PipeTaskMeta pipeTaskMeta, + EnrichedEvent sourceEvent, + String sourceEventDataBase, + Boolean isTableModel, + String rawTableModelDataBaseName, + String rawTreeModelDataBaseName) { + super( + pipeTaskMeta, + sourceEvent, + sourceEventDataBase, + isTableModel, + rawTableModelDataBaseName, + rawTreeModelDataBaseName); + } + @Override public void collectRow(Row row) { if (!(row instanceof PipeRow)) { @@ -98,6 +122,10 @@ private void collectTabletInsertionEvent() { PipeTabletUtils.compactBitMaps(tablet); tabletInsertionEventList.add( new PipeRawTabletInsertionEvent( + isTableModel, + sourceEventDataBaseName, + rawTableModelDataBaseName, + rawTreeModelDataBaseName, tablet, isAligned, sourceEvent == null ? null : sourceEvent.getPipeName(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java index 829c5304dabaa..d5277cb977c89 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java @@ -32,10 +32,60 @@ public abstract class PipeRawTabletEventConverter implements DataCollector { protected boolean isAligned = false; protected final PipeTaskMeta pipeTaskMeta; // Used to report progress protected final EnrichedEvent sourceEvent; // Used to report progress + protected String sourceEventDataBaseName; + protected Boolean isTableModel; + protected String rawTableModelDataBaseName; + protected String rawTreeModelDataBaseName; public PipeRawTabletEventConverter(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) { this.pipeTaskMeta = pipeTaskMeta; this.sourceEvent = sourceEvent; + if (sourceEvent instanceof PipeRawTabletInsertionEvent) { + final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent = + (PipeRawTabletInsertionEvent) sourceEvent; + sourceEventDataBaseName = pipeRawTabletInsertionEvent.getSourceDatabaseNameFromDataRegion(); + isTableModel = pipeRawTabletInsertionEvent.getRawIsTableModelEvent(); + rawTableModelDataBaseName = pipeRawTabletInsertionEvent.getRawTableModelDataBase(); + rawTreeModelDataBaseName = pipeRawTabletInsertionEvent.getRawTreeModelDataBase(); + } else { + sourceEventDataBaseName = null; + isTableModel = null; + rawTableModelDataBaseName = null; + rawTreeModelDataBaseName = null; + } + } + + public PipeRawTabletEventConverter( + PipeTaskMeta pipeTaskMeta, + EnrichedEvent sourceEvent, + String sourceEventDataBase, + Boolean isTableModel) { + this(pipeTaskMeta, sourceEvent); + this.sourceEventDataBaseName = sourceEventDataBase; + this.isTableModel = isTableModel; + } + + public PipeRawTabletEventConverter( + PipeTaskMeta pipeTaskMeta, + EnrichedEvent sourceEvent, + String sourceEventDataBase, + Boolean isTableModel, + String rawTableModelDataBaseName, + String rawTreeModelDataBaseName) { + this(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel); + this.rawTableModelDataBaseName = rawTableModelDataBaseName; + this.rawTreeModelDataBaseName = rawTreeModelDataBaseName; + } + + public void resetDatabaseInfo( + final String sourceEventDataBaseName, + final Boolean isTableModel, + final String rawTableModelDataBaseName, + final String rawTreeModelDataBaseName) { + this.sourceEventDataBaseName = sourceEventDataBaseName; + this.isTableModel = isTableModel; + this.rawTableModelDataBaseName = rawTableModelDataBaseName; + this.rawTreeModelDataBaseName = rawTreeModelDataBaseName; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 261e3c7a8b324..6d6995f0231e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -54,6 +54,10 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent private Tablet tablet; private String deviceId; // Only used when the tablet is released. private final boolean isAligned; + private final Boolean isTableModelEvent; + private final String sourceDatabaseNameFromDataRegion; + private final String tableModelDatabaseName; + private final String treeModelDatabaseName; private final EnrichedEvent sourceEvent; private boolean needToReport; @@ -65,6 +69,10 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent private volatile ProgressIndex overridingProgressIndex; private PipeRawTabletInsertionEvent( + final Boolean isTableModelEvent, + final String sourceDatabaseNameFromDataRegion, + final String tableModelDatabaseName, + final String treeModelDatabaseName, final Tablet tablet, final boolean isAligned, final EnrichedEvent sourceEvent, @@ -78,6 +86,10 @@ private PipeRawTabletInsertionEvent( super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime); this.tablet = Objects.requireNonNull(tablet); this.isAligned = isAligned; + this.isTableModelEvent = isTableModelEvent; + this.sourceDatabaseNameFromDataRegion = sourceDatabaseNameFromDataRegion; + this.tableModelDatabaseName = tableModelDatabaseName; + this.treeModelDatabaseName = treeModelDatabaseName; this.sourceEvent = sourceEvent; this.needToReport = needToReport; @@ -96,6 +108,10 @@ private PipeRawTabletInsertionEvent( } public PipeRawTabletInsertionEvent( + final Boolean isTableModelEvent, + final String sourceDatabaseNameFromDataRegion, + final String tableModelDatabaseName, + final String treeModelDatabaseName, final Tablet tablet, final boolean isAligned, final String pipeName, @@ -104,6 +120,35 @@ public PipeRawTabletInsertionEvent( final EnrichedEvent sourceEvent, final boolean needToReport) { this( + isTableModelEvent, + sourceDatabaseNameFromDataRegion, + tableModelDatabaseName, + treeModelDatabaseName, + tablet, + isAligned, + sourceEvent, + needToReport, + pipeName, + creationTime, + pipeTaskMeta, + null, + Long.MIN_VALUE, + Long.MAX_VALUE); + } + + public PipeRawTabletInsertionEvent( + final Tablet tablet, + final boolean isAligned, + final String pipeName, + final long creationTime, + final PipeTaskMeta pipeTaskMeta, + final EnrichedEvent sourceEvent, + final boolean needToReport) { + this( + null, + null, + null, + null, tablet, isAligned, sourceEvent, @@ -118,19 +163,49 @@ public PipeRawTabletInsertionEvent( @TestOnly public PipeRawTabletInsertionEvent(final Tablet tablet, final boolean isAligned) { - this(tablet, isAligned, null, false, null, 0, null, null, Long.MIN_VALUE, Long.MAX_VALUE); + this( + null, + null, + null, + null, + tablet, + isAligned, + null, + false, + null, + 0, + null, + null, + Long.MIN_VALUE, + Long.MAX_VALUE); } @TestOnly public PipeRawTabletInsertionEvent( final Tablet tablet, final boolean isAligned, final PipePattern pattern) { - this(tablet, isAligned, null, false, null, 0, null, pattern, Long.MIN_VALUE, Long.MAX_VALUE); + this( + null, + null, + null, + null, + tablet, + isAligned, + null, + false, + null, + 0, + null, + pattern, + Long.MIN_VALUE, + Long.MAX_VALUE); } @TestOnly public PipeRawTabletInsertionEvent( final Tablet tablet, final long startTime, final long endTime) { - this(tablet, false, null, false, null, 0, null, null, startTime, endTime); + this( + null, null, null, null, tablet, false, null, false, null, 0, null, null, startTime, + endTime); } @Override @@ -219,6 +294,10 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final long startTime, final long endTime) { return new PipeRawTabletInsertionEvent( + isTableModelEvent, + sourceDatabaseNameFromDataRegion, + tableModelDatabaseName, + treeModelDatabaseName, tablet, isAligned, sourceEvent, @@ -278,6 +357,36 @@ public EnrichedEvent getSourceEvent() { return sourceEvent; } + public boolean isTableModelEvent() { + return Boolean.TRUE.equals(isTableModelEvent); + } + + public Boolean getRawIsTableModelEvent() { + return isTableModelEvent; + } + + public String getSourceDatabaseNameFromDataRegion() { + return sourceDatabaseNameFromDataRegion; + } + + public String getRawTableModelDataBase() { + return tableModelDatabaseName; + } + + public String getRawTreeModelDataBase() { + return treeModelDatabaseName; + } + + public String getTreeModelDatabaseName() { + return treeModelDatabaseName != null ? treeModelDatabaseName : sourceDatabaseNameFromDataRegion; + } + + public String getTableModelDatabaseName() { + return tableModelDatabaseName != null + ? tableModelDatabaseName + : sourceDatabaseNameFromDataRegion; + } + @Override public boolean isShouldReportOnCommit() { return shouldReportOnCommit && needToReport; @@ -344,7 +453,17 @@ public long count() { public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() { return new PipeRawTabletInsertionEvent( - convertToTablet(), isAligned, pipeName, creationTime, pipeTaskMeta, this, needToReport); + isTableModelEvent, + sourceDatabaseNameFromDataRegion, + tableModelDatabaseName, + treeModelDatabaseName, + convertToTablet(), + isAligned, + pipeName, + creationTime, + pipeTaskMeta, + this, + needToReport); } public boolean hasNoNeedParsingAndIsEmpty() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java index 3888988e67ed0..b9da59f311118 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java @@ -31,10 +31,38 @@ public PipeTabletCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) super(pipeTaskMeta, sourceEvent); } + public PipeTabletCollector( + PipeTaskMeta pipeTaskMeta, + EnrichedEvent sourceEvent, + String sourceEventDataBase, + Boolean isTableModel) { + super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel); + } + + public PipeTabletCollector( + PipeTaskMeta pipeTaskMeta, + EnrichedEvent sourceEvent, + String sourceEventDataBase, + Boolean isTableModel, + String rawTableModelDataBaseName, + String rawTreeModelDataBaseName) { + super( + pipeTaskMeta, + sourceEvent, + sourceEventDataBase, + isTableModel, + rawTableModelDataBaseName, + rawTreeModelDataBaseName); + } + @Override public void collectTablet(final Tablet tablet) { tabletInsertionEventList.add( new PipeRawTabletInsertionEvent( + isTableModel, + sourceEventDataBaseName, + rawTableModelDataBaseName, + rawTreeModelDataBaseName, tablet, isAligned, sourceEvent == null ? null : sourceEvent.getPipeName(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java index 191ec7ee71235..b5d151e0a8a65 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java @@ -112,6 +112,7 @@ public class AggregateProcessor implements PipeProcessor { private PipeTaskMeta pipeTaskMeta; private long outputMaxDelayMilliseconds; private long outputMinReportIntervalMilliseconds; + private String outputDatabase; private String outputDatabaseWithPathSeparator; private final Map outputName2OperatorMap = new HashMap<>(); @@ -213,7 +214,7 @@ public void customize( PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_KEY, PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_DEFAULT_VALUE) * 1000; - final String outputDatabase = + outputDatabase = parameters.getStringOrDefault( PROCESSOR_OUTPUT_DATABASE_KEY, PROCESSOR_OUTPUT_DATABASE_DEFAULT_VALUE); outputDatabaseWithPathSeparator = @@ -413,6 +414,8 @@ private Map> processRow( final Row row, final RowCollector rowCollector, final AtomicReference exception) { final Map> resultMap = new HashMap<>(); + resetOutputDatabaseForGeneratedEvent(rowCollector); + final long timestamp = row.getTime(); for (int index = 0, size = row.size(); index < size; ++index) { // Do not calculate null values @@ -564,6 +567,7 @@ public void process(final Event event, final EventCollector eventCollector) thro pipeName2timeSeries2TimeSeriesRuntimeStateMap.get(pipeName).get(timeSeries); synchronized (stateReference) { final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, null); + resetOutputDatabaseForGeneratedEvent(rowCollector); try { collectWindowOutputs( stateReference.get().forceOutput(), timeSeries, rowCollector); @@ -596,6 +600,13 @@ public void process(final Event event, final EventCollector eventCollector) thro eventCollector.collect(event); } + private void resetOutputDatabaseForGeneratedEvent(final RowCollector rowCollector) { + if (!outputDatabase.isEmpty() && rowCollector instanceof PipeRowCollector) { + ((PipeRowCollector) rowCollector) + .resetDatabaseInfo(outputDatabase, Boolean.FALSE, null, outputDatabase); + } + } + /** * Collect {@link WindowOutput}s of a single timeSeries in one turn. The {@link TSDataType}s shall * be the same because the {@link AggregatedResultOperator}s shall return the same value for the diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java index 308c5458dc9b9..8a290bd180310 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java @@ -22,6 +22,8 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern; +import org.apache.iotdb.db.pipe.event.common.row.PipeResetTabletRow; +import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.iotdb.db.pipe.event.common.tablet.TabletInsertionDataContainer; @@ -41,6 +43,7 @@ import java.time.LocalDate; import java.util.Arrays; +import java.util.List; public class PipeTabletInsertionEventTest { @@ -320,6 +323,35 @@ public void convertToAlignedTabletForTest() { Assert.assertTrue(isAligned4); } + @Test + public void collectRowWithOverriddenTreeDatabaseForTest() { + final PipeRowCollector rowCollector = new PipeRowCollector(null, null, "root.test.sg_0", false); + rowCollector.resetDatabaseInfo("root.userResultDB", false, null, "root.userResultDB"); + + final MeasurementSchema[] outputSchemas = {new MeasurementSchema("avg", TSDataType.INT32)}; + rowCollector.collectRow( + new PipeResetTabletRow( + 0, + "root.userResultDB.d_0.s_1", + false, + outputSchemas, + new long[] {1L}, + new TSDataType[] {TSDataType.INT32}, + new Object[] {new int[] {1}}, + null, + new String[] {"avg"})); + + final List events = + rowCollector.convertToTabletInsertionEvents(false); + Assert.assertEquals(1, events.size()); + + final PipeRawTabletInsertionEvent event = (PipeRawTabletInsertionEvent) events.get(0); + Assert.assertEquals("root.userResultDB", event.getSourceDatabaseNameFromDataRegion()); + Assert.assertFalse(event.isTableModelEvent()); + Assert.assertEquals("root.userResultDB", event.getTreeModelDatabaseName()); + Assert.assertEquals("root.userResultDB.d_0.s_1", event.convertToTablet().deviceId); + } + @Test public void convertToTabletSkipsUnnecessaryBitMapsForTest() throws Exception { final BitMap[] bitMaps = new BitMap[schemas.length]; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java index 8993170107200..cf88cc630e3c8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java @@ -172,8 +172,20 @@ public List getDataRegionReplicaSetForWriting( List dataRegionReplicaSets = new ArrayList<>(); Map>> dataBasePartitionMap = dataPartitionMap.get(storageGroup); + if (dataBasePartitionMap == null) { + throw new RuntimeException( + "Database " + + storageGroup + + " not exists and failed to create automatically because enable_auto_create_schema is FALSE."); + } Map> slotReplicaSetMap = dataBasePartitionMap.get(seriesPartitionSlot); + if (slotReplicaSetMap == null) { + throw new RuntimeException( + String.format( + "Data partition is empty. device: %s, seriesSlot: %s, database: %s", + deviceName, seriesPartitionSlot, storageGroup)); + } for (TTimePartitionSlot timePartitionSlot : timePartitionSlotList) { List targetRegionList = slotReplicaSetMap.get(timePartitionSlot); if (targetRegionList == null || targetRegionList.isEmpty()) { @@ -199,7 +211,9 @@ public TRegionReplicaSet getDataRegionReplicaSetForWriting( databasePartitionMap = dataPartitionMap.get(storageGroup); if (databasePartitionMap == null) { throw new RuntimeException( - "Database not exists and failed to create automatically because enable_auto_create_schema is FALSE."); + "Database " + + storageGroup + + " not exists and failed to create automatically because enable_auto_create_schema is FALSE."); } List regions = databasePartitionMap.get(seriesPartitionSlot).get(timePartitionSlot);