Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions test/rebalancer/rebalancer.result
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ test_run:switch('box_1_a')
vshard.storage.rebalancer_enable()
---
...
wait_rebalancer_state("Rebalance routes are sent", test_run)
wait_rebalancer_state("The following rebalancer routes were sent", test_run)
---
...
wait_rebalancer_state('The cluster is balanced ok', test_run)
Expand Down Expand Up @@ -239,7 +239,7 @@ cfg.rebalancer_disbalance_threshold = 0.01
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
---
...
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
---
...
wait_rebalancer_state('The cluster is balanced ok', test_run)
Expand Down Expand Up @@ -318,7 +318,13 @@ _bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
---
- [150, 'receiving']
...
wait_rebalancer_state("Some buckets are not active", test_run)
formatted_replicaset_uuid = string.gsub(util.replicasets[1], '%-', '%%-')
---
...
log_msg = string.format('Some buckets in replicaset %s are not active', formatted_replicaset_uuid)
---
...
wait_rebalancer_state(log_msg, test_run)
---
...
_bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.ACTIVE}})
Expand Down
8 changes: 5 additions & 3 deletions test/rebalancer/rebalancer.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ util.map_bucket_protection(test_run, {REPLICASET_1}, true)

test_run:switch('box_1_a')
vshard.storage.rebalancer_enable()
wait_rebalancer_state("Rebalance routes are sent", test_run)
wait_rebalancer_state("The following rebalancer routes were sent", test_run)

wait_rebalancer_state('The cluster is balanced ok', test_run)
_bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
Expand Down Expand Up @@ -118,7 +118,7 @@ _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
-- Return 1%.
cfg.rebalancer_disbalance_threshold = 0.01
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
wait_rebalancer_state('The cluster is balanced ok', test_run)
_bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
_bucket.index.status:min({vshard.consts.BUCKET.ACTIVE})
Expand Down Expand Up @@ -156,7 +156,9 @@ util.map_bucket_protection(test_run, {REPLICASET_1}, false)
test_run:switch('box_1_a')
vshard.storage.rebalancer_enable()
_bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
wait_rebalancer_state("Some buckets are not active", test_run)
formatted_replicaset_uuid = string.gsub(util.replicasets[1], '%-', '%%-')
log_msg = string.format('Some buckets in replicaset %s are not active', formatted_replicaset_uuid)
wait_rebalancer_state(log_msg, test_run)
_bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.ACTIVE}})
vshard.storage.sync()

Expand Down
4 changes: 2 additions & 2 deletions test/rebalancer/stress_add_remove_several_rs.result
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ add_replicaset()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
---
...
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
---
...
-- Now, add a second replicaset.
Expand Down Expand Up @@ -422,7 +422,7 @@ remove_second_replicaset_first_stage()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
---
...
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
---
...
-- Rebalancing has been started - now remove second replicaset.
Expand Down
4 changes: 2 additions & 2 deletions test/rebalancer/stress_add_remove_several_rs.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ fiber.sleep(0.5)
test_run:switch('box_1_a')
add_replicaset()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)

-- Now, add a second replicaset.

Expand Down Expand Up @@ -153,7 +153,7 @@ fiber.sleep(0.5)
test_run:switch('box_1_a')
remove_second_replicaset_first_stage()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
-- Rebalancing has been started - now remove second replicaset.
remove_replicaset_first_stage()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
Expand Down
136 changes: 136 additions & 0 deletions test/storage-luatest/storage_1_1_1_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,139 @@ test_group.test_manual_bucket_send_doubled_buckets = function(g)
ilt.assert_equals(box.space._bucket:get(bid), nil)
end, {bid})
end

local rebalancer_recovery_group = t.group('rebalancer-recovery-logging')

--
-- Initiate sending bucket_id from src_storage's replicaset to
-- dest_storage's replicaset and wait until the bucket record shows up
-- on the destination in any state (the transfer itself is async).
--
local function start_bucket_move(src_storage, dest_storage, bucket_id)
    src_storage:exec(function(bucket_id, replicaset_id)
        ivshard.storage.bucket_send(bucket_id, replicaset_id)
    end, {bucket_id, dest_storage:replicaset_uuid()})

    dest_storage:exec(function(bucket_id)
        t.helpers.retrying({timeout = 10}, function()
            -- Bug fix: select() returns a table which is truthy even
            -- when empty, so the old `t.assert(select(...))` passed
            -- immediately without waiting. get() yields nil until the
            -- bucket record actually appears.
            t.assert(box.space._bucket:get(bucket_id) ~= nil)
        end)
    end, {bucket_id})
end

--
-- Block until bucket_id has fully migrated: the record must disappear
-- from src_storage and become 'active' on dest_storage.
--
local function wait_for_bucket_is_transferred(src_storage, dest_storage,
                                              bucket_id)
    src_storage:exec(function(bid)
        t.helpers.retrying({}, function()
            local rows = box.space._bucket:select(bid)
            t.assert_equals(rows, {})
        end)
    end, {bucket_id})
    dest_storage:exec(function(bid)
        t.helpers.retrying({}, function()
            -- Indexing rows[1] errors while the bucket is absent;
            -- retrying() treats that as a failed attempt and retries.
            local rows = box.space._bucket:select(bid)
            t.assert_equals(rows[1].status, 'active')
        end)
    end, {bucket_id})
end

-- Bring up a fresh cluster with a router, bootstrap the buckets, and
-- create a sharded user space on every master.
-- NOTE: global_cfg is deliberately a global - it is reused later to
-- restart a storage in test_no_log_spam_when_buckets_no_active.
rebalancer_recovery_group.before_all(function(g)
    global_cfg = vtest.config_new(cfg_template)
    vtest.cluster_new(g, global_cfg)
    g.router = vtest.router_new(g, 'router', global_cfg)
    vtest.cluster_bootstrap(g, global_cfg)
    -- Make sure replicas caught up before tests start poking masters.
    vtest.cluster_wait_vclock_all(g)

    -- The exec'ed function is serialized and run remotely - it must not
    -- capture any upvalues.
    vtest.cluster_exec_each_master(g, function()
        box.schema.create_space('test_space')
        box.space.test_space:format({
            {name = 'pk', type = 'unsigned'},
            {name = 'bucket_id', type = 'unsigned'},
        })
        box.space.test_space:create_index('primary', {parts = {'pk'}})
        -- Non-unique secondary index: many rows share one bucket.
        box.space.test_space:create_index(
            'bucket_id', {parts = {'bucket_id'}, unique = false})
    end)
end)

-- Tear down the entire cluster created in before_all().
rebalancer_recovery_group.after_all(function(g)
    g.cluster:drop()
end)

--
-- Improve logging of rebalancer and recovery (gh-212).
--
--
-- Improve logging of rebalancer and recovery (gh-212).
-- Hang two bucket transfers mid-flight, verify the recovery fiber logs
-- per-bucket errors but does NOT spam a "recovered 0 buckets" summary,
-- then lift the injections and verify the recovery summary appears.
--
rebalancer_recovery_group.test_no_logs_while_unsuccess_recovery = function(g)
    g.replica_2_a:exec(function()
        -- Leave incoming buckets stuck in RECEIVING on the destination.
        ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = true
        -- Stash the original _call in _G so the second exec() below
        -- (a separate remote call) can restore it.
        rawset(_G, 'old_call', ivshard.storage._call)
        -- Make recovery status probes from the source fail, so recovery
        -- on replica_1_a cannot make progress.
        ivshard.storage._call = function(service_name, ...)
            if service_name == 'recovery_bucket_stat' then
                return error('TimedOut')
            end
            return _G.old_call(service_name, ...)
        end
    end)
    local hanged_bucket_id_1 = vtest.storage_first_bucket(g.replica_1_a)
    start_bucket_move(g.replica_1_a, g.replica_2_a, hanged_bucket_id_1)
    local hanged_bucket_id_2 = vtest.storage_first_bucket(g.replica_1_a)
    start_bucket_move(g.replica_1_a, g.replica_2_a, hanged_bucket_id_2)
    t.helpers.retrying({}, function()
        g.replica_1_a:exec(function() ivshard.storage.recovery_wakeup() end)
        t.assert(g.replica_1_a:grep_log('Error during recovery of bucket 1'))
    end)
    -- While recovery cannot recover anything it must stay quiet: no
    -- "Finish bucket recovery step, 0 ..." lines (gh-212).
    t.assert_not(g.replica_1_a:grep_log('Finish bucket recovery step, 0'))
    g.replica_2_a:exec(function()
        -- Lift both injections so recovery can complete.
        ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = false
        ivshard.storage._call = _G.old_call
    end)
    t.helpers.retrying({timeout = 60}, function()
        g.replica_2_a:exec(function()
            ivshard.storage.garbage_collector_wakeup()
            ivshard.storage.recovery_wakeup()
        end)
        g.replica_1_a:exec(function() ivshard.storage.recovery_wakeup() end)
        -- In some rare cases the recovery service can recover buckets one
        -- by one. As a result we get multiple "Finish bucket recovery" and
        -- "Recovery buckets" logs with different bucket ids and buckets'
        -- count. That is why we should grep general logs without buckets'
        -- count and bucket ids to avoid flakiness.
        t.assert(g.replica_1_a:grep_log('Finish bucket recovery step'))
        t.assert(g.replica_1_a:grep_log('Recovered buckets'))
    end)
    -- Both stuck buckets must end up back ACTIVE on replica_1_a.
    wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a,
                                   hanged_bucket_id_1)
    wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a,
                                   hanged_bucket_id_2)
end

--
-- gh-212: the rebalancer must log the route table it applies as JSON.
--
rebalancer_recovery_group.test_rebalancer_routes_logging = function(g)
    -- Create disbalance: pull one bucket from replicasets 2 and 3 into
    -- replicaset 1 so the rebalancer has routes to send back.
    local moved_bucket_from_2 = vtest.storage_first_bucket(g.replica_2_a)
    start_bucket_move(g.replica_2_a, g.replica_1_a, moved_bucket_from_2)
    local moved_bucket_from_3 = vtest.storage_first_bucket(g.replica_3_a)
    start_bucket_move(g.replica_3_a, g.replica_1_a, moved_bucket_from_3)
    t.helpers.retrying({timeout = 60}, function()
        g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end)
        t.assert(g.replica_1_a:grep_log('Apply rebalancer routes with 1 ' ..
                                        'workers'))
    end)
    -- Expected JSON route table: one bucket back to each source.
    -- NOTE(review): grep_log() matches its argument as a Lua pattern;
    -- the '-' characters inside the UUIDs may need '%-' escaping (the
    -- test-run variants of this PR do escape them) - confirm against
    -- luatest's grep_log semantics.
    local rebalancer_routes_msg = string.format(
        "{\"%s\":{\"%s\":1,\"%s\":1}}", g.replica_1_a:replicaset_uuid(),
        g.replica_3_a:replicaset_uuid(), g.replica_2_a:replicaset_uuid())
    t.assert(g.replica_1_a:grep_log(rebalancer_routes_msg))
    t.helpers.retrying({}, function()
        g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end)
        -- Bug fix: the grep result was discarded, so this retrying block
        -- always succeeded on the first pass without actually waiting
        -- for the cluster to become balanced.
        t.assert(g.replica_1_a:grep_log('The cluster is balanced ok.'))
    end)
end

--
-- gh-212: when some buckets are not active, the rebalancer should log
-- which replicaset is to blame and retry later, without spamming.
--
rebalancer_recovery_group.test_no_log_spam_when_buckets_no_active = function(g)
    -- NOTE(review): moved_bucket is taken from replica_2_a but then sent
    -- FROM replica_1_a - verify this is intentional; bucket_send of a
    -- bucket the source does not own would fail and the following wait
    -- would pass vacuously.
    local moved_bucket = vtest.storage_first_bucket(g.replica_2_a)
    start_bucket_move(g.replica_1_a, g.replica_2_a, moved_bucket)
    wait_for_bucket_is_transferred(g.replica_1_a, g.replica_2_a, moved_bucket)
    -- Kill the storage so its buckets stop being reported as active.
    vtest.storage_stop(g.replica_2_a)
    -- NOTE(review): grep_log() matches a Lua pattern; the '-' characters
    -- in the UUID are magic there and are escaped in the test-run
    -- variants of this PR - confirm whether escaping is needed here too.
    local buckets_not_active = string.format('Some buckets in replicaset ' ..
                                             '%s are not active',
                                             g.replica_2_a:replicaset_uuid())
    t.helpers.retrying({timeout = 60}, function()
        g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end)
        t.assert(g.replica_1_a:grep_log(buckets_not_active))
    end)
    -- Restart the storage and restore the original bucket placement.
    vtest.storage_start(g.replica_2_a, global_cfg)
    start_bucket_move(g.replica_2_a, g.replica_1_a, moved_bucket)
    wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a, moved_bucket)
end
10 changes: 10 additions & 0 deletions test/unit/rebalancer.result
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,16 @@ _bucket:replace{1, consts.BUCKET.RECEIVING}
...
get_state()
---
- null
- name: PROC_LUA
code: 32
base_type: ClientError
type: ClientError
details: Replica _ has receiving buckets
message: Replica _ has receiving buckets
trace:
- file: /home/mrforza/Desktop/vshard/vshard/error.lua
line: 284
...
vshard.storage.internal.is_master = false
---
Expand Down
35 changes: 25 additions & 10 deletions vshard/storage/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local lmsgpack = require('msgpack')
local netbox = require('net.box') -- for net.box:self()
local trigger = require('internal.trigger')
local ffi = require('ffi')
local json_encode = require('json').encode
local yaml_encode = require('yaml').encode
local fiber_clock = lfiber.clock
local fiber_yield = lfiber.yield
Expand Down Expand Up @@ -932,6 +933,7 @@ local function recovery_step_by_type(type, limiter)
local recovered = 0
local total = 0
local start_format = 'Starting %s buckets recovery step'
local recovered_buckets = {SENT = {}, GARBAGE = {}, ACTIVE = {}}
for _, bucket in _bucket.index.status:pairs(type) do
lfiber.testcancel()
total = total + 1
Expand Down Expand Up @@ -992,22 +994,26 @@ local function recovery_step_by_type(type, limiter)
if recovery_local_bucket_is_sent(bucket, remote_bucket) then
_bucket:update({bucket_id}, {{'=', 2, BSENT}})
recovered = recovered + 1
table.insert(recovered_buckets['SENT'], bucket_id)
elseif recovery_local_bucket_is_garbage(bucket, remote_bucket) then
_bucket:update({bucket_id}, {{'=', 2, BGARBAGE}})
recovered = recovered + 1
table.insert(recovered_buckets['SENT'], bucket_id)
elseif recovery_local_bucket_is_active(bucket, remote_bucket) then
_bucket:replace({bucket_id, BACTIVE})
recovered = recovered + 1
table.insert(recovered_buckets['ACTIVE'], bucket_id)
elseif is_step_empty then
log.info('Bucket %s is %s local and %s on replicaset %s, waiting',
bucket_id, bucket.status, remote_bucket.status, peer_id)
end
is_step_empty = false
::continue::
end
if not is_step_empty then
if recovered > 0 then
log.info('Finish bucket recovery step, %d %s buckets are recovered '..
'among %d', recovered, type, total)
'among %d. Recovered buckets: %s', recovered, type, total,
json_encode(recovered_buckets))
end
return total, recovered
end
Expand Down Expand Up @@ -2800,6 +2806,7 @@ local function rebalancer_download_states()
replicaset, 'vshard.storage.rebalancer_request_state', {},
{timeout = consts.REBALANCER_GET_STATE_TIMEOUT})
if state == nil then
err.replicaset_id = replicaset.id
return nil, err
end
local bucket_count = state.bucket_active_count +
Expand Down Expand Up @@ -2848,9 +2855,11 @@ local function rebalancer_service_f(service, limiter)
local err = status and total_bucket_active_count or replicasets
if err then
limiter:log_error(err, service:set_status_error(
'Error during downloading rebalancer states: %s', err))
'Error during downloading rebalancer states: %s',
err.replicaset_id))
end
log.info('Some buckets are not active, retry rebalancing later')
log.info('Some buckets in replicaset %s are not active, retry ' ..
'rebalancing later', err and err.replicaset_id)
service:set_activity('idling')
lfiber.testcancel()
lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
Expand Down Expand Up @@ -2903,8 +2912,9 @@ local function rebalancer_service_f(service, limiter)
goto continue
end
end
log.info('Rebalance routes are sent. Schedule next wakeup after '..
'%f seconds', consts.REBALANCER_WORK_INTERVAL)
log.info('The following rebalancer routes were sent: %s. ' ..
'Schedule next wakeup after %f seconds', json_encode(routes),
consts.REBALANCER_WORK_INTERVAL)
service:set_activity('idling')
lfiber.testcancel()
lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
Expand Down Expand Up @@ -2938,18 +2948,23 @@ local function rebalancer_request_state()
return nil, err
end
if not M.is_rebalancer_active or rebalancing_is_in_progress() then
return
return nil, lerror.make('Rebalancer is not active or is in progress')
end
local _bucket = box.space._bucket
local status_index = _bucket.index.status
local repl_id = M.this_replica and M.this_replica.id or '_'
local buckets_invalid_state = 'Replica %s has %s buckets'
if #status_index:select({BSENDING}, {limit = 1}) > 0 then
return
return nil, lerror.make(string.format(buckets_invalid_state, repl_id,
consts.BUCKET.SENDING))
end
if #status_index:select({BRECEIVING}, {limit = 1}) > 0 then
return
return nil, lerror.make(string.format(buckets_invalid_state, repl_id,
consts.BUCKET.RECEIVING))
end
if #status_index:select({BGARBAGE}, {limit = 1}) > 0 then
return
return nil, lerror.make(string.format(buckets_invalid_state, repl_id,
consts.BUCKET.GARBAGE))
end
return {
bucket_active_count = status_index:count({BACTIVE}),
Expand Down
Loading