99#include "event.h"
1010#include "errors.h"
1111
12- typedef struct {
13- int64 id ;
14- StringInfo body ;
15- struct curl_slist * request_headers ;
16- int32 timeout_milliseconds ;
17- } CurlData ;
18-
1912static size_t
2013body_cb (void * contents , size_t size , size_t nmemb , void * userp )
2114{
@@ -48,18 +41,15 @@ static struct curl_slist *pg_text_array_to_slist(ArrayType *array,
4841 return headers ;
4942}
5043
51- // We need a different memory context here, as the parent function will have an SPI memory context, which has a shorter lifetime.
52- static void init_curl_handle (CURLM * curl_mhandle , MemoryContext curl_memctx , int64 id , Datum urlBin , NullableDatum bodyBin , NullableDatum headersBin , Datum methodBin , int32 timeout_milliseconds ){
53- MemoryContext old_ctx = MemoryContextSwitchTo (curl_memctx );
54-
55- CurlData * cdata = palloc (sizeof (CurlData ));
56- cdata -> id = id ;
44+ void init_curl_handle (CurlData * cdata , RequestQueueRow row ){
45+ cdata -> id = row .id ;
5746 cdata -> body = makeStringInfo ();
47+ cdata -> ez_handle = curl_easy_init ();
5848
59- cdata -> timeout_milliseconds = timeout_milliseconds ;
49+ cdata -> timeout_milliseconds = row . timeout_milliseconds ;
6050
61- if (!headersBin .isnull ) {
62- ArrayType * pgHeaders = DatumGetArrayTypeP (headersBin .value );
51+ if (!row . headersBin .isnull ) {
52+ ArrayType * pgHeaders = DatumGetArrayTypeP (row . headersBin .value );
6353 struct curl_slist * request_headers = NULL ;
6454
6555 request_headers = pg_text_array_to_slist (pgHeaders , request_headers );
@@ -69,64 +59,55 @@ static void init_curl_handle(CURLM *curl_mhandle, MemoryContext curl_memctx, int
6959 cdata -> request_headers = request_headers ;
7060 }
7161
72- char * url = TextDatumGetCString (urlBin );
62+ cdata -> url = TextDatumGetCString (row . url );
7363
74- char * reqBody = !bodyBin .isnull ? TextDatumGetCString (bodyBin .value ) : NULL ;
64+ cdata -> req_body = !row . bodyBin .isnull ? TextDatumGetCString (row . bodyBin .value ) : NULL ;
7565
76- char * method = TextDatumGetCString (methodBin );
77- if (strcasecmp (method , "GET" ) != 0 && strcasecmp (method , "POST" ) != 0 && strcasecmp (method , "DELETE" ) != 0 ) {
78- ereport (ERROR , errmsg ("Unsupported request method %s" , method ));
79- }
66+ cdata -> method = TextDatumGetCString (row .method );
8067
81- CURL * curl_ez_handle = curl_easy_init ();
82- if (! curl_ez_handle )
83- ereport ( ERROR , errmsg ( "curl_easy_init()" ));
68+ if ( strcasecmp ( cdata -> method , "GET" ) != 0 && strcasecmp ( cdata -> method , "POST" ) != 0 && strcasecmp ( cdata -> method , "DELETE" ) != 0 ) {
69+ ereport ( ERROR , errmsg ( "Unsupported request method %s" , cdata -> method ));
70+ }
8471
85- if (strcasecmp (method , "GET" ) == 0 ) {
86- if (reqBody ) {
87- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POSTFIELDS , reqBody );
88- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_CUSTOMREQUEST , "GET" );
72+ if (strcasecmp (cdata -> method , "GET" ) == 0 ) {
73+ if (cdata -> req_body ) {
74+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POSTFIELDS , cdata -> req_body );
75+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_CUSTOMREQUEST , "GET" );
8976 }
9077 }
9178
92- if (strcasecmp (method , "POST" ) == 0 ) {
93- if (reqBody ) {
94- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POSTFIELDS , reqBody );
79+ if (strcasecmp (cdata -> method , "POST" ) == 0 ) {
80+ if (cdata -> req_body ) {
81+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POSTFIELDS , cdata -> req_body );
9582 }
9683 else {
97- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POST , 1L );
98- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POSTFIELDSIZE , 0L );
84+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POST , 1L );
85+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POSTFIELDSIZE , 0L );
9986 }
10087 }
10188
102- if (strcasecmp (method , "DELETE" ) == 0 ) {
103- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_CUSTOMREQUEST , "DELETE" );
104- if (reqBody ) {
105- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POSTFIELDS , reqBody );
89+ if (strcasecmp (cdata -> method , "DELETE" ) == 0 ) {
90+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_CUSTOMREQUEST , "DELETE" );
91+ if (cdata -> req_body ) {
92+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POSTFIELDS , cdata -> req_body );
10693 }
10794 }
10895
109- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_WRITEFUNCTION , body_cb );
110- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_WRITEDATA , cdata );
111- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_HEADER , 0L );
112- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_URL , url );
113- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_HTTPHEADER , cdata -> request_headers );
114- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_TIMEOUT_MS , (long ) cdata -> timeout_milliseconds );
115- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_PRIVATE , cdata );
116- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_FOLLOWLOCATION , (long ) true);
96+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_WRITEFUNCTION , body_cb );
97+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_WRITEDATA , cdata );
98+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_HEADER , 0L );
99+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_URL , cdata -> url );
100+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_HTTPHEADER , cdata -> request_headers );
101+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_TIMEOUT_MS , (long ) cdata -> timeout_milliseconds );
102+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_PRIVATE , cdata );
103+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_FOLLOWLOCATION , (long ) true);
117104 if (log_min_messages <= DEBUG2 )
118- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_VERBOSE , 1L );
105+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_VERBOSE , 1L );
119106#if LIBCURL_VERSION_NUM >= 0x075500 /* libcurl 7.85.0 */
120- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_PROTOCOLS_STR , "http,https" );
107+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_PROTOCOLS_STR , "http,https" );
121108#else
122- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_PROTOCOLS , CURLPROTO_HTTP | CURLPROTO_HTTPS );
109+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_PROTOCOLS , CURLPROTO_HTTP | CURLPROTO_HTTPS );
123110#endif
124-
125- EREPORT_MULTI (
126- curl_multi_add_handle (curl_mhandle , curl_ez_handle )
127- );
128-
129- MemoryContextSwitchTo (old_ctx );
130111}
131112
132113void set_curl_mhandle (WorkerState * wstate ){
@@ -137,8 +118,6 @@ void set_curl_mhandle(WorkerState *wstate){
137118}
138119
139120uint64 delete_expired_responses (char * ttl , int batch_size ){
140- SPI_connect ();
141-
142121 int ret_code = SPI_execute_with_args ("\
143122 WITH\
144123 rows AS (\
@@ -164,14 +143,10 @@ uint64 delete_expired_responses(char *ttl, int batch_size){
164143 ereport (ERROR , errmsg ("Error expiring response table rows: %s" , SPI_result_code_string (ret_code )));
165144 }
166145
167- SPI_finish ();
168-
169146 return affected_rows ;
170147}
171148
172- uint64 consume_request_queue (CURLM * curl_mhandle , int batch_size , MemoryContext curl_memctx ){
173- SPI_connect ();
174-
149+ uint64 consume_request_queue (const int batch_size ){
175150 int ret_code = SPI_execute_with_args ("\
176151 WITH\
177152 rows AS (\
@@ -191,47 +166,40 @@ uint64 consume_request_queue(CURLM *curl_mhandle, int batch_size, MemoryContext
191166 if (ret_code != SPI_OK_DELETE_RETURNING )
192167 ereport (ERROR , errmsg ("Error getting http request queue: %s" , SPI_result_code_string (ret_code )));
193168
194- uint64 affected_rows = SPI_processed ;
195-
196- for (size_t j = 0 ; j < affected_rows ; j ++ ) {
197- bool tupIsNull = false;
198-
199- int64 id = DatumGetInt64 (SPI_getbinval (SPI_tuptable -> vals [j ], SPI_tuptable -> tupdesc , 1 , & tupIsNull ));
200- EREPORT_NULL_ATTR (tupIsNull , id );
169+ return SPI_processed ;
170+ }
201171
202- int32 timeout_milliseconds = DatumGetInt32 (SPI_getbinval (SPI_tuptable -> vals [j ], SPI_tuptable -> tupdesc , 4 , & tupIsNull ));
203- EREPORT_NULL_ATTR (tupIsNull , timeout_milliseconds );
172+ // This has an implicit dependency on the execution of delete_return_request_queue,
173+ // unfortunately we're not able to make this dependency explicit
174+ // due to the design of SPI (which uses global variables)
175+ RequestQueueRow get_request_queue_row (HeapTuple spi_tupval , TupleDesc spi_tupdesc ){
176+ bool tupIsNull = false;
204177
205- Datum method = SPI_getbinval (SPI_tuptable -> vals [ j ], SPI_tuptable -> tupdesc , 2 , & tupIsNull );
206- EREPORT_NULL_ATTR (tupIsNull , method );
178+ int64 id = DatumGetInt64 ( SPI_getbinval (spi_tupval , spi_tupdesc , 1 , & tupIsNull ) );
179+ EREPORT_NULL_ATTR (tupIsNull , id );
207180
208- Datum url = SPI_getbinval (SPI_tuptable -> vals [ j ], SPI_tuptable -> tupdesc , 3 , & tupIsNull );
209- EREPORT_NULL_ATTR (tupIsNull , url );
181+ Datum method = SPI_getbinval (spi_tupval , spi_tupdesc , 2 , & tupIsNull );
182+ EREPORT_NULL_ATTR (tupIsNull , method );
210183
211- NullableDatum headersBin = {
212- .value = SPI_getbinval (SPI_tuptable -> vals [j ], SPI_tuptable -> tupdesc , 5 , & tupIsNull ),
213- .isnull = tupIsNull
214- };
184+ Datum url = SPI_getbinval (spi_tupval , spi_tupdesc , 3 , & tupIsNull );
185+ EREPORT_NULL_ATTR (tupIsNull , url );
215186
216- NullableDatum bodyBin = {
217- .value = SPI_getbinval (SPI_tuptable -> vals [j ], SPI_tuptable -> tupdesc , 6 , & tupIsNull ),
218- .isnull = tupIsNull
219- };
187+ int32 timeout_milliseconds = DatumGetInt32 (SPI_getbinval (spi_tupval , spi_tupdesc , 4 , & tupIsNull ));
188+ EREPORT_NULL_ATTR (tupIsNull , timeout_milliseconds );
220189
221- init_curl_handle (curl_mhandle , curl_memctx , id , url , bodyBin , headersBin , method , timeout_milliseconds );
222- }
190+ NullableDatum headersBin = {
191+ .value = SPI_getbinval (spi_tupval , spi_tupdesc , 5 , & tupIsNull ),
192+ .isnull = tupIsNull
193+ };
223194
224- SPI_finish ();
195+ NullableDatum bodyBin = {
196+ .value = SPI_getbinval (spi_tupval , spi_tupdesc , 6 , & tupIsNull ),
197+ .isnull = tupIsNull
198+ };
225199
226- return affected_rows ;
227- }
228-
229- static void pfree_curl_data (CurlData * cdata ){
230- if (cdata -> body ){
231- destroyStringInfo (cdata -> body );
232- }
233- if (cdata -> request_headers ) //curl_slist_free_all already handles the NULL case, but be explicit about it
234- curl_slist_free_all (cdata -> request_headers );
200+ return (RequestQueueRow ){
201+ id , method , url , timeout_milliseconds , headersBin , bodyBin
202+ };
235203}
236204
237205static Jsonb * jsonb_headers_from_curl_handle (CURL * ez_handle ){
@@ -253,11 +221,14 @@ static Jsonb *jsonb_headers_from_curl_handle(CURL *ez_handle){
253221 return jsonb_headers ;
254222}
255223
256- static void insert_response (CURL * ez_handle , CurlData * cdata , CURLcode curl_return_code ){
224+ void insert_response (CURL * ez_handle , CURLcode curl_return_code ){
257225 enum { nparams = 7 }; // using an enum because const size_t nparams doesn't compile
258226 Datum vals [nparams ];
259227 char nulls [nparams ]; MemSet (nulls , 'n' , nparams );
260228
229+ CurlData * cdata = NULL ;
230+ EREPORT_CURL_GETINFO (ez_handle , CURLINFO_PRIVATE , & cdata );
231+
261232 vals [0 ] = Int64GetDatum (cdata -> id );
262233 nulls [0 ] = ' ' ;
263234
@@ -318,36 +289,13 @@ static void insert_response(CURL *ez_handle, CurlData *cdata, CURLcode curl_retu
318289 }
319290}
320291
321- // Switch back to the curl memory context, which has the curl handles stored
322- void insert_curl_responses (WorkerState * wstate , MemoryContext curl_memctx ){
323- MemoryContext old_ctx = MemoryContextSwitchTo (curl_memctx );
324- int msgs_left = 0 ;
325- CURLMsg * msg = NULL ;
326- CURLM * curl_mhandle = wstate -> curl_mhandle ;
327-
328- while ((msg = curl_multi_info_read (curl_mhandle , & msgs_left ))) {
329- if (msg -> msg == CURLMSG_DONE ) {
330- CURLcode return_code = msg -> data .result ;
331- CURL * ez_handle = msg -> easy_handle ;
332- CurlData * cdata = NULL ;
333- EREPORT_CURL_GETINFO (ez_handle , CURLINFO_PRIVATE , & cdata );
334-
335- SPI_connect ();
336- insert_response (ez_handle , cdata , return_code );
337- SPI_finish ();
338-
339- pfree_curl_data (cdata );
292+ void pfree_curl_data (CurlData * cdata ){
293+ pfree (cdata -> url );
294+ pfree (cdata -> method );
340295
341- int res = curl_multi_remove_handle (curl_mhandle , ez_handle );
342- if (res != CURLM_OK )
343- ereport (ERROR , errmsg ("curl_multi_remove_handle: %s" , curl_multi_strerror (res )));
344-
345- curl_easy_cleanup (ez_handle );
346- } else {
347- ereport (ERROR , errmsg ("curl_multi_info_read(), CURLMsg=%d\n" , msg -> msg ));
348- }
349- }
296+ if (cdata -> body )
297+ destroyStringInfo (cdata -> body );
350298
351- MemoryContextSwitchTo (old_ctx );
299+ if (cdata -> request_headers ) //curl_slist_free_all already handles the NULL case, but be explicit about it
300+ curl_slist_free_all (cdata -> request_headers );
352301}
353-
0 commit comments