diff --git a/CHANGELOG.md b/CHANGELOG.md index ec160cb..1226974 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All notable changes to this project will be documented in this file. +## [6.10.0] - 2025-11-21 +### Fixed +- Add on_first_sync callback + ## [6.9.3] - 2025-10-09 ### Fixed - Add validate_types to model interface diff --git a/Gemfile.lock b/Gemfile.lock index d19cb11..daafac9 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - table_sync (6.9.3) + table_sync (6.10.0) memery rabbit_messaging (>= 1.7.0) rails @@ -90,12 +90,12 @@ GEM bunny (2.24.0) amq-protocol (~> 2.3) sorted_set (~> 1, >= 1.0.2) - cgi (0.4.2) + cgi (0.5.0) coderay (1.1.3) concurrent-ruby (1.3.5) connection_pool (2.5.3) crass (1.0.6) - date (3.4.1) + date (3.5.0) diff-lcs (1.6.2) docile (1.4.1) drb (2.2.3) @@ -106,8 +106,8 @@ GEM activesupport (>= 6.1) i18n (1.14.7) concurrent-ruby (~> 1.0) - io-console (0.8.0) - irb (1.15.2) + io-console (0.8.1) + irb (1.15.3) pp (>= 0.6.0) rdoc (>= 4.0.0) reline (>= 0.4.2) @@ -124,18 +124,19 @@ GEM loofah (2.24.1) crass (~> 1.0.2) nokogiri (>= 1.12.0) - mail (2.8.1) + mail (2.9.0) + logger mini_mime (>= 0.1.1) net-imap net-pop net-smtp - marcel (1.0.4) + marcel (1.1.0) memery (1.7.0) method_source (1.1.0) mini_mime (1.1.5) mini_portile2 (2.8.9) minitest (5.25.5) - net-imap (0.5.8) + net-imap (0.5.12) date net-protocol net-pop (0.1.2) @@ -170,7 +171,7 @@ GEM ast (~> 2.4.1) racc pg (1.5.9) - pp (0.6.2) + pp (0.6.3) prettyprint prettyprint (0.2.0) prism (1.4.0) @@ -224,11 +225,12 @@ GEM rainbow (3.1.1) rake (13.2.1) rbtree (0.4.6) - rdoc (6.14.0) + rdoc (6.15.1) erb psych (>= 4.0.0) + tsort regexp_parser (2.10.0) - reline (0.6.1) + reline (0.6.3) io-console (~> 0.5) rspec (3.13.1) rspec-core (~> 3.13.0) @@ -308,10 +310,11 @@ GEM sorted_set (1.0.3) rbtree set (~> 1.0) - stringio (3.1.7) - thor (1.3.2) + stringio (3.1.8) + thor (1.4.0) timecop (0.9.10) timeout (0.4.3) + tsort (0.2.0) tzinfo (2.0.6) concurrent-ruby (~> 1.0) unicode-display_width (3.1.4) diff --git a/lib/table_sync/receiving.rb b/lib/table_sync/receiving.rb index dda279c..cd632c7 100644 --- a/lib/table_sync/receiving.rb +++ b/lib/table_sync/receiving.rb @@ -6,6 +6,7 @@ module Receiving require_relative "receiving/config_decorator" require_relative "receiving/dsl" require_relative "receiving/handler" + require_relative "receiving/hooks/once" require_relative "receiving/model/active_record" require_relative "receiving/model/sequel" end diff --git a/lib/table_sync/receiving/config.rb b/lib/table_sync/receiving/config.rb index 9909a96..3756e23 100644 --- a/lib/table_sync/receiving/config.rb +++ b/lib/table_sync/receiving/config.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require_relative "hooks/once" + module TableSync::Receiving class Config attr_reader :model, :events @@ -23,13 +25,10 @@ def invalid_events class << self attr_reader :default_values_for_options - # In a configs this options are requested as they are + # In a configs these options are requested as they are # config.option - get value # config.option(args) - set static value # config.option { ... } - set proc as value - # - # In `Receiving::Handler` or `Receiving::EventActions` this options are requested - # through `Receiving::ConfigDecorator#method_missing` which always executes `config.option` def add_option(name, value_setter_wrapper:, value_as_proc_setter_wrapper:, default:) ivar = :"@#{name}" @@ -55,11 +54,30 @@ def add_option(name, value_setter_wrapper:, value_as_proc_setter_wrapper:, defau instance_variable_set(ivar, result_value) end end + + def add_hook_option(name, hook_class:) + ivar = :"@#{name}" + + @default_values_for_options ||= {} + @default_values_for_options[ivar] = proc { [] } + + define_method(name) do |conditions, &handler| + hooks = instance_variable_get(ivar) + hooks ||= [] + + hooks << hook_class.new(conditions:, handler:) + instance_variable_set(ivar, hooks) + end + end end def allow_event?(name) events.include?(name) end + + def option(name) + instance_variable_get(:"@#{name}") + end end end @@ -201,6 +219,11 @@ def allow_event?(name) value_as_proc_setter_wrapper: any_value, default: proc { proc { |&block| block.call } } +TableSync::Receiving::Config.add_hook_option( + :on_first_sync, + hook_class: TableSync::Receiving::Hooks::Once, +) + %i[ before_update after_commit_on_update diff --git a/lib/table_sync/receiving/config_decorator.rb b/lib/table_sync/receiving/config_decorator.rb index a851263..4a4a4f7 100644 --- a/lib/table_sync/receiving/config_decorator.rb +++ b/lib/table_sync/receiving/config_decorator.rb @@ -2,9 +2,6 @@ module TableSync::Receiving class ConfigDecorator - extend Forwardable - - def_delegators :@config, :allow_event? # rubocop:disable Metrics/ParameterLists def initialize(config:, event:, model:, version:, project_id:, raw_data:) @config = config @@ -19,9 +16,17 @@ def initialize(config:, event:, model:, version:, project_id:, raw_data:) end # rubocop:enable Metrics/ParameterLists - def method_missing(name, **additional_params, &) - value = @config.send(name) + def option(name, **additional_params, &) + value = @config.option(name) value.is_a?(Proc) ? value.call(@default_params.merge(additional_params), &) : value end + + def model + @config.model + end + + def allow_event?(name) + @config.allow_event?(name) + end end end diff --git a/lib/table_sync/receiving/handler.rb b/lib/table_sync/receiving/handler.rb index d5019ef..479f1b3 100644 --- a/lib/table_sync/receiving/handler.rb +++ b/lib/table_sync/receiving/handler.rb @@ -22,22 +22,20 @@ def call next if data.empty? - version_key = config.version_key(data:) - data.each { |row| row[version_key] = version } - - target_keys = config.target_keys(data:) + target_keys = config.option(:target_keys, data:) validate_data(data, target_keys:) data.sort_by! { |row| row.values_at(*target_keys).map { |value| sort_key(value) } } + version_key = config.option(:version_key, data:) params = { data:, target_keys:, version_key: } if event == :update - params[:default_values] = config.default_values(data:) + params[:default_values] = config.option(:default_values, data:) end - config.wrap_receiving(event:, **params) do + config.option(:wrap_receiving, **params) do perform(config, params) end end @@ -83,25 +81,28 @@ def configs end def processed_data(config) + version_key = config.option(:version_key, data:) data.filter_map do |row| - next if config.skip(row:) + next if config.option(:skip, row:) row = row.dup - config.mapping_overrides(row:).each do |before, after| + config.option(:mapping_overrides, row:).each do |before, after| row[after] = row.delete(before) end - config.except(row:).each { |x| row.delete(x) } + config.option(:except, row:).each { |x| row.delete(x) } - row.merge!(config.additional_data(row:)) + row.merge!(config.option(:additional_data, row:)) - only = config.only(row:) + only = config.option(:only, row:) row, rest = row.partition { |key, _| key.in?(only) }.map(&:to_h) - rest_key = config.rest_key(row:, rest:) + rest_key = config.option(:rest_key, row:, rest:) (row[rest_key] ||= {}).merge!(rest) if rest_key + row[version_key] = version + row end end @@ -138,16 +139,16 @@ def validate_data_types(model, data) raise TableSync::DataError.new(data, errors.keys, errors.to_json) end - def perform(config, params) + def perform(config, params) # rubocop:disable Metrics/MethodLength model = config.model model.transaction do results = if event == :update - config.before_update(**params) + config.option(:before_update, **params) validate_data_types(model, params[:data]) model.upsert(**params) else - config.before_destroy(**params) + config.option(:before_destroy, **params) model.destroy(**params) end @@ -157,9 +158,15 @@ def perform(config, params) end if event == :update - model.after_commit { config.after_commit_on_update(**params, results:) } + model.after_commit do + config.option(:after_commit_on_update, **params, results:) + + Array(config.option(:on_first_sync)).each do |hook| + hook.perform(config:, targets: results) if hook.enabled? + end + end else - model.after_commit { config.after_commit_on_destroy(**params, results:) } + model.after_commit { config.option(:after_commit_on_destroy, **params, results:) } end end end diff --git a/lib/table_sync/receiving/hooks/once.rb b/lib/table_sync/receiving/hooks/once.rb new file mode 100644 index 0000000..98291bf --- /dev/null +++ b/lib/table_sync/receiving/hooks/once.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +module TableSync::Receiving::Hooks + class Once + LOCK_KEY = "hook-once-lock-key" + + attr_reader :conditions, :handler, :lookup_code + + def initialize(conditions:, handler:) + @conditions = conditions + @handler = handler + init_lookup_code + end + + def enabled? + conditions[:columns].any? + end + + def perform(config:, targets:) + target_keys = config.option(:target_keys) + model = config.model + + targets.each do |target| + next unless conditions?(target) + + keys = target.slice(*target_keys) + model.try_advisory_lock(prepare_lock_key(keys)) do + model.find_and_save(keys:) do |entry| + next unless allow?(entry) + + entry.hooks ||= [] + entry.hooks << lookup_code + model.after_commit { handler.call(entry:) } + end + end + end + end + + private + + def allow?(entry) + Array(entry.hooks).exclude?(lookup_code) + end + + def init_lookup_code + @lookup_code = conditions[:columns].map do |column| + "#{column}-#{conditions[column]}" + end.join(":") + end + + def conditions?(row) + conditions[:columns].all? do |column| + row[column] == (conditions[column] || row[column]) + end + end + + def prepare_lock_key(row_keys) + lock_keys = [LOCK_KEY] + row_keys.values + Zlib.crc32(lock_keys.join(":")) % (2**31) + end + end +end diff --git a/lib/table_sync/receiving/model/active_record.rb b/lib/table_sync/receiving/model/active_record.rb index 504f13e..865487a 100644 --- a/lib/table_sync/receiving/model/active_record.rb +++ b/lib/table_sync/receiving/model/active_record.rb @@ -2,6 +2,13 @@ module TableSync::Receiving::Model class ActiveRecord + ISOLATION_LEVELS = { + uncommitted: :read_uncommitted, + committed: :read_committed, + repeatable: :repeatable_read, + serializable: :serializable, + }.freeze + class AfterCommitWrap def initialize(&block) @callback = block @@ -33,6 +40,10 @@ def initialize(table_name) @schema = model_naming.schema.to_sym end + def isolation_level(lookup_code) + ISOLATION_LEVELS.fetch(lookup_code) + end + def columns raw_model.column_names.map(&:to_sym) end @@ -110,14 +121,30 @@ def validate_types(data) types_validator.validate(data) end - def transaction(&) - ::ActiveRecord::Base.transaction(&) + def transaction(**params, &) + ::ActiveRecord::Base.transaction(**params, &) end def after_commit(&) db.add_transaction_record(AfterCommitWrap.new(&)) end + def try_advisory_lock(lock_key) + transaction do + if db.query_value("SELECT pg_try_advisory_xact_lock(#{lock_key.to_i})") + yield + end + end + end + + def find_and_save(keys:) + entry = raw_model.find_by(keys) + return unless entry + + yield entry + entry.save! + end + private attr_reader :raw_model, :types_validator diff --git a/lib/table_sync/receiving/model/sequel.rb b/lib/table_sync/receiving/model/sequel.rb index cbccc99..1ed6152 100644 --- a/lib/table_sync/receiving/model/sequel.rb +++ b/lib/table_sync/receiving/model/sequel.rb @@ -4,6 +4,13 @@ module TableSync::Receiving::Model class Sequel attr_reader :table, :schema + ISOLATION_LEVELS = { + uncommitted: :uncommitted, + committed: :committed, + repeatable: :repeatable, + serializable: :serializable, + }.freeze + def initialize(table_name) @raw_model = Class.new(::Sequel::Model(table_name)).tap(&:unrestrict_primary_key) @types_validator = TableSync::Utils::Schema::Builder::Sequel.build(@raw_model) @@ -17,6 +24,10 @@ def initialize(table_name) @schema = model_naming.schema.to_sym end + def isolation_level(lookup_code) + ISOLATION_LEVELS.fetch(lookup_code) + end + def columns dataset.columns end @@ -57,14 +68,30 @@ def validate_types(data) types_validator.validate(data) end - def transaction(&) - db.transaction(&) + def transaction(**params, &) + db.transaction(**params, &) end def after_commit(&) db.after_commit(&) end + def try_advisory_lock(lock_key) + transaction do + if db.get(::Sequel.function(:pg_try_advisory_xact_lock, lock_key.to_i)) + yield + end + end + end + + def find_and_save(keys:) + entry = dataset.first(keys) + return unless entry + + yield entry + entry.save_changes + end + private attr_reader :raw_model, :types_validator diff --git a/lib/table_sync/version.rb b/lib/table_sync/version.rb index 59c7102..bee1ab1 100644 --- a/lib/table_sync/version.rb +++ b/lib/table_sync/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module TableSync - VERSION = "6.9.3" + VERSION = "6.10.0" end diff --git a/spec/receiving/config_spec.rb b/spec/receiving/config_spec.rb index 26bd1e1..6481bf5 100644 --- a/spec/receiving/config_spec.rb +++ b/spec/receiving/config_spec.rb @@ -349,6 +349,25 @@ end end + describe "#on_first_sync" do + it "returns correct default value" do + hooks = config.option(:on_first_sync) + expect(hooks).to be_empty + end + + it "processes a value" do + config.on_first_sync(columns: %i[test], test: "value") do |**_| + # Some hook work here + end + + hooks = config.option(:on_first_sync) + expect(hooks).not_to be_nil + expect(hooks.first).to be_enabled + expect(hooks.first.conditions[:columns]).not_to be_empty + expect(hooks.first.lookup_code).to eq("test-value") + end + end + %i[before_update after_commit_on_update before_destroy after_commit_on_destroy].each do |option| describe "##{option}" do it "returns correct default value" do diff --git a/spec/receiving/handler_spec.rb b/spec/receiving/handler_spec.rb index 9832142..cea481d 100644 --- a/spec/receiving/handler_spec.rb +++ b/spec/receiving/handler_spec.rb @@ -171,6 +171,7 @@ def fire_destroy_event online_status: nil, project_id: "PID", rest: nil, + hooks: nil, version: 123.34534, } end @@ -362,6 +363,7 @@ def fire_destroy_event online_status: nil, project_id: "PID", rest: nil, + hooks: nil, version: 123.34534, ) @@ -371,6 +373,7 @@ def fire_destroy_event online_status: nil, project_id: "PID", rest: nil, + hooks: nil, version: 123.34534, ) end @@ -605,6 +608,51 @@ def destroy(data:, target_keys:, version_key:) end end + describe "#on_first_sync" do + let(:callback_flags) { { on_first_sync: [] } } + let(:handler) do + callback_flags_link = callback_flags + handler = Class.new(described_class) + + handler.receive("User", to_table: :players) do + rest_key false + mapping_overrides id: :external_id + + on_first_sync columns: %i[online_status], online_status: true do |entry:| + callback_flags_link[:on_first_sync] << entry.external_id + end + end + + handler + end + let(:update_event) do + OpenStruct.new( + data: { + event: "update", + model: "User", + attributes: { + id: user_id, + email: "mail@example.com", + online_status: true, + }, + version: 123.34534, + }, + project_id: "pid", + ) + end + + it "provides proper event to wrap receiving" do + fire_update_event + expect(DB[:players].count).to eq(1) + expect(callback_flags[:on_first_sync]).to be_one + + update_event.data[:version] = update_event.data[:version] + 1 + fire_update_event + expect(DB[:players].count).to eq(1) + expect(callback_flags[:on_first_sync]).to be_one + end + end + describe "avoid dead locks" do let(:model) do Class.new(TableSync.receiving_model) do diff --git a/spec/receiving/models/active_record_spec.rb b/spec/receiving/models/active_record_spec.rb new file mode 100644 index 0000000..6c2c0a1 --- /dev/null +++ b/spec/receiving/models/active_record_spec.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +describe TableSync::Receiving::Model::ActiveRecord do + subject(:model) { described_class.new(:players) } + + describe "#isolation_level" do + it { expect(model.isolation_level(:uncommitted)).to eq(:read_uncommitted) } + it { expect(model.isolation_level(:committed)).to eq(:read_committed) } + it { expect(model.isolation_level(:repeatable)).to eq(:repeatable_read) } + it { expect(model.isolation_level(:serializable)).to eq(:serializable) } + it { expect { model.isolation_level(:invalid) }.to raise_error(KeyError) } + end + + describe "find_and_save" do + let(:primary_key) { :external_id } + let(:external_id) { 100_500 } + let!(:player) do + model.send(:raw_model).create( + external_id:, + email: "email@mail.com", + online_status: false, + version: 123.456, + ) + end + + let(:keys) { { external_id: } } + + it "finds and saves an entry" do + model.find_and_save(keys:) do |entry| + entry.online_status = true + end + expect(player.reload.online_status).to be_truthy + end + + it "does nothing" do + keys = { external_id: external_id + 1 } + model.find_and_save(keys:) do |entry| + entry.online_status = true + end + expect(player.reload.online_status).to be_falsy + end + end +end diff --git a/spec/receiving/models/sequel_spec.rb b/spec/receiving/models/sequel_spec.rb new file mode 100644 index 0000000..4ed3d53 --- /dev/null +++ b/spec/receiving/models/sequel_spec.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +describe TableSync::Receiving::Model::Sequel do + subject(:model) { described_class.new(:players) } + + describe "#isolation_level" do + it { expect(model.isolation_level(:uncommitted)).to eq(:uncommitted) } + it { expect(model.isolation_level(:committed)).to eq(:committed) } + it { expect(model.isolation_level(:repeatable)).to eq(:repeatable) } + it { expect(model.isolation_level(:serializable)).to eq(:serializable) } + it { expect { model.isolation_level(:invalid) }.to raise_error(KeyError) } + end + + describe "find_and_save" do + let(:primary_key) { :external_id } + let(:external_id) { 100_500 } + let!(:player) do + model.send(:raw_model).create( + external_id:, + email: "email@mail.com", + online_status: false, + version: 123.456, + ) + end + + let(:keys) { { external_id: } } + + it "finds and saves an entry" do + model.find_and_save(keys:) do |entry| + entry.online_status = true + end + expect(player.reload.online_status).to be_truthy + end + + it "does nothing" do + keys = { external_id: external_id + 1 } + model.find_and_save(keys:) do |entry| + entry.online_status = true + end + expect(player.reload.online_status).to be_falsy + end + end +end diff --git a/spec/receiving/models_spec.rb b/spec/receiving/models_spec.rb index fcdb777..bfa3491 100644 --- a/spec/receiving/models_spec.rb +++ b/spec/receiving/models_spec.rb @@ -12,7 +12,7 @@ it "#columns" do expect(players.columns) - .to eq(%i[external_id project_id email online_status version rest]) + .to eq(%i[external_id project_id email online_status version rest hooks]) expect(clients.columns) .to eq(%i[client_id project_id name ext_id ext_project_id ts_version ts_rest]) end @@ -473,6 +473,7 @@ online_status: true, version: 12.343, rest: nil, + hooks: nil, }, { external_id: 222, @@ -481,6 +482,7 @@ online_status: false, version: 122.565, rest: nil, + hooks: nil, }, ]) @@ -496,6 +498,7 @@ online_status: false, version: 122.565, rest: nil, + hooks: nil, }]) end @@ -508,6 +511,7 @@ online_status: true, version: 12.343, rest: nil, + hooks: nil, }, { external_id: 222, @@ -516,6 +520,7 @@ online_status: false, version: 122.565, rest: nil, + hooks: nil, }, { external_id: 333, @@ -524,6 +529,7 @@ online_status: false, version: 122.5653, rest: nil, + hooks: nil, }, ]) @@ -543,6 +549,7 @@ online_status: true, version: 12.343, rest: nil, + hooks: nil, }, { external_id: 222, @@ -551,6 +558,7 @@ online_status: false, version: 122.565, rest: nil, + hooks: nil, }, ]) end @@ -565,6 +573,7 @@ online_status: false, version: 12.343, rest: nil, + hooks: nil, }, { external_id: 2, @@ -573,6 +582,7 @@ online_status: false, version: 12.565, rest: nil, + hooks: nil, }, ]) end @@ -707,6 +717,7 @@ online_status: true, version: 12.343, rest: nil, + hooks: nil, }]) players.after_commit { checks[0] = "test_after_commit" } @@ -735,6 +746,7 @@ online_status: true, version: 12.343, rest: nil, + hooks: nil, }]) players.after_commit { checks[0] = "test_after_commit" } diff --git a/spec/support/database_settings.rb b/spec/support/database_settings.rb index 25ba8f6..2ce05ae 100644 --- a/spec/support/database_settings.rb +++ b/spec/support/database_settings.rb @@ -39,7 +39,8 @@ "email" varchar(255) NOT NULL, "online_status" boolean, "version" decimal, - "rest" jsonb + "rest" jsonb, + "hooks" jsonb ); CREATE TABLE "clients" (