12 changes: 4 additions & 8 deletions src/core/detail/listpack_wrap.cc
@@ -33,16 +33,11 @@ void ListpackWrap::Iterator::Read() {
next_ptr_ = lpNext(lp_, next_ptr_);
}

ListpackWrap::~ListpackWrap() {
DCHECK(!dirty_);
}

ListpackWrap ListpackWrap::WithCapacity(size_t capacity) {
return ListpackWrap{lpNew(capacity)};
}

uint8_t* ListpackWrap::GetPointer() {
dirty_ = false;
return lp_;
}

@@ -63,7 +58,6 @@ bool ListpackWrap::Delete(std::string_view key) {
return false;

lp_ = lpDeleteRangeWithEntry(lp_, &ptr, 2);
dirty_ = true;
return true;
}

@@ -90,7 +84,6 @@ bool ListpackWrap::Insert(std::string_view key, std::string_view value, bool ski
lp_ = lpReplace(lp_, &vptr, vsrc, value.size());
DCHECK_EQ(0u, lpLength(lp_) % 2);

dirty_ = true;
updated = true;
}
}
@@ -100,7 +93,6 @@ bool ListpackWrap::Insert(std::string_view key, std::string_view value, bool ski
// TODO: we should at least allocate once for both elements
lp_ = lpAppend(lp_, fsrc, key.size());
lp_ = lpAppend(lp_, vsrc, value.size());
dirty_ = true;
}

return !updated;
@@ -110,6 +102,10 @@ size_t ListpackWrap::size() const {
return lpLength(lp_) / 2;
}

size_t ListpackWrap::DataBytes() const {
return lpBytes(lp_);
}

ListpackWrap::Iterator ListpackWrap::begin() const {
return Iterator{lp_, lpFirst(lp_), intbuf_};
}
5 changes: 2 additions & 3 deletions src/core/detail/listpack_wrap.h
@@ -15,8 +15,6 @@ struct ListpackWrap {
using IntBuf = uint8_t[2][24];

public:
~ListpackWrap();

struct Iterator {
using iterator_category = std::forward_iterator_tag;
using difference_type = std::ptrdiff_t;
@@ -60,13 +58,14 @@ struct ListpackWrap {
Iterator end() const;
size_t size() const; // number of entries

size_t DataBytes() const;

// Get view from raw listpack iterator
static std::string_view GetView(uint8_t* lp_it, uint8_t int_buf[]);

private:
uint8_t* lp_; // the listpack itself
mutable IntBuf intbuf_; // buffer for integers decoded to strings
bool dirty_ = false; // whether lp_ was updated, but never retrieved with GetPointer
};

} // namespace dfly::detail
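
Note: with dirty-state tracking and the destructor gone, ListpackWrap is now a plain wrapper over a raw listpack, and DataBytes() exposes its serialized size. The fragment below is a minimal usage sketch written against the interface visible in this diff; it is not part of the PR, and the include path and the "Insert() returns true for newly created fields" behaviour are inferred from the hunks above.

#include <cstddef>
#include <cstdint>
#include <string_view>

#include "core/detail/listpack_wrap.h"  // include path assumed from the diff header

void ListpackWrapSketch() {
  // Allocate a fresh listpack and wrap it.
  auto lw = dfly::detail::ListpackWrap::WithCapacity(64);

  // Insert() appends a field/value pair and returns true if the field was new.
  bool created = lw.Insert("field", "value", /*skip_if_exists=*/false);
  (void)created;

  // Iterate field/value pairs, as OpHMGet does below.
  for (const auto [key, value] : lw) {
    (void)key;
    (void)value;
  }

  size_t pairs = lw.size();       // number of field/value pairs
  size_t bytes = lw.DataBytes();  // raw listpack size, i.e. lpBytes(lp_)
  (void)pairs;
  (void)bytes;

  lw.Delete("field");

  // The raw pointer is still retrieved with GetPointer(); with dirty_ removed,
  // the wrapper no longer asserts that callers did so before destruction.
  uint8_t* lp = lw.GetPointer();
  (void)lp;  // freeing the listpack remains the caller's responsibility
}
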
120 changes: 96 additions & 24 deletions src/server/hset_family.cc
@@ -75,6 +75,7 @@ struct HMapWrap {
}

public:
// Create from non-external prime value
HMapWrap(const PrimeValue& pv, DbContext db_cntx) {
DCHECK(!pv.IsExternal() || pv.IsCool());
if (pv.Encoding() == kEncodingListPack)
@@ -83,6 +84,9 @@ struct HMapWrap {
impl_ = GetStringMap(pv, db_cntx);
}

explicit HMapWrap(detail::ListpackWrap lw) : impl_{std::move(lw)} {
}

explicit HMapWrap(tiering::SerializedMap* sm) : impl_{sm} {
}

@@ -134,6 +138,14 @@ struct HMapWrap {
VisitMut(ov);
}

void Launder(tiering::SerializedMapDecoder* dec) {
Overloaded ov{
[](StringMap* s) {},
[&](detail::ListpackWrap& lw) { *dec->Write() = lw; },
};
VisitMut(ov);
}

template <typename T> optional<T> Get() const {
if (holds_alternative<T>(impl_))
return get<T>(impl_);
@@ -193,7 +205,12 @@ OpResult<T> ExecuteRO(Transaction* tx, F&& f) {
using D = tiering::SerializedMapDecoder;
util::fb2::Future<OpResult<T>> fut;
auto read_cb = [fut, f = std::move(f)](io::Result<D*> res) mutable {
HMapWrap hw{res.value()->Get()};
// Create wrapper from different types
Overloaded ov{
[](tiering::SerializedMap* sm) { return HMapWrap{sm}; },
[](detail::ListpackWrap* lw) { return HMapWrap{*lw}; },
};
auto hw = visit(ov, res.value()->Read());
fut.Resolve(f(hw));
};

@@ -216,15 +233,32 @@ OpResult<T> ExecuteRO(Transaction* tx, F&& f) {
}

// Wrap write handler
template <typename F> auto WrapW(F&& f) {
using RT = std::invoke_result_t<F, HMapWrap&>;
return [f = std::forward<F>(f)](Transaction* t, EngineShard* es) -> RT {
template <typename F> auto ExecuteW(Transaction* tx, F&& f) {
using T = typename std::invoke_result_t<F, HMapWrap&>::Type;
auto shard_cb = [f = std::forward<F>(f)](Transaction* t,
EngineShard* es) -> OpResult<CbVariant<T>> {
// Fetch value of hash type
auto [key, op_args] = KeyAndArgs(t, es);

auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_HASH);
RETURN_ON_BAD_STATUS(it_res);
auto& pv = it_res->it->second;

// Enqueue read for future values
if (pv.IsExternal() && !pv.IsCool()) {
using D = tiering::SerializedMapDecoder;
util::fb2::Future<OpResult<T>> fut;
auto read_cb = [fut, f = std::move(f)](io::Result<D*> res) mutable {
// Create wrapper from different types
HMapWrap hw{*res.value()->Write()};
fut.Resolve(f(hw));
hw.Launder(*res);
};

es->tiered_storage()->Read(op_args.db_cntx.db_index, key, pv, D{}, std::move(read_cb));
return CbVariant<T>{std::move(fut)};
}

// Remove document before modification
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, pv);

@@ -240,8 +274,11 @@ template <typename F> auto WrapW(F&& f) {
else
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);

return res;
RETURN_ON_BAD_STATUS(res);
return CbVariant<T>{std::move(res).value()};
};

return Unwrap(tx->ScheduleSingleHopT(std::move(shard_cb)));
}

size_t EstimateListpackMinBytes(CmdArgList members) {
@@ -299,6 +336,10 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc

auto& add_res = *op_res;
PrimeValue& pv = add_res.it->second;

if (pv.IsExternal() && !pv.IsCool())
return OpStatus::CANCELLED; // Not supported for offloaded values

if (add_res.is_new) {
pv.InitRobj(OBJ_HASH, kEncodingListPack, lpNew(0));
} else {
@@ -391,28 +432,27 @@ OpResult<vector<OptStr>> OpHMGet(const HMapWrap& hw, CmdArgList fields) {
DCHECK(!fields.empty());

std::vector<OptStr> result(fields.size());
if (auto lw = hw.Get<detail::ListpackWrap>(); lw) {
if (auto sm = hw.Get<StringMap*>(); sm) {
for (size_t i = 0; i < fields.size(); ++i) {
if (auto it = (*sm)->Find(fields[i]); it != (*sm)->end()) {
result[i].emplace(it->second, sdslen(it->second));
}
}
} else {
absl::flat_hash_map<string_view, absl::InlinedVector<size_t, 3>> reverse;
reverse.reserve(fields.size() + 1);
for (size_t i = 0; i < fields.size(); ++i) {
reverse[ArgS(fields, i)].push_back(i); // map fields to their index.
}

for (const auto [key, value] : *lw) {
for (const auto [key, value] : hw.Range()) {
if (auto it = reverse.find(key); it != reverse.end()) {
for (size_t index : it->second) {
DCHECK_LT(index, result.size());
result[index].emplace(value);
}
}
}
} else {
StringMap* sm = *hw.Get<StringMap*>();
for (size_t i = 0; i < fields.size(); ++i) {
if (auto it = sm->Find(fields[i]); it != sm->end()) {
result[i].emplace(it->second, sdslen(it->second));
}
}
}

return result;
@@ -424,8 +464,9 @@ struct OpSetParams {
bool keepttl = false;
};

OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList values,
const OpSetParams& op_sp = OpSetParams{}) {
OpResult<CbVariant<uint32_t>> OpSet(const OpArgs& op_args, string_view key, CmdArgList values,
const OpSetParams& op_sp = OpSetParams{},
optional<util::fb2::Future<bool>>* bp_anker = nullptr) {
DCHECK(!values.empty() && 0 == values.size() % 2);
VLOG(2) << "OpSet(" << key << ")";

@@ -438,6 +479,27 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
auto& it = add_res.it;
PrimeValue& pv = it->second;

// If the value is external, enqueue read and modify it there
if (pv.IsExternal() && !pv.IsCool()) {
if (op_sp.ttl != UINT32_MAX)
return OpStatus::CANCELLED; // Don't support expiry with offloaded hashes

using D = tiering::SerializedMapDecoder;
util::fb2::Future<OpResult<uint32_t>> fut;
auto read_cb = [fut, values, &op_sp](io::Result<D*> res) mutable {
auto& lw = *res.value()->Write();
uint32_t created = 0;
for (size_t i = 0; i < values.size(); i += 2) {
created += lw.Insert(values[i], values[i + 1], op_sp.skip_if_exists);
}
fut.Resolve(created);
};

op_args.shard->tiered_storage()->Read(op_args.db_cntx.db_index, key, pv, D{},
std::move(read_cb));
return CbVariant<uint32_t>{std::move(fut)};
}

if (add_res.is_new) {
if (op_sp.ttl == UINT32_MAX) {
lp = lpNew(0);
@@ -492,10 +554,13 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu

op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);

if (auto* ts = op_args.shard->tiered_storage(); ts)
ts->TryStash(op_args.db_cntx.db_index, key, &pv);
if (auto* ts = op_args.shard->tiered_storage(); ts) {
auto bp = ts->TryStash(op_args.db_cntx.db_index, key, &pv, true);
if (bp && bp_anker)
*bp_anker = std::move(*bp);
}

return created;
return CbVariant<uint32_t>{created};
}

void HGetGeneric(CmdArgList args, uint8_t getall_mask, Transaction* tx, SinkReplyBuilder* builder) {
Expand Down Expand Up @@ -587,7 +652,8 @@ void HSetEx(CmdArgList args, const CommandContext& cmd_cntx) {
return OpSet(t->GetOpArgs(shard), key, fields, op_sp);
};

OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
auto delayed_result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
OpResult<uint32_t> result = Unwrap(std::move(delayed_result));
if (result) {
cmd_cntx.rb->SendLong(*result);
} else {
@@ -618,7 +684,7 @@ void HSetFamily::HDel(CmdArgList args, const CommandContext& cmd_cntx) {
deleted += hw.Erase(s);
return deleted;
};
HSetReplies{cmd_cntx.rb}.Send(cmd_cntx.tx->ScheduleSingleHopT(WrapW(cb)));
HSetReplies{cmd_cntx.rb}.Send(ExecuteW(cmd_cntx.tx, std::move(cb)));
}

void HSetFamily::HExpire(CmdArgList args, const CommandContext& cmd_cntx) {
Expand Down Expand Up @@ -856,12 +922,18 @@ void HSetFamily::HSet(CmdArgList args, const CommandContext& cmd_cntx) {
return cmd_cntx.rb->SendError(facade::WrongNumArgsError(cmd), kSyntaxErrType);
}

optional<util::fb2::Future<bool>> tiered_backpressure;

args.remove_prefix(1);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpSet(t->GetOpArgs(shard), key, args);
return OpSet(t->GetOpArgs(shard), key, args, OpSetParams{}, &tiered_backpressure);
};

OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
auto delayed_result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
OpResult<uint32_t> result = Unwrap(std::move(delayed_result));

if (tiered_backpressure)
tiered_backpressure->GetFor(10ms);

if (result && cmd == "HSET") {
cmd_cntx.rb->SendLong(*result);
@@ -876,7 +948,7 @@ void HSetFamily::HSetNx(CmdArgList args, const CommandContext& cmd_cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpSet(t->GetOpArgs(shard), key, args.subspan(1), OpSetParams{.skip_if_exists = true});
};
HSetReplies{cmd_cntx.rb}.Send(cmd_cntx.tx->ScheduleSingleHopT(cb));
HSetReplies{cmd_cntx.rb}.Send(Unwrap(cmd_cntx.tx->ScheduleSingleHopT(cb)));
}

void StrVecEmplaceBack(StringVec& str_vec, const listpackEntry& lp) {
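
Note: the HMapWrap helpers above (Launder, ExecuteRO, ExecuteW) dispatch over the variant alternatives with an Overloaded visitor. That helper is not defined in this diff; it is presumably the usual C++17 overload-set idiom, sketched below with a hypothetical Describe() function so the dispatch pattern is easy to follow.

#include <string>
#include <string_view>
#include <variant>

// Classic overload-set helper: inherit operator() from every lambda passed in.
template <typename... Ts> struct Overloaded : Ts... {
  using Ts::operator()...;
};
// Deduction guide (only required before C++20).
template <typename... Ts> Overloaded(Ts...) -> Overloaded<Ts...>;

// Hypothetical example mirroring the dispatch in ExecuteRO/Launder above.
using Value = std::variant<int, std::string_view>;

std::string Describe(const Value& v) {
  Overloaded ov{
      [](int i) { return std::to_string(i); },
      [](std::string_view s) { return std::string{s}; },
  };
  return std::visit(ov, v);
}
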
1 change: 0 additions & 1 deletion src/server/tiered_storage.cc
@@ -196,7 +196,6 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
// Set value to be an in-memory type again. Update memory stats.
void Upload(DbIndex dbid, string_view value, PrimeValue* pv) {
DCHECK(!value.empty());

switch (pv->GetExternalRep()) {
case CompactObj::ExternalRep::STRING:
pv->Materialize(value, true);
41 changes: 36 additions & 5 deletions src/server/tiered_storage_test.cc
@@ -493,8 +493,8 @@ TEST_F(PureDiskTSTest, Dump) {
TEST_P(LatentCoolingTSTest, SimpleHash) {
absl::FlagSaver saver;
absl::SetFlag(&FLAGS_tiered_experimental_hash_support, true);
// For now, never upload as its not implemented yet
absl::SetFlag(&FLAGS_tiered_upload_threshold, 0.0);
absl::SetFlag(&FLAGS_tiered_upload_threshold,
0.0);  // For now, never upload as it's not implemented yet
UpdateFromFlags();

const size_t kNUM = 100;
@@ -516,9 +516,9 @@ TEST_P(LatentCoolingTSTest, SimpleHash) {
// Wait for all to be stashed or in end up in bins
ExpectConditionWithinTimeout([=] {
auto metrics = GetMetrics();
return metrics.tiered_stats.total_stashes +
metrics.tiered_stats.small_bins_filling_entries_cnt ==
kNUM;
size_t sum =
metrics.tiered_stats.total_stashes + metrics.tiered_stats.small_bins_filling_entries_cnt;
return sum == kNUM;
});

// Verify correctness
@@ -530,6 +530,37 @@ TEST_P(LatentCoolingTSTest, SimpleHash) {
auto v = string{31, 'x'} + 'f';
EXPECT_EQ(resp, v);
}

// Start offloading
SetFlag(&FLAGS_tiered_offload_threshold, 1.0);
UpdateFromFlags();
auto wait_offloaded = [=] {
auto metrics = GetMetrics();
size_t sum =
metrics.db_stats[0].tiered_entries + metrics.tiered_stats.small_bins_filling_entries_cnt;
return sum == kNUM;
};

// Wait for all offloads again
ExpectConditionWithinTimeout(wait_offloaded);

// HDEL
for (size_t i = 0; i < kNUM; i++) {
string key = absl::StrCat("k", i);
EXPECT_THAT(Run({"HDEL", key, string{1, 'c'}}), IntArg(1));
EXPECT_THAT(Run({"HLEN", key}), IntArg(25));
}

// Wait for all offloads again
ExpectConditionWithinTimeout(wait_offloaded);

// HSET new field
for (size_t i = 0; i < kNUM; i++) {
string key = absl::StrCat("k", i);
EXPECT_THAT(Run({"HSET", key, string{1, 'c'}, "Some new value"}), IntArg(1));
EXPECT_THAT(Run({"HLEN", key}), IntArg(26));
EXPECT_EQ(Run({"HGET", key, string{1, 'c'}}), "Some new value");
}
}

} // namespace dfly