diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt index 8e59f0adb..685a5ed36 100644 --- a/cpp/examples/CMakeLists.txt +++ b/cpp/examples/CMakeLists.txt @@ -60,7 +60,9 @@ foreach(example scheduled_send service_bus multithreaded_client - multithreaded_client_flow_control) + multithreaded_client_flow_control + tx_send + tx_recv) add_executable(${example} ${example}.cpp) target_link_libraries(${example} Proton::cpp Threads::Threads) endforeach() diff --git a/cpp/examples/tx_recv.cpp b/cpp/examples/tx_recv.cpp new file mode 100644 index 000000000..81f773a26 --- /dev/null +++ b/cpp/examples/tx_recv.cpp @@ -0,0 +1,150 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +class tx_recv : public proton::messaging_handler { + private: + proton::receiver receiver; + std::string conn_url_; + std::string addr_; + int total; + int batch_size; + int received = 0; + int current_batch = 0; + int batch_index = 0; + + public: + tx_recv(const std::string& u, const std::string &a, int c, int b): + conn_url_(u), addr_(a), total(c), batch_size(b) {} + + void on_container_start(proton::container &c) override { + c.connect(conn_url_); + } + + void on_connection_open(proton::connection& c) override { + // NOTE:credit_window(0) disables automatic flow control. + // We will use flow control to receive batches of messages in a transaction. + std::cout << "In this example we abort/commit transaction alternatively." << std::endl; + receiver = c.open_receiver(addr_, proton::receiver_options().credit_window(0)); + } + + void on_session_open(proton::session &s) override { + std::cout << "New session is open" << std::endl; + s.transaction_declare(); + } + + void on_session_error(proton::session &s) override { + std::cout << "Session error: " << s.error().what() << std::endl; + s.connection().close(); + exit(-1); + } + + void on_session_transaction_declared(proton::session &s) override { + std::cout << "Transaction is declared: " << s.transaction_id() << std::endl; + receiver.add_credit(batch_size); + } + + void on_session_transaction_committed(proton::session &s) override { + std::cout << "Transaction commited" << std::endl; + received += current_batch; + current_batch = 0; + if (received == total) { + std::cout << "All received messages committed, closing connection." << std::endl; + s.connection().close(); + } + else { + std::cout << "Re-declaring transaction now... to receive next batch." << std::endl; + s.transaction_declare(); + } + } + + void on_session_transaction_aborted(proton::session &s) override { + std::cout << "Transaction aborted!" << std::endl; + std::cout << "Re-declaring transaction now..." << std::endl; + current_batch = 0; + s.transaction_declare(); + } + + void on_message(proton::delivery &d, proton::message &msg) override { + std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl; + auto session = d.session(); + d.accept(); + current_batch += 1; + if (current_batch == batch_size) { + // Batch complete + if (batch_index % 2 == 1) { + std::cout << "Commiting transaction..." << std::endl; + session.transaction_commit(); + } else { + std::cout << "Aborting transaction..." << std::endl; + session.transaction_abort(); + } + batch_index++; + } + } +}; + +int main(int argc, char **argv) { + std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672"; + std::string addr = argc > 2 ? argv[2] : "examples"; + int message_count = 6; + int batch_size = 3; + example::options opts(argc, argv); + + opts.add_value(conn_url, 'u', "url", "connect and send to URL", "URL"); + opts.add_value(addr, 'a', "address", "connect and send to address", "URL"); + opts.add_value(message_count, 'm', "messages", "number of messages to send", "COUNT"); + opts.add_value(batch_size, 'b', "batch_size", "number of messages in each transaction", "BATCH_SIZE"); + + try { + opts.parse(); + + tx_recv recv(conn_url, addr, message_count, batch_size); + proton::container(recv).run(); + + return 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} diff --git a/cpp/examples/tx_send.cpp b/cpp/examples/tx_send.cpp new file mode 100644 index 000000000..e1b38fad7 --- /dev/null +++ b/cpp/examples/tx_send.cpp @@ -0,0 +1,165 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +class tx_send : public proton::messaging_handler { + private: + std::string conn_url_; + std::string addr_; + int total; + int batch_size; + int sent; + int batch_index = 0; + int current_batch = 0; + int committed = 0; + std::atomic unique_msg_id; + + public: + tx_send(const std::string& u, const std::string& a, int c, int b): + conn_url_(u), addr_(a), total(c), batch_size(b), sent(0), unique_msg_id(10000) {} + + void on_container_start(proton::container &c) override { + c.connect(conn_url_); + } + + void on_connection_open(proton::connection& c) override { + std::cout << "In this example we abort/commit transaction alternatively." << std::endl; + c.open_session(); + } + + void on_session_open(proton::session& s) override { + std::cout << "New session is open, declaring transaction now..." << std::endl; + s.transaction_declare(); + } + + void on_session_transaction_declared(proton::session& s) override { + std::cout << "Transaction is declared: " << s.transaction_id() << std::endl; + s.open_sender(addr_); + } + + void on_session_error(proton::session &s) override { + std::cout << "Session error: " << s.error().what() << std::endl; + s.connection().close(); + exit(-1); + } + + void on_session_transaction_error(proton::session &s) override { + std::cout << "Transaction error!" << std::endl; + s.connection().close(); + exit(-1); + } + + void on_sendable(proton::sender& sender) override { + proton::session session = sender.session(); + while (session.transaction_is_declared() && sender.credit() && + (committed + current_batch) < total) { + proton::message msg; + + msg.id(std::atomic_fetch_add(&unique_msg_id, 1)); + msg.body(std::map{{"sequence", committed + current_batch}}); + std::cout << "Sending [sender batch " << batch_index << "]: " << msg << std::endl; + sender.send(msg); + current_batch += 1; + if(current_batch == batch_size) + { + if (batch_index % 2 == 0) { + std::cout << "Commiting transaction..." << std::endl; + session.transaction_commit(); + } else { + std::cout << "Aborting transaction..." << std::endl; + session.transaction_abort(); + } + batch_index++; + } + } + } + + void on_session_transaction_committed(proton::session &s) override { + committed += current_batch; + current_batch = 0; + std::cout << "Transaction commited" << std::endl; + if(committed == total) { + std::cout << "All messages committed, closing connection." << std::endl; + s.connection().close(); + } + else { + std::cout << "Re-declaring transaction now..." << std::endl; + s.transaction_declare(); + } + } + + void on_session_transaction_aborted(proton::session &s) override { + std::cout << "Transaction aborted!" << std::endl; + std::cout << "Re-delaring transaction now..." << std::endl; + current_batch = 0; + s.transaction_declare(); + } + + void on_sender_close(proton::sender &s) override { + current_batch = 0; + } + +}; + +int main(int argc, char **argv) { + std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672"; + std::string addr = argc > 2 ? argv[2] : "examples"; + int message_count = 6; + int batch_size = 3; + example::options opts(argc, argv); + + opts.add_value(conn_url, 'u', "url", "connect and send to URL", "URL"); + opts.add_value(addr, 'a', "address", "connect and send to address", "URL"); + opts.add_value(message_count, 'm', "messages", "number of messages to send", "COUNT"); + opts.add_value(batch_size, 'b', "batch_size", "number of messages in each transaction", "BATCH_SIZE"); + + try { + opts.parse(); + + tx_send send(conn_url, addr, message_count, batch_size); + proton::container(send).run(); + + return 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} diff --git a/cpp/include/proton/messaging_handler.hpp b/cpp/include/proton/messaging_handler.hpp index 213dbe73e..b6e229cb2 100644 --- a/cpp/include/proton/messaging_handler.hpp +++ b/cpp/include/proton/messaging_handler.hpp @@ -172,6 +172,18 @@ PN_CPP_CLASS_EXTERN messaging_handler { /// The remote peer closed the session with an error condition. PN_CPP_EXTERN virtual void on_session_error(session&); + /// Called when a local transaction is declared. + PN_CPP_EXTERN virtual void on_session_transaction_declared(session&); + + /// Called when a local transaction is discharged successfully. + PN_CPP_EXTERN virtual void on_session_transaction_committed(session&); + + /// Called when a local transaction is discharged unsuccessfully (aborted). + PN_CPP_EXTERN virtual void on_session_transaction_aborted(session&); + + /// Called when a local transaction operation fails. + PN_CPP_EXTERN virtual void on_session_transaction_error(session&); + /// The remote peer opened the link. PN_CPP_EXTERN virtual void on_receiver_open(receiver&); diff --git a/cpp/include/proton/session.hpp b/cpp/include/proton/session.hpp index 60522c817..7da0740d6 100644 --- a/cpp/include/proton/session.hpp +++ b/cpp/include/proton/session.hpp @@ -105,14 +105,32 @@ PN_CPP_CLASS_EXTERN session : public internal::object, public endp /// Get user data from this session. PN_CPP_EXTERN void* user_data() const; + /// Declare a new local transaction on this session. + PN_CPP_EXTERN void transaction_declare(bool settle_before_discharge = false); + + /// Commit the currently declared transaction. + PN_CPP_EXTERN void transaction_commit(); + + /// Abort the currently declared transaction. + PN_CPP_EXTERN void transaction_abort(); + + /// Return true if a transaction is currently declared. + PN_CPP_EXTERN bool transaction_is_declared() const; + + /// Return the identifier of the current transaction. + PN_CPP_EXTERN binary transaction_id() const; + + /// Return the error condition associated with transaction. + PN_CPP_EXTERN error_condition transaction_error() const; + /// @cond INTERNAL - friend class internal::factory; - friend class session_iterator; + friend class internal::factory; + friend class sender; + friend class session_iterator; /// @endcond }; /// @cond INTERNAL - /// An iterator of sessions. class session_iterator : public internal::iter_base { public: @@ -126,7 +144,6 @@ class session_iterator : public internal::iter_base { typedef internal::iter_range session_range; /// @endcond - } // proton #endif // PROTON_SESSION_HPP diff --git a/cpp/src/contexts.cpp b/cpp/src/contexts.cpp index a3b393968..0fd9d4607 100644 --- a/cpp/src/contexts.cpp +++ b/cpp/src/contexts.cpp @@ -27,6 +27,7 @@ #include "proton/connection_options.hpp" #include "proton/error.hpp" +#include "proton/messaging_handler.hpp" #include "proton/reconnect_options.hpp" #include @@ -87,6 +88,12 @@ link_context& link_context::get(pn_link_t* l) { return ref(id(pn_link_attachments(l), LINK_CONTEXT)); } +transaction_context::transaction_context(pn_link_t* coordinator_sender, bool settle_before_discharge) : + coordinator(coordinator_sender) +{} + +session_context::session_context() : handler(nullptr), user_data_(nullptr) {} + session_context& session_context::get(pn_session_t* s) { return ref(id(pn_session_attachments(s), SESSION_CONTEXT)); } diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp index 7ebab1d7b..66f854a7a 100644 --- a/cpp/src/contexts.hpp +++ b/cpp/src/contexts.hpp @@ -28,6 +28,7 @@ #include "proton/message.hpp" #include "proton/object.h" +#include "proton/condition.h" #include @@ -147,15 +148,35 @@ class link_context : public context { void* user_data_; }; +class transaction_context; + class session_context : public context { public: - session_context() : handler(0), user_data_(nullptr) {} + session_context(); static session_context& get(pn_session_t* s); messaging_handler* handler; + std::unique_ptr transaction_context_; void* user_data_; }; +// This is not a context object on its own, but an optional part of session +class transaction_context { + public: + transaction_context(pn_link_t* coordinator, bool settle_before_discharge); + pn_link_t* coordinator; + pn_condition_t* error = nullptr; + binary transaction_id; + bool failed = false; + enum class State { + NO_TRANSACTION, + DECLARING, + DECLARED, + DISCHARGING, + }; + State state = State::NO_TRANSACTION; +}; + class transfer_context : public context { public: transfer_context() : user_data_(nullptr) {} diff --git a/cpp/src/delivery.cpp b/cpp/src/delivery.cpp index 9f03eaf73..e7230b205 100644 --- a/cpp/src/delivery.cpp +++ b/cpp/src/delivery.cpp @@ -31,10 +31,20 @@ #include "proton/binary.hpp" +#include + namespace { void settle_delivery(pn_delivery_t* o, uint64_t state) { - pn_delivery_update(o, state); + proton::session session = proton::make_wrapper(o).session(); + if(session.transaction_is_declared()) { + // Transactional disposition + auto disp = pn_transactional_disposition(pn_delivery_local(o)); + pn_transactional_disposition_set_id(disp, pn_bytes(session.transaction_id())); + pn_transactional_disposition_set_outcome_type(disp, state); + } else { + pn_delivery_update(o, state); + } pn_delivery_settle(o); } diff --git a/cpp/src/handler.cpp b/cpp/src/handler.cpp index 1632efda6..21ceb1532 100644 --- a/cpp/src/handler.cpp +++ b/cpp/src/handler.cpp @@ -65,6 +65,12 @@ void messaging_handler::on_session_open(session &s) { pn_session_open(unwrap(s)); } } + +void messaging_handler::on_session_transaction_declared(session &) {} +void messaging_handler::on_session_transaction_committed(session &) {} +void messaging_handler::on_session_transaction_aborted(session &) {} +void messaging_handler::on_session_transaction_error(session &s) { on_session_error(s); } + void messaging_handler::on_receiver_close(receiver &) {} void messaging_handler::on_receiver_error(receiver &l) { on_error(l.error()); } void messaging_handler::on_receiver_open(receiver &l) { diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp index f90cd7613..a4ea33b64 100644 --- a/cpp/src/messaging_adapter.cpp +++ b/cpp/src/messaging_adapter.cpp @@ -30,6 +30,7 @@ #include "proton/receiver_options.hpp" #include "proton/sender.hpp" #include "proton/sender_options.hpp" +#include "proton/target_options.hpp" #include "proton/session.hpp" #include "proton/tracker.hpp" #include "proton/transport.hpp" @@ -37,6 +38,7 @@ #include "contexts.hpp" #include "msg.hpp" #include "proton_bits.hpp" +#include "types_internal.hpp" #include #include @@ -69,7 +71,7 @@ void on_link_flow(messaging_handler& handler, pn_event_t* event) { // TODO: process session flow data, if no link-specific data, just return. if (!lnk) return; int state = pn_link_state(lnk); - if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) { + if (((state & PN_LOCAL_ACTIVE) && (state & PN_REMOTE_ACTIVE))) { link_context& lctx = link_context::get(lnk); if (pn_link_is_sender(lnk)) { if (pn_link_credit(lnk) > 0) { @@ -110,6 +112,81 @@ void message_decode(message& msg, proton::delivery delivery) { pn_link_advance(unwrap(link)); } +bool transaction_coordinator_sender(const sender& s) { + auto& txn_context = session_context::get(unwrap(s.session())).transaction_context_; + return txn_context && (txn_context->coordinator == unwrap(s)); +} + +void handle_transaction_coordinator_outcome(messaging_handler& handler, tracker t) { + auto session = t.session(); + auto& session_context = session_context::get(unwrap(session)); + auto& transaction_context = session_context.transaction_context_; + auto state = transaction_context->state; + auto disposition = pn_delivery_remote(unwrap(t)); + if (auto *declared_disp = pn_declared_disposition(disposition); declared_disp) { + switch (state) { + case transaction_context::State::DECLARING: { + pn_bytes_t txn_id = pn_declared_disposition_get_id(declared_disp); + transaction_context->transaction_id = proton::bin(txn_id); + transaction_context->state = transaction_context::State::DECLARED; + handler.on_session_transaction_declared(session); + return; + } + case transaction_context::State::NO_TRANSACTION: + case transaction_context::State::DECLARED: + case transaction_context::State::DISCHARGING: + // Don't throw error here, instead close link with error + make_wrapper(transaction_context->coordinator).close(error_condition{"amqp:not-allowed", "Received transaction declared disposition in invalid state"}); + transaction_context.release(); + } + } else if (pn_disposition_type(disposition) == PN_ACCEPTED) { + switch (state) { + case transaction_context::State::DISCHARGING: { + if (transaction_context->failed) { + // Transaction abort is successful + transaction_context->state = transaction_context::State::NO_TRANSACTION; + handler.on_session_transaction_aborted(session); + return; + } else { + // Transaction commit is successful + transaction_context->state = transaction_context::State::NO_TRANSACTION; + handler.on_session_transaction_committed(session); + return; + } + } + case transaction_context::State::NO_TRANSACTION: + case transaction_context::State::DECLARING: + case transaction_context::State::DECLARED: + // TODO: Don't throw error here, instead detach link or close session? + make_wrapper(transaction_context->coordinator).close(error_condition{"amqp:not-allowed", "Received transaction accepted disposition in invalid state"}); + transaction_context.release(); + } + } else if (auto rejected_disp = pn_rejected_disposition(disposition); rejected_disp) { + switch (state) { + case transaction_context::State::DECLARING: + transaction_context->state = transaction_context::State::NO_TRANSACTION; + transaction_context->error = pn_rejected_disposition_condition(rejected_disp); + handler.on_session_transaction_error(session); + return; + case transaction_context::State::DISCHARGING: + // Note that rollback cannot fail in AMQP as the outcome would be the same, + // so don't count rejected discharge as an error (although it is a protocol error). + if (!transaction_context->failed) { + transaction_context->state = transaction_context::State::NO_TRANSACTION; + transaction_context->error = pn_rejected_disposition_condition(rejected_disp); + handler.on_session_transaction_error(session); + return; + } + case transaction_context::State::NO_TRANSACTION: + case transaction_context::State::DECLARED: + // TODO: Don't throw error here, instead detach link or close session? + make_wrapper(transaction_context->coordinator).close(error_condition{"amqp:not-allowed", "Received transaction rejected disposition in invalid state"}); + transaction_context.release(); + } + } + // TODO: Don't ignore unexpected disposition here, instead detach link or close session? +} + void on_delivery(messaging_handler& handler, pn_event_t* event) { pn_link_t *lnk = pn_event_link(event); pn_delivery_t *dlv = pn_event_delivery(event); @@ -165,6 +242,12 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) { // sender if (pn_delivery_updated(dlv)) { tracker t(make_wrapper(dlv)); + // Check for outcome from a transaction coordinator + if (transaction_coordinator_sender(t.sender())) { + handle_transaction_coordinator_outcome(handler, t); + t.settle(); + return; + } ot.on_settled_span(t); switch(pn_delivery_remote_state(dlv)) { case PN_ACCEPTED: @@ -274,13 +357,6 @@ void on_link_local_open(messaging_handler& handler, pn_event_t* event) { void on_link_remote_open(messaging_handler& handler, pn_event_t* event) { auto lnk = pn_event_link(event); - // Currently don't implement (transaction) coordinator - if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) { - auto error = pn_link_condition(lnk); - pn_condition_set_name(error, "amqp:not-implemented"); - pn_link_close(lnk); - return; - } if (pn_link_state(lnk) & PN_LOCAL_UNINIT) { // Incoming link // Copy source and target from remote end. pn_terminus_copy(pn_link_source(lnk), pn_link_remote_source(lnk)); @@ -320,10 +396,24 @@ void on_connection_wake(messaging_handler& handler, pn_event_t* event) { } -void messaging_adapter::dispatch(messaging_handler& handler, pn_event_t* event) +void messaging_adapter::dispatch(messaging_handler& h, pn_event_t* event) { pn_event_type_t type = pn_event_type(event); + // If this is an event for an (internal) transaction coordinator link set the handler to a null handler + // Unless its a delivery event which we need to process for transaction outcomes + messaging_handler& handler = [&]() -> messaging_handler& { + if (pn_link_t *lnk = pn_event_link(event); + type != PN_DELIVERY && + lnk && pn_link_is_sender(lnk) && + transaction_coordinator_sender(sender(make_wrapper(lnk)))) { + static messaging_handler null_handler; + return null_handler; + } else { + return h; + } + }(); + // Only handle events we are interested in switch(type) { case PN_CONNECTION_BOUND: on_connection_bound(handler, event); break; diff --git a/cpp/src/node_options.cpp b/cpp/src/node_options.cpp index fd489baf3..d6ec52df1 100644 --- a/cpp/src/node_options.cpp +++ b/cpp/src/node_options.cpp @@ -203,6 +203,4 @@ target_options& target_options::dynamic_properties(const target::dynamic_propert void target_options::apply(target& s) const { impl_->apply(s); } - - } // namespace proton diff --git a/cpp/src/sender.cpp b/cpp/src/sender.cpp index 942e755b0..7a251cc07 100644 --- a/cpp/src/sender.cpp +++ b/cpp/src/sender.cpp @@ -23,6 +23,7 @@ #include "proton/link.hpp" #include "proton/sender_options.hpp" +#include "proton/session.hpp" #include "proton/source.hpp" #include "proton/target.hpp" #include "proton/tracker.hpp" @@ -34,6 +35,7 @@ #include "proton_bits.hpp" #include "contexts.hpp" #include "tracing_private.hpp" +#include "types_internal.hpp" #include @@ -84,6 +86,13 @@ tracker sender::send(const message &message, const binary &tag) { pn_delivery_settle(dlv); if (!pn_link_credit(pn_object())) link_context::get(pn_object()).draining = false; + + // If transaction is declared + if (session().transaction_is_declared()) { + auto disp = pn_transactional_disposition(pn_delivery_local(unwrap(track))); + pn_transactional_disposition_set_id(disp, pn_bytes(session().transaction_id())); + } + return track; } diff --git a/cpp/src/session.cpp b/cpp/src/session.cpp index b8f777a00..9efd9c1e7 100644 --- a/cpp/src/session.cpp +++ b/cpp/src/session.cpp @@ -21,15 +21,26 @@ #include "proton/session.hpp" #include "proton/connection.hpp" +#include "proton/container.hpp" +#include "proton/delivery.hpp" +#include "proton/error.hpp" +#include "proton/messaging_handler.hpp" #include "proton/receiver_options.hpp" #include "proton/sender_options.hpp" #include "proton/session_options.hpp" +#include "proton/target_options.hpp" +#include "proton/tracker.hpp" +#include "proton/transfer.hpp" +#include "proton/types.hpp" #include "contexts.hpp" #include "link_namer.hpp" +#include "proactor_container_impl.hpp" #include "proton_bits.hpp" +#include "types_internal.hpp" #include +#include "proton/delivery.h" #include #include @@ -148,4 +159,87 @@ void* session::user_data() const { return sctx.user_data_; } +namespace { + +std::unique_ptr& get_transaction_context(const session& s) { + return session_context::get(unwrap(s)).transaction_context_; +} + +bool transaction_is_empty(const session& s) { + auto& txn = get_transaction_context(s); + return !txn || txn->state == transaction_context::State::NO_TRANSACTION; +} + +proton::tracker transaction_send_ctrl(sender&& coordinator, const symbol& descriptor, const value& value) { + proton::value msg_value; + proton::codec::encoder enc(msg_value); + enc << proton::codec::start::described() + << descriptor + << value + << proton::codec::finish(); + + return coordinator.send(msg_value); +} + +void transaction_discharge(const session& s, bool failed) { + auto& transaction_context = get_transaction_context(s); + if (transaction_context->state != transaction_context::State::DECLARED) + throw proton::error("Only a declared txn can be discharged."); + transaction_context->state = transaction_context::State::DISCHARGING; + + transaction_context->failed = failed; + transaction_send_ctrl( + make_wrapper(transaction_context->coordinator), + "amqp:discharge:list", std::list{transaction_context->transaction_id, failed}); +} + +pn_link_t* open_coordinator_sender(session& s) { + auto l = pn_sender(unwrap(s), next_link_name(s.connection()).c_str()); + auto t = pn_link_target(l); + pn_terminus_set_type(t, PN_COORDINATOR); + auto caps = pn_terminus_capabilities(t); + // As we only have a single symbol in the capabilities we don't have to create an array + pn_data_put_symbol(caps, pn_bytes("amqp:local-transactions")); + pn_link_open(l); + return l; +} + +} + +void session::transaction_declare(bool settle_before_discharge) { + if (!transaction_is_empty(*this)) + throw proton::error("Session already declared transaction"); + + auto& txn_context = get_transaction_context(*this); + if (!txn_context) { + txn_context = std::make_unique(open_coordinator_sender(*this), settle_before_discharge); + } + + // Declare txn + txn_context->state = transaction_context::State::DECLARING; + + transaction_send_ctrl(make_wrapper(txn_context->coordinator), "amqp:declare:list", std::list{}); +} + + +binary session::transaction_id() const { + auto& txn_context = get_transaction_context(*this); + if (txn_context) { + return txn_context->transaction_id; + } else { + return binary(); + } +} +void session::transaction_commit() { transaction_discharge(*this, false); } +void session::transaction_abort() { transaction_discharge(*this, true); } +bool session::transaction_is_declared() const { return (!transaction_is_empty(*this)) && get_transaction_context(*this)->state == transaction_context::State::DECLARED; } +error_condition session::transaction_error() const { + auto& txn_context = get_transaction_context(*this); + if (txn_context) { + return make_wrapper(txn_context->error); + } else { + return error_condition(); + } +} + } // namespace proton diff --git a/cpp/src/terminus.cpp b/cpp/src/terminus.cpp index 0cdc25c93..141341342 100644 --- a/cpp/src/terminus.cpp +++ b/cpp/src/terminus.cpp @@ -58,7 +58,7 @@ value terminus::node_properties() const { std::vector terminus::capabilities() const { value caps(pn_terminus_capabilities(object_)); - return caps.empty() ? std::vector() : caps.get >(); + return get_multiple>(caps); } terminus::dynamic_property_map terminus::dynamic_properties() const { diff --git a/cpp/src/transaction_test.cpp b/cpp/src/transaction_test.cpp new file mode 100644 index 000000000..f2c7836c5 --- /dev/null +++ b/cpp/src/transaction_test.cpp @@ -0,0 +1,309 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +#include +#include +#include +#include +#include +#include "proton/codec/decoder.hpp" +#include "proton/codec/encoder.hpp" +#include +#include +#include +#include +#include +#include +#include +#include "proton_bits.hpp" +#include +#include +#include +#include +#include // C++ API doesn't export disposition +#include "test_bits.hpp" // For RUN_ARGV_TEST and ASSERT_EQUAL +#include "types_internal.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { +std::mutex m; +std::condition_variable cv; +bool listener_ready = false; +int listener_port; +const proton::binary fake_txn_id("prqs5678-abcd-efgh-1a2b-3c4d5e6f7g8e"); +} // namespace + +void wait_for_promise_or_fail(std::promise& prom, const std::string& what) { + if (prom.get_future().wait_for(std::chrono::seconds(5)) == std::future_status::timeout) { + FAIL("Test FAILED: Did not receive '" << what << "' in time."); + } +} + +class FakeBroker : public proton::messaging_handler { + private: + class listener_ready_handler : public proton::listen_handler { + void on_open(proton::listener& l) override { + std::lock_guard lk(m); + listener_port = l.port(); + listener_ready = true; + cv.notify_one(); + } + }; + std::string url; + listener_ready_handler listen_handler; + proton::receiver coordinator_link; + + public: + proton::listener listener; + std::map> transactions_messages; + std::promise declare_promise; + std::promise commit_promise; + std::promise abort_promise; + + FakeBroker(const std::string& s) : url(s) {} + + void on_container_start(proton::container& c) override { + listener = c.listen(url, listen_handler); + } + + void on_connection_open(proton::connection& c) override { + c.open(proton::connection_options{}.offered_capabilities({"ANONYMOUS-RELAY"})); + } + + void on_receiver_open(proton::receiver& r) override { + // Identify the transaction link. + if(r.target().capabilities().size() > 0 && + r.target().capabilities()[0] == proton::symbol("amqp:local-transactions")) { + coordinator_link = r; + } + r.open(); + } + + void on_message(proton::delivery& d, proton::message& m) override { + if (coordinator_link.active() && d.receiver() == coordinator_link) { + handle_transaction_control(d, m); + } else { + handle_application_message(d, m); + } + } + + void handle_application_message(proton::delivery& d, proton::message& m) { + auto disp = pn_transactional_disposition(pn_delivery_remote(unwrap(d))); + if (disp != NULL) { + // transactional message + proton::binary txn_id = proton::bin(pn_transactional_disposition_get_id(disp)); + transactions_messages[txn_id].push_back(m); + } + } + + void handle_transaction_control(proton::delivery& d, proton::message& m) { + proton::codec::decoder dec(m.body()); + proton::symbol descriptor; + proton::value _value; + proton::type_id _t = dec.next_type(); + + if (_t == proton::type_id::DESCRIBED) { + proton::codec::start s; + dec >> s >> descriptor >> _value >> proton::codec::finish(); + } else { + std::cerr << "[fake_broker] Invalid transaction control message format: " << to_string(m) << std::endl; + d.reject(); + return; + } + + if (descriptor == "amqp:declare:list") { + pn_bytes_t txn_id = pn_bytes(fake_txn_id.size(), reinterpret_cast(&fake_txn_id[0])); + + pn_delivery_t* pd = proton::unwrap(d); + pn_disposition_t* disp = pn_delivery_local(pd); + pn_declared_disposition_t* declared_disp = pn_declared_disposition(disp); + pn_declared_disposition_set_id(declared_disp, txn_id); + pn_delivery_settle(pd); + + std::cout << "[BROKER] transaction declared: " << fake_txn_id << std::endl; + declare_promise.set_value(); + } else if (descriptor == "amqp:discharge:list") { + // Commit / Abort transaction. + std::vector vd; + proton::get(_value, vd); + ASSERT_EQUAL(vd.size(), 2u); + proton::binary txn_id = vd[0].get(); + bool is_abort = vd[1].get(); + if (!is_abort) { + // Commit + std::cout << "[BROKER] transaction commited:" << txn_id << std::endl; + // As part of this test, we don't need to forward transactions_messages. + // We are leaving the messages here to count them later on. + commit_promise.set_value(); + d.accept(); + } else { + // Abort + std::cout << "[BROKER] transaction aborted:" << txn_id << std::endl; + transactions_messages.erase(txn_id); + abort_promise.set_value(); + d.accept(); + } + // Closing the connection as we are testing till commit/abort. + d.receiver().close(); + d.connection().close(); + listener.stop(); + } + } +}; + +class test_client : public proton::messaging_handler { + private: + std::string server_address_; + int messages_left_; + bool is_commit_; + + public: + proton::binary last_txn_id; + proton::sender sender_; + std::promise block_declare_transaction_on_session; + std::promise transaction_finished_promise; + + test_client(const std::string& s) : + server_address_(s), messages_left_(0) {} + + void on_container_start(proton::container& c) override { + c.connect(server_address_); + } + + void on_connection_open(proton::connection& c) override { + sender_ = c.open_sender("/test"); + } + + void on_session_open(proton::session& s) override { + wait_for_promise_or_fail(block_declare_transaction_on_session, "waiting on test to be ready"); + s.transaction_declare(); + } + + void schedule_messages_in_transaction(int count, bool is_commit) { + messages_left_ = count; + is_commit_ = is_commit; + } + + void on_sendable(proton::sender&) override { + send(); + } + + void send() { + proton::session session = sender_.session(); + while (session.transaction_is_declared() && sender_.credit() && + messages_left_ > 0) { + proton::message msg("hello"); + sender_.send(msg); + messages_left_--; + if (messages_left_ == 0) { + if (is_commit_) { + std::cout << "Client: Committing transaction." << std::endl; + session.transaction_commit(); + } else { + std::cout << "Client: Aborting transaction." << std::endl; + session.transaction_abort(); + } + } + } + } + + void on_session_transaction_declared(proton::session &s) override { + last_txn_id = s.transaction_id(); + std::cout << "Client: Transaction declared successfully: " << last_txn_id << std::endl; + send(); + } + + void on_session_transaction_committed(proton::session &s) override { + std::cout << "Client: Transaction committed" << std::endl; + transaction_finished_promise.set_value(); + s.connection().close(); + } + + void on_session_transaction_aborted(proton::session &s) override { + std::cout << "Client: Transaction aborted" << std::endl; + transaction_finished_promise.set_value(); + s.connection().close(); + } +}; + +void test_transaction_commit(FakeBroker &broker, test_client &client) { + std::cout << "Starting test_transaction_commit..." << std::endl; + + const unsigned int messages_in_txn = 5; + client.schedule_messages_in_transaction(messages_in_txn, true); + client.block_declare_transaction_on_session.set_value(); + + wait_for_promise_or_fail(broker.declare_promise, "declare in broker"); + wait_for_promise_or_fail(broker.commit_promise, "commit in broker"); + + // Only one transaction + ASSERT_EQUAL(broker.transactions_messages.size(), 1u); + // Check message count inside broker + ASSERT_EQUAL(broker.transactions_messages[fake_txn_id].size(), messages_in_txn); +} + +void test_transaction_abort(FakeBroker &broker, test_client &client) { + std::cout << "Starting test_transaction_abort..." << std::endl; + + const unsigned int messages_in_txn = 5; + client.schedule_messages_in_transaction(messages_in_txn, false); + client.block_declare_transaction_on_session.set_value(); + + wait_for_promise_or_fail(broker.declare_promise, "declare in broker"); + wait_for_promise_or_fail(broker.commit_promise, "commit in broker"); + + // Only zero transactions + ASSERT_EQUAL(broker.transactions_messages.size(), 0u); +} + +int main(int argc, char** argv) { + int tests_failed = 0; + + std::string broker_address("127.0.0.1:0"); + FakeBroker broker(broker_address); + + proton::container broker_container(broker); + std::thread broker_thread([&broker_container]() -> void { broker_container.run(); }); + + // Wait for the listener + std::unique_lock lk(m); + cv.wait(lk, [] { return listener_ready; }); + + + std::string server_address = "127.0.0.1:" + std::to_string(listener_port); + test_client client(server_address); + + proton::container client_container(client); + std::thread client_thread([&client_container]() -> void { client_container.run(); }); + + RUN_ARGV_TEST(tests_failed, test_transaction_commit(broker, client)); + + broker_thread.join(); + client_thread.join(); + + return tests_failed; +} diff --git a/cpp/tests.cmake b/cpp/tests.cmake index b00d50129..85f1ae269 100644 --- a/cpp/tests.cmake +++ b/cpp/tests.cmake @@ -63,6 +63,9 @@ add_cpp_test(link_test) add_cpp_test(credit_test) add_cpp_test(delivery_test) add_cpp_test(context_test) +add_cpp_test(transaction_test) +target_link_libraries(transaction_test qpid-proton-core) + if (ENABLE_JSONCPP) add_cpp_test(connect_config_test) target_link_libraries(connect_config_test qpid-proton-core) # For pn_sasl_enabled