@@ -2,6 +2,7 @@ package main
22
33import (
44 "context"
5+ "errors"
56 "flag"
67 "fmt"
78 "log"
@@ -10,6 +11,7 @@ import (
1011 "time"
1112
1213 "github.com/fly-apps/postgres-flex/internal/flypg"
14+ "github.com/jackc/pgx/v5"
1315)
1416
1517const eventLogFile = "/data/event.log"
@@ -26,6 +28,8 @@ func main() {
2628 details := flag.String("details", "", "details")
2729 flag.Parse()
2830
31+ ctx := context.Background()
32+
2933 logFile, err := os.OpenFile(eventLogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
3034 if err != nil {
3135 fmt.Printf("failed to open event log: %s", err)
@@ -91,6 +95,38 @@ func main() {
9195 os.Exit(1)
9296 }
9397
98+ case "child_node_disconnect", "child_node_reconnect", "child_node_new_connect":
99+ node, err := flypg.NewNode()
100+ if err != nil {
101+ log.Printf("failed to initialize node: %s", err)
102+ os.Exit(1)
103+ }
104+
105+ conn, err := node.RepMgr.NewLocalConnection(ctx)
106+ if err != nil {
107+ log.Printf("failed to open local connection: %s", err)
108+ os.Exit(1)
109+ }
110+ defer conn.Close(ctx)
111+
112+ member, err := node.RepMgr.Member(ctx, conn)
113+ if err != nil {
114+ log.Printf("failed to resolve member: %s", err)
115+ os.Exit(1)
116+ }
117+
118+ if member.Role != flypg.PrimaryRoleName {
119+ // We should never get here.
120+ log.Println("skipping since we are not the primary")
121+ os.Exit(0)
122+ }
123+
124+ if err := evaluateClusterState(ctx, conn, node); err != nil {
125+ log.Printf("failed to evaluate cluster state: %s", err)
126+ os.Exit(0)
127+ }
128+
129+ os.Exit(0)
94130 default:
95131 // noop
96132 }
@@ -118,3 +154,50 @@ func reconfigurePGBouncer(id int) error {
118154
119155 return nil
120156}
157+
158+ func evaluateClusterState(ctx context.Context, conn *pgx.Conn, node *flypg.Node) error {
159+ standbys, err := node.RepMgr.StandbyMembers(ctx, conn)
160+ if err != nil {
161+ if !errors.Is(err, pgx.ErrNoRows) {
162+ return fmt.Errorf("failed to query standbys")
163+ }
164+ }
165+
166+ sample, err := flypg.TakeDNASample(ctx, node, standbys)
167+ if err != nil {
168+ return fmt.Errorf("failed to evaluate cluster data: %s", err)
169+ }
170+
171+ log.Println(flypg.DNASampleString(sample))
172+
173+ primary, err := flypg.ZombieDiagnosis(sample)
174+ if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) {
175+ // Quarantine primary
176+ if err := flypg.Quarantine(ctx, conn, node, primary); err != nil {
177+ return fmt.Errorf("failed to quarantine failed primary: %s", err)
178+ }
179+
180+ return fmt.Errorf("primary has been quarantined: %s", err)
181+ } else if err != nil {
182+ return fmt.Errorf("failed to run zombie diagnosis: %s", err)
183+ }
184+
185+ // Clear zombie lock if it exists
186+ if flypg.ZombieLockExists() {
187+ log.Println("Clearing zombie lock and enabling read/write")
188+ if err := flypg.RemoveZombieLock(); err != nil {
189+ return fmt.Errorf("failed to remove zombie lock: %s", err)
190+ }
191+
192+ log.Println("Broadcasting readonly state change")
193+ if err := flypg.BroadcastReadonlyChange(ctx, node, false); err != nil {
194+ log.Printf("errors while disabling readonly: %s", err)
195+ }
196+ }
197+
198+ if err := node.PGBouncer.ConfigurePrimary(ctx, primary, true); err != nil {
199+ return fmt.Errorf("failed to reconfigure pgbouncer primary %s", err)
200+ }
201+
202+ return nil
203+ }
0 commit comments