diff --git a/coro/1_functions.png b/coro/1_functions.png new file mode 100644 index 0000000..40f3c59 Binary files /dev/null and b/coro/1_functions.png differ diff --git a/coro/2_coroutines.png b/coro/2_coroutines.png new file mode 100644 index 0000000..c5487c6 Binary files /dev/null and b/coro/2_coroutines.png differ diff --git a/coro/3_internals.png b/coro/3_internals.png new file mode 100644 index 0000000..22acfd3 Binary files /dev/null and b/coro/3_internals.png differ diff --git a/coro/README.md b/coro/README.md new file mode 100644 index 0000000..da293d9 --- /dev/null +++ b/coro/README.md @@ -0,0 +1,63 @@ +# coro + +### Sample code and notes about https://research.swtch.com/coro + +* Coroutines provide __concurrency without parallelism__: when one coroutine is running, the one that resumed it or yielded to it is not. +* Because __coroutines run one at a time and only switch at specific points__ in the program, the coroutines can share data among themselves without races. + * The explicit switches serve as synchronization points, creating happens-before edges + +## Converted Iterator Sequence + +* The exact order of the initial events depends on goroutine scheduling + * e.g. `resume` may be called before `goroutine1` blocks on `<-cin` +* `resume` + * executes on `main` + * returns control from `main` to `goroutine1` +* `f` function argument to `coro.New` (wrapped `iterator`) + * executes on `goroutine1` + * calls `yield` + * `yield` returns control from `goroutine1` to `main` + +```mermaid +sequenceDiagram + actor m as main + actor go as goroutine1 + participant in as cin + participant out as cout + + m->>m: resume := coro.New(push) + go->>in: in := <-cin + note over m,in: coro.New starts goroutine1
which blocks on <-cin, waiting
for the initial value from resume + + m->>m: resume(true) + m->>in: resume sends: cin <- true + m->>out: resume blocks: out <- cout + + in-->>go: true is received + go->>go: f(true, yield) is called + note over m,in: f is the arg passed to coro.New (push)
yield is created inside coro.New, and connects cin and cout + note over m,in: This begins the iterator inside push(),
which runs in goroutine1 + + go->>go: iterator calls yield(1) with first element + go->>out: yield sends first element (1) to cout + + go->>in: goroutine1 blocks on <-cin + out-->>m: first element (1) is received by resume + m->>m: v, ok := resume(true)
has returned + note over m,go: Control has returned to main
from goroutine1 + + alt + m->>m: resume(true) + note over m,go: Calling resume(true) again
returns control to goroutine1
and continues the iterator + else + m->>m: resume(false) + note over m,go: Calling resume(false)
terminates the coroutine + note over m,go: TODO sequence + end +``` + +--- + +## Kotlin + +> a coroutine is not bound to any particular thread. It may suspend its execution in one thread and resume in another one. diff --git a/coro/coro.go b/coro/coro.go new file mode 100644 index 0000000..223fa73 --- /dev/null +++ b/coro/coro.go @@ -0,0 +1,76 @@ +package coro + +import "fmt" + +func New[In, Out any](f func(In, func(Out) In) Out) (resume func(In) (Out, bool)) { + cin := make(chan In) + cout := make(chan Out) + running := true + resume = func(in In) (out Out, ok bool) { + if !running { + return + } + cin <- in + return <-cout, running + } + yield := func(out Out) In { + cout <- out + return <-cin + } + go func() { + out := f(<-cin, yield) + running = false + cout <- out + }() + return resume +} + +func Pull[V any](push func(yield func(V) bool) bool) (pull func() (V, bool), stop func()) { + copush := func(more bool, yield func(V) bool) V { + if more { + push(yield) + } + var zero V + return zero + } + resume := New(copush) + pull = func() (V, bool) { + return resume(true) + } + stop = func() { + resume(false) + } + return pull, stop +} + +func NewDebug[In, Out any](f func(In, func(Out) In) Out) (resume func(In) (Out, bool)) { + cin := make(chan In) + cout := make(chan Out) + running := true + resume = func(in In) (out Out, ok bool) { + if !running { + return + } + fmt.Printf("resume: send in=%v\n", in) + cin <- in + out = <-cout + fmt.Printf("resume: receive out=%v\n", out) + return out, running + } + yield := func(out Out) In { + fmt.Printf("yield: send out=%v\n", out) + cout <- out + in := <-cin + fmt.Printf("yield: receive in=%v\n", in) + return in + } + go func() { + fmt.Println("go func: wait for cin") + in := <-cin + fmt.Printf("go func: received in=%v\n", in) + out := f(in, yield) + running = false + cout <- out + }() + return resume +} diff --git a/coro/coro_test.go b/coro/coro_test.go new file mode 100644 index 0000000..e576087 --- /dev/null +++ b/coro/coro_test.go @@ -0,0 +1,80 @@ +package coro + +import ( + "fmt" + "testing" + + "github.com/Jimeux/iter/list" +) + +func TestCoro(t *testing.T) { + t.Run("New", func(t *testing.T) { + // since resume can only run when the calling goroutine is blocked, + // and vice versa, sharing the running variable is not a race. + // The two are synchronizing by taking turns executing. + // If resume is called after the coroutine has exited, + // resume returns a zero value and false. + resume := New(func(_ int, yield func(string) int) string { + // resume can be called once for each yield, plus once for the final return value + yield("hello") + yield("world") + return "done" + }) + for i := 0; i < 4; i++ { + s, ok := resume(0) + fmt.Printf("%q %v\n", s, ok) + } + }) + + t.Run("iterator with New", func(t *testing.T) { + l := list.New[int]() + for i := range 2 { + l.Push(i + 1) + } + + push := func(more bool, yield func(int) bool) int { + fmt.Println("call push (aka f)") + if more { + l.All()(yield) + } + var zero int + return zero + } + resume := New(push) + for { + fmt.Println("call resume") + v, ok := resume(true) + fmt.Printf("ok: %t\n", ok) + if !ok { + fmt.Printf("ok: %t\n", ok) + break + } + fmt.Printf("v=%d | ok=%t\n", v, ok) + } + }) + + t.Run("Pull", func(t *testing.T) { + l1 := list.New[int]() + for i := range 2 { + l1.Push(i) + } + l2 := list.New[int]() + for i := range 5 { + l2.Push(i * 10) + } + + next1, stop1 := Pull(l1.All()) + next2, stop2 := Pull(l2.All()) + defer stop1() + defer stop2() + for { + v1, ok1 := next1() + v2, ok2 := next2() + if !ok1 || !ok2 { + fmt.Printf("ok1 || ok2: %t, %t\n", ok1, ok2) + break + } + fmt.Printf("v1=%d, v2=%d | ok1=%t, ok2=%t\n", v1, v2, ok1, ok2) + } + }) +}