3 changes: 2 additions & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM golang:1.17.0-alpine3.14
FROM golang:1.21-alpine3.20
WORKDIR pubsub
RUN apk --no-cache update && \
apk --no-cache upgrade && \
@@ -8,5 +8,6 @@ RUN apk --no-cache update && \
gcc
COPY . .
RUN touch .env && chmod +x ./run-integration.sh
ENV INTEGRATION=true
#ENTRYPOINT ["tail", "-f", "/dev/null"]
ENTRYPOINT ["./run-integration.sh"]
18 changes: 13 additions & 5 deletions Makefile
@@ -13,6 +13,15 @@ LOG_DIRECTORY?=/aci/logs/localrun_$(shell date -u +"%FT%H%M%S%Z")
ACI_CONTAINER_NAME?=tester
export

.PHONY: check-suite
check-suite:
ifeq ($(SUITE),)
$(eval SUITE = TestSuite)
@echo "SUITE is not set. Setting SUITE to: $(SUITE)"
else
@echo "running suite $(SUITE)"
endif

.PHONY: test-setup
test-setup:
scripts/test-setup.sh "${ENVFILE}"
@@ -29,19 +38,18 @@ build-test-image:
echo "envfile : ${ENVFILE}"
echo "REGISTRY : ${REGISTRY}"

docker build -t ${IMAGE} .
docker build -t ${IMAGE} . --platform linux/amd64

push-test-image:
@docker login -u ${REGISTRY_USER} -p ${REGISTRY_PASSWORD} ${REGISTRY}
docker push ${IMAGE}


test-aci: clean-aci scripts/containergroup.yaml
containerId=$$(az container create --file scripts/containergroup.yaml \
--name "${ACI_CONTAINER_NAME}" \
--resource-group ${TEST_RESOURCE_GROUP} \
--subscription ${AZURE_SUBSCRIPTION_ID} \
--environment-variables SUITE=${SUITE} \
--environment-variables "SUITE"="${SUITE}" \
--verbose \
--query id -o tsv); \
./scripts/wait-aci.sh $${containerId}
@@ -53,7 +61,7 @@ shell-aci: clean-aci
--command-line "/bin/bash"; \
az container attach --name "pubsubtester" --resource-group "${TEST_RESOURCE_GROUP}"

scripts/containergroup.yaml:
scripts/containergroup.yaml: check-suite
envsubst < scripts/containergroup.template.yaml > scripts/containergroup.yaml

clean-aci:
@@ -69,7 +77,7 @@ integration-compose: build-test-image
@docker-compose --env-file "${ENVFILE}" up

integration-local:
LOG_DIRECTORY=. ./run-integration.sh TestConnectionString/TestCreate*
LOG_DIRECTORY=. ./run-integration.sh

integration-pipeline: scripts/containergroup.yaml
SUITE=$$(echo "${SUITE}" | tr '[:upper:]' '[:lower:]') \
75 changes: 68 additions & 7 deletions e2e/e2e_test.go
@@ -16,10 +16,10 @@ func (s *SBSuite) TestPublishAndListen_ConcurrentLockRenewal() {
t := s.T()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
topicName := s.ApplyPrefix("default-topic")
topicName := s.ApplyPrefix("lock-renewal-topic")
subscriptionName := "sub"
s.EnsureTopic(ctx, t, topicName)
s.EnsureTopicSubscription(ctx, t, topicName, subscriptionName)
s.EnsureTopic(ctx, t, s.sbAdminClient, topicName)
s.EnsureTopicSubscription(ctx, t, s.sbAdminClient, topicName, subscriptionName)
success := make(chan bool)
sendCount := 25
go func() {
@@ -29,14 +29,13 @@ func (s *SBSuite) TestPublishAndListen_ConcurrentLockRenewal() {
lockRenewalInterval := 2 * time.Second
p := shuttle.NewProcessor(receiver,
shuttle.NewPanicHandler(nil,
shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval},
shuttle.NewRenewLockHandler(&shuttle.LockRenewalOptions{Interval: &lockRenewalInterval},
shuttle.NewSettlementHandler(nil,
testHandler(t, success, sendCount)))), &shuttle.ProcessorOptions{MaxConcurrency: 25})

t.Logf("start processor...")
err = p.Start(ctx)
t.Logf("processor exited: %s", err)
require.EqualError(t, err, context.DeadlineExceeded.Error())
}()

t.Logf("creating sender...")
@@ -58,18 +57,80 @@ func (s *SBSuite) TestPublishAndListen_ConcurrentLockRenewal() {
}
}

func (s *SBSuite) TestSenderFailOverAndMultiProcessor() {
t := s.T()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
topicName := s.ApplyPrefix("failover-topic")
subscriptionName := "sub"
s.EnsureTopic(ctx, t, s.sbAdminClient, topicName)
s.EnsureTopic(ctx, t, s.sbFailOverAdminClient, topicName)
s.EnsureTopicSubscription(ctx, t, s.sbAdminClient, topicName, subscriptionName)
s.EnsureTopicSubscription(ctx, t, s.sbFailOverAdminClient, topicName, subscriptionName)
success := make(chan bool)
sendCountPerNamespace := 10

go func() {
t.Logf("creating receiver...")
receiver, err := s.sbClient.NewReceiverForSubscription(topicName, subscriptionName, nil)
require.NoError(t, err)
failoverReceiver, err := s.sbFailOverClient.NewReceiverForSubscription(topicName, subscriptionName, nil)
require.NoError(t, err)
lockRenewalInterval := 2 * time.Second
receivers := []*shuttle.ReceiverEx{
shuttle.NewReceiverEx("primary", receiver),
shuttle.NewReceiverEx("failover", failoverReceiver),
}
p := shuttle.NewMultiProcessor(receivers,
shuttle.NewPanicHandler(nil,
shuttle.NewRenewLockHandler(&shuttle.LockRenewalOptions{Interval: &lockRenewalInterval},
shuttle.NewSettlementHandler(nil,
testHandler(t, success, sendCountPerNamespace*2)))), &shuttle.ProcessorOptions{MaxConcurrency: 20})

t.Logf("start processor...")
err = p.Start(ctx)
t.Logf("processor exited: %s", err)
}()

// Create initial sender and send a batch of messages
sender, err := s.sbClient.NewSender(topicName, nil)
require.NoError(t, err)
shuttleSender := shuttle.NewSender(sender, nil)
for i := 0; i < sendCountPerNamespace; i++ {
err = shuttleSender.SendMessage(ctx, []byte("{'value':'some message before failover'}"))
require.NoError(t, err)
}

// Fail over to a new sender and send another batch of messages
newSender, err := s.sbFailOverClient.NewSender(topicName, nil)
require.NoError(t, err)
shuttleSender.FailOver(newSender)
for i := 0; i < sendCountPerNamespace; i++ {
err = shuttleSender.SendMessage(ctx, []byte("{'value':'some message after failover'}"))
require.NoError(t, err)
}

select {
case ok := <-success:
require.True(t, ok)
case <-ctx.Done():
t.Errorf("did not complete the message in time")
}
}

func testHandler(t *testing.T, success chan bool, expectedCount int) shuttle.Settler {
var count uint32
return func(ctx context.Context, message *azservicebus.ReceivedMessage) shuttle.Settlement {
t.Logf("Processing message.\n Delivery Count: %d\n", message.DeliveryCount)
t.Logf("ID: %s - Locked Until: %s\n", message.MessageID, message.LockedUntil)
t.Logf("sleeping...")
atomic.AddUint32(&count, 1)
time.Sleep(12 * time.Second)
t.Logf("completing message...")
t.Logf("current send count: %d", count)
time.Sleep(10 * time.Second)
if count == uint32(expectedCount) {
success <- true
}
t.Logf("completing message...")
return &shuttle.Complete{}
}
}
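
For reference, below is a minimal, self-contained sketch of the sender fail-over and multi-namespace processing pattern that the new TestSenderFailOverAndMultiProcessor exercises. It uses only the go-shuttle v2 calls that appear in the diff above (shuttle.NewSender, Sender.FailOver, shuttle.NewReceiverEx, shuttle.NewMultiProcessor and the handler chain); the two azservicebus.Client values, the topic and subscription names, and the package name are illustrative placeholders and not part of this change.

// Package example sketches the fail-over pattern from the test above.
// Client construction, topic, and subscription names are placeholders.
package example

import (
	"context"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
	shuttle "github.com/Azure/go-shuttle/v2"
)

func run(ctx context.Context, primary, secondary *azservicebus.Client) error {
	// The sender starts on the primary namespace and can be swapped at runtime.
	azSender, err := primary.NewSender("failover-topic", nil)
	if err != nil {
		return err
	}
	sender := shuttle.NewSender(azSender, nil)
	if err := sender.SendMessage(ctx, []byte(`{"value":"before failover"}`)); err != nil {
		return err
	}

	// Point the same shuttle.Sender at the secondary namespace and keep sending.
	newAzSender, err := secondary.NewSender("failover-topic", nil)
	if err != nil {
		return err
	}
	sender.FailOver(newAzSender)
	if err := sender.SendMessage(ctx, []byte(`{"value":"after failover"}`)); err != nil {
		return err
	}

	// A single MultiProcessor drains the subscription in both namespaces.
	primaryRcv, err := primary.NewReceiverForSubscription("failover-topic", "sub", nil)
	if err != nil {
		return err
	}
	secondaryRcv, err := secondary.NewReceiverForSubscription("failover-topic", "sub", nil)
	if err != nil {
		return err
	}
	interval := 2 * time.Second
	p := shuttle.NewMultiProcessor(
		[]*shuttle.ReceiverEx{
			shuttle.NewReceiverEx("primary", primaryRcv),
			shuttle.NewReceiverEx("failover", secondaryRcv),
		},
		shuttle.NewPanicHandler(nil,
			shuttle.NewRenewLockHandler(&shuttle.LockRenewalOptions{Interval: &interval},
				shuttle.NewSettlementHandler(nil,
					func(ctx context.Context, msg *azservicebus.ReceivedMessage) shuttle.Settlement {
						// Settle every message; real handlers would branch here.
						return &shuttle.Complete{}
					}))),
		&shuttle.ProcessorOptions{MaxConcurrency: 20})
	return p.Start(ctx)
}
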
46 changes: 24 additions & 22 deletions e2e/go.mod
@@ -1,43 +1,45 @@
module e2e

go 1.20
go 1.21

toolchain go1.21.6

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.4.0
github.com/Azure/go-shuttle/v2 v2.4.1
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.7.1
github.com/Azure/go-shuttle/v2 v2.6.4-0.20240529082704-b3e1a0e899e4
github.com/joho/godotenv v1.5.1
github.com/opentracing/opentracing-go v1.2.0
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.9.0
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible
go.uber.org/multierr v1.11.0
)

require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/Azure/go-amqp v1.0.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.7.0 // indirect
github.com/Azure/go-amqp v1.0.5 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/devigned/tab v0.1.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/jstemmer/go-junit-report v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
go.opentelemetry.io/otel v1.14.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
go.opentelemetry.io/otel/trace v1.27.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)