Skip to content

Commit 2e9e3fa

Browse files
authored
Fix Await to process Go-scheduled jobs instead of blocking in js_std_loop (#690)
* fix: replace blocking js_std_loop in Await with cooperative polling * fix: feedback from gemini * fix: adding one more test * fix: more coverage - the nil check on ctx.jobQueue at line 1056 is redundant since len() on a nil channel returns 0 - more coverage
1 parent b8d50de commit 2e9e3fa

2 files changed

Lines changed: 151 additions & 5 deletions

File tree

context.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"os"
66
"sync/atomic"
7+
"time"
78
"unsafe"
89
)
910

@@ -25,6 +26,11 @@ type Context struct {
2526

2627
const defaultJobQueueSize = 1024
2728

29+
// awaitPollInterval is the duration the Await loop sleeps when no JS or Go
30+
// jobs are pending. Keeps CPU usage low while ensuring Go-scheduled work
31+
// (e.g., resolved Promises from goroutines) is picked up promptly.
32+
const awaitPollInterval = time.Millisecond
33+
2834
// awaitPromiseStateHook and awaitExecutePendingJobHook are used only in tests to
2935
// force specific Await code paths; they must remain nil in production.
3036
var (
@@ -985,7 +991,13 @@ func (ctx *Context) Loop() {
985991
ctx.ProcessJobs()
986992
}
987993

988-
// Wait for a promise and execute pending jobs while waiting for it. Return the promise result or JS_EXCEPTION in case of promise rejection.
994+
// Wait for a promise and execute pending jobs while waiting for it.
995+
// Return the promise result or JS_EXCEPTION in case of promise rejection.
996+
//
997+
// This implementation uses a polling loop instead of blocking in js_std_loop.
998+
// This allows Go-scheduled work (via ctx.Schedule) to be processed between
999+
// iterations, enabling async Go bridge functions (fetch, storage, etc.) to
1000+
// resolve Promises from goroutines without blocking the event loop.
9891001
func (ctx *Context) Await(v *Value) *Value {
9901002
if v == nil || !v.IsPromise() {
9911003
return v
@@ -1002,7 +1014,9 @@ func (ctx *Context) Await(v *Value) *Value {
10021014
runtimeRef := ctx.runtime.ref
10031015

10041016
for {
1017+
// Drain Go-scheduled work (resolve/reject from goroutines)
10051018
ctx.ProcessJobs()
1019+
10061020
state := C.JS_PromiseState(ctx.ref, promise.ref)
10071021
if hook := awaitPromiseStateHook; hook != nil {
10081022
if override, ok := hook(ctx, promise, int(state)); ok {
@@ -1016,6 +1030,7 @@ func (ctx *Context) Await(v *Value) *Value {
10161030
reason := C.JS_PromiseResult(ctx.ref, promise.ref)
10171031
return &Value{ctx: ctx, ref: C.JS_Throw(ctx.ref, reason)}
10181032
case pendingState:
1033+
// Process JS microtasks (Promise.then callbacks, queueMicrotask)
10191034
executed := C.JS_ExecutePendingJob(runtimeRef, nil)
10201035
if hook := awaitExecutePendingJobHook; hook != nil {
10211036
if override, ok := hook(ctx, promise, int(executed)); ok {
@@ -1026,7 +1041,22 @@ func (ctx *Context) Await(v *Value) *Value {
10261041
return ctx.ThrowInternalError("failed to execute pending job")
10271042
}
10281043
if executed == 0 {
1029-
C.js_std_loop(ctx.ref)
1044+
// No JS microtasks pending. Check for Go-scheduled work first.
1045+
ctx.ProcessJobs()
1046+
1047+
// Re-check promise state — Go jobs may have resolved it.
1048+
newState := C.JS_PromiseState(ctx.ref, promise.ref)
1049+
if newState != pendingState {
1050+
continue // resolved — loop back to handle it
1051+
}
1052+
1053+
// Still pending. Check if there are pending Go jobs in the queue.
1054+
// If so, keep polling. If not, yield briefly — a goroutine will
1055+
// Schedule work soon (HTTP response, storage result, etc.)
1056+
if len(ctx.jobQueue) > 0 {
1057+
continue // more Go jobs to process
1058+
}
1059+
time.Sleep(awaitPollInterval)
10301060
}
10311061
default:
10321062
return v

context_test.go

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,17 +1002,52 @@ func TestContextInternalsCoverage(t *testing.T) {
10021002
require.Nil(t, ctx.Await(nilPromise))
10031003
})
10041004

1005-
t.Run("AwaitDrivesStdLoopForTimeout", func(t *testing.T) {
1005+
t.Run("AwaitDrivesScheduleForResolve", func(t *testing.T) {
10061006
rt := NewRuntime()
10071007
defer rt.Close()
10081008
ctx := rt.NewContext()
10091009
defer ctx.Close()
10101010

1011-
promise := ctx.Eval(`new Promise((resolve) => { setTimeout(() => resolve("timer result"), 0); })`)
1011+
// Test that Await processes Go-scheduled work (via ctx.Schedule)
1012+
// instead of relying on js_std_loop for C-level timers.
1013+
promise := ctx.NewPromise(func(resolve, reject func(*Value)) {
1014+
go func() {
1015+
ctx.Schedule(func(ctx *Context) {
1016+
val := ctx.NewString("scheduled result")
1017+
defer val.Free()
1018+
resolve(val)
1019+
})
1020+
}()
1021+
})
1022+
defer promise.Free()
1023+
result := ctx.Await(promise)
1024+
defer result.Free()
1025+
require.Equal(t, "scheduled result", result.ToString())
1026+
})
1027+
1028+
t.Run("AwaitPollsUntilDelayedResolve", func(t *testing.T) {
1029+
rt := NewRuntime()
1030+
defer rt.Close()
1031+
ctx := rt.NewContext()
1032+
defer ctx.Close()
1033+
1034+
// Test that the Await polling loop correctly yields and re-checks
1035+
// when the Promise is resolved after a delay from a goroutine.
1036+
// This exercises the time.Sleep path in Await's pending case.
1037+
promise := ctx.NewPromise(func(resolve, reject func(*Value)) {
1038+
go func() {
1039+
time.Sleep(5 * time.Millisecond) // force Await to poll a few times
1040+
ctx.Schedule(func(ctx *Context) {
1041+
val := ctx.NewString("delayed result")
1042+
defer val.Free()
1043+
resolve(val)
1044+
})
1045+
}()
1046+
})
10121047
defer promise.Free()
10131048
result := ctx.Await(promise)
10141049
defer result.Free()
1015-
require.Equal(t, "timer result", result.ToString())
1050+
require.Equal(t, "delayed result", result.ToString())
10161051
})
10171052

10181053
t.Run("AwaitHandlesPendingJobFailure", func(t *testing.T) {
@@ -1043,6 +1078,87 @@ func TestContextInternalsCoverage(t *testing.T) {
10431078
require.Contains(t, err.Error(), "failed to execute pending job")
10441079
})
10451080

1081+
// Cover line 1049-1050: executed==0, ProcessJobs resolves the promise,
1082+
// re-check sees non-pending state → continue.
1083+
t.Run("AwaitReCheckResolvesAfterProcessJobs", func(t *testing.T) {
1084+
rt := NewRuntime()
1085+
defer rt.Close()
1086+
ctx := rt.NewContext()
1087+
defer ctx.Close()
1088+
1089+
var resolvePromise func(*Value)
1090+
promise := ctx.NewPromise(func(resolve, reject func(*Value)) {
1091+
resolvePromise = resolve
1092+
})
1093+
defer promise.Free()
1094+
1095+
firstCall := true
1096+
awaitExecutePendingJobHook = func(hookCtx *Context, _ *Value, current int) (int, bool) {
1097+
if hookCtx != ctx {
1098+
return current, false
1099+
}
1100+
if firstCall {
1101+
firstCall = false
1102+
// Schedule a job that resolves the promise. ProcessJobs() at
1103+
// line 1045 will pick it up, so the re-check at line 1048
1104+
// sees fulfilled state.
1105+
ctx.Schedule(func(inner *Context) {
1106+
val := inner.NewString("resolved-via-recheck")
1107+
defer val.Free()
1108+
resolvePromise(val)
1109+
})
1110+
return 0, true // force executed=0
1111+
}
1112+
return current, false
1113+
}
1114+
t.Cleanup(func() { awaitExecutePendingJobHook = nil })
1115+
1116+
result := ctx.Await(promise)
1117+
defer result.Free()
1118+
require.Equal(t, "resolved-via-recheck", result.ToString())
1119+
})
1120+
1121+
// Cover line 1056-1057: executed==0, ProcessJobs drains but promise
1122+
// stays pending, yet another Go job is already queued → continue.
1123+
t.Run("AwaitContinuesWhenJobQueueNonEmpty", func(t *testing.T) {
1124+
rt := NewRuntime()
1125+
defer rt.Close()
1126+
ctx := rt.NewContext()
1127+
defer ctx.Close()
1128+
1129+
var resolvePromise func(*Value)
1130+
promise := ctx.NewPromise(func(resolve, reject func(*Value)) {
1131+
resolvePromise = resolve
1132+
})
1133+
defer promise.Free()
1134+
1135+
callCount := 0
1136+
awaitExecutePendingJobHook = func(hookCtx *Context, _ *Value, current int) (int, bool) {
1137+
if hookCtx != ctx {
1138+
return current, false
1139+
}
1140+
callCount++
1141+
if callCount == 1 {
1142+
// First iteration: force executed=0, and enqueue two jobs.
1143+
// ProcessJobs at line 1045 drains the first, but the second
1144+
// remains → len(jobQueue) > 0 → continue (line 1056-1057).
1145+
ctx.Schedule(func(*Context) {}) // drained by ProcessJobs
1146+
ctx.Schedule(func(inner *Context) { // stays in queue → triggers continue
1147+
val := inner.NewString("after-queue-check")
1148+
defer val.Free()
1149+
resolvePromise(val)
1150+
})
1151+
return 0, true
1152+
}
1153+
return current, false
1154+
}
1155+
t.Cleanup(func() { awaitExecutePendingJobHook = nil })
1156+
1157+
result := ctx.Await(promise)
1158+
defer result.Free()
1159+
require.Equal(t, "after-queue-check", result.ToString())
1160+
})
1161+
10461162
t.Run("AwaitFallsBackOnUnexpectedState", func(t *testing.T) {
10471163
rt := NewRuntime()
10481164
defer rt.Close()

0 commit comments

Comments
 (0)