diff --git a/examples/counter/package.json b/examples/counter/package.json index 6f8cf175b..6212603e8 100644 --- a/examples/counter/package.json +++ b/examples/counter/package.json @@ -6,6 +6,7 @@ "scripts": { "dev": "tsx src/server.ts", "check-types": "tsc --noEmit", + "connect": "tsx scripts/connect.ts", "test": "vitest run" }, "devDependencies": { diff --git a/examples/counter/scripts/connect.ts b/examples/counter/scripts/connect.ts index 251eb0cb8..805b957ad 100644 --- a/examples/counter/scripts/connect.ts +++ b/examples/counter/scripts/connect.ts @@ -1,28 +1,12 @@ import { createClient } from "rivetkit/client"; import type { Registry } from "../src/registry"; -// async function main() { -// const client = createClient(); -// -// const counter = await client.counter.getOrCreate().connect(); -// -// counter.on("newCount", (count: number) => console.log("Event:", count)); -// -// for (let i = 0; i < 5; i++) { -// const out = await counter.increment(5); -// console.log("RPC:", out); -// -// await new Promise((resolve) => setTimeout(resolve, 1000)); -// } -// -// await new Promise((resolve) => setTimeout(resolve, 10000)); -// await counter.dispose(); -// } - async function main() { - const client = createClient(); + const client = createClient("http://localhost:6420"); + + const counter = client.counter.getOrCreate().connect(); - const counter = await client.counter.getOrCreate(); + counter.on("newCount", (count: number) => console.log("Event:", count)); for (let i = 0; i < 5; i++) { const out = await counter.increment(5); @@ -30,6 +14,9 @@ async function main() { await new Promise((resolve) => setTimeout(resolve, 1000)); } + + await new Promise((resolve) => setTimeout(resolve, 10000)); + await counter.dispose(); } main(); diff --git a/packages/rivetkit/src/engine-process/log.ts b/packages/rivetkit/src/engine-process/log.ts new file mode 100644 index 000000000..87b8ab306 --- /dev/null +++ b/packages/rivetkit/src/engine-process/log.ts @@ -0,0 +1,5 @@ +import { getLogger } from "@/common/log"; + +export function logger() { + return getLogger("engine-process"); +} diff --git a/packages/rivetkit/src/engine-process/mod.ts b/packages/rivetkit/src/engine-process/mod.ts new file mode 100644 index 000000000..21d929c80 --- /dev/null +++ b/packages/rivetkit/src/engine-process/mod.ts @@ -0,0 +1,316 @@ +import { spawn } from "node:child_process"; +import { createWriteStream } from "node:fs"; +import * as fs from "node:fs/promises"; +import * as path from "node:path"; +import { pipeline } from "node:stream/promises"; +import { + ensureDirectoryExists, + getStoragePath, +} from "@/drivers/file-system/utils"; +import { logger } from "./log"; + +export const ENGINE_PORT = 6420; +export const ENGINE_ENDPOINT = `http://localhost:${ENGINE_PORT}`; + +const ENGINE_BASE_URL = "https://releases.rivet.gg/engine"; +const ENGINE_BINARY_NAME = "rivet-engine"; + +interface EnsureEngineProcessOptions { + version: string; +} + +export async function ensureEngineProcess( + options: EnsureEngineProcessOptions, +): Promise { + logger().debug({ msg: "ensuring engine process", version: options.version }); + const storageRoot = getStoragePath(); + const binDir = path.join(storageRoot, "bin"); + const varDir = path.join(storageRoot, "var"); + const logsDir = path.join(varDir, "logs", "rivet-engine"); + await ensureDirectoryExists(binDir); + await ensureDirectoryExists(varDir); + await ensureDirectoryExists(logsDir); + + const executableName = + process.platform === "win32" + ? `${ENGINE_BINARY_NAME}-${options.version}.exe` + : `${ENGINE_BINARY_NAME}-${options.version}`; + const binaryPath = path.join(binDir, executableName); + await downloadEngineBinaryIfNeeded(binaryPath, options.version, varDir); + + // Check if the engine is already running on the port + if (await isEngineRunning()) { + try { + await waitForEngineHealth(); + logger().debug({ + msg: "engine already running and healthy", + version: options.version, + }); + return; + } catch (error) { + logger().warn({ + msg: "existing engine process not healthy, cannot restart automatically", + error, + }); + throw new Error( + "Engine process exists but is not healthy. Please manually stop the process on port 6420 and retry.", + ); + } + } + + // Create log file streams with timestamp in the filename + const timestamp = new Date() + .toISOString() + .replace(/:/g, "-") + .replace(/\./g, "-"); + const stdoutLogPath = path.join(logsDir, `engine-${timestamp}-stdout.log`); + const stderrLogPath = path.join(logsDir, `engine-${timestamp}-stderr.log`); + + const stdoutStream = createWriteStream(stdoutLogPath, { flags: "a" }); + const stderrStream = createWriteStream(stderrLogPath, { flags: "a" }); + + logger().debug({ + msg: "creating engine log files", + stdout: stdoutLogPath, + stderr: stderrLogPath, + }); + + const child = spawn(binaryPath, ["start"], { + cwd: path.dirname(binaryPath), + stdio: ["inherit", "pipe", "pipe"], + env: { + ...process.env, + }, + }); + + if (!child.pid) { + throw new Error("failed to spawn rivet engine process"); + } + + // Pipe stdout and stderr to log files + if (child.stdout) { + child.stdout.pipe(stdoutStream); + } + if (child.stderr) { + child.stderr.pipe(stderrStream); + } + + logger().debug({ + msg: "spawned engine process", + pid: child.pid, + cwd: path.dirname(binaryPath), + }); + + child.once("exit", (code, signal) => { + logger().warn({ + msg: "engine process exited", + code, + signal, + }); + // Clean up log streams + stdoutStream.end(); + stderrStream.end(); + }); + + child.once("error", (error) => { + logger().error({ + msg: "engine process failed", + error, + }); + // Clean up log streams on error + stdoutStream.end(); + stderrStream.end(); + }); + + // Wait for engine to be ready + await waitForEngineHealth(); + + logger().info({ + msg: "engine process started", + pid: child.pid, + version: options.version, + logs: { + stdout: stdoutLogPath, + stderr: stderrLogPath, + }, + }); +} + +async function downloadEngineBinaryIfNeeded( + binaryPath: string, + version: string, + varDir: string, +): Promise { + const binaryExists = await fileExists(binaryPath); + if (binaryExists) { + logger().debug({ + msg: "engine binary already cached", + version, + path: binaryPath, + }); + return; + } + + const { targetTriplet, extension } = resolveTargetTriplet(); + const remoteFile = `${ENGINE_BINARY_NAME}-${targetTriplet}${extension}`; + const downloadUrl = `${ENGINE_BASE_URL}/${version}/${remoteFile}`; + logger().info({ + msg: "downloading engine binary", + url: downloadUrl, + path: binaryPath, + version, + }); + + const response = await fetch(downloadUrl); + if (!response.ok || !response.body) { + throw new Error( + `failed to download rivet engine binary from ${downloadUrl}: ${response.status} ${response.statusText}`, + ); + } + + const tempPath = `${binaryPath}.${process.pid}.tmp`; + await pipeline(response.body, createWriteStream(tempPath)); + if (process.platform !== "win32") { + await fs.chmod(tempPath, 0o755); + } + await fs.rename(tempPath, binaryPath); + logger().debug({ + msg: "engine binary download complete", + version, + path: binaryPath, + }); + logger().info({ + msg: "engine binary downloaded", + version, + path: binaryPath, + }); +} + +function resolveTargetTriplet(): { targetTriplet: string; extension: string } { + return resolveTargetTripletFor(process.platform, process.arch); +} + +export function resolveTargetTripletFor( + platform: NodeJS.Platform, + arch: typeof process.arch, +): { targetTriplet: string; extension: string } { + switch (platform) { + case "darwin": + if (arch === "arm64") { + return { targetTriplet: "aarch64-apple-darwin", extension: "" }; + } + if (arch === "x64") { + return { targetTriplet: "x86_64-apple-darwin", extension: "" }; + } + break; + case "linux": + if (arch === "x64") { + return { targetTriplet: "x86_64-unknown-linux-musl", extension: "" }; + } + break; + case "win32": + if (arch === "x64") { + return { targetTriplet: "x86_64-pc-windows-gnu", extension: ".exe" }; + } + break; + } + + throw new Error( + `unsupported platform for rivet engine binary: ${platform}/${arch}`, + ); +} + +async function isEngineRunning(): Promise { + // Check if the engine is running on the port + return await checkIfEngineAlreadyRunningOnPort(ENGINE_PORT); +} + +async function checkIfEngineAlreadyRunningOnPort( + port: number, +): Promise { + let response: Response; + try { + response = await fetch(`http://localhost:${port}/health`); + } catch (err) { + // Nothing is running on this port + return false; + } + + if (response.ok) { + const health = (await response.json()) as { + status?: string; + runtime?: string; + version?: string; + }; + + // Check what's running on this port + if (health.runtime === "engine") { + logger().debug({ + msg: "rivet engine already running on port", + port, + }); + return true; + } else if (health.runtime === "rivetkit") { + logger().error({ + msg: "another rivetkit process is already running on port", + port, + }); + throw new Error( + "RivetKit process already running on port 6420, stop that process and restart this.", + ); + } else { + throw new Error( + "Unknown process running on port 6420, cannot identify what it is.", + ); + } + } + + // Port responded but not with OK status + return false; +} + +async function fileExists(filePath: string): Promise { + try { + await fs.access(filePath); + return true; + } catch { + return false; + } +} + +const HEALTH_MAX_WAIT = 10_000; +const HEALTH_INTERVAL = 100; + +async function waitForEngineHealth(): Promise { + const maxRetries = Math.ceil(HEALTH_MAX_WAIT / HEALTH_INTERVAL); + + logger().debug({ msg: "waiting for engine health check" }); + + for (let i = 0; i < maxRetries; i++) { + try { + const response = await fetch(`${ENGINE_ENDPOINT}/health`); + if (response.ok) { + logger().debug({ msg: "engine health check passed" }); + return; + } + } catch (error) { + // Expected to fail while engine is starting up + if (i === maxRetries - 1) { + throw new Error( + `engine health check failed after ${maxRetries} retries: ${error}`, + ); + } + } + + if (i < maxRetries - 1) { + logger().trace({ + msg: "engine not ready, retrying", + attempt: i + 1, + maxRetries, + }); + await new Promise((resolve) => setTimeout(resolve, HEALTH_INTERVAL)); + } + } + + throw new Error(`engine health check failed after ${maxRetries} retries`); +} diff --git a/packages/rivetkit/src/registry/mod.ts b/packages/rivetkit/src/registry/mod.ts index 561e8ed6a..f4c4b456c 100644 --- a/packages/rivetkit/src/registry/mod.ts +++ b/packages/rivetkit/src/registry/mod.ts @@ -1,7 +1,9 @@ +import invariant from "invariant"; import { type Client, createClientWithDriver } from "@/client/client"; import { configureBaseLogger, configureDefaultLogger } from "@/common/log"; import type { ActorDriver } from "@/driver-helpers/mod"; import { chooseDefaultDriver } from "@/drivers/default"; +import { ENGINE_ENDPOINT, ensureEngineProcess } from "@/engine-process/mod"; import { configureInspectorAccessToken, getInspectorUrl, @@ -55,6 +57,34 @@ export class Registry { public start(inputConfig?: RunnerConfigInput): ServerOutput { const config = RunnerConfigSchema.parse(inputConfig); + // Promise for any async operations we need to wait to complete + const readyPromises = []; + + // Start engine + if (config.runEngine) { + logger().debug({ + msg: "run engine requested", + version: config.runEngineVersion, + }); + + // Set config to point to the engine + config.disableDefaultServer = true; + config.overrideServerAddress = ENGINE_ENDPOINT; + invariant( + config.endpoint === undefined, + "cannot specify 'endpoint' with 'runEngine'", + ); + config.endpoint = ENGINE_ENDPOINT; + + // Start the engine + const engineProcessPromise = ensureEngineProcess({ + version: config.runEngineVersion, + }); + + // Chain ready promise + readyPromises.push(engineProcessPromise); + } + // Configure logger if (config.logging?.baseLogger) { // Use provided base logger @@ -117,6 +147,10 @@ export class Registry { } else if (config.overrideServerAddress) { console.log(` - Endpoint: ${config.overrideServerAddress}`); } + if (config.runEngine) { + const padding = " ".repeat(Math.max(0, 13 - "Engine".length)); + console.log(` - Engine:${padding}v${config.runEngineVersion}`); + } for (const [k, v] of Object.entries(displayInfo.properties)) { const padding = " ".repeat(Math.max(0, 13 - k.length)); console.log(` - ${k}:${padding}${v}`); @@ -133,12 +167,11 @@ export class Registry { // // Even though we do not use the return value, this is required to start the code that will handle incoming actors if (!config.disableActorDriver) { - const _actorDriver = driver.actor( - this.#config, - config, - managerDriver, - client, - ); + Promise.all(readyPromises).then(() => { + logger().debug("ready promises finished, starting actor driver"); + + driver.actor(this.#config, config, managerDriver, client); + }); } else { serverlessActorDriverBuilder = ( token, diff --git a/packages/rivetkit/src/registry/run-config.ts b/packages/rivetkit/src/registry/run-config.ts index 8eeb14a74..4d6fe13e8 100644 --- a/packages/rivetkit/src/registry/run-config.ts +++ b/packages/rivetkit/src/registry/run-config.ts @@ -37,6 +37,18 @@ export const RunnerConfigSchema = z /** @experimental */ disableDefaultServer: z.boolean().optional().default(false), + /** @experimental */ + runEngine: z + .boolean() + .optional() + .default(() => getEnvUniversal("RIVET_RUN_ENGINE") === "1"), + + /** @experimental */ + runEngineVersion: z + .string() + .optional() + .default(() => getEnvUniversal("RIVET_RUN_ENGINE_VERSION") ?? "25.7.3"), + /** @experimental */ overrideServerAddress: z.string().optional(), diff --git a/packages/rivetkit/tests/engine-process.test.ts b/packages/rivetkit/tests/engine-process.test.ts new file mode 100644 index 000000000..b217af9c5 --- /dev/null +++ b/packages/rivetkit/tests/engine-process.test.ts @@ -0,0 +1,38 @@ +import { describe, expect, it } from "vitest"; +import { resolveTargetTripletFor } from "@/engine-process/mod"; + +describe("resolveTargetTripletFor", () => { + it("returns darwin arm64 target", () => { + expect(resolveTargetTripletFor("darwin", "arm64")).toEqual({ + targetTriplet: "aarch64-apple-darwin", + extension: "", + }); + }); + + it("returns darwin x64 target", () => { + expect(resolveTargetTripletFor("darwin", "x64")).toEqual({ + targetTriplet: "x86_64-apple-darwin", + extension: "", + }); + }); + + it("returns linux x64 target", () => { + expect(resolveTargetTripletFor("linux", "x64")).toEqual({ + targetTriplet: "x86_64-unknown-linux-musl", + extension: "", + }); + }); + + it("returns windows x64 target", () => { + expect(resolveTargetTripletFor("win32", "x64")).toEqual({ + targetTriplet: "x86_64-pc-windows-gnu", + extension: ".exe", + }); + }); + + it("throws for unsupported combinations", () => { + expect(() => + resolveTargetTripletFor("linux", "arm64" as typeof process.arch), + ).toThrow("unsupported platform for rivet engine binary: linux/arm64"); + }); +});