Skip to content

Commit 526ab38

Browse files
authored
Merge pull request #88 from fly-apps/replication-slot-removal-fix
Cleaning up monitoring tasks
2 parents 82fca44 + cf827ab commit 526ab38

File tree

6 files changed

+285
-201
lines changed

6 files changed

+285
-201
lines changed

cmd/monitor/main.go

Lines changed: 18 additions & 193 deletions
Original file line numberDiff line numberDiff line change
@@ -2,218 +2,43 @@ package main
22

33
import (
44
"context"
5-
"encoding/json"
65
"fmt"
7-
"net/http"
8-
"os"
6+
"log"
97
"time"
108

119
"github.com/fly-apps/postgres-flex/internal/flypg"
12-
"github.com/fly-apps/postgres-flex/internal/flypg/admin"
13-
"github.com/jackc/pgx/v5"
14-
15-
"golang.org/x/exp/maps"
1610
)
1711

1812
var (
19-
deadMemberMonitorFrequency = time.Minute * 5
20-
readonlyStateMonitorFrequency = time.Minute * 1
13+
deadMemberMonitorFrequency = time.Hour * 1
14+
replicationStateMonitorFrequency = time.Hour * 1
15+
readonlyStateMonitorFrequency = time.Minute * 1
16+
17+
defaultDeadMemberRemovalThreshold = time.Hour * 12
18+
defaultInactiveSlotRemovalThreshold = time.Hour * 12
2119
)
2220

2321
func main() {
2422
ctx := context.Background()
25-
flypgNode, err := flypg.NewNode()
23+
24+
node, err := flypg.NewNode()
2625
if err != nil {
27-
fmt.Printf("failed to reference node: %s\n", err)
28-
os.Exit(1)
26+
panic(fmt.Sprintf("failed to reference node: %s\n", err))
2927
}
3028

3129
// Dead member monitor
30+
log.Println("Monitoring dead members")
3231
go func() {
33-
internal, err := flypg.ReadFromFile("/data/flypg.internal.conf")
34-
if err != nil {
35-
fmt.Printf("failed to open config: %s\n", err)
36-
os.Exit(1)
37-
}
38-
39-
user, err := flypg.ReadFromFile("/data/flypg.user.conf")
40-
if err != nil {
41-
fmt.Printf("failed to open config: %s\n", err)
42-
os.Exit(1)
43-
}
44-
45-
maps.Copy(user, internal)
46-
47-
deadMemberRemovalThreshold, err := time.ParseDuration(fmt.Sprint(internal["standby_clean_interval"]))
48-
if err != nil {
49-
fmt.Printf(fmt.Sprintf("Failed to parse config: %s", err))
50-
os.Exit(1)
51-
}
52-
53-
seenAt := map[int]time.Time{}
54-
55-
ticker := time.NewTicker(deadMemberMonitorFrequency)
56-
defer ticker.Stop()
57-
58-
fmt.Printf("Pruning every %s...\n", deadMemberRemovalThreshold)
59-
60-
for range ticker.C {
61-
err := handleDeadMemberMonitorTick(ctx, flypgNode, seenAt, deadMemberRemovalThreshold)
62-
if err != nil {
63-
fmt.Println(err)
64-
}
32+
if err := monitorDeadMembers(ctx, node); err != nil {
33+
panic(err)
6534
}
6635
}()
6736

6837
// Readonly monitor
69-
ticker := time.NewTicker(readonlyStateMonitorFrequency)
70-
defer ticker.Stop()
71-
for range ticker.C {
72-
if err := handleReadonlyMonitorTick(ctx, flypgNode); err != nil {
73-
fmt.Println(err)
74-
}
75-
}
76-
77-
}
78-
79-
type readonlyStateResponse struct {
80-
Result bool
81-
}
82-
83-
func handleReadonlyMonitorTick(ctx context.Context, node *flypg.Node) error {
84-
conn, err := node.RepMgr.NewLocalConnection(ctx)
85-
if err != nil {
86-
return fmt.Errorf("failed to open local connection: %s", err)
87-
}
88-
defer conn.Close(ctx)
89-
90-
member, err := node.RepMgr.Member(ctx, conn)
91-
if err != nil {
92-
return fmt.Errorf("failed to query local member: %s", err)
93-
}
94-
95-
if member.Role == flypg.PrimaryRoleName {
96-
return nil
97-
}
98-
99-
primary, err := node.RepMgr.PrimaryMember(ctx, conn)
100-
if err != nil {
101-
return fmt.Errorf("failed to query primary member: %s", err)
102-
}
103-
104-
endpoint := fmt.Sprintf("http://[%s]:5500/%s", primary.Hostname, flypg.ReadOnlyStateEndpoint)
105-
resp, err := http.Get(endpoint)
106-
if err != nil {
107-
return fmt.Errorf("failed to query primary readonly state: %s", err)
108-
}
109-
defer resp.Body.Close()
110-
111-
var state readonlyStateResponse
112-
if err := json.NewDecoder(resp.Body).Decode(&state); err != nil {
113-
return fmt.Errorf("failed to decode result: %s", err)
114-
}
115-
116-
if state.Result {
117-
if !flypg.ReadOnlyLockExists() {
118-
fmt.Printf("Setting connections running under %s to readonly\n", node.PrivateIP)
119-
if err := flypg.EnableReadonly(ctx, node); err != nil {
120-
return fmt.Errorf("failed to set connection under %s to readonly: %s", node.PrivateIP, err)
121-
}
122-
}
123-
} else {
124-
if !flypg.ZombieLockExists() && flypg.ReadOnlyLockExists() {
125-
fmt.Printf("Setting connections running under %s to read/write\n", node.PrivateIP)
126-
if err := flypg.DisableReadonly(ctx, node); err != nil {
127-
return fmt.Errorf("failed to set connections under %s read/write: %s", node.PrivateIP, err)
128-
}
129-
}
130-
}
131-
132-
return nil
133-
}
134-
135-
func handleDeadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int]time.Time, deadMemberRemovalThreshold time.Duration) error {
136-
// TODO - We should connect using the flypgadmin user so we can differentiate between
137-
// internal admin connection usage and the actual repmgr process.
138-
conn, err := node.RepMgr.NewLocalConnection(ctx)
139-
if err != nil {
140-
fmt.Printf("failed to open local connection: %s\n", err)
141-
os.Exit(1)
142-
}
143-
defer conn.Close(ctx)
144-
145-
member, err := node.RepMgr.MemberByID(ctx, conn, int(node.RepMgr.ID))
146-
if err != nil {
147-
return err
148-
}
149-
150-
if member.Role != flypg.PrimaryRoleName {
151-
return nil
152-
}
153-
154-
standbys, err := node.RepMgr.StandbyMembers(ctx, conn)
155-
if err != nil {
156-
return fmt.Errorf("failed to query standbys: %s", err)
157-
}
158-
159-
for _, standby := range standbys {
160-
sConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname)
161-
if err != nil {
162-
// TODO - Verify the exception that's getting thrown.
163-
if time.Since(seenAt[standby.ID]) >= deadMemberRemovalThreshold {
164-
if err := node.RepMgr.UnregisterMember(ctx, standby); err != nil {
165-
fmt.Printf("failed to unregister member %s: %v", standby.Hostname, err)
166-
continue
167-
}
168-
169-
delete(seenAt, standby.ID)
170-
}
171-
172-
continue
173-
}
174-
defer sConn.Close(ctx)
38+
log.Println("Monitoring readonly state")
39+
go monitorReadOnly(ctx, node)
17540

176-
seenAt[standby.ID] = time.Now()
177-
}
178-
179-
removeOrphanedReplicationSlots(ctx, conn, standbys)
180-
181-
return nil
182-
}
183-
184-
func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standbys []flypg.Member) {
185-
var orphanedSlots []admin.ReplicationSlot
186-
187-
slots, err := admin.ListReplicationSlots(ctx, conn)
188-
if err != nil {
189-
fmt.Printf("failed to list replication slots: %s", err)
190-
}
191-
192-
// An orphaned replication slot is defined as an inactive replication slot that is no longer tied to
193-
// and existing repmgr member.
194-
for _, slot := range slots {
195-
matchFound := false
196-
for _, standby := range standbys {
197-
if slot.MemberID == int32(standby.ID) {
198-
matchFound = true
199-
}
200-
}
201-
202-
if !matchFound && !slot.Active {
203-
orphanedSlots = append(orphanedSlots, slot)
204-
}
205-
}
206-
207-
if len(orphanedSlots) > 0 {
208-
fmt.Printf("%d orphaned replication slot(s) detected\n", len(orphanedSlots))
209-
210-
for _, slot := range orphanedSlots {
211-
fmt.Printf("Dropping replication slot: %s\n", slot.Name)
212-
213-
if err := admin.DropReplicationSlot(ctx, conn, slot.Name); err != nil {
214-
fmt.Printf("failed to drop replication slot %s: %v\n", slot.Name, err)
215-
continue
216-
}
217-
}
218-
}
41+
// Replication slot monitor
42+
log.Println("Monitoring replication slots")
43+
monitorReplicationSlots(ctx, node)
21944
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"github.com/fly-apps/postgres-flex/internal/flypg"
10+
"golang.org/x/exp/maps"
11+
)
12+
13+
func monitorDeadMembers(ctx context.Context, node *flypg.Node) error {
14+
internal, err := flypg.ReadFromFile("/data/flypg.internal.conf")
15+
if err != nil {
16+
return fmt.Errorf("failed to open config: %s", err)
17+
}
18+
19+
user, err := flypg.ReadFromFile("/data/flypg.user.conf")
20+
if err != nil {
21+
return fmt.Errorf("failed to open config: %s", err)
22+
}
23+
24+
maps.Copy(user, internal)
25+
26+
removalThreshold := defaultDeadMemberRemovalThreshold
27+
28+
if internal["deadMemberRemovalThreshold"] != "" {
29+
removalThreshold, err = time.ParseDuration(fmt.Sprint(internal["deadMemberRemovalThreshold"]))
30+
if err != nil {
31+
log.Printf("failed to parse deadMemberRemovalThreshold: %s\n", err)
32+
}
33+
}
34+
35+
seenAt := map[int]time.Time{}
36+
37+
ticker := time.NewTicker(deadMemberMonitorFrequency)
38+
defer ticker.Stop()
39+
40+
log.Printf("Pruning every %s...\n", removalThreshold)
41+
42+
for range ticker.C {
43+
err := deadMemberMonitorTick(ctx, node, seenAt, removalThreshold)
44+
if err != nil {
45+
log.Printf("deadMemberMonitorTick failed with: %s", err)
46+
}
47+
}
48+
49+
return nil
50+
}
51+
52+
func deadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int]time.Time, deadMemberRemovalThreshold time.Duration) error {
53+
// TODO - We should connect using the flypgadmin user so we can differentiate between
54+
// internal admin connection usage and the actual repmgr process.
55+
conn, err := node.RepMgr.NewLocalConnection(ctx)
56+
if err != nil {
57+
return fmt.Errorf("failed to open local connection: %s", err)
58+
}
59+
defer conn.Close(ctx)
60+
61+
member, err := node.RepMgr.MemberByID(ctx, conn, int(node.RepMgr.ID))
62+
if err != nil {
63+
return err
64+
}
65+
66+
if member.Role != flypg.PrimaryRoleName {
67+
return nil
68+
}
69+
70+
standbys, err := node.RepMgr.StandbyMembers(ctx, conn)
71+
if err != nil {
72+
return fmt.Errorf("failed to query standbys: %s", err)
73+
}
74+
75+
for _, standby := range standbys {
76+
sConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname)
77+
if err != nil {
78+
// TODO - Verify the exception that's getting thrown.
79+
if time.Since(seenAt[standby.ID]) >= deadMemberRemovalThreshold {
80+
log.Printf("Removing dead member: %s\n", standby.Hostname)
81+
if err := node.RepMgr.UnregisterMember(ctx, standby); err != nil {
82+
log.Printf("failed to unregister member %s: %v", standby.Hostname, err)
83+
continue
84+
}
85+
delete(seenAt, standby.ID)
86+
}
87+
88+
continue
89+
}
90+
defer sConn.Close(ctx)
91+
seenAt[standby.ID] = time.Now()
92+
}
93+
94+
return nil
95+
}

0 commit comments

Comments
 (0)