Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@

### Added

- Session affinity:同一对话链路由到同一账号,修复 `previous_response_id` 跨账号失效问题
- `prompt_cache_key`:每个对话链生成唯一 UUID 传递给后端,启用 prompt cache
- WebSocket 请求新增 `include: ["reasoning.encrypted_content"]`(reasoning 开启时自动设置)
- 请求级监控日志:affinity hit/miss、payload 大小、usage 统计、大 payload 告警
- E2E 测试:proxy-routes(36 cases)、dashboard-auth(9)、batch-label(11)、admin-general(11)、debug-routes(5)—— 覆盖率从 51% 提升至 ~75%
- 单元测试:config-loader(16 cases)、config-schema(10)、codex-models(9)
- account-import service 测试补充 RT rotation/fallback 2 cases

### Fixed

- 修复 `service_tier` 在 WebSocket 和 HTTP 两条路径均被丢弃的 bug — 现在正确转发给后端
- 修复 `PUT /api/proxies/settings` 被 `PUT /api/proxies/:id` 路由参数 shadow 的 bug(Hono 按注册顺序匹配)

### Changed
Expand Down
90 changes: 90 additions & 0 deletions src/auth/__tests__/session-affinity.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { describe, it, expect, afterEach } from "vitest";
import { SessionAffinityMap } from "../session-affinity.js";

describe("SessionAffinityMap", () => {
let map: SessionAffinityMap;

afterEach(() => {
map?.dispose();
});

it("records and looks up a mapping", () => {
map = new SessionAffinityMap();
map.record("resp_abc", "entry_123", "conv_1");
expect(map.lookup("resp_abc")).toBe("entry_123");
});

it("returns null for unknown response IDs", () => {
map = new SessionAffinityMap();
expect(map.lookup("resp_unknown")).toBeNull();
});

it("overwrites previous mapping for same response ID", () => {
map = new SessionAffinityMap();
map.record("resp_abc", "entry_1", "conv_1");
map.record("resp_abc", "entry_2", "conv_2");
expect(map.lookup("resp_abc")).toBe("entry_2");
});

it("expires entries after TTL", () => {
map = new SessionAffinityMap(50); // 50ms TTL
map.record("resp_abc", "entry_123", "conv_1");
expect(map.lookup("resp_abc")).toBe("entry_123");

const start = Date.now();
while (Date.now() - start < 60) {
// busy wait
}
expect(map.lookup("resp_abc")).toBeNull();
});

it("tracks size correctly", () => {
map = new SessionAffinityMap();
expect(map.size).toBe(0);
map.record("resp_1", "entry_1", "conv_1");
map.record("resp_2", "entry_2", "conv_2");
expect(map.size).toBe(2);
});

it("cleans up on dispose", () => {
map = new SessionAffinityMap();
map.record("resp_1", "entry_1", "conv_1");
map.dispose();
expect(map.size).toBe(0);
});

// Conversation ID tracking
it("looks up conversation ID for a response", () => {
map = new SessionAffinityMap();
map.record("resp_abc", "entry_1", "conv_xyz");
expect(map.lookupConversationId("resp_abc")).toBe("conv_xyz");
});

it("returns null conversation ID for unknown response", () => {
map = new SessionAffinityMap();
expect(map.lookupConversationId("resp_unknown")).toBeNull();
});

it("conversation ID is inherited across response chain", () => {
map = new SessionAffinityMap();
// Turn 1: new conversation
map.record("resp_1", "entry_1", "conv_abc");
// Turn 2: inherit conv ID from turn 1
const convId = map.lookupConversationId("resp_1");
expect(convId).toBe("conv_abc");
map.record("resp_2", "entry_1", convId!);
// Turn 3: inherit from turn 2
expect(map.lookupConversationId("resp_2")).toBe("conv_abc");
});

it("expires conversation ID along with entry", () => {
map = new SessionAffinityMap(50);
map.record("resp_abc", "entry_1", "conv_1");

const start = Date.now();
while (Date.now() - start < 60) {
// busy wait
}
expect(map.lookupConversationId("resp_abc")).toBeNull();
});
});
11 changes: 9 additions & 2 deletions src/auth/account-lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class AccountLifecycle {
if (slots.length === 0) this.acquireLocks.delete(entryId);
}

acquire(options?: { model?: string; excludeIds?: string[] }): AcquiredAccount | null {
acquire(options?: { model?: string; excludeIds?: string[]; preferredEntryId?: string }): AcquiredAccount | null {
const nowMs = Date.now();
const now = new Date(nowMs);

Expand Down Expand Up @@ -100,7 +100,14 @@ export class AccountLifecycle {
}
}

const selected = this.strategy.select(candidates, this.rotationState);
// Session affinity: prefer the account that owns the conversation
let selected: AccountEntry;
if (options?.preferredEntryId) {
const preferred = candidates.find((a) => a.id === options.preferredEntryId);
selected = preferred ?? this.strategy.select(candidates, this.rotationState);
} else {
selected = this.strategy.select(candidates, this.rotationState);
}
const prevSlots = this.acquireLocks.get(selected.id);
const prevSlotMs = prevSlots?.[prevSlots.length - 1] ?? null;
this.pushSlot(selected.id);
Expand Down
2 changes: 1 addition & 1 deletion src/auth/account-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class AccountPool {

// ── Lifecycle (acquire/release) ───────────────────────────────────

acquire(options?: { model?: string; excludeIds?: string[] }): AcquiredAccount | null {
acquire(options?: { model?: string; excludeIds?: string[]; preferredEntryId?: string }): AcquiredAccount | null {
return this.lifecycle.acquire(options);
}

Expand Down
87 changes: 87 additions & 0 deletions src/auth/session-affinity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Session affinity — maps Codex response IDs to account entry IDs.
*
* When a request includes `previous_response_id`, the proxy looks up which
* account created that response and routes to the same account. This enables:
* - Server-side conversation history reuse (previous_response_id chain)
* - Prompt cache hits (cache is per-account on the backend)
*/

interface AffinityEntry {
entryId: string;
conversationId: string;
createdAt: number;
}

const DEFAULT_TTL_MS = 4 * 60 * 60 * 1000; // 4 hours
const CLEANUP_INTERVAL_MS = 10 * 60 * 1000; // 10 minutes

export class SessionAffinityMap {
private map = new Map<string, AffinityEntry>();
private ttlMs: number;
private cleanupTimer: ReturnType<typeof setInterval> | null = null;

constructor(ttlMs: number = DEFAULT_TTL_MS) {
this.ttlMs = ttlMs;
this.cleanupTimer = setInterval(() => this.cleanup(), CLEANUP_INTERVAL_MS);
}

/** Record that a response was created by a specific account in a conversation. */
record(responseId: string, entryId: string, conversationId: string): void {
this.map.set(responseId, { entryId, conversationId, createdAt: Date.now() });
}

/** Look up which account created a given response. */
lookup(responseId: string): string | null {
const entry = this.getEntry(responseId);
return entry?.entryId ?? null;
}

/** Look up the conversation ID for a given response. */
lookupConversationId(responseId: string): string | null {
const entry = this.getEntry(responseId);
return entry?.conversationId ?? null;
}

private getEntry(responseId: string): AffinityEntry | null {
const entry = this.map.get(responseId);
if (!entry) return null;
if (Date.now() - entry.createdAt > this.ttlMs) {
this.map.delete(responseId);
return null;
}
return entry;
}

/** Remove expired entries. */
private cleanup(): void {
const now = Date.now();
for (const [key, entry] of this.map) {
if (now - entry.createdAt > this.ttlMs) {
this.map.delete(key);
}
}
}

get size(): number {
return this.map.size;
}

dispose(): void {
if (this.cleanupTimer) {
clearInterval(this.cleanupTimer);
this.cleanupTimer = null;
}
this.map.clear();
}
}

/** Singleton instance. */
let instance: SessionAffinityMap | null = null;

export function getSessionAffinityMap(): SessionAffinityMap {
if (!instance) {
instance = new SessionAffinityMap();
}
return instance;
}
5 changes: 4 additions & 1 deletion src/proxy/codex-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ export class CodexApi {
if (request.tools?.length) wsRequest.tools = request.tools;
if (request.tool_choice) wsRequest.tool_choice = request.tool_choice;
if (request.text) wsRequest.text = request.text;
if (request.service_tier) wsRequest.service_tier = request.service_tier;
if (request.prompt_cache_key) wsRequest.prompt_cache_key = request.prompt_cache_key;
if (request.include?.length) wsRequest.include = request.include;

return createWebSocketResponse(wsUrl, headers, wsRequest, signal, this.proxyUrl);
}
Expand All @@ -233,7 +236,7 @@ export class CodexApi {
headers["Accept"] = "text/event-stream";
headers["OpenAI-Beta"] = "responses_websockets=2026-02-06";

const { service_tier: _st, previous_response_id: _pid, useWebSocket: _ws, ...bodyFields } = request;
const { previous_response_id: _pid, useWebSocket: _ws, ...bodyFields } = request;
const body = JSON.stringify(bodyFields);

let transportRes;
Expand Down
4 changes: 4 additions & 0 deletions src/proxy/codex-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ export interface CodexResponsesRequest {
};
/** Optional: reference a previous response for multi-turn (WebSocket only). */
previous_response_id?: string;
/** Prompt cache key — stable per-conversation UUID for backend prompt caching. */
prompt_cache_key?: string;
/** Include additional response data (e.g. "reasoning.encrypted_content"). */
include?: string[];
/** When true, use WebSocket transport (enables previous_response_id and server-side storage). */
useWebSocket?: boolean;
}
Expand Down
3 changes: 3 additions & 0 deletions src/proxy/ws-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ export interface WsCreateRequest {
strict?: boolean;
};
};
service_tier?: string | null;
prompt_cache_key?: string;
include?: string[];
// NOTE: `store` and `stream` are intentionally omitted.
// The backend defaults to storing via WebSocket and always streams.
}
Expand Down
12 changes: 10 additions & 2 deletions src/routes/shared/__tests__/account-acquisition.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,18 @@ describe("acquireAccount", () => {

const result = acquireAccount(pool as never, "gpt-5.4", ["x1"], "OpenAI");

expect(pool.acquire).toHaveBeenCalledWith({ model: "gpt-5.4", excludeIds: ["x1"] });
expect(pool.acquire).toHaveBeenCalledWith({ model: "gpt-5.4", excludeIds: ["x1"], preferredEntryId: undefined });
expect(result).toEqual({ entryId: "e1", token: "t1", accountId: "a1" });
});

it("passes preferredEntryId for session affinity", () => {
pool.acquire.mockReturnValue({ entryId: "e1", token: "t1", accountId: "a1" });

acquireAccount(pool as never, "gpt-5.4", undefined, "OpenAI", "e1");

expect(pool.acquire).toHaveBeenCalledWith({ model: "gpt-5.4", excludeIds: undefined, preferredEntryId: "e1" });
});

it("returns null when pool has no available account", () => {
pool.acquire.mockReturnValue(null);

Expand All @@ -45,7 +53,7 @@ describe("acquireAccount", () => {

acquireAccount(pool as never, "gpt-5.4", undefined, "OpenAI");

expect(pool.acquire).toHaveBeenCalledWith({ model: "gpt-5.4", excludeIds: undefined });
expect(pool.acquire).toHaveBeenCalledWith({ model: "gpt-5.4", excludeIds: undefined, preferredEntryId: undefined });
});
});

Expand Down
3 changes: 2 additions & 1 deletion src/routes/shared/account-acquisition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ export function acquireAccount(
model: string,
excludeIds?: string[],
tag?: string,
preferredEntryId?: string,
): AcquiredAccount | null {
const acquired = pool.acquire({ model, excludeIds });
const acquired = pool.acquire({ model, excludeIds, preferredEntryId });
if (!acquired && tag) {
console.warn(`[${tag}] No available account for model "${model}"`);
}
Expand Down
Loading
Loading