Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,42 +240,28 @@ async def stream_messages() -> None:
task_id=task.id,
timeout=90, # Increased timeout for CI environments
):
# A turn emits several messages (user echo, reasoning, agent text),
# each ending in "full" or "done"; consume until the text reply lands.
msg_type = event.get("type")
if msg_type == "full":
task_message_update = StreamTaskMessageFull.model_validate(event)
if task_message_update.parent_task_message and task_message_update.parent_task_message.id:
finished_message = await client.messages.retrieve(task_message_update.parent_task_message.id)
if (
finished_message.content
and finished_message.content.type == "text"
and finished_message.content.author == "user"
):
user_message_found = True
elif (
finished_message.content
and finished_message.content.type == "text"
and finished_message.content.author == "agent"
):
agent_response_found = True
elif finished_message.content and finished_message.content.type == "reasoning":
reasoning_found = True

# Exit early if we have what we need
if user_message_found and agent_response_found:
break

parent_task_message = StreamTaskMessageFull.model_validate(event).parent_task_message
elif msg_type == "done":
task_message_update_done = StreamTaskMessageDone.model_validate(event)
if task_message_update_done.parent_task_message and task_message_update_done.parent_task_message.id:
finished_message = await client.messages.retrieve(task_message_update_done.parent_task_message.id)
if finished_message.content and finished_message.content.type == "reasoning":
reasoning_found = True
elif (
finished_message.content
and finished_message.content.type == "text"
and finished_message.content.author == "agent"
):
agent_response_found = True
parent_task_message = StreamTaskMessageDone.model_validate(event).parent_task_message
else:
continue

if parent_task_message and parent_task_message.id:
finished_message = await client.messages.retrieve(parent_task_message.id)
content = finished_message.content
if content and content.type == "text" and content.author == "user":
user_message_found = True
elif content and content.type == "text" and content.author == "agent":
agent_response_found = True
elif content and content.type == "reasoning":
reasoning_found = True

# Stop once both the user echo and the agent's text reply are seen.
if user_message_found and agent_response_found:
break

stream_task = asyncio.create_task(stream_messages())
Expand Down
Loading