Skip to content

Commit b6db210

Browse files
committed
feat: update lassie to sync Retriever
* Retriever#Retrieve() calls are now synchronous, so we can wait on the direct return value and error
* Change the AwaitBlock call order and make it cancellable
* Make the provider context-cancel aware for cleaner shutdown
* Other minor fixes and adaptations to the new Lassie code
1 parent 4b4dcae commit b6db210

File tree

7 files changed

+116
-182
lines changed

7 files changed

+116
-182
lines changed

autoretrieve.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ import (
2424
"github.com/application-research/autoretrieve/paychannelmanager"
2525
lassieclient "github.com/filecoin-project/lassie/pkg/client"
2626
lassieeventrecorder "github.com/filecoin-project/lassie/pkg/eventrecorder"
27+
"github.com/filecoin-project/lassie/pkg/indexerlookup"
2728
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
2829
rpcstmgr "github.com/filecoin-project/lotus/chain/stmgr/rpc"
2930
"github.com/filecoin-project/lotus/chain/wallet"
3031
lcli "github.com/filecoin-project/lotus/cli"
3132
"github.com/filecoin-project/lotus/paychmgr"
32-
"github.com/ipfs/go-cid"
3333
ipfsdatastore "github.com/ipfs/go-datastore"
3434
"github.com/ipfs/go-datastore/namespace"
3535
flatfs "github.com/ipfs/go-ds-flatfs"
@@ -154,27 +154,23 @@ func New(cctx *cli.Context, dataDir string, cfg Config) (*Autoretrieve, error) {
154154
// Initialize Filecoin retriever
155155
var retriever *lassieretriever.Retriever
156156
if !cfg.DisableRetrieval {
157-
var ep lassieretriever.Endpoint
157+
var candidateFinder lassieretriever.CandidateFinder
158158
switch cfg.LookupEndpointType {
159159
case EndpointTypeEstuary:
160-
logger.Infof("Using Estuary endpoint type")
161-
ep = endpoint.NewEstuaryEndpoint(cfg.LookupEndpointURL, minerPeerGetter)
160+
logger.Infof("Using Estuary candidate finder type")
161+
candidateFinder = endpoint.NewEstuaryEndpoint(cfg.LookupEndpointURL, minerPeerGetter)
162162
case EndpointTypeIndexer:
163-
logger.Infof("Using indexer endpoint type")
164-
ep = endpoint.NewIndexerEndpoint(cfg.LookupEndpointURL)
163+
logger.Infof("Using indexer candidate finder type")
164+
candidateFinder = indexerlookup.NewCandidateFinder(cfg.LookupEndpointURL)
165165
default:
166-
return nil, errors.New("unrecognized endpoint type")
166+
return nil, errors.New("unrecognized candidate finder type")
167167
}
168168

169169
retrieverCfg, err := cfg.ExtractFilecoinRetrieverConfig(cctx.Context, minerPeerGetter)
170170
if err != nil {
171171
return nil, err
172172
}
173173

174-
confirmer := func(c cid.Cid) (bool, error) {
175-
return blockManager.Has(cctx.Context, c)
176-
}
177-
178174
// Instantiate client
179175
retrievalClient, err := lassieclient.NewClient(
180176
blockstore,
@@ -190,17 +186,19 @@ func New(cctx *cli.Context, dataDir string, cfg Config) (*Autoretrieve, error) {
190186
return nil, err
191187
}
192188

193-
retriever, err = lassieretriever.NewRetriever(cctx.Context, retrieverCfg, retrievalClient, ep, confirmer)
189+
retriever, err = lassieretriever.NewRetriever(cctx.Context, retrieverCfg, retrievalClient, candidateFinder)
194190
if err != nil {
195191
return nil, err
196192
}
193+
<-retriever.Start()
197194
if cfg.EventRecorderEndpointURL != "" {
198195
logger.Infof("Reporting retrieval events to %v", cfg.EventRecorderEndpointURL)
199196
eventRecorderEndpointAuthorization, err := loadEventRecorderAuth(dataDirPath(cctx))
200197
if err != nil {
201198
return nil, err
202199
}
203-
retriever.RegisterListener(lassieeventrecorder.NewEventRecorder(cctx.Context, cfg.InstanceId, cfg.EventRecorderEndpointURL, eventRecorderEndpointAuthorization))
200+
eventRecorder := lassieeventrecorder.NewEventRecorder(cctx.Context, cfg.InstanceId, cfg.EventRecorderEndpointURL, eventRecorderEndpointAuthorization)
201+
retriever.RegisterSubscriber(eventRecorder.RecordEvent)
204202
}
205203
}
206204

bitswap/provider.go

Lines changed: 55 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77

88
"github.com/application-research/autoretrieve/blocks"
99
"github.com/application-research/autoretrieve/metrics"
10+
"github.com/dustin/go-humanize"
1011
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
12+
"github.com/filecoin-project/lassie/pkg/types"
1113
"github.com/ipfs/go-bitswap/message"
1214
bitswap_message_pb "github.com/ipfs/go-bitswap/message/pb"
1315
"github.com/ipfs/go-bitswap/network"
@@ -133,15 +135,15 @@ func NewProvider(
133135
provider.network.Start(provider)
134136

135137
for i := 0; i < int(config.RequestWorkers); i++ {
136-
go provider.handleRequests()
138+
go provider.handleRequests(ctx)
137139
}
138140

139141
for i := 0; i < int(config.ResponseWorkers); i++ {
140-
go provider.handleResponses()
142+
go provider.handleResponses(ctx)
141143
}
142144

143145
for i := 0; i < int(config.RetrievalWorkers); i++ {
144-
go provider.handleRetrievals()
146+
go provider.handleRetrievals(ctx)
145147
}
146148

147149
return provider, nil
@@ -160,10 +162,8 @@ func (provider *Provider) ReceiveMessage(ctx context.Context, sender peer.ID, in
160162
provider.requestQueue.PushTasks(sender, tasks...)
161163
}
162164

163-
func (provider *Provider) handleRequests() {
164-
ctx := context.Background()
165-
166-
for {
165+
func (provider *Provider) handleRequests(ctx context.Context) {
166+
for ctx.Err() == nil {
167167
peerID, tasks, _ := provider.requestQueue.PopTasks(100)
168168
if len(tasks) == 0 {
169169
time.Sleep(time.Millisecond * 250)
@@ -256,10 +256,8 @@ func (provider *Provider) handleRequest(
256256
return nil
257257
}
258258

259-
func (provider *Provider) handleResponses() {
260-
ctx := context.Background()
261-
262-
for {
259+
func (provider *Provider) handleResponses(ctx context.Context) {
260+
for ctx.Err() == nil {
263261
peerID, tasks, _ := provider.responseQueue.PopTasks(targetMessageSize)
264262
if len(tasks) == 0 {
265263
time.Sleep(time.Millisecond * 250)
@@ -291,15 +289,15 @@ func (provider *Provider) handleResponses() {
291289
log.Debugf("Sending have for %s", cid)
292290

293291
// Response metric
294-
ctx, _ = tag.New(ctx, tag.Insert(metrics.BitswapTopic, "HAVE"))
295-
stats.Record(ctx, metrics.BitswapResponseCount.M(1))
292+
taggedCtx, _ := tag.New(ctx, tag.Insert(metrics.BitswapTopic, "HAVE"))
293+
stats.Record(taggedCtx, metrics.BitswapResponseCount.M(1))
296294
case actionSendDontHave:
297295
msg.AddDontHave(cid)
298296
log.Debugf("Sending dont have for %s", cid)
299297

300298
// Response metric
301-
ctx, _ = tag.New(ctx, tag.Insert(metrics.BitswapTopic, "DONT_HAVE"), tag.Insert(metrics.BitswapDontHaveReason, data.reason))
302-
stats.Record(ctx, metrics.BitswapResponseCount.M(1))
299+
taggedCtx, _ := tag.New(ctx, tag.Insert(metrics.BitswapTopic, "DONT_HAVE"), tag.Insert(metrics.BitswapDontHaveReason, data.reason))
300+
stats.Record(taggedCtx, metrics.BitswapResponseCount.M(1))
303301
case actionSendBlock:
304302
block, err := provider.blockManager.Get(ctx, cid)
305303
if err != nil {
@@ -310,8 +308,8 @@ func (provider *Provider) handleResponses() {
310308
log.Debugf("Sending block for %s", cid)
311309

312310
// Response metric
313-
ctx, _ = tag.New(ctx, tag.Insert(metrics.BitswapTopic, "BLOCK"))
314-
stats.Record(ctx, metrics.BitswapResponseCount.M(1))
311+
taggedCtx, _ := tag.New(ctx, tag.Insert(metrics.BitswapTopic, "BLOCK"))
312+
stats.Record(taggedCtx, metrics.BitswapResponseCount.M(1))
315313
}
316314
}
317315

@@ -325,10 +323,8 @@ func (provider *Provider) handleResponses() {
325323
}
326324
}
327325

328-
func (provider *Provider) handleRetrievals() {
329-
ctx := context.Background()
330-
331-
for {
326+
func (provider *Provider) handleRetrievals(ctx context.Context) {
327+
for ctx.Err() == nil {
332328
peerID, tasks, _ := provider.retrievalQueue.PopTasks(1)
333329
if len(tasks) == 0 {
334330
time.Sleep(time.Millisecond * 250)
@@ -344,38 +340,52 @@ func (provider *Provider) handleRetrievals() {
344340
continue
345341
}
346342

347-
log.Debugf("Requesting retrieval for %s", cid)
343+
retrievalId, err := types.NewRetrievalID()
344+
if err != nil {
345+
log.Errorf("Failed to create retrieval ID: %s", err.Error())
346+
}
347+
348+
log.Debugf("Starting retrieval for %s (%s)", cid, retrievalId)
349+
350+
// Start a background blockstore fetch with a callback to send the block
351+
// to the peer once it's available.
352+
blockCtx, blockCancel := context.WithCancel(ctx)
353+
if provider.blockManager.AwaitBlock(blockCtx, cid, func(block blocks.Block, err error) {
354+
if err != nil {
355+
log.Debugf("Async block load failed: %s", err)
356+
provider.queueSendDontHave(peerID, task.Priority, cid, "failed_block_load")
357+
} else {
358+
log.Debugf("Async block load completed: %s", cid)
359+
provider.queueSendBlock(peerID, task.Priority, cid, block.Size)
360+
}
361+
blockCancel()
362+
}) {
363+
// If the block was already in the blockstore then we don't need to
364+
// start a retrieval.
365+
continue
366+
}
348367

349368
// Try to start a new retrieval (if it's already running then no
350369
// need to error, just continue on to await block)
351-
if err := provider.retriever.Request(cid); err != nil {
352-
if !errors.As(err, &lassieretriever.ErrRetrievalAlreadyRunning{}) {
353-
if errors.Is(err, lassieretriever.ErrNoCandidates) {
354-
// Just do a debug print if there were no candidates because this happens a lot
355-
log.Debugf("No candidates for %s", cid)
356-
} else {
357-
// Otherwise, there was a real failure, print with more importance
358-
log.Errorf("Request for %s failed: %v", cid, err)
359-
}
360-
} else {
370+
result, err := provider.retriever.Retrieve(ctx, retrievalId, cid)
371+
if err != nil {
372+
if errors.Is(err, lassieretriever.ErrRetrievalAlreadyRunning) {
361373
log.Debugf("Retrieval already running for %s, no new one will be started", cid)
374+
continue // Don't send dont_have or run blockCancel(), let it async load
375+
} else if errors.Is(err, lassieretriever.ErrNoCandidates) {
376+
// Just do a debug print if there were no candidates because this happens a lot
377+
log.Debugf("No candidates for %s (%s)", cid, retrievalId)
378+
provider.queueSendDontHave(peerID, task.Priority, cid, "no_candidates")
379+
} else {
380+
// Otherwise, there was a real failure, print with more importance
381+
log.Errorf("Retrieval for %s (%s) failed: %v", cid, retrievalId, err)
382+
provider.queueSendDontHave(peerID, task.Priority, cid, "retrieval_failed")
362383
}
363384
} else {
364-
log.Infof("Started retrieval for %s", cid)
385+
log.Infof("Retrieval for %s (%s) completed (duration: %s, bytes: %s, blocks: %d)", cid, retrievalId, result.Duration, humanize.IBytes(result.Size), result.Blocks)
365386
}
366387

367-
// TODO: if retriever.Request() is changed to be blocking, make
368-
// blockManager.AwaitBlock() cancellable and cancel it after the
369-
// request finishes if there's an error
370-
provider.blockManager.AwaitBlock(ctx, cid, func(block blocks.Block, err error) {
371-
if err != nil {
372-
log.Debugf("Async block load failed: %s", err)
373-
provider.queueSendDontHave(peerID, task.Priority, block.Cid, "failed_block_load")
374-
} else {
375-
log.Debugf("Async block load completed: %s", block.Cid)
376-
provider.queueSendBlock(peerID, task.Priority, block.Cid, block.Size)
377-
}
378-
})
388+
blockCancel()
379389
}
380390

381391
provider.retrievalQueue.TasksDone(peerID, tasks...)

blocks/manager.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type Manager struct {
3333
}
3434

3535
type waitListEntry struct {
36+
ctx context.Context
3637
callback func(Block, error)
3738
registeredAt time.Time
3839
}
@@ -55,7 +56,13 @@ func NewManager(inner blockstore.Blockstore, getAwaitTimeout time.Duration) *Man
5556
return mgr
5657
}
5758

58-
func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(Block, error)) {
59+
// AwaitBlock will wait for a block to be added to the blockstore and then
60+
// call the callback with the block. If the block is already in the blockstore,
61+
// the callback is called immediately and AwaitBlock returns true. Otherwise it
62+
// returns false: a lookup error other than not-found is passed to the callback
63+
// right away, and a missing block is put on the wait list, where the callback
64+
// gets ErrWaitTimeout on expiry or is dropped if the context is cancelled.
65+
func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(Block, error)) bool {
5966
// We need to lock the blockstore here to make sure the requested block
6067
// doesn't get added while being added to the waitlist
6168
mgr.waitListLk.Lock()
@@ -68,22 +75,25 @@ func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(B
6875
if !ipld.IsNotFound(err) {
6976
mgr.waitListLk.Unlock()
7077
callback(Block{}, err)
71-
return
78+
return false
7279
}
7380

7481
mgr.waitList[cid] = append(mgr.waitList[cid], waitListEntry{
82+
ctx: ctx,
7583
callback: callback,
7684
registeredAt: time.Now(),
7785
})
7886

7987
mgr.waitListLk.Unlock()
80-
return
88+
return false
8189
}
8290

8391
mgr.waitListLk.Unlock()
8492

85-
// Otherwise, we can immediately run the callback
93+
// Otherwise, we can immediately run the callback and notify the caller of
94+
// success
8695
callback(Block{cid, size}, nil)
96+
return true
8797
}
8898

8999
func (mgr *Manager) Put(ctx context.Context, block blocks.Block) error {
@@ -149,20 +159,22 @@ func (mgr *Manager) startPollCleanup() {
149159
for cid := range mgr.waitList {
150160
// For each element in the slice for this CID...
151161
for i := 0; i < len(mgr.waitList[cid]); i++ {
152-
// ...check if it's timed out...
153-
if time.Since(mgr.waitList[cid][i].registeredAt) > mgr.getAwaitTimeout {
162+
// ...check whether the waiter context was cancelled or it's been in the
163+
// list too long...
164+
if mgr.waitList[cid][i].ctx.Err() != nil || time.Since(mgr.waitList[cid][i].registeredAt) > mgr.getAwaitTimeout {
154165
// ...and if so, delete this element by replacing it with
155166
// the last element of the slice and shrinking the length by
156-
// 1, and step the index back
157-
mgr.waitList[cid][i].callback(Block{}, ErrWaitTimeout)
167+
// 1, and step the index back.
168+
if mgr.waitList[cid][i].ctx.Err() == nil {
169+
mgr.waitList[cid][i].callback(Block{}, ErrWaitTimeout)
170+
}
158171
mgr.waitList[cid][i] = mgr.waitList[cid][len(mgr.waitList[cid])-1]
159172
mgr.waitList[cid] = mgr.waitList[cid][:len(mgr.waitList[cid])-1]
160173
i--
161174
}
162175
}
163176

164-
// If the slice is empty now, remove it entirely from the waitList
165-
// map
177+
// If the slice is empty now, remove it entirely from the waitList map
166178
if len(mgr.waitList[cid]) == 0 {
167179
delete(mgr.waitList, cid)
168180
}

endpoint/estuary.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
"github.com/application-research/autoretrieve/minerpeergetter"
1313
"github.com/filecoin-project/go-address"
14-
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
14+
"github.com/filecoin-project/lassie/pkg/types"
1515
"github.com/ipfs/go-cid"
1616
)
1717

@@ -39,7 +39,7 @@ func NewEstuaryEndpoint(url string, mpg *minerpeergetter.MinerPeerGetter) *Estua
3939
}
4040
}
4141

42-
func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]lassieretriever.RetrievalCandidate, error) {
42+
func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]types.RetrievalCandidate, error) {
4343
// Create URL with CID
4444
endpointURL, err := url.Parse(ee.url)
4545
if err != nil {
@@ -63,13 +63,13 @@ func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]l
6363
return nil, ErrEndpointBodyInvalid
6464
}
6565

66-
converted := make([]lassieretriever.RetrievalCandidate, 0, len(unfiltered))
66+
converted := make([]types.RetrievalCandidate, 0, len(unfiltered))
6767
for _, original := range unfiltered {
6868
minerPeer, err := ee.mpg.MinerPeer(ctx, original.Miner)
6969
if err != nil {
7070
return nil, fmt.Errorf("%w: failed to get miner peer: %v", ErrEndpointRequestFailed, err)
7171
}
72-
converted = append(converted, lassieretriever.RetrievalCandidate{
72+
converted = append(converted, types.RetrievalCandidate{
7373
MinerPeer: minerPeer,
7474
RootCid: original.RootCid,
7575
})

0 commit comments

Comments
 (0)