Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 27 additions & 32 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2194,26 +2194,21 @@ void Service::CallSHA(CmdArgList args, string_view sha, Interpreter* interpreter
ServerState::tlocal()->RecordCallLatency(sha, (end - start) / 1000);
}

optional<ScriptMgr::ScriptParams> LoadScript(string_view sha, ScriptMgr* script_mgr,
Interpreter* interpreter) {
auto ss = ServerState::tlocal();

if (!interpreter->Exists(sha)) {
auto script_data = script_mgr->Find(sha);
if (!script_data)
return std::nullopt;

string err;
Interpreter::AddResult add_res = interpreter->AddFunction(sha, script_data->body, &err);
if (add_res != Interpreter::ADD_OK) {
LOG(ERROR) << "Error adding " << sha << " to database, err " << err;
return std::nullopt;
}
void LoadScript(string_view sha, ScriptMgr* script_mgr, Interpreter* interpreter) {
if (interpreter->Exists(sha))
return;

return script_data;
auto script_data = script_mgr->Find(sha);
if (!script_data) {
LOG(DFATAL) << "Script " << sha << " not found in script mgr";
return;
}

return ss->GetScriptParams(sha);
string err;
Interpreter::AddResult add_res = interpreter->AddFunction(sha, script_data->body, &err);
if (add_res != Interpreter::ADD_OK) {
LOG(DFATAL) << "Error adding " << sha << " to database, err " << err;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If AddFunction fails here we only log and continue, which could lead to executing the script path without the function actually being registered. Consider returning an error or aborting the eval path early after this failure to avoid inconsistent runtime behavior.

🤖 Was this useful? React with 👍 or 👎

}
}

// Determine multi mode based on script params.
Expand Down Expand Up @@ -2252,13 +2247,9 @@ bool StartMulti(ConnectionContext* cntx, Transaction::MultiMode tx_mode, CmdArgL
return false;
}

static bool CanRunSingleShardMulti(optional<ShardId> sid, const ScriptMgr::ScriptParams& params,
static bool CanRunSingleShardMulti(optional<ShardId> sid, Transaction::MultiMode multi_mode,
const Transaction& tx) {
if (!sid.has_value()) {
return false;
}

if (DetermineMultiMode(params) != Transaction::LOCK_AHEAD) {
if (!sid.has_value() || multi_mode != Transaction::LOCK_AHEAD) {
return false;
}

Expand All @@ -2281,9 +2272,13 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
return builder->SendError(facade::kScriptNotFound);
}

auto params = LoadScript(eval_args.sha, server_family_.script_mgr(), interpreter);
if (!params)
auto* ss = ServerState::tlocal();
auto params = ss->GetScriptParams(eval_args.sha);
if (!params) {
return builder->SendError(facade::kScriptNotFound);
}

LoadScript(eval_args.sha, server_family_.script_mgr(), interpreter);

string error;

Expand Down Expand Up @@ -2318,6 +2313,9 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
Transaction* tx = cntx->transaction;
CHECK(tx != nullptr);

Interpreter::RunResult result;
Transaction::MultiMode script_mode = DetermineMultiMode(*params);

interpreter->SetGlobalArray("KEYS", eval_args.keys);
interpreter->SetGlobalArray("ARGV", eval_args.args);

Expand All @@ -2326,17 +2324,15 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
sinfo.reset();
};

Interpreter::RunResult result;

if (CanRunSingleShardMulti(sid, *params, *tx)) {
if (CanRunSingleShardMulti(sid, script_mode, *tx)) {
// If script runs on a single shard, we run it remotely to save hops.
interpreter->SetRedisFunc([cntx, this](Interpreter::CallArgs args) {
// Disable squashing, as we're using the squashing mechanism to run remotely.
args.async = false;
CallFromScript(cntx, args);
});

++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt;
++ss->stats.eval_shardlocal_coordination_cnt;
tx->PrepareMultiForScheduleSingleHop(cntx->ns, *sid, cntx->db_index(), args);
tx->ScheduleSingleHop([&](Transaction*, EngineShard*) {
boost::intrusive_ptr<Transaction> stub_tx =
Expand All @@ -2349,13 +2345,12 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
return OpStatus::OK;
});

if (*sid != ServerState::tlocal()->thread_index()) {
if (*sid != ss->thread_index()) {
VLOG(2) << "Migrating connection " << cntx->conn() << " from "
<< ProactorBase::me()->GetPoolIndex() << " to " << *sid;
cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(*sid), false);
}
} else {
Transaction::MultiMode script_mode = DetermineMultiMode(*params);
Transaction::MultiMode tx_mode = tx->GetMultiMode();
bool scheduled = false;

Expand All @@ -2371,7 +2366,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
scheduled = StartMulti(cntx, script_mode, eval_args.keys);
}

++ServerState::tlocal()->stats.eval_io_coordination_cnt;
++ss->stats.eval_io_coordination_cnt;
interpreter->SetRedisFunc(
[cntx, this](Interpreter::CallArgs args) { CallFromScript(cntx, args); });

Expand Down
2 changes: 1 addition & 1 deletion src/server/script_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ void ScriptMgr::FlushAllScript() {

shard_set->pool()->AwaitFiberOnAll([](auto* pb) {
ServerState* ss = ServerState::tlocal();
ss->ResetInterpreter();
ss->FlushScriptCache();
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/server/server_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ void ServerState::ReturnInterpreter(Interpreter* ir) {
interpreter_mgr_.Return(ir);
}

void ServerState::ResetInterpreter() {
void ServerState::FlushScriptCache() {
cached_script_params_.clear();
interpreter_mgr_.Reset();
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class ServerState { // public struct - to allow initialization.
// Return interpreter to internal manager to be re-used.
void ReturnInterpreter(Interpreter*);

void ResetInterpreter();
void FlushScriptCache();

// Invoke function on all free interpreters. They are marked atomically as
// used and the function is allowed to suspend.
Expand Down
Loading