// ingestRPCPoolSize reports the maximum number of pooled HTTP connections to
// keep towards the backend RPC.
//
// A positive integer in LWD_RPC_POOL is used verbatim. Otherwise the size is
// the ingest worker count (a positive integer in LWD_INGEST_WORKERS, else 8)
// plus 16 connections of headroom for concurrent frontend gRPC traffic.
// Unset, non-numeric, zero and negative values of either variable are ignored.
func ingestRPCPoolSize() int {
	const (
		defaultWorkers = 8
		headroom       = 16
	)

	// An explicit pool size wins over anything derived from the worker count.
	if override, err := strconv.Atoi(os.Getenv("LWD_RPC_POOL")); err == nil && override > 0 {
		return override
	}

	workers, err := strconv.Atoi(os.Getenv("LWD_INGEST_WORKERS"))
	if err != nil || workers <= 0 {
		workers = defaultWorkers
	}
	return workers + headroom
}
- common.RawRequest = rpcClient.RawRequest + // zr8: use a connection-pooled, concurrent HTTP requester instead of the + // btcd rpcclient, which serializes all POSTs through a single goroutine and + // throttled the parallel block ingestor. Pool size tracks ingest concurrency + // with headroom for frontend gRPC traffic. + common.RawRequest = frontend.NewZcashRPCRawRequester(connCfg, ingestRPCPoolSize()) // Ensure that we can communicate with zcashd common.FirstRPC() diff --git a/common/common.go b/common/common.go index b422810b..e2bc842c 100644 --- a/common/common.go +++ b/common/common.go @@ -10,9 +10,11 @@ import ( "encoding/json" "errors" "fmt" + "os" "slices" "strconv" "strings" + "sync" "time" "github.com/sirupsen/logrus" @@ -421,7 +423,159 @@ func stopIngestor() { // BlockIngestor runs as a goroutine and polls zcashd for new blocks, adding them // to the cache. The repetition count, rep, is nonzero only for unit-testing. +// +// zr7: in production (rep == 0, non-darkside) it uses a parallel prefetch path +// (blockIngestorParallel) that fetches a window of blocks concurrently and +// commits them to the cache in strict height order. During bulk catch-up this +// saturates the backend instead of being bottlenecked on serial per-block RPC +// latency. Unit tests and darkside mode require deterministic, single-threaded +// RPC ordering, so they keep the original serial path (blockIngestorSerial). func BlockIngestor(c *BlockCache, rep int) { + if rep != 0 || DarksideEnabled { + blockIngestorSerial(c, rep) + return + } + blockIngestorParallel(c) +} + +// ingestWindow is how many blocks ahead of the commit point we prefetch, and +// ingestWorkers is the max number of concurrent getblock RPCs. Both are tunable +// at runtime via env vars so fetch concurrency can be adjusted without a rebuild. 
+func ingestWindow() int { + if v, err := strconv.Atoi(os.Getenv("LWD_INGEST_WINDOW")); err == nil && v > 0 { + return v + } + return 64 +} + +func ingestWorkers() int { + if v, err := strconv.Atoi(os.Getenv("LWD_INGEST_WORKERS")); err == nil && v > 0 { + return v + } + return 8 +} + +// blockIngestorParallel fetches blocks [next, next+batch) concurrently and +// commits them to the cache in strict height order. The cache requires in-order +// Add(), so only the (latency-dominated) fetch+parse is parallelized; the commit +// stays sequential and performs the same prev-hash chain / reorg checks as the +// serial path. (#1) +// +// It also stops polling the backend's tip on every block: the tip height is +// refreshed only once we've caught up to the last known tip, eliminating one RPC +// round-trip per block during bulk sync. (#2) +func blockIngestorParallel(c *BlockCache) { + lastLog := Time.Now() + lastHeightLogged := 0 + window := ingestWindow() + workers := ingestWorkers() + if workers > window { + workers = window + } + Log.Info("block ingestor: parallel prefetch enabled (workers=", workers, ", window=", window, ")") + + tipHeight := -1 // last known backend tip height; -1 forces a refresh + + for { + // stop if requested + select { + case <-stopIngestorChan: + return + default: + } + + next := c.GetNextHeight() + + // (#2) Only ask the backend for its tip when we've reached the last + // known tip. During bulk catch-up this avoids an RPC per block. + if next > tipHeight { + ci, err := GetBlockChainInfo() + if err != nil { + Log.WithFields(logrus.Fields{ + "error": err, + }).Warn("error " + NodeName + " getblockchaininfo rpc, will retry") + Time.Sleep(8 * time.Second) + continue + } + tipHeight = int(ci.Blocks) + } + + if next > tipHeight { + // Caught up; nothing new. Mirror the serial path's behaviour. 
+ c.Sync() + if lastHeightLogged != next-1 { + lastHeightLogged = next - 1 + Log.Info("Waiting for block: ", next) + } + Time.Sleep(2 * time.Second) + continue + } + + // (#1) Fetch [next, next+batch) concurrently. + batch := tipHeight - next + 1 + if batch > window { + batch = window + } + blocks := make([]*walletrpc.CompactBlock, batch) + errs := make([]error, batch) + sem := make(chan struct{}, workers) + var wg sync.WaitGroup + for j := 0; j < batch; j++ { + wg.Add(1) + sem <- struct{}{} + go func(j int) { + defer wg.Done() + defer func() { <-sem }() + blocks[j], errs[j] = getBlockFromRPC(next + j) + }(j) + } + wg.Wait() + + // Commit in strict height order. Any error, missing block, or chain + // mismatch stops this batch; the outer loop re-evaluates from the + // (possibly rewound) nextBlock and re-fetches as needed. + for j := 0; j < batch; j++ { + height := next + j + if errs[j] != nil { + Log.Info("getblock ", height, " failed, will retry: ", errs[j]) + Time.Sleep(8 * time.Second) + break + } + block := blocks[j] + if block == nil { + // Backend doesn't have this height yet (e.g. a reorg shrank the + // chain since we read the tip). Force a tip refresh and retry. + tipHeight = -1 + break + } + if !c.HashMatch(hash32.T(block.PrevHash)) { + if height == c.GetFirstHeight() { + c.Sync() + Log.Info("Waiting for "+NodeName+" height to reach Sapling activation height ", + "(", c.GetFirstHeight(), ")...") + Time.Sleep(120 * time.Second) + break + } + Log.Info("REORG: dropping block ", height-1, " ", displayHash(c.GetLatestHash())) + c.Reorg(height - 1) + break + } + if err := c.Add(height, block); err != nil { + Log.Fatal("Cache add failed:", err) + } + // Don't log these too often. 
+ if Time.Now().Sub(lastLog).Seconds() >= 4 { + lastLog = Time.Now() + Log.Info("Adding block to cache ", height, " ", displayHash(hash32.T(block.Hash))) + } + } + } +} + +// blockIngestorSerial is the original one-block-at-a-time ingestor, preserved +// verbatim for unit tests (rep != 0) and darkside mode, which depend on +// deterministic single-threaded RPC ordering. +func blockIngestorSerial(c *BlockCache, rep int) { lastLog := Time.Now() lastHeightLogged := 0 diff --git a/frontend/rpc_client.go b/frontend/rpc_client.go index 5aba112c..dfd0fc53 100644 --- a/frontend/rpc_client.go +++ b/frontend/rpc_client.go @@ -5,10 +5,17 @@ package frontend import ( + "bytes" + "encoding/base64" + "encoding/json" "errors" "fmt" + "io" "net" + "net/http" "path/filepath" + "sync/atomic" + "time" "github.com/BurntSushi/toml" "github.com/btcsuite/btcd/rpcclient" @@ -16,17 +23,14 @@ import ( ini "gopkg.in/ini.v1" ) -// NewZRPCFromConf reads the zcashd configuration file. -func NewZRPCFromConf(confPath string) (*rpcclient.Client, error) { - connCfg, err := connFromConf(confPath) - if err != nil { - return nil, err - } - return rpcclient.New(connCfg, nil) +// NewZRPCFromConf reads the zcashd configuration file and returns the parsed +// connection config. +func NewZRPCFromConf(confPath string) (*rpcclient.ConnConfig, error) { + return connFromConf(confPath) } // NewZRPCFromFlags gets zcashd rpc connection information from provided flags. -func NewZRPCFromFlags(opts *common.Options) (*rpcclient.Client, error) { +func NewZRPCFromFlags(opts *common.Options) (*rpcclient.ConnConfig, error) { // Connect to local Zcash RPC server using HTTP POST mode. 
connCfg := &rpcclient.ConnConfig{ Host: net.JoinHostPort(opts.RPCHost, opts.RPCPort), @@ -35,7 +39,97 @@ func NewZRPCFromFlags(opts *common.Options) (*rpcclient.Client, error) { HTTPPostMode: true, // Zcash only supports HTTP POST mode DisableTLS: true, // Zcash does not provide TLS by default } - return rpcclient.New(connCfg, nil) + return connCfg, nil +} + +// zrpcRequest / zrpcError / zrpcResponse mirror the JSON-RPC 1.0 wire format +// that zcashd/zebrad speak. +type zrpcRequest struct { + Jsonrpc string `json:"jsonrpc"` + ID uint64 `json:"id"` + Method string `json:"method"` + Params []json.RawMessage `json:"params"` +} + +// zrpcError matches btcjson.RPCError's Error() formatting (": ") +// so existing callers that string-match the code (e.g. "-8" for an unknown +// height in common.getBlockFromRPC) keep working unchanged. +type zrpcError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func (e *zrpcError) Error() string { return fmt.Sprintf("%d: %s", e.Code, e.Message) } + +type zrpcResponse struct { + Result json.RawMessage `json:"result"` + Error *zrpcError `json:"error"` +} + +// NewZcashRPCRawRequester returns a common.RawRequest-compatible function backed +// by a connection-pooled net/http client. Unlike the btcd rpcclient (which +// funnels every HTTP POST through a single goroutine, serializing all RPCs), +// this issues each request independently, so concurrent callers — notably the +// parallel block ingestor — actually achieve concurrency against the backend. +// maxConns bounds the connection pool to the backend. 
+func NewZcashRPCRawRequester(cfg *rpcclient.ConnConfig, maxConns int) func(string, []json.RawMessage) (json.RawMessage, error) { + scheme := "https" + if cfg.DisableTLS { + scheme = "http" + } + url := scheme + "://" + cfg.Host + "/" + auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(cfg.User+":"+cfg.Pass)) + if maxConns < 1 { + maxConns = 1 + } + transport := &http.Transport{ + MaxIdleConns: maxConns, + MaxIdleConnsPerHost: maxConns, + MaxConnsPerHost: maxConns, + IdleConnTimeout: 90 * time.Second, + } + httpClient := &http.Client{Transport: transport, Timeout: 120 * time.Second} + var idCounter uint64 + + return func(method string, params []json.RawMessage) (json.RawMessage, error) { + if params == nil { + params = []json.RawMessage{} + } + reqBody, err := json.Marshal(zrpcRequest{ + Jsonrpc: "1.0", + ID: atomic.AddUint64(&idCounter, 1), + Method: method, + Params: params, + }) + if err != nil { + return nil, err + } + httpReq, err := http.NewRequest("POST", url, bytes.NewReader(reqBody)) + if err != nil { + return nil, err + } + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("Authorization", auth) + + resp, err := httpClient.Do(httpReq) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var r zrpcResponse + if err := json.Unmarshal(body, &r); err != nil { + return nil, fmt.Errorf("error unmarshalling RPC response (HTTP %d): %w: %s", + resp.StatusCode, err, string(body)) + } + if r.Error != nil { + return nil, r.Error + } + return r.Result, nil + } } func connFromConf(confPath string) (*rpcclient.ConnConfig, error) {