Skip to content

Commit 667f8dd

Browse files
authored
add initial stream bridge (#964)
1 parent 00f8d2c commit 667f8dd

File tree

3 files changed

+306
-0
lines changed

3 files changed

+306
-0
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
/// Errors that are particular to the NIO-based Smithy HTTP client.
9+
public enum NIOHTTPClientError: Error {
10+
11+
case streamingError(Error)
12+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import AsyncHTTPClient
9+
import Foundation
10+
import NIO
11+
import Smithy
12+
import SmithyStreams
13+
14+
/// Handles streaming between Smithy streams and AsyncHTTPClient
15+
final class NIOHTTPClientStreamBridge {
16+
17+
/// Convert Smithy ByteStream to AsyncHTTPClient request body
18+
static func convertRequestBody(
19+
from body: ByteStream,
20+
allocator: ByteBufferAllocator,
21+
chunkSize: Int = CHUNK_SIZE_BYTES
22+
) async throws -> AsyncHTTPClient.HTTPClientRequest.Body {
23+
switch body {
24+
case .noStream:
25+
// No body to send
26+
return .bytes(allocator.buffer(capacity: 0))
27+
28+
case .data(let data):
29+
// Convert Data to ByteBuffer
30+
if let data = data {
31+
var buffer = allocator.buffer(capacity: data.count)
32+
buffer.writeBytes(data)
33+
return .bytes(buffer)
34+
} else {
35+
return .bytes(allocator.buffer(capacity: 0))
36+
}
37+
38+
case .stream(let stream):
39+
// Handle streaming request body
40+
return try await convertStreamToRequestBody(stream: stream, allocator: allocator, chunkSize: chunkSize)
41+
}
42+
}
43+
44+
/// Convert AsyncHTTPClient response body to Smithy ByteStream
45+
static func convertResponseBody(
46+
from response: AsyncHTTPClient.HTTPClientResponse
47+
) async -> ByteStream {
48+
let bufferedStream = BufferedStream()
49+
50+
do {
51+
var iterator = response.body.makeAsyncIterator()
52+
while let buffer = try await iterator.next() {
53+
// Convert ByteBuffer to Data and write to buffered stream
54+
if let bytes = buffer.getBytes(at: buffer.readerIndex, length: buffer.readableBytes) {
55+
let data = Data(bytes)
56+
try bufferedStream.write(contentsOf: data)
57+
}
58+
}
59+
bufferedStream.close()
60+
} catch {
61+
bufferedStream.closeWithError(error)
62+
}
63+
64+
return .stream(bufferedStream)
65+
}
66+
67+
/// Convert a Smithy Stream to AsyncHTTPClient request body
68+
private static func convertStreamToRequestBody(
69+
stream: Smithy.Stream,
70+
allocator: ByteBufferAllocator,
71+
chunkSize: Int = CHUNK_SIZE_BYTES
72+
) async throws -> AsyncHTTPClient.HTTPClientRequest.Body {
73+
if let streamLength = stream.length {
74+
let asyncSequence = StreamToAsyncSequence(stream: stream, allocator: allocator, chunkSize: chunkSize)
75+
return .stream(asyncSequence, length: .known(Int64(streamLength)))
76+
} else {
77+
do {
78+
let data = try await stream.readToEndAsync()
79+
if let data = data {
80+
var buffer = allocator.buffer(capacity: data.count)
81+
buffer.writeBytes(data)
82+
return .bytes(buffer)
83+
} else {
84+
return .bytes(allocator.buffer(capacity: 0))
85+
}
86+
} catch {
87+
throw NIOHTTPClientError.streamingError(error)
88+
}
89+
}
90+
}
91+
}
92+
93+
/// AsyncSequence adapter that converts a Smithy Stream to ByteBuffer sequence for AsyncHTTPClient
94+
internal struct StreamToAsyncSequence: AsyncSequence, Sendable {
95+
typealias Element = ByteBuffer
96+
97+
private let stream: Smithy.Stream
98+
private let allocator: ByteBufferAllocator
99+
private let chunkSize: Int
100+
101+
init(stream: Smithy.Stream, allocator: ByteBufferAllocator, chunkSize: Int = CHUNK_SIZE_BYTES) {
102+
self.stream = stream
103+
self.allocator = allocator
104+
self.chunkSize = chunkSize
105+
}
106+
107+
func makeAsyncIterator() -> AsyncIterator {
108+
AsyncIterator(stream: stream, allocator: allocator, chunkSize: chunkSize)
109+
}
110+
111+
struct AsyncIterator: AsyncIteratorProtocol {
112+
private let stream: Smithy.Stream
113+
private let allocator: ByteBufferAllocator
114+
private let chunkSize: Int
115+
private var isFinished = false
116+
117+
init(stream: Smithy.Stream, allocator: ByteBufferAllocator, chunkSize: Int) {
118+
self.stream = stream
119+
self.allocator = allocator
120+
self.chunkSize = chunkSize
121+
}
122+
123+
mutating func next() async throws -> ByteBuffer? {
124+
guard !isFinished else { return nil }
125+
126+
do {
127+
// Read a chunk from the stream (using configurable chunk size)
128+
let data = try await stream.readAsync(upToCount: chunkSize)
129+
130+
if let data = data, !data.isEmpty {
131+
var buffer = allocator.buffer(capacity: data.count)
132+
buffer.writeBytes(data)
133+
return buffer
134+
} else {
135+
isFinished = true
136+
stream.close()
137+
return nil
138+
}
139+
} catch {
140+
isFinished = true
141+
stream.close()
142+
throw NIOHTTPClientError.streamingError(error)
143+
}
144+
}
145+
}
146+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import AsyncHTTPClient
9+
import Foundation
10+
import NIO
11+
import SmithyTestUtil
12+
import XCTest
13+
import class SmithyStreams.BufferedStream
14+
import enum Smithy.LogAgentLevel
15+
import protocol Smithy.LogAgent
16+
import enum Smithy.ByteStream
17+
@testable import ClientRuntime
18+
19+
class NIOHTTPClientStreamBridgeTests: XCTestCase {
20+
let allocator = ByteBufferAllocator()
21+
22+
func test_convertResponseBody_streamsAllDataCorrectly() async throws {
23+
24+
// The maximum size of input streaming data in the tests
25+
let maxDataSize = 65_536 // 64 kb
26+
27+
// Create & fill a buffer with random bytes, for use in later test setup
28+
// Random buffer is reused because creating random data is slow
29+
// We are responsible for deallocating it
30+
let randomBuffer = UnsafeMutablePointer<UInt8>.allocate(capacity: maxDataSize)
31+
defer { randomBuffer.deallocate() }
32+
33+
for i in 0..<maxDataSize {
34+
randomBuffer[i] = UInt8.random(in: UInt8.min...UInt8.max)
35+
}
36+
37+
// Run this test repeatedly to help uncover any problems
38+
let numberOfRuns = 100
39+
40+
for run in 1...numberOfRuns {
41+
// Test with varying data sizes up to the maximum
42+
let dataSize = min(run * 1000, maxDataSize)
43+
44+
// Create original test data
45+
let originalData = Data(bytes: randomBuffer, count: dataSize)
46+
47+
// Create a mock AsyncHTTPClient response with the test data
48+
var buffer = allocator.buffer(capacity: originalData.count)
49+
buffer.writeBytes(originalData)
50+
51+
let response = AsyncHTTPClient.HTTPClientResponse(
52+
version: .http1_1,
53+
status: .ok,
54+
headers: [:],
55+
body: .bytes(buffer)
56+
)
57+
58+
let resultStream = await NIOHTTPClientStreamBridge.convertResponseBody(from: response)
59+
let convertedData = try await readAllData(from: resultStream)
60+
61+
XCTAssertEqual(convertedData, originalData,
62+
"Run \(run) failed (dataSize: \(dataSize))")
63+
}
64+
}
65+
66+
func test_convertRequestBody_withNoStream() async throws {
67+
let byteStream = ByteStream.noStream
68+
69+
let result = try await NIOHTTPClientStreamBridge.convertRequestBody(
70+
from: byteStream,
71+
allocator: allocator
72+
)
73+
74+
// Convert the body to an async sequence and verify it's empty
75+
var totalBytes = 0
76+
for try await buffer in result {
77+
totalBytes += buffer.readableBytes
78+
}
79+
80+
XCTAssertEqual(totalBytes, 0)
81+
}
82+
83+
func test_convertRequestBody_withData() async throws {
84+
let testData = "Hello, World!".data(using: .utf8)!
85+
let byteStream = ByteStream.data(testData)
86+
87+
let result = try await NIOHTTPClientStreamBridge.convertRequestBody(
88+
from: byteStream,
89+
allocator: allocator
90+
)
91+
92+
var collectedData = Data()
93+
for try await buffer in result {
94+
collectedData.append(Data(buffer: buffer))
95+
}
96+
97+
XCTAssertEqual(collectedData, testData)
98+
}
99+
100+
func test_convertRequestBody_withStream() async throws {
101+
// Create random test data
102+
let dataSize = 1000
103+
let randomBuffer = UnsafeMutablePointer<UInt8>.allocate(capacity: dataSize)
104+
defer { randomBuffer.deallocate() }
105+
106+
for i in 0..<dataSize {
107+
randomBuffer[i] = UInt8.random(in: UInt8.min...UInt8.max)
108+
}
109+
110+
let testData = Data(bytes: randomBuffer, count: dataSize)
111+
let bufferedStream = BufferedStream(data: testData, isClosed: true)
112+
let byteStream = ByteStream.stream(bufferedStream)
113+
114+
let result = try await NIOHTTPClientStreamBridge.convertRequestBody(
115+
from: byteStream,
116+
allocator: allocator,
117+
chunkSize: 100 // try a non-default chunk size
118+
)
119+
120+
var collectedData = Data()
121+
for try await buffer in result {
122+
collectedData.append(Data(buffer: buffer))
123+
}
124+
125+
XCTAssertEqual(collectedData, testData)
126+
}
127+
128+
private func readAllData(from byteStream: ByteStream) async throws -> Data {
129+
switch byteStream {
130+
case .stream(let stream):
131+
return try await stream.readToEndAsync() ?? Data()
132+
case .data(let data):
133+
return data ?? Data()
134+
case .noStream:
135+
return Data()
136+
}
137+
}
138+
}
139+
140+
private extension Data {
141+
init(buffer: ByteBuffer) {
142+
if let bytes = buffer.getBytes(at: buffer.readerIndex, length: buffer.readableBytes) {
143+
self.init(bytes)
144+
} else {
145+
self.init()
146+
}
147+
}
148+
}

0 commit comments

Comments
 (0)