
Fuse array higher-order functions in Project #15061

Draft
thirtiseven wants to merge 5 commits into NVIDIA:main from thirtiseven:issue-14711-fuse-array-hof

thirtiseven (Collaborator) commented Jun 11, 2026

Fixes #14711.

Description

This PR fuses compatible array higher-order functions that appear as top-level Project outputs, so multiple expressions over the same array can share the explode and intermediate projection work.

The fused path currently covers transform, filter, exists, and supported aggregate expressions. It groups HOFs by the same array argument and lambda arity, evaluates the shared exploded element batch once, then evaluates each lambda against the shared batch and reconstructs each output independently. The implementation keeps conservative guards for deterministic expressions, side-effect checks, and Project output ordering.
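
The flow described above can be sketched in plain Python. This is a simplified model, not the plugin's code: it assumes no nulls, represents a column as a list of lists, and treats the aggregate case as a per-element lambda followed by a per-row sum, which is an assumption made for illustration. The names `explode`, `segments`, and `fused_project` are hypothetical.

```python
from itertools import accumulate

def explode(arrays):
    """Flatten a column of arrays into one element list plus row offsets."""
    lengths = [len(a) for a in arrays]
    offsets = [0] + list(accumulate(lengths))
    flat = [x for a in arrays for x in a]
    return flat, offsets

def segments(values, offsets):
    """Cut a flat list back into per-row lists using the shared offsets."""
    return [values[offsets[i]:offsets[i + 1]] for i in range(len(offsets) - 1)]

def fused_project(arrays, hofs):
    """Evaluate several (kind, lambda) pairs over one shared exploded batch."""
    flat, offsets = explode(arrays)      # shared work, done once per group
    inputs = segments(flat, offsets)
    outputs = []
    for kind, fn in hofs:
        # Each lambda runs against the same flat element list.
        per_row = segments([fn(x) for x in flat], offsets)
        if kind == "transform":
            outputs.append(per_row)
        elif kind == "filter":
            outputs.append([[x for x, keep in zip(xs, flags) if keep]
                            for xs, flags in zip(inputs, per_row)])
        elif kind == "exists":
            outputs.append([any(flags) for flags in per_row])
        elif kind == "aggregate_sum":    # assumed shape of a supported aggregate
            outputs.append([sum(vals) for vals in per_row])
        else:
            raise ValueError(f"unsupported HOF kind: {kind}")
    return outputs

column = [[1, 2, 3], [], [4]]
results = fused_project(column, [
    ("transform", lambda x: x + 1),
    ("filter", lambda x: x % 2 == 1),
    ("exists", lambda x: x > 3),
    ("aggregate_sum", lambda x: x * 2),
])
```

Each output is reconstructed independently from the same offsets, so adding another expression over the same array costs one more lambda pass rather than another explode.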

This does not add new user-facing configuration or change query results. Unsupported or non-matching shapes continue to use the existing per-expression evaluation path.

Tests added or updated:

  • test_array_heterogeneous_elementwise_hof_mixed_project
  • test_array_hof_mixed_project_with_aggregate

Validation run:

  • mvn -pl sql-plugin -Dbuildver=352 -DskipTests compile
  • mvn -pl dist -Dbuildver=352 -DskipTests package
  • mvn -N -Dbuildver=352 -DskipTests -Dmaven.scalastyle.skip=false verify
  • TESTS=array_test.py TEST_PARALLEL=0 ./integration_tests/run_pyspark_from_build.sh -s -k 'test_array_transform'
  • TESTS=array_test.py TEST_PARALLEL=0 ./integration_tests/run_pyspark_from_build.sh -s -k 'test_array_filter or test_array_exists or test_array_heterogeneous_elementwise_hof_mixed_project'
  • TESTS=higher_order_functions_test.py TEST_PARALLEL=0 ./integration_tests/run_pyspark_from_build.sh -s -k 'test_array_hof_mixed_project_with_aggregate or test_array_aggregate_count_if_int or test_array_aggregate_zero_is_outer_column or test_array_aggregate_empty_array'

Performance

Array HOF Project Perf

  • data path: /tmp/array_hof_project_perf_data
  • rows: 2000000
  • array length: 96
  • partitions: 16
  • warm runs averaged: 3
  • cold run: excluded from averages
  • speedup: CPU avg warm e2e ms / GPU avg warm e2e ms
  • GPU project nodes should be greater than 0 when RAPIDS is loaded and the project is on GPU

| case | project outputs | CPU cold e2e ms | CPU avg warm e2e ms | GPU cold e2e ms | GPU avg warm e2e ms | speedup | GPU project nodes | GPU avg warm project op ms | CPU warm e2e ms | GPU warm e2e ms | GPU warm project op ms | checksum match |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| many_unary_transforms | 12 | 44594.80 | 43513.73 | 1697.58 | 758.30 | 57.38x | 1 | 3423.26 | 43490.74, 43481.91, 43568.52 | 783.86, 733.20, 757.85 | 3621.86, 3229.59, 3418.33 | yes |
| heterogeneous_elementwise | 8 | 20846.06 | 20699.53 | 641.77 | 560.03 | 36.96x | 1 | 3396.65 | 20488.55, 20610.92, 20999.13 | 569.40, 540.03, 570.66 | 3554.11, 3120.08, 3515.77 | yes |
| aggregate_heavy_mix | 11 | 26083.71 | 26127.60 | 595.47 | 582.01 | 44.89x | 1 | 4595.99 | 26235.57, 26170.41, 25976.83 | 585.06, 569.50, 591.46 | 4341.20, 4586.28, 4860.49 | yes |
| indexed_transforms | 10 | 42682.47 | 44172.19 | 690.54 | 652.53 | 67.69x | 1 | 3208.59 | 44195.75, 43949.65, 44371.19 | 636.26, 663.88, 657.44 | 3075.26, 3252.09, 3298.42 | yes |
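
As a quick check of how the speedup column is derived (CPU avg warm e2e ms divided by GPU avg warm e2e ms), the following sketch recomputes it for the many_unary_transforms case from the three warm runs reported above. The variable names are illustrative only; averages recomputed from the rounded per-run values may differ from the reported averages in the last digit.

```python
def mean(values):
    """Arithmetic mean of a non-empty list of timings."""
    return sum(values) / len(values)

# Warm-run timings (ms) for many_unary_transforms, copied from the results.
cpu_warm_ms = [43490.74, 43481.91, 43568.52]
gpu_warm_ms = [783.86, 733.20, 757.85]

cpu_avg = mean(cpu_warm_ms)
gpu_avg = mean(gpu_warm_ms)
speedup = cpu_avg / gpu_avg   # CPU avg warm e2e / GPU avg warm e2e

print(f"CPU avg {cpu_avg:.2f} ms, GPU avg {gpu_avg:.2f} ms, speedup {speedup:.2f}x")
```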

Wrote markdown results to /tmp/array_hof_project_perf_results.md

| case | GPU e2e before ms | GPU e2e after ms | e2e gain | GPU project op before ms | GPU project op after ms | project op gain |
| --- | --- | --- | --- | --- | --- | --- |
| many_unary_transforms | 837.79 | 758.30 | 9.49% | 4534.35 | 3423.26 | 24.50% |
| heterogeneous_elementwise | 590.86 | 560.03 | 5.22% | 4063.81 | 3396.65 | 16.42% |
| aggregate_heavy_mix | 671.34 | 582.01 | 13.31% | 6003.95 | 4595.99 | 23.45% |
| indexed_transforms | 725.28 | 652.53 | 10.03% | 4339.08 | 3208.59 | 26.05% |
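
The gain columns are consistent with a relative reduction, (before - after) / before. The description does not state the formula, so this is an inference from the numbers; the sketch below recomputes every row under that assumption.

```python
def gain_percent(before_ms, after_ms):
    """Relative time reduction, as a percentage of the 'before' timing."""
    return (before_ms - after_ms) / before_ms * 100.0

# (case, e2e before, e2e after, project op before, project op after), in ms.
rows = [
    ("many_unary_transforms", 837.79, 758.30, 4534.35, 3423.26),
    ("heterogeneous_elementwise", 590.86, 560.03, 4063.81, 3396.65),
    ("aggregate_heavy_mix", 671.34, 582.01, 6003.95, 4595.99),
    ("indexed_transforms", 725.28, 652.53, 4339.08, 3208.59),
]

gains = {
    case: (round(gain_percent(e2e_b, e2e_a), 2), round(gain_percent(op_b, op_a), 2))
    for case, e2e_b, e2e_a, op_b, op_a in rows
}

for case, (e2e_gain, op_gain) in gains.items():
    print(f"{case}: e2e {e2e_gain:.2f}%, project op {op_gain:.2f}%")
```

The project operator gain is larger than the end-to-end gain in every case, which is expected when the project is only part of the total query time.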

Checklists

Documentation

  • Updated for new or modified user-facing features or behaviors
  • No user-facing change

Testing

  • Added or modified tests to cover new code paths
  • Covered by existing tests
    (Please provide the names of the existing tests in the PR description.)
  • Not required

Performance

  • Tests ran and results are added in the PR description
  • Issue filed with a link in the PR description
  • Not required

Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven (Collaborator, Author)

@greptile review

greptile-apps (Bot, Contributor) commented Jun 11, 2026

Greptile Summary

This PR introduces GpuArrayTransformFusion, an optimization that fuses multiple compatible array higher-order functions (transform, filter, exists, aggregate) appearing as top-level Project outputs over the same array, sharing the expensive explode step and intermediate projections across all of them.

  • Adds GpuArrayTransformFusion.project() as the first path tried in GpuProjectExec.project(), grouping fusable HOFs by the same array argument and lambda arity, then evaluating the shared exploded element batch once before dispatching per-HOF lambda evaluation and result reconstruction.
  • Extracts transformElementResults and aggregateElementResults as private[rapids] methods on the respective traits/classes so the fusion object can invoke per-HOF result reconstruction without duplicating logic.
  • Adds two integration tests verifying CPU/GPU result parity for mixed-HOF project shapes.
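
The grouping and fallback behavior summarized above can be modeled roughly as follows. This is a hypothetical sketch, not the Scala implementation: `Hof`, `find_fused_groups`, and `plan_project` are invented names, the determinism flag stands in for the PR's guards, and the rule that a group needs at least two members is an assumption.

```python
from collections import defaultdict
from typing import NamedTuple

class Hof(NamedTuple):
    """Hypothetical stand-in for a bound array higher-order function."""
    name: str                 # e.g. "transform", "filter", "exists"
    array_arg: str            # the array column the HOF reads
    arity: int                # number of lambda parameters
    deterministic: bool = True

def find_fused_groups(exprs):
    """Map (array_arg, arity) to the output indices that can share an explode."""
    groups = defaultdict(list)
    for index, expr in enumerate(exprs):
        if isinstance(expr, Hof) and expr.deterministic:
            groups[(expr.array_arg, expr.arity)].append(index)
    # Assumption: a single HOF gains nothing from fusion, so drop it.
    return {key: idxs for key, idxs in groups.items() if len(idxs) > 1}

def plan_project(exprs):
    """Return the fused groups, or None so the caller uses the existing path."""
    groups = find_fused_groups(exprs)
    return groups if groups else None

project_outputs = [
    Hof("transform", "a", 1),
    "b + 1",                          # non-HOF output, evaluated individually
    Hof("filter", "a", 1),
    Hof("transform", "a", 2),         # different arity, not in the same group
    Hof("exists", "c", 1),            # different array, alone in its group
]
plan = plan_project(project_outputs)
```

Keeping the original output indices in each group is what lets the fused results be written back in Project output order.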

Confidence Score: 4/5

The change is an additive optimization path that falls back gracefully to the original expression-by-expression evaluation when no fusable groups are found, so no existing query can regress silently.

The fusion logic, resource management (withResource / catch-and-close in projectWithFusedGroups and evaluateFusedGroup), and grouping guards are all carefully constructed. The only notable gaps are quality-oriented: the consumeElementResults match is non-exhaustive at the trait level (an explicit fallback arm would give a clear error message rather than a MatchError on a future subclass), and the hasNoSideEffects fallback is silently conservative without a comment explaining the intent. Neither of these affects correctness today.

sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala — specifically the consumeElementResults pattern match and canReorderAcross helper.
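
The suggestion about an explicit fallback arm can be illustrated with a small Python analogy. It is not the Scala code under review: the class names and the `consume_element_results` function here are hypothetical stand-ins, and `FutureHof` represents a subclass added after the dispatcher was written.

```python
class ArrayHof:
    """Hypothetical stand-in for the shared HOF base type."""

class Transform(ArrayHof):
    pass

class Exists(ArrayHof):
    pass

class FutureHof(ArrayHof):
    """A later subclass the dispatcher below was never taught about."""

def consume_element_results(hof, per_row_results):
    """Rebuild one output column from per-row lambda results."""
    if isinstance(hof, Transform):
        return per_row_results
    if isinstance(hof, Exists):
        return [any(flags) for flags in per_row_results]
    # Explicit fallback arm: fail with a message that names the unsupported
    # type instead of surfacing a generic match failure.
    raise NotImplementedError(
        f"fused path cannot reconstruct output for {type(hof).__name__}")

transform_out = consume_element_results(Transform(), [[1], [2, 3]])
exists_out = consume_element_results(Exists(), [[False, True], []])
```

With the fallback in place, an unsupported subclass produces an error that points directly at the missing case.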

Important Files Changed

| Filename | Overview |
| --- | --- |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala | Core fusion logic added in GpuArrayTransformFusion; non-exhaustive pattern match in consumeElementResults is a latent runtime risk if new GpuArrayTransformBase subclasses are added; resource lifecycle and group logic look correct. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala | Minimal change: delegates to GpuArrayTransformFusion.project() before falling through to the original path; the recursive call into project() from makeSharedElementBatch terminates correctly since unionIntermediate never contains GpuArrayTransformBase expressions. |
| integration_tests/src/main/python/array_test.py | New test uses assert_gpu_and_cpu_are_equal_collect and @disable_ansi_mode; covers a heterogeneous mix of transform/filter/exists with an outer column reference; data gen is non-empty and bounded. |
| integration_tests/src/main/python/higher_order_functions_test.py | New test covers transform/filter/exists/aggregate fusion path with assert_gpu_and_cpu_are_equal_collect; correctly exercises the aggregate→arity-1 path alongside arity-1 elementwise HOFs. |

Sequence Diagram

sequenceDiagram
    participant P as GpuProjectExec.project()
    participant F as GpuArrayTransformFusion
    participant E as evaluateFusedGroup()
    participant S as makeSharedElementBatch()
    participant T as per-HOF lambda eval

    P->>F: project(batch, boundExprs)
    F->>F: findFusedGroups(boundExprs)
    note over F: Group HOFs by same array arg + lambda arity
    F->>F: canReorderAcross() guard
    F->>P: None (no eligible groups) → fallback to safeMap
    alt fusable groups found
        F->>E: evaluateFusedGroup(batch, [HOF_A, HOF_B, ...])
        E->>S: makeSharedElementBatch(batch, arg, unionIntermediate)
        S->>S: GpuProjectExec.project(batch, unionIntermediate)
        S->>S: appendColumns + explode / explodePosition
        S-->>E: sharedBatch (element-level rows)
        loop for each HOF in group
            E->>T: makeTransformLambdaBatch(sharedBatch, unionIntermediate, HOF_i)
            T-->>E: lambdaBatch (cols sliced to HOF_i's intermediates + lambda args)
            E->>T: HOF_i.function.columnarEval(lambdaBatch)
            T-->>E: dataCol
            E->>E: consumeElementResults(dataCol, arg) → output column
        end
        E-->>F: Seq[GpuColumnVector] (one per HOF in group)
        F->>F: assign to outputColumns[] by original index
        F->>F: evaluate non-fused expressions individually
        F-->>P: Some(ColumnarBatch(outputColumns))
    end

Reviews (1): Last reviewed commit: "Cover heterogeneous array elementwise fu..."

Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Development

Successfully merging this pull request may close these issues.

[FEA] Fuse multiple array higher-order functions reading the same column to share the explode step

2 participants