Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit e42ae28

Browse files
committed
feat(core): support running engine
1 parent 75b93de commit e42ae28

File tree

5 files changed

+394
-3
lines changed

5 files changed

+394
-3
lines changed

packages/rivetkit/src/drivers/file-system/utils.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,15 @@ export function getStoragePath(customPath?: string): string {
5656
* Check if a path exists
5757
*/
5858
export async function pathExists(path: string): Promise<boolean> {
59+
console.log("check 1 ====");
5960
try {
60-
await fs.access(path);
61-
return true;
62-
} catch {
61+
console.log("check 2 ====");
62+
console.trace("pathExists checking ======== lksjdflkdsjf", path);
63+
return false;
64+
// const stats = await fs.stat(path);
65+
// return stats.isDirectory();
66+
} catch (error: any) {
67+
console.log("pathExists missing", path, error?.code ?? "unknown");
6368
return false;
6469
}
6570
}
@@ -70,9 +75,13 @@ export async function pathExists(path: string): Promise<boolean> {
7075
export async function ensureDirectoryExists(
7176
directoryPath: string,
7277
): Promise<void> {
78+
console.log("ensuring path ====", directoryPath);
7379
if (!(await pathExists(directoryPath))) {
80+
throw "ERROR";
81+
console.log("creating path", directoryPath);
7482
await fs.mkdir(directoryPath, { recursive: true });
7583
}
84+
console.log("after");
7685
}
7786

7887
/**
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
import { spawn } from "node:child_process";
2+
import { createWriteStream } from "node:fs";
3+
import * as fs from "node:fs/promises";
4+
import * as path from "node:path";
5+
import { pipeline } from "node:stream/promises";
6+
import { getLogger } from "@/common/log";
7+
import {
8+
ensureDirectoryExists,
9+
getStoragePath,
10+
} from "@/drivers/file-system/utils";
11+
12+
const ENGINE_BASE_URL = "https://releases.rivet.gg/engine";
13+
const ENGINE_BINARY_NAME = "rivet-engine";
14+
const ENGINE_PID_FILE = "engine-pid";
15+
const ENGINE_VERSION_FILE = "engine-version";
16+
17+
export interface EngineProcessState {
18+
binaryPath: string;
19+
pid: number;
20+
alreadyRunning: boolean;
21+
}
22+
23+
interface EnsureEngineProcessOptions {
24+
version: string;
25+
}
26+
27+
export async function ensureEngineProcess(
28+
options: EnsureEngineProcessOptions,
29+
): Promise<EngineProcessState> {
30+
logger().debug({ msg: "ensuring engine process", version: options.version });
31+
const storageRoot = getStoragePath();
32+
console.log('==== point 1');
33+
const binDir = path.join(storageRoot, "bin");
34+
const varDir = path.join(storageRoot, "var");
35+
await ensureDirectoryExists(binDir);
36+
console.log('==== point 2');
37+
await ensureDirectoryExists(varDir);
38+
console.log('==== point 3');
39+
40+
const executableName =
41+
process.platform === "win32"
42+
? `${ENGINE_BINARY_NAME}.exe`
43+
: ENGINE_BINARY_NAME;
44+
const binaryPath = path.join(binDir, executableName);
45+
console.log('==== point 4');
46+
await downloadEngineBinaryIfNeeded(binaryPath, options.version, varDir);
47+
48+
const pidFilePath = path.join(varDir, ENGINE_PID_FILE);
49+
const existingPid = await readExistingPid(pidFilePath);
50+
if (existingPid && isProcessRunning(existingPid)) {
51+
logger().debug({
52+
msg: "engine already running",
53+
pid: existingPid,
54+
version: options.version,
55+
});
56+
return { binaryPath, pid: existingPid, alreadyRunning: true };
57+
}
58+
59+
if (existingPid) {
60+
logger().debug({ msg: "removing stale engine pid", pid: existingPid });
61+
await fs.rm(pidFilePath, { force: true });
62+
}
63+
64+
const child = spawn(binaryPath, ["start"], {
65+
cwd: path.dirname(binaryPath),
66+
stdio: "inherit",
67+
env: {
68+
...process.env,
69+
RIVET__AUTH__ADMIN_TOKEN: process.env.RIVET__AUTH__ADMIN_TOKEN ?? "dev",
70+
},
71+
});
72+
73+
if (!child.pid) {
74+
throw new Error("failed to spawn rivet engine process");
75+
}
76+
77+
logger().debug({
78+
msg: "spawned engine process",
79+
pid: child.pid,
80+
cwd: path.dirname(binaryPath),
81+
});
82+
83+
child.once("exit", (code, signal) => {
84+
logger().warn({
85+
msg: "engine process exited",
86+
code,
87+
signal,
88+
});
89+
void fs.rm(pidFilePath, { force: true });
90+
});
91+
92+
child.once("error", (error) => {
93+
logger().error({
94+
msg: "engine process failed",
95+
error,
96+
});
97+
});
98+
99+
await fs.writeFile(pidFilePath, child.pid.toString(), "utf8");
100+
logger().info({
101+
msg: "engine process started",
102+
pid: child.pid,
103+
version: options.version,
104+
});
105+
106+
return { binaryPath, pid: child.pid, alreadyRunning: false };
107+
}
108+
109+
async function downloadEngineBinaryIfNeeded(
110+
binaryPath: string,
111+
version: string,
112+
varDir: string,
113+
): Promise<void> {
114+
const versionFilePath = path.join(varDir, ENGINE_VERSION_FILE);
115+
console.log('==== point 5');
116+
const currentVersion = await readCurrentVersion(versionFilePath);
117+
console.log('==== point 6');
118+
const binaryExists = await fileExists(binaryPath);
119+
console.log('==== point 7');
120+
if (binaryExists && currentVersion === version) {
121+
logger().debug({
122+
msg: "engine binary already cached",
123+
version,
124+
path: binaryPath,
125+
});
126+
return;
127+
}
128+
129+
const { targetTriplet, extension } = resolveTargetTriplet();
130+
const remoteFile = `${ENGINE_BINARY_NAME}-${targetTriplet}${extension}`;
131+
const downloadUrl = `${ENGINE_BASE_URL}/${version}/${remoteFile}`;
132+
logger().info({
133+
msg: "downloading engine binary",
134+
url: downloadUrl,
135+
path: binaryPath,
136+
version,
137+
});
138+
139+
const response = await fetch(downloadUrl);
140+
if (!response.ok || !response.body) {
141+
throw new Error(
142+
`failed to download rivet engine binary from ${downloadUrl}: ${response.status} ${response.statusText}`,
143+
);
144+
}
145+
146+
const tempPath = `${binaryPath}.${process.pid}.tmp`;
147+
await pipeline(response.body, createWriteStream(tempPath));
148+
if (process.platform !== "win32") {
149+
await fs.chmod(tempPath, 0o755);
150+
}
151+
await fs.rename(tempPath, binaryPath);
152+
await fs.writeFile(versionFilePath, version, "utf8");
153+
logger().debug({
154+
msg: "engine binary download complete",
155+
version,
156+
path: binaryPath,
157+
});
158+
logger().info({
159+
msg: "engine binary downloaded",
160+
version,
161+
path: binaryPath,
162+
});
163+
}
164+
165+
async function readExistingPid(
166+
pidFilePath: string,
167+
): Promise<number | undefined> {
168+
try {
169+
const content = await fs.readFile(pidFilePath, "utf8");
170+
const pid = Number.parseInt(content.trim(), 10);
171+
return Number.isFinite(pid) ? pid : undefined;
172+
} catch (error: any) {
173+
if (error?.code === "ENOENT") return undefined;
174+
logger().warn({
175+
msg: "failed to read engine pid",
176+
error,
177+
});
178+
return undefined;
179+
}
180+
}
181+
182+
async function readCurrentVersion(
183+
versionFilePath: string,
184+
): Promise<string | undefined> {
185+
try {
186+
return (await fs.readFile(versionFilePath, "utf8")).trim();
187+
} catch (error: any) {
188+
if (error?.code !== "ENOENT") {
189+
logger().debug({
190+
msg: "failed to read engine version",
191+
error,
192+
});
193+
}
194+
return undefined;
195+
}
196+
}
197+
198+
function resolveTargetTriplet(): { targetTriplet: string; extension: string } {
199+
return resolveTargetTripletFor(process.platform, process.arch);
200+
}
201+
202+
type NodeArch = typeof process.arch;
203+
204+
export function resolveTargetTripletFor(
205+
platform: NodeJS.Platform,
206+
arch: NodeArch,
207+
): { targetTriplet: string; extension: string } {
208+
switch (platform) {
209+
case "darwin":
210+
if (arch === "arm64") {
211+
return { targetTriplet: "aarch64-apple-darwin", extension: "" };
212+
}
213+
if (arch === "x64") {
214+
return { targetTriplet: "x86_64-apple-darwin", extension: "" };
215+
}
216+
break;
217+
case "linux":
218+
if (arch === "x64") {
219+
return { targetTriplet: "x86_64-unknown-linux-musl", extension: "" };
220+
}
221+
break;
222+
case "win32":
223+
if (arch === "x64") {
224+
return { targetTriplet: "x86_64-pc-windows-gnu", extension: ".exe" };
225+
}
226+
break;
227+
}
228+
229+
throw new Error(
230+
`unsupported platform for rivet engine binary: ${platform}/${arch}`,
231+
);
232+
}
233+
234+
function isProcessRunning(pid: number): boolean {
235+
try {
236+
process.kill(pid, 0);
237+
return true;
238+
} catch (error: any) {
239+
if (error?.code === "EPERM") {
240+
return true;
241+
}
242+
return false;
243+
}
244+
}
245+
246+
async function fileExists(filePath: string): Promise<boolean> {
247+
try {
248+
await fs.access(filePath);
249+
return true;
250+
} catch {
251+
return false;
252+
}
253+
}
254+
255+
function logger() {
256+
return getLogger("engine-process");
257+
}

0 commit comments

Comments
 (0)