Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions .github/workflows/pynumaflow-lite.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
name: pynumaflow-lite

on:
push:
branches: ["main", "release/*"]
paths:
- "packages/pynumaflow-lite/**"
- ".github/workflows/pynumaflow-lite.yml"
pull_request:
branches: ["main", "release/*"]
paths:
- "packages/pynumaflow-lite/**"
- ".github/workflows/pynumaflow-lite.yml"

permissions:
contents: read

jobs:
lint:
name: Format and lint
runs-on: ubuntu-24.04
defaults:
run:
working-directory: packages/pynumaflow-lite

steps:
- uses: actions/checkout@v7

- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: "3.10"

- name: Install uv
uses: astral-sh/setup-uv@v8.1.0
with:
python-version: "3.10"
working-directory: packages/pynumaflow-lite

- name: Set up Rust
run: |
rustup toolchain install stable --profile minimal --component clippy --component rustfmt
rustup default stable

- name: Install dependencies
run: uv sync --group dev

- name: Check Rust formatting
run: cargo fmt --all --check

- name: Check Python formatting
run: uv run ruff format --check pynumaflow_lite/ tests/ manifests/

- name: Ruff lint
run: uv run ruff check .

- name: Clippy
run: cargo clippy --workspace --all-targets --all-features -- -D warnings -A clippy::module_inception

test:
name: Test ${{ matrix.platform.artifact }}
needs: lint
runs-on: ${{ matrix.platform.os }}
defaults:
run:
working-directory: packages/pynumaflow-lite

strategy:
fail-fast: false
matrix:
platform:
- os: ubuntu-24.04
artifact: linux-x86_64
- os: macos-15
artifact: macos-aarch64

steps:
- uses: actions/checkout@v7

- name: Install uv
uses: astral-sh/setup-uv@v8.1.0
with:
working-directory: packages/pynumaflow-lite

- name: Set up Rust
run: |
rustup toolchain install stable --profile minimal
rustup default stable

- name: Run tests for Python versions
shell: bash
run: |
set -euo pipefail

for python_version in 3.10 3.11 3.12 3.13 3.14; do
echo "::group::Python ${python_version}"
uv python install "${python_version}"
export UV_PYTHON="${python_version}"
uv sync --group dev --python "${python_version}"
uv run --python "${python_version}" maturin develop

uv run --python "${python_version}" pytest -v

# cargo test embeds a Python interpreter via pyo3 (Python::initialize),
# which requires pinning the uv-managed interpreter at three layers:
# - PYO3_PYTHON: the interpreter pyo3's build script links against.
# - PYTHONHOME: where the embedded interpreter finds its stdlib at
# runtime; mismatch surfaces as "No module named 'encodings'".
# - LD_LIBRARY_PATH: where the dynamic loader finds libpython at load
# time. On Linux the test binary links libpython3.x.so, which lives
# under the interpreter's lib dir and is not on the default search
# path; missing it surfaces as "error while loading shared libraries:
# libpython3.x.so". (Harmless on macOS, which resolves differently.)
python_bin="$(uv run --python "${python_version}" python -c 'import sys; print(sys.executable)')"
export PYO3_PYTHON="${python_bin}"
export PYTHONHOME="$("${python_bin}" -c 'import sys; print(sys.base_prefix)')"
python_libdir="$("${python_bin}" -c 'import sysconfig; print(sysconfig.get_config_var("LIBDIR"))')"
export LD_LIBRARY_PATH="${python_libdir}${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}}"
cargo test
unset PYO3_PYTHON PYTHONHOME LD_LIBRARY_PATH

unset UV_PYTHON
echo "::endgroup::"
done

build-wheels:
name: Build wheel ${{ matrix.platform.artifact }} py${{ matrix.python-version }}
needs: test
runs-on: ${{ matrix.platform.os }}
defaults:
run:
working-directory: packages/pynumaflow-lite

strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
platform:
- os: ubuntu-24.04
target: x86_64-unknown-linux-gnu
artifact: linux-x86_64
manylinux: "2014"
linux: true
- os: macos-15
target: aarch64-apple-darwin
artifact: macos-aarch64
manylinux: "off"
linux: false

steps:
- uses: actions/checkout@v7

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python-version }}

- name: Set up Rust
run: |
rustup toolchain install stable --profile minimal
rustup default stable

- name: Build wheel
uses: PyO3/maturin-action@v1
with:
command: build
working-directory: packages/pynumaflow-lite
target: ${{ matrix.platform.target }}
manylinux: ${{ matrix.platform.manylinux }}
args: >-
--release
--out dist
-i ${{ matrix.platform.linux && format('python{0}', matrix.python-version) || 'python' }}

- name: Upload wheel artifact
uses: actions/upload-artifact@v7
with:
name: pynumaflow-lite-${{ matrix.platform.artifact }}-py${{ matrix.python-version }}
path: packages/pynumaflow-lite/dist/*.whl
if-no-files-found: error
compression-level: 0
24 changes: 12 additions & 12 deletions packages/pynumaflow-lite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ name = "pynumaflow_lite"
crate-type = ["cdylib", "rlib"]

[dependencies]
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "44ee3068fcf7088ff265df7ae7ce1881a40694ff" }
pyo3 = { version = "0.27.1", features = ["chrono", "experimental-inspect"] }
tokio = "1.47.1"
tonic = "0.14.2"
tokio-stream = "0.1.17"
tower = "0.5.2"
hyper-util = "0.1.16"
prost-types = "0.14.1"
chrono = "0.4.42"
pyo3-async-runtimes = { version = "0.27.0", features = ["tokio-runtime"] }
futures-core = "0.3.31"
pin-project = "1.1.10"
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "de2fdc16da2f2bfb12ee04050e6e60c809692c2c" }
pyo3 = { version = "0.29.0", features = ["chrono", "experimental-inspect"] }
tokio = "1.52.3"
tonic = "0.14.6"
tokio-stream = "0.1.18"
tower = "0.5.3"
hyper-util = "0.1.20"
prost-types = "0.14.4"
chrono = "0.4.45"
pyo3-async-runtimes = { version = "0.29.0", features = ["tokio-runtime"] }
futures-core = "0.3.32"
pin-project = "1.1.13"

## Binaries for testing

Expand Down
6 changes: 4 additions & 2 deletions packages/pynumaflow-lite/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ clean:
cargo clean

py-fmt:
uv run black pynumaflow_lite/ tests/ manifests/
uv run ruff format pynumaflow_lite/ tests/ manifests/

py-lint: py-fmt
py-lint:
uv run ruff check --fix .
uv run ruff format pynumaflow_lite/ tests/ manifests/

fmt:
cargo fmt --all
Expand All @@ -58,6 +59,7 @@ lint: test-fmt clippy py-lint
.PHONY: test-fmt
test-fmt:
cargo fmt --all --check
uv run ruff format --check pynumaflow_lite/ tests/ manifests/

.PHONY: clippy
clippy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
flushing sorted data when the watermark advances.
"""

import signal
import asyncio
import signal
from collections.abc import AsyncIterator
from datetime import datetime
from typing import AsyncIterator

from pynumaflow_lite.accumulator import (
Accumulator,
AccumulatorAsyncServer,
Datum,
Message,
AccumulatorAsyncServer,
Accumulator,
)


Expand Down Expand Up @@ -107,7 +107,6 @@ async def main():
"""
Start the accumulator server.
"""
import signal

server = AccumulatorAsyncServer()

Expand All @@ -124,20 +123,9 @@ async def main():
await server.start(StreamSorter)
print("Shutting down gracefully...")
except asyncio.CancelledError:
try:
server.stop()
except Exception:
pass
server.stop()
return


# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly.

signal.signal(signal.SIGINT, signal.default_int_handler)
try:
signal.signal(signal.SIGTERM, signal.SIG_DFL)
except AttributeError:
pass

if __name__ == "__main__":
asyncio.run(main())
24 changes: 4 additions & 20 deletions packages/pynumaflow-lite/manifests/batchmap/batchmap_cat.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import asyncio
import signal
from collections.abc import AsyncIterable
from typing import Awaitable, Callable
from collections.abc import AsyncIterable, Awaitable, Callable

from pynumaflow_lite import batchmapper
from pynumaflow_lite.batchmapper import Message


class SimpleBatchCat(batchmapper.BatchMapper):
async def handler(
self, batch: AsyncIterable[batchmapper.Datum]
) -> batchmapper.BatchResponses:
async def handler(self, batch: AsyncIterable[batchmapper.Datum]) -> batchmapper.BatchResponses:
responses = batchmapper.BatchResponses()
async for d in batch:
resp = batchmapper.BatchResponse(d.id)
Expand All @@ -23,18 +20,8 @@ async def handler(
return responses


# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly.
signal.signal(signal.SIGINT, signal.default_int_handler)
try:
signal.signal(signal.SIGTERM, signal.SIG_DFL)
except AttributeError:
pass


async def start(
f: Callable[
[AsyncIterable[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]
],
f: Callable[[AsyncIterable[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]],
):
server = batchmapper.BatchMapAsyncServer()

Expand All @@ -50,10 +37,7 @@ async def start(
await server.start(f)
print("Shutting down gracefully...")
except asyncio.CancelledError:
try:
server.stop()
except Exception:
pass
server.stop()
return


Expand Down
15 changes: 2 additions & 13 deletions packages/pynumaflow-lite/manifests/map/map_cat.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import signal
from typing import Awaitable, Callable
from collections.abc import Awaitable, Callable

from pynumaflow_lite import mapper

Expand All @@ -18,14 +18,6 @@ async def handler(self, keys: list[str], payload: mapper.Datum) -> mapper.Messag
return messages


# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly.
signal.signal(signal.SIGINT, signal.default_int_handler)
try:
signal.signal(signal.SIGTERM, signal.SIG_DFL)
except AttributeError:
pass


async def start(f: Callable[[list[str], mapper.Datum], Awaitable[mapper.Messages]]):
server = mapper.MapAsyncServer()

Expand All @@ -47,10 +39,7 @@ async def start(f: Callable[[list[str], mapper.Datum], Awaitable[mapper.Messages
print("Shutting down gracefully...")
except asyncio.CancelledError:
# Fallback in case the task was cancelled by the runner
try:
server.stop()
except Exception:
pass
server.stop()
return


Expand Down
Loading
Loading