feat: add repeat until workflow stages#773

Open
andreatgretel wants to merge 3 commits into main from andreatgretel/feat/workflow-repeat-until

@andreatgretel andreatgretel commented Jun 25, 2026

Contributor

📋 Summary

This PR adds a bounded RepeatUntil stage policy so workflow chaining can keep generating candidates until filtered output reaches a target count. It covers the exact-N rejection-sampling case from the Slack thread while keeping runs bounded, resumable in append mode, and visible in workflow metadata.

🔗 Related Issue

N/A

🔄 Changes

  • Add the public RepeatUntil, RepeatUntilMode, and RepeatUntilExhaustion API exports.
  • Teach CompositeWorkflow.add_stage to accept repeat_until with append and discard modes.
  • Persist repeat policy and run details in stage metadata and fingerprints.
  • Trim selected output to exactly output_records when the repeat target is satisfied.
  • Let on_exhausted="return_partial" complete empty when no rows pass, matching allow_empty=True downstream skip behavior.
  • Make discard-mode resume semantics explicit with a warning when prior attempts are replaced.
  • Simplify append-mode resume handling to rely on artifact resume instead of a misleading metadata shortcut.
  • Add interface tests for append accumulation, non-empty and empty partial exhaustion, raised exhaustion, discard retries, discard resume warning, and processor-selected output.
  • Document the workflow chaining pattern for repeating until a filtered count in workflow-chaining.mdx, including append accumulation and cap semantics.

🔍 Attention Areas

⚠️ Reviewers: Please pay special attention to the following:

  • composite_workflow.py - Adds a public stage orchestration policy and changes stage metadata/fingerprint behavior for repeat runs.

🎬 E2E demo: exact-N filtering with workflow chaining

from pathlib import Path
import shutil

import pandas as pd

from data_designer.interface import RepeatUntil


def keep_quality_rows(stage_path: Path) -> Path:
    df = pd.read_parquet(stage_path / "parquet-files")
    output_path = stage_path / "callback-outputs" / "kept"
    if output_path.exists():
        shutil.rmtree(output_path)
    output_path.mkdir(parents=True)
    df[df["judge_score"] >= 0.8].to_parquet(output_path / "data.parquet", index=False)
    return output_path


# data_designer, candidate_builder, and enrichment_builder are assumed to be defined earlier.
workflow = data_designer.compose_workflow(name="quality-gated-candidates")
workflow.add_stage(
    "candidates",
    candidate_builder,
    num_records=1_000,
    on_success=keep_quality_rows,
    on_success_version="quality-filter-v1",
    repeat_until=RepeatUntil(
        output_records=5_000,
        max_iterations=10,
        max_generated_records=20_000,
        mode="append",
        on_exhausted="return_partial",
    ),
)
workflow.add_stage("enriched", enrichment_builder)

results = workflow.run()

  • num_records is the per-attempt growth size.
  • In append mode, each iteration requests the cumulative stage size and reruns the filter hook over the accumulated stage output.
  • Downstream stages receive exactly output_records selected rows once the target is reached, or the best partial output when bounded exhaustion is configured with return_partial.
  • If no rows pass with return_partial, the stage completes empty and downstream stages are skipped.
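
The append-mode accounting in the bullets above can be sketched as a toy simulation. This is not the code in composite_workflow.py: `simulate_append` and its `passes_per_hundred` parameter are hypothetical, and the filter hook is modeled as keeping a fixed share of the accumulated rows.

```python
from __future__ import annotations


def simulate_append(
    num_records: int,
    output_records: int,
    max_iterations: int,
    max_generated_records: int | None,
    passes_per_hundred: int,
) -> dict:
    """Toy model of append-mode repeat-until accounting (hypothetical helper)."""
    iterations = 0
    generated = 0
    selected = 0
    for iteration in range(1, max_iterations + 1):
        # Append mode requests the cumulative stage size, not a fresh batch.
        requested = num_records * iteration
        if max_generated_records is not None and requested > max_generated_records:
            break
        iterations = iteration
        generated = requested
        # The filter reruns over everything accumulated so far.
        selected = generated * passes_per_hundred // 100
        if selected >= output_records:
            # Overshoot is trimmed to exactly output_records.
            return {
                "iterations": iterations,
                "generated": generated,
                "selected": output_records,
                "satisfied": True,
            }
    # Bounded exhaustion: the caller decides whether to raise or return the partial.
    return {
        "iterations": iterations,
        "generated": generated,
        "selected": selected,
        "satisfied": False,
    }


# Same bounds as the demo above, with an assumed 60% pass rate.
demo = simulate_append(1_000, 5_000, 10, 20_000, passes_per_hundred=60)
```

With these numbers the target is reached on the ninth attempt (9,000 rows generated, 5,400 kept, trimmed to 5,000). At an assumed 10% pass rate the same bounds exhaust after ten attempts with 1,000 rows selected, which is the case where on_exhausted decides the outcome.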

🧪 Testing

  • .venv/bin/ruff check --fix .
  • .venv/bin/ruff format .
  • .venv/bin/pytest packages/data-designer/tests/interface/test_composite_workflow.py -q
  • PATH=/Users/amanoel/.local/bin:$PATH make check-fern-docs-locally
  • Unit tests added/updated
  • E2E tests added/updated - N/A, no separate E2E path for this workflow API

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated (workflow chaining docs updated)

@github-actions

Contributor

Fern preview: https://nvidia-preview-pr-773.docs.buildwithfern.com/nemo/datadesigner

Fern previews include the docs-website version archive with PR changes synced into latest. Notebook tutorials are rendered without execution outputs in previews.

@andreatgretel andreatgretel force-pushed the andreatgretel/feat/workflow-repeat-until branch from 3e1e30f to 022cac6 June 26, 2026 16:26
@andreatgretel andreatgretel marked this pull request as ready for review June 26, 2026 16:26
@andreatgretel andreatgretel requested a review from a team as a code owner June 26, 2026 16:26
@greptile-apps

greptile-apps Bot commented Jun 26, 2026

Contributor

Greptile Summary

This PR introduces a bounded RepeatUntil stage policy for CompositeWorkflow that enables rejection-sampling patterns: a stage can keep generating candidates until its filtered (selected) output reaches an exact row count, with configurable iteration limits and exhaustion handling.

  • Append mode (mode="append", default): grows the stage artifact cumulatively each iteration (num_records × iteration), re-runs the on_success filter over all accumulated data, and trims the result to exactly output_records via _write_parquet_head when the target is first exceeded.
  • Discard mode (mode="discard"): replaces the stage artifact on every attempt and accumulates an explicit generated_records counter; warns when resumed because prior attempts are irrecoverably deleted.
  • Exhaustion handling: on_exhausted="raise" (default) raises a DataDesignerWorkflowError; on_exhausted="return_partial" returns whatever selected output exists, and an empty partial correctly propagates as completed_empty to skip downstream stages — guarded by the new _allows_empty_stage_output helper.
  • The repeat_until config is included in both the stage fingerprint and base metadata, so any policy change invalidates the resume cache and all run statistics (repeat_iterations, repeat_generated_records, repeat_satisfied, repeat_until_output_path) are persisted to workflow metadata.
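
For contrast with append mode, the discard-mode bookkeeping summarized above can be modeled in a few lines. This is a sketch under stated assumptions, not the PR's implementation: `simulate_discard` is a hypothetical helper, and `kept_per_attempt` stands in for whatever the filter hook keeps on each fresh attempt.

```python
from __future__ import annotations

from typing import Sequence


def simulate_discard(
    num_records: int,
    output_records: int,
    max_iterations: int,
    max_generated_records: int | None,
    kept_per_attempt: Sequence[int],
) -> dict:
    """Toy model of discard-mode repeat-until accounting (hypothetical helper)."""
    iterations = 0
    generated = 0
    selected = 0
    for kept in kept_per_attempt[:max_iterations]:
        # Discard mode checks whether one more fresh attempt still fits under the cap.
        if max_generated_records is not None and generated + num_records > max_generated_records:
            break
        iterations += 1
        generated += num_records  # explicit counter across attempts
        # The artifact is replaced, so only the latest attempt contributes rows.
        selected = min(kept, num_records)
        if selected >= output_records:
            return {
                "iterations": iterations,
                "generated": generated,
                "selected": output_records,
                "satisfied": True,
            }
    return {
        "iterations": iterations,
        "generated": generated,
        "selected": selected,
        "satisfied": False,
    }


uncapped = simulate_discard(10, 8, 5, None, kept_per_attempt=[3, 5, 9])
capped = simulate_discard(10, 8, 5, 25, kept_per_attempt=[3, 5, 9])
```

In the uncapped run the third attempt keeps 9 rows and is trimmed to 8. With a cap of 25 the third attempt never starts, so the run ends unsatisfied with the 5 rows from attempt two.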

Confidence Score: 5/5

Safe to merge. The new repeat loop logic, trim path, and exhaustion paths are all internally consistent and verified by the seven new integration tests.

Every branch introduced — append accumulation, discard replacement, trimming to exact count, non-empty partial, empty partial (completed_empty propagation), raised exhaustion, and processor-output selection — is exercised by a dedicated integration test. The fingerprint change correctly invalidates cached stages when the repeat policy changes, the _allows_empty_stage_output helper correctly gates the zero-record case only for exhausted RETURN_PARTIAL runs, and the repeat_until_output_path key is properly added to WORKFLOW_PATH_METADATA_KEYS so it round-trips correctly through resume normalization.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| `packages/data-designer/src/data_designer/interface/composite_workflow.py` | Core change: adds RepeatUntil policy, _run_stage_until_append/discard methods, exhaustion handling, and trim logic. Logic for append/discard modes, empty-partial paths, fingerprinting, and metadata is correct and consistent. |
| `packages/data-designer/src/data_designer/interface/__init__.py` | Adds RepeatUntil, RepeatUntilMode, and RepeatUntilExhaustion to public lazy-import registry and TYPE_CHECKING block. Straightforward and correct. |
| `packages/data-designer/tests/interface/test_composite_workflow.py` | Adds seven integration tests covering append accumulation and trim, non-empty partial, empty partial, raised exhaustion, discard retry, discard resume warning, and processor-output selection. All major code paths are exercised. |
| `fern/versions/latest/pages/concepts/workflow-chaining.mdx` | Adds a new "Repeating until a filtered count" section with a working code example and accurate prose describing append/discard mode semantics and exhaustion options. |

Reviews (1): Last reviewed commit: "refactor: simplify repeat until append r..."

@github-actions

Contributor

Thanks for putting this together, @andreatgretel — this is a clean, well-bounded addition and the test coverage is genuinely thorough.

Summary

This PR adds a bounded RepeatUntil stage policy to CompositeWorkflow so a stage can keep generating candidates until its selected (post-on_success/processor) output reaches an exact target row count. It supports append (cumulative, resumable) and discard (replace-per-attempt) modes, persists the policy and run details in stage metadata/fingerprints, trims overshoot to exactly output_records, and lets on_exhausted="return_partial" complete empty to match allow_empty=True skip semantics. The implementation matches the stated intent in the PR description, and the docs page is updated in step.

Findings

Warnings — Worth addressing

packages/data-designer/tests/interface/test_composite_workflow.py — No test exercises the max_generated_records cap

  • What: The docs and API lead heavily with max_generated_records as the primary bound, but no test drives the loop to break because of that cap. Every repeat test terminates via satisfaction, max_iterations, or empty exhaustion. The branch if _exceeds_max_generated_records(...): break (both append and discard) and the "did not run because no iteration fit" guard in _handle_repeat_until_exhausted (composite_workflow.py:814-817) are untested.
  • Why: Per AGENTS.md, new logic needs tests — and max_generated_records is exactly the kind of off-by-one-prone bound (append checks requested_records, discard checks generated_records + num_records) where a regression would be silent. The cap is also the safety mechanism the docs tell users to rely on.
  • Suggestion: Add one append-mode case where max_generated_records forces exhaustion before max_iterations (e.g. num_records=2, max_generated_records=3 so iteration 2's requested_records=4 trips the break with last_result from iteration 1), asserting repeat_iterations and the raised/partial outcome. A second tiny case with max_generated_records < num_records would lock in the last_result is None guard.
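
The two suggested cases can be written down against a toy loop to make the expected numbers concrete. `run_append_toy` is a hypothetical stand-in for the append loop, not the real `_run_stage_until_append`; an actual test would drive CompositeWorkflow and assert on repeat_iterations and the raised or partial outcome.

```python
from __future__ import annotations


def run_append_toy(num_records: int, max_generated_records: int, max_iterations: int):
    """Hypothetical stand-in for the append loop; returns (iterations, last_result)."""
    iterations, last_result = 0, None
    for iteration in range(1, max_iterations + 1):
        requested_records = num_records * iteration
        if requested_records > max_generated_records:
            break  # the cap trips before this attempt runs
        iterations = iteration
        last_result = {"requested_records": requested_records}
    return iterations, last_result


def test_cap_forces_exhaustion_before_max_iterations():
    # Iteration 2 would request 4 records, which exceeds the cap of 3.
    iterations, last_result = run_append_toy(
        num_records=2, max_generated_records=3, max_iterations=10
    )
    assert iterations == 1
    assert last_result == {"requested_records": 2}


def test_cap_below_num_records_runs_nothing():
    # No attempt fits, so last_result stays None and the guard must handle it.
    iterations, last_result = run_append_toy(
        num_records=5, max_generated_records=3, max_iterations=10
    )
    assert iterations == 0
    assert last_result is None
```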

Suggestions — Take it or leave it

packages/data-designer/src/data_designer/interface/composite_workflow.py:285-286 - max_generated_records < num_records is silently unsatisfiable

  • What: If a user sets max_generated_records below the stage's num_records, the first iteration's requested_records (append) immediately trips the cap, last_result stays None, and the stage always raises — even with on_exhausted="return_partial". The misconfiguration only surfaces at run time.
  • Why: num_records lives on the stage, not on RepeatUntil, so __post_init__ can't catch this; but add_stage knows both values and could fail fast with a clear message.
  • Suggestion: Consider a guard in add_stage when repeat_until is set: if repeat_until.max_generated_records is not None and repeat_until.max_generated_records < (num_records or DEFAULT_NUM_RECORDS), raise a DataDesignerWorkflowError explaining the cap can never admit a single attempt. Optional — the runtime error is survivable, just later than ideal.
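
A minimal sketch of such a fail-fast guard, assuming it would be called from add_stage. `check_repeat_cap` and `WorkflowConfigError` are hypothetical names (the latter stands in for DataDesignerWorkflowError), and the `DEFAULT_NUM_RECORDS` value here is a placeholder because the real default does not appear in this thread.

```python
from __future__ import annotations

DEFAULT_NUM_RECORDS = 10  # placeholder value, not the library's actual default


class WorkflowConfigError(ValueError):
    """Stand-in for DataDesignerWorkflowError in this sketch."""


def check_repeat_cap(num_records: int | None, max_generated_records: int | None) -> None:
    """Reject a cap that cannot admit even one attempt."""
    if max_generated_records is None:
        return  # no cap configured, nothing to validate
    per_attempt = num_records or DEFAULT_NUM_RECORDS
    if max_generated_records < per_attempt:
        raise WorkflowConfigError(
            f"max_generated_records={max_generated_records} is below the per-attempt "
            f"size num_records={per_attempt}; no attempt could ever run."
        )
```

Calling `check_repeat_cap(1_000, 500)` raises at configuration time, while `check_repeat_cap(1_000, 1_000)` passes because exactly one attempt fits.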

packages/data-designer/src/data_designer/interface/composite_workflow.py:1197-1202 - _write_parquet_head loads the full selected output into memory to keep the head

  • What: Trimming does _load_parquet_dataset(source_path).head(num_records), which concatenates every parquet file of the selected output into one DataFrame before slicing. In practice the selected (post-filter) output is near output_records, so this is usually fine, but a final iteration that overshoots heavily (e.g. target 5k, last attempt keeps 18k) materializes the whole thing just to write 5k rows.
  • Why: Pass 2 flags in-memory operations that scale with generated volume; this is the one spot in the new path that does.
  • Suggestion: Optional — read row-group metadata to take only enough files/rows to satisfy num_records (similar to how _count_parquet_records reads metadata lazily). Low priority given the typical post-filter size.
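
One way to picture the metadata-driven trim is to plan the read from per-file row counts before touching any data. The helper below is a hypothetical sketch, not part of the PR; the row counts are assumed to come from parquet footers (for example `pyarrow.parquet.ParquetFile(path).metadata.num_rows`), and the actual reading and writing is left out.

```python
from __future__ import annotations


def plan_head(rows_per_file: list[int], num_records: int) -> list[tuple[int, int]]:
    """Return (file_index, rows_to_take) pairs that cover the first num_records rows."""
    plan: list[tuple[int, int]] = []
    remaining = num_records
    for index, rows in enumerate(rows_per_file):
        if remaining <= 0:
            break  # later files never need to be opened
        take = min(rows, remaining)
        if take:
            plan.append((index, take))
        remaining -= take
    return plan


# An overshoot of 18k selected rows across three files, trimmed to a 5k target.
overshoot_plan = plan_head([6_000, 6_000, 6_000], 5_000)
```

Here only the first file is read, and only 5,000 of its rows, instead of concatenating all 18,000 rows before slicing.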

packages/data-designer/tests/interface/test_composite_workflow.py — No append-mode resume test

  • What: The commit "simplify append-mode resume handling to rely on artifact resume" changes resume behavior, and there's a nice discard-resume warning test, but no test resumes an append-mode stage and asserts it grows from the prior attempt rather than regenerating.
  • Suggestion: Worth a follow-up test that marks an append stage resumable mid-flight and confirms the second run continues accumulating. Take it or leave it.

What Looks Good

  • Careful tri-state handling of repeat_satisfied — using run_result.repeat_satisfied is False in _allows_empty_stage_output (composite_workflow.py:834) correctly distinguishes "ran and didn't satisfy" from None (non-repeat stage). Easy to get wrong with a truthy check; you didn't.
  • Frozen-dataclass validation - RepeatUntil.__post_init__ validates bounds and coerces string inputs to enums via object.__setattr__, giving both immutability and friendly mode="append" ergonomics. Clean.
  • Mode semantics are well-separated - _run_stage_until_append vs _run_stage_until_discard are distinct, readable, and the shared exhaustion/trim logic is factored into _handle_repeat_until_exhausted / _with_repeat_result without over-abstracting. The discard-resume warning is a thoughtful touch.
  • Tests assert real behavior — they check actual row contents, downstream propagation, and metadata fields, not just "it ran." The empty-partial and processor-output cases in particular exercise tricky boundaries.
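
Two of the patterns praised above, the frozen-dataclass coercion and the tri-state check, are illustrated below in isolation. The names `Mode`, `RepeatPolicy`, and `allows_empty` are hypothetical simplifications and not the PR's RepeatUntil or _allows_empty_stage_output.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class Mode(str, Enum):
    APPEND = "append"
    DISCARD = "discard"


@dataclass(frozen=True)
class RepeatPolicy:
    output_records: int
    max_iterations: int
    mode: Mode | str = Mode.APPEND

    def __post_init__(self) -> None:
        if self.output_records < 1 or self.max_iterations < 1:
            raise ValueError("output_records and max_iterations must be >= 1")
        # Frozen dataclasses block normal assignment, so coercion from a plain
        # string to the enum goes through object.__setattr__.
        object.__setattr__(self, "mode", Mode(self.mode))


def allows_empty(repeat_satisfied: bool | None) -> bool:
    # None means "not a repeat stage"; only an explicit False counts as exhausted.
    # A truthiness check (`not repeat_satisfied`) would wrongly accept None.
    return repeat_satisfied is False


policy = RepeatPolicy(output_records=5, max_iterations=2, mode="discard")
```

After construction `policy.mode` is the enum member rather than the raw string, and any later assignment to a field raises `dataclasses.FrozenInstanceError`.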

Structural Impact

Reviewer interpretation: The reported HIGH risk / 103 import-direction violations are AST false positives: nearly all are .keys()/.items() method calls on local dicts (e.g. CompositeWorkflowResults.stage_results) that the extractor mis-attributes as config --> interface edges. The actual new imports in composite_workflow.py all flow interface → engine → config (the legal direction) — RepeatUntil/RepeatUntilMode/RepeatUntilExhaustion are defined in the interface layer, and __init__.py exposes them via the existing lazy-import + TYPE_CHECKING pattern. No real layering violation was introduced. The genuine signal worth heeding is the high connectivity of composite_workflow.py / CompositeWorkflow.run — this is a central orchestration node, so the backward-compat note below matters.

Backward compatibility: _stage_fingerprint now always includes a repeat_until key (None for stages without the policy), which changes the hash for all stages. This does not regress resume behavior because the fingerprint already incorporates library_version, so shipping this in a new version invalidates prior cached stages regardless — the added key rides along with an already-changing hash. Worth keeping in mind, not a blocker.
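
The compatibility argument can be made concrete with a toy fingerprint. This is only a sketch: the real _stage_fingerprint and its inputs are not shown in this thread, so `stage_fingerprint`, its payload layout, and the use of SHA-256 over sorted JSON are assumptions for illustration.

```python
from __future__ import annotations

import hashlib
import json


def stage_fingerprint(config: dict, library_version: str, repeat_until: dict | None) -> str:
    """Hash a stage's inputs; the repeat_until key is always present, even when None."""
    payload = {
        "config": config,
        "library_version": library_version,
        "repeat_until": repeat_until,
    }
    # sort_keys makes the encoding stable regardless of dict insertion order.
    encoded = json.dumps(payload, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


config = {"num_records": 1000}
plain = stage_fingerprint(config, "1.0.0", None)
with_policy = stage_fingerprint(config, "1.0.0", {"output_records": 5000, "mode": "append"})
next_release = stage_fingerprint(config, "1.1.0", None)
```

Any change to the policy yields a different hash, so a resumed run does not reuse a stage produced under other bounds. A version bump alone also changes the hash, which is why the extra key does not invalidate anything that a new release would not already invalidate.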

Raw graphify analysis

Structural Impact (graphify, 2.2s)

Risk: HIGH (103 import direction violation(s))

  • 3 Python files, 93 AST entities, 4/82 clusters

Import Direction Violations (103)

Legal direction: interface -> engine -> config

  • .create_report_section() (config) --calls--> .keys() (interface)
  • inject_sampler_type_into_params() (config) --calls--> .items() (interface)
  • _resolve_sampler_kwargs() (config) --calls--> .items() (interface)
  • allowed_references() (config) --calls--> .keys() (interface)
  • ._resolve_drop_column_names() (config) --calls--> .keys() (interface)
  • +98 more

High-Connectivity Changes

  • .items() (85 deps) in packages/data-designer/src/data_designer/interface/composite_workflow.py
  • composite_workflow.py (50 deps) in packages/data-designer/src/data_designer/interface/composite_workflow.py
  • CompositeWorkflow (49 deps) in packages/data-designer/src/data_designer/interface/composite_workflow.py
  • .keys() (40 deps) in packages/data-designer/src/data_designer/interface/composite_workflow.py
  • CompositeWorkflowResults (38 deps) in packages/data-designer/src/data_designer/interface/composite_workflow.py
  • .run() (32 deps) in packages/data-designer/src/data_designer/interface/composite_workflow.py
  • RepeatUntilExhaustion (23 deps) in packages/data-designer/src/data_designer/interface/composite_workflow.py
  • RepeatUntil (23 deps) in packages/data-designer/src/data_designer/interface/composite_workflow.py
  • +51 more

Cross-Package Dependencies

  • Lazily import interface exports when accessed. (interface) --rationale_for--> __getattr__() (config)
  • Lazily import interface exports when accessed. (interface) --uses--> ResumeMode (engine)
  • Return available exports for tab-completion. (interface) --rationale_for--> __dir__() (config)
  • Return available exports for tab-completion. (interface) --uses--> ResumeMode (engine)
  • RepeatUntilMode (interface) --uses--> DatasetProfilerResults (config)
  • RepeatUntilMode (interface) --uses--> ProcessorConfig (config)
  • +507 more

Verdict

Needs changes: only one Warning (the missing max_generated_records cap test); everything else is optional. The implementation itself looks correct and is ship-ready once that bound has direct coverage.


This review was generated by an AI assistant.
