Skip to content

Commit 21516be

Browse files
authored
Remove required consul call (#133)
* Cleanup * Fix lint * This is a bit easier to read * cleanup * Fix comments
1 parent 701552a commit 21516be

File tree

3 files changed

+164
-67
lines changed

3 files changed

+164
-67
lines changed

internal/flypg/node.go

Lines changed: 88 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -212,98 +212,59 @@ func (n *Node) PostInit(ctx context.Context) error {
212212
return fmt.Errorf("unrecoverable zombie")
213213
}
214214

215-
store, err := state.NewStore()
215+
conn, err := n.NewLocalConnection(ctx, "postgres")
216216
if err != nil {
217-
return fmt.Errorf("failed initialize cluster state store. %v", err)
217+
return fmt.Errorf("failed to establish connection to member: %s", err)
218218
}
219+
defer func() { _ = conn.Close(ctx) }()
219220

220-
clusterInitialized, err := store.IsInitializationFlagSet()
221+
// Check to see if we have already been registered with repmgr.
222+
registered, err := isRegistered(ctx, conn, n)
221223
if err != nil {
222-
return fmt.Errorf("failed to verify cluster state: %s", err)
224+
return fmt.Errorf("failed to verify member registration: %s", err)
223225
}
224226

225-
// If the cluster has not yet been initialized, configure ourself as the primary
226-
if !clusterInitialized {
227-
// Verify we reside within the clusters primary region
228-
if !n.RepMgr.eligiblePrimary() {
229-
return fmt.Errorf("unable to configure myself as primary since I do not reside within the primary region %q", n.PrimaryRegion)
230-
}
231-
232-
conn, err := n.NewLocalConnection(ctx, "postgres")
233-
if err != nil {
234-
return fmt.Errorf("failed to establish connection to local node: %s", err)
235-
}
236-
defer func() { _ = conn.Close(ctx) }()
237-
238-
// Create required users
239-
if err := n.setupCredentials(ctx, conn); err != nil {
240-
return fmt.Errorf("failed to create required users: %s", err)
241-
}
242-
243-
// Setup repmgr database and extension
244-
if err := n.RepMgr.enable(ctx, conn); err != nil {
245-
fmt.Printf("failed to setup repmgr: %s\n", err)
246-
}
247-
248-
// Register ourself as the primary
249-
if err := n.RepMgr.registerPrimary(); err != nil {
250-
return fmt.Errorf("failed to register repmgr primary: %s", err)
251-
}
252-
253-
// Set initialization flag within consul so future members know they are joining
254-
// an existing cluster.
255-
if err := store.SetInitializationFlag(); err != nil {
256-
return fmt.Errorf("failed to register cluster with consul")
257-
}
258-
259-
// Ensure connection is closed.
260-
if err := conn.Close(ctx); err != nil {
261-
return fmt.Errorf("failed to close connection: %s", err)
262-
}
263-
264-
} else {
265-
conn, err := n.RepMgr.NewLocalConnection(ctx)
227+
if registered {
228+
// Existing member
229+
repConn, err := n.RepMgr.NewLocalConnection(ctx)
266230
if err != nil {
267231
return fmt.Errorf("failed to establish connection to local repmgr: %s", err)
268232
}
269-
defer func() { _ = conn.Close(ctx) }()
233+
defer func() { _ = repConn.Close(ctx) }()
270234

271-
member, err := n.RepMgr.Member(ctx, conn)
235+
member, err := n.RepMgr.Member(ctx, repConn)
272236
if err != nil {
273-
if !errors.Is(err, pgx.ErrNoRows) {
274-
return fmt.Errorf("failed to resolve member role: %s", err)
275-
}
237+
return fmt.Errorf("failed to resolve member role: %s", err)
276238
}
277239

278-
role := ""
279-
if member != nil {
280-
role = member.Role
281-
}
282-
283-
switch role {
240+
switch member.Role {
284241
case PrimaryRoleName:
285242
// Verify cluster state to ensure we are the actual primary and not a zombie.
286243
primary, err := PerformScreening(ctx, conn, n)
287244
if errors.Is(err, ErrZombieDiagnosisUndecided) {
288245
fmt.Println("Unable to confirm that we are the true primary!")
246+
// Turn member read-only
289247
if err := Quarantine(ctx, n, primary); err != nil {
290248
return fmt.Errorf("failed to quarantine failed primary: %s", err)
291249
}
292250
} else if errors.Is(err, ErrZombieDiscovered) {
293251
fmt.Printf("The majority of registered members agree that '%s' is the real primary.\n", primary)
252+
// Turn member read-only
294253
if err := Quarantine(ctx, n, primary); err != nil {
295254
return fmt.Errorf("failed to quarantine failed primary: %s", err)
296255
}
297256

298-
// Issue panic to force a process restart so we can attempt to rejoin the cluster we've diverged from.
299257
panic(err)
300258
} else if err != nil {
301259
return fmt.Errorf("failed to run zombie diagnosis: %s", err)
302260
}
303261

304262
// This should never happen
305263
if primary != n.PrivateIP {
306-
return fmt.Errorf("resolved primary '%s' does not match ourself '%s'. this should not happen", primary, n.PrivateIP)
264+
return fmt.Errorf("resolved primary '%s' does not match ourself '%s'. this should not happen",
265+
primary,
266+
n.PrivateIP,
267+
)
307268
}
308269

309270
// Readonly lock is set when disk capacity is dangerously high.
@@ -312,21 +273,82 @@ func (n *Node) PostInit(ctx context.Context) error {
312273
return fmt.Errorf("failed to unset read-only: %s", err)
313274
}
314275
}
276+
case StandbyRoleName:
277+
// Register existing standby to take-on any configuration changes.
278+
if err := n.RepMgr.registerStandby(); err != nil {
279+
fmt.Printf("failed to register standby: %s\n", err)
280+
}
315281
default:
316-
if role != "" {
317-
fmt.Println("Updating existing standby")
318-
} else {
319-
fmt.Println("Registering a new standby")
282+
return fmt.Errorf("member has unknown role: %q", member.Role)
283+
}
284+
} else {
285+
// New member
286+
287+
// Check with consul to see if the cluster has already been initialized
288+
store, err := state.NewStore()
289+
if err != nil {
290+
return fmt.Errorf("failed initialize cluster state store. %v", err)
291+
}
292+
293+
// The initialization flag is set after the primary is registered.
294+
clusterInitialized, err := store.IsInitializationFlagSet()
295+
if err != nil {
296+
return fmt.Errorf("failed to verify cluster state: %s", err)
297+
}
298+
299+
if !clusterInitialized {
300+
// Configure as primary
301+
fmt.Println("Registering primary")
302+
303+
// Verify we reside within the clusters primary region
304+
if !n.RepMgr.eligiblePrimary() {
305+
return fmt.Errorf("unable to configure as the primary. expected region: %q, got: %q",
306+
n.PrimaryRegion,
307+
n.RepMgr.Region,
308+
)
309+
}
310+
311+
// Create required users
312+
if err := n.setupCredentials(ctx, conn); err != nil {
313+
return fmt.Errorf("failed to create required users: %s", err)
320314
}
321315

322-
// Register ourself as a standby
316+
// Setup repmgr database and extension
317+
if err := n.RepMgr.enable(ctx, conn); err != nil {
318+
fmt.Printf("failed to setup repmgr: %s\n", err)
319+
}
320+
321+
// Register ourself as the primary
322+
if err := n.RepMgr.registerPrimary(); err != nil {
323+
return fmt.Errorf("failed to register repmgr primary: %s", err)
324+
}
325+
326+
// Set initialization flag within consul so future members know they are joining
327+
// an existing cluster.
328+
if err := store.SetInitializationFlag(); err != nil {
329+
return fmt.Errorf("failed to register cluster with consul")
330+
}
331+
332+
// Let the boot process know that we've already been configured.
333+
if err := issueRegistrationCertificate(); err != nil {
334+
return fmt.Errorf("failed to issue registration certificate: %s", err)
335+
}
336+
337+
// Ensure connection is closed.
338+
if err := conn.Close(ctx); err != nil {
339+
return fmt.Errorf("failed to close connection: %s", err)
340+
}
341+
} else {
342+
// Configure as standby
343+
fmt.Println("Registering standby")
323344
if err := n.RepMgr.registerStandby(); err != nil {
324345
fmt.Printf("failed to register standby: %s\n", err)
325346
}
326-
}
327347

328-
if err := conn.Close(ctx); err != nil {
329-
return fmt.Errorf("failed to close connection: %s", err)
348+
// Let the boot process know that we've already been configured.
349+
if err := issueRegistrationCertificate(); err != nil {
350+
return fmt.Errorf("failed to issue registration certificate: %s", err)
351+
}
330352
}
331353
}
332354

internal/flypg/registration.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package flypg
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"os"
8+
9+
"github.com/fly-apps/postgres-flex/internal/flypg/admin"
10+
"github.com/jackc/pgx/v5"
11+
)
12+
13+
const rCertPath = "/data/.registration"
14+
15+
func isRegistered(ctx context.Context, conn *pgx.Conn, n *Node) (bool, error) {
16+
// Short-circuit if we are holding a certificate
17+
if hasRegistrationCertificate() {
18+
return true, nil
19+
}
20+
21+
// Below is for backwards compatibility
22+
databases, err := admin.ListDatabases(ctx, conn)
23+
if err != nil {
24+
return false, err
25+
}
26+
27+
repmgrExists := false
28+
for _, db := range databases {
29+
if db.Name == n.RepMgr.DatabaseName {
30+
repmgrExists = true
31+
break
32+
}
33+
}
34+
35+
if !repmgrExists {
36+
return false, nil
37+
}
38+
39+
repConn, err := n.RepMgr.NewLocalConnection(ctx)
40+
if err != nil {
41+
return false, err
42+
}
43+
defer func() { _ = repConn.Close(ctx) }()
44+
45+
member, err := n.RepMgr.Member(ctx, conn)
46+
if err != nil {
47+
if errors.Is(err, pgx.ErrNoRows) {
48+
return false, nil
49+
}
50+
return false, fmt.Errorf("failed to resolve member role: %s", err)
51+
}
52+
53+
// If we are active, issue registration certificate
54+
if member.Active {
55+
if err := issueRegistrationCertificate(); err != nil {
56+
fmt.Println("failed to issue registration certificate.")
57+
return true, nil
58+
}
59+
}
60+
61+
return true, nil
62+
}
63+
64+
func issueRegistrationCertificate() error {
65+
return os.WriteFile(rCertPath, []byte(""), 0600)
66+
}
67+
68+
func hasRegistrationCertificate() bool {
69+
if _, err := os.Stat(rCertPath); err != nil {
70+
if os.IsNotExist(err) {
71+
return false
72+
}
73+
}
74+
return true
75+
}

internal/flypg/repmgr.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) {
391391

392392
conn, err := r.NewRemoteConnection(ctx, ip.String())
393393
if err != nil {
394-
fmt.Printf("failed to resolve %s\n", ip.String())
394+
fmt.Printf("failed to resolve %s: %s\n", ip.String(), err)
395395
continue
396396
}
397397
defer func() { _ = conn.Close(ctx) }()

0 commit comments

Comments
 (0)