Skip to content

feat: Add Google Gemini plugin for Flyte#632

Open
andreahlert wants to merge 1 commit intoflyteorg:mainfrom
andreahlert:feat/gemini-plugin
Open

feat: Add Google Gemini plugin for Flyte#632
andreahlert wants to merge 1 commit intoflyteorg:mainfrom
andreahlert:feat/gemini-plugin

Conversation

@andreahlert
Copy link
Contributor

Summary

This PR adds a new plugin for integrating Google's Gemini API with Flyte, following the same patterns as the existing OpenAI and Anthropic plugins.

Features

  • function_tool decorator: Convert Flyte tasks to Gemini function declarations automatically
  • Automatic type conversion: Python type hints to JSON schema for tool input validation
  • Agent class: Configure Gemini agents with model, instructions, and tools
  • run_agent function: Execute agent loops that handle function calling and responses
  • Support for sync and async tasks
  • Manual function calling control: Uses AFC (Automatic Function Calling) disabled mode for explicit tool execution management

Usage Example

import flyte
from flyteplugins.gemini import function_tool, run_agent

env = flyte.TaskEnvironment("gemini-agent")

@env.task
async def get_weather(city: str) -> str:
    """Get the current weather for a city."""
    return f"Weather in {city}: sunny, 72F"

@env.task
async def agent_task(prompt: str) -> str:
    tools = [function_tool(get_weather)]
    return await run_agent(
        prompt=prompt,
        tools=tools,
        model="gemini-2.5-flash",
    )

Files Added

  • plugins/gemini/ - Plugin package
    • pyproject.toml - Package configuration
    • README.md - Documentation
    • src/flyteplugins/gemini/agents/_function_tools.py - Core implementation
    • tests/test_agents.py - Test suite (19 tests)
  • examples/genai/gemini_agent.py - Example workflow

Test Plan

  • All 19 unit tests pass
  • Type conversion tests (string, int, float, bool, list, dict, Optional)
  • Function schema extraction tests
  • FunctionTool creation tests (regular functions, Flyte tasks, async)
  • Agent configuration tests
  • Tool execution tests (sync and async)
  • run_agent tests (tool calls, errors, max iterations, finish reasons)

Why Gemini?

Google Gemini is one of the leading LLM providers alongside OpenAI and Anthropic. Having first-class support enables users to:

  • Choose the best model for their use case
  • Easily switch between providers
  • Use Gemini's unique capabilities (multimodal, large context windows, cost efficiency)

This plugin mirrors the Anthropic plugin's API design, making it easy to switch between providers.

Add a new plugin that integrates Google's Gemini API with Flyte,
enabling Flyte tasks to be used as tools for Gemini agents.

The plugin provides:
- FunctionTool: wraps Flyte tasks or regular callables as Gemini
  function declarations with automatic JSON schema generation
- function_tool: decorator/converter for creating tools from
  functions and Flyte tasks
- Agent: configuration dataclass for Gemini agent settings
- run_agent: async agent loop that handles function calling,
  error handling, and conversation state management

Includes comprehensive test coverage (19 tests) and a sandwich-making
example that demonstrates multi-tool orchestration.

Signed-off-by: André Ahlert <andre@aex.partners>

def _get_function_schema(func: typing.Callable) -> dict[str, typing.Any]:
"""Extract JSON schema from a function's type hints."""
sig = inspect.signature(func)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should not need to do this, this is already done as part of the AsyncFunctionTemplate - native interface

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked NativeInterface before going this route. It stores Python types as Dict[str, Tuple[Type, Any]] but doesn't expose a JSON schema conversion. Gemini's FunctionDeclaration needs parameters_json_schema as a JSON schema dict, so some conversion layer is needed regardless.

This is the same approach used in the Anthropic plugin (plugins/anthropic/.../agents/_function_tools.py, lines 34-66). Also, function_tool() needs to support regular callables and @flyte.trace decorated functions too, where NativeInterface isn't available.

Happy to refactor if there's a method I'm missing that already does this conversion.


tool_name = name or actual_func.__name__
tool_description = description or (actual_func.__doc__ or f"Execute {tool_name}")
input_schema = _get_function_schema(actual_func)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you already have native_interface

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

native_interface is only set when func is an AsyncFunctionTaskTemplate. For plain callables it's None, so we'd still need the inspect fallback. The Anthropic plugin stores native_interface the same way but generates the schema from the raw function for this reason.

One thing I could do is use native_interface.inputs when available and fall back to inspect otherwise. Would that be the direction you'd prefer?

if self.task is not None:
if self.is_async:
return await self.task(**kwargs)
return await asyncio.to_thread(self.task, **kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not needed and dangerous. this will start non daemon threads which will block. you dont need to do it, this is all solved by flyte, it gives you an async interface. doing the to_thread is not recommended.

i think we should just call the async execute method. it takes care of the rest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hear you on avoiding unnecessary threads. I looked at how the SDK handles this internally, and AsyncFunctionTaskTemplate.execute() itself uses run_sync_with_loop() for sync functions, which also spins up a thread with its own event loop (src/flyte/_utils/asyncify.py).

The reason for to_thread here: for sync Flyte tasks, task(**kwargs) goes through __call__ -> controller.submit_sync() which returns a blocking Future.result(None). Without to_thread, that blocks the event loop while the agent loop needs to stay async to handle multiple tool calls.

That said, if there's a cleaner async path I should be using instead, I'm happy to switch. The Anthropic plugin uses the same to_thread pattern, so if there's a better approach we could align both.

@kumare3
Copy link
Contributor

kumare3 commented Feb 8, 2026

I dont think this is right, i have not looked at the anthropic plugin, but this will result in bugs and instability

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants