diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java index 186426650fe50..dc4908ef2c38f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java @@ -59,6 +59,10 @@ public void serialize(DataOutputStream stream) throws IOException { deletion.serializeWithoutFileOffset(stream); } + public Deletion getModEntry() { + return this.deletion; + } + public static DeletionData deserialize(InputStream stream) throws IllegalPathException, IOException { return new DeletionData(Deletion.deserializeWithoutFileOffset(new DataInputStream(stream))); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java index af401f84aa777..4571bdb1531d2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java @@ -156,7 +156,6 @@ private void processTimeChunkOrNonAlignedChunk(TsFileSequenceReader reader, byte long chunkOffset = reader.position(); timeChunkIndexOfCurrentValueColumn = pageIndex2TimesList.size(); consumeAllAlignedChunkData(chunkOffset, pageIndex2ChunkData); - handleModification(offset2Deletions, chunkOffset); ChunkHeader header = reader.readChunkHeader(marker); String measurementId = header.getMeasurementID(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java index 37d38411ff671..1a407648a51f2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.load.LoadFileException; @@ -30,10 +31,15 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionCheckerUtils; +import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; +import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; +import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; import org.apache.iotdb.db.storageengine.load.splitter.AlignedChunkData; +import org.apache.iotdb.db.storageengine.load.splitter.DeletionData; import org.apache.iotdb.db.storageengine.load.splitter.TsFileData; import org.apache.iotdb.db.storageengine.load.splitter.TsFileSplitter; @@ -54,7 +60,9 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -233,6 +241,84 @@ public void testCompactionFlushPageAndSplitByTimePartition() consumeChunkDataAndValidate(targetResource); } + @Test + public void testDeletionDataShouldOnlyBeGeneratedOnceAtEnd() + throws IOException, MetadataException, LoadFileException, IllegalPathException { + TsFileResource resource = createAlignedMultiDeviceFile(); + try (ModificationFile modificationFile = ModificationFile.getNormalMods(resource)) { + modificationFile.write( + new Deletion(new MeasurementPath("root.testsg.d0.s0"), Long.MAX_VALUE, 100)); + modificationFile.write( + new Deletion(new MeasurementPath("root.testsg.d0.s1"), Long.MAX_VALUE, 200)); + modificationFile.write( + new Deletion(new MeasurementPath("root.testsg.d1.s0"), Long.MAX_VALUE, 300)); + modificationFile.write( + new Deletion(new MeasurementPath("root.testsg.d1.s1"), Long.MAX_VALUE, 400)); + } + + List expectedMods = + new ArrayList<>(ModificationFile.getNormalMods(resource).getModifications()); + List deletionMods = new ArrayList<>(); + File actualModsFile = new File(resource.getTsFilePath() + ".mods"); + try (ModificationFile actualModificationFile = + new ModificationFile(actualModsFile.getAbsolutePath())) { + TsFileSplitter splitter = + new TsFileSplitter( + resource.getTsFile(), + tsFileData -> { + if (tsFileData instanceof DeletionData) { + deletionMods.add(((DeletionData) tsFileData).getModEntry()); + } + return true; + }); + splitter.splitTsFileByDataPartition(); + } + + List actualMods; + try (ModificationFile actualModificationFile = + new ModificationFile(actualModsFile.getAbsolutePath())) { + actualMods = new ArrayList<>(actualModificationFile.getModifications()); + } + Assert.assertEquals(expectedMods, actualMods); + Files.deleteIfExists(actualModsFile.toPath()); + } + + private TsFileResource createAlignedMultiDeviceFile() throws IOException { + TsFileResource resource = createEmptyFileAndResource(true); + try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { + writer.startChunkGroup("d0"); + writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue( + Arrays.asList("s0", "s1"), + new TimeRange[][] { + new TimeRange[] {new TimeRange(1, 100), new TimeRange(200, 300)}, + new TimeRange[] { + new TimeRange(604799900, 604800020), new TimeRange(604810020, 604820020) + } + }, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(false, false)); + writer.endChunkGroup(); + + writer.startChunkGroup("d1"); + writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue( + Arrays.asList("s0", "s1"), + new TimeRange[][] { + new TimeRange[] {new TimeRange(1, 100), new TimeRange(200, 300)}, + new TimeRange[] { + new TimeRange(604799900, 604800020), new TimeRange(604810020, 604820020) + } + }, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(false, false)); + writer.endChunkGroup(); + + writer.endFile(); + } + return resource; + } + private TsFileResource performCompaction() throws StorageEngineException, IOException,