Skip to content

Commit e2f49c8

Browse files
authored
Alow NamedMutex to share an ApiError among threads (#1249)
It is done to prevent request repetition if the same request in other thread have ended up with an error. This change fixes an issue when task scheduler is overloaded by the same requests that fails. Relates-To: OLPSUP-15885 Signed-off-by: Mykola Malik <[email protected]>
1 parent 0f8bcdd commit e2f49c8

File tree

4 files changed

+165
-4
lines changed

4 files changed

+165
-4
lines changed

olp-cpp-sdk-dataservice-read/src/repositories/DataRepository.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,25 @@ BlobApi::DataResponse DataRepository::GetBlobData(
162162
}
163163
}
164164

165+
// Check if other threads have faced an error.
166+
const auto optional_error = mutex.GetError();
167+
if (optional_error) {
168+
OLP_SDK_LOG_DEBUG_F(kLogTag,
169+
"Found error in NamedMutex, aborting, hrn='%s', "
170+
"key='%s', error='%s'",
171+
catalog_.ToCatalogHRNString().c_str(),
172+
data_handle->c_str(),
173+
optional_error->GetMessage().c_str());
174+
return *optional_error;
175+
}
176+
165177
auto storage_api_lookup = lookup_client_.LookupApi(
166178
service, "v1", static_cast<client::FetchOptions>(fetch_option), context);
167179

168180
if (!storage_api_lookup.IsSuccessful()) {
181+
// Store an error to share it with other threads.
182+
mutex.SetError(storage_api_lookup.GetError());
183+
169184
return storage_api_lookup.GetError();
170185
}
171186

@@ -204,6 +219,9 @@ BlobApi::DataResponse DataRepository::GetBlobData(
204219
catalog_.ToCatalogHRNString().c_str(), data_handle->c_str());
205220
repository.Clear(layer, data_handle.value());
206221
}
222+
223+
// Store an error to share it with other threads.
224+
mutex.SetError(storage_response.GetError());
207225
}
208226

209227
return storage_response;

olp-cpp-sdk-dataservice-read/src/repositories/NamedMutex.cpp

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2020 HERE Europe B.V.
2+
* Copyright (C) 2020-2021 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,11 +30,14 @@ class NamedMutexStorage::Impl {
3030
public:
3131
std::mutex& AquireLock(const std::string& resource);
3232
void ReleaseLock(const std::string& resource);
33+
void SetError(const std::string& resource, const client::ApiError& error);
34+
boost::optional<client::ApiError> GetError(const std::string& resource);
3335

3436
private:
3537
struct RefCounterMutex {
3638
std::mutex mutex;
3739
uint32_t use_count{0u};
40+
boost::optional<client::ApiError> optional_error;
3841
};
3942

4043
std::mutex mutex_;
@@ -61,6 +64,26 @@ void NamedMutexStorage::Impl::ReleaseLock(const std::string& resource) {
6164
}
6265
}
6366

67+
void NamedMutexStorage::Impl::SetError(const std::string& resource,
68+
const client::ApiError& error) {
69+
std::lock_guard<std::mutex> lock(mutex_);
70+
auto mutex_it = mutexes_.find(resource);
71+
if (mutex_it != mutexes_.end()) {
72+
mutex_it->second.optional_error = error;
73+
}
74+
}
75+
76+
boost::optional<client::ApiError> NamedMutexStorage::Impl::GetError(
77+
const std::string& resource) {
78+
std::lock_guard<std::mutex> lock(mutex_);
79+
auto mutex_it = mutexes_.find(resource);
80+
if (mutex_it == mutexes_.end()) {
81+
return boost::none;
82+
}
83+
84+
return mutex_it->second.optional_error;
85+
}
86+
6487
NamedMutexStorage::NamedMutexStorage() : impl_(std::make_shared<Impl>()) {}
6588

6689
std::mutex& NamedMutexStorage::AquireLock(const std::string& resource) {
@@ -71,6 +94,16 @@ void NamedMutexStorage::ReleaseLock(const std::string& resource) {
7194
impl_->ReleaseLock(resource);
7295
}
7396

97+
void NamedMutexStorage::SetError(const std::string& resource,
98+
const client::ApiError& error) {
99+
impl_->SetError(resource, error);
100+
}
101+
102+
boost::optional<client::ApiError> NamedMutexStorage::GetError(
103+
const std::string& resource) {
104+
return impl_->GetError(resource);
105+
}
106+
74107
NamedMutex::NamedMutex(NamedMutexStorage& storage, const std::string& name)
75108
: storage_{storage}, name_{name}, mutex_{storage_.AquireLock(name_)} {}
76109

@@ -82,6 +115,14 @@ bool NamedMutex::try_lock() { return mutex_.try_lock(); }
82115

83116
void NamedMutex::unlock() { mutex_.unlock(); }
84117

118+
void NamedMutex::SetError(const client::ApiError& error) {
119+
storage_.SetError(name_, error);
120+
}
121+
122+
boost::optional<client::ApiError> NamedMutex::GetError() {
123+
return storage_.GetError(name_);
124+
}
125+
85126
} // namespace repository
86127
} // namespace read
87128
} // namespace dataservice

olp-cpp-sdk-dataservice-read/src/repositories/NamedMutex.h

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2020 HERE Europe B.V.
2+
* Copyright (C) 2020-2021 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,14 +24,18 @@
2424
#include <string>
2525
#include <unordered_map>
2626

27+
#include <olp/core/client/ApiError.h>
28+
#include <boost/optional.hpp>
29+
2730
namespace olp {
2831
namespace dataservice {
2932
namespace read {
3033
namespace repository {
3134

3235
/*
3336
* @brief A mutex storage class, used to store and access mutex primitives by
34-
* name, so it can be used in different places and different conditions.
37+
* name, so it can be used in different places and different conditions. Has an
38+
* ability to share an `ApiError` among threads that uses the same mutex.
3539
*/
3640
class NamedMutexStorage {
3741
public:
@@ -40,14 +44,33 @@ class NamedMutexStorage {
4044
std::mutex& AquireLock(const std::string& resource);
4145
void ReleaseLock(const std::string& resource);
4246

47+
/**
48+
* @brief Saves an error to share it among threads.
49+
*
50+
* @param resource A name of a mutex to store an error for.
51+
* @param error An error to be stored.
52+
*/
53+
void SetError(const std::string& resource, const client::ApiError& error);
54+
55+
/**
56+
* @brief Gets the stored error for provided resource.
57+
*
58+
* @param resource A name of a mutex to get an error for.
59+
*
60+
* @return The stored `ApiError` instance or `boost::none` if no error has
61+
* been stored for this resource.
62+
*/
63+
boost::optional<client::ApiError> GetError(const std::string& resource);
64+
4365
private:
4466
class Impl;
4567
std::shared_ptr<Impl> impl_;
4668
};
4769

4870
/*
4971
* @brief A synchronization primitive that can be used to protect shared data
50-
* from being simultaneously accessed by multiple threads.
72+
* from being simultaneously accessed by multiple threads. Has an ability to
73+
* share an `ApiError` among the threads.
5174
*/
5275
class NamedMutex final {
5376
public:
@@ -60,10 +83,27 @@ class NamedMutex final {
6083

6184
~NamedMutex();
6285

86+
// These method names are lower snake case to be able to use NamedMutex with
87+
// std::unique_lock.
6388
void lock();
6489
bool try_lock();
6590
void unlock();
6691

92+
/**
93+
* @brief Saves an error to share it among threads.
94+
*
95+
* @param error An error to be stored.
96+
*/
97+
void SetError(const client::ApiError& error);
98+
99+
/**
100+
* @brief Gets the stored error for this mutex.
101+
*
102+
* @return The stored `ApiError` instance or `boost::none` if no error has
103+
* been stored for this mutex.
104+
*/
105+
boost::optional<client::ApiError> GetError();
106+
67107
private:
68108
NamedMutexStorage& storage_;
69109
std::string name_;

olp-cpp-sdk-dataservice-read/tests/DataRepositoryTest.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <olp/dataservice/read/DataRequest.h>
3131
#include <olp/dataservice/read/TileRequest.h>
3232
#include <repositories/DataRepository.h>
33+
#include <repositories/NamedMutex.h>
3334
#include <repositories/PartitionsCacheRepository.h>
3435
#include <repositories/PartitionsRepository.h>
3536

@@ -273,6 +274,67 @@ TEST_F(DataRepositoryTest, GetBlobDataInProgressCancel) {
273274
olp::client::ErrorCode::Cancelled);
274275
}
275276

277+
TEST_F(DataRepositoryTest, GetBlobDataSimultaniousFailedCalls) {
278+
// The lookup data must be requested from the network only once
279+
EXPECT_CALL(*network_mock_, Send(IsGetRequest(kUrlLookup), _, _, _, _))
280+
.WillOnce(ReturnHttpResponse(olp::http::NetworkResponse().WithStatus(
281+
olp::http::HttpStatusCode::OK),
282+
kUrlResponseLookup));
283+
284+
std::promise<void> network_request_started_promise;
285+
std::promise<void> finish_network_request_promise;
286+
287+
auto wait = [&]() {
288+
network_request_started_promise.set_value();
289+
finish_network_request_promise.get_future().wait();
290+
};
291+
292+
// The blob data must be requested from the network only once
293+
EXPECT_CALL(*network_mock_, Send(IsGetRequest(kUrlBlobData269), _, _, _, _))
294+
.WillOnce(testing::DoAll(
295+
testing::InvokeWithoutArgs(wait),
296+
ReturnHttpResponse(olp::http::NetworkResponse().WithStatus(
297+
olp::http::HttpStatusCode::REQUEST_TIMEOUT),
298+
"Timeout")));
299+
300+
olp::client::CancellationContext context;
301+
olp::dataservice::read::repository::NamedMutexStorage storage;
302+
303+
olp::dataservice::read::DataRequest request;
304+
request.WithDataHandle(kUrlBlobDataHandle);
305+
306+
olp::client::HRN hrn(GetTestCatalog());
307+
ApiLookupClient lookup_client(hrn, *settings_);
308+
DataRepository repository(hrn, *settings_, lookup_client, storage);
309+
310+
// Start first request in a separate thread
311+
std::thread first_request_thread([&]() {
312+
auto response =
313+
repository.GetBlobData(kLayerId, kService, request, context);
314+
EXPECT_FALSE(response.IsSuccessful());
315+
});
316+
317+
// Wait untill network request processing started
318+
network_request_started_promise.get_future().wait();
319+
320+
// Get a mutex from the storage. It guarantees that when the second thread
321+
// accuares the mutex, the stored error will not be cleaned up in scope of
322+
// ReleaseLock call from the first thread
323+
olp::dataservice::read::repository::NamedMutex mutex(
324+
storage, hrn.ToString() + kService + kUrlBlobDataHandle);
325+
326+
// Start second request in a separate thread
327+
std::thread second_request_thread([&]() {
328+
auto response =
329+
repository.GetBlobData(kLayerId, kService, request, context);
330+
EXPECT_FALSE(response.IsSuccessful());
331+
});
332+
333+
finish_network_request_promise.set_value();
334+
first_request_thread.join();
335+
second_request_thread.join();
336+
}
337+
276338
TEST_F(DataRepositoryTest, GetVersionedDataTile) {
277339
EXPECT_CALL(*network_mock_, Send(IsGetRequest(kUrlLookup), _, _, _, _))
278340
.WillOnce(ReturnHttpResponse(olp::http::NetworkResponse().WithStatus(

0 commit comments

Comments
 (0)