Skip to content

Commit b39d479

Browse files
committed
rounding out some edges
1 parent 243e2cf commit b39d479

File tree

4 files changed

+140
-73
lines changed

4 files changed

+140
-73
lines changed

pkg/flypg/node.go

Lines changed: 87 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ type Credentials struct {
2727
}
2828

2929
type Node struct {
30-
AppName string
31-
PrivateIP string
32-
DataDir string
33-
Port int
34-
PGConfig *PGConfig
30+
AppName string
31+
PrivateIP string
32+
PrimaryRegion string
33+
DataDir string
34+
Port int
35+
PGConfig *PGConfig
3536

3637
SUCredentials Credentials
3738
OperatorCredentials Credentials
@@ -59,6 +60,11 @@ func NewNode() (*Node, error) {
5960
}
6061
node.PrivateIP = ipv6.String()
6162

63+
node.PrimaryRegion = os.Getenv("PRIMARY_REGION")
64+
if node.PrimaryRegion == "" {
65+
return nil, fmt.Errorf("PRIMARY_REGION environment variable must be set")
66+
}
67+
6268
if port, err := strconv.Atoi(os.Getenv("PG_PORT")); err == nil {
6369
node.Port = port
6470
}
@@ -99,6 +105,7 @@ func NewNode() (*Node, error) {
99105
node.RepMgr = RepMgr{
100106
ID: rand.Int31(),
101107
AppName: node.AppName,
108+
PrimaryRegion: node.PrimaryRegion,
102109
Region: os.Getenv("FLY_REGION"),
103110
ConfigPath: "/data/repmgr.conf",
104111
InternalConfigPath: "/data/repmgr.internal.conf",
@@ -120,21 +127,49 @@ func (n *Node) Init(ctx context.Context) error {
120127
return err
121128
}
122129

123-
// Attempt to re-introduce zombie back into the cluster.
124130
if ZombieLockExists() {
125131
fmt.Println("Zombie lock detected!")
126-
zHostname, err := readZombieLock()
132+
primaryStr, err := readZombieLock()
127133
if err != nil {
128-
return fmt.Errorf("failed to read zombie lock: %s", zHostname)
134+
return fmt.Errorf("failed to read zombie lock: %s", primaryStr)
129135
}
130136

131-
if zHostname != "" {
132-
ip := net.ParseIP(zHostname)
137+
// If the zombie lock contains a hostname, it means we were able to resolve the real primary and
138+
// will attempt to rejoin it.
139+
if primaryStr != "" {
140+
ip := net.ParseIP(primaryStr)
133141
if ip == nil {
134-
return fmt.Errorf("zombie.lock file contained an invalid ipv6 address")
142+
return fmt.Errorf("zombie.lock file contains an invalid ipv6 address")
143+
}
144+
145+
conn, err := n.RepMgr.NewRemoteConnection(ctx, ip.String())
146+
if err != nil {
147+
return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", ip.String(), err)
135148
}
149+
defer conn.Close(ctx)
150+
151+
primary, err := n.RepMgr.PrimaryMember(ctx, conn)
152+
if err != nil {
153+
return fmt.Errorf("failed to confirm primary on recover target %s: %s", ip.String(), err)
154+
}
155+
156+
// Confirm that our rejoin target still identifies itself as the primary.
157+
if primary.Hostname != ip.String() {
158+
// Clear the zombie.lock file so we can attempt to re-resolve the correct primary.
159+
if err := removeZombieLock(); err != nil {
160+
return fmt.Errorf("failed to remove zombie lock: %s", err)
161+
}
136162

137-
if err := n.RepMgr.rejoinCluster(zHostname); err != nil {
163+
return ErrZombieLockPrimaryMismatch
164+
}
165+
166+
// If the primary does not reside within our primary region, we cannot rejoin until it does.
167+
if primary.Region != n.PrimaryRegion {
168+
fmt.Printf("Primary region mismatch detected. The primary lives in '%s', while PRIMARY_REGION is set to '%s'\n", primary.Region, n.PrimaryRegion)
169+
return ErrZombieLockRegionMismatch
170+
}
171+
172+
if err := n.RepMgr.rejoinCluster(primary.Hostname); err != nil {
138173
return fmt.Errorf("failed to rejoin cluster: %s", err)
139174
}
140175

@@ -146,8 +181,8 @@ func (n *Node) Init(ctx context.Context) error {
146181
utils.RunCommand("pg_ctl -D /data/postgresql/ stop")
147182
} else {
148183
// TODO - Provide link to documentation on how to address this
149-
fmt.Println("The zombie lock file does not contain a hostname.")
150-
fmt.Println("This likely means that we were unable to build a consensus on who the real primary is.")
184+
fmt.Println("Zombie lock file does not contain a hostname.")
185+
fmt.Println("This likely means that we were unable to determine who the real primary is.")
151186
}
152187
}
153188

@@ -232,7 +267,7 @@ func (n *Node) PostInit(ctx context.Context) error {
232267
if !clusterInitialized {
233268
// Check if we can be a primary
234269
if !repmgr.eligiblePrimary() {
235-
return fmt.Errorf("no primary to follow and can't configure self as primary because primary region is '%s' and we are in '%s'", os.Getenv("PRIMARY_REGION"), repmgr.Region)
270+
return fmt.Errorf("no primary to follow and can't configure self as primary because primary region is '%s' and we are in '%s'", n.PrimaryRegion, repmgr.Region)
236271
}
237272

238273
// Create required users
@@ -264,14 +299,13 @@ func (n *Node) PostInit(ctx context.Context) error {
264299

265300
member, err := repmgr.Member(ctx, conn)
266301
if err != nil {
267-
// member will not be resolveable if the member has not yet been registered
268302
if !errors.Is(err, pgx.ErrNoRows) {
269303
return fmt.Errorf("failed to resolve member role: %s", err)
270304
}
271305
}
272306

273307
role := ""
274-
if member != nil && member.Role != "" {
308+
if member != nil {
275309
role = member.Role
276310
}
277311

@@ -284,14 +318,15 @@ func (n *Node) PostInit(ctx context.Context) error {
284318
}
285319
}
286320

287-
totalMembers := len(standbys) + 1 // include self
288-
totalActive := 1 // include self
321+
totalMembers := len(standbys) + 1
322+
totalActive := 1
289323
totalInactive := 0
290324
totalConflicts := 0
291325

292326
conflictMap := map[string]int{}
293327

294328
for _, standby := range standbys {
329+
// Check for connectivity
295330
mConn, err := repmgr.NewRemoteConnection(ctx, standby.Hostname)
296331
if err != nil {
297332
fmt.Printf("failed to connect to %s", standby.Hostname)
@@ -300,6 +335,7 @@ func (n *Node) PostInit(ctx context.Context) error {
300335
}
301336
defer mConn.Close(ctx)
302337

338+
// Verify the primary
303339
primary, err := repmgr.PrimaryMember(ctx, mConn)
304340
if err != nil {
305341
fmt.Printf("failed to resolve primary from standby %s", standby.Hostname)
@@ -309,45 +345,56 @@ func (n *Node) PostInit(ctx context.Context) error {
309345

310346
totalActive++
311347

348+
// Record conflict when primary doesn't match.
312349
if primary.Hostname != n.PrivateIP {
313350
totalConflicts++
314351
conflictMap[primary.Hostname]++
315352
}
316353
}
317354

318-
// Using the cluster state metrics, determine whether it's safe to boot as a primary.
319355
primary, err := ZombieDiagnosis(n.PrivateIP, totalMembers, totalInactive, totalActive, conflictMap)
320-
if errors.Is(err, ErrZombieDiscovered) {
321-
fmt.Println("Unable to confirm we are the real primary!")
356+
if errors.Is(err, ErrZombieDiagnosisUndecided) {
357+
fmt.Println("Unable to confirm that we are the true primary!")
322358
fmt.Printf("Registered members: %d, Active member(s): %d, Inactive member(s): %d, Conflicts detected: %d\n",
323359
totalMembers,
324360
totalActive,
325361
totalInactive,
326362
totalConflicts,
327363
)
328364

329-
fmt.Println("Identifying ourself as a Zombie")
365+
fmt.Println("Writing zombie.lock file.")
366+
if err := writeZombieLock(""); err != nil {
367+
return fmt.Errorf("failed to set zombie lock: %s", err)
368+
}
369+
370+
fmt.Println("Turning all user-created databases readonly.")
371+
if err := admin.SetReadOnly(ctx, conn); err != nil {
372+
return fmt.Errorf("failed to set read-only: %s", err)
373+
}
374+
375+
// TODO - Add link to docs
376+
fmt.Println("Please refer to following documentation for more information: <insert-doc-link-here>.")
377+
378+
} else if errors.Is(err, ErrZombieDiscovered) {
379+
fmt.Println("Zombie primary discovered!")
380+
fmt.Printf("The majority of registered members agree that '%s' is the real primary.\n", primary)
330381

331-
// If primary is non-empty we were able to build a consensus on who the real primary is.
332-
if primary != "" {
333-
fmt.Printf("Majority of members agree that %s is the real primary\n", primary)
334-
fmt.Println("Reconfiguring PGBouncer to point to the real primary")
335-
if err := n.PGBouncer.ConfigurePrimary(ctx, primary, true); err != nil {
336-
return fmt.Errorf("failed to reconfigure pgbouncer: %s", err)
337-
}
382+
fmt.Printf("Reconfiguring PGBouncer to point to '%s'\n", primary)
383+
if err := n.PGBouncer.ConfigurePrimary(ctx, primary, true); err != nil {
384+
return fmt.Errorf("failed to reconfigure pgbouncer: %s", err)
338385
}
339-
// Create a zombie.lock file containing the resolved primary.
340-
// Note: This will be an empty string if we are unable to resolve the real primary.
386+
387+
fmt.Println("Writing zombie.lock file")
341388
if err := writeZombieLock(primary); err != nil {
342389
return fmt.Errorf("failed to set zombie lock: %s", err)
343390
}
344391

345-
fmt.Println("User-created databases are being made readonly")
392+
fmt.Println("Turning user-created databases read-only")
346393
if err := admin.SetReadOnly(ctx, conn); err != nil {
347394
return fmt.Errorf("failed to set read-only: %s", err)
348395
}
349396

350-
panic("zombie primary detected.")
397+
panic(err)
351398
} else if err != nil {
352399
return fmt.Errorf("failed to run zombie diagnosis: %s", err)
353400
}
@@ -385,20 +432,7 @@ func (n *Node) PostInit(ctx context.Context) error {
385432
}
386433
defer repConn.Close(ctx)
387434

388-
if err := n.ReconfigurePGBouncerPrimary(ctx, repConn); err != nil {
389-
return fmt.Errorf("failed to configure PGBouncer: %s", err)
390-
}
391-
392-
return nil
393-
}
394-
395-
func (n *Node) NewLocalConnection(ctx context.Context, database string) (*pgx.Conn, error) {
396-
host := net.JoinHostPort(n.PrivateIP, strconv.Itoa(n.Port))
397-
return openConnection(ctx, host, database, n.OperatorCredentials)
398-
}
399-
400-
func (n *Node) ReconfigurePGBouncerPrimary(ctx context.Context, conn *pgx.Conn) error {
401-
member, err := n.RepMgr.PrimaryMember(ctx, conn)
435+
member, err := n.RepMgr.PrimaryMember(ctx, repConn)
402436
if err != nil {
403437
return fmt.Errorf("failed to find primary: %s", err)
404438
}
@@ -410,6 +444,11 @@ func (n *Node) ReconfigurePGBouncerPrimary(ctx context.Context, conn *pgx.Conn)
410444
return nil
411445
}
412446

447+
func (n *Node) NewLocalConnection(ctx context.Context, database string) (*pgx.Conn, error) {
448+
host := net.JoinHostPort(n.PrivateIP, strconv.Itoa(n.Port))
449+
return openConnection(ctx, host, database, n.OperatorCredentials)
450+
}
451+
413452
func (n *Node) initializePG() error {
414453
if n.isPGInitialized() {
415454
return nil

pkg/flypg/repmgr.go

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const (
2323
type RepMgr struct {
2424
ID int32
2525
AppName string
26+
PrimaryRegion string
2627
Region string
2728
PrivateIP string
2829
DataDir string
@@ -242,11 +243,12 @@ type Member struct {
242243
ID int
243244
Hostname string
244245
Active bool
246+
Region string
245247
Role string
246248
}
247249

248250
func Members(ctx context.Context, pg *pgx.Conn) ([]Member, error) {
249-
sql := "select node_id, node_name, active, type from repmgr.nodes;"
251+
sql := "select node_id, node_name, location, active, type from repmgr.nodes;"
250252
rows, err := pg.Query(ctx, sql)
251253
if err != nil {
252254
return nil, err
@@ -256,7 +258,7 @@ func Members(ctx context.Context, pg *pgx.Conn) ([]Member, error) {
256258

257259
for rows.Next() {
258260
var member Member
259-
if err := rows.Scan(&member.ID, &member.Hostname, &member.Active, &member.Role); err != nil {
261+
if err := rows.Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role); err != nil {
260262
return nil, err
261263
}
262264

@@ -283,8 +285,8 @@ func (r *RepMgr) Member(ctx context.Context, conn *pgx.Conn) (*Member, error) {
283285

284286
func (r *RepMgr) PrimaryMember(ctx context.Context, pg *pgx.Conn) (*Member, error) {
285287
var member Member
286-
sql := "select node_id, node_name, active, type from repmgr.nodes where type = 'primary' and active = true;"
287-
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Active, &member.Role)
288+
sql := "select node_id, node_name, location, active, type from repmgr.nodes where type = 'primary' and active = true;"
289+
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role)
288290
if err != nil {
289291
return nil, err
290292
}
@@ -311,9 +313,9 @@ func (r *RepMgr) StandbyMembers(ctx context.Context, conn *pgx.Conn) ([]Member,
311313

312314
func (r *RepMgr) MemberByID(ctx context.Context, pg *pgx.Conn, id int) (*Member, error) {
313315
var member Member
314-
sql := fmt.Sprintf("select node_id, node_name, active, type from repmgr.nodes where node_id = %d;", id)
316+
sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_id = %d;", id)
315317

316-
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Active, &member.Role)
318+
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role)
317319
if err != nil {
318320
return nil, err
319321
}
@@ -323,9 +325,9 @@ func (r *RepMgr) MemberByID(ctx context.Context, pg *pgx.Conn, id int) (*Member,
323325

324326
func (r *RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname string) (*Member, error) {
325327
var member Member
326-
sql := fmt.Sprintf("select node_id, node_name, active, type from repmgr.nodes where node_name = '%s';", hostname)
328+
sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_name = '%s';", hostname)
327329

328-
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Active, &member.Role)
330+
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role)
329331
if err != nil {
330332
return nil, err
331333
}
@@ -334,10 +336,7 @@ func (r *RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname st
334336
}
335337

336338
func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) {
337-
primaryRegion := os.Getenv("PRIMARY_REGION")
338-
339-
targets := fmt.Sprintf("%s.%s", primaryRegion, r.AppName)
340-
ips, err := privnet.AllPeers(ctx, targets)
339+
ips, err := r.InRegionPeerIPs(ctx)
341340
if err != nil {
342341
return nil, err
343342
}
@@ -375,6 +374,27 @@ func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) {
375374
return target, nil
376375
}
377376

377+
func (r *RepMgr) InRegionPeerIPs(ctx context.Context) ([]net.IPAddr, error) {
378+
targets := fmt.Sprintf("%s.%s", r.PrimaryRegion, r.AppName)
379+
380+
return privnet.AllPeers(ctx, targets)
381+
}
382+
383+
func (r *RepMgr) HostInRegion(ctx context.Context, hostname string) (bool, error) {
384+
ips, err := r.InRegionPeerIPs(ctx)
385+
if err != nil {
386+
return false, err
387+
}
388+
389+
for _, ip := range ips {
390+
if ip.String() == hostname {
391+
return true, nil
392+
}
393+
}
394+
395+
return false, nil
396+
}
397+
378398
func (r *RepMgr) UnregisterMember(ctx context.Context, member Member) error {
379399
if err := r.unregisterStandby(member.ID); err != nil {
380400
return fmt.Errorf("failed to unregister member %d from repmgr: %s", member.ID, err)
@@ -397,5 +417,5 @@ func (r *RepMgr) UnregisterMemberByHostname(ctx context.Context, conn *pgx.Conn,
397417
}
398418

399419
func (r *RepMgr) eligiblePrimary() bool {
400-
return r.Region == os.Getenv("PRIMARY_REGION")
420+
return r.Region == r.PrimaryRegion
401421
}

0 commit comments

Comments
 (0)