Skip to content

fix(forwarder): cache Firehose client + add gevent worker pool (MLI-7328)#850

Open
lorenzo-norcini-scale wants to merge 13 commits into
mainfrom
lorenzonorcini/mli-7328-forwarder-firehose-cache-gevent
Open

fix(forwarder): cache Firehose client + add gevent worker pool (MLI-7328)#850
lorenzo-norcini-scale wants to merge 13 commits into
mainfrom
lorenzonorcini/mli-7328-forwarder-firehose-cache-gevent

Conversation

@lorenzo-norcini-scale

@lorenzo-norcini-scale lorenzo-norcini-scale commented Jun 27, 2026

Copy link
Copy Markdown
Collaborator

Problem (MLI-7328)

The logging post-inference hook calls FirehoseStreamingStorageGateway.put_record() on every async task. The gateway rebuilt an STS client, called assume_role, and built a new Firehose client on every call. Under the celery prefork pool the workers are long-lived with no child recycling, so that per-task client churn grew each worker's RSS until the celery-forwarder was OOM-killed at its 2Gi limit (it OOMs while holding ~no in-flight work, matching the reported symptom).

Fix

  • Cache the Firehose client (built once per worker), rebuilt shortly before the assumed-role token expires (once per token lifetime, not per task), using only public boto3 APIs (no botocore-internal credential injection). Removes the per-task client churn and per-task STS round-trip. Same role/region assumptions as before.
  • Opt-in gevent worker pool (CELERY_WORKER_POOL=gevent): the forwarder is IO-bound, so gevent runs the same concurrency in one process instead of N, at much lower memory. Monkey-patch happens before imports, in the right order for ddtrace context. Prefork stays the default; the gevent monkey-patch and pool selection are skipped entirely under it.
  • Opt-in prefork child recycling (CELERY_WORKER_MAX_TASKS_PER_CHILD): recycle each prefork child after N tasks so the OS reclaims any per-task residue (e.g. glibc arena fragmentation). Off unless set; defense-in-depth, independent of the cache fix. No effect under gevent.

Changes

  • inference/infra/gateways/firehose_streaming_storage_gateway.py: cached client, rebuilt ahead of assumed-role token expiry via public boto3.client(..., aws_session_token=...) (no botocore-internal credential injection); lazy init guarded by a lock and a sized connection pool (both for gevent-safety; no-ops under prefork).
  • inference/forwarding/celery_forwarder.py: guarded gevent monkey-patch + pool selection + env-gated max_tasks_per_child (prefork).
  • requirements.{in,txt}: add gevent.
  • core/celery/s3.py: comment noting the per-thread result-backend cache is per-greenlet (bounded) under gevent.
  • Tests: firehose built-once + rebuild-before-expiry + build-failure-retry unit tests; gevent import-order + worker-pool wiring unit tests (incl. max_tasks_per_child gating); test_forwarder_pool.py integration tests (prefork, gevent, concurrency, SQS, warm-shutdown).

Behavior / rollout

  • The firehose cache fix applies to both pools and is the live change on prefork (it is the OOM fix). The pool/concurrency model changes only if CELERY_WORKER_POOL=gevent is set.
  • Turning gevent on is a companion model-engine-internal change (set the env on the forwarder, resize the pod: ~1 CPU/process, lower memory, scale via replicas). Once deployed with gevent, the existing minikube async e2e covers it.

Validation (full detail in the linked doc)

  • Unit suite green, including the new cache/rebuild/pool-ordering guards.
  • Note: the 4h/7h soaks below ran an earlier in-place credential-refresh variant; they validate the cache fix (no per-task churn -> no leak). The shipped rebuild-ahead-of-expiry mechanism is separately validated: unit tests + a 3h prefork soak (52k tasks, ~24 client rebuilds over ~2.4 cycles, memory bounded ~64-79%, 0 errors, no OOM).
  • Native prod-equivalent repro: prefork + hook OOMs at ~t25m; with the cache fix memory plateaus, no OOM. gevent runs in 1 process at ~4-6x lower memory, validated for correctness, concurrency, the SQS broker (prod's broker), credential refresh, and warm-shutdown; a 4h gevent + SQS + S3 + firehose soak (results in the doc) stays flat.
  • Prefork (the shipping path) + cache soak-validated over SQS + S3 + firehose at 2Gi for 7h / 121k tasks: memory plateaus (bounded, no leak), no OOM, 0 errors, cache holds across ~62 credential refreshes. (At 2Gi the steady-state working set is high ~84% under SQS — a sizing matter, not the leak.)

Notes for reviewer

  • requirements.txt was edited by hand; please regenerate with pip-compile to confirm pins.
  • gevent is a new dependency; cp314 wheels exist for x86_64 and aarch64.

🤖 Generated with Claude Code

Greptile Summary

This PR fixes the celery-forwarder OOM by caching the Firehose boto3 client per worker (rebuilding only before assumed-role token expiry, not per task) and adds an opt-in gevent worker pool for lower memory footprint at equal concurrency.

  • Firehose client caching (firehose_streaming_storage_gateway.py): a pure _build_client() returns (client, expiry) without touching self; the caller atomically assigns both under a double-checked lock, so a failed rebuild never advances _expiry past a client that was never installed. The lock is threading.Lock, which becomes greenlet-aware under monkey.patch_all() for gevent safety.
  • Gevent pool (celery_forwarder.py): monkey.patch_all() is guarded by CELERY_WORKER_POOL=gevent and runs before all other imports at module top-of-file; prefork workers are entirely unaffected. An env-gated max_tasks_per_child knob provides defense-in-depth for prefork.
  • Tests: unit tests cover built-once, rebuild-before-expiry, failed-build-retry, and pool-wiring; subprocess-based tests guard gevent import ordering and ddtrace-run compatibility; integration tests (local-only) cover prefork, gevent, concurrency, warm-shutdown, and SQS.

Confidence Score: 5/5

Safe to merge — the client-caching logic is correct under both prefork and gevent, well-tested, and the gevent pool is opt-in behind an env var that defaults to the existing prefork behaviour.

The core fix (cached Firehose client with double-checked locking and a pure _build_client) is safe under both worker modes. Under prefork each child process holds its own instance so there is no cross-process sharing; under gevent cooperative scheduling ensures the unguarded outer check and the non-atomic tuple assignment are both safe. The test suite covers the key failure modes (first-build failure, refresh failure, rebuild-before-expiry). The gevent change is fully opt-in and leaves the default prefork path byte-for-byte unchanged.

requirements.txt was hand-edited; worth regenerating with pip-compile to confirm the gevent/zope pin graph is correct before the next dependency bump.

Important Files Changed

Filename Overview
model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py Core OOM fix: Firehose client is now cached per-instance with double-checked locking, rebuilt only near token expiry via a pure _build_client(). Logic handles first-build failure and refresh-failure correctly, and is safe under both prefork (per-process instance) and gevent (cooperative scheduling).
model-engine/model_engine_server/inference/forwarding/celery_forwarder.py Opt-in gevent pool (CELERY_WORKER_POOL=gevent) with monkey.patch_all() at module top-of-file before all other imports; prefork is unchanged. Adds env-gated max_tasks_per_child for prefork defense-in-depth. Logic is clean and well-guarded.
model-engine/tests/unit/infra/gateways/test_firehose_streaming_storage_gateway.py Comprehensive rewrite of gateway tests: covers built-once regression, rebuild-before-expiry, failed-build-retries, pure _build_client, and connection-pool wiring. Tests exercise the real _build_client path through mocked boto3.client routing.
model-engine/tests/integration/inference/test_forwarder_pool.py New integration tests for prefork and gevent pools (task completion, concurrency, warm-shutdown, SQS broker). Skipped in CI; local-only. Contains a minor type annotation issue (sqs_url: str = None).
model-engine/requirements.txt Hand-edited to pin gevent==26.5.0 and its transitive deps (zope-event, zope-interface, greenlet). PR author notes this should be regenerated with pip-compile to verify pins.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Task as Celery Task (greenlet/child)
    participant GW as FirehoseStreamingStorageGateway
    participant Lock as threading.Lock
    participant STS as AWS STS
    participant FH as AWS Firehose

    Task->>GW: put_record(stream, record)
    GW->>GW: _get_firehose_client()
    alt _client is None OR _needs_rebuild()
        GW->>Lock: acquire()
        alt still needs rebuild (double-check)
            GW->>STS: boto3.client("sts").assume_role()
            STS-->>GW: Credentials + Expiration
            GW->>FH: boto3.client("firehose", creds, pool_size)
            FH-->>GW: firehose_client
            GW->>GW: "self._client, self._expiry = client, expiry"
        end
        GW->>Lock: release()
    end
    GW-->>Task: firehose_client
    Task->>FH: client.put_record(stream, data)
    FH-->>Task: response
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Task as Celery Task (greenlet/child)
    participant GW as FirehoseStreamingStorageGateway
    participant Lock as threading.Lock
    participant STS as AWS STS
    participant FH as AWS Firehose

    Task->>GW: put_record(stream, record)
    GW->>GW: _get_firehose_client()
    alt _client is None OR _needs_rebuild()
        GW->>Lock: acquire()
        alt still needs rebuild (double-check)
            GW->>STS: boto3.client("sts").assume_role()
            STS-->>GW: Credentials + Expiration
            GW->>FH: boto3.client("firehose", creds, pool_size)
            FH-->>GW: firehose_client
            GW->>GW: "self._client, self._expiry = client, expiry"
        end
        GW->>Lock: release()
    end
    GW-->>Task: firehose_client
    Task->>FH: client.put_record(stream, data)
    FH-->>Task: response
Loading

Reviews (3): Last reviewed commit: "test(forwarder): export AWS_ENDPOINT_URL..." | Re-trigger Greptile

…328)

The logging post-inference hook calls FirehoseStreamingStorageGateway.put_record()
on every async task, and the gateway rebuilt an STS client + assume_role + a new
Firehose client on every call. Under celery prefork (workers are long-lived, no
child recycling) that per-task client churn grew each worker's RSS until it
OOM-killed the celery-forwarder at its 2Gi limit.

Cache the Firehose client (built once, backed by botocore RefreshableCredentials so
the assumed role auto-refreshes before expiry), removing the churn and a per-task STS
round-trip. Also add an opt-in gevent worker pool for the IO-bound forwarder; prefork
stays the default and is unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@socket-security

socket-security Bot commented Jun 27, 2026

Copy link
Copy Markdown

Review the following changes in direct dependencies. Learn more about Socket for GitHub.

Diff Package Supply Chain
Security
Vulnerability Quality Maintenance License
Addedpypi/​gevent@​26.5.073100100100100
Addedpypi/​zope-interface@​8.599100100100100
Addedpypi/​zope-event@​6.2100100100100100

View full report

lorenzo-norcini-scale and others added 10 commits June 27, 2026 17:33
Ignore E402 in celery_forwarder.py (gevent monkey-patch must precede imports).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ork (MLI-7328)

Opt-in prefork child recycling as defense-in-depth against per-task memory residue; off by default, no effect under gevent. Adds worker-pool wiring unit tests.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
eventlet was never requested, validated, or added as a dependency (would crash on import). Only gevent is supported.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Validate the pool env against {prefork, gevent} at import and raise a clear error, instead of passing a bad value to celery and failing cryptically at worker start.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- SQS integration test read results from redis db0 but the worker writes db1; fix to db1.
- Guard int() parsing of FIREHOSE_CLIENT_MAX_POOL_CONNECTIONS and CELERY_WORKER_MAX_TASKS_PER_CHILD (warn + fall back instead of crash-looping).
- Integration tests: warm up before timing concurrency; gate warm-shutdown on STARTED only; join stub_main proc; set USE_REDIS_LOCALHOST so the in-process producer and skip guard agree.
- Add firehose build-failure-leaves-client-unset-and-retries unit test.
- Correct the s3 per-greenlet cache comment (id reuse bounds it); document the private botocore _credentials assumption; use ==gevent for pool selection.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…(MLI-7328)

- Add a CI smoke test that boots gevent under the real prod launch (ddtrace-run python -m ... --help) and fails on the import-lock ordering crash; assert socket+ssl patched.
- Switch the pool subprocess tests to signal via exit code (child does the assert) instead of string-matching stdout/stderr, which was brittle.
- Correct the patch-ordering comment: safe because launched as __main__ via -m, not because a plain import under ddtrace-run is safe (it deadlocks the import lock).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- subprocess tests get timeout= so a deadlock regression fails instead of hanging CI.
- USE_REDIS_LOCALHOST set via per-test autouse fixture (auto-restored), not module-level env mutation at collection.
- warm-shutdown test warms up first so boot can't eat the STARTED window.
- stub_main teardown kills the proc if it survives join (no leaked port 5005).
- assert the firehose client is built with the sized connection pool Config (guards a silent drop).
- dedupe the firehose default to one local; drop the point-in-time benchmark numbers from the s3 comment (keep the mechanism).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- stub_main teardown: join() after kill() so the SIGKILL'd child is reaped and port 5005 is released.
- firehose Config assertion compares to the module's _FIREHOSE_MAX_POOL_CONNECTIONS instead of a hardcoded 50 (decouples from the env being unset).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ivate _credentials (MLI-7328)

Replace the RefreshableCredentials + botocore_session._credentials private-attr injection with a public-API approach: cache the client and rebuild it shortly before the assumed-role token expires (once per token lifetime, not per task). No botocore-internal access. Tests assert built-once-while-valid, rebuild-on-near-expiry, and the assumed-role creds + pool Config wiring.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Tighten the gevent/pool comments to the load-bearing points; use None (not 0) as the parse-failed sentinel so the positive-int check reads directly.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@lorenzo-norcini-scale lorenzo-norcini-scale marked this pull request as ready for review June 29, 2026 19:12
lorenzo-norcini-scale and others added 2 commits June 29, 2026 15:42
…iry (MLI-7328)

Greptile P1: _build_client set self._expiry before the new client was constructed, so a failed refresh advanced expiry while keeping the old client -> future calls skipped rebuilding and served a stale/expiring client. _build_client is now pure (returns (client, expiry)); the caller assigns both only on success, so a failed (re)build leaves prior state intact and retries. Adds a regression test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… (MLI-7328)

The SQS integration test's worker subprocess and in-process producer build
kombu/botocore SQS clients with no explicit endpoint, so they only reach
localstack when AWS_ENDPOINT_URL is in the process env. The skip-gate probes
localstack with an explicit endpoint_url, so without this the test green-lights
itself while the clients hit real AWS. Add an autouse fixture that exports it
per-test (auto-restored), mirroring the existing localhost-redis fixture.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant