33set -uo pipefail
44# set -x
55
6- trap " _ctrl_c" INT
6+ # ## Library ###
7+ function read_messages_forever() (
8+ trap " cleanup" INT
79
8- function read_messages_forever() {
9- local db_uri=$1
10- local log_name=$2
11- local per_message_callback=$3
12- local message=
10+ local g_log_name=$1
11+ local g_db_uri=$2
12+ local g_per_message_callback=$3
13+
14+ local g_poll_loop_pid=
15+ local g_psql_input=
16+ local g_psql_output=
1317
14- _setup_poll " ${db_uri} " " ${log_name} "
15- # read existing once
16- _read_log_entries_from_db " ${db_uri} " " ${log_name} " " ${per_message_callback} "
17- while read line; do
18- if echo " ${line} " | grep -q " Asynchronous notification \" log_${log_name} \" received" ; then
19- _read_log_entries_from_db " ${db_uri} " " ${log_name} " " ${per_message_callback} "
20- fi
21- done < ${output}
22- }
23-
24- function _setup_poll() {
25- local db_uri=$1
26- local log_name=$2
18+ # started at the bottom
19+ function run_poll_loop() {
20+ # read existing once
21+ _read_log_entries_from_db
22+ _poll_psql
23+ while read line; do
24+ if echo " ${line} " | \
25+ grep -q " Asynchronous notification \" log_${g_log_name} \" received" ; then
26+ _read_log_entries_from_db
27+ fi
28+ done < ${g_psql_output}
29+ }
2730
28- # setup polled psql in background
29- input=$( mktemp -t --dry-run psql-input.XXXX)
30- output=$( mktemp -t --dry-run psql-output.XXXX)
31- mkfifo ${input} ${output}
32- psql " ${db_uri} " < ${input} 2>&1 > ${output} &
33- psql_pid=$!
34- exec 3> ${input}
35- _poll_loop " ${log_name} " &
36- poll_loop_pid=$!
37- }
31+ function _read_log_entries_from_db() {
32+ local message=
3833
39- function _poll_loop() {
40- local log_name=$1
34+ while true ; do
35+ message=$( echo " SELECT read_log_entry('${g_log_name} ')" | \
36+ psql --quiet --tuples-only --no-align --no-psqlrc " ${g_db_uri} " )
37+ if [ " ${message} " ]; then
38+ ${g_per_message_callback} " ${message} "
39+ else
40+ break
41+ fi
42+ done
43+ }
4144
42- echo " LISTEN log_${log_name} ;" >&3
43- while true ; do
44- echo " SELECT 1;" >&3
45- sleep 1
46- done
47- }
45+ function _poll_psql() {
46+ _setup_named_pipes
47+ psql --no-psqlrc " ${g_db_uri} " < ${g_psql_input} 2>&1 > ${g_psql_output} &
48+ exec 3> ${g_psql_input} # keep input open
49+ _poll_loop &
50+ g_poll_loop_pid=$!
51+ }
4852
49- function _read_log_entries_from_db() {
50- local db_uri=$1
51- local log_name=$2
52- local per_message_callback=$3
53- local message=
53+ function _setup_named_pipes() {
54+ g_psql_input=$( mktemp -t --dry-run psql-psql_input.XXXX)
55+ g_psql_output=$( mktemp -t --dry-run psql-psql_output.XXXX)
56+ mkfifo ${g_psql_input} ${g_psql_output}
57+ }
58+
59+ function _poll_loop() {
60+ echo " LISTEN log_${g_log_name} ;" > ${g_psql_input}
61+ while true ; do
62+ echo " SELECT 1;" > ${g_psql_input}
63+ sleep 1
64+ done
65+ }
5466
55- while true ; do
56- message=$( echo " SELECT read_log_entry('${log_name} ')" | psql -qtAX " ${db_uri} " )
57- if [ " ${message} " ]; then
58- ${per_message_callback} " ${message} "
59- else
60- break
61- fi
62- done
63- }
67+ function cleanup() {
68+ echo " Exiting"
69+ kill ${g_poll_loop_pid} 2> /dev/null
70+ echo " end; \quit" > ${g_psql_input}
71+ rm -f ${g_psql_input}
72+ rm -f ${g_psql_output}
73+ exit
74+ }
6475
65- function _ctrl_c() {
66- echo " Exiting"
67- kill ${poll_loop_pid}
68- kill ${psql_pid}
69- rm -f ${input}
70- rm -f ${output}
71- exit
72- }
76+ run_poll_loop
77+ )
7378
7479
80+ # ##### Production code #########
7581function print_message() {
7682 local message=$1
7783 echo " Received: ${message} "
7884}
7985
80- read_messages_forever " postgres://samba@/postgres" " orderdata " " print_message"
86+ read_messages_forever " orderdata " " postgres://samba@/postgres" " print_message"
0 commit comments