Skip to content

Commit 6fcb955

Browse files
DeRaukGitHub Enterprise
authored andcommitted
Merge pull request coinbase#51 from francis-li/francis/go-client-compatibility
[TFRS-3063] go client compatibility
2 parents f50a876 + a88a180 commit 6fcb955

11 files changed

Lines changed: 190 additions & 16 deletions

File tree

.circleci/config.yml

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ orbs:
55
jobs:
66
test_gem:
77
docker:
8-
- image: circleci/ruby:2.6.3-stretch-node
8+
- image: cimg/ruby:2.7.6
99
executor: ruby/default
1010
steps:
1111
- checkout
@@ -16,12 +16,49 @@ jobs:
1616

1717
test_examples:
1818
docker:
19-
- image: circleci/ruby:2.6.3-stretch-node
19+
- image: cimg/ruby:2.7.6
20+
- image: circleci/postgres:alpine
21+
name: postgres
22+
environment:
23+
POSTGRES_PASSWORD: cadence
24+
- image: ubercadence/server:master-auto-setup
25+
name: cadence
26+
environment:
27+
DB: postgres
28+
DB_PORT: 5432
29+
POSTGRES_USER: postgres
30+
POSTGRES_PWD: cadence
31+
POSTGRES_SEEDS: postgres
32+
DYNAMIC_CONFIG_FILE_PATH: config/dynamicconfig/development.yaml
33+
- image: antstorm/cadence-tchannel-proxy:latest
34+
name: proxy
35+
environment:
36+
BIND_ADDRESS: 0.0.0.0:6666
37+
CADENCE_ADDRESS: cadence:7933
38+
39+
environment:
40+
CADENCE_HOST: proxy
41+
CADENCE_PORT: 6666
42+
2043
steps:
2144
- checkout
2245
- run:
2346
name: Bundle Install
2447
command: cd examples && bundle install --path vendor/bundle
48+
49+
- run:
50+
name: Register Domain
51+
command: cd examples && bin/register_domain ruby-samples
52+
53+
- run:
54+
name: Wait for Domain to settle
55+
command: sleep 15
56+
57+
- run:
58+
name: Boot up worker
59+
command: cd examples && bin/worker
60+
background: true
61+
2562
- run:
2663
name: Run RSpec
2764
command: cd examples && bundle exec rspec
@@ -31,4 +68,4 @@ workflows:
3168
test:
3269
jobs:
3370
- test_gem
34-
- test_examples
71+
- test_examples

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
# Changelog
22

3-
## 0.1.14
3+
## 0.1.16
4+
- Add activity task deserialization logic that is compatible with official go implementation
5+
6+
## 0.1.15
47
- Add rbi definition for Cadence::Crew
58

69
## 0.1.14

examples/Gemfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@ source 'https://rubygems.org'
33
gem 'cadence-ruby', path: '../'
44

55
gem 'dry-struct', '~> 1.1.1'
6-
gem 'dry-types', '~> 1.2.0'
6+
gem 'dry-types', '~> 1.6.0'
77
gem 'rspec', group: :test

examples/bin/crew

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#!/usr/bin/env ruby
2+
require_relative '../init'
3+
4+
require 'cadence/crew'
5+
require 'cadence/worker'
6+
7+
init = proc do |worker|
8+
Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f }
9+
Dir[File.expand_path('../activities/*.rb', __dir__)].each { |f| require f }
10+
Dir[File.expand_path('../middleware/*.rb', __dir__)].each { |f| require f }
11+
12+
worker.register_workflow(AsyncActivityWorkflow)
13+
worker.register_workflow(AsyncHelloWorldWorkflow)
14+
worker.register_workflow(AsyncTimerWorkflow)
15+
worker.register_workflow(BranchingWorkflow)
16+
worker.register_workflow(CancellingTimerWorkflow)
17+
worker.register_workflow(CheckWorkflow)
18+
worker.register_workflow(FailingWorkflow)
19+
worker.register_workflow(HelloWorldWorkflow)
20+
worker.register_workflow(LocalHelloWorldWorkflow)
21+
worker.register_workflow(LongWorkflow)
22+
worker.register_workflow(ParentWorkflow)
23+
worker.register_workflow(ProcessFileWorkflow)
24+
worker.register_workflow(ReleaseWorkflow)
25+
worker.register_workflow(SerialHelloWorldWorkflow)
26+
worker.register_workflow(SideEffectWorkflow)
27+
worker.register_workflow(SimpleTimerWorkflow)
28+
worker.register_workflow(TimeoutWorkflow)
29+
worker.register_workflow(TripBookingWorkflow)
30+
31+
worker.register_activity(AsyncActivity)
32+
worker.register_activity(EchoActivity)
33+
worker.register_activity(GenerateFileActivity)
34+
worker.register_activity(HelloWorldActivity)
35+
worker.register_activity(LongRunningActivity)
36+
worker.register_activity(ProcessFileActivity)
37+
worker.register_activity(RandomlyFailingActivity)
38+
worker.register_activity(RandomNumberActivity)
39+
worker.register_activity(SleepActivity)
40+
worker.register_activity(UploadFileActivity)
41+
worker.register_activity(Trip::BookFlightActivity)
42+
worker.register_activity(Trip::BookHotelActivity)
43+
worker.register_activity(Trip::CancelCarActivity)
44+
worker.register_activity(Trip::CancelFlightActivity)
45+
worker.register_activity(Trip::CancelHotelActivity)
46+
worker.register_activity(Trip::MakePaymentActivity)
47+
worker.register_activity(Trip::RentCarActivity)
48+
49+
worker.add_decision_middleware(LoggingMiddleware, 'EXAMPLE')
50+
worker.add_activity_middleware(LoggingMiddleware, 'EXAMPLE')
51+
end
52+
53+
worker = Cadence::Worker.new(polling_ttl: 10, thread_pool_size: 20)
54+
crew = Cadence::Crew.new(worker, 2)
55+
crew.after_fork &init
56+
crew.dispatch

examples/bin/worker

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,4 @@ worker.register_activity(Trip::RentCarActivity)
5050
worker.add_decision_middleware(LoggingMiddleware, 'EXAMPLE')
5151
worker.add_activity_middleware(LoggingMiddleware, 'EXAMPLE')
5252

53-
option1 = ARGV[0]
54-
if option1 == '--crew'
55-
crew = Cadence::Crew.new(worker, 2)
56-
crew.dispatch
57-
else
58-
worker.start
59-
end
53+
worker.start

lib/cadence/activity/task_processor.rb

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def process
3232
context = Activity::Context.new(connection, metadata)
3333

3434
result = middleware_chain.invoke(metadata) do
35-
activity_class.execute_in_context(context, JSON.deserialize(task.input))
35+
activity_class.execute_in_context(context, deserialize(task.input))
3636
end
3737

3838
# Do not complete asynchronous activities, these should be completed manually
@@ -79,6 +79,25 @@ def respond_failed(reason, details)
7979
Cadence.logger.error("Unable to fail Activity #{activity_name}: #{error.inspect}")
8080
Cadence::ErrorHandler.handle(error, metadata: metadata)
8181
end
82+
83+
def deserialize(input)
84+
JSON.deserialize(task.input)
85+
rescue Oj::ParseError
86+
# cadence official go-client serializes / deserializes input in a different format than this ruby client
87+
# adding additional deserialization logic here to help read input that is passed from go-client
88+
# https://github.com/uber-go/cadence-client/blob/0.18.x/internal/encoding.go#L45-L58
89+
#
90+
# this ruby client serializes / deserializes everything as one big string like below:
91+
# [1012474654, "second input"]
92+
#
93+
# while go client serializes input as separate input followed by line break
94+
# 1012474654
95+
# second input
96+
args = input.split(/\n/)
97+
res = args.map do |arg|
98+
JSON.deserialize(arg)
99+
end
100+
end
82101
end
83102
end
84103
end

lib/cadence/crew.rb

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,23 @@ def stop(signal)
3131
crew.each { |pid| stop_worker(signal, pid) }
3232
end
3333

34+
def after_fork(&block)
35+
raise 'after_fork can only be called before dispatching the worker' if crew.length.positive?
36+
37+
@after_fork_block = block
38+
end
39+
3440
private
3541

42+
attr_accessor :after_fork_block
3643
attr_reader :worker, :crew, :logger
3744

3845
def dispatch_worker
39-
pid = fork { worker.start }
46+
pid = fork do
47+
after_fork_block&.call(worker)
48+
worker.start
49+
end
50+
4051
crew << pid
4152

4253
Cadence.metrics.gauge("crew.num_workers", crew.size)
@@ -49,7 +60,7 @@ def monitor
4960
while crew.length.positive?
5061
(pid, status) = ::Process.waitpid2(-1)
5162
crew.delete(pid)
52-
63+
5364
Cadence.metrics.gauge("crew.num_workers", crew.size)
5465
logger.info "Worker quit. pid: #{pid.to_s}, exitstatus: #{status.exitstatus}, remaining_workers: #{crew.length}"
5566
end

lib/cadence/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module Cadence
2-
VERSION = '0.1.15'.freeze
2+
VERSION = '0.1.16'.freeze
33
end

rbi/cadence-ruby.rbi

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ class Cadence::Worker
140140
end
141141

142142
class Cadence::Crew
143+
def after_fork(block); end
144+
143145
def dispatch; end
144146

145147
def stop(signal); end

spec/unit/lib/cadence/activity/task_processor_spec.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
let(:middleware_chain) { Cadence::Middleware::Chain.new }
1717
let(:config) { Cadence::Configuration.new }
1818
let(:input) { ['arg1', 'arg2'] }
19+
let(:go_client_input) { "123\n\"abc\"" }
1920

2021
describe '#process' do
2122
let(:context) { instance_double('Cadence::Activity::Context', async?: false) }
@@ -40,6 +41,21 @@
4041
allow(Cadence.metrics).to receive(:timing)
4142
end
4243

44+
context 'when input comes from go client' do
45+
let(:activity_class) { double('Cadence::Activity', execute_in_context: nil) }
46+
let(:task) { Fabricate(:activity_task_thrift, activity_name: activity_name, input: go_client_input) }
47+
48+
before do
49+
allow(lookup).to receive(:find).with(activity_name).and_return(activity_class)
50+
end
51+
52+
it 'successfully deserializes the input and ran the activity' do
53+
subject.process
54+
55+
expect(activity_class).to have_received(:execute_in_context).with(context, [123, 'abc'])
56+
end
57+
end
58+
4359
context 'when activity is not registered' do
4460
it 'fails the activity task' do
4561
subject.process

0 commit comments

Comments
 (0)