Skip to content

Commit

Permalink
Fix data type check and add load IT
Browse files Browse the repository at this point in the history
  • Loading branch information
jt2863838 committed Jan 14, 2025
1 parent 7eff395 commit d07c818
Show file tree
Hide file tree
Showing 13 changed files with 368 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 80,7 @@ public DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
@Override
public DataNodeConfig setCompactionScheduleIntervalInMs(int compactionScheduleIntervalInMs) {
properties.setProperty(
"compaction_schedule_interval_in_ms",
String.valueOf(compactionScheduleIntervalInMs));
"compaction_schedule_interval_in_ms", String.valueOf(compactionScheduleIntervalInMs));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 19,6 @@

package org.apache.iotdb.relational.it.schema;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.commons.utils.MetadataUtils;
import org.apache.iotdb.isession.ITableSession;
import org.apache.iotdb.isession.SessionDataSet;
Expand All @@ -34,18 30,32 @@
import org.apache.iotdb.rpc.StatementExecutionException;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.ColumnSchema;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.record.Tablet.ColumnCategory;
import org.apache.tsfile.write.v4.ITsFileWriter;
import org.apache.tsfile.write.v4.TsFileWriterBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.iotdb.relational.it.session.IoTDBSessionRelationalIT.genValue;
import static org.junit.Assert.assertEquals;
Expand All @@ -54,6 64,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@SuppressWarnings("ResultOfMethodCallIgnored")
@RunWith(IoTDBTestRunner.class)
@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
public class IoTDBAlterColumnTypeIT {
Expand Down Expand Up @@ -461,7 472,8 @@ public void testContinuousAlter() throws IoTDBConnectionException, StatementExec
tablet.reset();

// time=3 and time=4 are FLOAT
session.executeNonQueryStatement("ALTER TABLE alter_and_alter ALTER COLUMN s1 SET DATA TYPE FLOAT");
session.executeNonQueryStatement(
"ALTER TABLE alter_and_alter ALTER COLUMN s1 SET DATA TYPE FLOAT");
tablet =
new Tablet(
"alter_and_alter",
Expand All @@ -486,7 498,6 @@ public void testContinuousAlter() throws IoTDBConnectionException, StatementExec
session.insert(tablet);
tablet.reset();


// time=5 and time=6 are DOUBLE
session.executeNonQueryStatement(
"ALTER TABLE alter_and_alter ALTER COLUMN s1 SET DATA TYPE DOUBLE");
Expand Down Expand Up @@ -530,29 541,35 @@ public void testContinuousAlter() throws IoTDBConnectionException, StatementExec
public void testConcurrentWriteAndAlter()
throws IoTDBConnectionException, StatementExecutionException, InterruptedException {
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS concurrent_write_and_alter (s1 int32)");
// session.executeNonQueryStatement("SET CONFIGURATION enable_seq_space_compaction='false'");
session.executeNonQueryStatement(
"CREATE TABLE IF NOT EXISTS concurrent_write_and_alter (s1 int32)");
// session.executeNonQueryStatement("SET CONFIGURATION
// enable_seq_space_compaction='false'");
}

ExecutorService threadPool = Executors.newCachedThreadPool();
AtomicInteger writeCounter = new AtomicInteger(0);
int maxWrite = 10000;
int flushInterval = 100;
int alterStart = 5000;
threadPool.submit(() -> {
try {
write(writeCounter, maxWrite, flushInterval);
} catch (IoTDBConnectionException | StatementExecutionException e) {
throw new RuntimeException(e);
}
});
threadPool.submit(() -> {
try {
alter(writeCounter, alterStart);
} catch (InterruptedException | IoTDBConnectionException | StatementExecutionException e) {
throw new RuntimeException(e);
}
});
threadPool.submit(
() -> {
try {
write(writeCounter, maxWrite, flushInterval);
} catch (IoTDBConnectionException | StatementExecutionException e) {
throw new RuntimeException(e);
}
});
threadPool.submit(
() -> {
try {
alter(writeCounter, alterStart);
} catch (InterruptedException
| IoTDBConnectionException
| StatementExecutionException e) {
throw new RuntimeException(e);
}
});
threadPool.shutdown();
assertTrue(threadPool.awaitTermination(1, TimeUnit.MINUTES));

Expand All @@ -571,20 588,239 @@ private void write(AtomicInteger writeCounter, int maxWrite, int flushInterval)
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
int writtenCnt = 0;
do {
session.executeNonQueryStatement(String.format("INSERT INTO concurrent_write_and_alter (time, s1) VALUES (%d, %d)", writtenCnt, writtenCnt));
session.executeNonQueryStatement(
String.format(
"INSERT INTO concurrent_write_and_alter (time, s1) VALUES (%d, %d)",
writtenCnt, writtenCnt));
if (((writtenCnt 1) % flushInterval) == 0) {
session.executeNonQueryStatement("FLUSH");
}
} while ((writtenCnt = writeCounter.incrementAndGet()) < maxWrite);
}
}

private void alter(AtomicInteger writeCounter, int alterStart) throws InterruptedException, IoTDBConnectionException, StatementExecutionException {
private void alter(AtomicInteger writeCounter, int alterStart)
throws InterruptedException, IoTDBConnectionException, StatementExecutionException {
while (writeCounter.get() < alterStart) {
Thread.sleep(10);
}
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
session.executeNonQueryStatement("ALTER TABLE concurrent_write_and_alter ALTER COLUMN s1 SET DATA TYPE DOUBLE");
session.executeNonQueryStatement(
"ALTER TABLE concurrent_write_and_alter ALTER COLUMN s1 SET DATA TYPE DOUBLE");
}
}

@Test
public void testLoadAndAlter()
throws IoTDBConnectionException,
StatementExecutionException,
IOException,
WriteProcessException {
// file1-file4 s1=INT32
TableSchema schema1 =
new TableSchema(
"load_and_alter",
Arrays.asList(
new ColumnSchema("dId", TSDataType.STRING, ColumnCategory.TAG),
new ColumnSchema("s1", TSDataType.INT32, ColumnCategory.FIELD)));
// file1-file3 single device small range ([1, 1]), may load without split
List<File> filesToLoad = new ArrayList<>();
for (int i = 1; i <= 3; i ) {
File file = new File("target", "f" i ".tsfile");
try (ITsFileWriter tsFileWriter =
new TsFileWriterBuilder().file(file).tableSchema(schema1).build()) {
Tablet tablet =
new Tablet(
schema1.getTableName(),
Arrays.asList("dId", "s1"),
Arrays.asList(TSDataType.STRING, TSDataType.INT32),
Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD));
tablet.addTimestamp(0, 1);
tablet.addValue("dId", 0, "d" i);
tablet.addValue("s1", 0, 1);
tsFileWriter.write(tablet);
}
filesToLoad.add(file);
}
// file4 multi device large range ([2, 100_000_000]), load with split
File file = new File("target", "f" 4 ".tsfile");
try (ITsFileWriter tsFileWriter =
new TsFileWriterBuilder().file(file).tableSchema(schema1).build()) {
Tablet tablet =
new Tablet(
schema1.getTableName(),
Arrays.asList("dId", "s1"),
Arrays.asList(TSDataType.STRING, TSDataType.INT32),
Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD));
int rowIndex = 0;
for (int i = 1; i <= 3; i ) {
tablet.addTimestamp(rowIndex, 2);
tablet.addValue("dId", rowIndex, "d" i);
tablet.addValue("s1", rowIndex, 2);
rowIndex ;
tablet.addTimestamp(rowIndex, 100_000_000);
tablet.addValue("dId", rowIndex, "d" i);
tablet.addValue("s1", rowIndex, 100_000_000);
rowIndex ;
}
tsFileWriter.write(tablet);
}
filesToLoad.add(file);

// load file1-file4
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
for (File f : filesToLoad) {
session.executeNonQueryStatement("LOAD '" f.getAbsolutePath() "'");
}
}
// check load result
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
SessionDataSet dataSet =
session.executeQueryStatement("select count(s1) from load_and_alter");
RowRecord rec;
rec = dataSet.next();
assertEquals(9, rec.getFields().get(0).getLongV());
assertFalse(dataSet.hasNext());
}

filesToLoad.forEach(File::delete);
filesToLoad.clear();

// file5-file8 s1=DOUBLE
TableSchema schema2 =
new TableSchema(
"load_and_alter",
Arrays.asList(
new ColumnSchema("dId", TSDataType.STRING, ColumnCategory.TAG),
new ColumnSchema("s1", TSDataType.DOUBLE, ColumnCategory.FIELD)));
// file5-file7 single device small range ([3, 3]), may load without split
for (int i = 5; i <= 7; i ) {
file = new File("target", "f" i ".tsfile");
try (ITsFileWriter tsFileWriter =
new TsFileWriterBuilder().file(file).tableSchema(schema2).build()) {
Tablet tablet =
new Tablet(
schema2.getTableName(),
Arrays.asList("dId", "s1"),
Arrays.asList(TSDataType.STRING, TSDataType.DOUBLE),
Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD));
tablet.addTimestamp(0, 3);
tablet.addValue("dId", 0, "d" i);
tablet.addValue("s1", 0, 3.0);
tsFileWriter.write(tablet);
}
filesToLoad.add(file);
}
// file8 multi device large range ([4, 100_000_001]), load with split
file = new File("target", "f" 8 ".tsfile");
try (ITsFileWriter tsFileWriter =
new TsFileWriterBuilder().file(file).tableSchema(schema2).build()) {
Tablet tablet =
new Tablet(
schema1.getTableName(),
Arrays.asList("dId", "s1"),
Arrays.asList(TSDataType.STRING, TSDataType.DOUBLE),
Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD));
int rowIndex = 0;
for (int i = 1; i <= 3; i ) {
tablet.addTimestamp(rowIndex, 4);
tablet.addValue("dId", rowIndex, "d" i);
tablet.addValue("s1", rowIndex, 4.0);
rowIndex ;
tablet.addTimestamp(rowIndex, 100_000_001);
tablet.addValue("dId", rowIndex, "d" i);
tablet.addValue("s1", rowIndex, 100_000_001.0);
rowIndex ;
}
tsFileWriter.write(tablet);
}
filesToLoad.add(file);

// load file5-file8
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
for (File f : filesToLoad) {
session.executeNonQueryStatement("LOAD '" f.getAbsolutePath() "'");
}
}
// check load result
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
SessionDataSet dataSet =
session.executeQueryStatement("select count(s1) from load_and_alter");
RowRecord rec;
rec = dataSet.next();
assertEquals(18, rec.getFields().get(0).getLongV());
assertFalse(dataSet.hasNext());
}

// alter s1 to double
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
session.executeNonQueryStatement(
"ALTER TABLE load_and_alter ALTER COLUMN s1 SET DATA TYPE DOUBLE");
}

filesToLoad.forEach(File::delete);
filesToLoad.clear();

// file9-file12 s1=INT32
// file9-file11 single device small range ([5, 5]), may load without split
for (int i = 9; i <= 11; i ) {
file = new File("target", "f" i ".tsfile");
try (ITsFileWriter tsFileWriter =
new TsFileWriterBuilder().file(file).tableSchema(schema1).build()) {
Tablet tablet =
new Tablet(
schema1.getTableName(),
Arrays.asList("dId", "s1"),
Arrays.asList(TSDataType.STRING, TSDataType.INT32),
Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD));
tablet.addTimestamp(0, 5);
tablet.addValue("dId", 0, "d" i);
tablet.addValue("s1", 0, 5);
tsFileWriter.write(tablet);
}
filesToLoad.add(file);
}
// file12 multi device large range ([6, 100_000_002]), load with split
file = new File("target", "f" 12 ".tsfile");
try (ITsFileWriter tsFileWriter =
new TsFileWriterBuilder().file(file).tableSchema(schema1).build()) {
Tablet tablet =
new Tablet(
schema1.getTableName(),
Arrays.asList("dId", "s1"),
Arrays.asList(TSDataType.STRING, TSDataType.INT32),
Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD));
int rowIndex = 0;
for (int i = 1; i <= 3; i ) {
tablet.addTimestamp(rowIndex, 6);
tablet.addValue("dId", rowIndex, "d" i);
tablet.addValue("s1", rowIndex, 6);
rowIndex ;
tablet.addTimestamp(rowIndex, 100_000_002);
tablet.addValue("dId", rowIndex, "d" i);
tablet.addValue("s1", rowIndex, 100_000_002);
rowIndex ;
}
tsFileWriter.write(tablet);
}
filesToLoad.add(file);

// load file9-file12, should succeed
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
for (File f : filesToLoad) {
session.executeNonQueryStatement("LOAD '" f.getAbsolutePath() "'");
}
}
// check load result
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
SessionDataSet dataSet =
session.executeQueryStatement("select count(s1) from load_and_alter");
RowRecord rec;
rec = dataSet.next();
assertEquals(27, rec.getFields().get(0).getLongV());
assertFalse(dataSet.hasNext());
}

filesToLoad.forEach(File::delete);
}
}
Loading

0 comments on commit d07c818

Please sign in to comment.