Fuse array higher-order functions in Project#15061
Conversation
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
|
@greptile review |
Greptile SummaryThis PR introduces
Confidence Score: 4/5The change is an additive optimization path that falls back gracefully to the original expression-by-expression evaluation when no fusable groups are found, so no existing query can regress silently. The fusion logic, resource management (withResource / catch-and-close in projectWithFusedGroups and evaluateFusedGroup), and grouping guards are all carefully constructed. The only notable gaps are quality-oriented: the consumeElementResults match is non-exhaustive at the trait level (an explicit fallback arm would give a clear error message rather than a MatchError on a future subclass), and the hasNoSideEffects fallback is silently conservative without a comment explaining the intent. Neither of these affects correctness today. sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala — specifically the consumeElementResults pattern match and canReorderAcross helper. Important Files Changed
Sequence DiagramsequenceDiagram
participant P as GpuProjectExec.project()
participant F as GpuArrayTransformFusion
participant E as evaluateFusedGroup()
participant S as makeSharedElementBatch()
participant T as per-HOF lambda eval
P->>F: project(batch, boundExprs)
F->>F: findFusedGroups(boundExprs)
note over F: Group HOFs by same array arg + lambda arity
F->>F: canReorderAcross() guard
F->>P: None (no eligible groups) → fallback to safeMap
alt fusable groups found
F->>E: evaluateFusedGroup(batch, [HOF_A, HOF_B, ...])
E->>S: makeSharedElementBatch(batch, arg, unionIntermediate)
S->>S: GpuProjectExec.project(batch, unionIntermediate)
S->>S: appendColumns + explode / explodePosition
S-->>E: sharedBatch (element-level rows)
loop for each HOF in group
E->>T: makeTransformLambdaBatch(sharedBatch, unionIntermediate, HOF_i)
T-->>E: lambdaBatch (cols sliced to HOF_i's intermediates + lambda args)
E->>T: HOF_i.function.columnarEval(lambdaBatch)
T-->>E: dataCol
E->>E: consumeElementResults(dataCol, arg) → output column
end
E-->>F: Seq[GpuColumnVector] (one per HOF in group)
F->>F: assign to outputColumns[] by original index
F->>F: evaluate non-fused expressions individually
F-->>P: Some(ColumnarBatch(outputColumns))
end
Reviews (1): Last reviewed commit: "Cover heterogeneous array elementwise fu..." | Re-trigger Greptile |
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Fixes #14711.
Description
This PR fuses compatible array higher-order functions that appear as top-level Project outputs, so multiple expressions over the same array can share the explode and intermediate projection work.
The fused path currently covers
transform,filter,exists, and supportedaggregateexpressions. It groups HOFs by the same array argument and lambda arity, evaluates the shared exploded element batch once, then evaluates each lambda against the shared batch and reconstructs each output independently. The implementation keeps conservative guards for deterministic expressions, side-effect checks, and Project output ordering.This does not add new user-facing configuration or change query results. Unsupported or non-matching shapes continue to use the existing per-expression evaluation path.
Tests added or updated:
test_array_heterogeneous_elementwise_hof_mixed_projecttest_array_hof_mixed_project_with_aggregateValidation run:
mvn -pl sql-plugin -Dbuildver=352 -DskipTests compilemvn -pl dist -Dbuildver=352 -DskipTests packagemvn -N -Dbuildver=352 -DskipTests -Dmaven.scalastyle.skip=false verifyTESTS=array_test.py TEST_PARALLEL=0 ./integration_tests/run_pyspark_from_build.sh -s -k 'test_array_transform'TESTS=array_test.py TEST_PARALLEL=0 ./integration_tests/run_pyspark_from_build.sh -s -k 'test_array_filter or test_array_exists or test_array_heterogeneous_elementwise_hof_mixed_project'TESTS=higher_order_functions_test.py TEST_PARALLEL=0 ./integration_tests/run_pyspark_from_build.sh -s -k 'test_array_hof_mixed_project_with_aggregate or test_array_aggregate_count_if_int or test_array_aggregate_zero_is_outer_column or test_array_aggregate_empty_array'Performance
Array HOF Project Perf
/tmp/array_hof_project_perf_data43481.91
43568.52
733.20
757.85
3229.59
3418.33
20610.92
20999.13
540.03
570.66
3120.08
3515.77
26170.41
25976.83
569.50
591.46
4586.28
4860.49
43949.65
44371.19
663.88
657.44
3252.09
3298.42
Wrote markdown results to /tmp/array_hof_project_perf_results.md
Checklists
Documentation
Testing
(Please provide the names of the existing tests in the PR description.)
Performance