Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,27 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
async def on_task_create(self, params: CreateTaskParams) -> None:
logger.info(f"Received task create params: {params}")

# 1. Acknowledge that the task has been created.
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author="agent",
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
),
)
# 1. Acknowledge that the task has been created. Gate this one-time prologue
# on is_continued_run(): run_until_complete below recycles the workflow via
# continue-as-new, which re-enters on_task_create from the top — without this
# guard the "you should only see this once" welcome would re-fire on every
# recycle. Original run -> emit; continued (recycled) run -> skip.
if not self.is_continued_run():
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author="agent",
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
),
)

# 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once.

# Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met.
await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
)
# 2. Keep the workflow open to field events. We use run_until_complete
# instead of a bare wait_condition: it still waits indefinitely, but also
# recycles the Temporal event history via continue-as-new before it hits the
# ~50k-event / 50MB limit, so this chat can stay open forever. Adopting
# run_until_complete IS the opt-in — agents that keep the old wait_condition
# never recycle. This agent keeps no cross-turn state, so nothing needs
# restoring across a recycle and `params` is the only carry-forward. (Agents
# that DO keep state restore it at the top of @workflow.run on a recycled
# run — framework-specific, landing per-integration in follow-up PRs.)
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
Comment thread
greptile-apps[bot] marked this conversation as resolved.
2 changes: 1 addition & 1 deletion src/agentex/lib/core/clients/temporal/temporal_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async def start_workflow(
duplicate_policy: DuplicateWorkflowPolicy = DuplicateWorkflowPolicy.ALLOW_DUPLICATE,
retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
task_timeout: timedelta = timedelta(seconds=10),
execution_timeout: timedelta = timedelta(seconds=86400),
execution_timeout: timedelta | None = None,
**kwargs: Any,
) -> str:
temporal_retry_policy = TemporalRetryPolicy(**retry_policy.model_dump(exclude_unset=True))
Expand Down
11 changes: 10 additions & 1 deletion src/agentex/lib/core/temporal/services/temporal_task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ async def submit_task(self, agent: Agent, task: Task, params: dict[str, Any] | N
returns the workflow ID of the temporal workflow
"""
# None / 0 / negative => no execution timeout (workflow can stay open
# indefinitely, which long-lived chat/session agents rely on). A positive
# value bounds the whole continue-as-new chain's wall-clock lifetime.
timeout_seconds = self._env_vars.WORKFLOW_EXECUTION_TIMEOUT_SECONDS
execution_timeout = (
timedelta(seconds=timeout_seconds)
if timeout_seconds and timeout_seconds > 0
else None
)
return await self._temporal_client.start_workflow(
workflow=self._env_vars.WORKFLOW_NAME,
arg=CreateTaskParams(
Expand All @@ -42,7 +51,7 @@ async def submit_task(self, agent: Agent, task: Task, params: dict[str, Any] | N
),
id=task.id,
task_queue=self._env_vars.WORKFLOW_TASK_QUEUE,
execution_timeout=timedelta(seconds=self._env_vars.WORKFLOW_EXECUTION_TIMEOUT_SECONDS),
execution_timeout=execution_timeout,
)

async def get_state(self, task_id: str) -> WorkflowState:
Expand Down
136 changes: 136 additions & 0 deletions src/agentex/lib/core/temporal/workflows/workflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Callable
from datetime import timedelta

from temporalio import workflow

Expand All @@ -24,3 +28,135 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
@abstractmethod
async def on_task_create(self, params: CreateTaskParams) -> None:
raise NotImplementedError

# ------------------------------------------------------------------ #
# Continue-as-new lifecycle helpers #
# #
# These let a long-lived chat/session workflow recycle its event #
# history so it can stay open indefinitely without hitting Temporal's #
# ~50k-event / 50MB history limit. They are OPT-IN: an agent gets #
# recycling only by calling `run_until_complete` from its #
# `@workflow.run` instead of the usual indefinite `wait_condition`. #
# The SDK owns the hard Temporal mechanics (recycle decision and #
# draining in-flight handlers before the continue_as_new call). #
# Restoring state after a recycle is the AGENT's job and is #
# framework-specific (rebuild from `adk.messages`, an `adk.state` #
# snapshot, or a framework's own memory); that lands per-integration #
# in follow-up PRs. The 000_hello_acp example shows the minimal #
# stateless adoption that needs no restoration. #
# ------------------------------------------------------------------ #

def should_continue_as_new(self) -> bool:
"""Whether this run should recycle its event history via continue-as-new.

True when Temporal suggests it: ``is_continue_as_new_suggested()`` fires as
the event history approaches the server's size/count limit, so we let
Temporal own the threshold rather than configuring one ourselves.

This reads only a deterministic ``workflow.info()`` value and emits no
commands, so it is safe to use directly as a ``workflow.wait_condition``
predicate, e.g.::

await workflow.wait_condition(
lambda: self._complete_task or self.should_continue_as_new()
)
"""
return workflow.info().is_continue_as_new_suggested()

async def drain_and_continue_as_new(
self,
*args: Any,
is_complete: Callable[[], bool] | None = None,
) -> None:
"""Drain in-flight signal handlers, then continue-as-new.

Call this from the agent's ``@workflow.run`` once the run loop wakes for a
recycle (see :meth:`should_continue_as_new`). ``args`` are forwarded
verbatim to ``workflow.continue_as_new`` and become the new run's input, so
pass whatever your ``@workflow.run`` signature expects — typically the
original ``CreateTaskParams`` (the new run keeps the same workflow id / task
id and re-hydrates its state from ``adk.state``).

IMPORTANT: keep your data OUTSIDE workflow state BEFORE calling this —
messages in ``adk.messages`` and any other state in ``adk.state``.
In-workflow attributes do NOT survive the recycle; only the forwarded
``args`` do.

Waits on ``all_handlers_finished`` first so an in-flight turn (a signal
handler still running an activity) is never lost or duplicated across the
recycle boundary. ``workflow.continue_as_new`` raises to end the run, so
this never returns normally — EXCEPT when ``is_complete`` is given and
returns True after draining: a completion signal can arrive while we wait
for the drain, and the recycled run would start fresh (losing that
completion), so in that case we return without recycling and let the caller
finish.
"""
# Don't recycle until any signal handler still running has finished, so a
# message mid-flight at the boundary is carried into the next run intact.
await workflow.wait_condition(workflow.all_handlers_finished)
# A completion signal may have landed during the drain — re-check before
# recycling so a workflow that should finish isn't kept open by the recycle.
if is_complete is not None and is_complete():
return
logger.info(
"Recycling workflow via continue-as-new "
f"(history_length={workflow.info().get_current_history_length()}, "
f"run_id={workflow.info().run_id})"
)
workflow.continue_as_new(*args)
Comment thread
greptile-apps[bot] marked this conversation as resolved.

async def run_until_complete(
self,
*continue_as_new_args: Any,
is_complete: Callable[[], bool],
timeout: timedelta | None = None,
) -> None:
Comment thread
greptile-apps[bot] marked this conversation as resolved.
"""Keep the workflow open to field events, recycling history as needed.

Drop-in replacement for the usual ``await workflow.wait_condition(
lambda: self._complete_task, timeout=None)`` at the end of an agent's
``@workflow.run``. ``is_complete`` is a no-arg predicate (typically
``lambda: self._complete_task``); ``continue_as_new_args`` are forwarded to
continue-as-new on recycle (typically the original ``CreateTaskParams``).

Adopting this method IS the opt-in to recycling — there is no flag. An agent
that keeps the old indefinite ``wait_condition`` never recycles.

``timeout`` is an optional cap on how long to wait with no progress; it
defaults to None = wait indefinitely (the usual case — Temporal can keep huge
numbers of idle workflows open). The broader workflow-level lifetime cap is
the execution timeout (``WORKFLOW_EXECUTION_TIMEOUT_SECONDS``, also infinite
by default). On ``timeout`` expiry ``wait_condition`` raises
``asyncio.TimeoutError`` like before.

Persist anything you need across a recycle OUTSIDE workflow state first —
messages in ``adk.messages``, other state in ``adk.state`` — and rebuild it
at the top of ``@workflow.run``.
"""
while True:
await workflow.wait_condition(
lambda: is_complete() or self.should_continue_as_new(),
timeout=timeout,
)
if is_complete():
return
# Drains in-flight handlers, then continue-as-new (raises; never
# returns) — UNLESS a completion signal arrived during the drain, in
# which case it returns here and the next loop iteration completes.
await self.drain_and_continue_as_new(
*continue_as_new_args, is_complete=is_complete
)
if is_complete():
return

def is_continued_run(self) -> bool:
"""Whether this run was produced by a continue-as-new from a prior run.

True only on a recycled run (``workflow.info().continued_run_id`` is set),
False on the original run a client created. Use it in ``@workflow.run`` to
gate one-time prologue work that must NOT repeat on every recycle — e.g. a
welcome message, or rehydrating state only when there's something to restore.
The recycled run re-enters ``@workflow.run`` from the top, so anything not
gated here runs again on each history rollover.
"""
return workflow.info().continued_run_id is not None
14 changes: 8 additions & 6 deletions src/agentex/lib/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from pathlib import Path

from dotenv import load_dotenv
from pydantic import Field

from agentex.lib.utils.logging import make_logger
from agentex.lib.utils.model_utils import BaseModel
Expand Down Expand Up @@ -76,11 +75,14 @@ class EnvironmentVariables(BaseModel):
# Workflow Configuration
WORKFLOW_TASK_QUEUE: str | None = None
WORKFLOW_NAME: str | None = None
# Maximum total time (in seconds) a workflow execution can run, including
# retries and continue-as-new. Defaults to 24h to bound runaway workflows;
# agents with longer-running tasks should override this. Must be > 0 — a
# zero or negative timedelta would cause every submitted workflow to fail.
WORKFLOW_EXECUTION_TIMEOUT_SECONDS: int = Field(default=86400, gt=0)
# Maximum total wall-clock time (in seconds) a workflow execution can run,
# INCLUDING retries and the entire continue-as-new chain (Temporal does not
# reset it on continue-as-new). Defaults to None = no execution timeout, so
# long-lived chat/session workflows can stay open indefinitely. None / 0 /
# negative are all treated as "no timeout" at the start_workflow call site.
# To bound idle workflows, use an explicit durable timer inside the workflow
# (e.g. run_until_complete's `timeout`), not this chain-wide ceiling.
WORKFLOW_EXECUTION_TIMEOUT_SECONDS: int | None = None
# Temporal Worker Configuration
HEALTH_CHECK_PORT: int = 80
# Auth Configuration
Expand Down
75 changes: 75 additions & 0 deletions tests/lib/core/temporal/test_base_workflow_continue_as_new.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Unit tests for BaseWorkflow's continue-as-new lifecycle helpers.
These exercise the pure decision helpers (``should_continue_as_new`` and
``is_continued_run``) by faking ``workflow.info()`` so we don't need a running
Temporal server. The drain + ``workflow.continue_as_new`` mechanics in
``drain_and_continue_as_new`` / ``run_until_complete`` are best covered by a
replay/integration test against a Temporal test environment (a follow-up).
"""

from __future__ import annotations

from typing import override

import pytest

from agentex.lib.core.temporal.workflows import workflow as base_workflow_module
from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow


class _ConcreteWorkflow(BaseWorkflow):
"""Minimal concrete subclass so we can instantiate the ABC in a test."""

def __init__(self) -> None:
self.display_name = "test"

@override
async def on_task_event_send(self, params) -> None: # pragma: no cover - unused
raise NotImplementedError

@override
async def on_task_create(self, params) -> None: # pragma: no cover - unused
raise NotImplementedError


class _FakeInfo:
def __init__(self, *, suggested: bool, continued_run_id: str | None = None) -> None:
self._suggested = suggested
self.continued_run_id = continued_run_id

def is_continue_as_new_suggested(self) -> bool:
return self._suggested


@pytest.fixture
def patch_info(monkeypatch):
"""Patch ``workflow.info`` used inside the BaseWorkflow module."""

def _apply(*, suggested: bool = False, continued_run_id: str | None = None) -> None:
monkeypatch.setattr(
base_workflow_module.workflow,
"info",
lambda: _FakeInfo(suggested=suggested, continued_run_id=continued_run_id),
)

return _apply


def test_recycles_when_temporal_suggests(patch_info):
patch_info(suggested=True)
assert _ConcreteWorkflow().should_continue_as_new() is True


def test_no_recycle_when_not_suggested(patch_info):
patch_info(suggested=False)
assert _ConcreteWorkflow().should_continue_as_new() is False


def test_is_continued_run_false_on_original_run(patch_info):
patch_info(continued_run_id=None)
assert _ConcreteWorkflow().is_continued_run() is False


def test_is_continued_run_true_after_recycle(patch_info):
patch_info(continued_run_id="run-123")
assert _ConcreteWorkflow().is_continued_run() is True
Loading