Help needed: How do I use this library in a multithreaded context? Do I need to put a lock around every call to a member function of storage_t?
I know this question has been asked before, and the answer was that the library does nothing special for multithreading.
But it would be very helpful to have some documentation on using this library from multiple threads. A small demo explaining what kind of locking is necessary would be perfect!
- What kind of locking is sufficient when multiple reader and writer threads use sqlite3's default serialized mode?
Here is how I tried to use the library from more than one thread:
Simple multithreaded read/write demo
Here is a small multithreaded test program:
#include <iostream>
#include <string>
#include "thirdparty/sqlite_orm.h"
#include <thread>
#include <chrono>
#include <exception>
#include <cstdlib> // std::abort, EXIT_SUCCESS
struct Person {
int id; // key
std::string name;
std::string desc;
};
void oom_reproduce() {
using namespace sqlite_orm;
auto s = make_storage(
"test.db",
make_table("person", make_column("id", &Person::id, primary_key()),
make_column("name", &Person::name),
make_column("desc", &Person::desc)));
s.sync_schema();
std::cout << "DONE sync_schema" << std::endl;
// multi thread read write test
auto w_cnt = 10;
auto r_cnt = 10;
auto w_interval = 10; // ms
auto r_interval = 10;
auto write_fn = [&]() {
try {
for (;;) {
std::this_thread::sleep_for(std::chrono::milliseconds(w_interval));
std::cout << "w";
Person p{-1, "name", "desc"};
// LOCK HERE
s.insert(p);
}
} catch (const std::exception &e) {
std::cout << "exception: " << e.what() << std::endl;
std::abort();
}
};
auto read_fn = [&]() {
try {
for (;;) {
std::this_thread::sleep_for(std::chrono::milliseconds(r_interval));
std::cout << "r";
// LOCK HERE
s.get_all<Person>();
}
} catch (const std::exception &e) {
std::cout << "exception: " << e.what() << std::endl;
std::abort();
}
};
for (int i = 0; i < w_cnt; i++) {
std::thread(write_fn).detach();
}
for (int i = 0; i < r_cnt; i++) {
std::thread(read_fn).detach();
}
for (;;) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main() {
std::cout << "start" << std::endl;
oom_reproduce();
return EXIT_SUCCESS;
}
Without any locking, this code crashes. Sample output:
start
DONE sync_schema
wwwwwwwexception: out of memory: out of memory
aborted
About the "out of memory" error message:
After some digging, I found that the root cause is a call to the sqlite3 API with a NULL db connection.
For example, in the prepare functions, calling sqlite3_prepare_v2 with nullptr as the first argument returns 21 (SQLITE_MISUSE), and the error message that ends up being reported is "out of memory".
Why db is NULL: my reasoning
template<class T>
prepared_statement_t<replace_t<T>> prepare(replace_t<T> rep) {
auto con = this->get_connection();
sqlite3_stmt *stmt;
using object_type = typename expression_object_type<decltype(rep)>::type;
this->assert_mapped_type<object_type>();
auto db = con.get();
using context_t = serializator_context<impl_type>;
context_t context{this->impl};
context.skip_table_name = false;
context.replace_bindable_with_question = true;
auto query = serialize(rep, context);
if(sqlite3_prepare_v2(db, query.c_str(), -1, &stmt, nullptr) == SQLITE_OK) {
return {std::move(rep), stmt, con};
} else {
throw std::system_error(std::error_code(sqlite3_errcode(db), get_sqlite_error_category()),
sqlite3_errmsg(db));
}
}
This is taken from sqlite_orm.h. In some multithreaded tests, I found that db could be NULL. After setting a breakpoint on the throw line and calling this function from multiple threads, I captured these variables:
| Name | Value | Type |
|---|---|---|
| con | {holder={filename="db/test.sqlite" db=0x0000000000000000 ...} } | sqlite_orm::internal::connection_ref |
| holder | {filename="db/test.sqlite" db=0x0000000000000000 _retain_count=...} | sqlite_orm::internal::connection_holder & |
| db | 0x0000000000000000 | sqlite3 * |
| _retain_count | 5 | int |
struct connection_holder {
connection_holder(std::string filename_) : filename(move(filename_)) {}
void retain() {
++this->_retain_count;
if(1 == this->_retain_count) {
auto rc = sqlite3_open(this->filename.c_str(), &this->db);
if(rc != SQLITE_OK) {
throw std::system_error(std::error_code(sqlite3_errcode(this->db), get_sqlite_error_category()),
sqlite3_errmsg(this->db));
}
}
}
void release() {
--this->_retain_count;
if(0 == this->_retain_count) {
auto rc = sqlite3_close(this->db);
if(rc != SQLITE_OK) {
throw std::system_error(std::error_code(sqlite3_errcode(this->db), get_sqlite_error_category()),
sqlite3_errmsg(this->db));
}
}
}
sqlite3 *get() const {
return this->db;
}
int retain_count() const {
return this->_retain_count;
}
const std::string filename;
protected:
sqlite3 *db = nullptr;
int _retain_count = 0;
};
_retain_count is 5 and db is NULL, which does not seem to be a valid state. So my observation is that calling retain and release from multiple threads can leave the holder in an invalid state.
Is this reasoning correct?
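One interleaving that would produce this state, as far as I can tell: if two threads both execute `++_retain_count` before either evaluates `1 == _retain_count`, neither sees the value 1, so nobody calls `sqlite3_open` and `db` stays NULL while the count keeps growing. Below is a minimal sketch of a holder whose `retain` and `release` are guarded by a mutex. It is not sqlite_orm code: `guarded_holder`, `fake_db` (a stand-in for `sqlite3`, so the sketch builds without SQLite), and `count_missing_connections` are hypothetical names made up for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for sqlite3, so the sketch builds without SQLite.
struct fake_db {};

// Not sqlite_orm code: a connection_holder-like class with a mutex added.
class guarded_holder {
  public:
    void retain() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (retain_count_ == 0) {
            db_ = std::make_unique<fake_db>();  // stands in for sqlite3_open
            ++open_calls_;
        }
        ++retain_count_;
    }

    void release() {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(retain_count_ > 0 && "release() without matching retain()");
        if (--retain_count_ == 0) {
            db_.reset();  // stands in for sqlite3_close
            ++close_calls_;
        }
    }

    // True while a (fake) connection is open.
    bool has_connection() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return db_ != nullptr;
    }

    int retain_count() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return retain_count_;
    }

    int open_calls() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return open_calls_;
    }

    int close_calls() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return close_calls_;
    }

  private:
    mutable std::mutex mutex_;
    std::unique_ptr<fake_db> db_;
    int retain_count_ = 0;
    int open_calls_ = 0;
    int close_calls_ = 0;
};

// Hammers retain/release from several threads and counts how often a thread
// that holds a retain observes a missing connection (the state in the table).
int count_missing_connections(guarded_holder &holder, int thread_count, int iterations) {
    std::atomic<int> missing{0};
    std::vector<std::thread> threads;
    for (int t = 0; t < thread_count; ++t) {
        threads.emplace_back([&holder, &missing, iterations] {
            for (int i = 0; i < iterations; ++i) {
                holder.retain();
                if (!holder.has_connection()) {
                    ++missing;
                }
                holder.release();
            }
        });
    }
    for (auto &th : threads) {
        th.join();
    }
    return missing.load();
}
```

With the mutex in place, a thread that holds a retain should always see an open connection, and every open should be matched by a close once all threads have finished. Whether the library itself should carry such a lock, or leave it to the caller, is a separate design question.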
Locking
To make my demo work, I wrapped all CRUD operations (replace, insert, update, etc.) with a lock.
std::lock_guard<std::mutex> g(m);
// call storage_t member function
auto result = storage.get_all<MyData>();
Complete demo with locking:
#include <iostream>
#include <string>
#include "thirdparty/sqlite_orm_mod.h"
#include <thread>
#include <chrono>
#include <exception>
#include <cstdlib> // std::abort, EXIT_SUCCESS
#include <mutex>
struct Person {
int id; // key
std::string name;
std::string desc;
};
std::mutex m;
#define USELOCK
#ifdef USELOCK
#define LOCK_SCOPE std::lock_guard<std::mutex> g(m);
#else
#define LOCK_SCOPE
#endif
void oom_reproduce() {
using namespace sqlite_orm;
auto s = make_storage(
"test.db",
make_table("person", make_column("id", &Person::id, primary_key()),
make_column("name", &Person::name),
make_column("desc", &Person::desc)));
s.sync_schema();
std::cout << "DONE sync_schema" << std::endl;
auto w_cnt = 10;
auto r_cnt = 10;
auto w_interval = 10; // ms
auto r_interval = 10;
auto write_fn = [&]() {
try {
for (;;) {
std::this_thread::sleep_for(std::chrono::milliseconds(w_interval));
std::cout << "w";
Person p{-1, "name", "desc"};
LOCK_SCOPE
s.insert(p);
}
} catch (const std::exception &e) {
std::cout << std::this_thread::get_id() << ", exception: " << e.what() << std::endl;
std::abort();
}
};
auto read_fn = [&]() {
try {
for (;;) {
std::this_thread::sleep_for(std::chrono::milliseconds(r_interval));
std::cout << "r";
LOCK_SCOPE
s.get_all<Person>();
}
} catch (const std::exception &e) {
std::cout << std::this_thread::get_id() << ", exception: " << e.what() << std::endl;
std::abort();
}
};
for (int i = 0; i < w_cnt; i++) {
std::thread(write_fn).detach();
}
for (int i = 0; i < r_cnt; i++) {
std::thread(read_fn).detach();
}
for (;;) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main() {
std::cout << "start" << std::endl;
oom_reproduce();
return EXIT_SUCCESS;
}
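The LOCK_SCOPE pattern above relies on every call site remembering to take the lock. A possible alternative is to bundle the mutex with the object it protects, so the object can only be reached while the lock is held. The following is a minimal sketch under that assumption; `synchronized`, `with_lock`, and `parallel_push` are hypothetical names, not part of sqlite_orm, and a `std::vector<int>` stands in for the storage so the sketch builds without the library.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper (not part of sqlite_orm): owns a value together with
// the mutex that guards it, so the value is only reachable under the lock.
template <class T>
class synchronized {
  public:
    explicit synchronized(T value) : value_(std::move(value)) {}

    // Runs fn(value) while holding the mutex and returns whatever fn returns.
    template <class F>
    decltype(auto) with_lock(F &&fn) {
        std::lock_guard<std::mutex> lock(mutex_);
        return std::forward<F>(fn)(value_);
    }

  private:
    std::mutex mutex_;
    T value_;
};

// Demo with a plain vector standing in for the storage object: several
// writer threads append concurrently, then the final size is read back.
std::size_t parallel_push(int thread_count, int per_thread) {
    assert(thread_count >= 0 && per_thread >= 0);
    synchronized<std::vector<int>> shared{std::vector<int>{}};
    std::vector<std::thread> threads;
    for (int t = 0; t < thread_count; ++t) {
        threads.emplace_back([&shared, per_thread] {
            for (int i = 0; i < per_thread; ++i) {
                shared.with_lock([i](std::vector<int> &v) { v.push_back(i); });
            }
        });
    }
    for (auto &th : threads) {
        th.join();
    }
    return shared.with_lock([](std::vector<int> &v) { return v.size(); });
}
```

Applied to the demo, the idea would be to move `s` into such a wrapper and route `insert` and `get_all` through `with_lock`. I have not verified that the storage type can be moved into the wrapper this way, so treat that part as an assumption. Note that this still serializes readers as well as writers, which matches the single-mutex demo above.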