Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,30 @@ public PipeRowCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
super(pipeTaskMeta, sourceEvent);
}

public PipeRowCollector(
PipeTaskMeta pipeTaskMeta,
EnrichedEvent sourceEvent,
String sourceEventDataBase,
Boolean isTableModel) {
super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel);
}

public PipeRowCollector(
PipeTaskMeta pipeTaskMeta,
EnrichedEvent sourceEvent,
String sourceEventDataBase,
Boolean isTableModel,
String rawTableModelDataBaseName,
String rawTreeModelDataBaseName) {
super(
pipeTaskMeta,
sourceEvent,
sourceEventDataBase,
isTableModel,
rawTableModelDataBaseName,
rawTreeModelDataBaseName);
}

@Override
public void collectRow(Row row) {
if (!(row instanceof PipeRow)) {
Expand Down Expand Up @@ -98,6 +122,10 @@ private void collectTabletInsertionEvent() {
PipeTabletUtils.compactBitMaps(tablet);
tabletInsertionEventList.add(
new PipeRawTabletInsertionEvent(
isTableModel,
sourceEventDataBaseName,
rawTableModelDataBaseName,
rawTreeModelDataBaseName,
tablet,
isAligned,
sourceEvent == null ? null : sourceEvent.getPipeName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,60 @@
protected boolean isAligned = false;
protected final PipeTaskMeta pipeTaskMeta; // Used to report progress
protected final EnrichedEvent sourceEvent; // Used to report progress
protected String sourceEventDataBaseName;
protected Boolean isTableModel;
protected String rawTableModelDataBaseName;
protected String rawTreeModelDataBaseName;

public PipeRawTabletEventConverter(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
this.pipeTaskMeta = pipeTaskMeta;
this.sourceEvent = sourceEvent;
if (sourceEvent instanceof PipeRawTabletInsertionEvent) {
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) sourceEvent;
sourceEventDataBaseName = pipeRawTabletInsertionEvent.getSourceDatabaseNameFromDataRegion();
isTableModel = pipeRawTabletInsertionEvent.getRawIsTableModelEvent();
rawTableModelDataBaseName = pipeRawTabletInsertionEvent.getRawTableModelDataBase();
rawTreeModelDataBaseName = pipeRawTabletInsertionEvent.getRawTreeModelDataBase();
} else {
sourceEventDataBaseName = null;
isTableModel = null;
rawTableModelDataBaseName = null;
rawTreeModelDataBaseName = null;
}
}

public PipeRawTabletEventConverter(

Check warning on line 58 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Change the visibility of this constructor to "protected".

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ7Tb6_IcgLJX1X58jIu&open=AZ7Tb6_IcgLJX1X58jIu&pullRequest=17965
PipeTaskMeta pipeTaskMeta,
EnrichedEvent sourceEvent,
String sourceEventDataBase,
Boolean isTableModel) {
this(pipeTaskMeta, sourceEvent);
this.sourceEventDataBaseName = sourceEventDataBase;
this.isTableModel = isTableModel;
}

public PipeRawTabletEventConverter(

Check warning on line 68 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Change the visibility of this constructor to "protected".

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ7Tb6_JcgLJX1X58jIv&open=AZ7Tb6_JcgLJX1X58jIv&pullRequest=17965
PipeTaskMeta pipeTaskMeta,
EnrichedEvent sourceEvent,
String sourceEventDataBase,
Boolean isTableModel,
String rawTableModelDataBaseName,
String rawTreeModelDataBaseName) {
this(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel);
this.rawTableModelDataBaseName = rawTableModelDataBaseName;
this.rawTreeModelDataBaseName = rawTreeModelDataBaseName;
}

public void resetDatabaseInfo(
final String sourceEventDataBaseName,
final Boolean isTableModel,
final String rawTableModelDataBaseName,
final String rawTreeModelDataBaseName) {
this.sourceEventDataBaseName = sourceEventDataBaseName;
this.isTableModel = isTableModel;
this.rawTableModelDataBaseName = rawTableModelDataBaseName;
this.rawTreeModelDataBaseName = rawTreeModelDataBaseName;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
private Tablet tablet;
private String deviceId; // Only used when the tablet is released.
private final boolean isAligned;
private final Boolean isTableModelEvent;
private final String sourceDatabaseNameFromDataRegion;
private final String tableModelDatabaseName;
private final String treeModelDatabaseName;

private final EnrichedEvent sourceEvent;
private boolean needToReport;
Expand All @@ -65,6 +69,10 @@
private volatile ProgressIndex overridingProgressIndex;

private PipeRawTabletInsertionEvent(
final Boolean isTableModelEvent,
final String sourceDatabaseNameFromDataRegion,
final String tableModelDatabaseName,
final String treeModelDatabaseName,
final Tablet tablet,
final boolean isAligned,
final EnrichedEvent sourceEvent,
Expand All @@ -78,6 +86,10 @@
super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
this.tablet = Objects.requireNonNull(tablet);
this.isAligned = isAligned;
this.isTableModelEvent = isTableModelEvent;
this.sourceDatabaseNameFromDataRegion = sourceDatabaseNameFromDataRegion;
this.tableModelDatabaseName = tableModelDatabaseName;
this.treeModelDatabaseName = treeModelDatabaseName;
this.sourceEvent = sourceEvent;
this.needToReport = needToReport;

Expand All @@ -95,7 +107,11 @@
}
}

public PipeRawTabletInsertionEvent(

Check warning on line 110 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Constructor has 11 parameters, which is greater than 7 authorized.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ7Tb7A0cgLJX1X58jIw&open=AZ7Tb7A0cgLJX1X58jIw&pullRequest=17965
final Boolean isTableModelEvent,
final String sourceDatabaseNameFromDataRegion,
final String tableModelDatabaseName,
final String treeModelDatabaseName,
final Tablet tablet,
final boolean isAligned,
final String pipeName,
Expand All @@ -104,6 +120,35 @@
final EnrichedEvent sourceEvent,
final boolean needToReport) {
this(
isTableModelEvent,
sourceDatabaseNameFromDataRegion,
tableModelDatabaseName,
treeModelDatabaseName,
tablet,
isAligned,
sourceEvent,
needToReport,
pipeName,
creationTime,
pipeTaskMeta,
null,
Long.MIN_VALUE,
Long.MAX_VALUE);
}

public PipeRawTabletInsertionEvent(
final Tablet tablet,
final boolean isAligned,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final EnrichedEvent sourceEvent,
final boolean needToReport) {
this(
null,
null,
null,
null,
tablet,
isAligned,
sourceEvent,
Expand All @@ -118,19 +163,49 @@

@TestOnly
public PipeRawTabletInsertionEvent(final Tablet tablet, final boolean isAligned) {
this(tablet, isAligned, null, false, null, 0, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
this(
null,
null,
null,
null,
tablet,
isAligned,
null,
false,
null,
0,
null,
null,
Long.MIN_VALUE,
Long.MAX_VALUE);
}

@TestOnly
public PipeRawTabletInsertionEvent(
final Tablet tablet, final boolean isAligned, final PipePattern pattern) {
this(tablet, isAligned, null, false, null, 0, null, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
this(
null,
null,
null,
null,
tablet,
isAligned,
null,
false,
null,
0,
null,
pattern,
Long.MIN_VALUE,
Long.MAX_VALUE);
}

@TestOnly
public PipeRawTabletInsertionEvent(
final Tablet tablet, final long startTime, final long endTime) {
this(tablet, false, null, false, null, 0, null, null, startTime, endTime);
this(
null, null, null, null, tablet, false, null, false, null, 0, null, null, startTime,
endTime);
}

@Override
Expand Down Expand Up @@ -219,6 +294,10 @@
final long startTime,
final long endTime) {
return new PipeRawTabletInsertionEvent(
isTableModelEvent,
sourceDatabaseNameFromDataRegion,
tableModelDatabaseName,
treeModelDatabaseName,
tablet,
isAligned,
sourceEvent,
Expand Down Expand Up @@ -278,6 +357,36 @@
return sourceEvent;
}

public boolean isTableModelEvent() {
return Boolean.TRUE.equals(isTableModelEvent);
}

public Boolean getRawIsTableModelEvent() {
return isTableModelEvent;
}

public String getSourceDatabaseNameFromDataRegion() {
return sourceDatabaseNameFromDataRegion;
}

public String getRawTableModelDataBase() {
return tableModelDatabaseName;
}

public String getRawTreeModelDataBase() {
return treeModelDatabaseName;
}

public String getTreeModelDatabaseName() {
return treeModelDatabaseName != null ? treeModelDatabaseName : sourceDatabaseNameFromDataRegion;
}

public String getTableModelDatabaseName() {
return tableModelDatabaseName != null
? tableModelDatabaseName
: sourceDatabaseNameFromDataRegion;
}

@Override
public boolean isShouldReportOnCommit() {
return shouldReportOnCommit && needToReport;
Expand Down Expand Up @@ -344,7 +453,17 @@

public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
return new PipeRawTabletInsertionEvent(
convertToTablet(), isAligned, pipeName, creationTime, pipeTaskMeta, this, needToReport);
isTableModelEvent,
sourceDatabaseNameFromDataRegion,
tableModelDatabaseName,
treeModelDatabaseName,
convertToTablet(),
isAligned,
pipeName,
creationTime,
pipeTaskMeta,
this,
needToReport);
}

public boolean hasNoNeedParsingAndIsEmpty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,38 @@ public PipeTabletCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent)
super(pipeTaskMeta, sourceEvent);
}

public PipeTabletCollector(
PipeTaskMeta pipeTaskMeta,
EnrichedEvent sourceEvent,
String sourceEventDataBase,
Boolean isTableModel) {
super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel);
}

public PipeTabletCollector(
PipeTaskMeta pipeTaskMeta,
EnrichedEvent sourceEvent,
String sourceEventDataBase,
Boolean isTableModel,
String rawTableModelDataBaseName,
String rawTreeModelDataBaseName) {
super(
pipeTaskMeta,
sourceEvent,
sourceEventDataBase,
isTableModel,
rawTableModelDataBaseName,
rawTreeModelDataBaseName);
}

@Override
public void collectTablet(final Tablet tablet) {
tabletInsertionEventList.add(
new PipeRawTabletInsertionEvent(
isTableModel,
sourceEventDataBaseName,
rawTableModelDataBaseName,
rawTreeModelDataBaseName,
tablet,
isAligned,
sourceEvent == null ? null : sourceEvent.getPipeName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
private PipeTaskMeta pipeTaskMeta;
private long outputMaxDelayMilliseconds;
private long outputMinReportIntervalMilliseconds;
private String outputDatabase;
private String outputDatabaseWithPathSeparator;

private final Map<String, AggregatedResultOperator> outputName2OperatorMap = new HashMap<>();
Expand Down Expand Up @@ -213,7 +214,7 @@
PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_KEY,
PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_DEFAULT_VALUE)
* 1000;
final String outputDatabase =
outputDatabase =
parameters.getStringOrDefault(
PROCESSOR_OUTPUT_DATABASE_KEY, PROCESSOR_OUTPUT_DATABASE_DEFAULT_VALUE);
outputDatabaseWithPathSeparator =
Expand Down Expand Up @@ -409,10 +410,12 @@
}
}

private Map<String, Pair<Long, ByteBuffer>> processRow(

Check warning on line 413 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 90 to 64, Complexity from 16 to 14, Nesting Level from 4 to 2, Number of Variables from 12 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ7Tb7BRcgLJX1X58jIx&open=AZ7Tb7BRcgLJX1X58jIx&pullRequest=17965
final Row row, final RowCollector rowCollector, final AtomicReference<Exception> exception) {
final Map<String, Pair<Long, ByteBuffer>> resultMap = new HashMap<>();

resetOutputDatabaseForGeneratedEvent(rowCollector);

final long timestamp = row.getTime();
for (int index = 0, size = row.size(); index < size; ++index) {
// Do not calculate null values
Expand Down Expand Up @@ -564,6 +567,7 @@
pipeName2timeSeries2TimeSeriesRuntimeStateMap.get(pipeName).get(timeSeries);
synchronized (stateReference) {
final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, null);
resetOutputDatabaseForGeneratedEvent(rowCollector);
try {
collectWindowOutputs(
stateReference.get().forceOutput(), timeSeries, rowCollector);
Expand Down Expand Up @@ -596,6 +600,13 @@
eventCollector.collect(event);
}

private void resetOutputDatabaseForGeneratedEvent(final RowCollector rowCollector) {
if (!outputDatabase.isEmpty() && rowCollector instanceof PipeRowCollector) {
((PipeRowCollector) rowCollector)
.resetDatabaseInfo(outputDatabase, Boolean.FALSE, null, outputDatabase);
}
}

/**
* Collect {@link WindowOutput}s of a single timeSeries in one turn. The {@link TSDataType}s shall
* be the same because the {@link AggregatedResultOperator}s shall return the same value for the
Expand Down
Loading
Loading