Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"
Expand All @@ -37,6 +38,22 @@ import (
var cfgFile string
var logger = logrus.New()

// ingestRPCPoolSize returns the max number of pooled HTTP connections to the
// backend RPC. It defaults to the ingest worker count (LWD_INGEST_WORKERS, 8)
// plus headroom for concurrent frontend gRPC traffic, and can be overridden
// directly with LWD_RPC_POOL.
func ingestRPCPoolSize() int {
workers := 8
if v, err := strconv.Atoi(os.Getenv("LWD_INGEST_WORKERS")); err == nil && v > 0 {
workers = v
}
pool := workers + 16
if v, err := strconv.Atoi(os.Getenv("LWD_RPC_POOL")); err == nil && v > 0 {
pool = v
}
return pool
}

// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: "lightwalletd",
Expand Down Expand Up @@ -192,23 +209,27 @@ func startServer(opts *common.Options) error {
// of block streamer.

var chainName string
var rpcClient *rpcclient.Client
var connCfg *rpcclient.ConnConfig
var err error
if opts.Darkside {
chainName = "darkside"
} else {
if opts.RPCUser != "" && opts.RPCPassword != "" && opts.RPCHost != "" && opts.RPCPort != "" {
rpcClient, err = frontend.NewZRPCFromFlags(opts)
connCfg, err = frontend.NewZRPCFromFlags(opts)
} else {
rpcClient, err = frontend.NewZRPCFromConf(opts.ZcashConfPath)
connCfg, err = frontend.NewZRPCFromConf(opts.ZcashConfPath)
}
if err != nil {
common.Log.WithFields(logrus.Fields{
"error": err,
}).Fatal("setting up RPC connection to zebrad or zcashd")
}
// Indirect function for test mocking (so unit tests can talk to stub functions).
common.RawRequest = rpcClient.RawRequest
// zr8: use a connection-pooled, concurrent HTTP requester instead of the
// btcd rpcclient, which serializes all POSTs through a single goroutine and
// throttled the parallel block ingestor. Pool size tracks ingest concurrency
// with headroom for frontend gRPC traffic.
common.RawRequest = frontend.NewZcashRPCRawRequester(connCfg, ingestRPCPoolSize())

// Ensure that we can communicate with zcashd
common.FirstRPC()
Expand Down
177 changes: 177 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"slices"
"strconv"
"strings"
"sync"
"time"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -421,7 +423,182 @@ func stopIngestor() {

// BlockIngestor runs as a goroutine and polls zcashd for new blocks, adding them
// to the cache. The repetition count, rep, is nonzero only for unit-testing.
//
// zr7: in production (rep == 0, non-darkside) it uses a parallel prefetch path
// (blockIngestorParallel) that fetches a window of blocks concurrently and
// commits them to the cache in strict height order. During bulk catch-up this
// saturates the backend instead of being bottlenecked on serial per-block RPC
// latency. Unit tests and darkside mode require deterministic, single-threaded
// RPC ordering, so they keep the original serial path (blockIngestorSerial).
func BlockIngestor(c *BlockCache, rep int) {
if rep != 0 || DarksideEnabled {
blockIngestorSerial(c, rep)
return
}
blockIngestorParallel(c)
}

// ingestWindow is how many blocks ahead of the commit point we prefetch, and
// ingestWorkers is the max number of concurrent getblock RPCs. Both are tunable
// at runtime via env vars so fetch concurrency can be adjusted without a rebuild.
func ingestWindow() int {
if v, err := strconv.Atoi(os.Getenv("LWD_INGEST_WINDOW")); err == nil && v > 0 {
return v
}
return 64
}

func ingestWorkers() int {
if v, err := strconv.Atoi(os.Getenv("LWD_INGEST_WORKERS")); err == nil && v > 0 {
return v
}
return 8
}

// blockIngestorParallel fetches blocks [next, next+batch) concurrently and
// commits them to the cache in strict height order. The cache requires in-order
// Add(), so only the (latency-dominated) fetch+parse is parallelized; the commit
// stays sequential and performs the same prev-hash chain / reorg checks as the
// serial path. (#1)
//
// It also stops polling the backend's tip on every block: the tip height is
// refreshed only once we've caught up to the last known tip, eliminating one RPC
// round-trip per block during bulk sync. (#2)
//
// While caught up, the backend's tip hash (which getblockchaininfo already
// returns, so no extra RPC) is compared against the cache tip so that a reorg
// that replaces the tip block without advancing the height is detected within
// one poll interval, matching the serial path's getbestblockhash check. (#3)
func blockIngestorParallel(c *BlockCache) {
lastLog := Time.Now()
lastHeightLogged := 0
window := ingestWindow()
workers := ingestWorkers()
if workers > window {
workers = window
}
Log.Info("block ingestor: parallel prefetch enabled (workers=", workers, ", window=", window, ")")

tipHeight := -1 // last known backend tip height; -1 forces a refresh
var tipHash hash32.T // backend tip hash (big-endian); Nil if unknown

for {
// stop if requested
select {
case <-stopIngestorChan:
return
default:
}

next := c.GetNextHeight()

// (#2) Only ask the backend for its tip when we've reached the last
// known tip. During bulk catch-up this avoids an RPC per block.
if next > tipHeight {
ci, err := GetBlockChainInfo()
if err != nil {
Log.WithFields(logrus.Fields{
"error": err,
}).Warn("error " + NodeName + " getblockchaininfo rpc, will retry")
Time.Sleep(8 * time.Second)
continue
}
tipHeight = int(ci.Blocks)
if tipHash, err = hash32.Decode(ci.BestBlockHash); err != nil {
// No usable tip hash from this backend; fall back to
// height-only idle detection.
tipHash = hash32.Nil
}
}

if next > tipHeight {
// Caught up; nothing new.
// (#3) The backend's tip hash not matching ours means the tip was
// replaced by a reorg that didn't advance the height (or the chain
// shrank below our tip). Walk back one block, same as the serial
// path does when getbestblockhash stops matching; the prev-hash
// check on re-fetch walks back further if the reorg is deeper.
if latest := c.GetLatestHash(); latest != hash32.Nil &&
tipHash != hash32.Nil && tipHash != hash32.Reverse(latest) {
Log.Info("REORG: dropping block ", next-1, " ", displayHash(latest))
c.Reorg(next - 1)
continue
}
// Mirror the serial path's behaviour.
c.Sync()
if lastHeightLogged != next-1 {
lastHeightLogged = next - 1
Log.Info("Waiting for block: ", next)
}
Time.Sleep(2 * time.Second)
continue
}

// (#1) Fetch [next, next+batch) concurrently.
batch := tipHeight - next + 1
if batch > window {
batch = window
}
blocks := make([]*walletrpc.CompactBlock, batch)
errs := make([]error, batch)
sem := make(chan struct{}, workers)
var wg sync.WaitGroup
for j := 0; j < batch; j++ {
wg.Add(1)
sem <- struct{}{}
go func(j int) {
defer wg.Done()
defer func() { <-sem }()
blocks[j], errs[j] = getBlockFromRPC(next + j)
}(j)
}
wg.Wait()

// Commit in strict height order. Any error, missing block, or chain
// mismatch stops this batch; the outer loop re-evaluates from the
// (possibly rewound) nextBlock and re-fetches as needed.
for j := 0; j < batch; j++ {
height := next + j
if errs[j] != nil {
Log.Info("getblock ", height, " failed, will retry: ", errs[j])
Time.Sleep(8 * time.Second)
break
}
block := blocks[j]
if block == nil {
// Backend doesn't have this height yet (e.g. a reorg shrank the
// chain since we read the tip). Force a tip refresh and retry.
tipHeight = -1
break
}
if !c.HashMatch(hash32.T(block.PrevHash)) {
if height == c.GetFirstHeight() {
c.Sync()
Log.Info("Waiting for "+NodeName+" height to reach Sapling activation height ",
"(", c.GetFirstHeight(), ")...")
Time.Sleep(120 * time.Second)
break
}
Log.Info("REORG: dropping block ", height-1, " ", displayHash(c.GetLatestHash()))
c.Reorg(height - 1)
break
}
if err := c.Add(height, block); err != nil {
Log.Fatal("Cache add failed:", err)
}
// Don't log these too often.
if Time.Now().Sub(lastLog).Seconds() >= 4 {
lastLog = Time.Now()
Log.Info("Adding block to cache ", height, " ", displayHash(hash32.T(block.Hash)))
}
}
}
}

// blockIngestorSerial is the original one-block-at-a-time ingestor, preserved
// verbatim for unit tests (rep != 0) and darkside mode, which depend on
// deterministic single-threaded RPC ordering.
func blockIngestorSerial(c *BlockCache, rep int) {
lastLog := Time.Now()
lastHeightLogged := 0

Expand Down
Loading