diff --git a/README.md b/README.md index 1b5edc2..c8bb39a 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ Substrate Substrate is a simple thin abstraction for message publishing and consumption. It presents a simple API set for durable, at-least-once message publishing and subscription, on a number of backend message broker types. The API is not yet stable. +This is especially true for everything in the experimental `x` package. Current implementations and their status ---------------------------------------- diff --git a/x/sync/sink.go b/x/sync/sink.go new file mode 100644 index 0000000..fec756e --- /dev/null +++ b/x/sync/sink.go @@ -0,0 +1,212 @@ +package sync + +import ( + "context" + "log" + "sync" + "time" + + "github.com/cenkalti/backoff" + "github.com/uw-labs/substrate" + "github.com/uw-labs/sync/rungroup" +) + +// SynchronousMessageSink is an extension of substrate.SynchronousMessageSink that exposes the Run method +type SynchronousMessageSink interface { + substrate.SynchronousMessageSink + Run(ctx context.Context, b backoff.BackOff) error +} + +// AsyncMessageSinkFactory is a function that initialises asynchronous message sink +type AsyncMessageSinkFactory func() (substrate.AsyncMessageSink, error) + +type synchronousMessageSinkAdapter struct { + mutex sync.Mutex + started bool + ctx context.Context + sink substrate.AsyncMessageSink + sinkFactory AsyncMessageSinkFactory + + closeReq chan struct{} + closed chan error + + toProduce chan *produceReq +} + +// NewSynchronousMessageSink returns a new synchronous message sink, given an +// AsyncMessageSinkFactory. When Close is called on the SynchronousMessageSink, this +// is also propagated to the underlying AsyncMessageSink. +func NewSynchronousMessageSink(sinkFactory AsyncMessageSinkFactory) SynchronousMessageSink { + return &synchronousMessageSinkAdapter{ + sinkFactory: sinkFactory, + + closeReq: make(chan struct{}), + closed: make(chan error, 1), + toProduce: make(chan *produceReq), + } +} + +type produceReq struct { + m substrate.Message + done chan error + ctx context.Context +} + +// Run starts the asynchronous backend and blocks until it stops. +func (spa *synchronousMessageSinkAdapter) Run(ctx context.Context, b backoff.BackOff) (outErr error) { + spa.mutex.Lock() + if spa.started { + spa.mutex.Unlock() + return ErrAlreadyStarted + } + + sink, err := spa.sinkFactory() + if err != nil { + spa.mutex.Unlock() + return err + } + spa.ctx = ctx + spa.sink = sink + spa.started = true + spa.mutex.Unlock() + + if b != nil { + outErr = backoff.RetryNotify(spa.runPipeline, backoff.WithContext(b, ctx), spa.reconnect) + } else { + outErr = spa.runPipeline() + } + + if outErr == context.Canceled { + outErr = nil + } + spa.closed <- spa.sink.Close() + close(spa.closed) + + return outErr +} + +func (spa *synchronousMessageSinkAdapter) reconnect(err error, _ time.Duration) { + spa.mutex.Lock() + defer spa.mutex.Unlock() + + log.Printf("syncronous sink failed with: %s", err) + if err := spa.sink.Close(); err != nil { + log.Printf("closing the sink failed with: %s", err) + } + sink, err := spa.sinkFactory() + if err != nil { + spa.sink = nil + log.Printf("failed to connect to the sink: %s", err) + } + spa.sink = sink +} + +func (spa *synchronousMessageSinkAdapter) passMessages(ctx context.Context, toSend chan<- substrate.Message, acks <-chan substrate.Message) error { + var needAcks []*produceReq + defer func() { + // Send error to all waiting publish requests before shutting down + for _, req := range needAcks { + select { + case <-req.ctx.Done(): + case req.done <- ErrSinkClosedOrFailedDuringSend: + } + } + }() + + for { + select { + case <-ctx.Done(): + return nil + case pr := <-spa.toProduce: + needAcks = append(needAcks, pr) + select { + case <-ctx.Done(): + return nil + case toSend <- pr.m: + case <-spa.closeReq: + return nil + } + case ack := <-acks: + if needAcks[0].m != ack { + panic("bug") + } + close(needAcks[0].done) + needAcks = needAcks[1:] + case <-spa.closeReq: + return nil + } + } +} + +func (spa *synchronousMessageSinkAdapter) runPipeline() error { + if spa.sink == nil { + // Return error so we can try connecting again + return ErrNotConnected + } + + toSend := make(chan substrate.Message) + acks := make(chan substrate.Message) + + rg, ctx := rungroup.New(spa.ctx) + + // no need to lock the sink in the following functions as reconnect is only called after this function terminates + // and this function starts only after Run was called + rg.Go(func() error { + return checkBackend(ctx, spa.sink) + }) + rg.Go(func() error { + return spa.sink.PublishMessages(ctx, acks, toSend) + }) + rg.Go(func() error { + return spa.passMessages(ctx, toSend, acks) + }) + + return rg.Wait() +} + +func (spa *synchronousMessageSinkAdapter) PublishMessage(ctx context.Context, m substrate.Message) error { + pr := &produceReq{m, make(chan error), ctx} + + select { + case spa.toProduce <- pr: + select { + case err := <-pr.done: + return err + case <-ctx.Done(): + return ctx.Err() + } + case <-spa.closed: + return ErrSinkAlreadyClosed + case <-ctx.Done(): + return ctx.Err() + } + +} + +func (spa *synchronousMessageSinkAdapter) Close() error { + select { + case err, ok := <-spa.closed: + if ok { + return err + } + return ErrSinkAlreadyClosed + case spa.closeReq <- struct{}{}: + // Check if channel is still open in case Close + // is called concurrently more than once + err, ok := <-spa.closed + if ok { + return err + } + return ErrSinkAlreadyClosed + } +} + +func (spa *synchronousMessageSinkAdapter) Status() (*substrate.Status, error) { + spa.mutex.Lock() + defer spa.mutex.Unlock() + if spa.sink == nil { + return nil, ErrNotConnected + } + + return spa.sink.Status() +} diff --git a/x/sync/sink_test.go b/x/sync/sink_test.go new file mode 100644 index 0000000..a10dc77 --- /dev/null +++ b/x/sync/sink_test.go @@ -0,0 +1,112 @@ +package sync + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/uw-labs/substrate" +) + +type message []byte + +func (m *message) Data() []byte { + return []byte(*m) +} + +func TestSyncProduceAdapterBasic(t *testing.T) { + assert := require.New(t) + + sc := NewSynchronousMessageSink(newMockAsynSinkFactory(5)) + go func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + assert.NoError(sc.Run(ctx, nil)) + }() + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + m1, m2, m3, m4, m5 := message([]byte{'a'}), message([]byte{'b'}), message([]byte{'c'}), message([]byte{'d'}), message([]byte{'e'}) + assert.NoError(sc.PublishMessage(ctx, &m1)) + assert.NoError(sc.PublishMessage(ctx, &m2)) + assert.NoError(sc.PublishMessage(ctx, &m3)) + assert.NoError(sc.PublishMessage(ctx, &m4)) + assert.NoError(sc.PublishMessage(ctx, &m5)) + + assert.NoError(sc.Close()) + + select { + case <-sc.(*synchronousMessageSinkAdapter).closed: + default: + t.Error("underlying async sink didn't get closed") + } + + assert.Equal(ErrSinkAlreadyClosed, sc.Close()) + assert.Equal(ErrSinkAlreadyClosed, sc.PublishMessage(ctx, &m1)) +} + +func TestSyncProduceAdapter_ErrorOnSend(t *testing.T) { + assert := require.New(t) + sc := NewSynchronousMessageSink(newMockAsynSinkFactory(0)) + + go func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + assert.Equal(errSeenAllMessages, sc.Run(ctx, nil)) + }() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + msg := message([]byte{'t'}) + assert.Equal(ErrSinkClosedOrFailedDuringSend, sc.PublishMessage(ctx, &msg)) + + assert.Equal(nil, sc.Close()) + assert.Equal(ErrSinkAlreadyClosed, sc.Close()) +} + +var errSeenAllMessages = errors.New("mock sink saw specified number of messages") + +type mockAsyncSink struct { + toAckCount int + closed chan struct{} +} + +func newMockAsynSinkFactory(toAckCount int) AsyncMessageSinkFactory { + return func() (substrate.AsyncMessageSink, error) { + return &mockAsyncSink{toAckCount, make(chan struct{}, 1)}, nil + } +} + +func (mock *mockAsyncSink) PublishMessages(ctx context.Context, acks chan<- substrate.Message, messages <-chan substrate.Message) error { + for { + + select { + case <-ctx.Done(): + return ctx.Err() + case m := <-messages: + if mock.toAckCount > 0 { + select { + case <-ctx.Done(): + return nil + case acks <- m: + } + mock.toAckCount-- + } else { + return errSeenAllMessages + } + } + } +} + +func (mock *mockAsyncSink) Close() error { + close(mock.closed) + return nil +} + +func (mock *mockAsyncSink) Status() (*substrate.Status, error) { + return &substrate.Status{Working: true}, nil +} diff --git a/x/sync/status.go b/x/sync/status.go new file mode 100644 index 0000000..82fedbf --- /dev/null +++ b/x/sync/status.go @@ -0,0 +1,45 @@ +package sync + +import ( + "context" + "time" + + "github.com/pkg/errors" + "github.com/uw-labs/substrate" +) + +var ( + // ErrSinkAlreadyClosed is an error returned when user tries to publish a message or + // close a sink after it was already closed. + ErrSinkAlreadyClosed = errors.New("sink was closed already") + // ErrSinkClosedOrFailedDuringSend is an error returned when a sink is closed or fails while sending a message. + ErrSinkClosedOrFailedDuringSend = errors.New("sink was closed or failed while sending the message") + + // ErrDisconnected is an error signifying that the backend of a synchronous adapter was disconnected + ErrDisconnected = errors.New("async backend was disconnected") + // ErrNotConnected is an error signifying that synchronous adapter is not connected to any backend + ErrNotConnected = errors.New("not connected to any async backend") + // ErrAlreadyStarted is an error signifying that synchronous adapter has already started + ErrAlreadyStarted = errors.New("sync adapter is already running") +) + +// checkBackend periodically checks status of the provided statuser +func checkBackend(ctx context.Context, statuser substrate.Statuser) error { + ticker := time.NewTicker(time.Second * 20) // TODO: maybe make this configurable + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + s, err := statuser.Status() + if err != nil { + return err + } + if !s.Working { + return ErrDisconnected + } + } + } +}