diff --git a/CMakeLists.txt b/CMakeLists.txt index fef953244..221763d80 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,7 +30,8 @@ set(ELLIPTICS_VERSION "${ELLIPTICS_VERSION_ABI}.${ELLIPTICS_VERSION_MINOR}") set(Boost_NO_BOOST_CMAKE TRUE) set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/Modules/") -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x -std=gnu++0x") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x -std=gnu++0x -Wfatal-errors") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wfatal-errors") include(CheckLargefile) include(CheckAtomic) include(CheckSendfile) diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt index 8493bbf25..c2cb1e843 100644 --- a/bindings/cpp/CMakeLists.txt +++ b/bindings/cpp/CMakeLists.txt @@ -39,6 +39,7 @@ set(ELLIPTICS_CLIENT_SRCS ../../library/request_queue.cpp ../../library/rbtree.c ../../library/trans.c + ../../library/trans.cpp ../../library/tests.c ../../library/common.cpp ../../library/access_context.cpp diff --git a/bindings/cpp/async_result.cpp b/bindings/cpp/async_result.cpp index 1a96a2703..8802b33e2 100644 --- a/bindings/cpp/async_result.cpp +++ b/bindings/cpp/async_result.cpp @@ -199,7 +199,10 @@ bool async_result::get(T &entry) { wait(session::throw_at_get); for (auto it = m_data->results.begin(); it != m_data->results.end(); ++it) { - if (it->status() == 0 && !it->data().empty()) { + bool is_valuable_entry = it->tmp_is_n2_protocol() + ? (it->status() == 0 && !it->data().empty()) + : (it->is_valid() && it->status() == 0); + if (is_valuable_entry) { entry = *it; return true; } diff --git a/bindings/cpp/callback.cpp b/bindings/cpp/callback.cpp index 7e8b909ae..7e773f7f0 100644 --- a/bindings/cpp/callback.cpp +++ b/bindings/cpp/callback.cpp @@ -40,24 +40,41 @@ class basic_handler return 0; } + // Used by old (protocol-dependent) mechanic, must be removed after refactoring basic_handler(std::unique_ptr logger, async_generic_result &result) : m_logger(std::move(logger)), m_handler(result), m_completed(0), m_total(0) { + memset(&m_addr, 0, sizeof(dnet_addr)); + memset(&m_cmd, 0, sizeof(dnet_cmd)); } - bool handle(dnet_addr *addr, dnet_cmd *cmd) + basic_handler(const dnet_cmd &cmd, std::unique_ptr logger, async_generic_result &result) : + m_cmd(cmd), + m_logger(std::move(logger)), + m_handler(result), m_completed(0), m_total(0) { - if (is_trans_destroyed(cmd)) { - return increment_completed(); - } + memset(&m_addr, 0, sizeof(dnet_addr)); + } + void log_reply_info(dnet_addr *addr, dnet_cmd *cmd) + { DNET_LOG(m_logger, cmd->status ? DNET_LOG_ERROR : DNET_LOG_NOTICE, "{}: {}: handled reply from: {}, " "trans: {}, cflags: {}, status: {}, " "size: {}, client: {}, last: {}", dnet_dump_id(&cmd->id), dnet_cmd_string(cmd->cmd), addr ? dnet_addr_string(addr) : "", cmd->trans, dnet_flags_dump_cflags(cmd->flags), int(cmd->status), cmd->size, !(cmd->flags & DNET_FLAGS_REPLY), !(cmd->flags & DNET_FLAGS_MORE)); + } + + // Used by old (protocol-dependent) mechanic, must be removed after refactoring + bool handle(dnet_addr *addr, dnet_cmd *cmd) + { + if (is_trans_destroyed(cmd)) { + return increment_completed(); + } + + log_reply_info(addr, cmd); auto data = std::make_shared(addr, cmd); @@ -71,6 +88,33 @@ class basic_handler return false; } + int on_reply(const std::shared_ptr &result, bool is_last) + { + // TODO(sabramkin): Output only protocol-independent known info (currently old-mechanic logging used) + log_reply_info(&m_addr, &m_cmd); + + auto data = std::make_shared(m_addr, m_cmd, result, 0, is_last); + callback_result_entry entry(data); + m_handler.process(entry); + + increment_completed(); // TODO(sabramkin): correctly process trans destroying + return 0; + } + + int on_reply_error(int err, bool is_last) + { + // TODO(sabramkin): Output only protocol-independent known info (currently old-mechanic logging used) + log_reply_info(&m_addr, &m_cmd); + + auto data = std::make_shared(m_addr, m_cmd, nullptr, err, is_last); + data->error = create_error(err, "n2 lookup_new error"); // TODO(sabramkin): rework error + callback_result_entry entry(data); + m_handler.process(entry); + + increment_completed(); // TODO(sabramkin): correctly process trans destroying + return 0; + } + // how many independent transactions share this handler plus call below // call below and corresponding +1 is needed, since transactions can be completed // before send_impl() calls this method to setup this 'reference counter' @@ -92,6 +136,11 @@ class basic_handler return false; } +public: + dnet_addr m_addr; + +private: + dnet_cmd m_cmd; std::unique_ptr m_logger; async_result_handler m_handler; std::atomic_size_t m_completed; @@ -106,7 +155,6 @@ async_generic_result send_impl(session &sess, T &control, Method method) async_generic_result result(sess); detail::basic_handler *handler = new detail::basic_handler(sess.get_logger(), result); - control.complete = detail::basic_handler::handler; control.priv = handler; @@ -142,6 +190,59 @@ async_generic_result send_to_single_state(session &sess, dnet_io_control &contro return send_impl(sess, control, send_to_single_state_io_impl); } +template +async_generic_result n2_send_impl(session &sess, const n2_request &request, Method method) +{ + async_generic_result result(sess); + + auto handler = std::make_shared(request.cmd, sess.get_logger(), result); + + auto calls_counter = std::make_shared>(false); + auto test_and_set_reply_has_sent = [calls_counter](bool last) { + if (last) { + return calls_counter->exchange(true); + } else { + return bool(*calls_counter); + } + }; + + n2_request_info request_info{ request, n2_repliers() }; + + request_info.repliers.on_reply = + [handler, test_and_set_reply_has_sent](const std::shared_ptr &result, bool last){ + if (test_and_set_reply_has_sent(last)) { + return -EALREADY; + } + + return handler->on_reply(result, last); + }; + request_info.repliers.on_reply_error = + [handler, test_and_set_reply_has_sent](int err, bool last){ + if (test_and_set_reply_has_sent(last)) { + return -EALREADY; + } + + return handler->on_reply_error(err, last); + }; + + const size_t count = method(sess, std::move(request_info), handler->m_addr); + handler->set_total(count); + return result; +} + +int n2_trans_alloc_send(dnet_session *s, n2_request_info &&request_info, dnet_addr &addr_out); // implemented in trans.cpp + +static size_t n2_send_to_single_state_impl(session &sess, n2_request_info &&request_info, dnet_addr &addr_out) +{ + n2_trans_alloc_send(sess.get_native(), std::move(request_info), addr_out); + return 1; +} + +async_generic_result n2_send_to_single_state(session &sess, const n2_request &request) +{ + return n2_send_impl(sess, request, n2_send_to_single_state_impl); +} + static size_t send_to_each_backend_impl(session &sess, dnet_trans_control &ctl) { return dnet_request_cmd(sess.get_native(), &ctl); diff --git a/bindings/cpp/callback_p.h b/bindings/cpp/callback_p.h index 5d5304191..e7c5719fb 100644 --- a/bindings/cpp/callback_p.h +++ b/bindings/cpp/callback_p.h @@ -30,6 +30,7 @@ #include #include "elliptics/async_result_cast.hpp" +#include "library/n2_protocol.hpp" namespace ioremap { namespace elliptics { @@ -63,30 +64,200 @@ class session_scope uint32_t m_policy; }; -class callback_result_data +// TODO(sabramkin): This abstraction is temporary and used while refactoring in progress. +// TODO(sabramkin): After refactoring only n2_callback_result_data should stay, no base is needed. +class callback_result_data_base +{ + public: + virtual dnet_addr *address() const = 0; + virtual dnet_cmd *command() const = 0; + virtual int status() const = 0; + virtual bool is_valid() const = 0; + virtual bool is_ack() const = 0; + virtual bool is_final() const = 0; + virtual bool is_client() const = 0; + + virtual ~callback_result_data_base() = default; + + error_info error; + + // Hint to determine derived without dynamic_cast. Reason: lookup_result_entry is used by two handlers: + // session::lookup and session::write. The first one is converted to protocol-independent, but the + // second one isn't. + bool tmp_is_n2_protocol; +}; + +#define DNET_DATA_BEGIN_2() try { \ + do {} while (false) + +#define DNET_DATA_END_2(SIZE) \ + } catch (not_found_error &) { \ + if (!is_valid()) { \ + throw_error(-ENOENT, "entry::%s(): entry is null", __FUNCTION__); \ + } else {\ + dnet_cmd *cmd = command(); \ + throw_error(-ENOENT, cmd->id, "entry::%s(): data.size is too small, expected: %zu, actual: %zu, status: %d", \ + __FUNCTION__, size_t(SIZE), data().size(), cmd->status); \ + } \ + throw; \ + } \ + do {} while (false) + +class callback_result_data : public callback_result_data_base { public: callback_result_data() { + tmp_is_n2_protocol = false; } callback_result_data(const dnet_addr *addr, const dnet_cmd *cmd) { + tmp_is_n2_protocol = false; + const size_t size = sizeof(dnet_addr) + sizeof(dnet_cmd) + cmd->size; - data = data_pointer::allocate(size); + raw_data = data_pointer::allocate(size); if (addr) - memcpy(data.data(), addr, sizeof(dnet_addr)); + memcpy(raw_data.data(), addr, sizeof(dnet_addr)); else - memset(data.data(), 0, sizeof(dnet_addr)); - memcpy(data.data() + sizeof(dnet_addr), cmd, sizeof(dnet_cmd) + cmd->size); + memset(raw_data.data(), 0, sizeof(dnet_addr)); + memcpy(raw_data.data() + sizeof(dnet_addr), cmd, sizeof(dnet_cmd) + cmd->size); } - virtual ~callback_result_data() + bool is_valid() const override { + return !raw_data.empty(); } - data_pointer data; - error_info error; + bool is_ack() const override + { + //printf("DBG status %d, size %d, index %d, empty %d\n", int(status()), int(data.size()), int(data.offset()), int(data.empty())); + return status() == 0 && data().empty(); + } + + bool is_final() const override + { + return !(command()->flags & DNET_FLAGS_MORE); + } + + bool is_client() const override + { + return !(command()->flags & DNET_FLAGS_REPLY); + } + + int status() const override + { + return command()->status; + } + + dnet_addr *address() const override + { + DNET_DATA_BEGIN_2(); + return raw_data + .data(); + DNET_DATA_END_2(0); + } + + dnet_cmd *command() const override + { + DNET_DATA_BEGIN_2(); + return raw_data + .skip() + .data(); + DNET_DATA_END_2(0); + } + + data_pointer data() const + { + DNET_DATA_BEGIN_2(); + return raw_data + .skip() + .skip(); + DNET_DATA_END_2(0); + } + + uint64_t size() const + { + constexpr size_t headers_size = sizeof(struct dnet_addr) + sizeof(struct dnet_cmd); + return std::max(raw_data.size(), headers_size) - headers_size; + } + + data_pointer raw_data; +}; + +class n2_callback_result_data : public callback_result_data_base +{ + public: + n2_callback_result_data() + : is_result_assigned(false) + { + tmp_is_n2_protocol = true; + } + + n2_callback_result_data(const dnet_addr &addr_in, const dnet_cmd &cmd_in, + const std::shared_ptr &result_body_in, int result_status_in, + bool is_last_in) + : addr(addr_in) + , cmd(cmd_in) + , is_result_assigned(true) + , result_body(result_body_in) + , result_status(result_status_in) + , is_last(is_last_in) + { + tmp_is_n2_protocol = true; + + // TODO(sabramkin): + // Here is emulated protocol logic for single-response commands. It is hardcode that we must + // resolve when we introduce bulk commands. See also is_final() method. Note that protocol + // mustn't provide its inner structures (such as dnet_cmd), so we must remove command() method + // in the future, and must remove cmd member. + cmd.flags = (cmd.flags & ~(DNET_FLAGS_NEED_ACK)) | DNET_FLAGS_REPLY; + cmd.status = result_status_in; + } + + dnet_addr *address() const override + { + return const_cast(&addr); + } + + dnet_cmd *command() const override + { + return const_cast(&cmd); + } + + int status() const override + { + return result_status; + } + + bool is_valid() const override + { + return is_result_assigned; + } + + bool is_ack() const override + { + return result_status == 0 && !result_body; + } + + bool is_final() const override + { + return is_last; + } + + bool is_client() const override + { + return false; + } + + dnet_addr addr; + dnet_cmd cmd; + + // Either result or nonzero result_status must be set + bool is_result_assigned; + std::shared_ptr result_body; + int result_status; + bool is_last; }; struct dnet_net_state_deleter @@ -103,6 +274,7 @@ typedef std::unique_ptr net_state_ptr; // Send request to specific state async_generic_result send_to_single_state(session &sess, const transport_control &control); async_generic_result send_to_single_state(session &sess, dnet_io_control &control); +async_generic_result n2_send_to_single_state(session &sess, const n2_request &request); // Send request to each backend async_generic_result send_to_each_backend(session &sess, const transport_control &control); diff --git a/bindings/cpp/newapi/result_entry.cpp b/bindings/cpp/newapi/result_entry.cpp index 7077b33d7..17f3b50b2 100644 --- a/bindings/cpp/newapi/result_entry.cpp +++ b/bindings/cpp/newapi/result_entry.cpp @@ -1,4 +1,6 @@ #include "elliptics/newapi/result_entry.hpp" +#include "bindings/cpp/callback_p.h" +#include "library/n2_protocol.hpp" #include "library/protocol.hpp" #include "library/elliptics.h" @@ -21,13 +23,41 @@ bool callback_result_entry::empty() const { } std::string lookup_result_entry::path() const { - dnet_lookup_response response; + if (tmp_is_n2_protocol()) { + auto &response = *static_cast(body()); + return response.path; + } + // fallback for using lookup_result_entry with session::write + dnet_lookup_response response; deserialize(raw_data(), response); return response.path; } dnet_record_info lookup_result_entry::record_info() const { + if (tmp_is_n2_protocol()) { + auto &response = *static_cast(body()); + + dnet_record_info info; + memset(&info, 0, sizeof(info)); + + info.record_flags = response.record_flags; + info.user_flags = response.user_flags; + + info.json_timestamp = response.json_timestamp; + info.json_offset = response.json_offset; + info.json_size = response.json_size; + info.json_capacity = response.json_capacity; + + info.data_timestamp = response.data_timestamp; + info.data_offset = response.data_offset; + info.data_size = response.data_size; + + return info; + } + + // fallback for using lookup_result_entry with session::write + dnet_record_info info; memset(&info, 0, sizeof(info)); @@ -61,16 +91,26 @@ lookup_result_entry::checksum_t convert_checksum(const std::vector(body()); + return convert_checksum(response.json_checksum); + } + + // fallback for using lookup_result_entry with session::write dnet_lookup_response response; deserialize(raw_data(), response); - return convert_checksum(response.json_checksum); } lookup_result_entry::checksum_t lookup_result_entry::data_checksum() const { + if (tmp_is_n2_protocol()) { + auto &response = *static_cast(body()); + return convert_checksum(response.data_checksum); + } + + // fallback for using lookup_result_entry with session::write dnet_lookup_response response; deserialize(raw_data(), response); - return convert_checksum(response.data_checksum); } diff --git a/bindings/cpp/newapi/session.cpp b/bindings/cpp/newapi/session.cpp index 10383bbfb..2be03eed8 100644 --- a/bindings/cpp/newapi/session.cpp +++ b/bindings/cpp/newapi/session.cpp @@ -14,6 +14,7 @@ #include "library/access_context.h" #include "library/common.hpp" #include "library/elliptics.h" +#include "library/n2_protocol.hpp" #include "library/protocol.hpp" #include "bindings/cpp/functional_p.h" @@ -103,19 +104,20 @@ class lookup_handler : public std::enable_shared_from_this { inner_handler(const session &s, const async_lookup_result &result, std::vector &&groups, - const dnet_trans_control &control) + n2_request &&request) : parent_type(s, result, std::move(groups)) - , m_control(control) { + , m_request(std::move(request)) { } protected: async_generic_result send_to_next_group() override { - m_control.id.group_id = current_group(); - return send_to_single_state(m_sess, m_control); + // TODO(sabramkin): control'll be mutated each time, think about copy? + m_request.cmd.id.group_id = current_group(); + return n2_send_to_single_state(m_sess, m_request); } private: - dnet_trans_control m_control; + n2_request m_request; }; public: @@ -128,17 +130,18 @@ class lookup_handler : public std::enable_shared_from_this { m_handler.set_total(1); } - void start(std::vector &&groups, const transport_control &control) { + void start(std::vector &&groups, n2_request &&request) { + dnet_cmd &cmd = request.cmd; DNET_LOG_INFO(m_log, "{}: {}: started: groups: {}, cflags: {}", dnet_dump_id_str(m_key.raw_id().id), - dnet_cmd_string(control.get_native().cmd), groups, - dnet_flags_dump_cflags(control.get_native().cflags)); + dnet_cmd_string(cmd.cmd), groups, + dnet_flags_dump_cflags(cmd.flags)); m_context.reset(new dnet_access_context(m_session.get_native_node())); if (m_context) { - m_context->add({{"cmd", std::string(dnet_cmd_string(control.get_native().cmd))}, + m_context->add({{"cmd", std::string(dnet_cmd_string(cmd.cmd))}, {"id", std::string(dnet_dump_id_str(m_key.id().id))}, {"access", "client"}, - {"cflags", std::string(dnet_flags_dump_cflags(control.get_native().cflags))}, + {"cflags", std::string(dnet_flags_dump_cflags(cmd.flags))}, {"trace_id", to_hex_string(m_session.get_trace_id())}, }); } @@ -147,7 +150,7 @@ class lookup_handler : public std::enable_shared_from_this { async_lookup_result result{m_session}; auto handler = std::make_shared(m_session, result, std::move(groups), - control.get_native()); + std::move(request)); handler->set_total(m_handler.get_total()); handler->start(); result.connect( @@ -191,14 +194,17 @@ async_lookup_result session::lookup(const key &id) { DNET_SESSION_GET_GROUPS(async_lookup_result); transform(id); - transport_control control; - control.set_key(id.id()); - control.set_command(DNET_CMD_LOOKUP_NEW); - control.set_cflags(get_cflags() | DNET_FLAGS_NEED_ACK); + dnet_cmd cmd; + memset(&cmd, 0, sizeof(dnet_cmd)); + cmd.id = id.id(); + cmd.cmd = DNET_CMD_LOOKUP_NEW; + cmd.flags = get_cflags() | DNET_FLAGS_NEED_ACK; + + n2_request request(cmd, ioremap::elliptics::n2::default_deadline()); async_lookup_result result(*this); auto handler = std::make_shared(*this, result, id); - handler->start(std::move(groups), control.get_native()); + handler->start(std::move(groups), std::move(request)); return result; } diff --git a/bindings/cpp/result_entry.cpp b/bindings/cpp/result_entry.cpp index 18b833433..5b8a2326b 100644 --- a/bindings/cpp/result_entry.cpp +++ b/bindings/cpp/result_entry.cpp @@ -23,6 +23,19 @@ #include #include #include +#include + +template T &dynamic_cast_checked(Base &base, const char *file, int line) { + try { + return dynamic_cast(base); + } catch (const std::bad_cast &) { + std::cout << getpid() << ": BAD_CAST: " << file << ":" << line << std::endl; + abort(); + return *(T *)0; + } +} + +#define dynamic_cast2(T, b) dynamic_cast_checked(b, __FILE__, __LINE__); namespace ioremap { namespace elliptics { @@ -46,6 +59,7 @@ namespace ioremap { namespace elliptics { } \ do {} while (false) +// TODO(sabramkin): this is default constructor that operates with deprecated callback_result_data callback_result_entry::callback_result_entry() : m_data(std::make_shared()) { } @@ -54,7 +68,7 @@ callback_result_entry::callback_result_entry(const callback_result_entry &other) { } -callback_result_entry::callback_result_entry(const std::shared_ptr &data) : m_data(data) +callback_result_entry::callback_result_entry(const std::shared_ptr &data) : m_data(data) { } @@ -70,27 +84,27 @@ callback_result_entry &callback_result_entry::operator =(const callback_result_e bool callback_result_entry::is_valid() const { - return !m_data->data.empty(); + return m_data->is_valid(); } bool callback_result_entry::is_ack() const { - return status() == 0 && data().empty(); + return m_data->is_ack(); } bool callback_result_entry::is_final() const { - return !(command()->flags & DNET_FLAGS_MORE); + return m_data->is_final(); } bool callback_result_entry::is_client() const { - return !(command()->flags & DNET_FLAGS_REPLY); + return m_data->is_client(); } int callback_result_entry::status() const { - return command()->status; + return m_data->status(); } error_info callback_result_entry::error() const @@ -100,40 +114,41 @@ error_info callback_result_entry::error() const data_pointer callback_result_entry::raw_data() const { - return m_data->data; + auto &old_data = dynamic_cast2(callback_result_data, *m_data); + return old_data.raw_data; } struct dnet_addr *callback_result_entry::address() const { - DNET_DATA_BEGIN(); - return m_data->data - .data(); - DNET_DATA_END(0); + return m_data->address(); } struct dnet_cmd *callback_result_entry::command() const { - DNET_DATA_BEGIN(); - return m_data->data - .skip() - .data(); - DNET_DATA_END(0); + return m_data->command(); } data_pointer callback_result_entry::data() const { - DNET_DATA_BEGIN(); - return m_data->data - .skip() - .skip(); - DNET_DATA_END(0); + auto &old_data = dynamic_cast2(callback_result_data, *m_data); + return old_data.data(); } uint64_t callback_result_entry::size() const { - return (m_data->data.size() <= (sizeof(struct dnet_addr) + sizeof(struct dnet_cmd))) - ? (0) - : (m_data->data.size() - (sizeof(struct dnet_addr) + sizeof(struct dnet_cmd))); + auto &old_data = dynamic_cast2(callback_result_data, *m_data); + return old_data.size(); +} + +n2_body *callback_result_entry::body() const +{ + auto &n2_data = dynamic_cast2(n2_callback_result_data, *m_data); + return n2_data.result_body.get(); +} + +bool callback_result_entry::tmp_is_n2_protocol() const +{ + return m_data->tmp_is_n2_protocol; } read_result_entry::read_result_entry() diff --git a/bindings/cpp/session.cpp b/bindings/cpp/session.cpp index 64d9f9e7a..003822c65 100644 --- a/bindings/cpp/session.cpp +++ b/bindings/cpp/session.cpp @@ -209,7 +209,13 @@ const dnet_addr &address::to_raw() const namespace filters { bool positive(const callback_result_entry &entry) { - return entry.status() == 0 && !entry.data().empty(); + if (entry.tmp_is_n2_protocol()) { + // in new mechanic protocol interface won't provide explicit ack + return entry.status() == 0; + } else { + // legacy, hasn't been touched + return entry.status() == 0 && !entry.data().empty(); + } } bool positive_with_ack(const callback_result_entry &entry) @@ -224,12 +230,24 @@ bool negative(const callback_result_entry &entry) bool negative_with_ack(const callback_result_entry &entry) { - return entry.status() != 0 || entry.data().empty(); + if (entry.tmp_is_n2_protocol()) { + // in new mechanic protocol interface won't provide explicit ack + return entry.status() != 0; + } else { + // legacy, hasn't been touched + return entry.status() != 0 || entry.data().empty(); + } } bool all(const callback_result_entry &entry) { - return entry.status() != 0 || !entry.data().empty(); + if (entry.tmp_is_n2_protocol()) { + // in new mechanic protocol interface won't provide explicit ack + return true; + } else { + // legacy, hasn't been touched + return entry.status() != 0 || !entry.data().empty(); + } } bool all_with_ack(const callback_result_entry &entry) @@ -1087,7 +1105,7 @@ struct cas_functor : std::enable_shared_from_this cmd.cmd = DNET_CMD_WRITE; auto data = std::make_shared(&addr, &cmd); - callback_result_entry entry = data; + callback_result_entry entry(data); handler.process(*static_cast(&entry)); handler.complete(error_info()); return; diff --git a/include/elliptics/result_entry.hpp b/include/elliptics/result_entry.hpp index caeb1eb95..bd6df98b1 100644 --- a/include/elliptics/result_entry.hpp +++ b/include/elliptics/result_entry.hpp @@ -23,17 +23,19 @@ #include #include +class n2_body; + namespace ioremap { namespace elliptics { -class callback_result_data; +class callback_result_data_base; class callback_result_entry { public: callback_result_entry(); callback_result_entry(const callback_result_entry &other); - callback_result_entry(const std::shared_ptr &data); - ~callback_result_entry(); + explicit callback_result_entry(const std::shared_ptr &data); + virtual ~callback_result_entry(); callback_result_entry &operator =(const callback_result_entry &other); @@ -49,17 +51,26 @@ class callback_result_entry int status() const; //! Error info for this package if exists error_info error() const; - data_pointer raw_data() const; + struct dnet_addr *address() const; struct dnet_cmd *command() const; + + // TODO(sabramkin): Deprecated methods, for usage with callback_result_data (old) only + data_pointer raw_data() const; data_pointer data() const; uint64_t size() const; template inline T *data() const { return data().data(); } + // TODO(sabramkin): New methods, for usage with n2_callback_result_data (new) only + n2_body *body() const; + + // TODO(sabramkin): Hint to determine what methods to use: new or deprecated + bool tmp_is_n2_protocol() const; + protected: - std::shared_ptr m_data; + std::shared_ptr m_data; }; class read_result_entry : public callback_result_entry diff --git a/library/elliptics.h b/library/elliptics.h index 14d0baa0a..392e111a5 100644 --- a/library/elliptics.h +++ b/library/elliptics.h @@ -811,7 +811,7 @@ int dnet_trans_send(struct dnet_trans *t, struct dnet_io_req *req); int dnet_trans_forward(struct dnet_io_req *r, struct dnet_net_state *orig, struct dnet_net_state *forward); int n2_trans_forward(struct n2_request_info *request_info, struct dnet_net_state *orig, struct dnet_net_state *forward); -int n2_complete_trans_via_response_holder(struct dnet_trans *t, struct n2_response_info *response_info); +int n2_complete_trans_via_response_holder(struct dnet_node *n, struct n2_response_info *response_info); void dnet_io_req_enqueue_net(struct dnet_net_state *st, struct dnet_io_req *r); diff --git a/library/native_protocol/native_protocol.cpp b/library/native_protocol/native_protocol.cpp index cbe9b4398..7e8be166e 100644 --- a/library/native_protocol/native_protocol.cpp +++ b/library/native_protocol/native_protocol.cpp @@ -26,8 +26,9 @@ int protocol::send_request(dnet_net_state *st, std::unique_ptr t(dnet_trans_search(st, cmd.trans), &dnet_trans_put); - if (!t || !t->repliers) + if (!t || !t->repliers) { return -EINVAL; + } *t->repliers = std::move(repliers); } @@ -84,6 +85,10 @@ int protocol::recv_response(dnet_net_state *st, const dnet_cmd &cmd, data_pointe n2_repliers &repliers = *t->repliers; bool last = !(cmd.flags & DNET_FLAGS_MORE); + if (last) { + t->cmd = cmd; + } + if (cmd.status) { return repliers.on_reply_error(cmd.status, last); } diff --git a/library/native_protocol/repliers.hpp b/library/native_protocol/repliers.hpp index 171dddba1..51e728757 100644 --- a/library/native_protocol/repliers.hpp +++ b/library/native_protocol/repliers.hpp @@ -20,6 +20,7 @@ namespace ioremap { namespace elliptics { namespace native { class replier_base { public: replier_base(dnet_net_state *st, const dnet_cmd &cmd); + virtual ~replier_base() = default; int reply(const std::shared_ptr &msg, bool last); int reply_error(int errc, bool last); diff --git a/library/net.c b/library/net.c index 1769d350c..d67cf06fe 100644 --- a/library/net.c +++ b/library/net.c @@ -655,59 +655,65 @@ static int dnet_process_reply(struct dnet_net_state *st, struct dnet_io_req *r) } pthread_mutex_unlock(&st->trans_lock); - if (!t) { - dnet_log(n, DNET_LOG_ERROR, "%s: could not find transaction for reply: trans %llu", - dnet_dump_id(&cmd->id), (unsigned long long)tid); - err = 0; - goto out; - } - - ++t->stats.recv_replies; - t->stats.recv_size += sizeof(struct dnet_cmd) + cmd->size; // TODO: replace protocol-dependent behavior - t->stats.recv_queue_time += r->queue_time; - t->stats.recv_time += r->recv_time; - - if (t->complete) { - if (t->command == DNET_CMD_READ || t->command == DNET_CMD_READ_NEW) { - uint64_t ioflags = 0; - if ((t->command == DNET_CMD_READ) && - (cmd->size >= sizeof(struct dnet_io_attr)) && - (t->alloc_size >= sizeof(struct dnet_cmd) + sizeof(struct dnet_io_attr))) { - struct dnet_io_attr *recv_io = (struct dnet_io_attr *)(cmd + 1); - - struct dnet_cmd *local_cmd = (struct dnet_cmd *)(t + 1); - struct dnet_io_attr *local_io = (struct dnet_io_attr *)(local_cmd + 1); - - ioflags = local_io->flags = recv_io->flags; - local_io->size = recv_io->size; - local_io->offset = recv_io->offset; - local_io->user_flags = recv_io->user_flags; - local_io->total_size = recv_io->total_size; - local_io->timestamp = recv_io->timestamp; - - dnet_convert_io_attr(local_io); + if (t) { + ++t->stats.recv_replies; + t->stats.recv_size += sizeof(struct dnet_cmd) + cmd->size; // TODO: replace protocol-dependent behavior + t->stats.recv_queue_time += r->queue_time; + t->stats.recv_time += r->recv_time; + + if (t->complete) { + if (t->command == DNET_CMD_READ || t->command == DNET_CMD_READ_NEW) { + uint64_t ioflags = 0; + if ((t->command == DNET_CMD_READ) && + (cmd->size >= sizeof(struct dnet_io_attr)) && + (t->alloc_size >= sizeof(struct dnet_cmd) + sizeof(struct dnet_io_attr))) { + struct dnet_io_attr *recv_io = (struct dnet_io_attr *)(cmd + 1); + + struct dnet_cmd *local_cmd = (struct dnet_cmd *)(t + 1); + struct dnet_io_attr *local_io = (struct dnet_io_attr *)(local_cmd + 1); + + ioflags = local_io->flags = recv_io->flags; + local_io->size = recv_io->size; + local_io->offset = recv_io->offset; + local_io->user_flags = recv_io->user_flags; + local_io->total_size = recv_io->total_size; + local_io->timestamp = recv_io->timestamp; + + dnet_convert_io_attr(local_io); + } + + if (st && !(flags & DNET_FLAGS_MORE)) { + struct timespec ts; + long diff; + + clock_gettime(CLOCK_MONOTONIC_RAW, &ts); + diff = DIFF_TIMESPEC(t->start_ts, ts); + + dnet_update_backend_weight(st, cmd, ioflags, diff); + } } - if (st && !(flags & DNET_FLAGS_MORE)) { - struct timespec ts; - long diff; - - clock_gettime(CLOCK_MONOTONIC_RAW, &ts); - diff = DIFF_TIMESPEC(t->start_ts, ts); - - dnet_update_backend_weight(st, cmd, ioflags, diff); - } + t->complete(dnet_state_addr(t->st), cmd, t->priv); } + } - t->complete(dnet_state_addr(t->st), cmd, t->priv); + if (r->io_req_type == DNET_IO_REQ_TYPED_RESPONSE) { + n2_complete_trans_via_response_holder(n, r->response_info); } - if (t->repliers) { - n2_complete_trans_via_response_holder(t, r->response_info); + if (!t) { + dnet_log(n, DNET_LOG_ERROR, "%s: could not find transaction for reply: trans %llu", + dnet_dump_id(&cmd->id), (unsigned long long)tid); + err = 0; + goto out; } if (!(flags & DNET_FLAGS_MORE)) { + int t_cmd_status = t->cmd.status; memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd)); + if (t_cmd_status) + t->cmd.status = t_cmd_status; + dnet_trans_put(t); } else { /* diff --git a/library/net.cpp b/library/net.cpp index c4f62162a..2bbc25f45 100644 --- a/library/net.cpp +++ b/library/net.cpp @@ -1440,9 +1440,10 @@ dnet_cmd n2_convert_to_response_cmd(dnet_cmd cmd) { } n2_repliers n2_make_repliers_via_request_queue(dnet_net_state *st, const dnet_cmd &cmd, n2_repliers repliers) { - auto enqueue_response = [st, cmd = n2_convert_to_response_cmd(cmd)](std::function response_holder) { + auto enqueue_response = [st, cmd = n2_convert_to_response_cmd(cmd)](std::function response_holder, int status) { std::unique_ptr response_info(new n2_response_info{ cmd, std::move(response_holder) }); + response_info->cmd.status = status; auto r = static_cast(calloc(1, sizeof(dnet_io_req))); if (!r) @@ -1461,23 +1462,23 @@ n2_repliers n2_make_repliers_via_request_queue(dnet_net_state *st, const dnet_cm repliers_wrappers.on_reply_error = [on_reply_error = std::move(repliers.on_reply_error), enqueue_response](int errc, bool last) -> int { - return enqueue_response(std::bind(on_reply_error, errc, last)); + return enqueue_response(std::bind(on_reply_error, errc, last), errc); }; repliers_wrappers.on_reply = [on_reply = std::move(repliers.on_reply), enqueue_response](const std::shared_ptr &msg, bool last) -> int { - return enqueue_response(std::bind(on_reply, msg, last)); + return enqueue_response(std::bind(on_reply, msg, last), 0); }; return repliers_wrappers; } -int n2_complete_trans_via_response_holder(dnet_trans *t, n2_response_info *response_info) { - return c_exception_guard(response_info->response_holder, t->st->n, __FUNCTION__); +int n2_complete_trans_via_response_holder(dnet_node *n, n2_response_info *response_info) { + return c_exception_guard(response_info->response_holder, n, __FUNCTION__); } // TODO(sabramkin): Try rework to n2_trans_alloc_send. In new mechanic we don't need to separate alloc and send -static int n2_trans_send(dnet_trans *t, n2_request_info *request_info) { +int n2_trans_send(dnet_trans *t, n2_request_info *request_info) { using namespace ioremap::elliptics; struct dnet_net_state *st = t->st; @@ -1490,6 +1491,15 @@ static int n2_trans_send(dnet_trans *t, n2_request_info *request_info) { dnet_trans_put(t); } BOOST_SCOPE_EXIT_END + auto repliers_wrappers = n2_make_repliers_via_request_queue(st, + request_info->request.cmd, + std::move(request_info->repliers)); + // TODO(sabramkin): It's temporary solution, while transactions are managed outer of protocol. + // We cannot insert transaction to st->trans_root tree when transaction's repliers aren't set. So we must assign + // repliers here, not only relying on protocol::send_request. After refactoring is finished, repliers will be + // passed only to protocol::send_request. + *t->repliers = repliers_wrappers; + pthread_mutex_lock(&st->trans_lock); err = dnet_trans_insert_nolock(st, t); if (!err) { @@ -1504,10 +1514,6 @@ static int n2_trans_send(dnet_trans *t, n2_request_info *request_info) { test_settings.commands_mask & (1 << t->command)) return err; - auto repliers_wrappers = n2_make_repliers_via_request_queue(st, - request_info->request.cmd, - std::move(request_info->repliers)); - n2::net_state_get_protocol(st)->send_request(st, std::move(request_info->request), std::move(repliers_wrappers)); diff --git a/library/trans.c b/library/trans.c index f3e0cd11c..778b24b87 100644 --- a/library/trans.c +++ b/library/trans.c @@ -446,7 +446,7 @@ void dnet_trans_clean_list(struct list_head *head, int error) t->complete(dnet_state_addr(t->st), &t->cmd, t->priv); } if (t->repliers) { - n2_reply_error(t->repliers, error); + n2_reply_error(t->repliers, t->cmd.status); } dnet_trans_put(t); @@ -737,8 +737,10 @@ static int dnet_trans_convert_timed_out_to_responses(struct dnet_net_state *st, if (t->repliers) { // Protocol-independent mechanic + t->cmd.status = -ETIMEDOUT; + r->io_req_type = DNET_IO_REQ_TYPED_RESPONSE; - r->response_info = n2_response_info_create_from_error(&t->cmd, t->repliers, -ETIMEDOUT); + r->response_info = n2_response_info_create_from_error(&t->cmd, t->repliers, t->cmd.status); } else { // Old mechanic. TODO: remove then refactoring complete r->header = r + 1; @@ -788,7 +790,6 @@ static int dnet_trans_check_stall(struct dnet_net_state *st, struct list_head *t static void dnet_trans_enqueue_responses_on_timed_out(struct dnet_node *n, struct list_head *timeout_responses) { struct dnet_io_req *r, *tmp; - list_for_each_entry_safe(r, tmp, timeout_responses, req_entry) { list_del_init(&r->req_entry); diff --git a/library/trans.cpp b/library/trans.cpp new file mode 100644 index 000000000..c7be3970c --- /dev/null +++ b/library/trans.cpp @@ -0,0 +1,94 @@ +#include "elliptics.h" +#include "elliptics/packet.h" +#include "elliptics/interface.h" +#include "library/logger.hpp" +#include "library/n2_protocol.hpp" + +#include + +int n2_trans_send(dnet_trans *t, n2_request_info *request_info); // Implemented in net.cpp + +namespace ioremap { namespace elliptics { + +int n2_trans_send_fail(const n2_request_info &request_info, int err) +{ + return request_info.repliers.on_last_error(err); +} + +int n2_trans_alloc_send_state(dnet_session *s, dnet_net_state *st, n2_request_info &request_info) +{ + // TODO(sabramkin): share the logic with n2_trans_forward (a lot of common actions is done) + + dnet_node *n = st->n; + dnet_cmd *cmd = &request_info.request.cmd; + + std::unique_ptr + t(dnet_trans_alloc(n, 0), &dnet_trans_put); + if (!t) { + return n2_trans_send_fail(request_info, -ENOMEM); + } + + t->complete = nullptr; + t->priv = nullptr; + + const timespec *wait_ts = dnet_session_get_timeout(s); + t->wait_ts = *wait_ts; + request_info.request.deadline = {static_cast(wait_ts->tv_sec), + static_cast(wait_ts->tv_nsec)}; + cmd->flags |= dnet_session_get_cflags(s); + cmd->trace_id = dnet_session_get_trace_id(s); + if (cmd->flags & DNET_FLAGS_DIRECT_BACKEND) + cmd->backend_id = dnet_session_get_direct_backend(s); + + t->repliers = new n2_repliers; // Will be filled at native_protocol::send_request + + t->command = cmd->cmd; + cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans); + memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd)); + + t->st = dnet_state_get(st); + + DNET_LOG_INFO(n, "%s: %s: created %s", + dnet_dump_id(&cmd->id), + dnet_cmd_string(cmd->cmd), + dnet_print_trans(t.get())); + + // TODO(sabramkin): unique_ptr on trans? + int err = n2_trans_send(t.get(), &request_info); + if (err) + return n2_trans_send_fail(request_info, err); + + t.release(); + return 0; +} + +int n2_trans_alloc_send(dnet_session *s, n2_request_info &&request_info, dnet_addr &addr_out) { + dnet_node *n = s->node; + dnet_cmd *cmd = &request_info.request.cmd; + dnet_net_state *st = nullptr; + int err = 0; + + if (dnet_session_get_cflags(s) & DNET_FLAGS_DIRECT) { + st = dnet_state_search_by_addr(n, &s->direct_addr); + } else if (dnet_session_get_cflags(s) & DNET_FLAGS_FORWARD) { + st = dnet_state_search_by_addr(n, &s->forward_addr); + }else { + st = dnet_state_get_first(n, &cmd->id); + } + + if (!st) { + DNET_LOG_ERROR(n, "%s: direct: %d, direct-addr: %s, forward: %d: trans_send: could not find network state for address", + dnet_dump_id(&cmd->id), + !!(dnet_session_get_cflags(s) & DNET_FLAGS_DIRECT), dnet_addr_string(&s->direct_addr), + !!(dnet_session_get_cflags(s) & DNET_FLAGS_FORWARD)); + err = n2_trans_send_fail(request_info, -ENXIO); + } else { + addr_out = st->addr; + err = n2_trans_alloc_send_state(s, st, request_info); + dnet_state_put(st); + } + + return err; +} + +}} // ioremap::elliptics diff --git a/tests/test.cpp b/tests/test.cpp index a30a11536..5efa705a3 100644 --- a/tests/test.cpp +++ b/tests/test.cpp @@ -546,6 +546,14 @@ static void test_bulk_remove(session &sess, size_t test_count) for (auto it = result.begin(); it != result.end(); ++it) { // count only acks since they are the only packets returned by remove() count += (it->status() == 0) && (it->is_ack()); + if (!it->is_ack()) { + std::cout + << ": raw_data_size = " << it->raw_data().size() + << ", data_size = " << it->data().size() + << ", size = " << it->size() + << ", headsize = " << (sizeof(dnet_addr) +sizeof(dnet_cmd)) + << "\n"; + } BOOST_WARN_EQUAL(it->status(), 0); } BOOST_REQUIRE_EQUAL(count, test_count * 2);