Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 182 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# Flowise Codebase Guide

## Project Overview

Flowise is a visual flow-based tool for building LLM-powered applications, agents, and chatbots. It uses a node-based interface similar to Node-RED but specialized for AI workflows.

**Architecture**: TypeScript monorepo (PNPM + Turbo) with three main packages:
- `packages/components/` - Core node components and business logic
- `packages/server/` - Express backend with APIs and database
- `packages/ui/` - React frontend with visual flow builder

## Key Development Patterns

### 1. Node Development Pattern

All nodes follow a standardized interface in `packages/components/src/Interface.ts`:

```typescript
interface INode {
label: string // Display name
name: string // Unique identifier
type: string // Node category
version: number // Version for compatibility
category: string // UI grouping
baseClasses: string[] // Output types
inputs: INodeParams[] // Input configuration
outputs?: INodeOutputsValue[]
init?(nodeData: INodeData): Promise<any>
run?(nodeData: INodeData): Promise<any>
}
```

**Node Categories**:
- Agents, ChatModels, Tools, VectorStores, DocumentLoaders, Memory, Chains, AgentFlow

### 2. Streaming Architecture

Real-time updates use `IServerSideEventStreamer` interface with events like:
- `streamTokenEvent()` - Token-by-token LLM responses
- `streamToolEvent()` - Tool execution notifications
- `streamAgentReasoningEvent()` - Agent thought processes
- `streamCustomEvent()` - Custom event types

**Pattern**: Agents and chains receive `sseStreamer` via `options.sseStreamer` and call appropriate stream methods.

### 3. Tool Integration Pattern

Tools can be:
- **Custom Functions**: User-defined JavaScript with Zod schema validation
- **API Tools**: HTTP request wrappers (GET, POST, etc.)
- **Chain Tools**: Embedded chatflows as tools
- **MCP Tools**: Model Context Protocol integrations
- **Service Tools**: Platform integrations (Google, GitHub, Slack)

Tools receive runtime context via `RunnableConfig` parameter containing `chatId`, `sseStreamer`, etc.

### 4. Agent Patterns

**Traditional Agents**: Single LLM with tools (ReAct, Conversational)
**Multi-Agent**: Supervisor-worker coordination patterns
**AgentFlow v2**: Visual node-based agent orchestration with:
- Conditional branching
- Loops and iteration
- State management
- Human-in-the-loop

### 5. Database and API Patterns

**TypeORM** entities in `/server/src/database/entities/`
**RESTful APIs** with Express middleware for auth, rate limiting
**Queue system** using BullMQ for background processing
**WebSocket/SSE** for real-time communication

## File Organization

```
packages/
├── components/
│ ├── nodes/ # All node implementations
│ │ ├── agents/ # Agent nodes
│ │ ├── tools/ # Tool integrations (including MCP)
│ │ ├── chatmodels/ # LLM integrations
│ │ └── chains/ # LangChain chains
│ └── src/
│ ├── Interface.ts # Core interfaces
│ └── handler.ts # Execution handlers
├── server/
│ ├── src/controllers/ # API endpoints
│ ├── src/utils/ # SSEStreamer, buildChatflow
│ └── src/database/ # Database entities and migrations
└── ui/
├── src/views/ # Page components
├── src/api/ # API client functions
└── src/store/ # Redux store
```

## Development Guidelines

### Adding New Nodes
1. Create node file in appropriate `/nodes/` category
2. Implement `INode` interface with `init()` and/or `run()` methods
3. Export in `/components/src/index.ts`
4. Add to category mapping in `/server/src/utils/buildChatflow.ts`

### Adding Streaming Support
1. Extract `sseStreamer` from `options.sseStreamer`
2. Extract `chatId` from `options.chatId` or config
3. Call appropriate `stream*Event()` methods during execution
4. Use `streamCustomEvent()` for custom notification types

### Working with Tools
1. Tools are LangChain-compatible functions created with `tool()` helper
2. Use Zod schemas for input validation
3. Access runtime context via second `config` parameter
4. Return string responses that get passed back through agent layer

### MCP (Model Context Protocol) Pattern
- MCP tools in `/nodes/tools/MCP/` create LangChain tool wrappers
- Each tool call creates new MCP client connection
- Notifications received via client event handlers
- Currently notifications only logged to console (opportunity for enhancement)

## Common Patterns

### Error Handling
- Use try/catch blocks in async operations
- Return descriptive error messages as strings
- Log errors for debugging but don't expose sensitive data

### Configuration Management
- Use environment variables for external service configs
- Store sensitive data in credentials system
- Support both local and external credential storage

### Testing Strategy
- Component testing for individual nodes
- Integration testing for complete flows
- Use test databases for server tests
- Mock external APIs in tests

## Key Integration Points

### UI to Server Communication
- REST APIs for CRUD operations
- WebSocket/SSE for real-time streaming
- File uploads for document processing

### Server to Components
- Node execution via `run()` methods
- Streaming via `IServerSideEventStreamer`
- State management through memory systems

### External Integrations
- LangChain ecosystem compatibility
- OpenAI/Anthropic/Google API integrations
- Vector database connections
- Third-party service APIs (GitHub, Slack, etc.)

## Current Development Focus Areas

1. **AgentFlow v2**: Enhanced visual agent orchestration
2. **Multi-agent systems**: Coordination and communication patterns
3. **Real-time streaming**: Enhanced user feedback and monitoring
4. **MCP integrations**: Model Context Protocol tool ecosystem
5. **Enterprise features**: Workspaces, analytics, usage tracking

## Troubleshooting Common Issues

### Streaming Not Working
- Check if `sseStreamer` is properly extracted from options
- Verify `chatId` is available in context
- Ensure streaming is enabled in flow configuration

### Tool Execution Failures
- Validate input schemas with Zod
- Check credential configuration and permissions
- Review tool return value format (should be string)

### Node Not Appearing in UI
- Verify node is exported in `/components/src/index.ts`
- Check category mapping in buildChatflow.ts
- Ensure node follows `INode` interface correctly
40 changes: 22 additions & 18 deletions packages/components/nodes/agentflow/Agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2059,14 +2059,6 @@ class Agent_Agentflow implements INode {
let isToolRequireHumanInput =
(selectedTool as any).requiresHumanInput && (!iterationContext || Object.keys(iterationContext).length === 0)

const flowConfig = {
chatflowId: options.chatflowid,
sessionId: options.sessionId,
chatId: options.chatId,
input: input,
state: options.agentflowRuntime?.state
}

if (isToolRequireHumanInput) {
const toolCallDetails = '```json\n' + JSON.stringify(toolCall, null, 2) + '\n```'
const responseContent = response.content + `\nAttempting to use tool:\n${toolCallDetails}`
Expand All @@ -2084,7 +2076,17 @@ class Agent_Agentflow implements INode {

try {
//@ts-ignore
let toolOutput = await selectedTool.call(toolCall.args, { signal: abortController?.signal }, undefined, flowConfig)
let toolOutput = await selectedTool.invoke(toolCall.args, {
signal: abortController?.signal,
configurable: {
sessionId: options.sessionId,
chatId: chatId,
input: input,
state: options.agentflowRuntime?.state,
sseStreamer: sseStreamer,
flowise_chatId: chatId
}
})

if (options.analyticHandlers && toolIds) {
await options.analyticHandlers.onToolEnd(toolIds, toolOutput)
Expand Down Expand Up @@ -2383,14 +2385,6 @@ class Agent_Agentflow implements INode {
let parsedDocs
let parsedArtifacts

const flowConfig = {
chatflowId: options.chatflowid,
sessionId: options.sessionId,
chatId: options.chatId,
input: input,
state: options.agentflowRuntime?.state
}

if (humanInput.type === 'reject') {
messages.pop()
const toBeRemovedTool = toolsInstance.find((tool) => tool.name === toolCall.name)
Expand All @@ -2410,7 +2404,17 @@ class Agent_Agentflow implements INode {

try {
//@ts-ignore
let toolOutput = await selectedTool.call(toolCall.args, { signal: abortController?.signal }, undefined, flowConfig)
let toolOutput = await selectedTool.invoke(toolCall.args, {
signal: abortController?.signal,
configurable: {
sessionId: options.sessionId,
chatId: chatId,
input: input,
state: options.agentflowRuntime?.state,
sseStreamer: sseStreamer,
flowise_chatId: chatId
}
})

if (options.analyticHandlers && toolIds) {
await options.analyticHandlers.onToolEnd(toolIds, toolOutput)
Expand Down
17 changes: 14 additions & 3 deletions packages/components/nodes/agents/ToolAgent/ToolAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,12 @@ class ToolAgent_Agents implements INode {
}

async init(nodeData: INodeData, input: string, options: ICommonObject): Promise<any> {
return prepareAgent(nodeData, options, { sessionId: this.sessionId, chatId: options.chatId, input })
return prepareAgent(nodeData, options, {
sessionId: this.sessionId,
chatId: options.chatId,
input,
sseStreamer: options.sseStreamer
})
}

async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string | ICommonObject> {
Expand All @@ -141,7 +146,12 @@ class ToolAgent_Agents implements INode {
}
}

const executor = await prepareAgent(nodeData, options, { sessionId: this.sessionId, chatId: options.chatId, input })
const executor = await prepareAgent(nodeData, options, {
sessionId: this.sessionId,
chatId: options.chatId,
input,
sseStreamer: options.sseStreamer
})

const loggerHandler = new ConsoleCallbackHandler(options.logger, options?.orgId)
const callbacks = await additionalCallbacks(nodeData, options)
Expand Down Expand Up @@ -268,7 +278,7 @@ class ToolAgent_Agents implements INode {
const prepareAgent = async (
nodeData: INodeData,
options: ICommonObject,
flowObj: { sessionId?: string; chatId?: string; input?: string }
flowObj: { sessionId?: string; chatId?: string; input?: string; sseStreamer?: any }
) => {
const model = nodeData.inputs?.model as BaseChatModel
const maxIterations = nodeData.inputs?.maxIterations as string
Expand Down Expand Up @@ -370,6 +380,7 @@ const prepareAgent = async (
sessionId: flowObj?.sessionId,
chatId: flowObj?.chatId,
input: flowObj?.input,
sseStreamer: flowObj?.sseStreamer,
verbose: process.env.DEBUG === 'true' ? true : false,
maxIterations: maxIterations ? parseFloat(maxIterations) : undefined
})
Expand Down
6 changes: 6 additions & 0 deletions packages/components/nodes/tools/MCP/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export const MCP_STREAMING_CONFIG = {
DEFAULT_COMPLETION_TIMEOUT: 600000, // 10 minutes fallback - only as safety net
NOTIFICATION_DELAY: 1000, // 1 second delay before cleanup
SUPPORTED_NOTIFICATION_TYPES: ['logging/message', 'progress'],
STREAMING_MARKER: '[MCP Streaming]'
}
Loading
Loading