Skip to content

Commit bf9eb75

Browse files
hanidamlajfacebook-github-bot
authored andcommitted
open source!
Summary: yay!! this diff open-sources the new c++20 coroutine-based proxygen::coro library we've been spent the past couple of years hardening/productionizing This is unfortunately a pretty big, yet unavoidable diff. The changes: * codemod's filepath prefix rename from `proxygen/facebook/lib/coro/...` -> `proxygen/lib/http/coro/...` * modifies CMake files to upgrade cxx verison, add c-ares as a dependency, and add proxygen::coro sources to the monolithic proxygen target Reviewed By: mjoras Differential Revision: D80372655 fbshipit-source-id: cc1f013c44b5966694cc2d3201f267957c7d8657
1 parent 4bebc8c commit bf9eb75

File tree

155 files changed

+39661
-1
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

155 files changed

+39661
-1
lines changed

CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ project(
1111
proxygen
1212
)
1313

14-
set(CMAKE_CXX_STANDARD 17)
14+
set(CMAKE_CXX_STANDARD 20)
1515
set(CMAKE_CXX_STANDARD_REQUIRED ON)
1616
set(CMAKE_CXX_EXTENSIONS OFF)
1717

@@ -79,6 +79,7 @@ find_package(Zstd REQUIRED)
7979
find_package(ZLIB REQUIRED)
8080
find_package(OpenSSL REQUIRED)
8181
find_package(Threads)
82+
find_package(Cares)
8283
find_package(Boost 1.58 REQUIRED
8384
COMPONENTS
8485
iostreams

cmake/FindCares.cmake

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
# All rights reserved.
3+
#
4+
# This source code is licensed under the BSD-style license found in the
5+
# LICENSE file in the root directory of this source tree.
6+
7+
find_path(CARES_INCLUDE_DIR NAMES ares.h)
8+
find_library(CARES_LIBRARIES NAMES cares)
9+
10+
include(FindPackageHandleStandardArgs)
11+
find_package_handle_standard_args(Cares DEFAULT_MSG CARES_LIBRARIES CARES_INCLUDE_DIR)
12+
13+
mark_as_advanced(
14+
CARES_LIBRARIES
15+
CARES_INCLUDE_DIR
16+
)
17+
18+
if(NOT TARGET cares)
19+
if("${CARES_LIBRARIES}" MATCHES ".*.a$")
20+
add_library(cares STATIC IMPORTED)
21+
else()
22+
add_library(cares SHARED IMPORTED)
23+
endif()
24+
set_target_properties(
25+
cares
26+
PROPERTIES
27+
IMPORTED_LOCATION ${CARES_LIBRARIES}
28+
INTERFACE_INCLUDE_DIRECTORIES ${CARES_INCLUDE_DIR}
29+
)
30+
endif()

proxygen/lib/CMakeLists.txt

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,54 @@ set(
102102
mvfst::mvfst_state_machine
103103
)
104104

105+
# append proxygen::coro sources
106+
set(
107+
PROXYGEN_CORO_SOURCES
108+
dns/DNSModule.cpp
109+
dns/DNSResolver.cpp
110+
dns/Rfc6724.cpp
111+
http/coro/HTTPBodyEventQueue.cpp
112+
http/coro/HTTPFilterFactoryHandler.cpp
113+
http/coro/HTTPEvents.cpp
114+
http/coro/HTTPError.cpp
115+
http/coro/HTTPCoroSession.cpp
116+
http/coro/HTTPByteEventHelpers.cpp
117+
http/coro/HTTPSourceReader.cpp
118+
http/coro/HTTPSourceFilter.cpp
119+
http/coro/HTTPHandlerChain.cpp
120+
http/coro/HTTPStreamSourceSink.cpp
121+
http/coro/HTTPStreamSource.cpp
122+
http/coro/HTTPTransactionAdaptorSource.cpp
123+
http/coro/util/Transport.cpp
124+
http/coro/util/ExecutorSourceFilter.cpp
125+
http/coro/util/DetachableExecutor.cpp
126+
http/coro/util/CancellableBaton.cpp
127+
http/coro/transport/HTTPConnectTransport.cpp
128+
http/coro/transport/HTTPConnectStream.cpp
129+
http/coro/transport/HTTPConnectAsyncTransport.cpp
130+
http/coro/transport/CoroSSLTransport.cpp
131+
http/coro/client/HTTPCoroSessionPool.cpp
132+
http/coro/client/HTTPCoroConnector.cpp
133+
http/coro/client/HTTPClientConnectionCache.cpp
134+
http/coro/client/HTTPClient.cpp
135+
http/coro/client/CoroDNSResolver.cpp
136+
http/coro/server/HTTPCoroAcceptor.cpp
137+
http/coro/server/HTTPServer.cpp
138+
http/coro/server/handlers/ExpectContinueWrapperHandler.cpp
139+
http/coro/filters/DecompressionFilter.cpp
140+
http/coro/filters/CompressionFilter.cpp
141+
http/coro/filters/HTTPRedirectHandler.cpp
142+
http/coro/filters/DecompressionFilterFactory.cpp
143+
http/coro/filters/MutateFilter.cpp
144+
http/coro/filters/Logger.cpp
145+
http/coro/filters/RateLimitFilter.cpp
146+
http/coro/filters/RequestContextFilterFactory.cpp
147+
http/coro/filters/Status1xxFilter.cpp
148+
http/coro/filters/StatsFilterUtil.cpp
149+
http/coro/filters/TransformFilter.cpp
150+
http/coro/filters/VisitorFilter.cpp
151+
)
152+
105153
add_library(
106154
proxygen
107155
healthcheck/ServerHealthCheckerCallback.cpp
@@ -219,6 +267,7 @@ add_library(
219267
utils/ZstdStreamCompressor.cpp
220268
utils/ZstdStreamDecompressor.cpp
221269
${HTTP3_SOURCES}
270+
${PROXYGEN_CORO_SOURCES}
222271
${PROXYGEN_GENERATED_ROOT}/proxygen/lib/http/HTTPCommonHeaders.cpp
223272
${PROXYGEN_GENERATED_ROOT}/proxygen/lib/utils/TraceEventType.cpp
224273
${PROXYGEN_GENERATED_ROOT}/proxygen/lib/utils/TraceFieldType.cpp
@@ -234,6 +283,7 @@ target_include_directories(
234283
$<BUILD_INTERFACE:${PROXYGEN_FBCODE_ROOT}>
235284
$<BUILD_INTERFACE:${PROXYGEN_GENERATED_ROOT}>
236285
$<INSTALL_INTERFACE:include/>
286+
${CARES_INCLUDE_DIR}
237287
)
238288
target_compile_options(
239289
proxygen PRIVATE
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
#include "proxygen/lib/http/coro/HTTPBodyEventQueue.h"
10+
11+
#include <folly/Conv.h>
12+
13+
using folly::coro::co_error;
14+
using folly::coro::co_nothrow;
15+
using folly::coro::co_safe_point;
16+
17+
namespace proxygen::coro {
18+
19+
folly::coro::Task<HTTPHeaderEvent> HTTPBodyEventQueue::readHeaderEvent() {
20+
auto res = co_await co_nothrow(source_.readHeaderEvent());
21+
const auto& ct = co_await folly::coro::co_current_cancellation_token;
22+
if (ct.isCancellationRequested()) {
23+
co_yield co_error(HTTPError{HTTPErrorCode::CORO_CANCELLED});
24+
}
25+
26+
if (res.headers->isResponse() && res.headers->getStatusCode() == 304) {
27+
skipContentLengthValidation();
28+
} else if (shouldValidateContentLength_ && res.isFinal()) {
29+
setExpectedEgressContentLength(
30+
res.headers->getHeaders().getSingleOrEmpty(HTTP_HEADER_CONTENT_LENGTH),
31+
res.eom);
32+
}
33+
34+
co_return res;
35+
}
36+
37+
folly::coro::Task<HTTPBodyEventQueue::ReadBodyResult>
38+
HTTPBodyEventQueue::readBodyEvent(uint32_t max) {
39+
XCHECK(source_);
40+
XLOG(DBG4) << "Waiting for buffer space";
41+
auto bufferSpace = availableBuffer();
42+
if (!bufferSpace) {
43+
bufferSpace = co_await co_nothrow(waitAvailableBuffer());
44+
}
45+
XLOG(DBG4) << "Got buffer space len=" << bufferSpace
46+
<< " waiting for body event";
47+
max = std::min(max, uint32_t(bufferSpace));
48+
auto bodyEvent = co_await co_nothrow(source_.readBodyEvent(max));
49+
const auto& ct = co_await folly::coro::co_current_cancellation_token;
50+
if (ct.isCancellationRequested()) {
51+
co_yield co_error(HTTPError{HTTPErrorCode::CORO_CANCELLED});
52+
}
53+
54+
if (bodyEvent.eventType == HTTPBodyEvent::SUSPEND) {
55+
co_return ReadBodyResult({.resume = std::move(bodyEvent.event.resume)});
56+
}
57+
if (bodyEvent.eventType == HTTPBodyEvent::BODY &&
58+
!bodyEvent.event.body.empty()) {
59+
callback_.onEgressBytesBuffered(
60+
static_cast<int64_t>(bodyEvent.event.body.chainLength()));
61+
}
62+
co_return ReadBodyResult({
63+
.resume = folly::none,
64+
.eom = processBodyEvent(std::move(bodyEvent)),
65+
});
66+
}
67+
68+
void HTTPBodyEventQueue::setExpectedEgressContentLength(
69+
const std::string& contentLen, bool eom) {
70+
if (contentLen.empty()) {
71+
shouldValidateContentLength_ = false;
72+
return;
73+
}
74+
75+
auto convResult = folly::tryTo<uint64_t>(contentLen);
76+
if (convResult.hasError()) {
77+
XLOG(ERR)
78+
<< "Invalid content-length: " << contentLen << ", ex="
79+
<< folly::makeConversionError(convResult.error(), contentLen).what();
80+
shouldValidateContentLength_ = false;
81+
return;
82+
}
83+
84+
expectedContentLength_ = convResult.value();
85+
validateContentLength(eom);
86+
}
87+
88+
void HTTPBodyEventQueue::clear(const HTTPError& err) {
89+
size_t bytesBuffered{0};
90+
for (auto& ev : bodyQueue_) {
91+
for (auto& reg : ev.byteEventRegistrations) {
92+
reg.cancel(err, folly::none);
93+
}
94+
if (ev.eventType == HTTPBodyEvent::EventType::BODY) {
95+
bytesBuffered += ev.event.body.chainLength();
96+
}
97+
}
98+
bodyQueue_.clear();
99+
callback_.onEgressBytesBuffered(-static_cast<int64_t>(bytesBuffered));
100+
}
101+
102+
HTTPBodyEvent HTTPBodyEventQueue::dequeueBodyEvent(uint32_t max) {
103+
XCHECK(!bodyQueue_.empty());
104+
auto bodyEvent = std::move(bodyQueue_.front());
105+
bodyQueue_.pop_front();
106+
if (bodyEvent.eventType == HTTPBodyEvent::BODY) {
107+
auto length = bodyEvent.event.body.chainLength();
108+
auto toReturn = std::min(max, uint32_t(length));
109+
XCHECK_GE(bufferedBodyBytes_, toReturn);
110+
bool wasOverLimit = bufferedBodyBytes_ >= limit_;
111+
bufferedBodyBytes_ -= toReturn;
112+
if (wasOverLimit && bufferedBodyBytes_ < limit_) {
113+
event_.signal();
114+
}
115+
if (toReturn < length) {
116+
// only return part of it, split the buffer and re-push it to the front
117+
auto resultBuf = bodyEvent.event.body.splitAtMost(toReturn);
118+
bodyQueue_.emplace_front(std::move(bodyEvent));
119+
return HTTPBodyEvent(std::move(resultBuf), false);
120+
}
121+
}
122+
return bodyEvent;
123+
}
124+
125+
folly::coro::Task<size_t> HTTPBodyEventQueue::waitAvailableBuffer() {
126+
while (bufferedBodyBytes_ >= limit_) {
127+
event_.reset();
128+
auto status = co_await event_.wait();
129+
if (status == TimedBaton::Status::timedout) {
130+
co_yield folly::coro::co_error(HTTPError(
131+
HTTPErrorCode::READ_TIMEOUT, "timed out waiting for buffer space"));
132+
} else if (status == TimedBaton::Status::cancelled) {
133+
co_yield folly::coro::co_error(HTTPError(
134+
HTTPErrorCode::CORO_CANCELLED, "cancelled waiting for buffer space"));
135+
}
136+
}
137+
co_return limit_ - bufferedBodyBytes_;
138+
}
139+
140+
bool HTTPBodyEventQueue::processBodyEvent(HTTPBodyEvent&& bodyEvent) {
141+
XLOG(DBG4) << "Queuing body event";
142+
for (auto& reg : bodyEvent.byteEventRegistrations) {
143+
if (!reg.streamID) {
144+
reg.streamID = id_;
145+
}
146+
}
147+
bool eom = bodyEvent.eom;
148+
size_t addedBodyLength = 0;
149+
if (bodyEvent.eventType == HTTPBodyEvent::BODY) {
150+
addedBodyLength = bodyEvent.event.body.chainLength();
151+
bufferedBodyBytes_ += addedBodyLength;
152+
observedBodyLength_ += addedBodyLength;
153+
if (!bodyQueue_.empty() &&
154+
bodyQueue_.back().eventType == HTTPBodyEvent::BODY &&
155+
bodyQueue_.back().byteEventRegistrations.empty()) {
156+
// Coalesce this bodyEvent into the last one in queue
157+
auto& back = bodyQueue_.back();
158+
back.event.body.append(bodyEvent.event.body.move());
159+
back.eom = eom;
160+
back.byteEventRegistrations = std::move(bodyEvent.byteEventRegistrations);
161+
} else {
162+
bodyQueue_.emplace_back(std::move(bodyEvent));
163+
}
164+
} else {
165+
bodyQueue_.emplace_back(std::move(bodyEvent));
166+
}
167+
validateContentLength(eom);
168+
return eom;
169+
}
170+
171+
} // namespace proxygen::coro

0 commit comments

Comments
 (0)