Skip to content

Commit 040a6b6

Browse files
committed
chore: minor cleanups around the eval flow
Also, fix a minor bug where script params were not flushed during "script flush" command. Signed-off-by: Roman Gershman <[email protected]>
1 parent ba07e77 commit 040a6b6

File tree

4 files changed

+31
-35
lines changed

4 files changed

+31
-35
lines changed

src/server/main_service.cc

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2194,26 +2194,21 @@ void Service::CallSHA(CmdArgList args, string_view sha, Interpreter* interpreter
21942194
ServerState::tlocal()->RecordCallLatency(sha, (end - start) / 1000);
21952195
}
21962196

2197-
optional<ScriptMgr::ScriptParams> LoadScript(string_view sha, ScriptMgr* script_mgr,
2198-
Interpreter* interpreter) {
2199-
auto ss = ServerState::tlocal();
2200-
2201-
if (!interpreter->Exists(sha)) {
2202-
auto script_data = script_mgr->Find(sha);
2203-
if (!script_data)
2204-
return std::nullopt;
2205-
2206-
string err;
2207-
Interpreter::AddResult add_res = interpreter->AddFunction(sha, script_data->body, &err);
2208-
if (add_res != Interpreter::ADD_OK) {
2209-
LOG(ERROR) << "Error adding " << sha << " to database, err " << err;
2210-
return std::nullopt;
2211-
}
2197+
void LoadScript(string_view sha, ScriptMgr* script_mgr, Interpreter* interpreter) {
2198+
if (interpreter->Exists(sha))
2199+
return;
22122200

2213-
return script_data;
2201+
auto script_data = script_mgr->Find(sha);
2202+
if (!script_data) {
2203+
LOG(DFATAL) << "Script " << sha << " not found in script mgr";
2204+
return;
22142205
}
22152206

2216-
return ss->GetScriptParams(sha);
2207+
string err;
2208+
Interpreter::AddResult add_res = interpreter->AddFunction(sha, script_data->body, &err);
2209+
if (add_res != Interpreter::ADD_OK) {
2210+
LOG(DFATAL) << "Error adding " << sha << " to database, err " << err;
2211+
}
22172212
}
22182213

22192214
// Determine multi mode based on script params.
@@ -2252,13 +2247,9 @@ bool StartMulti(ConnectionContext* cntx, Transaction::MultiMode tx_mode, CmdArgL
22522247
return false;
22532248
}
22542249

2255-
static bool CanRunSingleShardMulti(optional<ShardId> sid, const ScriptMgr::ScriptParams& params,
2250+
static bool CanRunSingleShardMulti(optional<ShardId> sid, Transaction::MultiMode multi_mode,
22562251
const Transaction& tx) {
2257-
if (!sid.has_value()) {
2258-
return false;
2259-
}
2260-
2261-
if (DetermineMultiMode(params) != Transaction::LOCK_AHEAD) {
2252+
if (!sid.has_value() || multi_mode != Transaction::LOCK_AHEAD) {
22622253
return false;
22632254
}
22642255

@@ -2281,9 +2272,13 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
22812272
return builder->SendError(facade::kScriptNotFound);
22822273
}
22832274

2284-
auto params = LoadScript(eval_args.sha, server_family_.script_mgr(), interpreter);
2285-
if (!params)
2275+
auto* ss = ServerState::tlocal();
2276+
auto params = ss->GetScriptParams(eval_args.sha);
2277+
if (!params) {
22862278
return builder->SendError(facade::kScriptNotFound);
2279+
}
2280+
2281+
LoadScript(eval_args.sha, server_family_.script_mgr(), interpreter);
22872282

22882283
string error;
22892284

@@ -2318,6 +2313,9 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
23182313
Transaction* tx = cntx->transaction;
23192314
CHECK(tx != nullptr);
23202315

2316+
Interpreter::RunResult result;
2317+
Transaction::MultiMode script_mode = DetermineMultiMode(*params);
2318+
23212319
interpreter->SetGlobalArray("KEYS", eval_args.keys);
23222320
interpreter->SetGlobalArray("ARGV", eval_args.args);
23232321

@@ -2326,17 +2324,15 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
23262324
sinfo.reset();
23272325
};
23282326

2329-
Interpreter::RunResult result;
2330-
2331-
if (CanRunSingleShardMulti(sid, *params, *tx)) {
2327+
if (CanRunSingleShardMulti(sid, script_mode, *tx)) {
23322328
// If script runs on a single shard, we run it remotely to save hops.
23332329
interpreter->SetRedisFunc([cntx, this](Interpreter::CallArgs args) {
23342330
// Disable squashing, as we're using the squashing mechanism to run remotely.
23352331
args.async = false;
23362332
CallFromScript(cntx, args);
23372333
});
23382334

2339-
++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt;
2335+
++ss->stats.eval_shardlocal_coordination_cnt;
23402336
tx->PrepareMultiForScheduleSingleHop(cntx->ns, *sid, cntx->db_index(), args);
23412337
tx->ScheduleSingleHop([&](Transaction*, EngineShard*) {
23422338
boost::intrusive_ptr<Transaction> stub_tx =
@@ -2349,13 +2345,12 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
23492345
return OpStatus::OK;
23502346
});
23512347

2352-
if (*sid != ServerState::tlocal()->thread_index()) {
2348+
if (*sid != ss->thread_index()) {
23532349
VLOG(2) << "Migrating connection " << cntx->conn() << " from "
23542350
<< ProactorBase::me()->GetPoolIndex() << " to " << *sid;
23552351
cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(*sid), false);
23562352
}
23572353
} else {
2358-
Transaction::MultiMode script_mode = DetermineMultiMode(*params);
23592354
Transaction::MultiMode tx_mode = tx->GetMultiMode();
23602355
bool scheduled = false;
23612356

@@ -2371,7 +2366,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
23712366
scheduled = StartMulti(cntx, script_mode, eval_args.keys);
23722367
}
23732368

2374-
++ServerState::tlocal()->stats.eval_io_coordination_cnt;
2369+
++ss->stats.eval_io_coordination_cnt;
23752370
interpreter->SetRedisFunc(
23762371
[cntx, this](Interpreter::CallArgs args) { CallFromScript(cntx, args); });
23772372

src/server/script_mgr.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ void ScriptMgr::FlushAllScript() {
354354

355355
shard_set->pool()->AwaitFiberOnAll([](auto* pb) {
356356
ServerState* ss = ServerState::tlocal();
357-
ss->ResetInterpreter();
357+
ss->FlushScriptCache();
358358
});
359359
}
360360

src/server/server_state.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,8 @@ void ServerState::ReturnInterpreter(Interpreter* ir) {
252252
interpreter_mgr_.Return(ir);
253253
}
254254

255-
void ServerState::ResetInterpreter() {
255+
void ServerState::FlushScriptCache() {
256+
cached_script_params_.clear();
256257
interpreter_mgr_.Reset();
257258
}
258259

src/server/server_state.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ class ServerState { // public struct - to allow initialization.
202202
// Return interpreter to internal manager to be re-used.
203203
void ReturnInterpreter(Interpreter*);
204204

205-
void ResetInterpreter();
205+
void FlushScriptCache();
206206

207207
// Invoke function on all free interpreters. They are marked atomically as
208208
// used and the function is allowed to suspend.

0 commit comments

Comments
 (0)