fix(forwarder): cache Firehose client + add gevent worker pool (MLI-7328)#850
Open
lorenzo-norcini-scale wants to merge 13 commits into
Open
fix(forwarder): cache Firehose client + add gevent worker pool (MLI-7328)#850lorenzo-norcini-scale wants to merge 13 commits into
lorenzo-norcini-scale wants to merge 13 commits into
Conversation
…328) The logging post-inference hook calls FirehoseStreamingStorageGateway.put_record() on every async task, and the gateway rebuilt an STS client + assume_role + a new Firehose client on every call. Under celery prefork (workers are long-lived, no child recycling) that per-task client churn grew each worker's RSS until it OOM-killed the celery-forwarder at its 2Gi limit. Cache the Firehose client (built once, backed by botocore RefreshableCredentials so the assumed role auto-refreshes before expiry), removing the churn and a per-task STS round-trip. Also add an opt-in gevent worker pool for the IO-bound forwarder; prefork stays the default and is unchanged. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Review the following changes in direct dependencies. Learn more about Socket for GitHub.
|
Ignore E402 in celery_forwarder.py (gevent monkey-patch must precede imports). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ork (MLI-7328) Opt-in prefork child recycling as defense-in-depth against per-task memory residue; off by default, no effect under gevent. Adds worker-pool wiring unit tests. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
eventlet was never requested, validated, or added as a dependency (would crash on import). Only gevent is supported. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Validate the pool env against {prefork, gevent} at import and raise a clear error, instead of passing a bad value to celery and failing cryptically at worker start.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- SQS integration test read results from redis db0 but the worker writes db1; fix to db1. - Guard int() parsing of FIREHOSE_CLIENT_MAX_POOL_CONNECTIONS and CELERY_WORKER_MAX_TASKS_PER_CHILD (warn + fall back instead of crash-looping). - Integration tests: warm up before timing concurrency; gate warm-shutdown on STARTED only; join stub_main proc; set USE_REDIS_LOCALHOST so the in-process producer and skip guard agree. - Add firehose build-failure-leaves-client-unset-and-retries unit test. - Correct the s3 per-greenlet cache comment (id reuse bounds it); document the private botocore _credentials assumption; use ==gevent for pool selection. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…(MLI-7328) - Add a CI smoke test that boots gevent under the real prod launch (ddtrace-run python -m ... --help) and fails on the import-lock ordering crash; assert socket+ssl patched. - Switch the pool subprocess tests to signal via exit code (child does the assert) instead of string-matching stdout/stderr, which was brittle. - Correct the patch-ordering comment: safe because launched as __main__ via -m, not because a plain import under ddtrace-run is safe (it deadlocks the import lock). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- subprocess tests get timeout= so a deadlock regression fails instead of hanging CI. - USE_REDIS_LOCALHOST set via per-test autouse fixture (auto-restored), not module-level env mutation at collection. - warm-shutdown test warms up first so boot can't eat the STARTED window. - stub_main teardown kills the proc if it survives join (no leaked port 5005). - assert the firehose client is built with the sized connection pool Config (guards a silent drop). - dedupe the firehose default to one local; drop the point-in-time benchmark numbers from the s3 comment (keep the mechanism). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- stub_main teardown: join() after kill() so the SIGKILL'd child is reaped and port 5005 is released. - firehose Config assertion compares to the module's _FIREHOSE_MAX_POOL_CONNECTIONS instead of a hardcoded 50 (decouples from the env being unset). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ivate _credentials (MLI-7328) Replace the RefreshableCredentials + botocore_session._credentials private-attr injection with a public-API approach: cache the client and rebuild it shortly before the assumed-role token expires (once per token lifetime, not per task). No botocore-internal access. Tests assert built-once-while-valid, rebuild-on-near-expiry, and the assumed-role creds + pool Config wiring. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Tighten the gevent/pool comments to the load-bearing points; use None (not 0) as the parse-failed sentinel so the positive-int check reads directly. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…iry (MLI-7328) Greptile P1: _build_client set self._expiry before the new client was constructed, so a failed refresh advanced expiry while keeping the old client -> future calls skipped rebuilding and served a stale/expiring client. _build_client is now pure (returns (client, expiry)); the caller assigns both only on success, so a failed (re)build leaves prior state intact and retries. Adds a regression test. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… (MLI-7328) The SQS integration test's worker subprocess and in-process producer build kombu/botocore SQS clients with no explicit endpoint, so they only reach localstack when AWS_ENDPOINT_URL is in the process env. The skip-gate probes localstack with an explicit endpoint_url, so without this the test green-lights itself while the clients hit real AWS. Add an autouse fixture that exports it per-test (auto-restored), mirroring the existing localhost-redis fixture. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem (MLI-7328)
The
loggingpost-inference hook callsFirehoseStreamingStorageGateway.put_record()on every async task. The gateway rebuilt an STS client, calledassume_role, and built a new Firehose client on every call. Under the celery prefork pool the workers are long-lived with no child recycling, so that per-task client churn grew each worker's RSS until the celery-forwarder was OOM-killed at its 2Gi limit (it OOMs while holding ~no in-flight work, matching the reported symptom).Fix
CELERY_WORKER_POOL=gevent): the forwarder is IO-bound, so gevent runs the same concurrency in one process instead of N, at much lower memory. Monkey-patch happens before imports, in the right order for ddtrace context. Prefork stays the default; the gevent monkey-patch and pool selection are skipped entirely under it.CELERY_WORKER_MAX_TASKS_PER_CHILD): recycle each prefork child after N tasks so the OS reclaims any per-task residue (e.g. glibc arena fragmentation). Off unless set; defense-in-depth, independent of the cache fix. No effect under gevent.Changes
inference/infra/gateways/firehose_streaming_storage_gateway.py: cached client, rebuilt ahead of assumed-role token expiry via publicboto3.client(..., aws_session_token=...)(no botocore-internal credential injection); lazy init guarded by a lock and a sized connection pool (both for gevent-safety; no-ops under prefork).inference/forwarding/celery_forwarder.py: guarded gevent monkey-patch + pool selection + env-gatedmax_tasks_per_child(prefork).requirements.{in,txt}: addgevent.core/celery/s3.py: comment noting the per-thread result-backend cache is per-greenlet (bounded) under gevent.max_tasks_per_childgating);test_forwarder_pool.pyintegration tests (prefork, gevent, concurrency, SQS, warm-shutdown).Behavior / rollout
CELERY_WORKER_POOL=geventis set.Validation (full detail in the linked doc)
Notes for reviewer
requirements.txtwas edited by hand; please regenerate with pip-compile to confirm pins.🤖 Generated with Claude Code
Greptile Summary
This PR fixes the celery-forwarder OOM by caching the Firehose boto3 client per worker (rebuilding only before assumed-role token expiry, not per task) and adds an opt-in gevent worker pool for lower memory footprint at equal concurrency.
firehose_streaming_storage_gateway.py): a pure_build_client()returns(client, expiry)without touchingself; the caller atomically assigns both under a double-checked lock, so a failed rebuild never advances_expirypast a client that was never installed. The lock isthreading.Lock, which becomes greenlet-aware undermonkey.patch_all()for gevent safety.celery_forwarder.py):monkey.patch_all()is guarded byCELERY_WORKER_POOL=geventand runs before all other imports at module top-of-file; prefork workers are entirely unaffected. An env-gatedmax_tasks_per_childknob provides defense-in-depth for prefork.Confidence Score: 5/5
Safe to merge — the client-caching logic is correct under both prefork and gevent, well-tested, and the gevent pool is opt-in behind an env var that defaults to the existing prefork behaviour.
The core fix (cached Firehose client with double-checked locking and a pure _build_client) is safe under both worker modes. Under prefork each child process holds its own instance so there is no cross-process sharing; under gevent cooperative scheduling ensures the unguarded outer check and the non-atomic tuple assignment are both safe. The test suite covers the key failure modes (first-build failure, refresh failure, rebuild-before-expiry). The gevent change is fully opt-in and leaves the default prefork path byte-for-byte unchanged.
requirements.txt was hand-edited; worth regenerating with pip-compile to confirm the gevent/zope pin graph is correct before the next dependency bump.
Important Files Changed
Sequence Diagram
%%{init: {'theme': 'neutral'}}%% sequenceDiagram participant Task as Celery Task (greenlet/child) participant GW as FirehoseStreamingStorageGateway participant Lock as threading.Lock participant STS as AWS STS participant FH as AWS Firehose Task->>GW: put_record(stream, record) GW->>GW: _get_firehose_client() alt _client is None OR _needs_rebuild() GW->>Lock: acquire() alt still needs rebuild (double-check) GW->>STS: boto3.client("sts").assume_role() STS-->>GW: Credentials + Expiration GW->>FH: boto3.client("firehose", creds, pool_size) FH-->>GW: firehose_client GW->>GW: "self._client, self._expiry = client, expiry" end GW->>Lock: release() end GW-->>Task: firehose_client Task->>FH: client.put_record(stream, data) FH-->>Task: response%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%% sequenceDiagram participant Task as Celery Task (greenlet/child) participant GW as FirehoseStreamingStorageGateway participant Lock as threading.Lock participant STS as AWS STS participant FH as AWS Firehose Task->>GW: put_record(stream, record) GW->>GW: _get_firehose_client() alt _client is None OR _needs_rebuild() GW->>Lock: acquire() alt still needs rebuild (double-check) GW->>STS: boto3.client("sts").assume_role() STS-->>GW: Credentials + Expiration GW->>FH: boto3.client("firehose", creds, pool_size) FH-->>GW: firehose_client GW->>GW: "self._client, self._expiry = client, expiry" end GW->>Lock: release() end GW-->>Task: firehose_client Task->>FH: client.put_record(stream, data) FH-->>Task: responseReviews (3): Last reviewed commit: "test(forwarder): export AWS_ENDPOINT_URL..." | Re-trigger Greptile