From 972913cb2dae08f8009e4ceef92b22cc424f6624 Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Tue, 30 Jun 2026 15:44:30 +0200 Subject: [PATCH 1/3] feat: gated tracemalloc heap-dump for one-pod prod profiling Diagnostic to find what actually holds the resident memory under backup load (the OOM), instead of inferring. Enabled only when S3PROXY_TRACEMALLOC is set (zero overhead otherwise): starts tracemalloc at startup and logs the top live Python allocations (size + call site) every S3PROXY_TRACEMALLOC_INTERVAL secs and on SIGUSR1. Chart gains an extraConfig passthrough so one replica can set the flag via values; revert after capture. --- chart/templates/s3proxy/configmap.yaml | 4 ++ chart/values.yaml | 5 ++- s3proxy/app.py | 54 ++++++++++++++++++++++++ tests/unit/test_tracemalloc_profiling.py | 41 ++++++++++++++++++ 4 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 tests/unit/test_tracemalloc_profiling.py diff --git a/chart/templates/s3proxy/configmap.yaml b/chart/templates/s3proxy/configmap.yaml index 4ed8a93..ed3cd19 100644 --- a/chart/templates/s3proxy/configmap.yaml +++ b/chart/templates/s3proxy/configmap.yaml @@ -22,3 +22,7 @@ data: S3PROXY_DASHBOARD_UI: "true" S3PROXY_DASHBOARD_PATH: {{ .Values.dashboard.path | quote }} {{- end }} + {{- /* Arbitrary extra env (e.g. diagnostics like S3PROXY_TRACEMALLOC=1). */ -}} + {{- range $k, $v := .Values.extraConfig }} + {{ $k }}: {{ $v | quote }} + {{- end }} diff --git a/chart/values.yaml b/chart/values.yaml index 42866a6..63c2a2f 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -264,4 +264,7 @@ topologySpreadConstraints: [] podDisruptionBudget: enabled: true minAvailable: 1 - # maxUnavailable: 1 # Alternative to minAvailable \ No newline at end of file + # maxUnavailable: 1 # Alternative to minAvailable +# Arbitrary extra S3PROXY_* env, injected via the config ConfigMap (envFrom). +# Used for time-boxed diagnostics, e.g. extraConfig: { S3PROXY_TRACEMALLOC: "1" }. 
+extraConfig: {} diff --git a/s3proxy/app.py b/s3proxy/app.py index aaa5c24..06cdbea 100644 --- a/s3proxy/app.py +++ b/s3proxy/app.py @@ -2,10 +2,14 @@ from __future__ import annotations +import asyncio +import contextlib import logging import os +import signal import sys import time +import tracemalloc import uuid from collections.abc import AsyncIterator from contextlib import asynccontextmanager @@ -65,6 +69,52 @@ def _silence_health_probe_access_logs() -> None: access_logger.addFilter(_health_probe_filter) +def _dump_tracemalloc(limit: int = 20) -> None: + """Log the top live Python allocations by size, with location. + + Diagnostic only (gated by S3PROXY_TRACEMALLOC). Lets us see, under real + backup load, exactly which call sites hold the resident memory that drives + the OOM -- instead of guessing. + """ + if not tracemalloc.is_tracing(): + return + snap = tracemalloc.take_snapshot() + stats = snap.statistics("lineno") + total_mb = sum(s.size for s in stats) / 1024 / 1024 + logger.warning("TRACEMALLOC_SNAPSHOT", total_tracked_mb=round(total_mb, 1), shown=limit) + for i, st in enumerate(stats[:limit], 1): + fr = st.traceback[0] + logger.warning( + "TRACEMALLOC_TOP", + rank=i, + size_mb=round(st.size / 1024 / 1024, 2), + count=st.count, + loc=f"{fr.filename}:{fr.lineno}", + ) + + +async def _periodic_tracemalloc(interval: int) -> None: + while True: + await asyncio.sleep(interval) + _dump_tracemalloc() + + +def _maybe_start_tracemalloc() -> asyncio.Task | None: + """Enable tracemalloc + periodic/SIGUSR1 heap dumps when S3PROXY_TRACEMALLOC is set. + + No-op (zero overhead) when unset. Used for one-pod, time-boxed profiling. 
+ """ + if not os.environ.get("S3PROXY_TRACEMALLOC"): + return None + frames = int(os.environ.get("S3PROXY_TRACEMALLOC_FRAMES", "4")) + interval = int(os.environ.get("S3PROXY_TRACEMALLOC_INTERVAL", "15")) + tracemalloc.start(frames) + logger.warning("TRACEMALLOC_ENABLED", frames=frames, interval_sec=interval) + with contextlib.suppress(NotImplementedError, RuntimeError): + asyncio.get_running_loop().add_signal_handler(signal.SIGUSR1, _dump_tracemalloc) + return asyncio.create_task(_periodic_tracemalloc(interval)) + + def create_lifespan(settings: Settings, credentials_store: dict[str, str]) -> AsyncIterator[None]: """Create lifespan context manager for FastAPI app. @@ -114,8 +164,12 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: app.state.stats_store = stats_store app.state.start_time = time.monotonic() + tracemalloc_task = _maybe_start_tracemalloc() + yield + if tracemalloc_task is not None: + tracemalloc_task.cancel() await stats_store.aclose() # flush buffered samples before Redis closes await close_redis() await close_http_client() diff --git a/tests/unit/test_tracemalloc_profiling.py b/tests/unit/test_tracemalloc_profiling.py new file mode 100644 index 0000000..90ff6af --- /dev/null +++ b/tests/unit/test_tracemalloc_profiling.py @@ -0,0 +1,41 @@ +"""Self-check for the gated tracemalloc heap-dump diagnostic. + +Off by default (no env) => zero overhead, no tracing started. When enabled it +must take a snapshot and not raise. Used for one-pod, time-boxed prod profiling +to identify the live allocations driving the OOM. +""" + +import tracemalloc + +from s3proxy import app + + +def test_disabled_by_default(monkeypatch): + monkeypatch.delenv("S3PROXY_TRACEMALLOC", raising=False) + assert app._maybe_start_tracemalloc() is None + + +def test_dump_is_noop_when_not_tracing(): + # Should not raise even if tracemalloc isn't running. 
+ if tracemalloc.is_tracing(): + tracemalloc.stop() + app._dump_tracemalloc() # no exception = pass + + +def test_dump_reports_allocations_when_tracing(): + tracemalloc.start(2) + try: + blob = bytearray(4 * 1024 * 1024) # 4MB, should show up + # Capture warning logs to confirm it emits a snapshot + top lines. + events = [] + import structlog + + app.logger = structlog.wrap_logger( + app.logger, processors=[lambda _l, _m, ev: events.append(ev) or ev] + ) + app._dump_tracemalloc(limit=5) + assert blob is not None + assert any(e.get("event") == "TRACEMALLOC_SNAPSHOT" for e in events) + assert any(e.get("event") == "TRACEMALLOC_TOP" for e in events) + finally: + tracemalloc.stop() From 9cea3ada57089fa030267278d483b6fbf98b8260 Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Wed, 1 Jul 2026 07:50:26 +0200 Subject: [PATCH 2/3] feat: memory debug mode - log RSS vs tracked heap + top allocations Turn the tracemalloc diagnostic into a proper memory debug mode. The OOM's defining trait is the gap between real RSS (what the kernel kills on) and the Python-tracked heap: prod hit ~957MB RSS with only ~87MB tracked, i.e. the memory lived in C-level buffers (uvicorn/httptools sockets, allocator), which no top-allocations list can explain. So every interval the mode now logs RSS, tracked, untracked (rss-tracked) and the governor's active bytes side by side, then the top live Python allocations. One dump tells us which world we're in: - large untracked gap -> C-level (transport buffers), not a Python call site - small gap -> Python, and the top list names the exact line Gated by S3PROXY_MEMORY_DEBUG (alias S3PROXY_TRACEMALLOC), zero overhead when unset; dumps every S3PROXY_MEMORY_DEBUG_INTERVAL secs and on SIGUSR1. 
--- chart/templates/s3proxy/configmap.yaml | 2 +- chart/values.yaml | 5 ++- s3proxy/app.py | 55 ++++++++++++++++++------ tests/unit/test_tracemalloc_profiling.py | 8 +++- 4 files changed, 52 insertions(+), 18 deletions(-) diff --git a/chart/templates/s3proxy/configmap.yaml b/chart/templates/s3proxy/configmap.yaml index ed3cd19..2216fff 100644 --- a/chart/templates/s3proxy/configmap.yaml +++ b/chart/templates/s3proxy/configmap.yaml @@ -22,7 +22,7 @@ data: S3PROXY_DASHBOARD_UI: "true" S3PROXY_DASHBOARD_PATH: {{ .Values.dashboard.path | quote }} {{- end }} - {{- /* Arbitrary extra env (e.g. diagnostics like S3PROXY_TRACEMALLOC=1). */ -}} + {{- /* Arbitrary extra env (e.g. memory debug mode: S3PROXY_MEMORY_DEBUG=1). */ -}} {{- range $k, $v := .Values.extraConfig }} {{ $k }}: {{ $v | quote }} {{- end }} diff --git a/chart/values.yaml b/chart/values.yaml index 63c2a2f..427cc10 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -266,5 +266,8 @@ podDisruptionBudget: minAvailable: 1 # maxUnavailable: 1 # Alternative to minAvailable # Arbitrary extra S3PROXY_* env, injected via the config ConfigMap (envFrom). -# Used for time-boxed diagnostics, e.g. extraConfig: { S3PROXY_TRACEMALLOC: "1" }. +# Used for time-boxed diagnostics, e.g. memory debug mode: +# extraConfig: { S3PROXY_MEMORY_DEBUG: "1" } +# which logs RSS vs tracked heap + top allocations every interval (raise the pod +# memory limit first so it survives long enough to dump). extraConfig: {} diff --git a/s3proxy/app.py b/s3proxy/app.py index 06cdbea..c95f1e5 100644 --- a/s3proxy/app.py +++ b/s3proxy/app.py @@ -21,6 +21,7 @@ from prometheus_client import CONTENT_TYPE_LATEST, generate_latest from structlog.stdlib import BoundLogger +from . 
import concurrency from .client import SigV4Verifier from .config import Settings from .errors import S3Error, get_s3_error_code @@ -69,23 +70,47 @@ def _silence_health_probe_access_logs() -> None: access_logger.addFilter(_health_probe_filter) -def _dump_tracemalloc(limit: int = 20) -> None: - """Log the top live Python allocations by size, with location. +def _rss_mb() -> float | None: + """Process resident set size in MB from /proc (Linux). None elsewhere.""" + try: + with open("/proc/self/status") as f: + for line in f: + if line.startswith("VmRSS:"): + return int(line.split()[1]) / 1024 # kB -> MB + except OSError: + return None + return None + - Diagnostic only (gated by S3PROXY_TRACEMALLOC). Lets us see, under real - backup load, exactly which call sites hold the resident memory that drives - the OOM -- instead of guessing. +def _dump_tracemalloc(limit: int = 20) -> None: + """Log real RSS vs tracked Python heap + the top live allocations by call site. + + Diagnostic only (memory debug mode). The whole point is the gap: RSS is what + the kernel OOM-kills on, while tracemalloc only sees Python allocations. A + large rss-minus-tracked gap means the memory is C-level (uvicorn/httptools + socket buffers, openssl, allocator retention) -- NOT something a call site in + the top list will explain. A small gap means it IS Python, and the top list + names the exact line. Logging both each interval settles which world we're in. 
""" if not tracemalloc.is_tracing(): return snap = tracemalloc.take_snapshot() stats = snap.statistics("lineno") - total_mb = sum(s.size for s in stats) / 1024 / 1024 - logger.warning("TRACEMALLOC_SNAPSHOT", total_tracked_mb=round(total_mb, 1), shown=limit) + tracked_mb = sum(s.size for s in stats) / 1024 / 1024 + rss = _rss_mb() + governed_mb = concurrency.get_active_memory() / 1024 / 1024 + logger.warning( + "MEMORY_DEBUG", + rss_mb=round(rss, 1) if rss is not None else None, + tracked_mb=round(tracked_mb, 1), + untracked_mb=round(rss - tracked_mb, 1) if rss is not None else None, + governed_active_mb=round(governed_mb, 1), + shown=limit, + ) for i, st in enumerate(stats[:limit], 1): fr = st.traceback[0] logger.warning( - "TRACEMALLOC_TOP", + "MEMORY_DEBUG_TOP", rank=i, size_mb=round(st.size / 1024 / 1024, 2), count=st.count, @@ -100,16 +125,18 @@ async def _periodic_tracemalloc(interval: int) -> None: def _maybe_start_tracemalloc() -> asyncio.Task | None: - """Enable tracemalloc + periodic/SIGUSR1 heap dumps when S3PROXY_TRACEMALLOC is set. + """Enable memory debug mode (RSS + tracemalloc heap dumps) when requested. - No-op (zero overhead) when unset. Used for one-pod, time-boxed profiling. + Gated by S3PROXY_MEMORY_DEBUG (alias: S3PROXY_TRACEMALLOC). No-op with zero + overhead when unset. Used for one-pod, time-boxed profiling: dumps every + S3PROXY_MEMORY_DEBUG_INTERVAL secs and on SIGUSR1. 
""" - if not os.environ.get("S3PROXY_TRACEMALLOC"): + if not (os.environ.get("S3PROXY_MEMORY_DEBUG") or os.environ.get("S3PROXY_TRACEMALLOC")): return None - frames = int(os.environ.get("S3PROXY_TRACEMALLOC_FRAMES", "4")) - interval = int(os.environ.get("S3PROXY_TRACEMALLOC_INTERVAL", "15")) + frames = int(os.environ.get("S3PROXY_MEMORY_DEBUG_FRAMES", "4")) + interval = int(os.environ.get("S3PROXY_MEMORY_DEBUG_INTERVAL", "15")) tracemalloc.start(frames) - logger.warning("TRACEMALLOC_ENABLED", frames=frames, interval_sec=interval) + logger.warning("MEMORY_DEBUG_ENABLED", frames=frames, interval_sec=interval, rss_mb=_rss_mb()) with contextlib.suppress(NotImplementedError, RuntimeError): asyncio.get_running_loop().add_signal_handler(signal.SIGUSR1, _dump_tracemalloc) return asyncio.create_task(_periodic_tracemalloc(interval)) diff --git a/tests/unit/test_tracemalloc_profiling.py b/tests/unit/test_tracemalloc_profiling.py index 90ff6af..7dfbe84 100644 --- a/tests/unit/test_tracemalloc_profiling.py +++ b/tests/unit/test_tracemalloc_profiling.py @@ -11,6 +11,7 @@ def test_disabled_by_default(monkeypatch): + monkeypatch.delenv("S3PROXY_MEMORY_DEBUG", raising=False) monkeypatch.delenv("S3PROXY_TRACEMALLOC", raising=False) assert app._maybe_start_tracemalloc() is None @@ -35,7 +36,10 @@ def test_dump_reports_allocations_when_tracing(): ) app._dump_tracemalloc(limit=5) assert blob is not None - assert any(e.get("event") == "TRACEMALLOC_SNAPSHOT" for e in events) - assert any(e.get("event") == "TRACEMALLOC_TOP" for e in events) + snap = next(e for e in events if e.get("event") == "MEMORY_DEBUG") + # The debug line must carry the tracked total; rss/untracked are present + # on Linux (None elsewhere) -- the gap between them is the whole point. 
+ assert "tracked_mb" in snap and "untracked_mb" in snap + assert any(e.get("event") == "MEMORY_DEBUG_TOP" for e in events) finally: tracemalloc.stop() From 0d0da61e9887e366dacd3a983eff6b3f06d05eaa Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Wed, 1 Jul 2026 09:19:33 +0200 Subject: [PATCH 3/3] perf: reuse one aiobotocore client per credential set Creating a client per request built a fresh aiohttp connector + SSLContext that loads the whole CA store each time -- memray showed ~9.5M allocations in _create_connector and repeated load_default_certs. Clients are pool-safe and meant to be long-lived, so cache one per (endpoint, key, region) and close them on shutdown. Big CPU/allocation win; modest (~5MB) memory. --- s3proxy/app.py | 3 +- s3proxy/client/__init__.py | 3 +- s3proxy/client/s3.py | 80 +++++++++++++++++++++------- tests/unit/test_client_reuse.py | 92 +++++++++++++++++++++++++++++++++ 4 files changed, 157 insertions(+), 21 deletions(-) create mode 100644 tests/unit/test_client_reuse.py diff --git a/s3proxy/app.py b/s3proxy/app.py index c95f1e5..330a6cd 100644 --- a/s3proxy/app.py +++ b/s3proxy/app.py @@ -22,7 +22,7 @@ from structlog.stdlib import BoundLogger from . 
import concurrency -from .client import SigV4Verifier +from .client import SigV4Verifier, close_cached_clients from .config import Settings from .errors import S3Error, get_s3_error_code from .handlers import S3ProxyHandler @@ -200,6 +200,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: await stats_store.aclose() # flush buffered samples before Redis closes await close_redis() await close_http_client() + await close_cached_clients() logger.info("Shutting down") return lifespan diff --git a/s3proxy/client/__init__.py b/s3proxy/client/__init__.py index 2f1fe18..c8b9e9a 100644 --- a/s3proxy/client/__init__.py +++ b/s3proxy/client/__init__.py @@ -1,6 +1,6 @@ """S3 client layer - credentials, verification, and API wrapper.""" -from .s3 import S3Client, get_shared_session +from .s3 import S3Client, close_cached_clients, get_shared_session from .types import ParsedRequest, S3Credentials from .verifier import CLOCK_SKEW_TOLERANCE, SigV4Verifier, _derive_signing_key @@ -11,5 +11,6 @@ "S3Credentials", "SigV4Verifier", "_derive_signing_key", + "close_cached_clients", "get_shared_session", ] diff --git a/s3proxy/client/s3.py b/s3proxy/client/s3.py index ac294c9..3e6f080 100644 --- a/s3proxy/client/s3.py +++ b/s3proxy/client/s3.py @@ -2,6 +2,8 @@ from __future__ import annotations +import asyncio +import contextlib import time from typing import TYPE_CHECKING, Any @@ -29,6 +31,33 @@ def get_shared_session() -> aioboto3.Session: return _shared_session +# Long-lived aiobotocore clients, keyed by (endpoint, access_key, secret_key, region). Creating +# a client per request builds a fresh aiohttp connector + SSLContext that loads the +# whole CA store each time -- large native (OpenSSL) allocations invisible to +# tracemalloc, which pile up under concurrency and dominate RSS (memray: millions +# of allocations in _create_connector / load_default_certs).
aiobotocore clients +# are pool-safe and meant to be reused, so we cache one per credential set and +# keep it open for the app's lifetime (closed via close_cached_clients on shutdown). +_client_cache: dict[tuple, tuple[Any, Any]] = {} +_client_cache_lock: asyncio.Lock | None = None + + +def _cache_lock() -> asyncio.Lock: + global _client_cache_lock + if _client_cache_lock is None: + _client_cache_lock = asyncio.Lock() + return _client_cache_lock + + +async def close_cached_clients() -> None: + """Close all cached aiobotocore clients (call on app shutdown).""" + async with _cache_lock(): + for ctx, _client in _client_cache.values(): + with contextlib.suppress(Exception): + await ctx.__aexit__(None, None, None) + _client_cache.clear() + + def _add_optional_kwargs(kwargs: dict[str, Any], **optional: Any) -> None: """Add non-None optional kwargs to the dict.""" for key, value in optional.items(): @@ -63,28 +92,41 @@ def __init__(self, settings: Settings, credentials: S3Credentials): self._client_context = None async def __aenter__(self): - """Enter async context - create client from shared session.""" - # Use shared session to avoid loading JSON service models repeatedly - # Each new session costs ~30-150MB for botocore service definitions - session = get_shared_session() - self._client_context = session.client( - "s3", - endpoint_url=self.settings.s3_endpoint, - config=self._config, - aws_access_key_id=self.credentials.access_key, - aws_secret_access_key=self.credentials.secret_key, - region_name=self.credentials.region, - ) - self._cached_client = await self._client_context.__aenter__() + """Enter async context - reuse a long-lived client for these credentials.""" + self._cached_client = await self._get_or_create_client() return self + async def _get_or_create_client(self): + key = ( + self.settings.s3_endpoint, + self.credentials.access_key, + self.credentials.secret_key, + self.credentials.region, + ) + cached = _client_cache.get(key) + if cached is not None: + 
return cached[1] + async with _cache_lock(): + cached = _client_cache.get(key) # double-checked under lock + if cached is not None: + return cached[1] + # Shared session avoids reloading botocore JSON models per client. + ctx = get_shared_session().client( + "s3", + endpoint_url=self.settings.s3_endpoint, + config=self._config, + aws_access_key_id=self.credentials.access_key, + aws_secret_access_key=self.credentials.secret_key, + region_name=self.credentials.region, + ) + client = await ctx.__aenter__() + _client_cache[key] = (ctx, client) + return client + async def __aexit__(self, exc_type, exc_val, exc_tb): - """Exit async context - clean up client.""" - if self._client_context is not None: - await self._client_context.__aexit__(exc_type, exc_val, exc_tb) - self._cached_client = None - self._client_context = None - logger.debug("Cleaned up S3 client context") + """Exit - the client is cached and shared, so it stays open (closed on + app shutdown via close_cached_clients). Nothing to tear down per request.""" + self._cached_client = None async def get_object( self, diff --git a/tests/unit/test_client_reuse.py b/tests/unit/test_client_reuse.py new file mode 100644 index 0000000..616e1a8 --- /dev/null +++ b/tests/unit/test_client_reuse.py @@ -0,0 +1,92 @@ +"""S3Client must reuse one aiobotocore client per credential set. + +Creating a client per request builds a fresh aiohttp connector + SSLContext that +loads the whole CA store each time -- large native allocations (invisible to +tracemalloc) that pile up under concurrency and drive RSS. memray showed millions +of allocations in _create_connector / load_default_certs. Clients are pool-safe +and meant to be long-lived, so the wrapper caches one per (endpoint, key, region) +and reuses it. These tests pin that: same creds => one underlying client; distinct +creds => distinct clients; and they're closed on shutdown. 
+""" + +import pytest + +from s3proxy.client import s3 +from s3proxy.client.types import S3Credentials + + +class _FakeCtx: + def __init__(self, client): + self._client = client + + async def __aenter__(self): + return self._client + + async def __aexit__(self, *a): + self._client.closed = True + return False + + +class _FakeClient: + def __init__(self, n): + self.n = n + self.closed = False + + +class _FakeSession: + def __init__(self): + self.calls = 0 + + def client(self, *a, **k): + self.calls += 1 + return _FakeCtx(_FakeClient(self.calls)) + + +class _Settings: + s3_endpoint = "http://minio:9000" + + +def _creds(key="AKIA1"): + return S3Credentials(access_key=key, secret_key="s", region="us-east-1") + + +@pytest.fixture(autouse=True) +async def _clean(monkeypatch): + fake = _FakeSession() + monkeypatch.setattr(s3, "get_shared_session", lambda: fake) + await s3.close_cached_clients() + yield fake + await s3.close_cached_clients() + + +@pytest.mark.asyncio +async def test_same_credentials_reuse_one_client(_clean): + settings = _Settings() + async with s3.S3Client(settings, _creds()) as c1: + first = c1._cached_client + async with s3.S3Client(settings, _creds()) as c2: + second = c2._cached_client + assert first is second # reused + assert _clean.calls == 1 # session.client() called once, not per request + assert first.closed is False # not torn down between requests + + +@pytest.mark.asyncio +async def test_distinct_credentials_distinct_clients(_clean): + settings = _Settings() + async with s3.S3Client(settings, _creds("AKIA1")) as c1: + a = c1._cached_client + async with s3.S3Client(settings, _creds("AKIA2")) as c2: + b = c2._cached_client + assert a is not b + assert _clean.calls == 2 + + +@pytest.mark.asyncio +async def test_close_cached_clients_tears_down(_clean): + settings = _Settings() + async with s3.S3Client(settings, _creds()) as c: + client = c._cached_client + await s3.close_cached_clients() + assert client.closed is True + assert 
len(s3._client_cache) == 0