|  | 
| 23 | 23 | #import "Firestore/Source/API/FIRDocumentReference+Internal.h" | 
| 24 | 24 | #import "Firestore/Source/API/FIRFieldPath+Internal.h" | 
| 25 | 25 | #import "Firestore/Source/API/FIRFirestore+Internal.h" | 
|  | 26 | +#import "Firestore/Source/API/FIRListenerRegistration+Internal.h" | 
| 26 | 27 | #import "Firestore/Source/API/FIRPipelineBridge+Internal.h" | 
| 27 | 28 | #import "Firestore/Source/API/FSTUserDataReader.h" | 
| 28 | 29 | #import "Firestore/Source/API/FSTUserDataWriter.h" | 
|  | 
| 37 | 38 | #include "Firestore/core/src/api/ordering.h" | 
| 38 | 39 | #include "Firestore/core/src/api/pipeline.h" | 
| 39 | 40 | #include "Firestore/core/src/api/pipeline_result.h" | 
|  | 41 | +#include "Firestore/core/src/api/pipeline_result_change.h" | 
| 40 | 42 | #include "Firestore/core/src/api/pipeline_snapshot.h" | 
|  | 43 | +#include "Firestore/core/src/api/query_listener_registration.h" | 
|  | 44 | +#include "Firestore/core/src/api/realtime_pipeline.h" | 
|  | 45 | +#include "Firestore/core/src/api/realtime_pipeline_snapshot.h" | 
|  | 46 | +#include "Firestore/core/src/api/snapshot_metadata.h" | 
| 41 | 47 | #include "Firestore/core/src/api/stages.h" | 
|  | 48 | +#include "Firestore/core/src/core/event_listener.h" | 
|  | 49 | +#include "Firestore/core/src/core/firestore_client.h" | 
|  | 50 | +#include "Firestore/core/src/core/listen_options.h" | 
|  | 51 | +#include "Firestore/core/src/core/view_snapshot.h" | 
| 42 | 52 | #include "Firestore/core/src/util/error_apple.h" | 
| 43 | 53 | #include "Firestore/core/src/util/status.h" | 
| 44 | 54 | #include "Firestore/core/src/util/string_apple.h" | 
|  | 
| 51 | 61 | using firebase::firestore::api::Constant; | 
| 52 | 62 | using firebase::firestore::api::DatabaseSource; | 
| 53 | 63 | using firebase::firestore::api::DistinctStage; | 
|  | 64 | +using firebase::firestore::api::DocumentChange; | 
| 54 | 65 | using firebase::firestore::api::DocumentReference; | 
| 55 | 66 | using firebase::firestore::api::DocumentsSource; | 
| 56 | 67 | using firebase::firestore::api::Expr; | 
|  | 
| 63 | 74 | using firebase::firestore::api::OffsetStage; | 
| 64 | 75 | using firebase::firestore::api::Ordering; | 
| 65 | 76 | using firebase::firestore::api::Pipeline; | 
|  | 77 | +using firebase::firestore::api::PipelineResultChange; | 
|  | 78 | +using firebase::firestore::api::QueryListenerRegistration; | 
|  | 79 | +using firebase::firestore::api::RealtimePipeline; | 
|  | 80 | +using firebase::firestore::api::RealtimePipelineSnapshot; | 
| 66 | 81 | using firebase::firestore::api::RemoveFieldsStage; | 
| 67 | 82 | using firebase::firestore::api::ReplaceWith; | 
| 68 | 83 | using firebase::firestore::api::Sample; | 
| 69 | 84 | using firebase::firestore::api::SelectStage; | 
|  | 85 | +using firebase::firestore::api::SnapshotMetadata; | 
| 70 | 86 | using firebase::firestore::api::SortStage; | 
| 71 | 87 | using firebase::firestore::api::Union; | 
| 72 | 88 | using firebase::firestore::api::Unnest; | 
| 73 | 89 | using firebase::firestore::api::Where; | 
|  | 90 | +using firebase::firestore::core::EventListener; | 
|  | 91 | +using firebase::firestore::core::ViewSnapshot; | 
| 74 | 92 | using firebase::firestore::model::FieldPath; | 
| 75 | 93 | using firebase::firestore::nanopb::SharedMessage; | 
| 76 | 94 | using firebase::firestore::util::MakeCallback; | 
| @@ -928,6 +946,48 @@ - (nullable id)get:(id)field | 
| 928 | 946 | 
 | 
| 929 | 947 | @end | 
| 930 | 948 | 
 | 
|  | 949 | +@implementation __FIRPipelineResultChangeBridge { | 
|  | 950 | +  api::PipelineResultChange change_; | 
|  | 951 | +  std::shared_ptr<api::Firestore> db_; | 
|  | 952 | +} | 
|  | 953 | + | 
|  | 954 | +- (FIRDocumentChangeType)type { | 
|  | 955 | +  switch (change_.type()) { | 
|  | 956 | +    case PipelineResultChange::Type::Added: | 
|  | 957 | +      return FIRDocumentChangeTypeAdded; | 
|  | 958 | +    case PipelineResultChange::Type::Modified: | 
|  | 959 | +      return FIRDocumentChangeTypeModified; | 
|  | 960 | +    case PipelineResultChange::Type::Removed: | 
|  | 961 | +      return FIRDocumentChangeTypeRemoved; | 
|  | 962 | +  } | 
|  | 963 | + | 
|  | 964 | +  HARD_FAIL("Unknown PipelineResultChange::Type: %s", change_.type()); | 
|  | 965 | +} | 
|  | 966 | + | 
|  | 967 | +- (__FIRPipelineResultBridge *)result { | 
|  | 968 | +  return [[__FIRPipelineResultBridge alloc] initWithCppResult:change_.result() db:db_]; | 
|  | 969 | +} | 
|  | 970 | + | 
|  | 971 | +- (NSUInteger)oldIndex { | 
|  | 972 | +  return change_.old_index() == PipelineResultChange::npos ? NSNotFound : change_.old_index(); | 
|  | 973 | +} | 
|  | 974 | + | 
|  | 975 | +- (NSUInteger)newIndex { | 
|  | 976 | +  return change_.new_index() == PipelineResultChange::npos ? NSNotFound : change_.new_index(); | 
|  | 977 | +} | 
|  | 978 | + | 
|  | 979 | +- (id)initWithCppChange:(api::PipelineResultChange)change db:(std::shared_ptr<api::Firestore>)db { | 
|  | 980 | +  self = [super init]; | 
|  | 981 | +  if (self) { | 
|  | 982 | +    change_ = std::move(change); | 
|  | 983 | +    db_ = std::move(db); | 
|  | 984 | +  } | 
|  | 985 | + | 
|  | 986 | +  return self; | 
|  | 987 | +} | 
|  | 988 | + | 
|  | 989 | +@end | 
|  | 990 | + | 
| 931 | 991 | @implementation FIRPipelineBridge { | 
| 932 | 992 |   NSArray<FIRStageBridge *> *_stages; | 
| 933 | 993 |   FIRFirestore *firestore; | 
| @@ -965,4 +1025,127 @@ - (void)executeWithCompletion:(void (^)(__FIRPipelineSnapshotBridge *_Nullable r | 
| 965 | 1025 | 
 | 
| 966 | 1026 | @end | 
| 967 | 1027 | 
 | 
|  | 1028 | +@interface __FIRRealtimePipelineSnapshotBridge () | 
|  | 1029 | + | 
|  | 1030 | +@property(nonatomic, strong, readwrite) NSArray<__FIRPipelineResultBridge *> *results; | 
|  | 1031 | + | 
|  | 1032 | +@property(nonatomic, strong, readwrite) NSArray<__FIRPipelineResultChangeBridge *> *changes; | 
|  | 1033 | + | 
|  | 1034 | +@end | 
|  | 1035 | + | 
|  | 1036 | +@implementation __FIRRealtimePipelineSnapshotBridge { | 
|  | 1037 | +  absl::optional<api::RealtimePipelineSnapshot> snapshot_; | 
|  | 1038 | +  NSMutableArray<__FIRPipelineResultBridge *> *results_; | 
|  | 1039 | +  NSMutableArray<__FIRPipelineResultChangeBridge *> *changes_; | 
|  | 1040 | +} | 
|  | 1041 | + | 
|  | 1042 | +- (id)initWithCppSnapshot:(api::RealtimePipelineSnapshot)snapshot { | 
|  | 1043 | +  self = [super init]; | 
|  | 1044 | +  if (self) { | 
|  | 1045 | +    snapshot_ = std::move(snapshot); | 
|  | 1046 | +    if (!snapshot_.has_value()) { | 
|  | 1047 | +      results_ = nil; | 
|  | 1048 | +    } else { | 
|  | 1049 | +      NSMutableArray<__FIRPipelineResultBridge *> *results = [NSMutableArray array]; | 
|  | 1050 | +      for (auto &result : snapshot_.value().view_snapshot().documents()) { | 
|  | 1051 | +        [results addObject:[[__FIRPipelineResultBridge alloc] | 
|  | 1052 | +                               initWithCppResult:api::PipelineResult(result) | 
|  | 1053 | +                                              db:snapshot_.value().firestore()]]; | 
|  | 1054 | +      } | 
|  | 1055 | +      results_ = results; | 
|  | 1056 | + | 
|  | 1057 | +      NSMutableArray<__FIRPipelineResultChangeBridge *> *changes = [NSMutableArray array]; | 
|  | 1058 | +      for (auto &change : snapshot_.value().CalculateResultChanges(false)) { | 
|  | 1059 | +        [changes addObject:[[__FIRPipelineResultChangeBridge alloc] | 
|  | 1060 | +                               initWithCppChange:change | 
|  | 1061 | +                                              db:snapshot_.value().firestore()]]; | 
|  | 1062 | +      } | 
|  | 1063 | +      changes_ = changes; | 
|  | 1064 | +    } | 
|  | 1065 | +  } | 
|  | 1066 | + | 
|  | 1067 | +  return self; | 
|  | 1068 | +} | 
|  | 1069 | + | 
|  | 1070 | +- (NSArray<__FIRPipelineResultBridge *> *)results { | 
|  | 1071 | +  return results_; | 
|  | 1072 | +} | 
|  | 1073 | + | 
|  | 1074 | +- (NSArray<__FIRPipelineResultChangeBridge *> *)changes { | 
|  | 1075 | +  return changes_; | 
|  | 1076 | +} | 
|  | 1077 | + | 
|  | 1078 | +@end | 
|  | 1079 | + | 
|  | 1080 | +@implementation FIRRealtimePipelineBridge { | 
|  | 1081 | +  NSArray<FIRStageBridge *> *_stages; | 
|  | 1082 | +  FIRFirestore *firestore; | 
|  | 1083 | +  std::shared_ptr<api::RealtimePipeline> cpp_pipeline; | 
|  | 1084 | +} | 
|  | 1085 | + | 
|  | 1086 | +- (id)initWithStages:(NSArray<FIRStageBridge *> *)stages db:(FIRFirestore *)db { | 
|  | 1087 | +  _stages = stages; | 
|  | 1088 | +  firestore = db; | 
|  | 1089 | +  return [super init]; | 
|  | 1090 | +} | 
|  | 1091 | + | 
|  | 1092 | +- (id<FIRListenerRegistration>) | 
|  | 1093 | +    addSnapshotListenerWithOptions:(FIRSnapshotListenOptions *)options | 
|  | 1094 | +                          listener: | 
|  | 1095 | +                              (void (^)(__FIRRealtimePipelineSnapshotBridge *_Nullable snapshot, | 
|  | 1096 | +                                        NSError *_Nullable error))listener { | 
|  | 1097 | +  std::shared_ptr<api::Firestore> wrapped_firestore = firestore.wrapped; | 
|  | 1098 | + | 
|  | 1099 | +  std::vector<std::shared_ptr<firebase::firestore::api::EvaluableStage>> cpp_stages; | 
|  | 1100 | +  for (FIRStageBridge *stage in _stages) { | 
|  | 1101 | +    auto evaluable_stage = std::dynamic_pointer_cast<api::EvaluableStage>( | 
|  | 1102 | +        [stage cppStageWithReader:firestore.dataReader]); | 
|  | 1103 | +    if (evaluable_stage) { | 
|  | 1104 | +      cpp_stages.push_back(evaluable_stage); | 
|  | 1105 | +    } else { | 
|  | 1106 | +      HARD_FAIL("Failed to convert cpp stage to EvaluableStage for RealtimePipeline"); | 
|  | 1107 | +    } | 
|  | 1108 | +  } | 
|  | 1109 | + | 
|  | 1110 | +  cpp_pipeline = std::make_shared<RealtimePipeline>( | 
|  | 1111 | +      cpp_stages, std::make_unique<remote::Serializer>(wrapped_firestore->database_id())); | 
|  | 1112 | + | 
|  | 1113 | +  // Convert from ViewSnapshots to RealtimePipelineSnapshots. | 
|  | 1114 | +  auto view_listener = EventListener<ViewSnapshot>::Create( | 
|  | 1115 | +      [listener, wrapped_firestore](StatusOr<ViewSnapshot> maybe_snapshot) { | 
|  | 1116 | +        if (!maybe_snapshot.status().ok()) { | 
|  | 1117 | +          listener(nil, MakeNSError(maybe_snapshot.status())); | 
|  | 1118 | +          return; | 
|  | 1119 | +        } | 
|  | 1120 | + | 
|  | 1121 | +        ViewSnapshot snapshot = std::move(maybe_snapshot).ValueOrDie(); | 
|  | 1122 | +        SnapshotMetadata metadata(snapshot.has_pending_writes(), snapshot.from_cache()); | 
|  | 1123 | + | 
|  | 1124 | +        listener( | 
|  | 1125 | +            [[__FIRRealtimePipelineSnapshotBridge alloc] | 
|  | 1126 | +                initWithCppSnapshot:RealtimePipelineSnapshot(wrapped_firestore, std::move(snapshot), | 
|  | 1127 | +                                                             std::move(metadata))], | 
|  | 1128 | +            nil); | 
|  | 1129 | +      }); | 
|  | 1130 | + | 
|  | 1131 | +  // Call the view_listener on the user Executor. | 
|  | 1132 | +  auto async_listener = core::AsyncEventListener<ViewSnapshot>::Create( | 
|  | 1133 | +      wrapped_firestore->client()->user_executor(), std::move(view_listener)); | 
|  | 1134 | + | 
|  | 1135 | +  std::shared_ptr<core::QueryListener> query_listener = wrapped_firestore->client()->ListenToQuery( | 
|  | 1136 | +      // TODO(pipeline): pass options properly | 
|  | 1137 | +      *cpp_pipeline, core::ListenOptions(), async_listener); | 
|  | 1138 | + | 
|  | 1139 | +  return [[FSTListenerRegistration alloc] | 
|  | 1140 | +      initWithRegistration:absl::make_unique<QueryListenerRegistration>(wrapped_firestore->client(), | 
|  | 1141 | +                                                                        std::move(async_listener), | 
|  | 1142 | +                                                                        std::move(query_listener))]; | 
|  | 1143 | +} | 
|  | 1144 | + | 
|  | 1145 | +- (std::shared_ptr<api::RealtimePipeline>)cppPipelineWithReader:(FSTUserDataReader *)reader { | 
|  | 1146 | +  return cpp_pipeline; | 
|  | 1147 | +} | 
|  | 1148 | + | 
|  | 1149 | +@end | 
|  | 1150 | + | 
| 968 | 1151 | NS_ASSUME_NONNULL_END | 
0 commit comments