From 556e35ead008732c9d8c6cfe1ef5626786debad2 Mon Sep 17 00:00:00 2001 From: Blake Emerson Date: Thu, 11 Jun 2026 12:41:33 -0500 Subject: [PATCH 1/3] Parallel block ingest with connection-pooled RPC client Speeds up initial sync by fetching blocks concurrently instead of one at a time, and replacing the serializing btcd rpcclient with a connection-pooled net/http JSON-RPC client so the parallel workers can actually issue concurrent requests. --- cmd/root.go | 29 ++++++-- common/common.go | 154 +++++++++++++++++++++++++++++++++++++++++ frontend/rpc_client.go | 112 +++++++++++++++++++++++++++--- 3 files changed, 282 insertions(+), 13 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 9142301e..08e06b47 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -11,6 +11,7 @@ import ( "os" "os/signal" "path/filepath" + "strconv" "strings" "syscall" "time" @@ -37,6 +38,22 @@ import ( var cfgFile string var logger = logrus.New() +// ingestRPCPoolSize returns the max number of pooled HTTP connections to the +// backend RPC. It defaults to the ingest worker count (LWD_INGEST_WORKERS, 8) +// plus headroom for concurrent frontend gRPC traffic, and can be overridden +// directly with LWD_RPC_POOL. +func ingestRPCPoolSize() int { + workers := 8 + if v, err := strconv.Atoi(os.Getenv("LWD_INGEST_WORKERS")); err == nil && v > 0 { + workers = v + } + pool := workers + 16 + if v, err := strconv.Atoi(os.Getenv("LWD_RPC_POOL")); err == nil && v > 0 { + pool = v + } + return pool +} + // rootCmd represents the base command when called without any subcommands var rootCmd = &cobra.Command{ Use: "lightwalletd", @@ -192,15 +209,15 @@ func startServer(opts *common.Options) error { // of block streamer. var chainName string - var rpcClient *rpcclient.Client + var connCfg *rpcclient.ConnConfig var err error if opts.Darkside { chainName = "darkside" } else { if opts.RPCUser != "" && opts.RPCPassword != "" && opts.RPCHost != "" && opts.RPCPort != "" { - rpcClient, err = frontend.NewZRPCFromFlags(opts) + connCfg, err = frontend.NewZRPCFromFlags(opts) } else { - rpcClient, err = frontend.NewZRPCFromConf(opts.ZcashConfPath) + connCfg, err = frontend.NewZRPCFromConf(opts.ZcashConfPath) } if err != nil { common.Log.WithFields(logrus.Fields{ @@ -208,7 +225,11 @@ func startServer(opts *common.Options) error { }).Fatal("setting up RPC connection to zebrad or zcashd") } // Indirect function for test mocking (so unit tests can talk to stub functions). - common.RawRequest = rpcClient.RawRequest + // zr8: use a connection-pooled, concurrent HTTP requester instead of the + // btcd rpcclient, which serializes all POSTs through a single goroutine and + // throttled the parallel block ingestor. Pool size tracks ingest concurrency + // with headroom for frontend gRPC traffic. + common.RawRequest = frontend.NewZcashRPCRawRequester(connCfg, ingestRPCPoolSize()) // Ensure that we can communicate with zcashd common.FirstRPC() diff --git a/common/common.go b/common/common.go index b422810b..e2bc842c 100644 --- a/common/common.go +++ b/common/common.go @@ -10,9 +10,11 @@ import ( "encoding/json" "errors" "fmt" + "os" "slices" "strconv" "strings" + "sync" "time" "github.com/sirupsen/logrus" @@ -421,7 +423,159 @@ func stopIngestor() { // BlockIngestor runs as a goroutine and polls zcashd for new blocks, adding them // to the cache. The repetition count, rep, is nonzero only for unit-testing. +// +// zr7: in production (rep == 0, non-darkside) it uses a parallel prefetch path +// (blockIngestorParallel) that fetches a window of blocks concurrently and +// commits them to the cache in strict height order. During bulk catch-up this +// saturates the backend instead of being bottlenecked on serial per-block RPC +// latency. Unit tests and darkside mode require deterministic, single-threaded +// RPC ordering, so they keep the original serial path (blockIngestorSerial). func BlockIngestor(c *BlockCache, rep int) { + if rep != 0 || DarksideEnabled { + blockIngestorSerial(c, rep) + return + } + blockIngestorParallel(c) +} + +// ingestWindow is how many blocks ahead of the commit point we prefetch, and +// ingestWorkers is the max number of concurrent getblock RPCs. Both are tunable +// at runtime via env vars so fetch concurrency can be adjusted without a rebuild. +func ingestWindow() int { + if v, err := strconv.Atoi(os.Getenv("LWD_INGEST_WINDOW")); err == nil && v > 0 { + return v + } + return 64 +} + +func ingestWorkers() int { + if v, err := strconv.Atoi(os.Getenv("LWD_INGEST_WORKERS")); err == nil && v > 0 { + return v + } + return 8 +} + +// blockIngestorParallel fetches blocks [next, next+batch) concurrently and +// commits them to the cache in strict height order. The cache requires in-order +// Add(), so only the (latency-dominated) fetch+parse is parallelized; the commit +// stays sequential and performs the same prev-hash chain / reorg checks as the +// serial path. (#1) +// +// It also stops polling the backend's tip on every block: the tip height is +// refreshed only once we've caught up to the last known tip, eliminating one RPC +// round-trip per block during bulk sync. (#2) +func blockIngestorParallel(c *BlockCache) { + lastLog := Time.Now() + lastHeightLogged := 0 + window := ingestWindow() + workers := ingestWorkers() + if workers > window { + workers = window + } + Log.Info("block ingestor: parallel prefetch enabled (workers=", workers, ", window=", window, ")") + + tipHeight := -1 // last known backend tip height; -1 forces a refresh + + for { + // stop if requested + select { + case <-stopIngestorChan: + return + default: + } + + next := c.GetNextHeight() + + // (#2) Only ask the backend for its tip when we've reached the last + // known tip. During bulk catch-up this avoids an RPC per block. + if next > tipHeight { + ci, err := GetBlockChainInfo() + if err != nil { + Log.WithFields(logrus.Fields{ + "error": err, + }).Warn("error " + NodeName + " getblockchaininfo rpc, will retry") + Time.Sleep(8 * time.Second) + continue + } + tipHeight = int(ci.Blocks) + } + + if next > tipHeight { + // Caught up; nothing new. Mirror the serial path's behaviour. + c.Sync() + if lastHeightLogged != next-1 { + lastHeightLogged = next - 1 + Log.Info("Waiting for block: ", next) + } + Time.Sleep(2 * time.Second) + continue + } + + // (#1) Fetch [next, next+batch) concurrently. + batch := tipHeight - next + 1 + if batch > window { + batch = window + } + blocks := make([]*walletrpc.CompactBlock, batch) + errs := make([]error, batch) + sem := make(chan struct{}, workers) + var wg sync.WaitGroup + for j := 0; j < batch; j++ { + wg.Add(1) + sem <- struct{}{} + go func(j int) { + defer wg.Done() + defer func() { <-sem }() + blocks[j], errs[j] = getBlockFromRPC(next + j) + }(j) + } + wg.Wait() + + // Commit in strict height order. Any error, missing block, or chain + // mismatch stops this batch; the outer loop re-evaluates from the + // (possibly rewound) nextBlock and re-fetches as needed. + for j := 0; j < batch; j++ { + height := next + j + if errs[j] != nil { + Log.Info("getblock ", height, " failed, will retry: ", errs[j]) + Time.Sleep(8 * time.Second) + break + } + block := blocks[j] + if block == nil { + // Backend doesn't have this height yet (e.g. a reorg shrank the + // chain since we read the tip). Force a tip refresh and retry. + tipHeight = -1 + break + } + if !c.HashMatch(hash32.T(block.PrevHash)) { + if height == c.GetFirstHeight() { + c.Sync() + Log.Info("Waiting for "+NodeName+" height to reach Sapling activation height ", + "(", c.GetFirstHeight(), ")...") + Time.Sleep(120 * time.Second) + break + } + Log.Info("REORG: dropping block ", height-1, " ", displayHash(c.GetLatestHash())) + c.Reorg(height - 1) + break + } + if err := c.Add(height, block); err != nil { + Log.Fatal("Cache add failed:", err) + } + // Don't log these too often. + if Time.Now().Sub(lastLog).Seconds() >= 4 { + lastLog = Time.Now() + Log.Info("Adding block to cache ", height, " ", displayHash(hash32.T(block.Hash))) + } + } + } +} + +// blockIngestorSerial is the original one-block-at-a-time ingestor, preserved +// verbatim for unit tests (rep != 0) and darkside mode, which depend on +// deterministic single-threaded RPC ordering. +func blockIngestorSerial(c *BlockCache, rep int) { lastLog := Time.Now() lastHeightLogged := 0 diff --git a/frontend/rpc_client.go b/frontend/rpc_client.go index 5aba112c..dfd0fc53 100644 --- a/frontend/rpc_client.go +++ b/frontend/rpc_client.go @@ -5,10 +5,17 @@ package frontend import ( + "bytes" + "encoding/base64" + "encoding/json" "errors" "fmt" + "io" "net" + "net/http" "path/filepath" + "sync/atomic" + "time" "github.com/BurntSushi/toml" "github.com/btcsuite/btcd/rpcclient" @@ -16,17 +23,14 @@ import ( ini "gopkg.in/ini.v1" ) -// NewZRPCFromConf reads the zcashd configuration file. -func NewZRPCFromConf(confPath string) (*rpcclient.Client, error) { - connCfg, err := connFromConf(confPath) - if err != nil { - return nil, err - } - return rpcclient.New(connCfg, nil) +// NewZRPCFromConf reads the zcashd configuration file and returns the parsed +// connection config. +func NewZRPCFromConf(confPath string) (*rpcclient.ConnConfig, error) { + return connFromConf(confPath) } // NewZRPCFromFlags gets zcashd rpc connection information from provided flags. -func NewZRPCFromFlags(opts *common.Options) (*rpcclient.Client, error) { +func NewZRPCFromFlags(opts *common.Options) (*rpcclient.ConnConfig, error) { // Connect to local Zcash RPC server using HTTP POST mode. connCfg := &rpcclient.ConnConfig{ Host: net.JoinHostPort(opts.RPCHost, opts.RPCPort), @@ -35,7 +39,97 @@ func NewZRPCFromFlags(opts *common.Options) (*rpcclient.Client, error) { HTTPPostMode: true, // Zcash only supports HTTP POST mode DisableTLS: true, // Zcash does not provide TLS by default } - return rpcclient.New(connCfg, nil) + return connCfg, nil +} + +// zrpcRequest / zrpcError / zrpcResponse mirror the JSON-RPC 1.0 wire format +// that zcashd/zebrad speak. +type zrpcRequest struct { + Jsonrpc string `json:"jsonrpc"` + ID uint64 `json:"id"` + Method string `json:"method"` + Params []json.RawMessage `json:"params"` +} + +// zrpcError matches btcjson.RPCError's Error() formatting (": ") +// so existing callers that string-match the code (e.g. "-8" for an unknown +// height in common.getBlockFromRPC) keep working unchanged. +type zrpcError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func (e *zrpcError) Error() string { return fmt.Sprintf("%d: %s", e.Code, e.Message) } + +type zrpcResponse struct { + Result json.RawMessage `json:"result"` + Error *zrpcError `json:"error"` +} + +// NewZcashRPCRawRequester returns a common.RawRequest-compatible function backed +// by a connection-pooled net/http client. Unlike the btcd rpcclient (which +// funnels every HTTP POST through a single goroutine, serializing all RPCs), +// this issues each request independently, so concurrent callers — notably the +// parallel block ingestor — actually achieve concurrency against the backend. +// maxConns bounds the connection pool to the backend. +func NewZcashRPCRawRequester(cfg *rpcclient.ConnConfig, maxConns int) func(string, []json.RawMessage) (json.RawMessage, error) { + scheme := "https" + if cfg.DisableTLS { + scheme = "http" + } + url := scheme + "://" + cfg.Host + "/" + auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(cfg.User+":"+cfg.Pass)) + if maxConns < 1 { + maxConns = 1 + } + transport := &http.Transport{ + MaxIdleConns: maxConns, + MaxIdleConnsPerHost: maxConns, + MaxConnsPerHost: maxConns, + IdleConnTimeout: 90 * time.Second, + } + httpClient := &http.Client{Transport: transport, Timeout: 120 * time.Second} + var idCounter uint64 + + return func(method string, params []json.RawMessage) (json.RawMessage, error) { + if params == nil { + params = []json.RawMessage{} + } + reqBody, err := json.Marshal(zrpcRequest{ + Jsonrpc: "1.0", + ID: atomic.AddUint64(&idCounter, 1), + Method: method, + Params: params, + }) + if err != nil { + return nil, err + } + httpReq, err := http.NewRequest("POST", url, bytes.NewReader(reqBody)) + if err != nil { + return nil, err + } + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("Authorization", auth) + + resp, err := httpClient.Do(httpReq) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var r zrpcResponse + if err := json.Unmarshal(body, &r); err != nil { + return nil, fmt.Errorf("error unmarshalling RPC response (HTTP %d): %w: %s", + resp.StatusCode, err, string(body)) + } + if r.Error != nil { + return nil, r.Error + } + return r.Result, nil + } } func connFromConf(confPath string) (*rpcclient.ConnConfig, error) { From 43a3cc949e8664f4a968d3b03b22210a5f01d211 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Thu, 11 Jun 2026 16:49:07 -0500 Subject: [PATCH 2/3] Detect same-height tip reorgs while caught up The parallel ingestor's idle loop only compared the backend's tip height to the cache, so a reorg that replaced the tip block without advancing the height went unnoticed until the chain advanced (~75s), during which the orphaned tip was served to wallets. The serial path caught this within one 2s poll by comparing getbestblockhash to the cache tip hash. Compare the tip hash from the existing getblockchaininfo idle poll (bestblockhash field, so no extra RPC) against the cache tip and walk back one block on mismatch, mirroring the serial path; the prev-hash check on re-fetch walks back further if the reorg is deeper. This also unwinds promptly when the backend's chain shrinks below our tip instead of waiting for it to regrow. Co-Authored-By: Claude Fable 5 --- common/common.go | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/common/common.go b/common/common.go index e2bc842c..d1a7424a 100644 --- a/common/common.go +++ b/common/common.go @@ -464,6 +464,11 @@ func ingestWorkers() int { // It also stops polling the backend's tip on every block: the tip height is // refreshed only once we've caught up to the last known tip, eliminating one RPC // round-trip per block during bulk sync. (#2) +// +// While caught up, the backend's tip hash (which getblockchaininfo already +// returns, so no extra RPC) is compared against the cache tip so that a reorg +// that replaces the tip block without advancing the height is detected within +// one poll interval, matching the serial path's getbestblockhash check. (#3) func blockIngestorParallel(c *BlockCache) { lastLog := Time.Now() lastHeightLogged := 0 @@ -474,7 +479,8 @@ func blockIngestorParallel(c *BlockCache) { } Log.Info("block ingestor: parallel prefetch enabled (workers=", workers, ", window=", window, ")") - tipHeight := -1 // last known backend tip height; -1 forces a refresh + tipHeight := -1 // last known backend tip height; -1 forces a refresh + var tipHash hash32.T // backend tip hash (big-endian); Nil if unknown for { // stop if requested @@ -498,10 +504,27 @@ func blockIngestorParallel(c *BlockCache) { continue } tipHeight = int(ci.Blocks) + if tipHash, err = hash32.Decode(ci.BestBlockHash); err != nil { + // No usable tip hash from this backend; fall back to + // height-only idle detection. + tipHash = hash32.Nil + } } if next > tipHeight { - // Caught up; nothing new. Mirror the serial path's behaviour. + // Caught up; nothing new. + // (#3) The backend's tip hash not matching ours means the tip was + // replaced by a reorg that didn't advance the height (or the chain + // shrank below our tip). Walk back one block, same as the serial + // path does when getbestblockhash stops matching; the prev-hash + // check on re-fetch walks back further if the reorg is deeper. + if latest := c.GetLatestHash(); latest != hash32.Nil && + tipHash != hash32.Nil && tipHash != hash32.Reverse(latest) { + Log.Info("REORG: dropping block ", next-1, " ", displayHash(latest)) + c.Reorg(next - 1) + continue + } + // Mirror the serial path's behaviour. c.Sync() if lastHeightLogged != next-1 { lastHeightLogged = next - 1 From 240081bb259c2ab8f3b7938c3f923da04fd21cb0 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Fri, 12 Jun 2026 05:46:00 -0500 Subject: [PATCH 3/3] Add unit test coverage for the parallel block ingestor The test harness previously only exercised the serial path (rep != 0). Drive blockIngestorParallel directly through catch-up, idle sync, a same-height tip reorg, a transient fetch error, a missing block forcing a tip refresh, and a two-block walk-back after the backend's chain shrinks, then stop it via stopIngestorChan. Because batch fetches run concurrently and complete in any order, the stub replies by requested height under a mutex instead of asserting a global call sequence, and keys its phases off the getblockchaininfo call count, which only the ingestor's main loop issues. Workers are capped at 2 (below the batch of 4) to exercise the fetch semaphore. Verified the test fails against the pre-fix idle loop (height-only tip comparison) and passes with it; whole suite passes under -race. Co-Authored-By: Claude Fable 5 --- common/common_test.go | 242 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 242 insertions(+) diff --git a/common/common_test.go b/common/common_test.go index e8ffcc64..7bab7286 100644 --- a/common/common_test.go +++ b/common/common_test.go @@ -13,6 +13,7 @@ import ( "math" "os" "strings" + "sync" "testing" "time" @@ -38,6 +39,7 @@ const ( testBlockid40 = "0000000000000000000000000000000000000000000000000000000000380640" testBlockid41 = "0000000000000000000000000000000000000000000000000000000000380641" testBlockid42 = "0000000000000000000000000000000000000000000000000000000000380642" + testBlockid43 = "0000000000000000000000000000000000000000000000000000000000380643" ) // TestMain does common setup that's shared across multiple tests @@ -412,6 +414,246 @@ func TestBlockIngestor(t *testing.T) { os.RemoveAll(unitTestPath) } +// ------------------------------------------ blockIngestorParallel() + +// parallelIngestStub drives blockIngestorParallel through catch-up, idle sync, +// a same-height tip reorg, a transient fetch error, a missing block (tip +// refresh), and a two-block walk-back after the backend's chain shrinks. +// +// Unlike the serial stubs, it can't assert one global call sequence: the batch +// getblock fetches arrive concurrently and in any order. So it replies based +// on the requested height, and sequences phases on the getblockchaininfo call +// count (gbci), which only the ingestor's single main loop issues. The phase +// is stable while fetchers run because the ingestor blocks in wg.Wait(). +// +// Schedule, by getblockchaininfo call: +// +// 1: tip=380643 -> fetches 380640-43 concurrently, commits all +// 2: tip=380643, hash match -> idle (synced), sleep +// 3: tip=380643, hash differs -> same-height reorg: drop and re-fetch 380643 +// 4: tip=380644 -> fetch 380644 fails (-32), retry gets -8 (nil), +// forcing a tip refresh +// 5: tip=380643, hash differs -> drop 380643; re-fetch gets -8 (nil) +// 6: tip=380642, hash differs -> chain shrank: drop 380642, re-fetch, commit +// 7: tip=380643 -> fetch 380643, commit (back to full height) +// 8+: tip=380643, hash match -> idle; signal the test to stop the ingestor +type parallelIngestStub struct { + mu sync.Mutex + gbci int // getblockchaininfo calls so far (the phase) + verboseReq map[string]int // per-height getblock-verbose request counts + rawReq map[string]int // per-hash getblock-raw request counts + syncedOnce sync.Once + synced chan struct{} // closed once the final phase is reached +} + +func (s *parallelIngestStub) rawRequest(method string, params []json.RawMessage) (json.RawMessage, error) { + s.mu.Lock() + defer s.mu.Unlock() + switch method { + case "getblockchaininfo": + return s.getblockchaininfo() + case "getblock": + var arg string + if err := json.Unmarshal(params[0], &arg); err != nil { + testT.Error("could not unmarshal getblock arg:", params[0]) + return nil, errors.New("bad arg") + } + return s.getblock(arg, string(params[1])) + } + testT.Error("unexpected method", method) + return nil, errors.New("unexpected method") +} + +func (s *parallelIngestStub) getblockchaininfo() (json.RawMessage, error) { + s.gbci++ + reply := func(tip int, hash string) (json.RawMessage, error) { + r, _ := json.Marshal(&ZcashdRpcReplyGetblockchaininfo{ + Blocks: tip, + BestBlockHash: hash, + }) + return r, nil + } + // Used where the ingestor doesn't consult the hash (next <= tip). + dummyHash := strings.Repeat("11", 32) + switch s.gbci { + case 1: + return reply(380643, dummyHash) + case 2: + // The whole batch must have been committed before the ingestor + // asks for the tip again, each block fetched exactly once. + if testcache.GetNextHeight() != 380644 { + testT.Error("batch not committed before tip refresh:", testcache.GetNextHeight()) + } + for _, h := range []string{"380640", "380641", "380642", "380643"} { + if s.verboseReq[h] != 1 { + testT.Error("unexpected catch-up fetch count for", h, ":", s.verboseReq[h]) + } + } + return reply(380643, displayHash(testcache.GetLatestHash())) + case 3: + // Same height, different hash: a reorg replaced the tip block. + return reply(380643, strings.Repeat("45", 32)) + case 4: + // The dropped tip must have been re-fetched and re-committed. + if s.verboseReq["380643"] != 2 { + testT.Error("tip not re-fetched after same-height reorg:", s.verboseReq["380643"]) + } + if testcache.GetNextHeight() != 380644 { + testT.Error("tip not re-committed after same-height reorg:", testcache.GetNextHeight()) + } + return reply(380644, dummyHash) + case 5: + return reply(380643, strings.Repeat("56", 32)) + case 6: + return reply(380642, strings.Repeat("56", 32)) + case 7: + return reply(380643, dummyHash) + default: // 8 and beyond: synced; idle until the test stops the ingestor + s.syncedOnce.Do(func() { close(s.synced) }) + return reply(380643, displayHash(testcache.GetLatestHash())) + } +} + +func (s *parallelIngestStub) getblock(arg string, verbose string) (json.RawMessage, error) { + fakeIds := map[string]string{ + "380640": testBlockid40, + "380641": testBlockid41, + "380642": testBlockid42, + "380643": testBlockid43, + } + rawBlocks := map[string]int{ + testBlockid40: 0, + testBlockid41: 1, + testBlockid42: 2, + testBlockid43: 3, + } + if id, ok := fakeIds[arg]; ok || arg == "380644" { + if verbose != "1" { + testT.Error("expected verbose getblock for height", arg) + } + s.verboseReq[arg]++ + n := s.verboseReq[arg] + phase := s.gbci + expected := false + switch arg { + case "380640", "380641": + expected = n == 1 && phase == 1 + case "380642": + // catch-up, then re-fetch after the chain-shrink walk-back + expected = n == 1 && phase == 1 || n == 2 && phase == 6 + case "380643": + switch { + case n == 1 && phase == 1, n == 2 && phase == 3, n == 4 && phase == 7: + expected = true + case n == 3 && phase == 5: + // the replacement tip isn't available yet + return nil, errors.New("-8: Block height out of range") + } + case "380644": + switch { + case n == 1 && phase == 4: + // transient failure; the ingestor logs, sleeps, retries + return nil, errors.New("-32: server busy") + case n == 2 && phase == 4: + // gone: forces a tip refresh + return nil, errors.New("-8: Block height out of range") + } + testT.Error("unexpected getblock 380644: request", n, "phase", phase) + return nil, errors.New("-8: Block height out of range") + } + if !expected { + testT.Error("unexpected getblock", arg, ": request", n, "phase", phase) + } + txids := "\"" + testTxid + "\"" + if arg == "380643" { + // this block has two transactions; getBlockFromRPC requires the + // verbose txid list to match + txids += ", \"" + testTxid + "\"" + } + return []byte("{\"Tx\": [" + txids + "], \"Hash\": \"" + id + "\"}"), nil + } + if i, ok := rawBlocks[arg]; ok { + if verbose != "0" { + testT.Error("expected raw getblock for hash", arg) + } + s.rawReq[arg]++ + return blocks[i], nil + } + testT.Error("unexpected getblock arg", arg) + return nil, errors.New("unexpected getblock arg") +} + +func TestBlockIngestorParallel(t *testing.T) { + testT = t + // Fewer workers than the batch of 4 exercises the semaphore. + t.Setenv("LWD_INGEST_WORKERS", "2") + stub := ¶llelIngestStub{ + verboseReq: make(map[string]int), + rawReq: make(map[string]int), + synced: make(chan struct{}), + } + RawRequest = stub.rawRequest + Time.Sleep = sleepStub + Time.Now = nowStub + os.RemoveAll(unitTestPath) + testcache = NewBlockCache(unitTestPath, unitTestChain, 380640, -1) + + done := make(chan struct{}) + go func() { + blockIngestorParallel(testcache) + close(done) + }() + + select { + case <-stub.synced: + case <-time.After(30 * time.Second): + t.Error("timeout waiting for ingestor to reach the synced phase") + } + select { + case stopIngestorChan <- struct{}{}: + case <-time.After(5 * time.Second): + t.Fatal("ingestor did not accept stop") + } + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("ingestor did not exit after stop") + } + + if got := testcache.GetNextHeight(); got != 380644 { + t.Error("unexpected nextBlock after ingest:", got) + } + stub.mu.Lock() + wantVerbose := map[string]int{ + "380640": 1, // catch-up only + "380641": 1, // catch-up only + "380642": 2, // catch-up + chain-shrink walk-back + "380643": 4, // catch-up + same-height reorg + unavailable + recovery + "380644": 2, // transient error + gone + } + for h, want := range wantVerbose { + if stub.verboseReq[h] != want { + t.Error("height", h, "verbose requests:", stub.verboseReq[h], "want", want) + } + } + wantRaw := map[string]int{ + testBlockid40: 1, + testBlockid41: 1, + testBlockid42: 2, + testBlockid43: 3, // every successful verbose fetch except the -8s + } + for id, want := range wantRaw { + if stub.rawReq[id] != want { + t.Error("hash", id, "raw requests:", stub.rawReq[id], "want", want) + } + } + stub.mu.Unlock() + + sleepCount = 0 + sleepDuration = 0 + os.RemoveAll(unitTestPath) +} + // ------------------------------------------ GetBlockRange() // There are four test blocks, 0..3