Skip to content

Commit 2218f94

Browse files
authored
chore: minor cleanups around the eval flow (#6152)
Also, fix a minor bug where script params were not flushed during "script flush" command. Signed-off-by: Roman Gershman <[email protected]>
1 parent 9c999fe commit 2218f94

File tree

4 files changed

+31
-35
lines changed

4 files changed

+31
-35
lines changed

src/server/main_service.cc

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2196,26 +2196,21 @@ void Service::CallSHA(CmdArgList args, string_view sha, Interpreter* interpreter
21962196
ServerState::tlocal()->RecordCallLatency(sha, (end - start) / 1000);
21972197
}
21982198

2199-
optional<ScriptMgr::ScriptParams> LoadScript(string_view sha, ScriptMgr* script_mgr,
2200-
Interpreter* interpreter) {
2201-
auto ss = ServerState::tlocal();
2202-
2203-
if (!interpreter->Exists(sha)) {
2204-
auto script_data = script_mgr->Find(sha);
2205-
if (!script_data)
2206-
return std::nullopt;
2207-
2208-
string err;
2209-
Interpreter::AddResult add_res = interpreter->AddFunction(sha, script_data->body, &err);
2210-
if (add_res != Interpreter::ADD_OK) {
2211-
LOG(ERROR) << "Error adding " << sha << " to database, err " << err;
2212-
return std::nullopt;
2213-
}
2199+
void LoadScript(string_view sha, ScriptMgr* script_mgr, Interpreter* interpreter) {
2200+
if (interpreter->Exists(sha))
2201+
return;
22142202

2215-
return script_data;
2203+
auto script_data = script_mgr->Find(sha);
2204+
if (!script_data) {
2205+
LOG(DFATAL) << "Script " << sha << " not found in script mgr";
2206+
return;
22162207
}
22172208

2218-
return ss->GetScriptParams(sha);
2209+
string err;
2210+
Interpreter::AddResult add_res = interpreter->AddFunction(sha, script_data->body, &err);
2211+
if (add_res != Interpreter::ADD_OK) {
2212+
LOG(DFATAL) << "Error adding " << sha << " to database, err " << err;
2213+
}
22192214
}
22202215

22212216
// Determine multi mode based on script params.
@@ -2254,13 +2249,9 @@ bool StartMulti(ConnectionContext* cntx, Transaction::MultiMode tx_mode, CmdArgL
22542249
return false;
22552250
}
22562251

2257-
static bool CanRunSingleShardMulti(optional<ShardId> sid, const ScriptMgr::ScriptParams& params,
2252+
static bool CanRunSingleShardMulti(optional<ShardId> sid, Transaction::MultiMode multi_mode,
22582253
const Transaction& tx) {
2259-
if (!sid.has_value()) {
2260-
return false;
2261-
}
2262-
2263-
if (DetermineMultiMode(params) != Transaction::LOCK_AHEAD) {
2254+
if (!sid.has_value() || multi_mode != Transaction::LOCK_AHEAD) {
22642255
return false;
22652256
}
22662257

@@ -2283,9 +2274,13 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
22832274
return builder->SendError(facade::kScriptNotFound);
22842275
}
22852276

2286-
auto params = LoadScript(eval_args.sha, server_family_.script_mgr(), interpreter);
2287-
if (!params)
2277+
auto* ss = ServerState::tlocal();
2278+
auto params = ss->GetScriptParams(eval_args.sha);
2279+
if (!params) {
22882280
return builder->SendError(facade::kScriptNotFound);
2281+
}
2282+
2283+
LoadScript(eval_args.sha, server_family_.script_mgr(), interpreter);
22892284

22902285
string error;
22912286

@@ -2320,6 +2315,9 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
23202315
Transaction* tx = cntx->transaction;
23212316
CHECK(tx != nullptr);
23222317

2318+
Interpreter::RunResult result;
2319+
Transaction::MultiMode script_mode = DetermineMultiMode(*params);
2320+
23232321
interpreter->SetGlobalArray("KEYS", eval_args.keys);
23242322
interpreter->SetGlobalArray("ARGV", eval_args.args);
23252323

@@ -2328,17 +2326,15 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
23282326
sinfo.reset();
23292327
};
23302328

2331-
Interpreter::RunResult result;
2332-
2333-
if (CanRunSingleShardMulti(sid, *params, *tx)) {
2329+
if (CanRunSingleShardMulti(sid, script_mode, *tx)) {
23342330
// If script runs on a single shard, we run it remotely to save hops.
23352331
interpreter->SetRedisFunc([cntx, this](Interpreter::CallArgs args) {
23362332
// Disable squashing, as we're using the squashing mechanism to run remotely.
23372333
args.async = false;
23382334
CallFromScript(cntx, args);
23392335
});
23402336

2341-
++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt;
2337+
++ss->stats.eval_shardlocal_coordination_cnt;
23422338
tx->PrepareMultiForScheduleSingleHop(cntx->ns, *sid, cntx->db_index(), args);
23432339
tx->ScheduleSingleHop([&](Transaction*, EngineShard*) {
23442340
boost::intrusive_ptr<Transaction> stub_tx =
@@ -2351,13 +2347,12 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
23512347
return OpStatus::OK;
23522348
});
23532349

2354-
if (*sid != ServerState::tlocal()->thread_index()) {
2350+
if (*sid != ss->thread_index()) {
23552351
VLOG(2) << "Migrating connection " << cntx->conn() << " from "
23562352
<< ProactorBase::me()->GetPoolIndex() << " to " << *sid;
23572353
cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(*sid), false);
23582354
}
23592355
} else {
2360-
Transaction::MultiMode script_mode = DetermineMultiMode(*params);
23612356
Transaction::MultiMode tx_mode = tx->GetMultiMode();
23622357
bool scheduled = false;
23632358

@@ -2373,7 +2368,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
23732368
scheduled = StartMulti(cntx, script_mode, eval_args.keys);
23742369
}
23752370

2376-
++ServerState::tlocal()->stats.eval_io_coordination_cnt;
2371+
++ss->stats.eval_io_coordination_cnt;
23772372
interpreter->SetRedisFunc(
23782373
[cntx, this](Interpreter::CallArgs args) { CallFromScript(cntx, args); });
23792374

src/server/script_mgr.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ void ScriptMgr::FlushAllScript() {
366366

367367
shard_set->pool()->AwaitFiberOnAll([](auto* pb) {
368368
ServerState* ss = ServerState::tlocal();
369-
ss->ResetInterpreter();
369+
ss->FlushScriptCache();
370370
});
371371
}
372372

src/server/server_state.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,8 @@ void ServerState::ReturnInterpreter(Interpreter* ir) {
252252
interpreter_mgr_.Return(ir);
253253
}
254254

255-
void ServerState::ResetInterpreter() {
255+
void ServerState::FlushScriptCache() {
256+
cached_script_params_.clear();
256257
interpreter_mgr_.Reset();
257258
}
258259

src/server/server_state.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ class ServerState { // public struct - to allow initialization.
202202
// Return interpreter to internal manager to be re-used.
203203
void ReturnInterpreter(Interpreter*);
204204

205-
void ResetInterpreter();
205+
void FlushScriptCache();
206206

207207
// Invoke function on all free interpreters. They are marked atomically as
208208
// used and the function is allowed to suspend.

0 commit comments

Comments
 (0)