Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 118 additions & 109 deletions src/google/adk/agents/sequential_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,126 +33,135 @@
from .llm_agent import LlmAgent
from .sequential_agent_config import SequentialAgentConfig

logger = logging.getLogger('google_adk.' + __name__)
logger = logging.getLogger("google_adk." + __name__)


@experimental
class SequentialAgentState(BaseAgentState):
"""State for SequentialAgent."""
"""State for SequentialAgent."""

current_sub_agent: str = ''
"""The name of the current sub-agent to run."""
current_sub_agent: str = ""
"""The name of the current sub-agent to run."""


class SequentialAgent(BaseAgent):
"""A shell agent that runs its sub-agents in sequence."""

config_type: ClassVar[Type[BaseAgentConfig]] = SequentialAgentConfig
"""The config type for this agent."""

@override
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
if not self.sub_agents:
return

# Initialize or resume the execution state from the agent state.
agent_state = self._load_agent_state(ctx, SequentialAgentState)
start_index = self._get_start_index(agent_state)

pause_invocation = False
resuming_sub_agent = agent_state is not None
for i in range(start_index, len(self.sub_agents)):
sub_agent = self.sub_agents[i]
if not resuming_sub_agent:
# If we are resuming from the current event, it means the same event has
# already been logged, so we should avoid yielding it again.
"""A shell agent that runs its sub-agents in sequence."""

config_type: ClassVar[Type[BaseAgentConfig]] = SequentialAgentConfig
"""The config type for this agent."""

@override
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
if not self.sub_agents:
return

# Initialize or resume the execution state from the agent state.
agent_state = self._load_agent_state(ctx, SequentialAgentState)
start_index = self._get_start_index(agent_state)

pause_invocation = False
resuming_sub_agent = agent_state is not None
for i in range(start_index, len(self.sub_agents)):
sub_agent = self.sub_agents[i]
if not resuming_sub_agent:
# If we are resuming from the current event, it means the same event has
# already been logged, so we should avoid yielding it again.
if ctx.is_resumable:
agent_state = SequentialAgentState(current_sub_agent=sub_agent.name)
ctx.set_agent_state(self.name, agent_state=agent_state)
yield self._create_agent_state_event(ctx)

async with Aclosing(sub_agent.run_async(ctx)) as agen:
async for event in agen:
yield event
if ctx.should_pause_invocation(event):
pause_invocation = True
# Check for escalate action to enable early exit from the sequence.
# When escalate is set, we terminate immediately, stopping both
# subsequent events in the current agent and all remaining agents.
# Note: escalate takes precedence over pause_invocation.
if event.actions and event.actions.escalate:
return

# Skip the rest of the sub-agents if the invocation is paused.
if pause_invocation:
return

# Reset the flag for the next sub-agent.
resuming_sub_agent = False

if ctx.is_resumable:
agent_state = SequentialAgentState(current_sub_agent=sub_agent.name)
ctx.set_agent_state(self.name, agent_state=agent_state)
yield self._create_agent_state_event(ctx)

async with Aclosing(sub_agent.run_async(ctx)) as agen:
async for event in agen:
yield event
if ctx.should_pause_invocation(event):
pause_invocation = True

# Skip the rest of the sub-agents if the invocation is paused.
if pause_invocation:
return

# Reset the flag for the next sub-agent.
resuming_sub_agent = False

if ctx.is_resumable:
ctx.set_agent_state(self.name, end_of_agent=True)
yield self._create_agent_state_event(ctx)

def _get_start_index(
self,
agent_state: SequentialAgentState,
) -> int:
"""Calculates the start index for the sub-agent loop."""
if not agent_state:
return 0

if not agent_state.current_sub_agent:
# This means the process was finished.
return len(self.sub_agents)

try:
sub_agent_names = [sub_agent.name for sub_agent in self.sub_agents]
return sub_agent_names.index(agent_state.current_sub_agent)
except ValueError:
# A sub-agent was removed so the agent name is not found.
# For now, we restart from the beginning.
logger.warning(
'Sub-agent %s was removed so the agent name is not found. Restarting'
' from the beginning.',
agent_state.current_sub_agent,
)
return 0

@override
async def _run_live_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
"""Implementation for live SequentialAgent.

Compared to the non-live case, live agents process a continuous stream of audio
or video, so there is no way to tell if it's finished and should pass
to the next agent or not. So we introduce a task_completed() function so the
model can call this function to signal that it's finished the task and we
can move on to the next agent.

Args:
ctx: The invocation context of the agent.
"""
if not self.sub_agents:
return

# There is no way to know if it's using live during init phase so we have to init it here
for sub_agent in self.sub_agents:
# add tool
def task_completed():
"""
Signals that the agent has successfully completed the user's question
or task.
ctx.set_agent_state(self.name, end_of_agent=True)
yield self._create_agent_state_event(ctx)

def _get_start_index(
self,
agent_state: SequentialAgentState,
) -> int:
"""Calculates the start index for the sub-agent loop."""
if not agent_state:
return 0

if not agent_state.current_sub_agent:
# This means the process was finished.
return len(self.sub_agents)

try:
sub_agent_names = [sub_agent.name for sub_agent in self.sub_agents]
return sub_agent_names.index(agent_state.current_sub_agent)
except ValueError:
# A sub-agent was removed so the agent name is not found.
# For now, we restart from the beginning.
logger.warning(
"Sub-agent %s was removed so the agent name is not found. Restarting"
" from the beginning.",
agent_state.current_sub_agent,
)
return 0

@override
async def _run_live_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
"""Implementation for live SequentialAgent.

Compared to the non-live case, live agents process a continuous stream of audio
or video, so there is no way to tell if it's finished and should pass
to the next agent or not. So we introduce a task_completed() function so the
model can call this function to signal that it's finished the task and we
can move on to the next agent.

Args:
ctx: The invocation context of the agent.
"""
return 'Task completion signaled.'

if isinstance(sub_agent, LlmAgent):
# Use function name to dedupe.
if task_completed.__name__ not in sub_agent.tools:
sub_agent.tools.append(task_completed)
sub_agent.instruction += f"""If you finished the user's request
if not self.sub_agents:
return

# There is no way to know if it's using live during init phase so we have to init it here
for sub_agent in self.sub_agents:
# add tool
def task_completed():
"""
Signals that the agent has successfully completed the user's question
or task.
"""
return "Task completion signaled."

if isinstance(sub_agent, LlmAgent):
# Use function name to dedupe.
if task_completed.__name__ not in sub_agent.tools:
sub_agent.tools.append(task_completed)
sub_agent.instruction += f"""If you finished the user's request
according to its description, call the {task_completed.__name__} function
to exit so the next agents can take over. When calling this function,
do not generate any text other than the function call."""

for sub_agent in self.sub_agents:
async with Aclosing(sub_agent.run_live(ctx)) as agen:
async for event in agen:
yield event
for sub_agent in self.sub_agents:
async with Aclosing(sub_agent.run_live(ctx)) as agen:
async for event in agen:
yield event
# Check for escalate action to enable early exit in live mode.
if event.actions and event.actions.escalate:
return
2 changes: 2 additions & 0 deletions src/google/adk/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from .enterprise_search_tool import enterprise_web_search_tool as enterprise_web_search
from .example_tool import ExampleTool
from .exit_loop_tool import exit_loop
from .exit_sequence_tool import exit_sequence
from .function_tool import FunctionTool
from .get_user_choice_tool import get_user_choice_tool as get_user_choice
from .google_maps_grounding_tool import google_maps_grounding
Expand All @@ -48,6 +49,7 @@
'VertexAiSearchTool',
'ExampleTool',
'exit_loop',
'exit_sequence',
'FunctionTool',
'get_user_choice',
'load_artifacts',
Expand Down
48 changes: 48 additions & 0 deletions src/google/adk/tools/exit_sequence_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from .tool_context import ToolContext


def exit_sequence(tool_context: ToolContext):
"""Exits the sequential execution of agents immediately.

Call this function when you encounter a terminal condition and want to
prevent subsequent agents in the sequence from executing. This will also
stop any remaining events from the current agent.

This tool is specifically designed for use within SequentialAgent contexts.
When called, it sets the escalate flag, which causes the SequentialAgent
to terminate the sequence immediately, preventing both:
- Subsequent events from the current sub-agent
- All remaining sub-agents in the sequence

Use cases:
- A blocking error is encountered that makes further processing impossible
- A definitive answer is found early, making subsequent agents unnecessary
- A security or validation check fails and the workflow must stop
- Resource limits are reached and safe termination is required

Example:
If you're in a sequence of [validator, processor, finalizer] agents,
and the validator finds invalid data, it can call exit_sequence() to
prevent the processor and finalizer from running on bad data.

Args:
tool_context: The context of the current tool invocation.
"""
tool_context.actions.escalate = True
tool_context.actions.skip_summarization = True
Loading