Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions chart/templates/s3proxy/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ data:
S3PROXY_DASHBOARD_UI: "true"
S3PROXY_DASHBOARD_PATH: {{ .Values.dashboard.path | quote }}
{{- end }}
{{- /* Arbitrary extra env (e.g. memory debug mode: S3PROXY_MEMORY_DEBUG=1). */ -}}
{{- range $k, $v := .Values.extraConfig }}
{{ $k }}: {{ $v | quote }}
{{- end }}
8 changes: 7 additions & 1 deletion chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,10 @@ topologySpreadConstraints: []
podDisruptionBudget:
enabled: true
minAvailable: 1
# maxUnavailable: 1 # Alternative to minAvailable
# maxUnavailable: 1 # Alternative to minAvailable
# Arbitrary extra S3PROXY_* env, injected via the config ConfigMap (envFrom).
# Used for time-boxed diagnostics, e.g. memory debug mode:
# extraConfig: { S3PROXY_MEMORY_DEBUG: "1" }
# which logs RSS vs tracked heap + top allocations every interval (raise the pod
# memory limit first so it survives long enough to dump).
extraConfig: {}
84 changes: 83 additions & 1 deletion s3proxy/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

from __future__ import annotations

import asyncio
import contextlib
import logging
import os
import signal
import sys
import time
import tracemalloc
import uuid
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
Expand All @@ -17,7 +21,8 @@
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from structlog.stdlib import BoundLogger

from .client import SigV4Verifier
from . import concurrency
from .client import SigV4Verifier, close_cached_clients
from .config import Settings
from .errors import S3Error, get_s3_error_code
from .handlers import S3ProxyHandler
Expand Down Expand Up @@ -65,6 +70,78 @@ def _silence_health_probe_access_logs() -> None:
access_logger.addFilter(_health_probe_filter)


def _rss_mb() -> float | None:
"""Process resident set size in MB from /proc (Linux). None elsewhere."""
try:
with open("/proc/self/status") as f:
for line in f:
if line.startswith("VmRSS:"):
return int(line.split()[1]) / 1024 # kB -> MB
except OSError:
return None
return None


def _dump_tracemalloc(limit: int = 20) -> None:
"""Log real RSS vs tracked Python heap + the top live allocations by call site.

Diagnostic only (memory debug mode). The whole point is the gap: RSS is what
the kernel OOM-kills on, while tracemalloc only sees Python allocations. A
large rss-minus-tracked gap means the memory is C-level (uvicorn/httptools
socket buffers, openssl, allocator retention) -- NOT something a call site in
the top list will explain. A small gap means it IS Python, and the top list
names the exact line. Logging both each interval settles which world we're in.
"""
if not tracemalloc.is_tracing():
return
snap = tracemalloc.take_snapshot()
stats = snap.statistics("lineno")
tracked_mb = sum(s.size for s in stats) / 1024 / 1024
rss = _rss_mb()
governed_mb = concurrency.get_active_memory() / 1024 / 1024
logger.warning(
"MEMORY_DEBUG",
rss_mb=round(rss, 1) if rss is not None else None,
tracked_mb=round(tracked_mb, 1),
untracked_mb=round(rss - tracked_mb, 1) if rss is not None else None,
governed_active_mb=round(governed_mb, 1),
shown=limit,
)
for i, st in enumerate(stats[:limit], 1):
fr = st.traceback[0]
logger.warning(
"MEMORY_DEBUG_TOP",
rank=i,
size_mb=round(st.size / 1024 / 1024, 2),
count=st.count,
loc=f"{fr.filename}:{fr.lineno}",
)


async def _periodic_tracemalloc(interval: int) -> None:
while True:
await asyncio.sleep(interval)
_dump_tracemalloc()


def _maybe_start_tracemalloc() -> asyncio.Task | None:
"""Enable memory debug mode (RSS + tracemalloc heap dumps) when requested.

Gated by S3PROXY_MEMORY_DEBUG (alias: S3PROXY_TRACEMALLOC). No-op with zero
overhead when unset. Used for one-pod, time-boxed profiling: dumps every
S3PROXY_MEMORY_DEBUG_INTERVAL secs and on SIGUSR1.
"""
if not (os.environ.get("S3PROXY_MEMORY_DEBUG") or os.environ.get("S3PROXY_TRACEMALLOC")):
return None
frames = int(os.environ.get("S3PROXY_MEMORY_DEBUG_FRAMES", "4"))
interval = int(os.environ.get("S3PROXY_MEMORY_DEBUG_INTERVAL", "15"))
tracemalloc.start(frames)
logger.warning("MEMORY_DEBUG_ENABLED", frames=frames, interval_sec=interval, rss_mb=_rss_mb())
with contextlib.suppress(NotImplementedError, RuntimeError):
asyncio.get_running_loop().add_signal_handler(signal.SIGUSR1, _dump_tracemalloc)
return asyncio.create_task(_periodic_tracemalloc(interval))


def create_lifespan(settings: Settings, credentials_store: dict[str, str]) -> AsyncIterator[None]:
"""Create lifespan context manager for FastAPI app.

Expand Down Expand Up @@ -114,11 +191,16 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
app.state.stats_store = stats_store
app.state.start_time = time.monotonic()

tracemalloc_task = _maybe_start_tracemalloc()

yield

if tracemalloc_task is not None:
tracemalloc_task.cancel()
await stats_store.aclose() # flush buffered samples before Redis closes
await close_redis()
await close_http_client()
await close_cached_clients()
logger.info("Shutting down")

return lifespan
Expand Down
3 changes: 2 additions & 1 deletion s3proxy/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""S3 client layer - credentials, verification, and API wrapper."""

from .s3 import S3Client, get_shared_session
from .s3 import S3Client, close_cached_clients, get_shared_session
from .types import ParsedRequest, S3Credentials
from .verifier import CLOCK_SKEW_TOLERANCE, SigV4Verifier, _derive_signing_key

Expand All @@ -11,5 +11,6 @@
"S3Credentials",
"SigV4Verifier",
"_derive_signing_key",
"close_cached_clients",
"get_shared_session",
]
80 changes: 61 additions & 19 deletions s3proxy/client/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

import asyncio
import contextlib
import time
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -29,6 +31,33 @@ def get_shared_session() -> aioboto3.Session:
return _shared_session


# Long-lived aiobotocore clients, keyed by (endpoint, access_key, region). Creating
# a client per request builds a fresh aiohttp connector + SSLContext that loads the
# whole CA store each time -- large native (OpenSSL) allocations invisible to
# tracemalloc, which pile up under concurrency and dominate RSS (memray: millions
# of allocations in _create_connector / load_default_certs). aiobotocore clients
# are pool-safe and meant to be reused, so we cache one per credential set and
# keep it open for the app's lifetime (closed via close_cached_clients on shutdown).
_client_cache: dict[tuple, tuple[Any, Any]] = {}
_client_cache_lock: asyncio.Lock | None = None


def _cache_lock() -> asyncio.Lock:
global _client_cache_lock
if _client_cache_lock is None:
_client_cache_lock = asyncio.Lock()
return _client_cache_lock


async def close_cached_clients() -> None:
"""Close all cached aiobotocore clients (call on app shutdown)."""
async with _cache_lock():
for ctx, _client in _client_cache.values():
with contextlib.suppress(Exception):
await ctx.__aexit__(None, None, None)
_client_cache.clear()


def _add_optional_kwargs(kwargs: dict[str, Any], **optional: Any) -> None:
"""Add non-None optional kwargs to the dict."""
for key, value in optional.items():
Expand Down Expand Up @@ -63,28 +92,41 @@ def __init__(self, settings: Settings, credentials: S3Credentials):
self._client_context = None

async def __aenter__(self):
"""Enter async context - create client from shared session."""
# Use shared session to avoid loading JSON service models repeatedly
# Each new session costs ~30-150MB for botocore service definitions
session = get_shared_session()
self._client_context = session.client(
"s3",
endpoint_url=self.settings.s3_endpoint,
config=self._config,
aws_access_key_id=self.credentials.access_key,
aws_secret_access_key=self.credentials.secret_key,
region_name=self.credentials.region,
)
self._cached_client = await self._client_context.__aenter__()
"""Enter async context - reuse a long-lived client for these credentials."""
self._cached_client = await self._get_or_create_client()
return self

async def _get_or_create_client(self):
key = (
self.settings.s3_endpoint,
self.credentials.access_key,
self.credentials.secret_key,
self.credentials.region,
)
cached = _client_cache.get(key)
if cached is not None:
return cached[1]
async with _cache_lock():
cached = _client_cache.get(key) # double-checked under lock
if cached is not None:
return cached[1]
# Shared session avoids reloading botocore JSON models per client.
ctx = get_shared_session().client(
"s3",
endpoint_url=self.settings.s3_endpoint,
config=self._config,
aws_access_key_id=self.credentials.access_key,
aws_secret_access_key=self.credentials.secret_key,
region_name=self.credentials.region,
)
client = await ctx.__aenter__()
_client_cache[key] = (ctx, client)
return client

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit async context - clean up client."""
if self._client_context is not None:
await self._client_context.__aexit__(exc_type, exc_val, exc_tb)
self._cached_client = None
self._client_context = None
logger.debug("Cleaned up S3 client context")
"""Exit - the client is cached and shared, so it stays open (closed on
app shutdown via close_cached_clients). Nothing to tear down per request."""
self._cached_client = None

async def get_object(
self,
Expand Down
92 changes: 92 additions & 0 deletions tests/unit/test_client_reuse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""S3Client must reuse one aiobotocore client per credential set.

Creating a client per request builds a fresh aiohttp connector + SSLContext that
loads the whole CA store each time -- large native allocations (invisible to
tracemalloc) that pile up under concurrency and drive RSS. memray showed millions
of allocations in _create_connector / load_default_certs. Clients are pool-safe
and meant to be long-lived, so the wrapper caches one per (endpoint, key, region)
and reuses it. These tests pin that: same creds => one underlying client; distinct
creds => distinct clients; and they're closed on shutdown.
"""

import pytest

from s3proxy.client import s3
from s3proxy.client.types import S3Credentials


class _FakeCtx:
def __init__(self, client):
self._client = client

async def __aenter__(self):
return self._client

async def __aexit__(self, *a):
self._client.closed = True
return False


class _FakeClient:
def __init__(self, n):
self.n = n
self.closed = False


class _FakeSession:
def __init__(self):
self.calls = 0

def client(self, *a, **k):
self.calls += 1
return _FakeCtx(_FakeClient(self.calls))


class _Settings:
s3_endpoint = "http://minio:9000"


def _creds(key="AKIA1"):
return S3Credentials(access_key=key, secret_key="s", region="us-east-1")


@pytest.fixture(autouse=True)
async def _clean(monkeypatch):
fake = _FakeSession()
monkeypatch.setattr(s3, "get_shared_session", lambda: fake)
await s3.close_cached_clients()
yield fake
await s3.close_cached_clients()


@pytest.mark.asyncio
async def test_same_credentials_reuse_one_client(_clean):
settings = _Settings()
async with s3.S3Client(settings, _creds()) as c1:
first = c1._cached_client
async with s3.S3Client(settings, _creds()) as c2:
second = c2._cached_client
assert first is second # reused
assert _clean.calls == 1 # session.client() called once, not per request
assert first.closed is False # not torn down between requests


@pytest.mark.asyncio
async def test_distinct_credentials_distinct_clients(_clean):
settings = _Settings()
async with s3.S3Client(settings, _creds("AKIA1")) as c1:
a = c1._cached_client
async with s3.S3Client(settings, _creds("AKIA2")) as c2:
b = c2._cached_client
assert a is not b
assert _clean.calls == 2


@pytest.mark.asyncio
async def test_close_cached_clients_tears_down(_clean):
settings = _Settings()
async with s3.S3Client(settings, _creds()) as c:
client = c._cached_client
await s3.close_cached_clients()
assert client.closed is True
assert len(s3._client_cache) == 0
45 changes: 45 additions & 0 deletions tests/unit/test_tracemalloc_profiling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Self-check for the gated tracemalloc heap-dump diagnostic.

Off by default (no env) => zero overhead, no tracing started. When enabled it
must take a snapshot and not raise. Used for one-pod, time-boxed prod profiling
to identify the live allocations driving the OOM.
"""

import tracemalloc

from s3proxy import app


def test_disabled_by_default(monkeypatch):
monkeypatch.delenv("S3PROXY_MEMORY_DEBUG", raising=False)
monkeypatch.delenv("S3PROXY_TRACEMALLOC", raising=False)
assert app._maybe_start_tracemalloc() is None


def test_dump_is_noop_when_not_tracing():
# Should not raise even if tracemalloc isn't running.
if tracemalloc.is_tracing():
tracemalloc.stop()
app._dump_tracemalloc() # no exception = pass


def test_dump_reports_allocations_when_tracing():
tracemalloc.start(2)
try:
blob = bytearray(4 * 1024 * 1024) # 4MB, should show up
# Capture warning logs to confirm it emits a snapshot + top lines.
events = []
import structlog

app.logger = structlog.wrap_logger(
app.logger, processors=[lambda _l, _m, ev: events.append(ev) or ev]
)
app._dump_tracemalloc(limit=5)
assert blob is not None
snap = next(e for e in events if e.get("event") == "MEMORY_DEBUG")
# The debug line must carry the tracked total; rss/untracked are present
# on Linux (None elsewhere) -- the gap between them is the whole point.
assert "tracked_mb" in snap and "untracked_mb" in snap
assert any(e.get("event") == "MEMORY_DEBUG_TOP" for e in events)
finally:
tracemalloc.stop()