Skip to content

Commit ed57c41

Browse files
committed
fix: speed up full sync progress
1 parent 43c346c commit ed57c41

3 files changed

Lines changed: 126 additions & 9 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ bin/discrawl sync --channels 111,222 --since 2026-03-01T00:00:00Z
151151

152152
`sync` already uses parallel channel workers. `--concurrency` overrides the default, and the default is auto-sized from `GOMAXPROCS` with a floor of `8` and a cap of `32`.
153153
When `--channels` includes a forum channel id, `discrawl` expands that forum's threads and syncs their messages as part of the targeted run.
154+
Long runs now emit periodic progress logs to stderr so large backfills do not look hung.
154155

155156
### `tail`
156157

internal/syncer/channel_catalog_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,48 @@ func TestSyncSkipsUnchangedThreadsWhenHistoryComplete(t *testing.T) {
187187
require.Zero(t, client.messageCalls["t1"])
188188
}
189189

190+
func TestSyncSkipsUnchangedTextChannelsWhenHistoryComplete(t *testing.T) {
191+
t.Parallel()
192+
193+
ctx := context.Background()
194+
s, err := store.Open(ctx, filepath.Join(t.TempDir(), "discrawl.db"))
195+
require.NoError(t, err)
196+
defer func() { _ = s.Close() }()
197+
198+
require.NoError(t, s.UpsertGuild(ctx, store.GuildRecord{ID: "g1", Name: "Guild", RawJSON: `{}`}))
199+
require.NoError(t, s.UpsertChannel(ctx, store.ChannelRecord{
200+
ID: "c1",
201+
GuildID: "g1",
202+
Kind: "text",
203+
Name: "general",
204+
RawJSON: `{"id":"c1"}`,
205+
}))
206+
require.NoError(t, s.SetSyncState(ctx, channelLatestScope("c1"), "200"))
207+
require.NoError(t, s.SetSyncState(ctx, channelHistoryCompleteScope("c1"), "1"))
208+
209+
client := &fakeClient{
210+
guilds: []*discordgo.UserGuild{{ID: "g1", Name: "Guild"}},
211+
guildByID: map[string]*discordgo.Guild{
212+
"g1": {ID: "g1", Name: "Guild"},
213+
},
214+
channels: map[string][]*discordgo.Channel{
215+
"g1": {{
216+
ID: "c1",
217+
GuildID: "g1",
218+
Name: "general",
219+
Type: discordgo.ChannelTypeGuildText,
220+
LastMessageID: "200",
221+
}},
222+
},
223+
}
224+
225+
svc := New(client, s, nil)
226+
stats, err := svc.Sync(ctx, SyncOptions{Full: true, GuildIDs: []string{"g1"}})
227+
require.NoError(t, err)
228+
require.Zero(t, stats.Messages)
229+
require.Zero(t, client.messageCalls["c1"])
230+
}
231+
190232
func TestFullSyncReusesStoredThreadParents(t *testing.T) {
191233
t.Parallel()
192234

internal/syncer/message_sync.go

Lines changed: 83 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"sync"
7+
"time"
78

89
"github.com/bwmarrin/discordgo"
910
"github.com/steipete/discrawl/internal/store"
@@ -19,11 +20,12 @@ func (s *Syncer) syncMessageChannels(
1920
if len(messageChannels) == 0 {
2021
return 0, nil
2122
}
23+
progress := newMessageSyncProgress(s, guildID, len(messageChannels), opts)
2224
workers := opts.Concurrency
2325
if workers <= 1 {
24-
return s.syncMessageChannelsSerial(ctx, guildID, messageChannels, opts)
26+
return s.syncMessageChannelsSerial(ctx, guildID, messageChannels, opts, progress)
2527
}
26-
return s.syncMessageChannelsConcurrent(ctx, guildID, messageChannels, opts, workers)
28+
return s.syncMessageChannelsConcurrent(ctx, guildID, messageChannels, opts, workers, progress)
2729
}
2830

2931
func filterMessageChannels(channels []*discordgo.Channel, requested []string) []*discordgo.Channel {
@@ -64,7 +66,7 @@ func requestedMessageTarget(channel *discordgo.Channel, channelByID map[string]*
6466
return parent != nil && parent.Type == discordgo.ChannelTypeGuildForum
6567
}
6668

67-
func (s *Syncer) syncMessageChannelsSerial(ctx context.Context, guildID string, channels []*discordgo.Channel, opts SyncOptions) (int, error) {
69+
func (s *Syncer) syncMessageChannelsSerial(ctx context.Context, guildID string, channels []*discordgo.Channel, opts SyncOptions, progress *messageSyncProgress) (int, error) {
6870
total := 0
6971
for _, channel := range channels {
7072
count, err := s.syncChannelMessages(ctx, guildID, channel, opts.Full, opts.Embeddings)
@@ -78,6 +80,7 @@ func (s *Syncer) syncMessageChannelsSerial(ctx context.Context, guildID string,
7880
if err := s.clearUnavailableChannel(ctx, channel.ID); err != nil {
7981
return total, err
8082
}
83+
progress.record(channel, count)
8184
}
8285
return total, nil
8386
}
@@ -88,9 +91,11 @@ func (s *Syncer) syncMessageChannelsConcurrent(
8891
channels []*discordgo.Channel,
8992
opts SyncOptions,
9093
workers int,
94+
progress *messageSyncProgress,
9195
) (int, error) {
9296
type result struct {
9397
channelID string
98+
channel *discordgo.Channel
9499
count int
95100
err error
96101
}
@@ -119,7 +124,7 @@ func (s *Syncer) syncMessageChannelsConcurrent(
119124
err = s.clearUnavailableChannel(ctx, channel.ID)
120125
}
121126
select {
122-
case results <- result{channelID: channel.ID, count: count, err: err}:
127+
case results <- result{channelID: channel.ID, channel: channel, count: count, err: err}:
123128
case <-ctx.Done():
124129
return
125130
}
@@ -151,6 +156,7 @@ func (s *Syncer) syncMessageChannelsConcurrent(
151156
var firstErr error
152157
for result := range results {
153158
total += result.count
159+
progress.record(result.channel, result.count)
154160
if result.err != nil && firstErr == nil {
155161
firstErr = fmt.Errorf("sync channel %s: %w", result.channelID, result.err)
156162
}
@@ -174,12 +180,12 @@ func (s *Syncer) syncChannelMessages(ctx context.Context, guildID string, channe
174180
if err := s.seedChannelSyncState(ctx, channel.ID, &state); err != nil {
175181
return 0, err
176182
}
177-
if shouldSkipThreadSync(channel, state) {
183+
if shouldSkipChannelSync(channel, state) {
178184
return 0, nil
179185
}
180186
return s.syncFullChannelHistory(ctx, channel, state, embeddings)
181187
}
182-
if shouldSkipThreadSync(channel, state) {
188+
if shouldSkipChannelSync(channel, state) {
183189
return 0, nil
184190
}
185191
return s.syncIncrementalChannelHistory(ctx, channel, state, embeddings)
@@ -192,11 +198,14 @@ type channelSyncState struct {
192198
BackfillComplete bool
193199
}
194200

195-
func shouldSkipThreadSync(channel *discordgo.Channel, state channelSyncState) bool {
196-
if !isThreadChannel(channel) || !state.BackfillComplete {
201+
func shouldSkipChannelSync(channel *discordgo.Channel, state channelSyncState) bool {
202+
if !state.BackfillComplete || channel == nil {
197203
return false
198204
}
199-
if channel == nil || channel.LastMessageID == "" || state.Latest == "" {
205+
if channel.LastMessageID == "" {
206+
return state.Latest == ""
207+
}
208+
if state.Latest == "" {
200209
return false
201210
}
202211
return maxSnowflake(state.Latest, channel.LastMessageID) == state.Latest
@@ -418,3 +427,68 @@ func buildMessageMutations(ctx context.Context, messages []*discordgo.Message, c
418427
}
419428
return mutations, newest, nil
420429
}
430+
431+
type messageSyncProgress struct {
432+
syncer *Syncer
433+
guildID string
434+
totalChannels int
435+
startedAt time.Time
436+
lastLogAt time.Time
437+
processed int
438+
messages int
439+
}
440+
441+
func newMessageSyncProgress(s *Syncer, guildID string, totalChannels int, opts SyncOptions) *messageSyncProgress {
442+
if s == nil || s.logger == nil || totalChannels == 0 {
443+
return nil
444+
}
445+
progress := &messageSyncProgress{
446+
syncer: s,
447+
guildID: guildID,
448+
totalChannels: totalChannels,
449+
startedAt: time.Now(),
450+
lastLogAt: time.Now(),
451+
}
452+
s.logger.Info(
453+
"message sync started",
454+
"guild_id", guildID,
455+
"channels", totalChannels,
456+
"full", opts.Full,
457+
"concurrency", max(1, opts.Concurrency),
458+
)
459+
return progress
460+
}
461+
462+
func (p *messageSyncProgress) record(channel *discordgo.Channel, count int) {
463+
if p == nil || p.syncer == nil || p.syncer.logger == nil {
464+
return
465+
}
466+
p.processed++
467+
p.messages += count
468+
now := time.Now()
469+
shouldLog := p.processed == p.totalChannels ||
470+
p.processed == 1 ||
471+
p.processed%100 == 0 ||
472+
now.Sub(p.lastLogAt) >= 15*time.Second
473+
if !shouldLog {
474+
return
475+
}
476+
p.lastLogAt = now
477+
channelID := ""
478+
channelName := ""
479+
if channel != nil {
480+
channelID = channel.ID
481+
channelName = channel.Name
482+
}
483+
p.syncer.logger.Info(
484+
"message sync progress",
485+
"guild_id", p.guildID,
486+
"processed_channels", p.processed,
487+
"total_channels", p.totalChannels,
488+
"remaining_channels", p.totalChannels-p.processed,
489+
"messages_written", p.messages,
490+
"last_channel_id", channelID,
491+
"last_channel_name", channelName,
492+
"elapsed", now.Sub(p.startedAt).Round(time.Second).String(),
493+
)
494+
}

0 commit comments

Comments
 (0)