// ingestRPCPoolSize reports the maximum number of pooled HTTP connections to
// the backend RPC.
//
// An explicit, strictly positive LWD_RPC_POOL wins outright. Otherwise the
// size is the ingest worker count (LWD_INGEST_WORKERS, default 8) plus 16
// connections of headroom for concurrent frontend gRPC traffic. Unset,
// non-numeric, zero, or negative values are ignored.
func ingestRPCPoolSize() int {
	// positiveEnv parses the named environment variable and reports whether
	// it holds a strictly positive integer.
	positiveEnv := func(name string) (int, bool) {
		n, err := strconv.Atoi(os.Getenv(name))
		return n, err == nil && n > 0
	}

	if override, ok := positiveEnv("LWD_RPC_POOL"); ok {
		return override
	}
	workerCount, ok := positiveEnv("LWD_INGEST_WORKERS")
	if !ok {
		workerCount = 8
	}
	return workerCount + 16
}
- common.RawRequest = rpcClient.RawRequest + // zr8: use a connection-pooled, concurrent HTTP requester instead of the + // btcd rpcclient, which serializes all POSTs through a single goroutine and + // throttled the parallel block ingestor. Pool size tracks ingest concurrency + // with headroom for frontend gRPC traffic. + common.RawRequest = frontend.NewZcashRPCRawRequester(connCfg, ingestRPCPoolSize()) // Ensure that we can communicate with zcashd common.FirstRPC() diff --git a/common/common.go b/common/common.go index b422810b..d1a7424a 100644 --- a/common/common.go +++ b/common/common.go @@ -10,9 +10,11 @@ import ( "encoding/json" "errors" "fmt" + "os" "slices" "strconv" "strings" + "sync" "time" "github.com/sirupsen/logrus" @@ -421,7 +423,182 @@ func stopIngestor() { // BlockIngestor runs as a goroutine and polls zcashd for new blocks, adding them // to the cache. The repetition count, rep, is nonzero only for unit-testing. +// +// zr7: in production (rep == 0, non-darkside) it uses a parallel prefetch path +// (blockIngestorParallel) that fetches a window of blocks concurrently and +// commits them to the cache in strict height order. During bulk catch-up this +// saturates the backend instead of being bottlenecked on serial per-block RPC +// latency. Unit tests and darkside mode require deterministic, single-threaded +// RPC ordering, so they keep the original serial path (blockIngestorSerial). func BlockIngestor(c *BlockCache, rep int) { + if rep != 0 || DarksideEnabled { + blockIngestorSerial(c, rep) + return + } + blockIngestorParallel(c) +} + +// ingestWindow is how many blocks ahead of the commit point we prefetch, and +// ingestWorkers is the max number of concurrent getblock RPCs. Both are tunable +// at runtime via env vars so fetch concurrency can be adjusted without a rebuild. 
+func ingestWindow() int { + if v, err := strconv.Atoi(os.Getenv("LWD_INGEST_WINDOW")); err == nil && v > 0 { + return v + } + return 64 +} + +func ingestWorkers() int { + if v, err := strconv.Atoi(os.Getenv("LWD_INGEST_WORKERS")); err == nil && v > 0 { + return v + } + return 8 +} + +// blockIngestorParallel fetches blocks [next, next+batch) concurrently and +// commits them to the cache in strict height order. The cache requires in-order +// Add(), so only the (latency-dominated) fetch+parse is parallelized; the commit +// stays sequential and performs the same prev-hash chain / reorg checks as the +// serial path. (#1) +// +// It also stops polling the backend's tip on every block: the tip height is +// refreshed only once we've caught up to the last known tip, eliminating one RPC +// round-trip per block during bulk sync. (#2) +// +// While caught up, the backend's tip hash (which getblockchaininfo already +// returns, so no extra RPC) is compared against the cache tip so that a reorg +// that replaces the tip block without advancing the height is detected within +// one poll interval, matching the serial path's getbestblockhash check. (#3) +func blockIngestorParallel(c *BlockCache) { + lastLog := Time.Now() + lastHeightLogged := 0 + window := ingestWindow() + workers := ingestWorkers() + if workers > window { + workers = window + } + Log.Info("block ingestor: parallel prefetch enabled (workers=", workers, ", window=", window, ")") + + tipHeight := -1 // last known backend tip height; -1 forces a refresh + var tipHash hash32.T // backend tip hash (big-endian); Nil if unknown + + for { + // stop if requested + select { + case <-stopIngestorChan: + return + default: + } + + next := c.GetNextHeight() + + // (#2) Only ask the backend for its tip when we've reached the last + // known tip. During bulk catch-up this avoids an RPC per block. 
+ if next > tipHeight { + ci, err := GetBlockChainInfo() + if err != nil { + Log.WithFields(logrus.Fields{ + "error": err, + }).Warn("error " + NodeName + " getblockchaininfo rpc, will retry") + Time.Sleep(8 * time.Second) + continue + } + tipHeight = int(ci.Blocks) + if tipHash, err = hash32.Decode(ci.BestBlockHash); err != nil { + // No usable tip hash from this backend; fall back to + // height-only idle detection. + tipHash = hash32.Nil + } + } + + if next > tipHeight { + // Caught up; nothing new. + // (#3) The backend's tip hash not matching ours means the tip was + // replaced by a reorg that didn't advance the height (or the chain + // shrank below our tip). Walk back one block, same as the serial + // path does when getbestblockhash stops matching; the prev-hash + // check on re-fetch walks back further if the reorg is deeper. + if latest := c.GetLatestHash(); latest != hash32.Nil && + tipHash != hash32.Nil && tipHash != hash32.Reverse(latest) { + Log.Info("REORG: dropping block ", next-1, " ", displayHash(latest)) + c.Reorg(next - 1) + continue + } + // Mirror the serial path's behaviour. + c.Sync() + if lastHeightLogged != next-1 { + lastHeightLogged = next - 1 + Log.Info("Waiting for block: ", next) + } + Time.Sleep(2 * time.Second) + continue + } + + // (#1) Fetch [next, next+batch) concurrently. + batch := tipHeight - next + 1 + if batch > window { + batch = window + } + blocks := make([]*walletrpc.CompactBlock, batch) + errs := make([]error, batch) + sem := make(chan struct{}, workers) + var wg sync.WaitGroup + for j := 0; j < batch; j++ { + wg.Add(1) + sem <- struct{}{} + go func(j int) { + defer wg.Done() + defer func() { <-sem }() + blocks[j], errs[j] = getBlockFromRPC(next + j) + }(j) + } + wg.Wait() + + // Commit in strict height order. Any error, missing block, or chain + // mismatch stops this batch; the outer loop re-evaluates from the + // (possibly rewound) nextBlock and re-fetches as needed. 
+ for j := 0; j < batch; j++ { + height := next + j + if errs[j] != nil { + Log.Info("getblock ", height, " failed, will retry: ", errs[j]) + Time.Sleep(8 * time.Second) + break + } + block := blocks[j] + if block == nil { + // Backend doesn't have this height yet (e.g. a reorg shrank the + // chain since we read the tip). Force a tip refresh and retry. + tipHeight = -1 + break + } + if !c.HashMatch(hash32.T(block.PrevHash)) { + if height == c.GetFirstHeight() { + c.Sync() + Log.Info("Waiting for "+NodeName+" height to reach Sapling activation height ", + "(", c.GetFirstHeight(), ")...") + Time.Sleep(120 * time.Second) + break + } + Log.Info("REORG: dropping block ", height-1, " ", displayHash(c.GetLatestHash())) + c.Reorg(height - 1) + break + } + if err := c.Add(height, block); err != nil { + Log.Fatal("Cache add failed:", err) + } + // Don't log these too often. + if Time.Now().Sub(lastLog).Seconds() >= 4 { + lastLog = Time.Now() + Log.Info("Adding block to cache ", height, " ", displayHash(hash32.T(block.Hash))) + } + } + } +} + +// blockIngestorSerial is the original one-block-at-a-time ingestor, preserved +// verbatim for unit tests (rep != 0) and darkside mode, which depend on +// deterministic single-threaded RPC ordering. 
+func blockIngestorSerial(c *BlockCache, rep int) { lastLog := Time.Now() lastHeightLogged := 0 diff --git a/common/common_test.go b/common/common_test.go index e8ffcc64..7bab7286 100644 --- a/common/common_test.go +++ b/common/common_test.go @@ -13,6 +13,7 @@ import ( "math" "os" "strings" + "sync" "testing" "time" @@ -38,6 +39,7 @@ const ( testBlockid40 = "0000000000000000000000000000000000000000000000000000000000380640" testBlockid41 = "0000000000000000000000000000000000000000000000000000000000380641" testBlockid42 = "0000000000000000000000000000000000000000000000000000000000380642" + testBlockid43 = "0000000000000000000000000000000000000000000000000000000000380643" ) // TestMain does common setup that's shared across multiple tests @@ -412,6 +414,246 @@ func TestBlockIngestor(t *testing.T) { os.RemoveAll(unitTestPath) } +// ------------------------------------------ blockIngestorParallel() + +// parallelIngestStub drives blockIngestorParallel through catch-up, idle sync, +// a same-height tip reorg, a transient fetch error, a missing block (tip +// refresh), and a two-block walk-back after the backend's chain shrinks. +// +// Unlike the serial stubs, it can't assert one global call sequence: the batch +// getblock fetches arrive concurrently and in any order. So it replies based +// on the requested height, and sequences phases on the getblockchaininfo call +// count (gbci), which only the ingestor's single main loop issues. The phase +// is stable while fetchers run because the ingestor blocks in wg.Wait(). 
+// +// Schedule, by getblockchaininfo call: +// +// 1: tip=380643 -> fetches 380640-43 concurrently, commits all +// 2: tip=380643, hash match -> idle (synced), sleep +// 3: tip=380643, hash differs -> same-height reorg: drop and re-fetch 380643 +// 4: tip=380644 -> fetch 380644 fails (-32), retry gets -8 (nil), +// forcing a tip refresh +// 5: tip=380643, hash differs -> drop 380643; re-fetch gets -8 (nil) +// 6: tip=380642, hash differs -> chain shrank: drop 380642, re-fetch, commit +// 7: tip=380643 -> fetch 380643, commit (back to full height) +// 8+: tip=380643, hash match -> idle; signal the test to stop the ingestor +type parallelIngestStub struct { + mu sync.Mutex + gbci int // getblockchaininfo calls so far (the phase) + verboseReq map[string]int // per-height getblock-verbose request counts + rawReq map[string]int // per-hash getblock-raw request counts + syncedOnce sync.Once + synced chan struct{} // closed once the final phase is reached +} + +func (s *parallelIngestStub) rawRequest(method string, params []json.RawMessage) (json.RawMessage, error) { + s.mu.Lock() + defer s.mu.Unlock() + switch method { + case "getblockchaininfo": + return s.getblockchaininfo() + case "getblock": + var arg string + if err := json.Unmarshal(params[0], &arg); err != nil { + testT.Error("could not unmarshal getblock arg:", params[0]) + return nil, errors.New("bad arg") + } + return s.getblock(arg, string(params[1])) + } + testT.Error("unexpected method", method) + return nil, errors.New("unexpected method") +} + +func (s *parallelIngestStub) getblockchaininfo() (json.RawMessage, error) { + s.gbci++ + reply := func(tip int, hash string) (json.RawMessage, error) { + r, _ := json.Marshal(&ZcashdRpcReplyGetblockchaininfo{ + Blocks: tip, + BestBlockHash: hash, + }) + return r, nil + } + // Used where the ingestor doesn't consult the hash (next <= tip). 
+ dummyHash := strings.Repeat("11", 32) + switch s.gbci { + case 1: + return reply(380643, dummyHash) + case 2: + // The whole batch must have been committed before the ingestor + // asks for the tip again, each block fetched exactly once. + if testcache.GetNextHeight() != 380644 { + testT.Error("batch not committed before tip refresh:", testcache.GetNextHeight()) + } + for _, h := range []string{"380640", "380641", "380642", "380643"} { + if s.verboseReq[h] != 1 { + testT.Error("unexpected catch-up fetch count for", h, ":", s.verboseReq[h]) + } + } + return reply(380643, displayHash(testcache.GetLatestHash())) + case 3: + // Same height, different hash: a reorg replaced the tip block. + return reply(380643, strings.Repeat("45", 32)) + case 4: + // The dropped tip must have been re-fetched and re-committed. + if s.verboseReq["380643"] != 2 { + testT.Error("tip not re-fetched after same-height reorg:", s.verboseReq["380643"]) + } + if testcache.GetNextHeight() != 380644 { + testT.Error("tip not re-committed after same-height reorg:", testcache.GetNextHeight()) + } + return reply(380644, dummyHash) + case 5: + return reply(380643, strings.Repeat("56", 32)) + case 6: + return reply(380642, strings.Repeat("56", 32)) + case 7: + return reply(380643, dummyHash) + default: // 8 and beyond: synced; idle until the test stops the ingestor + s.syncedOnce.Do(func() { close(s.synced) }) + return reply(380643, displayHash(testcache.GetLatestHash())) + } +} + +func (s *parallelIngestStub) getblock(arg string, verbose string) (json.RawMessage, error) { + fakeIds := map[string]string{ + "380640": testBlockid40, + "380641": testBlockid41, + "380642": testBlockid42, + "380643": testBlockid43, + } + rawBlocks := map[string]int{ + testBlockid40: 0, + testBlockid41: 1, + testBlockid42: 2, + testBlockid43: 3, + } + if id, ok := fakeIds[arg]; ok || arg == "380644" { + if verbose != "1" { + testT.Error("expected verbose getblock for height", arg) + } + s.verboseReq[arg]++ + n := 
s.verboseReq[arg] + phase := s.gbci + expected := false + switch arg { + case "380640", "380641": + expected = n == 1 && phase == 1 + case "380642": + // catch-up, then re-fetch after the chain-shrink walk-back + expected = n == 1 && phase == 1 || n == 2 && phase == 6 + case "380643": + switch { + case n == 1 && phase == 1, n == 2 && phase == 3, n == 4 && phase == 7: + expected = true + case n == 3 && phase == 5: + // the replacement tip isn't available yet + return nil, errors.New("-8: Block height out of range") + } + case "380644": + switch { + case n == 1 && phase == 4: + // transient failure; the ingestor logs, sleeps, retries + return nil, errors.New("-32: server busy") + case n == 2 && phase == 4: + // gone: forces a tip refresh + return nil, errors.New("-8: Block height out of range") + } + testT.Error("unexpected getblock 380644: request", n, "phase", phase) + return nil, errors.New("-8: Block height out of range") + } + if !expected { + testT.Error("unexpected getblock", arg, ": request", n, "phase", phase) + } + txids := "\"" + testTxid + "\"" + if arg == "380643" { + // this block has two transactions; getBlockFromRPC requires the + // verbose txid list to match + txids += ", \"" + testTxid + "\"" + } + return []byte("{\"Tx\": [" + txids + "], \"Hash\": \"" + id + "\"}"), nil + } + if i, ok := rawBlocks[arg]; ok { + if verbose != "0" { + testT.Error("expected raw getblock for hash", arg) + } + s.rawReq[arg]++ + return blocks[i], nil + } + testT.Error("unexpected getblock arg", arg) + return nil, errors.New("unexpected getblock arg") +} + +func TestBlockIngestorParallel(t *testing.T) { + testT = t + // Fewer workers than the batch of 4 exercises the semaphore. 
+ t.Setenv("LWD_INGEST_WORKERS", "2") + stub := ¶llelIngestStub{ + verboseReq: make(map[string]int), + rawReq: make(map[string]int), + synced: make(chan struct{}), + } + RawRequest = stub.rawRequest + Time.Sleep = sleepStub + Time.Now = nowStub + os.RemoveAll(unitTestPath) + testcache = NewBlockCache(unitTestPath, unitTestChain, 380640, -1) + + done := make(chan struct{}) + go func() { + blockIngestorParallel(testcache) + close(done) + }() + + select { + case <-stub.synced: + case <-time.After(30 * time.Second): + t.Error("timeout waiting for ingestor to reach the synced phase") + } + select { + case stopIngestorChan <- struct{}{}: + case <-time.After(5 * time.Second): + t.Fatal("ingestor did not accept stop") + } + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("ingestor did not exit after stop") + } + + if got := testcache.GetNextHeight(); got != 380644 { + t.Error("unexpected nextBlock after ingest:", got) + } + stub.mu.Lock() + wantVerbose := map[string]int{ + "380640": 1, // catch-up only + "380641": 1, // catch-up only + "380642": 2, // catch-up + chain-shrink walk-back + "380643": 4, // catch-up + same-height reorg + unavailable + recovery + "380644": 2, // transient error + gone + } + for h, want := range wantVerbose { + if stub.verboseReq[h] != want { + t.Error("height", h, "verbose requests:", stub.verboseReq[h], "want", want) + } + } + wantRaw := map[string]int{ + testBlockid40: 1, + testBlockid41: 1, + testBlockid42: 2, + testBlockid43: 3, // every successful verbose fetch except the -8s + } + for id, want := range wantRaw { + if stub.rawReq[id] != want { + t.Error("hash", id, "raw requests:", stub.rawReq[id], "want", want) + } + } + stub.mu.Unlock() + + sleepCount = 0 + sleepDuration = 0 + os.RemoveAll(unitTestPath) +} + // ------------------------------------------ GetBlockRange() // There are four test blocks, 0..3 diff --git a/frontend/rpc_client.go b/frontend/rpc_client.go index 5aba112c..dfd0fc53 100644 --- 
a/frontend/rpc_client.go +++ b/frontend/rpc_client.go @@ -5,10 +5,17 @@ package frontend import ( + "bytes" + "encoding/base64" + "encoding/json" "errors" "fmt" + "io" "net" + "net/http" "path/filepath" + "sync/atomic" + "time" "github.com/BurntSushi/toml" "github.com/btcsuite/btcd/rpcclient" @@ -16,17 +23,14 @@ import ( ini "gopkg.in/ini.v1" ) -// NewZRPCFromConf reads the zcashd configuration file. -func NewZRPCFromConf(confPath string) (*rpcclient.Client, error) { - connCfg, err := connFromConf(confPath) - if err != nil { - return nil, err - } - return rpcclient.New(connCfg, nil) +// NewZRPCFromConf reads the zcashd configuration file and returns the parsed +// connection config. +func NewZRPCFromConf(confPath string) (*rpcclient.ConnConfig, error) { + return connFromConf(confPath) } // NewZRPCFromFlags gets zcashd rpc connection information from provided flags. -func NewZRPCFromFlags(opts *common.Options) (*rpcclient.Client, error) { +func NewZRPCFromFlags(opts *common.Options) (*rpcclient.ConnConfig, error) { // Connect to local Zcash RPC server using HTTP POST mode. connCfg := &rpcclient.ConnConfig{ Host: net.JoinHostPort(opts.RPCHost, opts.RPCPort), @@ -35,7 +39,97 @@ func NewZRPCFromFlags(opts *common.Options) (*rpcclient.Client, error) { HTTPPostMode: true, // Zcash only supports HTTP POST mode DisableTLS: true, // Zcash does not provide TLS by default } - return rpcclient.New(connCfg, nil) + return connCfg, nil +} + +// zrpcRequest / zrpcError / zrpcResponse mirror the JSON-RPC 1.0 wire format +// that zcashd/zebrad speak. +type zrpcRequest struct { + Jsonrpc string `json:"jsonrpc"` + ID uint64 `json:"id"` + Method string `json:"method"` + Params []json.RawMessage `json:"params"` +} + +// zrpcError matches btcjson.RPCError's Error() formatting (": ") +// so existing callers that string-match the code (e.g. "-8" for an unknown +// height in common.getBlockFromRPC) keep working unchanged. 
+type zrpcError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func (e *zrpcError) Error() string { return fmt.Sprintf("%d: %s", e.Code, e.Message) } + +type zrpcResponse struct { + Result json.RawMessage `json:"result"` + Error *zrpcError `json:"error"` +} + +// NewZcashRPCRawRequester returns a common.RawRequest-compatible function backed +// by a connection-pooled net/http client. Unlike the btcd rpcclient (which +// funnels every HTTP POST through a single goroutine, serializing all RPCs), +// this issues each request independently, so concurrent callers — notably the +// parallel block ingestor — actually achieve concurrency against the backend. +// maxConns bounds the connection pool to the backend. +func NewZcashRPCRawRequester(cfg *rpcclient.ConnConfig, maxConns int) func(string, []json.RawMessage) (json.RawMessage, error) { + scheme := "https" + if cfg.DisableTLS { + scheme = "http" + } + url := scheme + "://" + cfg.Host + "/" + auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(cfg.User+":"+cfg.Pass)) + if maxConns < 1 { + maxConns = 1 + } + transport := &http.Transport{ + MaxIdleConns: maxConns, + MaxIdleConnsPerHost: maxConns, + MaxConnsPerHost: maxConns, + IdleConnTimeout: 90 * time.Second, + } + httpClient := &http.Client{Transport: transport, Timeout: 120 * time.Second} + var idCounter uint64 + + return func(method string, params []json.RawMessage) (json.RawMessage, error) { + if params == nil { + params = []json.RawMessage{} + } + reqBody, err := json.Marshal(zrpcRequest{ + Jsonrpc: "1.0", + ID: atomic.AddUint64(&idCounter, 1), + Method: method, + Params: params, + }) + if err != nil { + return nil, err + } + httpReq, err := http.NewRequest("POST", url, bytes.NewReader(reqBody)) + if err != nil { + return nil, err + } + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("Authorization", auth) + + resp, err := httpClient.Do(httpReq) + if err != nil { + return nil, err + } + defer 
resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var r zrpcResponse + if err := json.Unmarshal(body, &r); err != nil { + return nil, fmt.Errorf("error unmarshalling RPC response (HTTP %d): %w: %s", + resp.StatusCode, err, string(body)) + } + if r.Error != nil { + return nil, r.Error + } + return r.Result, nil + } } func connFromConf(confPath string) (*rpcclient.ConnConfig, error) {