11#[ cfg( test) ]
22mod tests {
3+ use crate :: pubsub:: { BasePubSub , BasePubSubApiServer } ;
34 use crate :: rpc:: { EthApiExt , EthApiOverrideServer } ;
45 use crate :: state:: FlashblocksState ;
56 use crate :: subscription:: { Flashblock , FlashblocksReceiver , Metadata } ;
@@ -38,9 +39,15 @@ mod tests {
3839 use std:: sync:: Arc ;
3940 use tokio:: sync:: { mpsc, oneshot} ;
4041
42+ // ws
43+ use futures_util:: { SinkExt , StreamExt } ;
44+ use serde_json:: json;
45+ use tokio_tungstenite:: { connect_async, tungstenite:: Message } ;
46+
4147 pub struct NodeContext {
4248 sender : mpsc:: Sender < ( Flashblock , oneshot:: Sender < ( ) > ) > ,
4349 http_api_addr : SocketAddr ,
50+ ws_api_addr : SocketAddr ,
4451 _node_exit_future : NodeExitFuture ,
4552 _node : Box < dyn Any + Sync + Send > ,
4653 _task_manager : TaskManager ,
@@ -85,6 +92,10 @@ mod tests {
8592
8693 Ok ( receipt)
8794 }
95+
96+ pub fn ws_url ( & self ) -> String {
97+ format ! ( "ws://{}" , self . ws_api_addr)
98+ }
8899 }
89100
90101 async fn setup_node ( ) -> eyre:: Result < NodeContext > {
@@ -112,7 +123,12 @@ mod tests {
112123 // Use with_unused_ports() to let Reth allocate random ports and avoid port collisions
113124 let node_config = NodeConfig :: new ( chain_spec. clone ( ) )
114125 . with_network ( network_config. clone ( ) )
115- . with_rpc ( RpcServerArgs :: default ( ) . with_unused_ports ( ) . with_http ( ) )
126+ . with_rpc (
127+ RpcServerArgs :: default ( )
128+ . with_unused_ports ( )
129+ . with_http ( )
130+ . with_ws ( ) ,
131+ )
116132 . with_unused_ports ( ) ;
117133
118134 let node = OpNode :: new ( RollupArgs :: default ( ) ) ;
@@ -142,6 +158,10 @@ mod tests {
142158
143159 ctx. modules . replace_configured ( api_ext. into_rpc ( ) ) ?;
144160
161+ // Register base_subscribe subscription endpoint
162+ let base_pubsub = BasePubSub :: new ( flashblocks_state. clone ( ) ) ;
163+ ctx. modules . merge_configured ( base_pubsub. into_rpc ( ) ) ?;
164+
145165 tokio:: spawn ( async move {
146166 while let Some ( ( payload, tx) ) = receiver. recv ( ) . await {
147167 flashblocks_state. on_flashblock_received ( payload) ;
@@ -159,9 +179,15 @@ mod tests {
159179 . http_local_addr ( )
160180 . ok_or_else ( || eyre:: eyre!( "Failed to get http api address" ) ) ?;
161181
182+ let ws_api_addr = node
183+ . rpc_server_handle ( )
184+ . ws_local_addr ( )
185+ . ok_or_else ( || eyre:: eyre!( "Failed to get websocket api address" ) ) ?;
186+
162187 Ok ( NodeContext {
163188 sender,
164189 http_api_addr,
190+ ws_api_addr,
165191 _node_exit_future : node_exit_future,
166192 _node : Box :: new ( node) ,
167193 _task_manager : tasks,
@@ -881,4 +907,180 @@ mod tests {
881907
882908 Ok ( ( ) )
883909 }
910+
911+ // base_ methods
912+ #[ tokio:: test]
913+ async fn test_base_subscribe_new_flashblocks ( ) -> eyre:: Result < ( ) > {
914+ reth_tracing:: init_test_tracing ( ) ;
915+ let node = setup_node ( ) . await ?;
916+ let ws_url = node. ws_url ( ) ;
917+ let ( mut ws_stream, _) = connect_async ( & ws_url) . await ?;
918+
919+ ws_stream
920+ . send ( Message :: Text (
921+ json ! ( {
922+ "jsonrpc" : "2.0" ,
923+ "id" : 1 ,
924+ "method" : "base_subscribe" ,
925+ "params" : [ "newFlashblocks" ]
926+ } )
927+ . to_string ( )
928+ . into ( ) ,
929+ ) )
930+ . await ?;
931+
932+ let response = ws_stream. next ( ) . await . unwrap ( ) ?;
933+ let sub: serde_json:: Value = serde_json:: from_str ( response. to_text ( ) ?) ?;
934+ assert_eq ! ( sub[ "jsonrpc" ] , "2.0" ) ;
935+ assert_eq ! ( sub[ "id" ] , 1 ) ;
936+ let subscription_id = sub[ "result" ] . as_str ( ) . expect ( "subscription id expected" ) ;
937+
938+ node. send_payload ( create_first_payload ( ) ) . await ?;
939+
940+ let notification = ws_stream. next ( ) . await . unwrap ( ) ?;
941+ let notif: serde_json:: Value = serde_json:: from_str ( notification. to_text ( ) ?) ?;
942+ assert_eq ! ( notif[ "method" ] , "base_subscription" ) ;
943+ assert_eq ! ( notif[ "params" ] [ "subscription" ] , subscription_id) ;
944+
945+ let block = & notif[ "params" ] [ "result" ] ;
946+ assert_eq ! ( block[ "number" ] , "0x1" ) ;
947+ assert ! ( block[ "hash" ] . is_string( ) ) ;
948+ assert ! ( block[ "parentHash" ] . is_string( ) ) ;
949+ assert ! ( block[ "transactions" ] . is_array( ) ) ;
950+ assert_eq ! ( block[ "transactions" ] . as_array( ) . unwrap( ) . len( ) , 1 ) ;
951+
952+ Ok ( ( ) )
953+ }
954+
955+ #[ tokio:: test]
956+ async fn test_base_subscribe_multiple_flashblocks ( ) -> eyre:: Result < ( ) > {
957+ reth_tracing:: init_test_tracing ( ) ;
958+ let node = setup_node ( ) . await ?;
959+ let ws_url = node. ws_url ( ) ;
960+ let ( mut ws_stream, _) = connect_async ( & ws_url) . await ?;
961+
962+ ws_stream
963+ . send ( Message :: Text (
964+ json ! ( {
965+ "jsonrpc" : "2.0" ,
966+ "id" : 1 ,
967+ "method" : "base_subscribe" ,
968+ "params" : [ "newFlashblocks" ]
969+ } )
970+ . to_string ( )
971+ . into ( ) ,
972+ ) )
973+ . await ?;
974+
975+ let response = ws_stream. next ( ) . await . unwrap ( ) ?;
976+ let sub: serde_json:: Value = serde_json:: from_str ( response. to_text ( ) ?) ?;
977+ let subscription_id = sub[ "result" ] . as_str ( ) . expect ( "subscription id expected" ) ;
978+
979+ node. send_payload ( create_first_payload ( ) ) . await ?;
980+
981+ let notif1 = ws_stream. next ( ) . await . unwrap ( ) ?;
982+ let notif1: serde_json:: Value = serde_json:: from_str ( notif1. to_text ( ) ?) ?;
983+ assert_eq ! ( notif1[ "params" ] [ "subscription" ] , subscription_id) ;
984+
985+ let block1 = & notif1[ "params" ] [ "result" ] ;
986+ assert_eq ! ( block1[ "number" ] , "0x1" ) ;
987+ assert_eq ! ( block1[ "transactions" ] . as_array( ) . unwrap( ) . len( ) , 1 ) ;
988+
989+ node. send_payload ( create_second_payload ( ) ) . await ?;
990+
991+ let notif2 = ws_stream. next ( ) . await . unwrap ( ) ?;
992+ let notif2: serde_json:: Value = serde_json:: from_str ( notif2. to_text ( ) ?) ?;
993+ assert_eq ! ( notif2[ "params" ] [ "subscription" ] , subscription_id) ;
994+
995+ let block2 = & notif2[ "params" ] [ "result" ] ;
996+ assert_eq ! ( block1[ "number" ] , block2[ "number" ] ) ; // Same block, incremental updates
997+ assert_eq ! ( block2[ "transactions" ] . as_array( ) . unwrap( ) . len( ) , 5 ) ;
998+
999+ Ok ( ( ) )
1000+ }
1001+
1002+ #[ tokio:: test]
1003+ async fn test_base_unsubscribe ( ) -> eyre:: Result < ( ) > {
1004+ reth_tracing:: init_test_tracing ( ) ;
1005+ let node = setup_node ( ) . await ?;
1006+ let ws_url = node. ws_url ( ) ;
1007+ let ( mut ws_stream, _) = connect_async ( & ws_url) . await ?;
1008+
1009+ ws_stream
1010+ . send ( Message :: Text (
1011+ json ! ( {
1012+ "jsonrpc" : "2.0" ,
1013+ "id" : 1 ,
1014+ "method" : "base_subscribe" ,
1015+ "params" : [ "newFlashblocks" ]
1016+ } )
1017+ . to_string ( )
1018+ . into ( ) ,
1019+ ) )
1020+ . await ?;
1021+
1022+ let response = ws_stream. next ( ) . await . unwrap ( ) ?;
1023+ let sub: serde_json:: Value = serde_json:: from_str ( response. to_text ( ) ?) ?;
1024+ let subscription_id = sub[ "result" ] . as_str ( ) . expect ( "subscription id expected" ) ;
1025+
1026+ ws_stream
1027+ . send ( Message :: Text (
1028+ json ! ( {
1029+ "jsonrpc" : "2.0" ,
1030+ "id" : 2 ,
1031+ "method" : "base_unsubscribe" ,
1032+ "params" : [ subscription_id]
1033+ } )
1034+ . to_string ( )
1035+ . into ( ) ,
1036+ ) )
1037+ . await ?;
1038+
1039+ let unsub = ws_stream. next ( ) . await . unwrap ( ) ?;
1040+ let unsub: serde_json:: Value = serde_json:: from_str ( unsub. to_text ( ) ?) ?;
1041+ assert_eq ! ( unsub[ "jsonrpc" ] , "2.0" ) ;
1042+ assert_eq ! ( unsub[ "id" ] , 2 ) ;
1043+ assert_eq ! ( unsub[ "result" ] , true ) ;
1044+
1045+ Ok ( ( ) )
1046+ }
1047+
1048+ #[ tokio:: test]
1049+ async fn test_base_subscribe_multiple_clients ( ) -> eyre:: Result < ( ) > {
1050+ reth_tracing:: init_test_tracing ( ) ;
1051+ let node = setup_node ( ) . await ?;
1052+ let ws_url = node. ws_url ( ) ;
1053+ let ( mut ws1, _) = connect_async ( & ws_url) . await ?;
1054+ let ( mut ws2, _) = connect_async ( & ws_url) . await ?;
1055+
1056+ let req = json ! ( {
1057+ "jsonrpc" : "2.0" ,
1058+ "id" : 1 ,
1059+ "method" : "base_subscribe" ,
1060+ "params" : [ "newFlashblocks" ]
1061+ } ) ;
1062+ ws1. send ( Message :: Text ( req. to_string ( ) . into ( ) ) ) . await ?;
1063+ ws2. send ( Message :: Text ( req. to_string ( ) . into ( ) ) ) . await ?;
1064+
1065+ let _sub1 = ws1. next ( ) . await . unwrap ( ) ?;
1066+ let _sub2 = ws2. next ( ) . await . unwrap ( ) ?;
1067+
1068+ node. send_payload ( create_first_payload ( ) ) . await ?;
1069+
1070+ let notif1 = ws1. next ( ) . await . unwrap ( ) ?;
1071+ let notif1: serde_json:: Value = serde_json:: from_str ( notif1. to_text ( ) ?) ?;
1072+ let notif2 = ws2. next ( ) . await . unwrap ( ) ?;
1073+ let notif2: serde_json:: Value = serde_json:: from_str ( notif2. to_text ( ) ?) ?;
1074+
1075+ assert_eq ! ( notif1[ "method" ] , "base_subscription" ) ;
1076+ assert_eq ! ( notif2[ "method" ] , "base_subscription" ) ;
1077+
1078+ let block1 = & notif1[ "params" ] [ "result" ] ;
1079+ let block2 = & notif2[ "params" ] [ "result" ] ;
1080+ assert_eq ! ( block1[ "number" ] , "0x1" ) ;
1081+ assert_eq ! ( block1[ "number" ] , block2[ "number" ] ) ;
1082+ assert_eq ! ( block1[ "hash" ] , block2[ "hash" ] ) ;
1083+
1084+ Ok ( ( ) )
1085+ }
8841086}
0 commit comments