Skip to content

[Harbor 1/4] vero core: split visibility, budget ledger, evaluation engine refactor#3

Open
varunursekar wants to merge 1 commit into
mainfrom
harbor-1-core
Open

[Harbor 1/4] vero core: split visibility, budget ledger, evaluation engine refactor#3
varunursekar wants to merge 1 commit into
mainfrom
harbor-1-core

Conversation

@varunursekar

@varunursekar varunursekar commented Jun 24, 2026

Copy link
Copy Markdown

Draft · Stack 1 of 4 — review/merge in order (1→4). No Harbor concepts here; foundations the rest builds on (and useful on their own).

  • 3-tier split visibility: SplitAccessLevel gains no_access (hidden) alongside visible / non_viewable (aggregate-only).
  • BudgetLedger (core/budget.py): metered, optionally-durable per-split eval budget.
  • Staged eval: TaskOutput is a model; inference and scoring run as separate resumable disk-backed stages, with label-field scrubbing before inference.
  • Evaluation refactor: consolidate under vero/evaluation/ as a shared EvaluationEngine + Evaluator, port ExperimentRunnerTool onto it, and add the injectable EvalStrategy seam (Mode B's extension point).

Review focus: the visibility enum, BudgetLedger, the EvaluationEngine API, and the strategy seam. Much of evaluation/ is relocated code (skim).

Part of the Harbor integration (full squashed view: #2). Stack: this → [2/4] sidecar → [3/4] compiler → [4/4] docs.

🤖 Generated with Claude Code

Greptile Summary

This PR lays the foundational plumbing for the Harbor integration: a three-tier SplitAccessLevel enum (viewable / non_viewable / no_access), a metered BudgetLedger with optional durable JSON persistence, and a refactored evaluation stack that splits inference and scoring into two resumable disk-backed stages with label-field scrubbing.

  • BudgetLedger (core/budget.py): in-memory by default; persist_path enables a crash-safe durable variant for the Harbor sidecar, with reserve() providing an atomic check+decrement under an asyncio.Lock. One issue: _flush() does synchronous file I/O while the lock is held, which blocks the event loop in the durable path.
  • Staged evaluation (core/task/task.py): run_inference_stage and run_scoring_stage each persist per-sample results incrementally and skip already-completed samples on resume; label_fields are scrubbed from inference inputs but passed intact to scoring.
  • EvaluationEngine + EvalStrategy seam (evaluation/): shared engine consolidates budget metering and sample resolution; a Protocol-based EvalStrategy seam allows Harbor Mode B to inject a HarborRunner without coupling vero.evaluation to harbor-specific code.

Confidence Score: 3/5

Safe to merge for the in-process ExperimentRunnerTool path; the durable BudgetLedger path (Harbor sidecar) has a blocking I/O call under an asyncio lock that will degrade the event loop under concurrency.

The evaluation refactor, visibility enum, and EvalStrategy seam are clean and well-tested. The concern is in BudgetLedger.reserve(): it holds an asyncio.Lock and then calls _flush(), which does synchronous Path.write_text() on the main thread. For the in-memory path (persist_path=None) _flush() is a no-op and this is harmless. But the durable path — the variant the Harbor sidecar is documented to use — will block the event loop on every metered eval request, causing latency spikes and throttling throughput under concurrent load.

vero/src/vero/core/budget.py — the _flush() / reserve() interaction; vero/src/vero/core/task/task.py — double _load_and_prepare_data() call across the two stages.

Important Files Changed

Filename Overview
vero/src/vero/core/budget.py New BudgetLedger with durable JSON persistence; _flush() does blocking I/O inside the asyncio lock in reserve(), which will stall the event loop in the Harbor sidecar path.
vero/src/vero/core/task/task.py Refactored into two resumable stages (inference + scoring) with per-sample disk persistence and label-field scrubbing; both stages redundantly call _load_and_prepare_data().
vero/src/vero/evaluation/engine.py New EvaluationEngine shared core; clean budget metering with admin bypass; no_access gating via ledger allowlist is well-designed.
vero/src/vero/evaluation/evaluator.py Evaluator moved from vero/evaluator.py; gains eval_strategy injection seam for Mode B; logic is unchanged and correct.
vero/src/vero/evaluation/strategy.py Clean runtime-checkable Protocol defining the EvalStrategy seam; keeps vero.evaluation harbor-agnostic.
vero/src/vero/core/dataset/base.py Adds no_access tier to SplitAccessLevel and updates get_non_viewable_splits to include it; default test split promoted from non_viewable to no_access.
vero/src/vero/core/db/result.py TaskOutput converted from dataclass to Pydantic BaseModel with Exception-coercion validator; adds is_scored() to SampleResult; clean and correct.
vero/src/vero/evaluator.py Reduced to a back-compat re-export shim; unnecessarily exports private _resolve_vero_dependency.
vero/src/vero/tools/experiment_runner.py ExperimentRunnerTool ported onto EvaluationEngine; SplitBudget moved to vero.core.budget and re-exported for back-compat; delegation is clean.
vero/src/vero/policy.py Adds harbor field and lazy HarborRunner strategy injection; import path updates are correct.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[VeroTask.run] --> B[run_inference_stage]
    B --> B1{sample persisted?}
    B1 -- yes --> B2[skip]
    B1 -- no --> B3[_scrub_inputs]
    B3 --> B4[inference fn]
    B4 --> B5[_save_inference to disk]

    A --> C[run_scoring_stage]
    C --> C1{error or already scored?}
    C1 -- yes --> C2[skip]
    C1 -- no --> C3[load TaskOutput from disk]
    C3 --> C4[scoring fn with full row]
    C4 --> C5[_save_score to disk]

    A --> D[compute_metrics from disk]

    E[EvaluationEngine.evaluate] --> F{admin?}
    F -- no --> G[BudgetLedger.reserve]
    G --> G1[asyncio.Lock]
    G1 --> G2[check]
    G2 --> G3[record + _flush]
    G3 -. blocking I/O .-> G4[blocks event loop]
    F -- yes --> H[bypass budget]
    G --> I[Evaluator.evaluate]
    H --> I
    I --> J{eval_strategy set?}
    J -- Mode B --> K[EvalStrategy.produce_sample_results]
    J -- Mode A --> L[_run_task_in_subprocess]
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
flowchart TD
    A[VeroTask.run] --> B[run_inference_stage]
    B --> B1{sample persisted?}
    B1 -- yes --> B2[skip]
    B1 -- no --> B3[_scrub_inputs]
    B3 --> B4[inference fn]
    B4 --> B5[_save_inference to disk]

    A --> C[run_scoring_stage]
    C --> C1{error or already scored?}
    C1 -- yes --> C2[skip]
    C1 -- no --> C3[load TaskOutput from disk]
    C3 --> C4[scoring fn with full row]
    C4 --> C5[_save_score to disk]

    A --> D[compute_metrics from disk]

    E[EvaluationEngine.evaluate] --> F{admin?}
    F -- no --> G[BudgetLedger.reserve]
    G --> G1[asyncio.Lock]
    G1 --> G2[check]
    G2 --> G3[record + _flush]
    G3 -. blocking I/O .-> G4[blocks event loop]
    F -- yes --> H[bypass budget]
    G --> I[Evaluator.evaluate]
    H --> I
    I --> J{eval_strategy set?}
    J -- Mode B --> K[EvalStrategy.produce_sample_results]
    J -- Mode A --> L[_run_task_in_subprocess]
Loading

Comments Outside Diff (1)

  1. vero/src/vero/core/budget.py, line 175-193 (link)

    P1 Blocking I/O inside asyncio.Lock in reserve()

    _flush() calls Path.write_text() (synchronous) while reserve() holds self._lock — an asyncio.Lock. Any concurrent coroutine that awaits reserve() will be blocked for the full duration of the file write; on a loaded Harbor sidecar with rapid eval requests, this stalls the entire event loop. The fix is to wrap the blocking writes in asyncio.to_thread() (or use aiofiles) and await them inside reserve(), keeping the lock protecting only the in-memory check+decrement step.

    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: vero/src/vero/core/budget.py
    Line: 175-193
    
    Comment:
    **Blocking I/O inside `asyncio.Lock` in `reserve()`**
    
    `_flush()` calls `Path.write_text()` (synchronous) while `reserve()` holds `self._lock` — an `asyncio.Lock`. Any concurrent coroutine that awaits `reserve()` will be blocked for the full duration of the file write; on a loaded Harbor sidecar with rapid eval requests, this stalls the entire event loop. The fix is to wrap the blocking writes in `asyncio.to_thread()` (or use `aiofiles`) and `await` them inside `reserve()`, keeping the lock protecting only the in-memory check+decrement step.
    
    How can I resolve this? If you propose a fix, please make it concise.

    Fix in Cursor Fix in Claude Code Fix in Codex

Fix All in Cursor Fix All in Claude Code Fix All in Codex

Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 3
vero/src/vero/core/budget.py:175-193
**Blocking I/O inside `asyncio.Lock` in `reserve()`**

`_flush()` calls `Path.write_text()` (synchronous) while `reserve()` holds `self._lock` — an `asyncio.Lock`. Any concurrent coroutine that awaits `reserve()` will be blocked for the full duration of the file write; on a loaded Harbor sidecar with rapid eval requests, this stalls the entire event loop. The fix is to wrap the blocking writes in `asyncio.to_thread()` (or use `aiofiles`) and `await` them inside `reserve()`, keeping the lock protecting only the in-memory check+decrement step.

### Issue 2 of 3
vero/src/vero/core/task/task.py:745-748
**Dataset loaded twice per evaluation**

`run_scoring_stage` calls `_load_and_prepare_data(params)` even though `run_inference_stage` already called it earlier in `run()`. On large datasets loaded from disk, this doubles the I/O and deserialization cost. Consider passing the already-loaded `(tasks, task_data)` tuple into both stage methods, or loading once at the `run()` level and threading it through.

### Issue 3 of 3
vero/src/vero/evaluator.py:8-13
The back-compat shim re-exports `_resolve_vero_dependency`, a private implementation detail that should not be part of any public surface. Callers importing via `vero.evaluator` have no reason to use it, and exporting it risks unintended coupling. Remove it from the shim — it's already accessible via `vero.evaluation.evaluator._resolve_vero_dependency` for internal use.

```suggestion
from vero.evaluation.evaluator import (  # noqa: F401
    Evaluator,
    isolate_project,
    run_evaluation,
)
```

Reviews (1): Last reviewed commit: "vero: 3-tier split visibility, budget le..." | Re-trigger Greptile

Greptile also left 1 inline comment on this PR.

…ne refactor

Foundational changes that the Harbor integration builds on, useful on their own:

- 3-tier split visibility: `SplitAccessLevel` gains `no_access` (hidden) alongside
  `visible` and `non_viewable` (aggregate-only).
- `BudgetLedger` (core/budget.py): a metered, optionally durable per-split eval
  budget extracted from the experiment runner.
- `TaskOutput` is now a model; the task pipeline runs inference and scoring as
  separate, resumable, disk-backed stages, with label-field scrubbing before inference.
- Evaluation refactor: consolidate the eval path under `vero/evaluation/` as a shared
  `EvaluationEngine` + `Evaluator`, port `ExperimentRunnerTool` onto it, and add an
  injectable `EvalStrategy` seam (the extension point Harbor Mode B uses).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@varunursekar varunursekar requested a review from a team June 24, 2026 18:17
@varunursekar varunursekar marked this pull request as ready for review June 24, 2026 18:21
Comment on lines +745 to +748
tasks, _ = self._load_and_prepare_data(params)
sample_ids = params.run.dataset_subset.sample_ids
if sample_ids is None:
sample_ids = list(range(len(tasks)))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Dataset loaded twice per evaluation

run_scoring_stage calls _load_and_prepare_data(params) even though run_inference_stage already called it earlier in run(). On large datasets loaded from disk, this doubles the I/O and deserialization cost. Consider passing the already-loaded (tasks, task_data) tuple into both stage methods, or loading once at the run() level and threading it through.

Prompt To Fix With AI
This is a comment left during a code review.
Path: vero/src/vero/core/task/task.py
Line: 745-748

Comment:
**Dataset loaded twice per evaluation**

`run_scoring_stage` calls `_load_and_prepare_data(params)` even though `run_inference_stage` already called it earlier in `run()`. On large datasets loaded from disk, this doubles the I/O and deserialization cost. Consider passing the already-loaded `(tasks, task_data)` tuple into both stage methods, or loading once at the `run()` level and threading it through.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Claude Code Fix in Codex

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partially valid. The double load is real: run_inference_stage (line 680) and run_scoring_stage (line 745) each call _load_and_prepare_data, so a full run() deserializes the dataset twice.

The duplication is deliberate, though, not accidental. Each stage is independently resumable and independently invocable (a crashed run re-enters only the unfinished stage, and the Harbor sidecar can drive scoring without a fresh inference pass), so neither stage can assume the other already loaded the data. Threading a pre-loaded (tasks, task_data) tuple down from run() would only help the single-process full-pipeline path and would force both stage methods to grow an optional pre-loaded argument that the resume path can't supply.

For the HuggingFace default loader the second load is effectively a memory-mapped Arrow reopen (cheap), and custom @task.load_data loaders are expected to be idempotent. I'd rather keep the stages self-contained than add a parameter that's only populated on one of the two entry paths. Leaving as-is intentionally; if profiling on a large custom loader shows this is a hotspot we can add an optional in-process cache keyed on (dataset_id, split, sample_ids) rather than reshape the stage signatures.

Comment thread vero/src/vero/policy.py
@@ -337,6 +343,13 @@ async def init(self) -> None:
self._validate_budget_splits()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_validate_budget_splits() only checks budget splits against the dataset; it never reconciles budget against split_accesses. These two lists jointly enforce visibility (budget = which splits the agent may RUN; split_accesses = what gets WRITTEN back per tier). An init-time cross-check would close a real hole: every agent-budgeted split should be explicitly tiered non_viewable/no_access, raising on viewable-or-unlisted. Combined with tier_for_split defaulting an unlisted split to viewable (over in harbor/protocol.py), the concrete failure is: budget a split, forget to tier it, and the agent both runs it and gets its per-sample labels written to its own volume.

async def evaluate(self, req: EvalRequest, *, admin: bool = False) -> Experiment:
"""Meter (unless admin) and run one evaluation; return the full Experiment.

``no_access`` gating is implicit: those splits are absent from the budget

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This docstring is honest that no_access gating is implicit — it relies on those splits being absent from the budget ledger so reserve() raises. That couples a security tier to a budgeting structure: a split that is in the ledger but should be no_access would not be rejected here. Recommend evaluate() consult split_accesses directly and hard-reject tier == no_access for non-admin callers, independent of ledger membership.

]
self.persist_path.parent.mkdir(parents=True, exist_ok=True)
tmp = self.persist_path.with_suffix(self.persist_path.suffix + ".tmp")
tmp.write_text(json.dumps(data, indent=2))

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reserve() holds self._lock (an asyncio.Lock) across this synchronous tmp.write_text() + replace(). In the durable sidecar path under concurrency this stalls the event loop on every reservation. Keep the lock around only the in-memory check+decrement, and push the flush out with await asyncio.to_thread(...). (Confirms Greptile's P1.)

shehabyasser-scale added a commit that referenced this pull request Jun 30, 2026
…f-loop ledger flush

Core trust-boundary hardening for the eval engine (review findings on PR #3):

- engine: explicit, fail-closed no_access gate in evaluate(). Resolve the
  split's tier (unlisted defaults to no_access) and reject no_access for
  non-admin callers before the budget ledger, instead of relying implicitly on
  the split being absent from the ledger. Adds resolve_split_access() in
  core.dataset.base (no vero.harbor import) as the single fail-closed resolver.
- policy: reconcile budgets and split access at init. Auto-tier every
  agent-budgeted split to non_viewable when unlisted (evaluable, labels hidden),
  then reject any budgeted split explicitly tiered viewable or no_access. Keeps
  the train_budget convenience path working while closing the leak.
- budget: move the durable ledger flush off the event loop (asyncio.to_thread)
  while keeping it under the reservation lock, so the sync write no longer
  blocks the loop and concurrent flushes cannot race the temp file.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@shehabyasser-scale

shehabyasser-scale commented Jun 30, 2026

Copy link
Copy Markdown
Collaborator

On the design question raised in review (should split access be a dataset property checked in real time?): yes. I implemented the minimal version in the fix PR for this branch (#7) and wrote up the full reasoning there: #7 (comment) (budget vs access as separate axes, trust ownership, flexibility, and the deferred DatasetInfo follow-up).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants