Make HttpServer.accept concurrent
#179
Conversation
* Wait for headers of multiple connections concurrently in accept
* Add compat with clients using HttpServer.create(handshakeTimeout)
Ivansete-status left a comment
Thanks for the wonderful PR! 🙌
I'm just adding some nitpicks that I hope you find useful
websock/http/server.nim
Outdated
    maxConcurrentHandshakes*: int
    handshakeResults*: AsyncQueue[HandshakeResult]
Can these be private?
Suggested change:
-    maxConcurrentHandshakes*: int
-    handshakeResults*: AsyncQueue[HandshakeResult]
+    maxConcurrentHandshakes: int
+    handshakeResults: AsyncQueue[HandshakeResult]
Also, it seems something simpler can be used:
I'll make these fields private, good idea!
I think we are stuck with AsyncXXX data structures here because of the signaling/synchronization. I'm not entirely sure, but I think that's the case here?
AsyncQueue supports bounds, which means this PR can remove most of the accounting and just use the queue:
    proc worker =
      let socket = try: ok await accept(...) except ..
      await queue.addLast(socket)

    proc accept =
      let sockorerr = await queue.popFirst...
...this also makes it easier to skip the queue/worker entirely if the bound is 1.
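Fleshed out a little, that shape could look roughly like this (a sketch only, assuming chronos's AsyncQueue bounded via newAsyncQueue's maxsize and the results library; acceptConnection is a hypothetical stand-in for the low-level accept plus header read, not actual code from this PR):

    proc acceptWorker(server: HttpServer) {.async.} =
      while not server.closed:
        let res =
          try:
            HandshakeResult.ok(await server.acceptConnection())
          except CatchableError as exc:
            HandshakeResult.err(exc)
        # With a bounded queue (newAsyncQueue[HandshakeResult](maxHandshakes)),
        # addLast suspends while the queue is full, so the bound itself replaces
        # the manual activeHandshakes/slotAvailable accounting.
        await server.handshakeResults.addLast(res)

    proc accept*(server: HttpServer): Future[HttpRequest] {.async.} =
      let res = await server.handshakeResults.popFirst()
      if res.isErr:
        raise res.error
      return res.value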
websock/http/server.nim
Outdated
        raise (ref HttpError)(msg: error)
    if server.closed:
      if server.handshakeResults.isNil or server.handshakeResults.len == 0:
        raise newException(TransportUseClosedError, "Server is closed")
I think we need to use the same kind of exception because we don't know how they are actually handled
Suggested change:
-        raise newException(TransportUseClosedError, "Server is closed")
+        raise newException(HttpError, "Server is closed")
Besides, we can raise that "Server is closed" exception under the if server.closed:, can't we?
If I'm not seeing things (maybe I'm wrong here), nim-libp2p would actually like to know what specifically has failed - for example, if it's a transport error, it will catch it specifically as such. If we have to emulate trying to read from an underlying closed socket, that would be a transport error.
The reason we are emulating it here, instead of letting the underlying socket's accept produce it for us, is that we created all this dispatcher/worker machinery mediated by a queue. We could add extra code to force the flow into a fake dispatcher/worker that isn't needed anymore, so that it would reach the underlying socket accept and produce a transport error.
The nil/zero checks underneath if server.closed emulate the previous accept behavior more closely by allowing every call to accept to throw transport/stream exceptions: accept keeps dequeuing results until the queue runs out of elements and only the last "signal" exception is left in there. But we could also just raise the exception directly under if server.closed: here, I guess? The queue would be GC'd in any case when the closed HttpServer is disposed of. Hmm...
This was just a way-too-long way to say that the previous version of accept doesn't check for server.closed. Sorry!
websock/http/server.nim
Outdated
    if server.handshakeResults.isNil or server.handshakeResults.len == 0:
      raise newException(TransportUseClosedError, "Server is closed")

    if server.handshakeResults.isNil:
I'm not used to using that form, but it's interesting to keep consistency with the above.
Suggested change:
-    if server.handshakeResults.isNil:
+    if isNil(server.handshakeResults):
websock/http/server.nim
Outdated
      raise newException(TransportUseClosedError, "Server is closed")

    if server.handshakeResults.isNil:
      server.handshakeResults = newAsyncQueue[HandshakeResult]()
Maybe better to have this in the create procs?
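For illustration, what that might look like (a sketch only - the real HttpServer.create has many more parameters, which are omitted here):

    proc create*(_: typedesc[HttpServer]): HttpServer =
      # Build the queue when the server is constructed, so accept never needs
      # to nil-check or lazily initialize it.
      result = HttpServer(
        handshakeResults: newAsyncQueue[HandshakeResult]()
        # ... remaining HttpServer fields set as before ...
      )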
    type
      HttpAsyncCallback* = proc(request: HttpRequest) {.async.}

      HandshakeResult* = Result[HttpRequest, ref CatchableError]
I believe this can be simpler by just using the error message. Also, it's worth making every error message unique so that we clearly know where it happened, instead of always using "Server is closed". IMHO, in the future we should avoid raising exceptions.
Suggested change:
-      HandshakeResult* = Result[HttpRequest, ref CatchableError]
+      HandshakeResult* = Result[HttpRequest, string]
The reason I'm saving all exception types here is to guarantee full compatibility with the previous version of this accept proc, which hands off all kinds of exceptions to any and all existing code currently using nim-websock.
If we change the error bookkeeping inside the dispatcher/worker, we either won't throw exceptions on the outside or we will throw a single exception type when pumping it out of handshakeResults, which will change how accept behaves and how callers of accept currently expect it to behave, which may introduce bugs.
This patch is just trying to solve the Slowloris attack for the websocket transport while also trying not to touch anything else. I'd leave changes to accept's runtime behavior (or a revamp of the websocket connection-accept API) for later - unless we want to address that now, because your suggestion is actually better, as would be consuming most errors internally.
+1 for passing the original exception(s) - it gets a bit messy with explicit raises types - there's the possibility to cut down on the number of exception types raised, though it's an open question if that's a good idea or not.
    let res = await server.handshakeResults.popFirst()

    if res.isErr:
      let err = res.error
      if err of AcceptDispatcherFinishedError:
        server.handshakeResults.addLastNoWait(res)
        raise newException(TransportUseClosedError, "Server is closed")
      raise err

    return res.value
I think the type AcceptDispatcherFinishedError is not needed
Suggested change:
-    let res = await server.handshakeResults.popFirst()
-    if res.isErr:
-      let err = res.error
-      if err of AcceptDispatcherFinishedError:
-        server.handshakeResults.addLastNoWait(res)
-        raise newException(TransportUseClosedError, "Server is closed")
-      raise err
-    return res.value
+    let res = (await server.handshakeResults.popFirst()).valueOr:
+      if server.closed:
+        raise newException(HttpError, "Server closed pop first: " & $error)
+      raise newException(HttpError, "Error found in websock when pop first: " & $error)
+    return res
We need an exception type to store in the Result[E] that isn't normally raised. It's just a signal that the server is closed, so popFirst() doesn't block. When we pop this special signal, we put it back in to make sure we don't block any other accept callers (there can be more than one). It's better to use a custom type than to guess and reuse an exception type that isn't otherwise thrown, or to create some other out-of-band signaling mechanism here.
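For reference, the dispatcher's side of that signal looks roughly like this (a sketch; finishDispatcher is an illustrative name, the rest follows the PR's types):

    proc finishDispatcher(server: HttpServer) =
      # Enqueue a single "server is closed" sentinel. Accept callers that pop it
      # put it back before raising, so every concurrent accept eventually
      # observes the closed state instead of blocking in popFirst forever.
      try:
        server.handshakeResults.addLastNoWait(
          HandshakeResult.err(
            newException(AcceptDispatcherFinishedError, "Server is closed")))
      except AsyncQueueFullError:
        discard # a full queue still has pending results for accept to drain first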
    if server.maxConcurrentHandshakes > 0 and
        activeHandshakes >= server.maxConcurrentHandshakes:
      slotAvailable.clear()
      await slotAvailable.wait()
Why is this needed, if I may ask?
If for some configuration error we leave maxConcurrentHandshakes at 0 (or if the user just doesn't want a limit on concurrent handshakes for some reason), then we should not wait for capacity, which is never going to arrive and would block forever. We also don't wait for capacity if we haven't reached it yet (the >= server.maxConcurrentHandshakes check).
        HandshakeResult.err(newException(HttpError, error))
      )
    except AsyncQueueFullError:
      discard
That may not be needed if we change to a deque type, but discard should only be used on very limited occasions :D
I'm using discard here as a "no-op": it doesn't compile if I leave the exception handler block empty, so discard looks like the best alternative. This exception never triggers (and if it somehow did, we'd have far worse problems), so I guess I get a pass here for the empty exception handler.
websock/http/server.nim
Outdated
    var headersTimeout = headersTimeout
    if handshakeTimeout > 0.seconds and handshakeTimeout < headersTimeout:
      headersTimeout = handshakeTimeout
When assigning the headersTimeout below, we could have something like this:

    var server = HttpServer(
      ...
      headersTimeout: if handshakeTimeout > 0.seconds: min(handshakeTimeout, headersTimeout) else: headersTimeout
    )

* Move newAsyncQueue to HttpServer.create
* Error out of accept simply `if server.closed`
* Track handshakeDispatcher future (remove asyncSpawn)
* Nicer code for the handshake/header timeout workaround
* Make new HttpServer vars private
I'm leaning toward taking the dispatcher/worker lambdas out of accept.
There are a few ways to skin this cat, unfortunately. The original suggestion is to simply deal with this on the consumer side and spawn multiple accepts: #79 (comment) (see the sketch below). Having this functionality inside websock makes the API somewhat easier to use, at the expense of using up file descriptors while the accepts are ongoing, which may be surprising to users of the API. Neither of the solutions addresses stuffing the queue with N slow connections, i.e. it's an amelioration rather than a solution - certainly a worthwhile one, but still something that should probably be documented in the API docs. A few alternative ways to address issues like this that I can think of:
The situation is similar to other "upgrades" (like negotiating TLS on a regular socket) which could serve as inspiration for a path forward.
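For reference, the consumer-side variant mentioned above could be sketched roughly like this (assuming chronos; handleRequest, the parallelism handling, and the error handling are illustrative only):

    proc acceptLoop(server: HttpServer, parallelism: int) {.async.} =
      var pending: seq[Future[HttpRequest]]
      while not server.closed:
        # Keep several accepts in flight so one client that is slow to send its
        # headers does not block the others.
        while pending.len < parallelism:
          pending.add(server.accept())
        discard await one(pending)   # wait until any accept settles
        var remaining: seq[Future[HttpRequest]]
        for fut in pending:
          if fut.finished():
            if fut.completed():
              asyncSpawn handleRequest(fut.read())
            # failed accepts (timeouts, bad headers) are simply dropped here
          else:
            remaining.add(fut)
        pending = remaining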
Have you given any thought to what this could look like? One more thing scheduled for deprecation is the "callback-style" server. A related missing feature is the ability to share a port with regular HTTP connections - solving this would require exposing support for https://status-im.github.io/nim-chronos/http_server_middleware.html - this would mean adding support more broadly for interop with chronos' native HTTP offering.
I'm now trying to solve it in nim-libp2p's wstransport without touching nim-websock, which would be ideal, but it's harder for some reason. Still figuring it out.
I think that, maybe, there's something about #79 (comment) that doesn't work. nim-libp2p is doing what that comment suggests (or a version of it that looks like it), but it just doesn't seem to work: it's still serializing the low-level TCP socket accept and the header-parsing timeout - unless I'm just seeing things. I will try to make that work first.
These seem harder and trickier to do, but maybe that's just a skill issue on my end. But they seem to be good ideas.
The hook is a good additional idea. It could check e.g. IP addresses or IP blocks, which would be good, but it would still benefit from (and combine with) what we're solving or ameliorating here, and I don't think that particular hook idea can shoulder the whole issue in any case.
If we decide nim-websock is where this fix goes, we can try to generalize it first.
What I am thinking now, regarding the current accept: the #79 (comment) mentions some immediately obvious solutions, like exposing the low-level header-parsing machinery for the caller to orchestrate, e.g. return the stream on accept so that you can parse it later, etc. That looks like a last resort. I think what I would prefer is some blunt instrument -- this PR, but put it in a new "acceptMany" proc. That would require an additional param to HttpServer ("maxConcurrentAcceptMany"... ugh) that is only used when acceptMany is called; it can't be a param to acceptMany itself.

We could also throw in the towel at some point and just implement acceptMany, make nim-libp2p use it (a 1-line change), close the WSS vuln issue (we made it N times better, where N = maxConcurrentHandshakes, and FDs, per HttpServer potentially), then deprecate acceptMany later when we figure out how to effectively get rid of the blocking from failed client headers. And if acceptMany never gets deprecated, the consolation is that once everything is QUIC, we don't allocate an FD for each client socket; then it's just tasks in the chronos pool and RAM use, so we can time out thousands of truncated headers simultaneously.
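For concreteness, the "blunt instrument" shape being described might look like this (purely hypothetical - none of these names exist in nim-websock today):

    proc acceptMany*(server: HttpServer): Future[HttpRequest] {.async.} =
      ## Like accept, but headers of up to server.maxConcurrentAcceptMany
      ## connections are awaited concurrently behind the scenes; each call still
      ## returns a single upgraded request, so switching a caller from accept to
      ## acceptMany would be a one-line change.
      raiseAssert "sketch only - not implemented"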
Great to know this; I was also thinking about using that somehow to fix the issue.
I don't have a good grasp of this, but I don't mind e.g. an "acceptMany" API in nim-websock (or whatever we'd add in nim-libp2p to fix the issue instead) becoming roadkill when the nim-websock implementation has to significantly change. There's a chance that the nim-websock accept API (if it's still there in this form) would use the new back-end and then just work.
oops, github closed this automatically when the error branch was merged, which wasn't the intent - probably because the target branch went away, it also can't be reopened :/
Don't worry, this PR was submitted the wrong way anyways -- I should have pushed a branch to the repo instead. EDIT: Also, this PR was targeting the websock-errors branch.
NOTE: This PR is based off of #178 (websock-errors) and is currently using that PR/branch as the base branch for merge. It adds a single commit to websock-errors.

This PR fixes waku-org/nwaku#3634 by making the accept method in HttpServer relieve the underlying transport of multiple incoming socket connections simultaneously, instead of waiting the full headersTimeout on one accepted socket before erroring out and allowing the next client socket to be fetched from the accept socket.
This PR also adds compatibility with nim-libp2p's reliance on handshakeTimeout.
Passes nim-websock's nimble test and ad-hoc testing with a single nwaku instance.

This patch works, but there are probably other ways to fix the issue and I'm not sure if this is the right way to do it. All feedback welcome here!
~~Closes~~ Addresses waku-org/nwaku#3634 (actually closed by a later nwaku nim-websock vendor bump)