Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Substrate
Substrate is a simple thin abstraction for message publishing and consumption. It presents a simple API set for durable, at-least-once message publishing and subscription, on a number of backend message broker types.

The API is not yet stable.
This is especially true for everything in the experimental `x` package.

Current implementations and their status
----------------------------------------
Expand Down
212 changes: 212 additions & 0 deletions x/sync/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package sync

import (
"context"
"log"
"sync"
"time"

"github.com/cenkalti/backoff"
"github.com/uw-labs/substrate"
"github.com/uw-labs/sync/rungroup"
)

// SynchronousMessageSink is an extension of substrate.SynchronousMessageSink that exposes the Run method
type SynchronousMessageSink interface {
substrate.SynchronousMessageSink
Run(ctx context.Context, b backoff.BackOff) error
}

// AsyncMessageSinkFactory is a function that initialises asynchronous message sink
// AsyncMessageSinkFactory is a function that initialises an asynchronous message sink.
type AsyncMessageSinkFactory func() (substrate.AsyncMessageSink, error)

// synchronousMessageSinkAdapter adapts an AsyncMessageSink (obtained from
// sinkFactory) into a synchronous sink: PublishMessage blocks until the
// message is acknowledged by the backend.
type synchronousMessageSinkAdapter struct {
	mutex       sync.Mutex              // guards started, ctx, sink
	started     bool                    // set once Run has been called
	ctx         context.Context         // the context passed to Run
	sink        substrate.AsyncMessageSink // current backend; nil when not connected
	sinkFactory AsyncMessageSinkFactory // creates (and re-creates) the backend

	closeReq chan struct{} // Close requests; received by the pipeline
	closed   chan error    // receives the backend's close error, then is closed

	toProduce chan *produceReq // publish requests flowing into the pipeline
}

// NewSynchronousMessageSink returns a new synchronous message sink, given an
// AsyncMessageSinkFactory. When Close is called on the SynchronousMessageSink, this
// is also propagated to the underlying AsyncMessageSink.
// NewSynchronousMessageSink returns a new synchronous message sink, given an
// AsyncMessageSinkFactory. When Close is called on the SynchronousMessageSink, this
// is also propagated to the underlying AsyncMessageSink.
func NewSynchronousMessageSink(sinkFactory AsyncMessageSinkFactory) SynchronousMessageSink {
	adapter := synchronousMessageSinkAdapter{
		sinkFactory: sinkFactory,
		closeReq:    make(chan struct{}),
		closed:      make(chan error, 1),
		toProduce:   make(chan *produceReq),
	}
	return &adapter
}

// produceReq is a single publish request travelling from PublishMessage to
// the pipeline goroutine.
type produceReq struct {
	m    substrate.Message // the message to publish
	done chan error        // closed on ack; receives an error on failure
	ctx  context.Context   // the publishing caller's context
}

// Run starts the asynchronous backend and blocks until it stops.
func (spa *synchronousMessageSinkAdapter) Run(ctx context.Context, b backoff.BackOff) (outErr error) {
spa.mutex.Lock()
if spa.started {
spa.mutex.Unlock()
return ErrAlreadyStarted
}

sink, err := spa.sinkFactory()
if err != nil {
spa.mutex.Unlock()
return err
}
spa.ctx = ctx
spa.sink = sink
spa.started = true
spa.mutex.Unlock()

if b != nil {
outErr = backoff.RetryNotify(spa.runPipeline, backoff.WithContext(b, ctx), spa.reconnect)
} else {
outErr = spa.runPipeline()
}

if outErr == context.Canceled {
outErr = nil
}
spa.closed <- spa.sink.Close()
close(spa.closed)

return outErr
}

// reconnect is the backoff notify callback: it closes the failed backend
// sink and attempts to create a fresh one via sinkFactory. On factory
// failure spa.sink is left nil, which makes the next runPipeline attempt
// return ErrNotConnected so the backoff loop retries.
func (spa *synchronousMessageSinkAdapter) reconnect(err error, _ time.Duration) {
	spa.mutex.Lock()
	defer spa.mutex.Unlock()

	log.Printf("synchronous sink failed with: %s", err)
	if err := spa.sink.Close(); err != nil {
		log.Printf("closing the sink failed with: %s", err)
	}
	sink, err := spa.sinkFactory()
	if err != nil {
		// Original fell through and overwrote spa.sink with the factory's
		// (nil) result anyway; return explicitly to make the nil state clear.
		spa.sink = nil
		log.Printf("failed to connect to the sink: %s", err)
		return
	}
	spa.sink = sink
}

// passMessages forwards publish requests from spa.toProduce to toSend and
// matches acknowledgements from acks against the in-flight requests in
// order, closing each request's done channel when its ack arrives. It
// returns nil when ctx is cancelled or a close is requested; requests still
// unacknowledged at that point are failed with ErrSinkClosedOrFailedDuringSend.
func (spa *synchronousMessageSinkAdapter) passMessages(ctx context.Context, toSend chan<- substrate.Message, acks <-chan substrate.Message) error {
	// Requests handed to the backend but not yet acknowledged, in send order.
	var needAcks []*produceReq
	defer func() {
		// Send error to all waiting publish requests before shutting down
		for _, req := range needAcks {
			select {
			case <-req.ctx.Done():
			case req.done <- ErrSinkClosedOrFailedDuringSend:
			}
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return nil
		case pr := <-spa.toProduce:
			needAcks = append(needAcks, pr)
			select {
			case <-ctx.Done():
				return nil
			case toSend <- pr.m:
			case <-spa.closeReq:
				return nil
			}
		case ack := <-acks:
			// Acks are expected strictly in the order messages were sent;
			// anything else indicates a broken backend implementation.
			if needAcks[0].m != ack {
				panic("bug")
			}
			close(needAcks[0].done)
			needAcks = needAcks[1:]
		case <-spa.closeReq:
			return nil
		}
	}
}

// runPipeline performs one run of the backend: one goroutine polls backend
// health (checkBackend), one runs the backend's PublishMessages, and one
// shuttles requests and acks (passMessages). It blocks until any of them
// stops and returns the first error from the group.
func (spa *synchronousMessageSinkAdapter) runPipeline() error {
	if spa.sink == nil {
		// Return error so we can try connecting again
		return ErrNotConnected
	}

	toSend := make(chan substrate.Message)
	acks := make(chan substrate.Message)

	rg, ctx := rungroup.New(spa.ctx)

	// no need to lock the sink in the following functions as reconnect is only called after this function terminates
	// and this function starts only after Run was called
	rg.Go(func() error {
		return checkBackend(ctx, spa.sink)
	})
	rg.Go(func() error {
		return spa.sink.PublishMessages(ctx, acks, toSend)
	})
	rg.Go(func() error {
		return spa.passMessages(ctx, toSend, acks)
	})

	return rg.Wait()
}

// PublishMessage sends m to the running pipeline and blocks until the
// backend acknowledges it, ctx is cancelled, or the sink is closed. It
// returns the error delivered on the request's done channel (nil on ack),
// ErrSinkAlreadyClosed if the sink has already shut down, or ctx.Err().
func (spa *synchronousMessageSinkAdapter) PublishMessage(ctx context.Context, m substrate.Message) error {
	pr := &produceReq{m, make(chan error), ctx}

	select {
	case spa.toProduce <- pr:
		select {
		case err := <-pr.done:
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	// spa.closed only becomes readable after Run has finished shutting down.
	case <-spa.closed:
		return ErrSinkAlreadyClosed
	case <-ctx.Done():
		return ctx.Err()
	}

}

// Close requests shutdown of the pipeline and waits for the backend sink to
// be closed, returning the backend's close error. If the sink was already
// closed (including by a concurrent Close), it returns ErrSinkAlreadyClosed.
func (spa *synchronousMessageSinkAdapter) Close() error {
	select {
	// Either the sink already shut down and spa.closed yields the close
	// error (or is drained and closed)...
	case err, ok := <-spa.closed:
		if ok {
			return err
		}
		return ErrSinkAlreadyClosed
	// ...or we successfully request a close and wait for the result.
	case spa.closeReq <- struct{}{}:
		// Check if channel is still open in case Close
		// is called concurrently more than once
		err, ok := <-spa.closed
		if ok {
			return err
		}
		return ErrSinkAlreadyClosed
	}
}

// Status reports the status of the underlying backend sink, or
// ErrNotConnected when no backend is currently connected.
func (spa *synchronousMessageSinkAdapter) Status() (*substrate.Status, error) {
	spa.mutex.Lock()
	defer spa.mutex.Unlock()

	if spa.sink != nil {
		return spa.sink.Status()
	}
	return nil, ErrNotConnected
}
112 changes: 112 additions & 0 deletions x/sync/sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package sync

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/uw-labs/substrate"
)

// message is a minimal substrate.Message implementation backed by a raw
// byte slice, used only by the tests in this file.
type message []byte

// Data returns the message payload as a byte slice.
func (m *message) Data() []byte {
	payload := *m
	return []byte(payload)
}

// TestSyncProduceAdapterBasic publishes five messages through a mock backend
// that acks exactly five, then verifies that Close shuts the adapter down
// and that subsequent Close/PublishMessage calls fail with
// ErrSinkAlreadyClosed.
func TestSyncProduceAdapterBasic(t *testing.T) {
	assert := require.New(t)

	sc := NewSynchronousMessageSink(newMockAsynSinkFactory(5))
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		defer cancel()
		assert.NoError(sc.Run(ctx, nil))
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	m1, m2, m3, m4, m5 := message([]byte{'a'}), message([]byte{'b'}), message([]byte{'c'}), message([]byte{'d'}), message([]byte{'e'})
	assert.NoError(sc.PublishMessage(ctx, &m1))
	assert.NoError(sc.PublishMessage(ctx, &m2))
	assert.NoError(sc.PublishMessage(ctx, &m3))
	assert.NoError(sc.PublishMessage(ctx, &m4))
	assert.NoError(sc.PublishMessage(ctx, &m5))

	assert.NoError(sc.Close())

	// After Close returns, the closed channel must be closed (readable),
	// proving the underlying async sink was shut down.
	select {
	case <-sc.(*synchronousMessageSinkAdapter).closed:
	default:
		t.Error("underlying async sink didn't get closed")
	}

	assert.Equal(ErrSinkAlreadyClosed, sc.Close())
	assert.Equal(ErrSinkAlreadyClosed, sc.PublishMessage(ctx, &m1))
}

// TestSyncProduceAdapter_ErrorOnSend uses a mock backend that acks zero
// messages, so the first publish makes the pipeline fail and the waiting
// publisher receives ErrSinkClosedOrFailedDuringSend. Run is expected to
// return the mock's errSeenAllMessages.
func TestSyncProduceAdapter_ErrorOnSend(t *testing.T) {
	assert := require.New(t)
	sc := NewSynchronousMessageSink(newMockAsynSinkFactory(0))

	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		defer cancel()
		assert.Equal(errSeenAllMessages, sc.Run(ctx, nil))
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	msg := message([]byte{'t'})
	assert.Equal(ErrSinkClosedOrFailedDuringSend, sc.PublishMessage(ctx, &msg))

	// First Close succeeds (nil backend close error), second reports closed.
	assert.Equal(nil, sc.Close())
	assert.Equal(ErrSinkAlreadyClosed, sc.Close())
}

// errSeenAllMessages is returned by the mock sink once it has acknowledged
// its configured number of messages.
var errSeenAllMessages = errors.New("mock sink saw specified number of messages")

// mockAsyncSink is a test double for substrate.AsyncMessageSink that acks a
// fixed number of messages and then fails.
type mockAsyncSink struct {
	toAckCount int           // messages left to acknowledge before failing
	closed     chan struct{} // closed by Close to signal shutdown
}

// newMockAsynSinkFactory returns a factory whose sinks acknowledge exactly
// toAckCount messages before failing with errSeenAllMessages.
func newMockAsynSinkFactory(toAckCount int) AsyncMessageSinkFactory {
	return func() (substrate.AsyncMessageSink, error) {
		sink := &mockAsyncSink{
			toAckCount: toAckCount,
			closed:     make(chan struct{}, 1),
		}
		return sink, nil
	}
}

// PublishMessages consumes messages and acknowledges each one until
// toAckCount messages have been acked, then fails with errSeenAllMessages.
func (mock *mockAsyncSink) PublishMessages(ctx context.Context, acks chan<- substrate.Message, messages <-chan substrate.Message) error {
	for {

		select {
		case <-ctx.Done():
			return ctx.Err()
		case m := <-messages:
			if mock.toAckCount > 0 {
				select {
				case <-ctx.Done():
					// NOTE(review): returns nil here but ctx.Err() in the
					// outer select — presumably deliberate for these tests,
					// worth confirming.
					return nil
				case acks <- m:
				}
				mock.toAckCount--
			} else {
				return errSeenAllMessages
			}
		}
	}
}

// Close marks the mock sink as shut down by closing its closed channel.
// It must not be called more than once.
func (mock *mockAsyncSink) Close() error {
	defer close(mock.closed)
	return nil
}

// Status always reports the mock sink as working.
func (mock *mockAsyncSink) Status() (*substrate.Status, error) {
	status := substrate.Status{Working: true}
	return &status, nil
}
45 changes: 45 additions & 0 deletions x/sync/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package sync

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/uw-labs/substrate"
)

var (
	// ErrSinkAlreadyClosed is an error returned when the user tries to publish a
	// message or close a sink after it was already closed.
	ErrSinkAlreadyClosed = errors.New("sink was closed already")
	// ErrSinkClosedOrFailedDuringSend is an error returned when a sink is closed
	// or fails while sending a message.
	ErrSinkClosedOrFailedDuringSend = errors.New("sink was closed or failed while sending the message")

	// ErrDisconnected is an error signifying that the backend of a synchronous
	// adapter was disconnected.
	ErrDisconnected = errors.New("async backend was disconnected")
	// ErrNotConnected is an error signifying that the synchronous adapter is not
	// connected to any backend.
	ErrNotConnected = errors.New("not connected to any async backend")
	// ErrAlreadyStarted is an error signifying that the synchronous adapter has
	// already started.
	ErrAlreadyStarted = errors.New("sync adapter is already running")
)

// checkBackend periodically checks status of the provided statuser
func checkBackend(ctx context.Context, statuser substrate.Statuser) error {
ticker := time.NewTicker(time.Second * 20) // TODO: maybe make this configurable
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
s, err := statuser.Status()
if err != nil {
return err
}
if !s.Working {
return ErrDisconnected
}
}
}
}