Conversation

@kostasrim (Contributor) commented Nov 17, 2025

This PR adds IoLoopV2, which removes fiber-blocking read calls from the flow. This allows the connection fiber to handle other events while recv calls are handled asynchronously, driven by readiness events from a registered multishot poll.

  • add IoLoopV2 that uses non-fiber-blocking async reads via poll registration
  • add a tls label in pytests
  • add a manually triggered workflow to run all the regression tests (both epoll/uring) without tls

Follow-up in a separate PR:

  • Also allow EnableRecvMultishot()

Resolves #6028

}

phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;
kostasrim (Contributor Author):

No functional change from here onwards in comparison to IoLoop.

await check_stats()


@pytest.mark.tls
kostasrim (Contributor Author):

No changes to the tests. I added the tls label so the manual workflow I introduced can disable tls tests (because tls is broken for now).

romange (Collaborator):

yep, so instead avoid enabling v2 for tls sockets - we have the is_tls_ flag for that

Signed-off-by: Kostas Kyrimis <[email protected]>
Signed-off-by: Kostas Kyrimis <[email protected]>
Signed-off-by: Kostas Kyrimis <[email protected]>
Signed-off-by: Kostas Kyrimis <[email protected]>
@kostasrim marked this pull request as ready for review December 1, 2025 10:49
@kostasrim changed the title from "[wip] chore: asynchronous IO for connection fiber" to "chore: asynchronous IO for connection fiber" Dec 1, 2025
@augmentcode (bot) left a comment:

Review completed. 5 suggestions posted.

Comment augment review to trigger a new review at any time.

export ROOT_DIR="${GITHUB_WORKSPACE}/tests/dragonfly/valkey_search"
export UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1 # to crash on errors
if [[ "${{inputs.df-arg}}" == 'epoll' ]]; then
augmentcode (bot):

DF_ARG is only set when inputs.df-arg equals 'epoll', which prevents other values (e.g., the workflow’s 'expiremental_io_loop_v2') from being propagated to pytest.


romange (Collaborator):

we already have an epoll argument, why do you need this?

export FILTER="${{inputs.filter}} and not exclude_epoll"
# Run only replication tests with epoll
timeout 80m pytest -m "$FILTER" --durations=10 --timeout=300 --color=yes --json-report --json-report-file=report.json dragonfly --df force_epoll=true --log-cli-level=INFO || code=$?
timeout 80m pytest -m "$FILTER" $DF_ARG --durations=10 --timeout=300 --color=yes --json-report --json-report-file=report.json dragonfly --df force_epoll=true --df --log-cli-level=INFO || code=$?
augmentcode (bot):

There is an extra --df with no value after force_epoll=true in the pytest command, which is likely to break argument parsing.


romange (Collaborator):

+1

io_ec_ = make_error_code(errc::connection_aborted);
}

LOG_IF(FATAL, !io_ec_) << "Recv error: " << strerror(-res) << " errno " << errno;
augmentcode (bot):

strerror(-res) uses the negated recv return value instead of errno, so the logged error message text will be incorrect.


[this]() { return io_buf_.InputLen() > 0 || io_ec_ || io_buf_.AppendLen() == 0; });

if (io_ec_) {
LOG_IF(WARNING, cntx()->replica_conn) << "async io error: " << ec;
augmentcode (bot):

The log line prints ec, which is uninitialized here, instead of the actual I/O error io_ec_.


romange (Collaborator):

yep

#include <absl/strings/str_cat.h>
#include <absl/time/time.h>

#include <condition_variable>
kostasrim (Contributor Author):

TODO: remove the includes. The plugin is too aggressive.

romange (Collaborator):

yes, please

@kostasrim requested a review from romange December 1, 2025 10:54
@kostasrim (Contributor Author):

@romange you can start taking a look.

  1. I added a workflow + flag to run ioloopv2. The former is to avoid running the tests on both paths every time. Once we slowly migrate, I will remove the workflow file -- it's just a temporary solution.
  2. I only used epoll-based notifications -- will follow up with enable_multi_shot_=true as well.
  3. I introduced ioloopv2 to provide a clear separation between the two implementations.


}

io::MutableBytes buf = io_buf_.AppendBuffer();
int res = recv(socket_->native_handle(), buf.data(), buf.size(), 0);
romange (Collaborator):

Please use the TryRecv interface.

do {
HandleMigrateRequest();

// We *must* poll again for readiness. The event handler we registered above
romange (Collaborator):

it is possible to improve for sure. 'We must' is too strong a statement, as you can introduce a state that tracks whether the read is needed and when it's not.
Either change it to a TODO or fix it in this PR.

kostasrim (Contributor Author):

Well, is it a regression to poll unconditionally? I imagine the overhead for EAGAIN is minimal, so we don't really need a variable for this. I wrote 'must' because I found polling to be the simplest way. I reworded the comment to be less strong. My only question is: what is the reason to prefer a flag over an unconditional poll, assuming EAGAIN is not eating CPU time?

romange (Collaborator):

it's a system call, and we add another system call per loop iteration. It's fine for now, but I think we will have to add a state variable to track when this call is needed. I do not know how it affects performance; in fact, we will need to do some benchmarking tests on this PR. I will do it next week.

util::fb2::Fiber async_fb_; // async fiber (if started)

std::error_code io_ec_;
util::fb2::EventCount io_event_;
romange (Collaborator):

do not use EventCount for thread-local synchronization - use CondVarAny instead


Signed-off-by: Kostas Kyrimis <[email protected]>
Signed-off-by: Kostas Kyrimis <[email protected]>
@kostasrim requested a review from romange December 3, 2025 10:44
return;
}

LOG_EVERY_T(ERROR, 10) << "Recv error: " << ec;
romange (Collaborator):

remove?

io::Result<size_t> res = socket_->TryRecv(buf);

// error path
if (!res) {
romange (Collaborator):

for readability, I would reverse this condition and handle the happy case first, aka:
if (res && *res) {


from . import dfly_args
from .instance import DflyInstance
from .utility import *
romange (Collaborator), Dec 3, 2025:

not needed?

UpdateIoBufCapacity(io_buf_, stats_, [&]() { io_buf_.EnsureCapacity(64); });
auto res = IoLoop();
variant<error_code, Connection::ParserStatus> res;
if (GetFlag(FLAGS_expiremental_io_loop_v2) && !is_tls_) {
romange (Collaborator):

introduce a boolean variable instead of duplicating the condition below.

return;
}
// A recv call can return fewer bytes than requested even if the
// socket buffer actually contains enough data to satisfy the full request.
romange (Collaborator):

why do you say this? yes, there are some corner cases, but in general Recv will try to unload as much as possible and fill the destination buffer

kostasrim (Contributor Author):

recv can return less data than is available on the socket in non-blocking mode. So even if the receive buffers contain 1k bytes and you try to read 1k, recv might return less.

I don't want to sound too confident here; that's my current understanding of the behaviour in non-blocking mode.

Either way, I want to describe the reason why the poll is important: a) it could be that we read less data than is available, or b) the capacity of io_buf_ is less than what recv could satisfy in one call.

romange (Collaborator), Dec 3, 2025:

I think this exact example cannot happen, i.e., with 1KB you will always read 1KB. There are some limits on how much it will read in one call, but besides that, my understanding is that a) can happen only because of b).

romange (Collaborator):

also, define "available". how can you observe "available" size without reading the data?

romange (Collaborator):

actually, you can: ioctl(socket_fd, FIONREAD, &bytes_available)

romange (Collaborator):

Hmm, gemini confirms your statement. Good to know.

kostasrim (Contributor Author):

Yeah, perplexity confirmed this for me as well, but I do not trust it that much. If I recall correctly, I read this somewhere in a conversation on the kernel mailing list, but I didn't verify it myself (and hence wrote that I am not sure).

kostasrim (Contributor Author):

> actually, you can: ioctl(socket_fd, FIONREAD, &bytes_available)

Yes, the receive buffers.


// TODO EnableRecvMultishot

// Breaks with TLS. RegisterOnRecv is unimplemented.
romange (Collaborator):

this comment should move to where you check is_tls_

@romange (Collaborator) commented Dec 3, 2025

LGTM in general, let's try submitting it this week.

Signed-off-by: Kostas Kyrimis <[email protected]>
@kostasrim (Contributor Author):

> LGTM in general, let's try submitting it this week.

I addressed your comments and ran the tests again. Please re-review.

@kostasrim requested a review from romange December 3, 2025 13:15
<< "Redis parser error: " << result << " during parse: " << ToSV(read_buffer);
}
read_buffer.remove_prefix(consumed);
total += consumed;
romange (Collaborator):

sorry I did not catch this earlier - I suggest removing the total counter and instead calling here:
io_buf_.ConsumeInput(consumed);

I think it's better because, as you said, we preempt here, during which time we can read more data. This will lead to much smoother "pipeline-like" I/O.

Currently: you read, say, 4KB, and your io_buf does not have more space. You start executing commands. Your read notifications won't fill the buffer until you finish running all the commands and empty your io_buf; only then will you call recv and copy the next chunk.

@romange requested a review from Copilot December 3, 2025 17:44
Copilot finished reviewing on behalf of romange December 3, 2025 17:48
Copilot AI (Contributor) left a comment:

Pull request overview

This PR introduces IoLoopV2, a new asynchronous I/O implementation for Dragonfly's connection handling that eliminates fiber-blocking read calls. The new implementation uses multishot poll registration for non-blocking socket reads, allowing the connection fiber to handle other events while recv operations are processed asynchronously. This is currently experimental and only enabled for non-TLS connections when the expiremental_io_loop_v2 flag is set.

Key changes:

  • Adds IoLoopV2() method that uses asynchronous, non-blocking reads via poll registration instead of fiber-blocking recv calls
  • Fixes a bug in ParseRedis() where buffer consumption was incorrectly calculated when preemption occurred mid-parsing
  • Adds GitHub workflow for regression testing with IoLoopV2 enabled, excluding TLS tests

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.

File: src/facade/dragonfly_connection.h - Adds declarations for DoReadOnRecv(), IoLoopV2(), and synchronization primitives (io_ec_, io_event_) needed for asynchronous I/O
File: src/facade/dragonfly_connection.cc - Implements the new asynchronous I/O loop with poll-based readiness notifications, fixes buffer consumption tracking in ParseRedis(), and integrates the new loop as an experimental opt-in feature
File: .github/workflows/ioloop-v2-regtests.yml - Adds a manual workflow for regression testing with IoLoopV2 enabled, excluding TLS tests
File: .github/actions/regression-tests/action.yml - Updates the regression test action to support passing experimental flags to the dragonfly binary

aws-access-key-id: ${{ secrets.AWS_S3_ACCESS_KEY }}
aws-secret-access-key: ${{ secrets.AWS_S3_ACCESS_SECRET }}
s3-bucket: ${{ secrets.S3_REGTEST_BUCKET }}
df-arg: "expiremental_io_loop_v2"
Copilot AI, Dec 3, 2025:

Inconsistent spelling: "expiremental_io_loop_v2" should be "experimental_io_loop_v2" to match the spelling in the action.yml file (line 61).

Suggested change
df-arg: "expiremental_io_loop_v2"
df-arg: "experimental_io_loop_v2"

}

if (io_buf_.AppendLen() == 0) {
// We will regrow in IoLoop
Copilot AI, Dec 3, 2025:

Comment says "We will regrow in IoLoop" but this method is called from IoLoopV2, not IoLoop. The comment should be updated to say "We will regrow in IoLoopV2" for clarity.

Suggested change
// We will regrow in IoLoop
// We will regrow in IoLoopV2

Comment on lines 2241 to 2311
size_t max_iobfuf_len = GetFlag(FLAGS_max_client_iobuf_len);

auto* peer = socket_.get();
recv_buf_.res_len = 0;

// TODO EnableRecvMultishot

peer->RegisterOnRecv([this](const FiberSocketBase::RecvNotification& n) {
DoReadOnRecv(n);
io_event_.notify_one();
});

do {
HandleMigrateRequest();

// Poll again for readiness. The event handler registered above is edge triggered
// (called once per socket readiness event). So, for example, it could be that the
// cb read less data than it is available because of io_buf_ capacity. If after
// an iteration the fiber does not poll the socket for more data it might deadlock.
// TODO maybe use a flag instead of a poll
DoReadOnRecv(FiberSocketBase::RecvNotification{true});
fb2::NoOpLock noop;
io_event_.wait(
noop, [this]() { return io_buf_.InputLen() > 0 || io_ec_ || io_buf_.AppendLen() == 0; });

if (io_ec_) {
LOG_IF(WARNING, cntx()->replica_conn) << "async io error: " << io_ec_;
return std::exchange(io_ec_, {});
}

phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;

if (io_buf_.InputLen() > 0) {
if (redis_parser_) {
parse_status = ParseRedis(max_busy_read_cycles_cached);
} else {
DCHECK(memcache_parser_);
parse_status = ParseMemcache();
}
} else {
parse_status = NEED_MORE;
DCHECK(io_buf_.AppendLen() == 0);
}

if (reply_builder_->GetError()) {
return reply_builder_->GetError();
}

if (parse_status == NEED_MORE) {
parse_status = OK;

size_t capacity = io_buf_.Capacity();
if (capacity < max_iobfuf_len) {
size_t parser_hint = 0;
if (redis_parser_)
parser_hint = redis_parser_->parselen_hint(); // Could be done for MC as well.

// If we got a partial request and we managed to parse its
// length, make sure we have space to store it instead of
// increasing space incrementally.
// (Note: The buffer object is only working in power-of-2 sizes,
// so there's no danger of accidental O(n^2) behavior.)
if (parser_hint > capacity) {
UpdateIoBufCapacity(io_buf_, stats_,
[&]() { io_buf_.Reserve(std::min(max_iobfuf_len, parser_hint)); });
}

// If we got a partial request because iobuf was full, grow it up to
// a reasonable limit to save on Recv() calls.
if (is_iobuf_full && capacity < max_iobfuf_len / 2) {
Copilot AI, Dec 3, 2025:

Typo in variable name: "max_iobfuf_len" should be "max_iobuf_len" (stray 'f'). This typo is used consistently in lines 2241, 2294, 2306, and 2311.

// if (std::holds_alternative<io::MutableBytes>(n.read_result))
using RecvNot = util::FiberSocketBase::RecvNotification::RecvCompletion;
if (std::holds_alternative<RecvNot>(n.read_result)) {
if (!std::get<bool>(n.read_result)) {
Copilot AI, Dec 3, 2025:

Inconsistent type handling: Line 2193 defines RecvNot as an alias for RecvNotification::RecvCompletion and line 2194 checks for this type, but line 2195 uses std::get<bool> instead of std::get<RecvNot>. Either use RecvNot consistently or remove the unnecessary typedef if RecvCompletion is indeed just bool.

Suggested change
if (!std::get<bool>(n.read_result)) {
if (!std::get<RecvNot>(n.read_result)) {

return;
}

DCHECK(false) << "Sould not reach here";
Copilot AI, Dec 3, 2025:

Typo in error message: "Sould" should be "Should".

Suggested change
DCHECK(false) << "Sould not reach here";
DCHECK(false) << "Should not reach here";

"If non-zero, waits for this time for more I/O "
" events to come for the connection in case there is only one command in the pipeline. ");

ABSL_FLAG(bool, expiremental_io_loop_v2, false, "new io loop");
Copilot AI, Dec 3, 2025:

Inconsistent spelling: flag is named "expiremental_io_loop_v2" (should be "experimental_io_loop_v2").

Suggested change
ABSL_FLAG(bool, expiremental_io_loop_v2, false, "new io loop");
ABSL_FLAG(bool, experimental_io_loop_v2, false, "new io loop");

Linked issue: Integrate and test OnRecv in ConnFb (#6028)