From 2faa54bd5e7ebea4f75744ed7379dddafb244b83 Mon Sep 17 00:00:00 2001 From: DaZuiZui Date: Mon, 22 Jun 2026 15:50:03 +0800 Subject: [PATCH 1/3] Implement FFT table function --- .../it/db/it/IoTDBFFTTableFunctionIT.java | 223 ++++++ .../analyzer/StatementAnalyzer.java | 67 +- .../relational/planner/RelationPlanner.java | 3 + .../analyzer/TableFunctionTest.java | 136 ++++ iotdb-core/node-commons/pom.xml | 4 + .../function/TableBuiltinTableFunction.java | 4 + .../relational/tvf/FFTTableFunction.java | 731 ++++++++++++++++++ .../relational/tvf/FFTTableFunctionTest.java | 246 ++++++ 8 files changed, 1413 insertions(+), 1 deletion(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBFFTTableFunctionIT.java create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunction.java create mode 100644 iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunctionTest.java diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBFFTTableFunctionIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBFFTTableFunctionIT.java new file mode 100644 index 0000000000000..8eb95c9022578 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBFFTTableFunctionIT.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.db.it; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.Statement; + +import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail; +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBFFTTableFunctionIT { + + private static final String DATABASE_NAME = "test_fft"; + private static final String[] SQLS = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + "CREATE TABLE signal(device_id STRING TAG, temperature DOUBLE FIELD, speed INT32 FIELD, note STRING FIELD)", + "INSERT INTO signal(time, device_id, temperature, speed, note) VALUES (0, 'd1', 1.0, 1, 'ok')", + "INSERT INTO signal(time, device_id, temperature, speed, note) VALUES (1000, 'd1', 0.0, 2, 'ok')", + "INSERT INTO signal(time, device_id, temperature, speed, note) VALUES (2000, 'd1', 0.0, 3, 'ok')", + "INSERT INTO signal(time, device_id, temperature, speed, note) VALUES (3000, 'd1', 0.0, 4, 'ok')", + "INSERT INTO signal(time, device_id, temperature, speed, note) VALUES (0, 'd2', 2.0, 4, 'ok')", + "INSERT INTO signal(time, device_id, temperature, speed, note) VALUES (1000, 'd2', 0.0, 3, 'ok')", + "INSERT INTO signal(time, device_id, temperature, speed, note) VALUES (2000, 'd2', 0.0, 2, 'ok')", + "INSERT INTO signal(time, device_id, temperature, speed, note) VALUES (3000, 'd2', 0.0, 1, 'ok')", + "CREATE TABLE single_row(device_id STRING TAG, value DOUBLE FIELD)", + "INSERT INTO single_row(time, device_id, value) VALUES (0, 'd1', 1.0)", + "CREATE TABLE no_numeric(device_id STRING TAG, note STRING FIELD)", + "INSERT INTO no_numeric(time, device_id, note) VALUES (0, 'd1', 'ok')", + "CREATE TABLE with_null(device_id STRING TAG, value DOUBLE FIELD)", + "INSERT INTO with_null(time, device_id, value) VALUES (0, 'd1', 1.0)", + "INSERT INTO with_null(time, device_id, value) VALUES (1000, 'd1', null)", + "CREATE TABLE irregular(device_id STRING TAG, value DOUBLE FIELD)", + "INSERT INTO irregular(time, device_id, value) VALUES (0, 'd1', 1.0)", + "INSERT INTO irregular(time, device_id, value) VALUES (1000, 'd1', 2.0)", + "INSERT INTO irregular(time, device_id, value) VALUES (2500, 'd1', 3.0)", + "FLUSH" + }; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + insertData(); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + private static void insertData() { + String currentSql = null; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + for (String sql : SQLS) { + currentSql = sql; + statement.execute(sql); + } + } catch (Exception e) { + throw new AssertionError("insertData failed while executing [" + currentSql + "].", e); + } + } + + @Test + public void testFFTWithPartitionAndMultipleColumns() { + String[] expectedHeader = + new String[] { + "device_id", + "frequency_index", + "frequency", + "temperature_real", + "temperature_imag", + "speed_real", + "speed_imag" + }; + String[] retArray = + new String[] { + "d1,0,0.0,1.0,0.0,10.0,0.0,", + "d1,1,0.25,1.0,0.0,-2.0,2.0,", + "d1,2,-0.5,1.0,0.0,-2.0,0.0,", + "d1,3,-0.25,1.0,0.0,-2.0,-2.0,", + "d2,0,0.0,2.0,0.0,10.0,0.0,", + "d2,1,0.25,2.0,0.0,2.0,-2.0,", + "d2,2,-0.5,2.0,0.0,2.0,0.0,", + "d2,3,-0.25,2.0,0.0,2.0,2.0," + }; + + tableResultSetEqualTest( + "SELECT * FROM FFT(DATA => signal PARTITION BY device_id ORDER BY time, " + + "SAMPLE_INTERVAL => 1s, N => 4) ORDER BY device_id, frequency_index", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFFTDefaultNAndInferredSampleInterval() { + String[] expectedHeader = + new String[] {"frequency_index", "frequency", "temperature_real", "temperature_imag"}; + String[] retArray = + new String[] {"0,0.0,1.0,0.0,", "1,0.25,1.0,0.0,", "2,-0.5,1.0,0.0,", "3,-0.25,1.0,0.0,"}; + + tableResultSetEqualTest( + "SELECT frequency_index, frequency, temperature_real, temperature_imag " + + "FROM FFT(DATA => (SELECT time, temperature FROM signal WHERE device_id='d1') " + + "ORDER BY time) ORDER BY frequency_index", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFFTTruncateAndZeroPad() { + tableResultSetEqualTest( + "SELECT frequency_index, frequency, speed_real, speed_imag " + + "FROM FFT(DATA => (SELECT time, speed FROM signal WHERE device_id='d1') " + + "ORDER BY time, SAMPLE_INTERVAL => 1s, N => 2) ORDER BY frequency_index", + new String[] {"frequency_index", "frequency", "speed_real", "speed_imag"}, + new String[] {"0,0.0,3.0,0.0,", "1,-0.5,-1.0,0.0,"}, + DATABASE_NAME); + + tableResultSetEqualTest( + "SELECT frequency_index, frequency, temperature_real, temperature_imag " + + "FROM FFT(DATA => (SELECT time, temperature FROM signal WHERE device_id='d1') " + + "ORDER BY time, SAMPLE_INTERVAL => 1s, N => 8) ORDER BY frequency_index", + new String[] {"frequency_index", "frequency", "temperature_real", "temperature_imag"}, + new String[] { + "0,0.0,1.0,0.0,", + "1,0.125,1.0,0.0,", + "2,0.25,1.0,0.0,", + "3,0.375,1.0,0.0,", + "4,-0.5,1.0,0.0,", + "5,-0.375,1.0,0.0,", + "6,-0.25,1.0,0.0,", + "7,-0.125,1.0,0.0," + }, + DATABASE_NAME); + } + + @Test + public void testFFTNorm() { + tableResultSetEqualTest( + "SELECT frequency_index, temperature_real, temperature_imag " + + "FROM FFT(DATA => (SELECT time, temperature FROM signal WHERE device_id='d1') " + + "ORDER BY time, SAMPLE_INTERVAL => 1s, N => 4, NORM => 'forward') " + + "ORDER BY frequency_index", + new String[] {"frequency_index", "temperature_real", "temperature_imag"}, + new String[] {"0,0.25,0.0,", "1,0.25,0.0,", "2,0.25,0.0,", "3,0.25,0.0,"}, + DATABASE_NAME); + + tableResultSetEqualTest( + "SELECT frequency_index, temperature_real, temperature_imag " + + "FROM FFT(DATA => (SELECT time, temperature FROM signal WHERE device_id='d1') " + + "ORDER BY time, SAMPLE_INTERVAL => 1s, N => 4, NORM => 'ortho') " + + "ORDER BY frequency_index", + new String[] {"frequency_index", "temperature_real", "temperature_imag"}, + new String[] {"0,0.5,0.0,", "1,0.5,0.0,", "2,0.5,0.0,", "3,0.5,0.0,"}, + DATABASE_NAME); + } + + @Test + public void testFFTFailures() { + tableAssertTestFail( + "SELECT * FROM FFT(DATA => signal PARTITION BY device_id, SAMPLE_INTERVAL => 1s)", + "701: Table argument with set semantics requires an ORDER BY clause.", + DATABASE_NAME); + tableAssertTestFail( + "SELECT * FROM FFT(DATA => single_row PARTITION BY device_id ORDER BY time)", + "701: FFT requires at least two rows to infer SAMPLE_INTERVAL.", + DATABASE_NAME); + tableAssertTestFail( + "SELECT * FROM FFT(DATA => no_numeric PARTITION BY device_id ORDER BY time, SAMPLE_INTERVAL => 1s)", + "701: No numeric columns found for FFT calculation.", + DATABASE_NAME); + tableAssertTestFail( + "SELECT * FROM FFT(DATA => with_null PARTITION BY device_id ORDER BY time, SAMPLE_INTERVAL => 1s)", + "701: FFT does not support null values in column [value].", + DATABASE_NAME); + tableAssertTestFail( + "SELECT * FROM FFT(DATA => irregular PARTITION BY device_id ORDER BY time)", + "701: FFT requires evenly spaced input time values when SAMPLE_INTERVAL is not specified.", + DATABASE_NAME); + tableAssertTestFail( + "SELECT * FROM FFT(DATA => irregular PARTITION BY device_id ORDER BY time, SAMPLE_INTERVAL => 1s)", + "701: FFT input time interval must match the specified SAMPLE_INTERVAL.", + DATABASE_NAME); + tableAssertTestFail( + "SELECT * FROM FFT(DATA => signal PARTITION BY device_id ORDER BY time, N => 65537)", + "701: FFT transform length N must not exceed 65536.", + DATABASE_NAME); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index 5a24cd4f09a94..106915579757a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -121,6 +121,7 @@ import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.FFTTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.M4TableFunction; import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; import org.apache.iotdb.db.i18n.DataNodeQueryMessages; @@ -5165,7 +5166,7 @@ public Scope visitTableFunctionInvocation(TableFunctionInvocation node, Optional Scope argumentScope = analysis.getScope(argument.getRelation()); if (argument.isPassThroughColumns()) { argumentScope.getRelationType().getAllFields().forEach(fields::add); - } else if (!TableBuiltinTableFunction.M4.getFunctionName().equalsIgnoreCase(functionName) + } else if (!isPartitionColumnsProvidedByProperSchema(functionName) && argument.getPartitionBy().isPresent()) { argument.getPartitionBy().get().stream() .map(expression -> validateAndGetInputField(expression, argumentScope)) @@ -5297,9 +5298,73 @@ private ArgumentsAnalysis analyzeArguments( } } tryAppendM4ModeArgument(functionName, arguments, parameterSpecifications, passedArguments); + tryAppendFFTInternalArguments( + functionName, arguments, parameterSpecifications, passedArguments); return new ArgumentsAnalysis(passedArguments.buildOrThrow(), tableArgumentAnalyses.build()); } + private boolean isPartitionColumnsProvidedByProperSchema(String functionName) { + return TableBuiltinTableFunction.M4.getFunctionName().equalsIgnoreCase(functionName) + || TableBuiltinTableFunction.FFT.getFunctionName().equalsIgnoreCase(functionName); + } + + private void tryAppendFFTInternalArguments( + String functionName, + List arguments, + List parameterSpecifications, + ImmutableMap.Builder passedArguments) { + if (!TableBuiltinTableFunction.FFT.getFunctionName().equalsIgnoreCase(functionName)) { + return; + } + + Optional sampleIntervalArgument = + findOptionalTableFunctionArgument( + arguments, parameterSpecifications, FFTTableFunction.SAMPLE_INTERVAL_PARAMETER_NAME); + if (sampleIntervalArgument.isPresent() + && !(sampleIntervalArgument.get().getValue() instanceof TimeDurationLiteral)) { + throw new SemanticException( + "The SAMPLE_INTERVAL argument of FFT must be a duration literal."); + } + + Optional nArgument = + findOptionalTableFunctionArgument( + arguments, parameterSpecifications, FFTTableFunction.N_PARAMETER_NAME); + if (nArgument.isPresent() && nArgument.get().getValue() instanceof TimeDurationLiteral) { + throw new SemanticException("The N argument of FFT must be a positive integer."); + } + + validateFFTOrderBySortOrder(arguments, parameterSpecifications); + passedArguments.put( + FFTTableFunction.SAMPLE_INTERVAL_SPECIFIED_PARAMETER_NAME, + new ScalarArgument( + org.apache.iotdb.udf.api.type.Type.BOOLEAN, sampleIntervalArgument.isPresent())); + } + + private void validateFFTOrderBySortOrder( + List arguments, + List parameterSpecifications) { + Optional dataArgument = + findOptionalTableFunctionArgument( + arguments, parameterSpecifications, FFTTableFunction.DATA_PARAMETER_NAME); + if (!dataArgument.isPresent() + || !(dataArgument.get().getValue() instanceof TableFunctionTableArgument)) { + return; + } + + Optional orderBy = + ((TableFunctionTableArgument) dataArgument.get().getValue()).getOrderBy(); + if (!orderBy.isPresent()) { + return; + } + + for (SortItem sortItem : orderBy.get().getSortItems()) { + if (sortItem.getOrdering() != SortItem.Ordering.ASCENDING) { + throw new SemanticException( + "The ORDER BY clause of the DATA argument must sort the time column in ascending order."); + } + } + } + private void tryAppendM4ModeArgument( String functionName, List arguments, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index bf14e37d6f3f9..4cce197f89e0b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -1564,6 +1564,9 @@ public RelationPlan visitTableFunctionInvocation(TableFunctionInvocation node, V } else if (!TableBuiltinTableFunction.M4 .getFunctionName() .equalsIgnoreCase(functionAnalysis.getFunctionName()) + && !TableBuiltinTableFunction.FFT + .getFunctionName() + .equalsIgnoreCase(functionAnalysis.getFunctionName()) && tableArgument.getPartitionBy().isPresent()) { tableArgument.getPartitionBy().get().stream() // the original symbols for partitioning columns, not coerced diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java index 9d09ff057b30a..c6a057d6fa090 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.exception.SemanticException; import org.apache.iotdb.commons.queryengine.plan.relational.function.tvf.ForecastTableFunction; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.FFTTableFunction; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester; import org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern; @@ -482,6 +483,141 @@ public void testForecastFunctionAbnormal() { } } + @Test + public void testFFTFunction() { + PlanTester planTester = new PlanTester(); + String sql = + "SELECT * FROM FFT(" + + "DATA => table1 PARTITION BY tag1 ORDER BY time, " + + "SAMPLE_INTERVAL => 1s, " + + "N => 4, " + + "NORM => 'ortho')"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = + tableScan( + "testdb.table1", + ImmutableMap.builder() + .put("time", "time") + .put("tag1_0", "tag1") + .put("s1", "s1") + .put("s2", "s2") + .put("s3", "s3") + .buildOrThrow()); + + Consumer tableFunctionMatcher = + builder -> + builder + .name("fft") + .properOutputs( + "tag1", + "frequency_index", + "frequency", + "s1_real", + "s1_imag", + "s2_real", + "s2_imag", + "s3_real", + "s3_imag") + .requiredSymbols("time", "tag1_0", "s1", "s2", "s3") + .handle( + new MapTableFunctionHandle.Builder() + .addProperty(FFTTableFunction.SAMPLE_INTERVAL_PARAMETER_NAME, 1000L) + .addProperty( + FFTTableFunction.SAMPLE_INTERVAL_SPECIFIED_PARAMETER_NAME, true) + .addProperty(FFTTableFunction.N_PARAMETER_NAME, 4L) + .addProperty(FFTTableFunction.NORM_PARAMETER_NAME, "ortho") + .addProperty("__FFT_PARTITION_TYPES", "STRING") + .addProperty("__FFT_VALUE_TYPES", "INT64,INT64,DOUBLE") + .addProperty("__FFT_VALUE_NAMES", "s1,s2,s3") + .build()); + + assertPlan( + logicalQueryPlan, anyTree(tableFunctionProcessor(tableFunctionMatcher, sort(tableScan)))); + } + + @Test + public void testFFTDefaultArguments() { + PlanTester planTester = new PlanTester(); + String sql = "SELECT * FROM TABLE(FFT(DATA => TABLE(table1) ORDER BY time))"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = + tableScan( + "testdb.table1", + ImmutableMap.builder() + .put("time", "time") + .put("s1", "s1") + .put("s2", "s2") + .put("s3", "s3") + .buildOrThrow()); + + Consumer tableFunctionMatcher = + builder -> + builder + .name("fft") + .properOutputs( + "frequency_index", + "frequency", + "s1_real", + "s1_imag", + "s2_real", + "s2_imag", + "s3_real", + "s3_imag") + .requiredSymbols("time", "s1", "s2", "s3") + .handle( + new MapTableFunctionHandle.Builder() + .addProperty( + FFTTableFunction.SAMPLE_INTERVAL_PARAMETER_NAME, Long.MIN_VALUE) + .addProperty( + FFTTableFunction.SAMPLE_INTERVAL_SPECIFIED_PARAMETER_NAME, false) + .addProperty(FFTTableFunction.N_PARAMETER_NAME, -1L) + .addProperty(FFTTableFunction.NORM_PARAMETER_NAME, "backward") + .addProperty("__FFT_PARTITION_TYPES", "") + .addProperty("__FFT_VALUE_TYPES", "INT64,INT64,DOUBLE") + .addProperty("__FFT_VALUE_NAMES", "s1,s2,s3") + .build()); + + assertPlan( + logicalQueryPlan, anyTree(tableFunctionProcessor(tableFunctionMatcher, sort(tableScan)))); + } + + @Test + public void testFFTRejectsInvalidArguments() { + assertAnalyzeFails( + "SELECT * FROM FFT(DATA => table1 PARTITION BY tag1, SAMPLE_INTERVAL => 1ms)", + "Table argument with set semantics requires an ORDER BY clause."); + assertAnalyzeFails( + "SELECT * FROM FFT(DATA => table1 PARTITION BY tag1 ORDER BY time DESC, SAMPLE_INTERVAL => 1ms)", + "The ORDER BY clause of the DATA argument must sort the time column in ascending order."); + assertAnalyzeFails( + "SELECT * FROM FFT(DATA => table1 PARTITION BY tag1 ORDER BY s1, SAMPLE_INTERVAL => 1ms)", + "The ORDER BY clause of the DATA argument must contain exactly the time column."); + assertAnalyzeFails( + "SELECT * FROM FFT(DATA => table1 PARTITION BY tag1 ORDER BY time, SAMPLE_INTERVAL => 1)", + "The SAMPLE_INTERVAL argument of FFT must be a duration literal."); + assertAnalyzeFails( + "SELECT * FROM FFT(DATA => table1 PARTITION BY tag1 ORDER BY time, N => 0)", + "Invalid scalar argument N, should be a positive value"); + assertAnalyzeFails( + "SELECT * FROM FFT(DATA => table1 PARTITION BY tag1 ORDER BY time, N => 65537)", + "FFT transform length N must not exceed 65536."); + assertAnalyzeFails( + "SELECT * FROM FFT(DATA => table1 PARTITION BY tag1 ORDER BY time, NORM => 'bad')", + "Invalid NORM value for FFT. Supported values are backward, forward and ortho."); + assertAnalyzeFails( + "SELECT * FROM FFT(DATA => (SELECT time, tag1 FROM table1) PARTITION BY tag1 ORDER BY time)", + "No numeric columns found for FFT calculation."); + } + + private void assertAnalyzeFails(String sql, String message) { + try { + analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT); + fail(); + } catch (SemanticException e) { + assertEquals(message, e.getMessage()); + } + } + @Test public void testM4TimeWindowMode() { PlanTester planTester = new PlanTester(); diff --git a/iotdb-core/node-commons/pom.xml b/iotdb-core/node-commons/pom.xml index f0a1afcb8221c..204ffc1cf851b 100644 --- a/iotdb-core/node-commons/pom.xml +++ b/iotdb-core/node-commons/pom.xml @@ -147,6 +147,10 @@ cglib cglib + + com.github.wendykierp + JTransforms + com.google.errorprone error_prone_annotations diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java index eb969a2c6ae34..decaf8dc4669e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.function.tvf.PatternMatchTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.CapacityTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.CumulateTableFunction; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.FFTTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.HOPTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.M4TableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.SessionTableFunction; @@ -45,6 +46,7 @@ public enum TableBuiltinTableFunction { VARIATION("variation"), CAPACITY("capacity"), M4("m4"), + FFT("fft"), FORECAST("forecast"), PATTERN_MATCH("pattern_match"), CLASSIFY("classify"); @@ -91,6 +93,8 @@ public static TableFunction getBuiltinTableFunction(String functionName) { return new CapacityTableFunction(); case "m4": return new M4TableFunction(); + case "fft": + return new FFTTableFunction(); case "forecast": return new ForecastTableFunction(); case "classify": diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunction.java new file mode 100644 index 0000000000000..e9d1011f337d0 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunction.java @@ -0,0 +1,731 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.udf.builtin.relational.tvf; + +import org.apache.iotdb.commons.exception.SemanticException; +import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils; +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; +import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; +import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; +import org.apache.iotdb.udf.api.relational.table.argument.Argument; +import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; +import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; +import org.apache.iotdb.udf.api.relational.table.argument.TableArgument; +import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor; +import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.utils.Binary; +import org.jtransforms.fft.DoubleFFT_1D; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +import static org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex; +import static org.apache.iotdb.udf.api.relational.table.argument.ScalarArgumentChecker.POSITIVE_LONG_CHECKER; + +public class FFTTableFunction implements TableFunction { + + public static final String DATA_PARAMETER_NAME = "DATA"; + public static final String SAMPLE_INTERVAL_PARAMETER_NAME = "SAMPLE_INTERVAL"; + public static final String N_PARAMETER_NAME = "N"; + public static final String NORM_PARAMETER_NAME = "NORM"; + public static final String SAMPLE_INTERVAL_SPECIFIED_PARAMETER_NAME = + "__FFT_SAMPLE_INTERVAL_SPECIFIED"; + + private static final String TIME_COLUMN_NAME = "time"; + private static final String OUTPUT_FREQUENCY_INDEX_COLUMN = "frequency_index"; + private static final String OUTPUT_FREQUENCY_COLUMN = "frequency"; + private static final String PARTITION_TYPES_PROPERTY = "__FFT_PARTITION_TYPES"; + private static final String VALUE_TYPES_PROPERTY = "__FFT_VALUE_TYPES"; + private static final String VALUE_NAMES_PROPERTY = "__FFT_VALUE_NAMES"; + private static final long UNSPECIFIED_SAMPLE_INTERVAL = Long.MIN_VALUE; + private static final long UNSPECIFIED_N = -1L; + private static final long MAX_TRANSFORM_LENGTH = 65_536L; + private static final long MAX_SPECTRUM_DOUBLE_VALUES = 16_777_216L; + private static final String NORM_BACKWARD = "backward"; + private static final String NORM_FORWARD = "forward"; + private static final String NORM_ORTHO = "ortho"; + private static final Set SUPPORTED_PARTITION_TYPES = + new HashSet<>( + Arrays.asList( + Type.BOOLEAN, + Type.INT32, + Type.INT64, + Type.FLOAT, + Type.DOUBLE, + Type.TEXT, + Type.TIMESTAMP, + Type.DATE, + Type.BLOB, + Type.STRING)); + private static final Set SUPPORTED_VALUE_TYPES = + new HashSet<>(Arrays.asList(Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE)); + + @Override + public List getArgumentsSpecifications() { + return Arrays.asList( + TableParameterSpecification.builder().name(DATA_PARAMETER_NAME).setSemantics().build(), + ScalarParameterSpecification.builder() + .name(SAMPLE_INTERVAL_PARAMETER_NAME) + .type(Type.INT64) + .defaultValue(UNSPECIFIED_SAMPLE_INTERVAL) + .addChecker(POSITIVE_LONG_CHECKER) + .build(), + ScalarParameterSpecification.builder() + .name(N_PARAMETER_NAME) + .type(Type.INT64) + .defaultValue(UNSPECIFIED_N) + .addChecker(POSITIVE_LONG_CHECKER) + .build(), + ScalarParameterSpecification.builder() + .name(NORM_PARAMETER_NAME) + .type(Type.STRING) + .defaultValue(NORM_BACKWARD) + .build()); + } + + @Override + public TableFunctionAnalysis analyze(Map arguments) throws UDFException { + TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME); + if (tableArgument.getOrderBy().isEmpty()) { + throw new SemanticException("Table argument with set semantics requires an ORDER BY clause."); + } + + int timeColumnIndex = + findColumnIndex(tableArgument, TIME_COLUMN_NAME, Collections.singleton(Type.TIMESTAMP)); + validateOrderBy(tableArgument); + + List partitionIndexes = getPartitionIndexes(tableArgument); + Set excludedIndexes = new HashSet<>(partitionIndexes); + excludedIndexes.add(timeColumnIndex); + + List valueIndexes = new ArrayList<>(); + List valueNames = new ArrayList<>(); + List valueTypes = new ArrayList<>(); + List partitionTypes = new ArrayList<>(); + DescribedSchema.Builder schemaBuilder = new DescribedSchema.Builder(); + + for (int partitionIndex : partitionIndexes) { + Type type = tableArgument.getFieldTypes().get(partitionIndex); + partitionTypes.add(type); + schemaBuilder.addField(tableArgument.getFieldNames().get(partitionIndex).get(), type); + } + schemaBuilder + .addField(OUTPUT_FREQUENCY_INDEX_COLUMN, Type.INT64) + .addField(OUTPUT_FREQUENCY_COLUMN, Type.DOUBLE); + + for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) { + if (excludedIndexes.contains(i)) { + continue; + } + Type type = tableArgument.getFieldTypes().get(i); + if (!SUPPORTED_VALUE_TYPES.contains(type)) { + continue; + } + String columnName = + tableArgument + .getFieldNames() + .get(i) + .orElseThrow( + () -> new SemanticException("FFT requires named numeric input columns.")); + valueIndexes.add(i); + valueNames.add(columnName); + valueTypes.add(type); + schemaBuilder.addField(columnName + "_real", Type.DOUBLE); + schemaBuilder.addField(columnName + "_imag", Type.DOUBLE); + } + + if (valueIndexes.isEmpty()) { + throw new SemanticException("No numeric columns found for FFT calculation."); + } + + long transformLength = (long) ((ScalarArgument) arguments.get(N_PARAMETER_NAME)).getValue(); + validateTransformLength(transformLength, valueIndexes.size()); + String norm = + ((String) ((ScalarArgument) arguments.get(NORM_PARAMETER_NAME)).getValue()) + .toLowerCase(Locale.ROOT); + validateNorm(norm); + + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder() + .addProperty( + SAMPLE_INTERVAL_PARAMETER_NAME, + ((ScalarArgument) arguments.get(SAMPLE_INTERVAL_PARAMETER_NAME)).getValue()) + .addProperty( + SAMPLE_INTERVAL_SPECIFIED_PARAMETER_NAME, + (boolean) + ((ScalarArgument) arguments.get(SAMPLE_INTERVAL_SPECIFIED_PARAMETER_NAME)) + .getValue()) + .addProperty(N_PARAMETER_NAME, transformLength) + .addProperty(NORM_PARAMETER_NAME, norm) + .addProperty(PARTITION_TYPES_PROPERTY, joinTypes(partitionTypes)) + .addProperty(VALUE_TYPES_PROPERTY, joinTypes(valueTypes)) + .addProperty(VALUE_NAMES_PROPERTY, joinStrings(valueNames)) + .build(); + + List requiredColumns = new ArrayList<>(); + requiredColumns.add(timeColumnIndex); + requiredColumns.addAll(partitionIndexes); + requiredColumns.addAll(valueIndexes); + + return TableFunctionAnalysis.builder() + .properColumnSchema(schemaBuilder.build()) + .requireRecordSnapshot(false) + .requiredColumns(DATA_PARAMETER_NAME, requiredColumns) + .handle(handle) + .build(); + } + + @Override + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { + MapTableFunctionHandle handle = (MapTableFunctionHandle) tableFunctionHandle; + boolean sampleIntervalSpecified = + (boolean) handle.getProperty(SAMPLE_INTERVAL_SPECIFIED_PARAMETER_NAME); + long sampleInterval = (long) handle.getProperty(SAMPLE_INTERVAL_PARAMETER_NAME); + long transformLength = (long) handle.getProperty(N_PARAMETER_NAME); + String norm = (String) handle.getProperty(NORM_PARAMETER_NAME); + Type[] partitionTypes = parseTypes((String) handle.getProperty(PARTITION_TYPES_PROPERTY)); + Type[] valueTypes = parseTypes((String) handle.getProperty(VALUE_TYPES_PROPERTY)); + String[] valueNames = splitStrings((String) handle.getProperty(VALUE_NAMES_PROPERTY)); + + return new TableFunctionProcessorProvider() { + @Override + public TableFunctionDataProcessor getDataProcessor() { + return new FFTDataProcessor( + sampleIntervalSpecified, + sampleInterval, + transformLength, + norm, + createColumns(partitionTypes, 1), + createNumericColumns(valueTypes, valueNames, partitionTypes.length + 1)); + } + }; + } + + private static void validateOrderBy(TableArgument tableArgument) { + if (tableArgument.getOrderBy().size() != 1 + || !tableArgument.getOrderBy().get(0).equalsIgnoreCase(TIME_COLUMN_NAME)) { + throw new SemanticException( + "The ORDER BY clause of the DATA argument must contain exactly the time column."); + } + } + + private static List getPartitionIndexes(TableArgument tableArgument) + throws UDFException { + List indexes = new ArrayList<>(); + for (String partitionColumn : tableArgument.getPartitionBy()) { + indexes.add(findColumnIndex(tableArgument, partitionColumn, SUPPORTED_PARTITION_TYPES)); + } + return indexes; + } + + public static void validateTransformLength(long transformLength, int valueColumnCount) { + if (transformLength == UNSPECIFIED_N) { + return; + } + if (transformLength > MAX_TRANSFORM_LENGTH) { + throw new SemanticException( + String.format("FFT transform length N must not exceed %d.", MAX_TRANSFORM_LENGTH)); + } + long spectrumDoubleValues; + try { + spectrumDoubleValues = + Math.multiplyExact(Math.multiplyExact(transformLength, 2L), valueColumnCount); + } catch (ArithmeticException e) { + throw new SemanticException( + "FFT spectrum buffer is too large. Reduce N or the number of numeric columns."); + } + if (spectrumDoubleValues > MAX_SPECTRUM_DOUBLE_VALUES) { + throw new SemanticException( + "FFT spectrum buffer is too large. Reduce N or the number of numeric columns."); + } + } + + private static void validateNorm(String norm) { + if (!NORM_BACKWARD.equals(norm) && !NORM_FORWARD.equals(norm) && !NORM_ORTHO.equals(norm)) { + throw new SemanticException( + "Invalid NORM value for FFT. Supported values are backward, forward and ortho."); + } + } + + private static String joinTypes(List types) { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < types.size(); i++) { + if (i > 0) { + builder.append(','); + } + builder.append(types.get(i).name()); + } + return builder.toString(); + } + + private static Type[] parseTypes(String value) { + if (value.isEmpty()) { + return new Type[0]; + } + String[] values = value.split(","); + Type[] types = new Type[values.length]; + for (int i = 0; i < values.length; i++) { + types[i] = Type.valueOf(values[i]); + } + return types; + } + + private static String joinStrings(List values) { + return String.join(",", values); + } + + private static String[] splitStrings(String value) { + if (value.isEmpty()) { + return new String[0]; + } + return value.split(","); + } + + private static ValueColumn[] createColumns(Type[] types, int firstInputIndex) { + ValueColumn[] columns = new ValueColumn[types.length]; + for (int i = 0; i < types.length; i++) { + columns[i] = new ValueColumn(firstInputIndex + i, ValueOperator.fromType(types[i])); + } + return columns; + } + + private static NumericColumn[] createNumericColumns( + Type[] types, String[] names, int firstInputIndex) { + NumericColumn[] columns = new NumericColumn[types.length]; + for (int i = 0; i < types.length; i++) { + columns[i] = + new NumericColumn(firstInputIndex + i, names[i], NumericOperator.fromType(types[i])); + } + return columns; + } + + private enum ValueOperator { + BOOLEAN(Type.BOOLEAN) { + @Override + Object read(Record record, int index) { + return record.getBoolean(index); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeBoolean((Boolean) value); + } + }, + INT32(Type.INT32) { + @Override + Object read(Record record, int index) { + return record.getInt(index); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeInt((Integer) value); + } + }, + INT64(Type.INT64) { + @Override + Object read(Record record, int index) { + return record.getLong(index); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeLong((Long) value); + } + }, + FLOAT(Type.FLOAT) { + @Override + Object read(Record record, int index) { + return record.getFloat(index); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeFloat((Float) value); + } + }, + DOUBLE(Type.DOUBLE) { + @Override + Object read(Record record, int index) { + return record.getDouble(index); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeDouble((Double) value); + } + }, + TEXT(Type.TEXT) { + @Override + Object read(Record record, int index) { + return record.getBinary(index); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeBinary((Binary) value); + } + }, + BLOB(Type.BLOB) { + @Override + Object read(Record record, int index) { + return record.getBinary(index); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeBinary((Binary) value); + } + }, + TIMESTAMP(Type.TIMESTAMP) { + @Override + Object read(Record record, int index) { + return record.getLong(index); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeLong((Long) value); + } + }, + DATE(Type.DATE) { + @Override + Object read(Record record, int index) { + return record.getLocalDate(index); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeObject(value); + } + }, + STRING(Type.STRING) { + @Override + Object read(Record record, int index) { + return record.getBinary(index); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeBinary((Binary) value); + } + }; + + private final Type type; + + ValueOperator(Type type) { + this.type = type; + } + + abstract Object read(Record record, int index); + + abstract void write(ColumnBuilder builder, Object value); + + static ValueOperator fromType(Type type) { + for (ValueOperator valueOperator : values()) { + if (valueOperator.type == type) { + return valueOperator; + } + } + throw new IllegalArgumentException("Unsupported FFT partition type: " + type); + } + } + + private enum NumericOperator { + INT32(Type.INT32) { + @Override + double read(Record record, int index) { + return record.getInt(index); + } + }, + INT64(Type.INT64) { + @Override + double read(Record record, int index) { + return record.getLong(index); + } + }, + FLOAT(Type.FLOAT) { + @Override + double read(Record record, int index) { + return record.getFloat(index); + } + }, + DOUBLE(Type.DOUBLE) { + @Override + double read(Record record, int index) { + return record.getDouble(index); + } + }; + + private final Type type; + + NumericOperator(Type type) { + this.type = type; + } + + abstract double read(Record record, int index); + + static NumericOperator fromType(Type type) { + for (NumericOperator numericOperator : values()) { + if (numericOperator.type == type) { + return numericOperator; + } + } + throw new IllegalArgumentException("Unsupported FFT value type: " + type); + } + } + + private static class ValueColumn { + private final int inputIndex; + private final ValueOperator valueOperator; + + private ValueColumn(int inputIndex, ValueOperator valueOperator) { + this.inputIndex = inputIndex; + this.valueOperator = valueOperator; + } + + private Object read(Record record) { + return valueOperator.read(record, inputIndex); + } + + private void write(ColumnBuilder builder, Object value) { + valueOperator.write(builder, value); + } + } + + private static class NumericColumn { + private final int inputIndex; + private final String name; + private final NumericOperator numericOperator; + + private NumericColumn(int inputIndex, String name, NumericOperator numericOperator) { + this.inputIndex = inputIndex; + this.name = name; + this.numericOperator = numericOperator; + } + + private double read(Record record) { + return numericOperator.read(record, inputIndex); + } + } + + private static class FFTDataProcessor implements TableFunctionDataProcessor { + private final boolean sampleIntervalSpecified; + private final long sampleInterval; + private final long specifiedTransformLength; + private final String norm; + private final ValueColumn[] partitionColumns; + private final NumericColumn[] valueColumns; + private final Object[] partitionValues; + private final boolean[] partitionValueIsNull; + private final List rows = new ArrayList<>(); + private long previousTime; + private long inferredSampleInterval = UNSPECIFIED_SAMPLE_INTERVAL; + private boolean initialized; + + private FFTDataProcessor( + boolean sampleIntervalSpecified, + long sampleInterval, + long specifiedTransformLength, + String norm, + ValueColumn[] partitionColumns, + NumericColumn[] valueColumns) { + this.sampleIntervalSpecified = sampleIntervalSpecified; + this.sampleInterval = sampleInterval; + this.specifiedTransformLength = specifiedTransformLength; + this.norm = norm; + this.partitionColumns = partitionColumns; + this.valueColumns = valueColumns; + this.partitionValues = new Object[partitionColumns.length]; + this.partitionValueIsNull = new boolean[partitionColumns.length]; + } + + @Override + public void process( + Record input, + List properColumnBuilders, + ColumnBuilder passThroughIndexBuilder) { + long currentTime = input.getLong(0); + if (!initialized) { + capturePartitionValues(input); + initialized = true; + } else if (currentTime <= previousTime) { + throw new SemanticException( + "The time column of FFT input must be strictly ascending within each partition."); + } else { + validateSampleInterval(currentTime - previousTime); + } + previousTime = currentTime; + + double[] row = new double[valueColumns.length]; + for (int i = 0; i < valueColumns.length; i++) { + NumericColumn valueColumn = valueColumns[i]; + if (input.isNull(valueColumn.inputIndex)) { + throw new SemanticException( + String.format("FFT does not support null values in column [%s].", valueColumn.name)); + } + row[i] = valueColumn.read(input); + } + rows.add(row); + } + + @Override + public void finish( + List properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { + if (rows.isEmpty()) { + return; + } + + int transformLength = getTransformLength(); + double sampleIntervalSeconds = getSampleIntervalSeconds(); + double scaleFactor = getScaleFactor(transformLength); + double[][] spectra = new double[valueColumns.length][2 * transformLength]; + + int copiedRows = Math.min(rows.size(), transformLength); + DoubleFFT_1D fft = new DoubleFFT_1D(transformLength); + for (int columnIndex = 0; columnIndex < valueColumns.length; columnIndex++) { + double[] spectrum = spectra[columnIndex]; + for (int rowIndex = 0; rowIndex < copiedRows; rowIndex++) { + spectrum[2 * rowIndex] = rows.get(rowIndex)[columnIndex]; + } + fft.complexForward(spectrum); + } + + for (int frequencyIndex = 0; frequencyIndex < transformLength; frequencyIndex++) { + int outputColumnIndex = 0; + for (int partitionIndex = 0; partitionIndex < partitionColumns.length; partitionIndex++) { + if (partitionValueIsNull[partitionIndex]) { + properColumnBuilders.get(outputColumnIndex++).appendNull(); + } else { + partitionColumns[partitionIndex].write( + properColumnBuilders.get(outputColumnIndex++), partitionValues[partitionIndex]); + } + } + properColumnBuilders.get(outputColumnIndex++).writeLong(frequencyIndex); + properColumnBuilders + .get(outputColumnIndex++) + .writeDouble( + calculateFrequency(frequencyIndex, transformLength, sampleIntervalSeconds)); + for (int columnIndex = 0; columnIndex < valueColumns.length; columnIndex++) { + double[] spectrum = spectra[columnIndex]; + properColumnBuilders + .get(outputColumnIndex++) + .writeDouble(spectrum[2 * frequencyIndex] * scaleFactor); + properColumnBuilders + .get(outputColumnIndex++) + .writeDouble(spectrum[2 * frequencyIndex + 1] * scaleFactor); + } + } + } + + private void capturePartitionValues(Record input) { + for (int i = 0; i < partitionColumns.length; i++) { + if (input.isNull(partitionColumns[i].inputIndex)) { + partitionValueIsNull[i] = true; + } else { + partitionValues[i] = partitionColumns[i].read(input); + } + } + } + + private int getTransformLength() { + long transformLength = + specifiedTransformLength == UNSPECIFIED_N ? rows.size() : specifiedTransformLength; + validateTransformLength(transformLength, valueColumns.length); + return (int) transformLength; + } + + private double getSampleIntervalSeconds() { + long interval; + if (sampleIntervalSpecified) { + interval = sampleInterval; + } else { + if (rows.size() < 2) { + throw new SemanticException("FFT requires at least two rows to infer SAMPLE_INTERVAL."); + } + interval = inferredSampleInterval; + } + double intervalSeconds = + interval * TimestampPrecisionUtils.currPrecision.toNanos(1L) / 1_000_000_000.0; + if (intervalSeconds <= 0) { + throw new SemanticException("FFT SAMPLE_INTERVAL must be positive."); + } + return intervalSeconds; + } + + private void validateSampleInterval(long currentInterval) { + if (sampleIntervalSpecified) { + if (currentInterval != sampleInterval) { + throw new SemanticException( + "FFT input time interval must match the specified SAMPLE_INTERVAL."); + } + return; + } + + if (inferredSampleInterval == UNSPECIFIED_SAMPLE_INTERVAL) { + inferredSampleInterval = currentInterval; + } else if (currentInterval != inferredSampleInterval) { + throw new SemanticException( + "FFT requires evenly spaced input time values when SAMPLE_INTERVAL is not specified."); + } + } + + private double getScaleFactor(int transformLength) { + if (NORM_FORWARD.equals(norm)) { + return 1.0 / transformLength; + } + if (NORM_ORTHO.equals(norm)) { + return 1.0 / Math.sqrt(transformLength); + } + return 1.0; + } + + private double calculateFrequency( + int frequencyIndex, int transformLength, double sampleIntervalSeconds) { + int positiveFrequencyCount = (transformLength + 1) / 2; + int signedIndex = + frequencyIndex < positiveFrequencyCount + ? frequencyIndex + : frequencyIndex - transformLength; + return signedIndex / (transformLength * sampleIntervalSeconds); + } + } +} diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunctionTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunctionTest.java new file mode 100644 index 0000000000000..456b024e5ff8b --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunctionTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.udf.builtin.relational.tvf; + +import org.apache.iotdb.commons.exception.SemanticException; +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.argument.Argument; +import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; +import org.apache.iotdb.udf.api.relational.table.argument.TableArgument; +import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.utils.Binary; +import org.junit.Test; + +import java.io.File; +import java.time.LocalDate; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class FFTTableFunctionTest { + + private final FFTTableFunction function = new FFTTableFunction(); + + @Test + public void testRejectsDuplicateTime() throws UDFException { + TableFunctionDataProcessor processor = createProcessor(false); + processor.process(record(1L, 1.0), Collections.emptyList(), null); + + assertSemanticException( + () -> processor.process(record(1L, 2.0), Collections.emptyList(), null), + "The time column of FFT input must be strictly ascending within each partition."); + } + + @Test + public void testRejectsOutOfOrderTime() throws UDFException { + TableFunctionDataProcessor processor = createProcessor(false); + processor.process(record(2L, 1.0), Collections.emptyList(), null); + + assertSemanticException( + () -> processor.process(record(1L, 2.0), Collections.emptyList(), null), + "The time column of FFT input must be strictly ascending within each partition."); + } + + @Test + public void testRejectsSingleRowWithoutSampleInterval() throws UDFException { + TableFunctionDataProcessor processor = createProcessor(false); + processor.process(record(1L, 1.0), Collections.emptyList(), null); + + assertSemanticException( + () -> processor.finish(Collections.emptyList(), null), + "FFT requires at least two rows to infer SAMPLE_INTERVAL."); + } + + @Test + public void testRejectsIrregularTimeWhenInferringSampleInterval() throws UDFException { + TableFunctionDataProcessor processor = createProcessor(false); + processor.process(record(0L, 1.0), Collections.emptyList(), null); + processor.process(record(1L, 2.0), Collections.emptyList(), null); + + assertSemanticException( + () -> processor.process(record(3L, 3.0), Collections.emptyList(), null), + "FFT requires evenly spaced input time values when SAMPLE_INTERVAL is not specified."); + } + + @Test + public void testRejectsTimeGapDifferentFromExplicitSampleInterval() throws UDFException { + TableFunctionDataProcessor processor = createProcessor(true); + processor.process(record(0L, 1.0), Collections.emptyList(), null); + + assertSemanticException( + () -> processor.process(record(2L, 2.0), Collections.emptyList(), null), + "FFT input time interval must match the specified SAMPLE_INTERVAL."); + } + + @Test + public void testRejectsDefaultTransformLengthAboveLimit() throws UDFException { + TableFunctionDataProcessor processor = createProcessor(true); + for (long time = 0; time <= 65_536L; time++) { + processor.process(record(time, 1.0), Collections.emptyList(), null); + } + + assertSemanticException( + () -> processor.finish(Collections.emptyList(), null), + "FFT transform length N must not exceed 65536."); + } + + private TableFunctionDataProcessor createProcessor(boolean sampleIntervalSpecified) + throws UDFException { + Map arguments = new HashMap<>(); + arguments.put( + FFTTableFunction.DATA_PARAMETER_NAME, + new TableArgument( + Arrays.asList(Optional.of("time"), Optional.of("value")), + Arrays.asList(Type.TIMESTAMP, Type.DOUBLE), + Collections.emptyList(), + Collections.singletonList("time"), + false)); + arguments.put( + FFTTableFunction.SAMPLE_INTERVAL_PARAMETER_NAME, + new ScalarArgument(Type.INT64, sampleIntervalSpecified ? 1L : Long.MIN_VALUE)); + arguments.put( + FFTTableFunction.SAMPLE_INTERVAL_SPECIFIED_PARAMETER_NAME, + new ScalarArgument(Type.BOOLEAN, sampleIntervalSpecified)); + arguments.put(FFTTableFunction.N_PARAMETER_NAME, new ScalarArgument(Type.INT64, -1L)); + arguments.put( + FFTTableFunction.NORM_PARAMETER_NAME, new ScalarArgument(Type.STRING, "backward")); + + return function + .getProcessorProvider(function.analyze(arguments).getTableFunctionHandle()) + .getDataProcessor(); + } + + private Record record(long time, double value) { + return new SimpleRecord(time, value); + } + + private void assertSemanticException(Runnable runnable, String message) { + try { + runnable.run(); + fail(); + } catch (SemanticException e) { + assertEquals(message, e.getMessage()); + } + } + + private static class SimpleRecord implements Record { + private final long time; + private final double value; + + private SimpleRecord(long time, double value) { + this.time = time; + this.value = value; + } + + @Override + public int getInt(int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(int columnIndex) { + if (columnIndex == 0) { + return time; + } + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(int columnIndex) { + if (columnIndex == 1) { + return value; + } + throw new UnsupportedOperationException(); + } + + @Override + public boolean getBoolean(int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public Binary getBinary(int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public String getString(int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public LocalDate getLocalDate(int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public Object getObject(int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public Optional getObjectFile(int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public long objectLength(int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public Binary readObject(int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public Binary readObject(int columnIndex, long offset, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public Type getDataType(int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isNull(int columnIndex) { + return false; + } + + @Override + public int size() { + return 2; + } + } +} From a9ae6f466ac8d2462d4baa0cbdec5236dacd53f2 Mon Sep 17 00:00:00 2001 From: DaZuiZui Date: Tue, 23 Jun 2026 14:53:52 +0800 Subject: [PATCH 2/3] Add FFT table function coverage and notes --- RELEASE_NOTES.md | 1 + .../relational/tvf/FFTTableFunctionTest.java | 98 ++++++++++++++++++- 2 files changed, 95 insertions(+), 4 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 6637974a2bf4a..a843547a8cbf6 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -25,6 +25,7 @@ - Data Query: Added list display for available DataNode nodes - Data Query: Added a system table for statistics on query latency in the table model - Data Query: Python SessionDataset supported converting TsBlock to DataFrame and returning DataFrame in batches +- Data Query: Added the built-in FFT/fft table-valued function for the table model. SAMPLE_INTERVAL must be a positive duration literal when specified; N defaults to the partition row count and is capped at 65,536; null numeric values are rejected; timestamps must be strictly ascending and evenly spaced unless SAMPLE_INTERVAL is explicitly provided, in which case input gaps must match it. - Storage Management: Supported custom column names for the TIME column - Storage Management: Supported viewing the complete definition statement of created tables/views via SQL - System Management: Added a system table for DataNode node connection status in the table model diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunctionTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunctionTest.java index 456b024e5ff8b..45a9dcd51d008 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunctionTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunctionTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf; import org.apache.iotdb.commons.exception.SemanticException; +import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.access.Record; import org.apache.iotdb.udf.api.relational.table.argument.Argument; @@ -28,6 +29,10 @@ import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor; import org.apache.iotdb.udf.api.type.Type; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.read.common.block.column.DoubleColumnBuilder; +import org.apache.tsfile.read.common.block.column.LongColumnBuilder; import org.apache.tsfile.utils.Binary; import org.junit.Test; @@ -36,6 +41,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; @@ -44,8 +50,57 @@ public class FFTTableFunctionTest { + private static final double DELTA = 1e-9; + private final FFTTableFunction function = new FFTTableFunction(); + @Test + public void testWritesFullSpectrumAndZeroPadsToSpecifiedN() throws UDFException { + TableFunctionDataProcessor processor = createProcessor(true, 4L); + processor.process(record(0L, 1.0), Collections.emptyList(), null); + + List builders = createOutputBuilders(4); + processor.finish(builders, null); + + double intervalSeconds = TimestampPrecisionUtils.currPrecision.toNanos(1L) / 1_000_000_000.0; + assertLongColumn(builders.get(0).build(), 0L, 1L, 2L, 3L); + assertDoubleColumn( + builders.get(1).build(), + 0.0, + 1.0 / (4.0 * intervalSeconds), + -2.0 / (4.0 * intervalSeconds), + -1.0 / (4.0 * intervalSeconds)); + assertDoubleColumn(builders.get(2).build(), 1.0, 1.0, 1.0, 1.0); + assertDoubleColumn(builders.get(3).build(), 0.0, 0.0, 0.0, 0.0); + } + + @Test + public void testTruncatesInputRowsToSpecifiedN() throws UDFException { + TableFunctionDataProcessor processor = createProcessor(true, 2L); + processor.process(record(0L, 1.0), Collections.emptyList(), null); + processor.process(record(1L, 2.0), Collections.emptyList(), null); + processor.process(record(2L, 100.0), Collections.emptyList(), null); + processor.process(record(3L, 200.0), Collections.emptyList(), null); + + List builders = createOutputBuilders(2); + processor.finish(builders, null); + + assertLongColumn(builders.get(0).build(), 0L, 1L); + assertDoubleColumn(builders.get(2).build(), 3.0, -1.0); + assertDoubleColumn(builders.get(3).build(), 0.0, 0.0); + } + + @Test + public void testRejectsInvalidRowsEvenWhenBeyondTruncatedN() throws UDFException { + TableFunctionDataProcessor processor = createProcessor(true, 2L); + processor.process(record(0L, 1.0), Collections.emptyList(), null); + processor.process(record(1L, 2.0), Collections.emptyList(), null); + + assertSemanticException( + () -> processor.process(nullValueRecord(2L), Collections.emptyList(), null), + "FFT does not support null values in column [value]."); + } + @Test public void testRejectsDuplicateTime() throws UDFException { TableFunctionDataProcessor processor = createProcessor(false); @@ -111,6 +166,11 @@ public void testRejectsDefaultTransformLengthAboveLimit() throws UDFException { private TableFunctionDataProcessor createProcessor(boolean sampleIntervalSpecified) throws UDFException { + return createProcessor(sampleIntervalSpecified, -1L); + } + + private TableFunctionDataProcessor createProcessor( + boolean sampleIntervalSpecified, long transformLength) throws UDFException { Map arguments = new HashMap<>(); arguments.put( FFTTableFunction.DATA_PARAMETER_NAME, @@ -126,7 +186,8 @@ private TableFunctionDataProcessor createProcessor(boolean sampleIntervalSpecifi arguments.put( FFTTableFunction.SAMPLE_INTERVAL_SPECIFIED_PARAMETER_NAME, new ScalarArgument(Type.BOOLEAN, sampleIntervalSpecified)); - arguments.put(FFTTableFunction.N_PARAMETER_NAME, new ScalarArgument(Type.INT64, -1L)); + arguments.put( + FFTTableFunction.N_PARAMETER_NAME, new ScalarArgument(Type.INT64, transformLength)); arguments.put( FFTTableFunction.NORM_PARAMETER_NAME, new ScalarArgument(Type.STRING, "backward")); @@ -139,6 +200,32 @@ private Record record(long time, double value) { return new SimpleRecord(time, value); } + private Record nullValueRecord(long time) { + return new SimpleRecord(time, null); + } + + private List createOutputBuilders(int expectedPositionCount) { + return Arrays.asList( + new LongColumnBuilder(null, expectedPositionCount), + new DoubleColumnBuilder(null, expectedPositionCount), + new DoubleColumnBuilder(null, expectedPositionCount), + new DoubleColumnBuilder(null, expectedPositionCount)); + } + + private void assertLongColumn(Column column, long... expected) { + assertEquals(expected.length, column.getPositionCount()); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], column.getLong(i)); + } + } + + private void assertDoubleColumn(Column column, double... expected) { + assertEquals(expected.length, column.getPositionCount()); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], column.getDouble(i), DELTA); + } + } + private void assertSemanticException(Runnable runnable, String message) { try { runnable.run(); @@ -150,9 +237,9 @@ private void assertSemanticException(Runnable runnable, String message) { private static class SimpleRecord implements Record { private final long time; - private final double value; + private final Double value; - private SimpleRecord(long time, double value) { + private SimpleRecord(long time, Double value) { this.time = time; this.value = value; } @@ -177,7 +264,7 @@ public float getFloat(int columnIndex) { @Override public double getDouble(int columnIndex) { - if (columnIndex == 1) { + if (columnIndex == 1 && value != null) { return value; } throw new UnsupportedOperationException(); @@ -235,6 +322,9 @@ public Type getDataType(int columnIndex) { @Override public boolean isNull(int columnIndex) { + if (columnIndex == 1) { + return value == null; + } return false; } From 392ff7fd3faa12c9b907802516838bc322fe0574 Mon Sep 17 00:00:00 2001 From: DaZuiZui Date: Tue, 23 Jun 2026 15:50:29 +0800 Subject: [PATCH 3/3] Fix FFT buffering and distribution planning --- .../TableDistributedPlanGenerator.java | 6 ++++- .../analyzer/TableFunctionTest.java | 8 ++++++ .../relational/tvf/FFTTableFunction.java | 25 ++++++++++++++----- .../relational/tvf/FFTTableFunctionTest.java | 19 ++++++++++++-- 4 files changed, 49 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 1f55f9bc4972c..a0a3774489e51 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -1826,7 +1826,7 @@ public List visitTableFunctionProcessor( if (node.getChildren().isEmpty()) { return Collections.singletonList(node); } - boolean canSplitPushDown = node.isRowSemantic() || (node.getChild() instanceof GroupNode); + boolean canSplitPushDown = node.isRowSemantic() || isPartitionedGroup(node.getChild()); List childrenNodes = node.getChild().accept(this, context); if (childrenNodes.size() == 1) { node.setChild(childrenNodes.get(0)); @@ -1842,6 +1842,10 @@ public List visitTableFunctionProcessor( } } + private boolean isPartitionedGroup(PlanNode node) { + return node instanceof GroupNode && ((GroupNode) node).getPartitionKeyCount() > 0; + } + private void buildRegionNodeMap( AggregationTableScanNode originalAggTableScanNode, List> regionReplicaSetsList, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java index c6a057d6fa090..347ec698fb01b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java @@ -579,6 +579,14 @@ public void testFFTDefaultArguments() { assertPlan( logicalQueryPlan, anyTree(tableFunctionProcessor(tableFunctionMatcher, sort(tableScan)))); + assertPlan( + planTester.getFragmentPlan(0), + output( + tableFunctionProcessor( + tableFunctionMatcher, mergeSort(exchange(), exchange(), exchange())))); + assertPlan(planTester.getFragmentPlan(1), sort(tableScan)); + assertPlan(planTester.getFragmentPlan(2), sort(tableScan)); + assertPlan(planTester.getFragmentPlan(3), sort(tableScan)); } @Test diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunction.java index e9d1011f337d0..bfce8501f01e6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunction.java @@ -557,6 +557,7 @@ private static class FFTDataProcessor implements TableFunctionDataProcessor { private final Object[] partitionValues; private final boolean[] partitionValueIsNull; private final List rows = new ArrayList<>(); + private long inputRowCount; private long previousTime; private long inferredSampleInterval = UNSPECIFIED_SAMPLE_INTERVAL; private boolean initialized; @@ -595,22 +596,34 @@ public void process( } previousTime = currentTime; - double[] row = new double[valueColumns.length]; + if (specifiedTransformLength == UNSPECIFIED_N && inputRowCount >= MAX_TRANSFORM_LENGTH) { + throw new SemanticException( + String.format("FFT transform length N must not exceed %d.", MAX_TRANSFORM_LENGTH)); + } + + boolean shouldCacheRow = + specifiedTransformLength == UNSPECIFIED_N || rows.size() < specifiedTransformLength; + double[] row = shouldCacheRow ? new double[valueColumns.length] : null; for (int i = 0; i < valueColumns.length; i++) { NumericColumn valueColumn = valueColumns[i]; if (input.isNull(valueColumn.inputIndex)) { throw new SemanticException( String.format("FFT does not support null values in column [%s].", valueColumn.name)); } - row[i] = valueColumn.read(input); + if (shouldCacheRow) { + row[i] = valueColumn.read(input); + } + } + inputRowCount++; + if (shouldCacheRow) { + rows.add(row); } - rows.add(row); } @Override public void finish( List properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { - if (rows.isEmpty()) { + if (inputRowCount == 0) { return; } @@ -668,7 +681,7 @@ private void capturePartitionValues(Record input) { private int getTransformLength() { long transformLength = - specifiedTransformLength == UNSPECIFIED_N ? rows.size() : specifiedTransformLength; + specifiedTransformLength == UNSPECIFIED_N ? inputRowCount : specifiedTransformLength; validateTransformLength(transformLength, valueColumns.length); return (int) transformLength; } @@ -678,7 +691,7 @@ private double getSampleIntervalSeconds() { if (sampleIntervalSpecified) { interval = sampleInterval; } else { - if (rows.size() < 2) { + if (inputRowCount < 2) { throw new SemanticException("FFT requires at least two rows to infer SAMPLE_INTERVAL."); } interval = inferredSampleInterval; diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunctionTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunctionTest.java index 45a9dcd51d008..a82a901f241f0 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunctionTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/FFTTableFunctionTest.java @@ -101,6 +101,21 @@ public void testRejectsInvalidRowsEvenWhenBeyondTruncatedN() throws UDFException "FFT does not support null values in column [value]."); } + @Test + public void testInfersSampleIntervalFromInputRowsBeyondSpecifiedN() throws UDFException { + TableFunctionDataProcessor processor = createProcessor(false, 1L); + processor.process(record(0L, 1.0), Collections.emptyList(), null); + processor.process(record(2L, 2.0), Collections.emptyList(), null); + + List builders = createOutputBuilders(1); + processor.finish(builders, null); + + assertLongColumn(builders.get(0).build(), 0L); + assertDoubleColumn(builders.get(1).build(), 0.0); + assertDoubleColumn(builders.get(2).build(), 1.0); + assertDoubleColumn(builders.get(3).build(), 0.0); + } + @Test public void testRejectsDuplicateTime() throws UDFException { TableFunctionDataProcessor processor = createProcessor(false); @@ -155,12 +170,12 @@ public void testRejectsTimeGapDifferentFromExplicitSampleInterval() throws UDFEx @Test public void testRejectsDefaultTransformLengthAboveLimit() throws UDFException { TableFunctionDataProcessor processor = createProcessor(true); - for (long time = 0; time <= 65_536L; time++) { + for (long time = 0; time < 65_536L; time++) { processor.process(record(time, 1.0), Collections.emptyList(), null); } assertSemanticException( - () -> processor.finish(Collections.emptyList(), null), + () -> processor.process(record(65_536L, 1.0), Collections.emptyList(), null), "FFT transform length N must not exceed 65536."); }