[Harbor 1/4] vero core: split visibility, budget ledger, evaluation engine refactor #3
varunursekar wants to merge 1 commit into
…ne refactor

Foundational changes that the Harbor integration builds on, useful on their own:

- 3-tier split visibility: `SplitAccessLevel` gains `no_access` (hidden) alongside `visible` and `non_viewable` (aggregate-only).
- `BudgetLedger` (core/budget.py): a metered, optionally durable per-split eval budget extracted from the experiment runner.
- `TaskOutput` is now a model; the task pipeline runs inference and scoring as separate, resumable, disk-backed stages, with label-field scrubbing before inference.
- Evaluation refactor: consolidate the eval path under `vero/evaluation/` as a shared `EvaluationEngine` + `Evaluator`, port `ExperimentRunnerTool` onto it, and add an injectable `EvalStrategy` seam (the extension point Harbor Mode B uses).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
    tasks, _ = self._load_and_prepare_data(params)
    sample_ids = params.run.dataset_subset.sample_ids
    if sample_ids is None:
        sample_ids = list(range(len(tasks)))
Dataset loaded twice per evaluation
run_scoring_stage calls _load_and_prepare_data(params) even though run_inference_stage already called it earlier in run(). On large datasets loaded from disk, this doubles the I/O and deserialization cost. Consider passing the already-loaded (tasks, task_data) tuple into both stage methods, or loading once at the run() level and threading it through.
Path: vero/src/vero/core/task/task.py
Line: 745-748
Partially valid. The double load is real: run_inference_stage (line 680) and run_scoring_stage (line 745) each call _load_and_prepare_data, so a full run() deserializes the dataset twice.
The duplication is deliberate, though, not accidental. Each stage is independently resumable and independently invocable (a crashed run re-enters only the unfinished stage, and the Harbor sidecar can drive scoring without a fresh inference pass), so neither stage can assume the other already loaded the data. Threading a pre-loaded (tasks, task_data) tuple down from run() would only help the single-process full-pipeline path and would force both stage methods to grow an optional pre-loaded argument that the resume path can't supply.
For the HuggingFace default loader the second load is effectively a memory-mapped Arrow reopen (cheap), and custom @task.load_data loaders are expected to be idempotent. I'd rather keep the stages self-contained than add a parameter that's only populated on one of the two entry paths. Leaving as-is intentionally; if profiling on a large custom loader shows this is a hotspot we can add an optional in-process cache keyed on (dataset_id, split, sample_ids) rather than reshape the stage signatures.
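If that follow-up is ever needed, the cache could look roughly like the sketch below. Everything in it is hypothetical (`load_cached`, `cache_key`, the module-level dict); it only illustrates keying on `(dataset_id, split, sample_ids)` so that the stage signatures stay unchanged, and it assumes the loader is idempotent as described above.

```python
from typing import Any, Callable, Dict, Optional, Sequence, Tuple

# Hypothetical in-process cache; not part of vero.
_DATA_CACHE: Dict[Tuple[Any, ...], Any] = {}


def cache_key(
    dataset_id: str, split: str, sample_ids: Optional[Sequence[int]]
) -> Tuple[Any, ...]:
    # Lists are unhashable, so freeze sample_ids; None means "the whole split".
    frozen = None if sample_ids is None else tuple(sample_ids)
    return (dataset_id, split, frozen)


def load_cached(
    dataset_id: str,
    split: str,
    sample_ids: Optional[Sequence[int]],
    loader: Callable[[], Any],
) -> Any:
    """Call loader() once per key within this process and reuse the result."""
    key = cache_key(dataset_id, split, sample_ids)
    if key not in _DATA_CACHE:
        _DATA_CACHE[key] = loader()
    return _DATA_CACHE[key]
```

A stage that is re-entered in a fresh process simply misses the cache and loads as it does today, so the resume path is unaffected.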
@@ -337,6 +343,13 @@ async def init(self) -> None:
    self._validate_budget_splits()
_validate_budget_splits() only checks budget splits against the dataset; it never reconciles budget against split_accesses. These two lists jointly enforce visibility (budget = which splits the agent may RUN; split_accesses = what gets WRITTEN back per tier). An init-time cross-check would close a real hole: every agent-budgeted split should be explicitly tiered non_viewable/no_access, raising on viewable-or-unlisted. Combined with tier_for_split defaulting an unlisted split to viewable (over in harbor/protocol.py), the concrete failure is: budget a split, forget to tier it, and the agent both runs it and gets its per-sample labels written to its own volume.
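A rough sketch of the cross-check I have in mind. The enum below is a stand-in for the real `SplitAccessLevel` (its member values are assumed), and `check_budget_against_access` is a hypothetical helper name, not existing code:

```python
from enum import Enum
from typing import Iterable, Mapping


class SplitAccessLevel(str, Enum):
    # Stand-in for the PR's enum; the member values here are assumed.
    VIEWABLE = "viewable"
    NON_VIEWABLE = "non_viewable"
    NO_ACCESS = "no_access"


def check_budget_against_access(
    budget_splits: Iterable[str],
    split_accesses: Mapping[str, SplitAccessLevel],
) -> None:
    """Raise if any budgeted split is unlisted or tiered viewable."""
    problems = []
    for split in budget_splits:
        tier = split_accesses.get(split)
        if tier is None:
            problems.append(f"{split!r} is budgeted but has no access tier")
        elif tier is SplitAccessLevel.VIEWABLE:
            problems.append(f"{split!r} is budgeted but tiered viewable")
    if problems:
        # Report every offending split at once rather than failing on the first.
        raise ValueError("; ".join(problems))
```

Calling this from `init()` right after `_validate_budget_splits()` would turn the "forgot to tier it" case into a startup error instead of a silent label leak.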
    async def evaluate(self, req: EvalRequest, *, admin: bool = False) -> Experiment:
        """Meter (unless admin) and run one evaluation; return the full Experiment.

        ``no_access`` gating is implicit: those splits are absent from the budget
This docstring is honest that no_access gating is implicit — it relies on those splits being absent from the budget ledger so reserve() raises. That couples a security tier to a budgeting structure: a split that is in the ledger but should be no_access would not be rejected here. Recommend evaluate() consult split_accesses directly and hard-reject tier == no_access for non-admin callers, independent of ledger membership.
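Something along these lines, run before the ledger is touched. This is only a sketch: the helper names are hypothetical, tiers are shown as plain strings for brevity, and treating an unlisted split as `no_access` is my assumption about the safest default:

```python
from typing import Mapping

NO_ACCESS = "no_access"  # tier names as plain strings, for illustration only


def resolve_split_tier(split: str, split_accesses: Mapping[str, str]) -> str:
    # Fail closed: a split nobody tiered is treated as hidden (an assumption).
    return split_accesses.get(split, NO_ACCESS)


def require_evaluable(
    split: str, split_accesses: Mapping[str, str], *, admin: bool = False
) -> str:
    """Return the split's tier, rejecting no_access for non-admin callers."""
    tier = resolve_split_tier(split, split_accesses)
    if tier == NO_ACCESS and not admin:
        raise PermissionError(f"split {split!r} is no_access for non-admin callers")
    return tier
```

With a gate like this, a split that is mistakenly present in the ledger is still rejected, because the decision no longer depends on ledger membership.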
    ]
    self.persist_path.parent.mkdir(parents=True, exist_ok=True)
    tmp = self.persist_path.with_suffix(self.persist_path.suffix + ".tmp")
    tmp.write_text(json.dumps(data, indent=2))
reserve() holds self._lock (an asyncio.Lock) across this synchronous tmp.write_text() + replace(). In the durable sidecar path under concurrency this stalls the event loop on every reservation. Keep the lock around only the in-memory check+decrement, and push the flush out with await asyncio.to_thread(...). (Confirms Greptile's P1.)
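A minimal sketch of the off-loop flush, under stated assumptions: `MiniLedger` and its fields are hypothetical stand-ins, not the real `BudgetLedger`. This variant still awaits the write inside the lock so that two flushes cannot interleave on the same temp file; the write itself runs in a worker thread, so the event loop keeps serving other coroutines while it completes.

```python
import asyncio
import json
import os
from pathlib import Path
from typing import Dict, Optional


class MiniLedger:
    """Hypothetical stand-in for a durable per-split budget ledger."""

    def __init__(self, budgets: Dict[str, int], persist_path: Optional[Path] = None):
        self._remaining = dict(budgets)
        self._persist_path = persist_path
        self._lock = asyncio.Lock()

    def _flush_sync(self, path: Path, snapshot: Dict[str, int]) -> None:
        # Blocking write: temp file first, then an atomic rename over the target.
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp = path.with_suffix(path.suffix + ".tmp")
        tmp.write_text(json.dumps(snapshot, indent=2))
        os.replace(tmp, path)

    async def reserve(self, split: str, n: int) -> None:
        async with self._lock:
            remaining = self._remaining.get(split)
            if remaining is None or remaining < n:
                raise ValueError(f"insufficient budget for split {split!r}")
            self._remaining[split] = remaining - n
            if self._persist_path is not None:
                # Run the sync write in a worker thread; the loop stays free.
                await asyncio.to_thread(
                    self._flush_sync, self._persist_path, dict(self._remaining)
                )
```

Releasing the lock before the flush would shorten the critical section further, but then concurrent flushes need their own ordering guarantee (for example a separate flush lock or unique temp names).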
…f-loop ledger flush

Core trust-boundary hardening for the eval engine (review findings on PR #3):

- engine: explicit, fail-closed no_access gate in evaluate(). Resolve the split's tier (unlisted defaults to no_access) and reject no_access for non-admin callers before the budget ledger, instead of relying implicitly on the split being absent from the ledger. Adds resolve_split_access() in core.dataset.base (no vero.harbor import) as the single fail-closed resolver.
- policy: reconcile budgets and split access at init. Auto-tier every agent-budgeted split to non_viewable when unlisted (evaluable, labels hidden), then reject any budgeted split explicitly tiered viewable or no_access. Keeps the train_budget convenience path working while closing the leak.
- budget: move the durable ledger flush off the event loop (asyncio.to_thread) while keeping it under the reservation lock, so the sync write no longer blocks the loop and concurrent flushes cannot race the temp file.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
On the design question raised in review (should split access be a dataset property checked in real time?): yes. I implemented the minimal version in the fix PR for this branch (#7) and wrote up the full reasoning there: #7 (comment) (budget vs access as separate axes, trust ownership, flexibility, and the deferred DatasetInfo follow-up).
Draft · Stack 1 of 4 — review/merge in order (1→4). No Harbor concepts here; foundations the rest builds on (and useful on their own).
- `SplitAccessLevel` gains `no_access` (hidden) alongside `visible` / `non_viewable` (aggregate-only).
- `BudgetLedger` (`core/budget.py`): metered, optionally-durable per-split eval budget.
- `TaskOutput` is a model; inference and scoring run as separate resumable disk-backed stages, with label-field scrubbing before inference.
- Evaluation refactor: consolidate the eval path under `vero/evaluation/` as a shared `EvaluationEngine` + `Evaluator`, port `ExperimentRunnerTool` onto it, and add the injectable `EvalStrategy` seam (Mode B's extension point).

Review focus: the visibility enum, `BudgetLedger`, the `EvaluationEngine` API, and the strategy seam. Much of `evaluation/` is relocated code (skim).

Part of the Harbor integration (full squashed view: #2). Stack: this → [2/4] sidecar → [3/4] compiler → [4/4] docs.
🤖 Generated with Claude Code
Greptile Summary
This PR lays the foundational plumbing for the Harbor integration: a three-tier `SplitAccessLevel` enum (`viewable` / `non_viewable` / `no_access`), a metered `BudgetLedger` with optional durable JSON persistence, and a refactored evaluation stack that splits inference and scoring into two resumable disk-backed stages with label-field scrubbing.

- `BudgetLedger` (`core/budget.py`): in-memory by default; `persist_path` enables a crash-safe durable variant for the Harbor sidecar, with `reserve()` providing an atomic check+decrement under an `asyncio.Lock`. One issue: `_flush()` does synchronous file I/O while the lock is held, which blocks the event loop in the durable path.
- `core/task/task.py`: `run_inference_stage` and `run_scoring_stage` each persist per-sample results incrementally and skip already-completed samples on resume; `label_fields` are scrubbed from inference inputs but passed intact to scoring.
- `EvaluationEngine` + `EvalStrategy` seam (`evaluation/`): shared engine consolidates budget metering and sample resolution; a `Protocol`-based `EvalStrategy` seam allows Harbor Mode B to inject a `HarborRunner` without coupling `vero.evaluation` to harbor-specific code.

Confidence Score: 3/5
Safe to merge for the in-process ExperimentRunnerTool path; the durable BudgetLedger path (Harbor sidecar) has a blocking I/O call under an asyncio lock that will degrade the event loop under concurrency.
The evaluation refactor, visibility enum, and EvalStrategy seam are clean and well-tested. The concern is in BudgetLedger.reserve(): it holds an asyncio.Lock and then calls _flush(), which does synchronous Path.write_text() on the main thread. For the in-memory path (persist_path=None) _flush() is a no-op and this is harmless. But the durable path — the variant the Harbor sidecar is documented to use — will block the event loop on every metered eval request, causing latency spikes and throttling throughput under concurrent load.
vero/src/vero/core/budget.py — the _flush() / reserve() interaction; vero/src/vero/core/task/task.py — double _load_and_prepare_data() call across the two stages.
Flowchart
    %%{init: {'theme': 'neutral'}}%%
    flowchart TD
        A[VeroTask.run] --> B[run_inference_stage]
        B --> B1{sample persisted?}
        B1 -- yes --> B2[skip]
        B1 -- no --> B3[_scrub_inputs]
        B3 --> B4[inference fn]
        B4 --> B5[_save_inference to disk]
        A --> C[run_scoring_stage]
        C --> C1{error or already scored?}
        C1 -- yes --> C2[skip]
        C1 -- no --> C3[load TaskOutput from disk]
        C3 --> C4[scoring fn with full row]
        C4 --> C5[_save_score to disk]
        A --> D[compute_metrics from disk]
        E[EvaluationEngine.evaluate] --> F{admin?}
        F -- no --> G[BudgetLedger.reserve]
        G --> G1[asyncio.Lock]
        G1 --> G2[check]
        G2 --> G3[record + _flush]
        G3 -. blocking I/O .-> G4[blocks event loop]
        F -- yes --> H[bypass budget]
        G --> I[Evaluator.evaluate]
        H --> I
        I --> J{eval_strategy set?}
        J -- Mode B --> K[EvalStrategy.produce_sample_results]
        J -- Mode A --> L[_run_task_in_subprocess]

Comments Outside Diff (1)
vero/src/vero/core/budget.py, line 175-193

`asyncio.Lock` in `reserve()`

`_flush()` calls `Path.write_text()` (synchronous) while `reserve()` holds `self._lock`, an `asyncio.Lock`. Any concurrent coroutine that awaits `reserve()` will be blocked for the full duration of the file write; on a loaded Harbor sidecar with rapid eval requests, this stalls the entire event loop. The fix is to wrap the blocking writes in `asyncio.to_thread()` (or use `aiofiles`) and `await` them inside `reserve()`, keeping the lock protecting only the in-memory check+decrement step.
Reviews (1): Last reviewed commit: "vero: 3-tier split visibility, budget le..."