From 6951016f19ee06a2869b80d207dd875aae6c92f2 Mon Sep 17 00:00:00 2001 From: Simon Pilkington Date: Fri, 15 Aug 2025 11:11:03 +1000 Subject: [PATCH 1/2] Use correct safe collection concurrency primatives. --- .swiftformat | 2 +- Package.resolved | 24 +- Package.swift | 5 +- ...itePrimaryKeyTable+bulkUpdateSupport.swift | 18 +- ...moDBCompositePrimaryKeyTable+execute.swift | 20 +- ...oDBCompositePrimaryKeyTable+getItems.swift | 2 +- ...CompositePrimaryKeyTable+updateItems.swift | 95 ++-- .../AWSDynamoDBCompositePrimaryKeyTable.swift | 8 +- .../CustomRowTypeIdentifier.swift | 10 +- .../DynamoDBClientProtocol.swift | 25 +- ...lobberVersionedItemWithHistoricalRow.swift | 2 +- ...yKeyTable+conditionallyInTransaction.swift | 7 +- ...maryKeyTable+conditionallyUpdateItem.swift | 2 +- .../DynamoDBCompositePrimaryKeyTable.swift | 10 +- .../InMemoryDataRepresentations.swift | 14 +- ...namoDBCompositePrimaryKeyTable+query.swift | 14 +- ...DBCompositePrimaryKeyTable+transform.swift | 8 +- ...oDBCompositePrimaryKeyTableWithIndex.swift | 10 +- ...BCompositePrimaryKeysProjectionStore.swift | 7 +- .../InternalKeyedDecodingContainer.swift | 7 +- ...InternalSingleValueEncodingContainer.swift | 7 +- .../PolymorphicOperationReturnType.swift | 2 +- .../PolymorphicWriteEntry.swift | 8 +- .../QueryInput+forSortKeyCondition.swift | 9 +- Sources/DynamoDBTables/RowWithIndex.swift | 2 +- .../DynamoDBTables/RowWithItemVersion.swift | 2 +- .../DynamoDBTables/Sequence+concurrency.swift | 422 ++++++++++++++++++ ...encyDynamoDBCompositePrimaryKeyTable.swift | 14 +- .../TypedDatabaseItemWithTimeToLive.swift | 4 +- .../DynamoDBTablesMacros/BaseEntryMacro.swift | 6 +- ...ynamoDBCompositePrimaryKeyTableTests.swift | 2 + ...ynamoDBCompositePrimaryKeyTableTests.swift | 2 +- .../TestConfiguration.swift | 4 +- ...Item+RowWithItemVersionProtocolTests.swift | 8 +- 34 files changed, 588 insertions(+), 194 deletions(-) create mode 100644 Sources/DynamoDBTables/Sequence+concurrency.swift diff --git a/.swiftformat b/.swiftformat index d1ed421..1ebc37a 100644 --- a/.swiftformat +++ b/.swiftformat @@ -1,5 +1,5 @@ # Swift version ---swiftversion 5.8 +--swiftversion 6.1 # file options --exclude .build diff --git a/Package.resolved b/Package.resolved index 7bc795a..71382c1 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,5 +1,5 @@ { - "originHash" : "13b2eb8b8f0d979ce0d4c53cbf8649c671c922ab761e78916e46299bee6ea407", + "originHash" : "4d3e06790cea6d7326e933f8433c6f6854f0bd90072b780af74424588538f4e9", "pins" : [ { "identity" : "aws-crt-swift", @@ -19,15 +19,6 @@ "version" : "1.5.20" } }, - { - "identity" : "collectionconcurrencykit", - "kind" : "remoteSourceControl", - "location" : "https://github.com/JohnSundell/CollectionConcurrencyKit", - "state" : { - "revision" : "b4f23e24b5a1bff301efc5e70871083ca029ff95", - "version" : "0.2.0" - } - }, { "identity" : "grpc-swift", "kind" : "remoteSourceControl", @@ -69,8 +60,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/tachyonics/smockable", "state" : { - "revision" : "2ebc2a33ba83f2bf87e784346ad8b70794150379", - "version" : "0.3.0" + "revision" : "2df8f192eab94fd789d56f57119eb83eec3601fd", + "version" : "0.4.0" } }, { @@ -271,15 +262,6 @@ "version" : "1.6.2" } }, - { - "identity" : "swiftformat", - "kind" : "remoteSourceControl", - "location" : "https://github.com/nicklockwood/SwiftFormat", - "state" : { - "revision" : "b83a8f132ff5430f3b6ff7d912baa407caea9e29", - "version" : "0.57.2" - } - }, { "identity" : "thrift-swift", "kind" : "remoteSourceControl", diff --git a/Package.swift b/Package.swift index dc6af79..bbecab1 100644 --- a/Package.swift +++ b/Package.swift @@ -43,10 +43,8 @@ let package = Package( .package(url: "https://github.com/awslabs/aws-sdk-swift.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-metrics.git", "1.0.0" ..< "3.0.0"), - .package(url: "https://github.com/JohnSundell/CollectionConcurrencyKit", from: "0.2.0"), - .package(url: "https://github.com/nicklockwood/SwiftFormat", from: "0.53.9"), .package(url: "https://github.com/apple/swift-syntax", from: "601.0.0"), - .package(url: "https://github.com/tachyonics/smockable", from: "0.3.0"), + .package(url: "https://github.com/tachyonics/smockable", from: "0.4.0"), ], targets: [ .macro(name: "DynamoDBTablesMacros", dependencies: [ @@ -59,7 +57,6 @@ let package = Package( .product(name: "Logging", package: "swift-log"), .product(name: "Metrics", package: "swift-metrics"), .product(name: "AWSDynamoDB", package: "aws-sdk-swift"), - .product(name: "CollectionConcurrencyKit", package: "CollectionConcurrencyKit"), .product(name: "Smockable", package: "smockable"), ], swiftSettings: swiftSettings), diff --git a/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+bulkUpdateSupport.swift b/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+bulkUpdateSupport.swift index 37195d2..f60a4e5 100644 --- a/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+bulkUpdateSupport.swift +++ b/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+bulkUpdateSupport.swift @@ -36,11 +36,11 @@ enum AttributeDifference: Equatable { var path: String { switch self { case .update(path: let path, value: _): - return path + path case let .remove(path: path): - return path + path case .listAppend(path: let path, value: _): - return path + path } } } @@ -241,17 +241,17 @@ extension GenericAWSDynamoDBCompositePrimaryKeyTable { private func combinePath(basePath: String?, newComponent: String) -> String { if let basePath { - return "\(basePath).\"\(newComponent)\"" + "\(basePath).\"\(newComponent)\"" } else { - return "\"\(newComponent)\"" + "\"\(newComponent)\"" } } private func updateAttribute(newPath: String, attribute: DynamoDBClientTypes.AttributeValue) throws -> [AttributeDifference] { if let newValue = try getFlattenedAttribute(attribute: attribute) { - return [.update(path: newPath, value: newValue)] + [.update(path: newPath, value: newValue)] } else { - return [.remove(path: newPath)] + [.remove(path: newPath)] } } @@ -310,9 +310,9 @@ extension GenericAWSDynamoDBCompositePrimaryKeyTable { /// single quote by doubling it. E.g. 'foo'bar' becomes 'foo''bar'. private func sanitizeString(_ string: String) -> String { if self.tableConfiguration.escapeSingleQuoteInPartiQL { - return string.replacingOccurrences(of: "'", with: "''") + string.replacingOccurrences(of: "'", with: "''") } else { - return string + string } } } diff --git a/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+execute.swift b/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+execute.swift index 102daaa..41a6c23 100644 --- a/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+execute.swift +++ b/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+execute.swift @@ -40,18 +40,16 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable { { let attributesFilterString = attributesFilter?.joined(separator: ", ") ?? "*" - let partitionWhereClause: String - if partitionKeys.count == 1 { - partitionWhereClause = "\(partitionKeyAttributeName)='\(partitionKeys[0])'" + let partitionWhereClause = if partitionKeys.count == 1 { + "\(partitionKeyAttributeName)='\(partitionKeys[0])'" } else { - partitionWhereClause = "\(partitionKeyAttributeName) IN ['\(partitionKeys.joined(separator: "', '"))']" + "\(partitionKeyAttributeName) IN ['\(partitionKeys.joined(separator: "', '"))']" } - let whereClausePostfix: String - if let additionalWhereClause { - whereClausePostfix = " \(additionalWhereClause)" + let whereClausePostfix = if let additionalWhereClause { + " \(additionalWhereClause)" } else { - whereClausePostfix = "" + "" } return """ @@ -125,7 +123,7 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable { nextToken: nil) } - return itemLists.flatMap { $0 } + return itemLists.flatMap(\.self) } func execute( @@ -193,7 +191,7 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable { nextToken: nil) } - return itemLists.flatMap { $0 } + return itemLists.flatMap(\.self) } // function to return a future with the results of an execute call and all future paginated calls @@ -320,7 +318,7 @@ extension [DynamoDBTableError] { // iterate through all errors return self.compactMap { error in - return switch error { + switch error { case .accessDenied: canPassThrough(state: &seenAccessDenied) ? error : nil case .internalServerError: diff --git a/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+getItems.swift b/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+getItems.swift index 0a73a78..32a63a9 100644 --- a/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+getItems.swift +++ b/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+getItems.swift @@ -42,7 +42,7 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable { monitors the unprocessed items returned in the response from DynamoDB and uses an exponential backoff algorithm to retry those items using the same retry configuration as the underlying DynamoDB client. */ - private class GetItemsRetriable { + private class GetItemsRetriable { typealias OutputType = [CompositePrimaryKey: TypedTTLDatabaseItem] let dynamodb: DynamoClient diff --git a/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+updateItems.swift b/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+updateItems.swift index ff82869..f9505fa 100644 --- a/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+updateItems.swift +++ b/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+updateItems.swift @@ -25,7 +25,6 @@ // import AWSDynamoDB -import CollectionConcurrencyKit import Foundation import Logging @@ -38,7 +37,7 @@ public enum AWSDynamoDBLimits { public static let maxStatementLength = 8192 } -private struct AWSDynamoDBPolymorphicWriteEntryTransform: PolymorphicWriteEntryTransform { +private struct AWSDynamoDBPolymorphicWriteEntryTransform: PolymorphicWriteEntryTransform { typealias TableType = GenericAWSDynamoDBCompositePrimaryKeyTable let statement: String @@ -48,12 +47,12 @@ private struct AWSDynamoDBPolymorphicWriteEntryTransform: PolymorphicTransactionConstraintTransform { +private struct AWSDynamoDBPolymorphicTransactionConstraintTransform: PolymorphicTransactionConstraintTransform { typealias TableType = GenericAWSDynamoDBCompositePrimaryKeyTable let statement: String - init(_ entry: TransactionConstraintEntry, + init(_ entry: TransactionConstraintEntry, table: TableType) throws { self.statement = try table.entryToStatement(entry) @@ -75,34 +74,32 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable { internal func entryToStatement( _ entry: WriteEntry) throws -> String { - let statement: String - switch entry { + let statement: String = switch entry { case let .update(new: new, existing: existing): - statement = try getUpdateExpression(tableName: self.targetTableName, - newItem: new, - existingItem: existing) + try getUpdateExpression(tableName: self.targetTableName, + newItem: new, + existingItem: existing) case let .insert(new: new): - statement = try getInsertExpression(tableName: self.targetTableName, - newItem: new) + try getInsertExpression(tableName: self.targetTableName, + newItem: new) case let .deleteAtKey(key: key): - statement = try getDeleteExpression(tableName: self.targetTableName, - existingKey: key) + try getDeleteExpression(tableName: self.targetTableName, + existingKey: key) case let .deleteItem(existing: existing): - statement = try getDeleteExpression(tableName: self.targetTableName, - existingItem: existing) + try getDeleteExpression(tableName: self.targetTableName, + existingItem: existing) } return statement } internal func entryToStatement( - _ entry: TransactionConstraintEntry) throws -> String + _ entry: TransactionConstraintEntry) throws -> String { - let statement: String - switch entry { + let statement: String = switch entry { case let .required(existing: existing): - statement = getExistsExpression(tableName: self.targetTableName, - existingItem: existing) + getExistsExpression(tableName: self.targetTableName, + existingItem: existing) } return statement @@ -236,11 +233,10 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable { var isTransactionConflict = false let reasons = try zip(cancellationReasons, keys).compactMap { cancellationReason, entryKey -> DynamoDBTableError? in - let key: CompositePrimaryKey? - if let item = cancellationReason.item { - key = try DynamoDBDecoder().decode(.m(item)) + let key: CompositePrimaryKey? = if let item = cancellationReason.item { + try DynamoDBDecoder().decode(.m(item)) } else { - key = nil + nil } let partitionKey = key?.partitionKey ?? entryKey.partitionKey @@ -339,11 +335,10 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable { var isTransactionConflict = false let reasons = try zip(cancellationReasons, inputKeys).compactMap { cancellationReason, entryKey -> DynamoDBTableError? in - let key: CompositePrimaryKey? - if let item = cancellationReason.item { - key = try DynamoDBDecoder().decode(.m(item)) + let key: CompositePrimaryKey? = if let item = cancellationReason.item { + try DynamoDBDecoder().decode(.m(item)) } else { - key = nil + nil } let partitionKey = key?.partitionKey ?? entryKey.partitionKey @@ -469,21 +464,20 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable { } let statements = try entries.map { entry -> DynamoDBClientTypes.BatchStatementRequest in - let statement: String - switch entry { + let statement: String = switch entry { case let .update(new: new, existing: existing): - statement = try getUpdateExpression(tableName: self.targetTableName, - newItem: new, - existingItem: existing) + try getUpdateExpression(tableName: self.targetTableName, + newItem: new, + existingItem: existing) case let .insert(new: new): - statement = try getInsertExpression(tableName: self.targetTableName, - newItem: new) + try getInsertExpression(tableName: self.targetTableName, + newItem: new) case let .deleteAtKey(key: key): - statement = try getDeleteExpression(tableName: self.targetTableName, - existingKey: key) + try getDeleteExpression(tableName: self.targetTableName, + existingKey: key) case let .deleteItem(existing: existing): - statement = try getDeleteExpression(tableName: self.targetTableName, - existingItem: existing) + try getDeleteExpression(tableName: self.targetTableName, + existingItem: existing) } return DynamoDBClientTypes.BatchStatementRequest(consistentRead: self.tableConfiguration.consistentRead, statement: statement) @@ -515,15 +509,13 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable { } } - func bulkWriteWithFallback( + func bulkWriteWithFallback( _ entries: [WriteEntry]) async throws { - // fall back to singel operation if the write entry exceeds the statement length limitation - var bulkWriteEntries: [WriteEntry] = [] - let errors: [DynamoDBTableError] = try await entries.concurrentCompactMap { entry -> DynamoDBTableError? in + // fall back to single operation if the write entry exceeds the statement length limitation + let results: [Result, DynamoDBTableError>] = try await entries.concurrentMap { entry in do { try self.validateEntry(entry: entry) - bulkWriteEntries.append(entry) } catch DynamoDBTableError.statementLengthExceeded { do { switch entry { @@ -537,11 +529,22 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable { try await self.deleteItem(existingItem: existing) } } catch let error as DynamoDBTableError { - return error + return .failure(error) } } - return nil + return .success(entry) + } + + var bulkWriteEntries: [WriteEntry] = [] + var errors: [DynamoDBTableError] = [] + for result in results { + switch result { + case let .success(entry): + bulkWriteEntries.append(entry) + case let .failure(error): + errors.append(error) + } } do { diff --git a/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable.swift b/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable.swift index 40d7b19..b33f578 100644 --- a/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable.swift +++ b/Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable.swift @@ -25,14 +25,14 @@ // import AwsCommonRuntimeKit -import AWSDynamoDB +@preconcurrency import AWSDynamoDB import ClientRuntime import Foundation import Logging import Metrics import SmithyIdentity -public struct AWSDynamoDBTableMetrics { +public struct AWSDynamoDBTableMetrics: Sendable { // metric to record if the `TransactWrite` API is retried let transactWriteRetryCountRecorder: Metrics.Recorder? @@ -41,7 +41,7 @@ public struct AWSDynamoDBTableMetrics { } } -public struct AWSDynamoDBTableConfiguration { +public struct AWSDynamoDBTableConfiguration: Sendable { public let consistentRead: Bool public let escapeSingleQuoteInPartiQL: Bool public let retry: RetryConfiguration @@ -80,7 +80,7 @@ public struct AWSDynamoDBTableConfiguration { /// ``` public typealias AWSDynamoDBCompositePrimaryKeyTable = GenericAWSDynamoDBCompositePrimaryKeyTable -public struct GenericAWSDynamoDBCompositePrimaryKeyTable: DynamoDBCompositePrimaryKeyTable { +public struct GenericAWSDynamoDBCompositePrimaryKeyTable: DynamoDBCompositePrimaryKeyTable, Sendable { let dynamodb: Client let targetTableName: String public let tableConfiguration: AWSDynamoDBTableConfiguration diff --git a/Sources/DynamoDBTables/CustomRowTypeIdentifier.swift b/Sources/DynamoDBTables/CustomRowTypeIdentifier.swift index c40e39d..31d92e9 100644 --- a/Sources/DynamoDBTables/CustomRowTypeIdentifier.swift +++ b/Sources/DynamoDBTables/CustomRowTypeIdentifier.swift @@ -32,13 +32,13 @@ public protocol CustomRowTypeIdentifier { func getTypeRowIdentifier(type: Any.Type) -> String { let typeRowIdentifier: String - // if this type has a custom row identity - if let customAttributesTypeType = type as? CustomRowTypeIdentifier.Type, - let identifier = customAttributesTypeType.rowTypeIdentifier + // if this type has a custom row identity + = if let customAttributesTypeType = type as? CustomRowTypeIdentifier.Type, + let identifier = customAttributesTypeType.rowTypeIdentifier { - typeRowIdentifier = identifier + identifier } else { - typeRowIdentifier = String(describing: type) + String(describing: type) } return typeRowIdentifier diff --git a/Sources/DynamoDBTables/DynamoDBClientProtocol.swift b/Sources/DynamoDBTables/DynamoDBClientProtocol.swift index cc89324..d38fc11 100644 --- a/Sources/DynamoDBTables/DynamoDBClientProtocol.swift +++ b/Sources/DynamoDBTables/DynamoDBClientProtocol.swift @@ -49,9 +49,8 @@ import Smockable /// as the table implementation may call these methods from multiple concurrent contexts. @Smock public protocol DynamoDBClientProtocol { - // MARK: - Single Item Operations - + /// Puts an item into a DynamoDB table. /// /// This operation either creates a new item or replaces an existing item with the same key. @@ -71,7 +70,7 @@ public protocol DynamoDBClientProtocol { /// let response = try await client.putItem(input: putInput) /// ``` func putItem(input: AWSDynamoDB.PutItemInput) async throws -> AWSDynamoDB.PutItemOutput - + /// Retrieves an item from a DynamoDB table by its primary key. /// /// This operation returns the item attributes for the specified key. If the item doesn't exist, @@ -91,7 +90,7 @@ public protocol DynamoDBClientProtocol { /// let response = try await client.getItem(input: getInput) /// ``` func getItem(input: AWSDynamoDB.GetItemInput) async throws -> AWSDynamoDB.GetItemOutput - + /// Deletes an item from a DynamoDB table. /// /// This operation removes an item with the specified key. Conditional expressions can be used @@ -111,9 +110,9 @@ public protocol DynamoDBClientProtocol { /// let response = try await client.deleteItem(input: deleteInput) /// ``` func deleteItem(input: AWSDynamoDB.DeleteItemInput) async throws -> AWSDynamoDB.DeleteItemOutput - + // MARK: - Query Operations - + /// Queries items from a DynamoDB table using a partition key and optional sort key conditions. /// /// This operation efficiently retrieves items that share the same partition key value. @@ -133,9 +132,9 @@ public protocol DynamoDBClientProtocol { /// let response = try await client.query(input: queryInput) /// ``` func query(input: AWSDynamoDB.QueryInput) async throws -> AWSDynamoDB.QueryOutput - + // MARK: - Batch Operations - + /// Retrieves multiple items from one or more DynamoDB tables in a single request. /// /// This operation allows efficient retrieval of up to 100 items across multiple tables. @@ -155,7 +154,7 @@ public protocol DynamoDBClientProtocol { /// let response = try await client.batchGetItem(input: batchInput) /// ``` func batchGetItem(input: AWSDynamoDB.BatchGetItemInput) async throws -> AWSDynamoDB.BatchGetItemOutput - + /// Executes multiple PartiQL statements in a single batch request. /// /// This operation allows executing up to 25 PartiQL statements (SELECT, INSERT, UPDATE, DELETE) @@ -175,9 +174,9 @@ public protocol DynamoDBClientProtocol { /// let response = try await client.batchExecuteStatement(input: batchInput) /// ``` func batchExecuteStatement(input: AWSDynamoDB.BatchExecuteStatementInput) async throws -> AWSDynamoDB.BatchExecuteStatementOutput - + // MARK: - Advanced Operations - + /// Executes a single PartiQL statement against a DynamoDB table. /// /// PartiQL is a SQL-compatible query language for DynamoDB that supports SELECT, INSERT, @@ -196,7 +195,7 @@ public protocol DynamoDBClientProtocol { /// let response = try await client.executeStatement(input: executeInput) /// ``` func executeStatement(input: AWSDynamoDB.ExecuteStatementInput) async throws -> AWSDynamoDB.ExecuteStatementOutput - + /// Executes multiple PartiQL statements as a single transaction. /// /// This operation ensures that all statements either succeed together or fail together, @@ -223,4 +222,4 @@ public protocol DynamoDBClientProtocol { /// Retroactive conformance of AWS DynamoDB Client to the protocol. extension AWSDynamoDB.DynamoDBClient: DynamoDBClientProtocol { // No implementation needed - the client already has all required methods -} \ No newline at end of file +} diff --git a/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable+clobberVersionedItemWithHistoricalRow.swift b/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable+clobberVersionedItemWithHistoricalRow.swift index e00f469..f91c2ae 100644 --- a/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable+clobberVersionedItemWithHistoricalRow.swift +++ b/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable+clobberVersionedItemWithHistoricalRow.swift @@ -44,7 +44,7 @@ public extension DynamoDBCompositePrimaryKeyTable { version number. - completion: completion handler providing an error that was thrown or nil */ - func clobberVersionedItemWithHistoricalRow( + func clobberVersionedItemWithHistoricalRow( forPrimaryKey partitionKey: String, andHistoricalKey historicalKey: String, item: ItemType, diff --git a/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable+conditionallyInTransaction.swift b/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable+conditionallyInTransaction.swift index bae9b79..a28a559 100644 --- a/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable+conditionallyInTransaction.swift +++ b/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable+conditionallyInTransaction.swift @@ -25,7 +25,6 @@ // DynamoDBTables // -import CollectionConcurrencyKit import Foundation public enum ConditionalTransactWriteError: Error { @@ -54,7 +53,7 @@ public extension DynamoDBCompositePrimaryKeyTable { forKeys keys: [CompositePrimaryKey], withRetries retries: Int = 10, constraints: [TransactionConstraintEntry] = [], - writeEntryProvider: @escaping (CompositePrimaryKey, TypedTTLDatabaseItem?) + writeEntryProvider: @Sendable @escaping (CompositePrimaryKey, TypedTTLDatabaseItem?) async throws -> WriteEntry?) async throws -> [WriteEntry] { @@ -101,7 +100,7 @@ public extension DynamoDBCompositePrimaryKeyTable { ReturnedType: PolymorphicOperationReturnType & BatchCapableReturnType>( forKeys keys: [CompositePrimaryKey], withRetries retries: Int = 10, - writeEntryProvider: @escaping (CompositePrimaryKey, ReturnedType?) + writeEntryProvider: @Sendable @escaping (CompositePrimaryKey, ReturnedType?) async throws -> WriteEntryType?) async throws -> [WriteEntryType] where WriteEntryType.AttributesType == ReturnedType.AttributesType { @@ -133,7 +132,7 @@ public extension DynamoDBCompositePrimaryKeyTable { forKeys keys: [CompositePrimaryKey], withRetries retries: Int = 10, constraints: [TransactionConstraintEntryType], - writeEntryProvider: @escaping (CompositePrimaryKey, ReturnedType?) + writeEntryProvider: @Sendable @escaping (CompositePrimaryKey, ReturnedType?) async throws -> WriteEntryType?) async throws -> [WriteEntryType] where WriteEntryType.AttributesType == ReturnedType.AttributesType, WriteEntryType.AttributesType == TransactionConstraintEntryType.AttributesType diff --git a/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable+conditionallyUpdateItem.swift b/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable+conditionallyUpdateItem.swift index 0b8a825..1753f07 100644 --- a/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable+conditionallyUpdateItem.swift +++ b/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable+conditionallyUpdateItem.swift @@ -43,7 +43,7 @@ public extension DynamoDBCompositePrimaryKeyTable { withRetries: the number of times to attempt to retry the update before failing. updatedPayloadProvider: the provider that will return updated payloads. */ - func conditionallyUpdateItem( + func conditionallyUpdateItem( forKey key: CompositePrimaryKey, withRetries retries: Int = 10, timeToLiveAttributesType _: TimeToLiveAttributesType.Type = StandardTimeToLiveAttributes.self, diff --git a/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable.swift b/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable.swift index 73b631a..0fc37f6 100644 --- a/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable.swift +++ b/Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable.swift @@ -82,7 +82,7 @@ public enum AttributeCondition: Sendable { case beginsWith(String) } -public enum WriteEntry { +public enum WriteEntry: Sendable { case update(new: TypedTTLDatabaseItem, existing: TypedTTLDatabaseItem) case insert(new: TypedTTLDatabaseItem) @@ -92,13 +92,13 @@ public enum WriteEntry { switch self { case .update(new: let new, existing: _): - return new.compositePrimaryKey + new.compositePrimaryKey case let .insert(new: new): - return new.compositePrimaryKey + new.compositePrimaryKey case let .deleteAtKey(key: key): - return key + key case let .deleteItem(existing: existing): - return existing.compositePrimaryKey + existing.compositePrimaryKey } } } diff --git a/Sources/DynamoDBTables/InMemoryDataRepresentations.swift b/Sources/DynamoDBTables/InMemoryDataRepresentations.swift index fbc7d4a..6ae2d07 100644 --- a/Sources/DynamoDBTables/InMemoryDataRepresentations.swift +++ b/Sources/DynamoDBTables/InMemoryDataRepresentations.swift @@ -147,20 +147,20 @@ enum InMemoryWriteEntry: Sendable { var compositePrimaryKey: CompositePrimaryKey { switch self { case .update(new: let new, existing: _): - return new.compositePrimaryKey + new.compositePrimaryKey case let .insert(new: new): - return new.compositePrimaryKey + new.compositePrimaryKey case let .deleteAtKey(key: key): - return key + key case let .deleteItem(existing: existing): - return existing.compositePrimaryKey + existing.compositePrimaryKey } } } extension WriteEntry { func inMemoryForm() throws -> InMemoryWriteEntry { - return switch self { + switch self { case let .update(new: new, existing: existing): try .update(new: new.inMemoryFormWithKey(), existing: existing.inMemoryFormWithKey()) case let .insert(new: new): @@ -179,14 +179,14 @@ enum InMemoryTransactionConstraintEntry: S var compositePrimaryKey: CompositePrimaryKey { switch self { case let .required(existing: existing): - return existing.compositePrimaryKey + existing.compositePrimaryKey } } } extension TransactionConstraintEntry { func inMemoryForm() throws -> InMemoryTransactionConstraintEntry { - return switch self { + switch self { case let .required(existing: existing): try .required(existing: existing.inMemoryFormWithKey()) } diff --git a/Sources/DynamoDBTables/InMemoryDynamoDBCompositePrimaryKeyTable+query.swift b/Sources/DynamoDBTables/InMemoryDynamoDBCompositePrimaryKeyTable+query.swift index 127edc1..792a169 100644 --- a/Sources/DynamoDBTables/InMemoryDynamoDBCompositePrimaryKeyTable+query.swift +++ b/Sources/DynamoDBTables/InMemoryDynamoDBCompositePrimaryKeyTable+query.swift @@ -155,11 +155,10 @@ public extension InMemoryDynamoDBCompositePrimaryKeyTable { let rawItems: [ReturnedType] = try await polymorphicQuery(forPartitionKey: partitionKey, sortKeyCondition: sortKeyCondition) - let items: [ReturnedType] - if !scanIndexForward { - items = rawItems.reversed() + let items: [ReturnedType] = if !scanIndexForward { + rawItems.reversed() } else { - items = rawItems + rawItems } let startIndex: Int @@ -265,11 +264,10 @@ public extension InMemoryDynamoDBCompositePrimaryKeyTable { let rawItems: [TypedTTLDatabaseItem] = try await query(forPartitionKey: partitionKey, sortKeyCondition: sortKeyCondition) - let items: [TypedTTLDatabaseItem] - if !scanIndexForward { - items = rawItems.reversed() + let items: [TypedTTLDatabaseItem] = if !scanIndexForward { + rawItems.reversed() } else { - items = rawItems + rawItems } let startIndex: Int diff --git a/Sources/DynamoDBTables/InMemoryDynamoDBCompositePrimaryKeyTable+transform.swift b/Sources/DynamoDBTables/InMemoryDynamoDBCompositePrimaryKeyTable+transform.swift index 7dcfb55..7965a97 100644 --- a/Sources/DynamoDBTables/InMemoryDynamoDBCompositePrimaryKeyTable+transform.swift +++ b/Sources/DynamoDBTables/InMemoryDynamoDBCompositePrimaryKeyTable+transform.swift @@ -77,7 +77,7 @@ struct InMemoryPolymorphicTransactionConstraintTransform: PolymorphicTransaction let sortKey: String let rowVersion: Int - init(_ entry: TransactionConstraintEntry, + init(_ entry: TransactionConstraintEntry, table _: TableType) throws { switch entry { @@ -286,11 +286,9 @@ extension InMemoryDynamoDBCompositePrimaryKeyTable { -> DynamoDBTableError? { let errors = constraints.compactMap { entry -> DynamoDBTableError? in - let existingItem: InMemoryDatabaseItemWithKey - - switch entry { + let existingItem: InMemoryDatabaseItemWithKey = switch entry { case let .required(existing: existing): - existingItem = existing + existing } let compositePrimaryKey = existingItem.compositePrimaryKey diff --git a/Sources/DynamoDBTables/InMemoryDynamoDBCompositePrimaryKeyTableWithIndex.swift b/Sources/DynamoDBTables/InMemoryDynamoDBCompositePrimaryKeyTableWithIndex.swift index 488dcc9..4bb45a0 100644 --- a/Sources/DynamoDBTables/InMemoryDynamoDBCompositePrimaryKeyTableWithIndex.swift +++ b/Sources/DynamoDBTables/InMemoryDynamoDBCompositePrimaryKeyTableWithIndex.swift @@ -34,7 +34,7 @@ public enum GSIError: Error { case unknownIndex(name: String) } -public struct InMemoryDynamoDBCompositePrimaryKeyTableWithIndex: DynamoDBCompositePrimaryKeyTable { +public struct InMemoryDynamoDBCompositePrimaryKeyTableWithIndex: DynamoDBCompositePrimaryKeyTable, Sendable { public let primaryTable: InMemoryDynamoDBCompositePrimaryKeyTable public let gsiDataStore: InMemoryDynamoDBCompositePrimaryKeyTable @@ -108,13 +108,13 @@ public struct InMemoryDynamoDBCompositePrimaryKeyTableWithIndex] = try await query(forPartitionKey: partitionKey, sortKeyCondition: sortKeyCondition) - let items: [CompositePrimaryKey] - if !scanIndexForward { - items = rawItems.reversed() + let items: [CompositePrimaryKey] = if !scanIndexForward { + rawItems.reversed() } else { - items = rawItems + rawItems } let startIndex: Int diff --git a/Sources/DynamoDBTables/InternalKeyedDecodingContainer.swift b/Sources/DynamoDBTables/InternalKeyedDecodingContainer.swift index 6432c94..f64a274 100644 --- a/Sources/DynamoDBTables/InternalKeyedDecodingContainer.swift +++ b/Sources/DynamoDBTables/InternalKeyedDecodingContainer.swift @@ -167,11 +167,10 @@ struct InternalKeyedDecodingContainer: KeyedDecodingContainerProto } private func getAttributeName(key: CodingKey) -> String { - let attributeName: String - if let attributeNameTransform = decodingContainer.attributeNameTransform { - attributeName = attributeNameTransform(key.stringValue) + let attributeName: String = if let attributeNameTransform = decodingContainer.attributeNameTransform { + attributeNameTransform(key.stringValue) } else { - attributeName = key.stringValue + key.stringValue } return attributeName diff --git a/Sources/DynamoDBTables/InternalSingleValueEncodingContainer.swift b/Sources/DynamoDBTables/InternalSingleValueEncodingContainer.swift index d3518fe..76b7221 100644 --- a/Sources/DynamoDBTables/InternalSingleValueEncodingContainer.swift +++ b/Sources/DynamoDBTables/InternalSingleValueEncodingContainer.swift @@ -148,11 +148,10 @@ class InternalSingleValueEncodingContainer: SingleValueEncodingContainer { } private func getAttributeName(key: CodingKey) -> String { - let attributeName: String - if let attributeNameTransform { - attributeName = attributeNameTransform(key.stringValue) + let attributeName: String = if let attributeNameTransform { + attributeNameTransform(key.stringValue) } else { - attributeName = key.stringValue + key.stringValue } return attributeName diff --git a/Sources/DynamoDBTables/PolymorphicOperationReturnType.swift b/Sources/DynamoDBTables/PolymorphicOperationReturnType.swift index 464c6ce..8caced4 100644 --- a/Sources/DynamoDBTables/PolymorphicOperationReturnType.swift +++ b/Sources/DynamoDBTables/PolymorphicOperationReturnType.swift @@ -33,7 +33,7 @@ public protocol BatchCapableReturnType { func getItemKey() -> CompositePrimaryKey } -public protocol PolymorphicOperationReturnType { +public protocol PolymorphicOperationReturnType: Sendable { associatedtype AttributesType: PrimaryKeyAttributes associatedtype TimeToLiveAttributesType: TimeToLiveAttributes diff --git a/Sources/DynamoDBTables/PolymorphicWriteEntry.swift b/Sources/DynamoDBTables/PolymorphicWriteEntry.swift index 0f0748a..41693f6 100644 --- a/Sources/DynamoDBTables/PolymorphicWriteEntry.swift +++ b/Sources/DynamoDBTables/PolymorphicWriteEntry.swift @@ -46,7 +46,7 @@ public protocol PolymorphicTransactionConstraintTransform { // Conforming types are provided by the application to express the different possible write entries // and how they can be converted to the table-provided transform type. -public protocol PolymorphicWriteEntry { +public protocol PolymorphicWriteEntry: Sendable { associatedtype AttributesType: PrimaryKeyAttributes func handle(context: Context) throws -> Context.WriteEntryTransformType @@ -57,20 +57,20 @@ public protocol PolymorphicWriteEntry { public typealias StandardTransactionConstraintEntry = TransactionConstraintEntry -public enum TransactionConstraintEntry { +public enum TransactionConstraintEntry: Sendable { case required(existing: TypedTTLDatabaseItem) public var compositePrimaryKey: CompositePrimaryKey { switch self { case let .required(existing: existing): - return existing.compositePrimaryKey + existing.compositePrimaryKey } } } // Conforming types are provided by the application to express the different possible constraint entries // and how they can be converted to the table-provided transform type. -public protocol PolymorphicTransactionConstraintEntry { +public protocol PolymorphicTransactionConstraintEntry: Sendable { associatedtype AttributesType: PrimaryKeyAttributes func handle(context: Context) throws -> Context.WriteTransactionConstraintType diff --git a/Sources/DynamoDBTables/QueryInput+forSortKeyCondition.swift b/Sources/DynamoDBTables/QueryInput+forSortKeyCondition.swift index 2677951..053a2f0 100644 --- a/Sources/DynamoDBTables/QueryInput+forSortKeyCondition.swift +++ b/Sources/DynamoDBTables/QueryInput+forSortKeyCondition.swift @@ -84,12 +84,11 @@ extension QueryInput { expressionAttributeValues = [":pk": DynamoDBClientTypes.AttributeValue.s(partitionKey)] } - let inputExclusiveStartKey: [String: DynamoDBClientTypes.AttributeValue]? - if let exclusiveStartKey = exclusiveStartKey?.data(using: .utf8) { - inputExclusiveStartKey = try JSONDecoder().decode([String: DynamoDBClientTypes.AttributeValue].self, - from: exclusiveStartKey) + let inputExclusiveStartKey: [String: DynamoDBClientTypes.AttributeValue]? = if let exclusiveStartKey = exclusiveStartKey?.data(using: .utf8) { + try JSONDecoder().decode([String: DynamoDBClientTypes.AttributeValue].self, + from: exclusiveStartKey) } else { - inputExclusiveStartKey = nil + nil } return AWSDynamoDB.QueryInput(consistentRead: consistentRead, diff --git a/Sources/DynamoDBTables/RowWithIndex.swift b/Sources/DynamoDBTables/RowWithIndex.swift index ee14ca6..1e5cada 100644 --- a/Sources/DynamoDBTables/RowWithIndex.swift +++ b/Sources/DynamoDBTables/RowWithIndex.swift @@ -51,7 +51,7 @@ public func createRowWithIndexCodingKey(stringValue: String) -> RowWithIndexCodi RowWithIndexCodingKey(stringValue: stringValue)! } -public struct RowWithIndex: Codable, CustomRowTypeIdentifier { +public struct RowWithIndex: Codable, CustomRowTypeIdentifier, Sendable { public static var rowTypeIdentifier: String? { let rowTypeIdentity = getTypeRowIdentifier(type: RowType.self) let indexIdentity = Identity.identity diff --git a/Sources/DynamoDBTables/RowWithItemVersion.swift b/Sources/DynamoDBTables/RowWithItemVersion.swift index ada5eec..5e178d0 100644 --- a/Sources/DynamoDBTables/RowWithItemVersion.swift +++ b/Sources/DynamoDBTables/RowWithItemVersion.swift @@ -26,7 +26,7 @@ import Foundation -public struct RowWithItemVersion: Codable, CustomRowTypeIdentifier { +public struct RowWithItemVersion: Codable, Sendable, CustomRowTypeIdentifier { public static var rowTypeIdentifier: String? { let rowTypeIdentity = getTypeRowIdentifier(type: RowType.self) diff --git a/Sources/DynamoDBTables/Sequence+concurrency.swift b/Sources/DynamoDBTables/Sequence+concurrency.swift new file mode 100644 index 0000000..2ef5c51 --- /dev/null +++ b/Sources/DynamoDBTables/Sequence+concurrency.swift @@ -0,0 +1,422 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the DynamoDBTables open source project +// +// This file is forked CollectionConcurrencyKit +// Copyright (c) John Sundell 2021 +// MIT license, see LICENSE.md file for details +// +// Changes specified by +// https://github.com/swift-server-community/dynamo-db-tables/compare/9ab0e7a..main +// Copyright (c) 2024 the DynamoDBTables authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of DynamoDBTables authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +// +// Sequence+concurrency.swift +// DynamoDBTables +// +// +// +// + +// MARK: - ForEach + +extension Sequence { + /// Run an async closure for each element within the sequence. + /// + /// The closure calls will be performed in order, by waiting for + /// each call to complete before proceeding with the next one. If + /// any of the closure calls throw an error, then the iteration + /// will be terminated and the error rethrown. + /// + /// - parameter operation: The closure to run for each element. + /// - throws: Rethrows any error thrown by the passed closure. + func asyncForEach( + _ operation: @Sendable (Element) async throws -> Void) async rethrows + { + for element in self { + try await operation(element) + } + } + /* + /// Run an async closure for each element within the sequence. + /// + /// The closure calls will be performed concurrently, but the call + /// to this function won't return until all of the closure calls + /// have completed. + /// + /// - parameter priority: Any specific `TaskPriority` to assign to + /// the async tasks that will perform the closure calls. The + /// default is `nil` (meaning that the system picks a priority). + /// - parameter operation: The closure to run for each element. + func concurrentForEach( + withPriority priority: TaskPriority? = nil, + _ operation: @Sendable @escaping (Element) async -> Void + ) async { + await withTaskGroup(of: Void.self) { group in + for element in self { + group.addTask(priority: priority) { + await operation(element) + } + } + } + } + + /// Run an async closure for each element within the sequence. + /// + /// The closure calls will be performed concurrently, but the call + /// to this function won't return until all of the closure calls + /// have completed. If any of the closure calls throw an error, + /// then the first error will be rethrown once all closure calls have + /// completed. + /// + /// - parameter priority: Any specific `TaskPriority` to assign to + /// the async tasks that will perform the closure calls. The + /// default is `nil` (meaning that the system picks a priority). + /// - parameter operation: The closure to run for each element. + /// - throws: Rethrows any error thrown by the passed closure. + func concurrentForEach( + withPriority priority: TaskPriority? = nil, + _ operation: @Sendable @escaping (Element) async throws -> Void + ) async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + for element in self { + group.addTask(priority: priority) { + try await operation(element) + } + } + + // Propagate any errors thrown by the group's tasks: + for try await _ in group {} + } + }*/ +} + +// MARK: - Map + +extension Sequence where Element: Sendable { + /* /// Transform the sequence into an array of new values using + /// an async closure. + /// + /// The closure calls will be performed in order, by waiting for + /// each call to complete before proceeding with the next one. If + /// any of the closure calls throw an error, then the iteration + /// will be terminated and the error rethrown. + /// + /// - parameter transform: The transform to run on each element. + /// - returns: The transformed values as an array. The order of + /// the transformed values will match the original sequence. + /// - throws: Rethrows any error thrown by the passed closure. + func asyncMap( + _ transform: @Sendable (Element) async throws -> T + ) async rethrows -> [T] { + var values = [T]() + + for element in self { + try await values.append(transform(element)) + } + + return values + } + */ + /// Transform the sequence into an array of new values using + /// an async closure. + /// + /// The closure calls will be performed concurrently, but the call + /// to this function won't return until all of the closure calls + /// have completed. + /// + /// - parameter priority: Any specific `TaskPriority` to assign to + /// the async tasks that will perform the closure calls. The + /// default is `nil` (meaning that the system picks a priority). + /// - parameter transform: The transform to run on each element. + /// - returns: The transformed values as an array. The order of + /// the transformed values will match the original sequence. + func concurrentMap( + withPriority priority: TaskPriority? = nil, + _ transform: @Sendable @escaping (Element) async -> T) async -> [T] + { + await withTaskGroup(of: (offset: Int, value: T).self) { group in + var taskCount = 0 + for (idx, element) in enumerated() { + taskCount += 1 + + group.addTask(priority: priority) { + await (idx, transform(element)) + } + } + + var res = [T?](repeating: nil, count: taskCount) + while let next = await group.next() { + res[next.offset] = next.value + } + return res as! [T] + } + } + + /// Transform the sequence into an array of new values using + /// an async closure. + /// + /// The closure calls will be performed concurrently, but the call + /// to this function won't return until all of the closure calls + /// have completed. If any of the closure calls throw an error, + /// then the first error will be rethrown once all closure calls have + /// completed. + /// + /// - parameter priority: Any specific `TaskPriority` to assign to + /// the async tasks that will perform the closure calls. The + /// default is `nil` (meaning that the system picks a priority). + /// - parameter transform: The transform to run on each element. + /// - returns: The transformed values as an array. The order of + /// the transformed values will match the original sequence. + /// - throws: Rethrows any error thrown by the passed closure. + func concurrentMap( + withPriority priority: TaskPriority? = nil, + _ transform: @Sendable @escaping (Element) async throws -> T) async throws -> [T] + { + try await withThrowingTaskGroup(of: (offset: Int, value: T).self) { group in + var taskCount = 0 + for (idx, element) in enumerated() { + taskCount += 1 + + group.addTask(priority: priority) { + try await (idx, transform(element)) + } + } + + var res = [T?](repeating: nil, count: taskCount) + while let next = try await group.next() { + res[next.offset] = next.value + } + return res as! [T] + } + } +} + +// MARK: - CompactMap + +extension Sequence where Element: Sendable { + /// Transform the sequence into an array of new values using + /// an async closure that returns optional values. Only the + /// non-`nil` return values will be included in the new array. + /// + /// The closure calls will be performed in order, by waiting for + /// each call to complete before proceeding with the next one. If + /// any of the closure calls throw an error, then the iteration + /// will be terminated and the error rethrown. + /// + /// - parameter transform: The transform to run on each element. + /// - returns: The transformed values as an array. The order of + /// the transformed values will match the original sequence, + /// except for the values that were transformed into `nil`. + /// - throws: Rethrows any error thrown by the passed closure. + func asyncCompactMap( + _ transform: @Sendable (Element) async throws -> T?) async rethrows -> [T] + { + var values = [T]() + + for element in self { + guard let value = try await transform(element) else { + continue + } + + values.append(value) + } + + return values + } + + /// Transform the sequence into an array of new values using + /// an async closure that returns optional values. Only the + /// non-`nil` return values will be included in the new array. + /// + /// The closure calls will be performed concurrently, but the call + /// to this function won't return until all of the closure calls + /// have completed. + /// + /// - parameter priority: Any specific `TaskPriority` to assign to + /// the async tasks that will perform the closure calls. The + /// default is `nil` (meaning that the system picks a priority). + /// - parameter transform: The transform to run on each element. + /// - returns: The transformed values as an array. The order of + /// the transformed values will match the original sequence, + /// except for the values that were transformed into `nil`. + func concurrentCompactMap( + withPriority priority: TaskPriority? = nil, + _ transform: @Sendable @escaping (Element) async -> T?) async -> [T] + { + await withTaskGroup(of: (offset: Int, value: T?).self) { group in + var taskCount = 0 + for (idx, element) in enumerated() { + taskCount += 1 + + group.addTask(priority: priority) { + await (idx, transform(element)) + } + } + + var res = [T??](repeating: nil, count: taskCount) + while let next = await group.next() { + res[next.offset] = next.value + } + return (res as! [T?]).compactMap(\.self) + } + } + + /// Transform the sequence into an array of new values using + /// an async closure that returns optional values. Only the + /// non-`nil` return values will be included in the new array. + /// + /// The closure calls will be performed concurrently, but the call + /// to this function won't return until all of the closure calls + /// have completed. If any of the closure calls throw an error, + /// then the first error will be rethrown once all closure calls have + /// completed. + /// + /// - parameter priority: Any specific `TaskPriority` to assign to + /// the async tasks that will perform the closure calls. The + /// default is `nil` (meaning that the system picks a priority). + /// - parameter transform: The transform to run on each element. + /// - returns: The transformed values as an array. The order of + /// the transformed values will match the original sequence, + /// except for the values that were transformed into `nil`. + /// - throws: Rethrows any error thrown by the passed closure. + func concurrentCompactMap( + withPriority priority: TaskPriority? = nil, + _ transform: @Sendable @escaping (Element) async throws -> T?) async throws -> [T] + { + try await withThrowingTaskGroup(of: (offset: Int, value: T?).self) { group in + var taskCount = 0 + for (idx, element) in enumerated() { + taskCount += 1 + + group.addTask(priority: priority) { + try await (idx, transform(element)) + } + } + + var res = [T??](repeating: nil, count: taskCount) + while let next = try await group.next() { + res[next.offset] = next.value + } + return (res as! [T?]).compactMap(\.self) + } + } +} + +// MARK: - FlatMap + +extension Sequence where Element: Sendable { + /* /// Transform the sequence into an array of new values using + /// an async closure that returns sequences. The returned sequences + /// will be flattened into the array returned from this function. + /// + /// The closure calls will be performed in order, by waiting for + /// each call to complete before proceeding with the next one. If + /// any of the closure calls throw an error, then the iteration + /// will be terminated and the error rethrown. + /// + /// - parameter transform: The transform to run on each element. + /// - returns: The transformed values as an array. The order of + /// the transformed values will match the original sequence, + /// with the results of each closure call appearing in-order + /// within the returned array. + /// - throws: Rethrows any error thrown by the passed closure. + func asyncFlatMap( + _ transform: @Sendable (Element) async throws -> T + ) async rethrows -> [T.Element] { + var values = [T.Element]() + + for element in self { + try await values.append(contentsOf: transform(element)) + } + + return values + } + */ + /// Transform the sequence into an array of new values using + /// an async closure that returns sequences. The returned sequences + /// will be flattened into the array returned from this function. + /// + /// The closure calls will be performed concurrently, but the call + /// to this function won't return until all of the closure calls + /// have completed. + /// + /// - parameter priority: Any specific `TaskPriority` to assign to + /// the async tasks that will perform the closure calls. The + /// default is `nil` (meaning that the system picks a priority). + /// - parameter transform: The transform to run on each element. + /// - returns: The transformed values as an array. The order of + /// the transformed values will match the original sequence, + /// with the results of each closure call appearing in-order + /// within the returned array. + func concurrentFlatMap( + withPriority priority: TaskPriority? = nil, + _ transform: @Sendable @escaping (Element) async -> T) async -> [T.Element] + { + await withTaskGroup(of: (offset: Int, value: T).self) { group in + var taskCount = 0 + for (idx, element) in enumerated() { + taskCount += 1 + + group.addTask(priority: priority) { + await (idx, transform(element)) + } + } + + var res = [T?](repeating: nil, count: taskCount) + while let next = await group.next() { + res[next.offset] = next.value + } + return (res as! [T]).flatMap(\.self) + } + } + + /// Transform the sequence into an array of new values using + /// an async closure that returns sequences. The returned sequences + /// will be flattened into the array returned from this function. + /// + /// The closure calls will be performed concurrently, but the call + /// to this function won't return until all of the closure calls + /// have completed. If any of the closure calls throw an error, + /// then the first error will be rethrown once all closure calls have + /// completed. + /// + /// - parameter priority: Any specific `TaskPriority` to assign to + /// the async tasks that will perform the closure calls. The + /// default is `nil` (meaning that the system picks a priority). + /// - parameter transform: The transform to run on each element. + /// - returns: The transformed values as an array. The order of + /// the transformed values will match the original sequence, + /// with the results of each closure call appearing in-order + /// within the returned array. + /// - throws: Rethrows any error thrown by the passed closure. + func concurrentFlatMap( + withPriority priority: TaskPriority? = nil, + _ transform: @Sendable @escaping (Element) async throws -> T) async throws -> [T.Element] + { + try await withThrowingTaskGroup(of: (offset: Int, value: T).self) { group in + var taskCount = 0 + for (idx, element) in enumerated() { + taskCount += 1 + + group.addTask(priority: priority) { + try await (idx, transform(element)) + } + } + + var res = [T?](repeating: nil, count: taskCount) + while let next = try await group.next() { + res[next.offset] = next.value + } + return (res as! [T]).flatMap(\.self) + } + } +} diff --git a/Sources/DynamoDBTables/SimulateConcurrencyDynamoDBCompositePrimaryKeyTable.swift b/Sources/DynamoDBTables/SimulateConcurrencyDynamoDBCompositePrimaryKeyTable.swift index 7a75d0e..1771ac5 100644 --- a/Sources/DynamoDBTables/SimulateConcurrencyDynamoDBCompositePrimaryKeyTable.swift +++ b/Sources/DynamoDBTables/SimulateConcurrencyDynamoDBCompositePrimaryKeyTable.swift @@ -34,8 +34,8 @@ private let maxStatementLength = 8192 to a database by incrementing a row's version every time it is added for a specified number of requests. */ -public class SimulateConcurrencyDynamoDBCompositePrimaryKeyTable: DynamoDBCompositePrimaryKeyTable { - let wrappedDynamoDBTable: DynamoDBCompositePrimaryKeyTable +public actor SimulateConcurrencyDynamoDBCompositePrimaryKeyTable: DynamoDBCompositePrimaryKeyTable, Sendable { + let wrappedDynamoDBTable: Wrapped let simulateConcurrencyModifications: Int var previousConcurrencyModifications: Int let simulateOnInsertItem: Bool @@ -50,7 +50,7 @@ public class SimulateConcurrencyDynamoDBCompositePrimaryKeyTable: DynamoDBCompos - simulateOnInsertItem: if this instance should simulate concurrency on insertItem. - simulateOnUpdateItem: if this instance should simulate concurrency on updateItem. */ - public init(wrappedDynamoDBTable: DynamoDBCompositePrimaryKeyTable, simulateConcurrencyModifications: Int, + public init(wrappedDynamoDBTable: Wrapped, simulateConcurrencyModifications: Int, simulateOnInsertItem: Bool = true, simulateOnUpdateItem: Bool = true) { self.wrappedDynamoDBTable = wrappedDynamoDBTable @@ -138,13 +138,13 @@ public class SimulateConcurrencyDynamoDBCompositePrimaryKeyTable: DynamoDBCompos try await entries.asyncForEach { entry in switch entry { case let .update(new: new, existing: existing): - return try await self.updateItem(newItem: new, existingItem: existing) + try await self.updateItem(newItem: new, existingItem: existing) case let .insert(new: new): - return try await self.insertItem(new) + try await self.insertItem(new) case let .deleteAtKey(key: key): - return try await self.deleteItem(forKey: key) + try await self.deleteItem(forKey: key) case let .deleteItem(existing: existing): - return try await self.deleteItem(existingItem: existing) + try await self.deleteItem(existingItem: existing) } } } diff --git a/Sources/DynamoDBTables/TypedDatabaseItemWithTimeToLive.swift b/Sources/DynamoDBTables/TypedDatabaseItemWithTimeToLive.swift index 7a79bdc..85ab1b3 100644 --- a/Sources/DynamoDBTables/TypedDatabaseItemWithTimeToLive.swift +++ b/Sources/DynamoDBTables/TypedDatabaseItemWithTimeToLive.swift @@ -42,8 +42,8 @@ public struct RowStatus: Sendable, Codable { } public struct TypedTTLDatabaseItem: Codable + RowType: Codable & Sendable, + TimeToLiveAttributesType: TimeToLiveAttributes>: Codable, Sendable { public let compositePrimaryKey: CompositePrimaryKey public let createDate: Date diff --git a/Sources/DynamoDBTablesMacros/BaseEntryMacro.swift b/Sources/DynamoDBTablesMacros/BaseEntryMacro.swift index c9f89c5..33f3043 100644 --- a/Sources/DynamoDBTablesMacros/BaseEntryMacro.swift +++ b/Sources/DynamoDBTablesMacros/BaseEntryMacro.swift @@ -46,11 +46,11 @@ enum BaseEntryDiagnostic: String, DiagnosticMes var message: String { switch self { case .notAttachedToEnumDeclaration: - return "@\(Attributes.macroName) must be attached to an enum declaration." + "@\(Attributes.macroName) must be attached to an enum declaration." case .enumMustNotHaveZeroCases: - return "@\(Attributes.macroName) decorated enum must be have at least a singe case." + "@\(Attributes.macroName) decorated enum must be have at least a singe case." case .enumCasesMustHaveASingleParameter: - return "@\(Attributes.macroName) decorated enum can only have case entries with a single parameter." + "@\(Attributes.macroName) decorated enum can only have case entries with a single parameter." } } } diff --git a/Tests/DynamoDBTablesTests/InMemoryDynamoDBCompositePrimaryKeyTableTests.swift b/Tests/DynamoDBTablesTests/InMemoryDynamoDBCompositePrimaryKeyTableTests.swift index aec918f..ca1e10b 100644 --- a/Tests/DynamoDBTablesTests/InMemoryDynamoDBCompositePrimaryKeyTableTests.swift +++ b/Tests/DynamoDBTablesTests/InMemoryDynamoDBCompositePrimaryKeyTableTests.swift @@ -1311,6 +1311,7 @@ struct InMemoryDynamoDBCompositePrimaryKeyTableTests { try await table.insertItem(databaseItem1) + @Sendable func writeEntryProvider(key: StandardCompositePrimaryKey, existingItem: StandardTypedDatabaseItem?) throws -> StandardWriteEntry? { @@ -1360,6 +1361,7 @@ struct InMemoryDynamoDBCompositePrimaryKeyTableTests { try await table.insertItem(databaseItem1) + @Sendable func writeEntryProvider(key: StandardCompositePrimaryKey, existingItem: TestQueryableTypes?) throws -> TestPolymorphicWriteEntry? { diff --git a/Tests/DynamoDBTablesTests/SimulateConcurrencyDynamoDBCompositePrimaryKeyTableTests.swift b/Tests/DynamoDBTablesTests/SimulateConcurrencyDynamoDBCompositePrimaryKeyTableTests.swift index c4c6f05..de06d52 100644 --- a/Tests/DynamoDBTablesTests/SimulateConcurrencyDynamoDBCompositePrimaryKeyTableTests.swift +++ b/Tests/DynamoDBTablesTests/SimulateConcurrencyDynamoDBCompositePrimaryKeyTableTests.swift @@ -56,7 +56,7 @@ struct SimulateConcurrencyDynamoDBCompositePrimaryKeyTableTests { } } - private func verifyWithUpdate(table: SimulateConcurrencyDynamoDBCompositePrimaryKeyTable, + private func verifyWithUpdate(table: SimulateConcurrencyDynamoDBCompositePrimaryKeyTable, databaseItem: StandardTypedDatabaseItem, key: StandardCompositePrimaryKey, expectedFailureCount: Int) async throws diff --git a/Tests/DynamoDBTablesTests/TestConfiguration.swift b/Tests/DynamoDBTablesTests/TestConfiguration.swift index ebd8657..d9bc86d 100644 --- a/Tests/DynamoDBTablesTests/TestConfiguration.swift +++ b/Tests/DynamoDBTablesTests/TestConfiguration.swift @@ -63,9 +63,9 @@ extension TestQueryableTypes: BatchCapableReturnType { func getItemKey() -> CompositePrimaryKey { switch self { case let .testTypeA(databaseItem): - return databaseItem.compositePrimaryKey + databaseItem.compositePrimaryKey case let .testTypeB(databaseItem): - return databaseItem.compositePrimaryKey + databaseItem.compositePrimaryKey } } } diff --git a/Tests/DynamoDBTablesTests/TypedDatabaseItem+RowWithItemVersionProtocolTests.swift b/Tests/DynamoDBTablesTests/TypedDatabaseItem+RowWithItemVersionProtocolTests.swift index 5341dbe..d48c047 100644 --- a/Tests/DynamoDBTablesTests/TypedDatabaseItem+RowWithItemVersionProtocolTests.swift +++ b/Tests/DynamoDBTablesTests/TypedDatabaseItem+RowWithItemVersionProtocolTests.swift @@ -26,8 +26,8 @@ import Foundation -@testable import DynamoDBTables import AWSDynamoDB +@testable import DynamoDBTables import Testing private let ORIGINAL_PAYLOAD = "Payload" @@ -117,9 +117,9 @@ struct TypedTTLDatabaseItemRowWithItemVersionProtocolTests { } func getTable(escapeSingleQuoteInPartiQL: Bool = false) throws -> GenericAWSDynamoDBCompositePrimaryKeyTable { - GenericAWSDynamoDBCompositePrimaryKeyTable(tableName: "DummyTable", - client: MockDynamoDBClientProtocol(), - tableConfiguration: .init(escapeSingleQuoteInPartiQL: escapeSingleQuoteInPartiQL)) + GenericAWSDynamoDBCompositePrimaryKeyTable(tableName: "DummyTable", + client: MockDynamoDBClientProtocol(), + tableConfiguration: .init(escapeSingleQuoteInPartiQL: escapeSingleQuoteInPartiQL)) } @Test From 9d2c73be795b8056d5ca85505813dfc37f080313 Mon Sep 17 00:00:00 2001 From: Simon Pilkington Date: Fri, 15 Aug 2025 11:43:57 +1000 Subject: [PATCH 2/2] Fix linting. --- .../DynamoDBTables/Sequence+concurrency.swift | 130 +++--------------- 1 file changed, 18 insertions(+), 112 deletions(-) diff --git a/Sources/DynamoDBTables/Sequence+concurrency.swift b/Sources/DynamoDBTables/Sequence+concurrency.swift index 2ef5c51..7e8c744 100644 --- a/Sources/DynamoDBTables/Sequence+concurrency.swift +++ b/Sources/DynamoDBTables/Sequence+concurrency.swift @@ -22,9 +22,6 @@ // Sequence+concurrency.swift // DynamoDBTables // -// -// -// // MARK: - ForEach @@ -45,87 +42,11 @@ extension Sequence { try await operation(element) } } - /* - /// Run an async closure for each element within the sequence. - /// - /// The closure calls will be performed concurrently, but the call - /// to this function won't return until all of the closure calls - /// have completed. - /// - /// - parameter priority: Any specific `TaskPriority` to assign to - /// the async tasks that will perform the closure calls. The - /// default is `nil` (meaning that the system picks a priority). - /// - parameter operation: The closure to run for each element. - func concurrentForEach( - withPriority priority: TaskPriority? = nil, - _ operation: @Sendable @escaping (Element) async -> Void - ) async { - await withTaskGroup(of: Void.self) { group in - for element in self { - group.addTask(priority: priority) { - await operation(element) - } - } - } - } - - /// Run an async closure for each element within the sequence. - /// - /// The closure calls will be performed concurrently, but the call - /// to this function won't return until all of the closure calls - /// have completed. If any of the closure calls throw an error, - /// then the first error will be rethrown once all closure calls have - /// completed. - /// - /// - parameter priority: Any specific `TaskPriority` to assign to - /// the async tasks that will perform the closure calls. The - /// default is `nil` (meaning that the system picks a priority). - /// - parameter operation: The closure to run for each element. - /// - throws: Rethrows any error thrown by the passed closure. - func concurrentForEach( - withPriority priority: TaskPriority? = nil, - _ operation: @Sendable @escaping (Element) async throws -> Void - ) async throws { - try await withThrowingTaskGroup(of: Void.self) { group in - for element in self { - group.addTask(priority: priority) { - try await operation(element) - } - } - - // Propagate any errors thrown by the group's tasks: - for try await _ in group {} - } - }*/ } // MARK: - Map extension Sequence where Element: Sendable { - /* /// Transform the sequence into an array of new values using - /// an async closure. - /// - /// The closure calls will be performed in order, by waiting for - /// each call to complete before proceeding with the next one. If - /// any of the closure calls throw an error, then the iteration - /// will be terminated and the error rethrown. - /// - /// - parameter transform: The transform to run on each element. - /// - returns: The transformed values as an array. The order of - /// the transformed values will match the original sequence. - /// - throws: Rethrows any error thrown by the passed closure. - func asyncMap( - _ transform: @Sendable (Element) async throws -> T - ) async rethrows -> [T] { - var values = [T]() - - for element in self { - try await values.append(transform(element)) - } - - return values - } - */ /// Transform the sequence into an array of new values using /// an async closure. /// @@ -157,7 +78,7 @@ extension Sequence where Element: Sendable { while let next = await group.next() { res[next.offset] = next.value } - return res as! [T] + return res.asNonOptionalCollection() } } @@ -195,7 +116,7 @@ extension Sequence where Element: Sendable { while let next = try await group.next() { res[next.offset] = next.value } - return res as! [T] + return res.asNonOptionalCollection() } } } @@ -266,7 +187,7 @@ extension Sequence where Element: Sendable { while let next = await group.next() { res[next.offset] = next.value } - return (res as! [T?]).compactMap(\.self) + return (res.asNonOptionalCollection()).compactMap(\.self) } } @@ -306,7 +227,7 @@ extension Sequence where Element: Sendable { while let next = try await group.next() { res[next.offset] = next.value } - return (res as! [T?]).compactMap(\.self) + return (res.asNonOptionalCollection()).compactMap(\.self) } } } @@ -314,33 +235,6 @@ extension Sequence where Element: Sendable { // MARK: - FlatMap extension Sequence where Element: Sendable { - /* /// Transform the sequence into an array of new values using - /// an async closure that returns sequences. The returned sequences - /// will be flattened into the array returned from this function. - /// - /// The closure calls will be performed in order, by waiting for - /// each call to complete before proceeding with the next one. If - /// any of the closure calls throw an error, then the iteration - /// will be terminated and the error rethrown. - /// - /// - parameter transform: The transform to run on each element. - /// - returns: The transformed values as an array. The order of - /// the transformed values will match the original sequence, - /// with the results of each closure call appearing in-order - /// within the returned array. - /// - throws: Rethrows any error thrown by the passed closure. - func asyncFlatMap( - _ transform: @Sendable (Element) async throws -> T - ) async rethrows -> [T.Element] { - var values = [T.Element]() - - for element in self { - try await values.append(contentsOf: transform(element)) - } - - return values - } - */ /// Transform the sequence into an array of new values using /// an async closure that returns sequences. The returned sequences /// will be flattened into the array returned from this function. @@ -375,7 +269,7 @@ extension Sequence where Element: Sendable { while let next = await group.next() { res[next.offset] = next.value } - return (res as! [T]).flatMap(\.self) + return (res.asNonOptionalCollection()).flatMap(\.self) } } @@ -416,7 +310,19 @@ extension Sequence where Element: Sendable { while let next = try await group.next() { res[next.offset] = next.value } - return (res as! [T]).flatMap(\.self) + return (res.asNonOptionalCollection()).flatMap(\.self) + } + } +} + +extension Collection{ + func asNonOptionalCollection() -> [T] where Element == Optional { + return self.map { optional in + guard let finalValue = optional else { + fatalError("Mapped task did not complete as expected") + } + + return finalValue } } }