55 "crypto/sha1"
66 "fmt"
77 "io"
8+ "strconv"
89 "strings"
910 "time"
1011
@@ -34,6 +35,85 @@ type StreamContent struct {
3435 DomainSettings * embed.DomainSettingsData
3536}
3637
38+ const (
39+ bandwidthTestSize = 30 * 1024 * 1024 // 30MB
40+ bandwidthMultiplier = 1.5
41+ )
42+
43+ type SlowDownloadData struct {
44+ MeasuredSpeedMbps float64
45+ RequiredSpeedMbps float64
46+ BitrateMbps float64
47+ IsRateLimited bool
48+ RateLimitMbps float64
49+ TierName string
50+ }
51+
52+ type SlowDownloadError struct {
53+ Data SlowDownloadData
54+ }
55+
56+ func (e * SlowDownloadError ) Error () string {
57+ return "download speed too slow for streaming"
58+ }
59+
60+ type firstByteReader struct {
61+ r io.Reader
62+ firstByte time.Time
63+ started bool
64+ }
65+
66+ func (r * firstByteReader ) Read (p []byte ) (n int , err error ) {
67+ n , err = r .r .Read (p )
68+ if n > 0 && ! r .started {
69+ r .firstByte = time .Now ()
70+ r .started = true
71+ }
72+ return
73+ }
74+
75+ func getVideoBitrate (mp * api.MediaProbe ) int64 {
76+ if mp .Format .BitRate != "" {
77+ br , err := strconv .ParseInt (mp .Format .BitRate , 10 , 64 )
78+ if err == nil && br > 0 {
79+ return br
80+ }
81+ }
82+ var total int64
83+ for _ , s := range mp .Streams {
84+ if s .BitRate != "" {
85+ br , err := strconv .ParseInt (s .BitRate , 10 , 64 )
86+ if err == nil {
87+ total += br
88+ }
89+ }
90+ }
91+ return total
92+ }
93+
94+ func parseRateLimit (rate string ) int64 {
95+ rate = strings .TrimSpace (rate )
96+ if ! strings .HasSuffix (rate , "M" ) || len (rate ) < 2 {
97+ return 0
98+ }
99+ n , err := strconv .ParseInt (rate [:len (rate )- 1 ], 10 , 64 )
100+ if err != nil {
101+ return 0
102+ }
103+ return n * 1_000_000
104+ }
105+
106+ func isRateLimited (measuredBytesPerSec float64 , rateLimitBitsPerSec int64 ) bool {
107+ return measuredBytesPerSec * 8 >= float64 (rateLimitBitsPerSec )* 0.9
108+ }
109+
110+ func contentProbeURL (downloadURL string ) string {
111+ if i := strings .IndexByte (downloadURL , '?' ); i >= 0 {
112+ return downloadURL [:i ] + "~cp" + downloadURL [i :]
113+ }
114+ return downloadURL + "~cp"
115+ }
116+
37117func (s * ActionScript ) streamContent (ctx context.Context , j * job.Job , c * web.Context , resourceID string , itemID string , template string , settings * models.StreamSettings , vsud * models.VideoStreamUserData , dsd * embed.DomainSettingsData ) (err error ) {
38118 sc := & StreamContent {
39119 Settings : settings ,
@@ -61,32 +141,76 @@ func (s *ActionScript) streamContent(ctx context.Context, j *job.Job, c *web.Con
61141 sc .Item = & exportResponse .Source
62142 se := exportResponse .ExportItems ["stream" ]
63143
64- if se .Meta .Transcode {
65- if ! se .Meta .TranscodeCache {
66- if ! se .ExportMetaItem .Meta .Cache {
67- if err = s .warmUp (ctx , j , "warming up torrent client" , exportResponse .ExportItems ["download" ].URL , exportResponse .ExportItems ["torrent_client_stat" ].URL , int (exportResponse .Source .Size ), 1024 * 1024 , 500 * 1024 , "file" , true ); err != nil {
68- return
69- }
70- }
71- if err = s .warmUp (ctx , j , "warming up transcoder" , exportResponse .ExportItems ["stream" ].URL , exportResponse .ExportItems ["torrent_client_stat" ].URL , 0 , - 1 , - 1 , "stream" , false ); err != nil {
72- return
73- }
144+ var downloadSpeed float64
145+ fileSize := int (exportResponse .Source .Size )
146+ warmupSize := bandwidthTestSize
147+ if half := fileSize / 2 ; half > 0 && warmupSize > half {
148+ warmupSize = half
149+ }
150+ downloadURL := exportResponse .ExportItems ["download" ].URL
151+
152+ // Step 1: Torrent warmup (if original content not cached)
153+ needTorrentWarmup := ! se .ExportMetaItem .Meta .Cache && (! se .Meta .Transcode || ! se .Meta .TranscodeCache )
154+ if needTorrentWarmup {
155+ if downloadSpeed , err = s .warmUp (ctx , j , "warming up torrent client" , downloadURL , exportResponse .ExportItems ["torrent_client_stat" ].URL , fileSize , warmupSize , 500 * 1024 , "file" , true ); err != nil {
156+ return
74157 }
75- j .InProgress ("probing content media info" )
76- mpCtx , mpCancel := context .WithTimeout (ctx , 1 * time .Minute )
77- defer mpCancel ()
78- mp , err := s .api .GetMediaProbe (mpCtx , exportResponse .ExportItems ["media_probe" ].URL )
79- if err != nil {
80- return errors .Wrap (err , "failed to get probe data" )
158+ }
159+
160+ // Step 2: Content probe via ~cp (before transcoder warmup)
161+ j .InProgress ("probing content media info" )
162+ mpCtx , mpCancel := context .WithTimeout (ctx , 1 * time .Minute )
163+ defer mpCancel ()
164+ probeURL := contentProbeURL (downloadURL )
165+ mp , probeErr := s .api .GetMediaProbe (mpCtx , probeURL )
166+ if probeErr != nil {
167+ if mpItem , ok := exportResponse .ExportItems ["media_probe" ]; ok {
168+ mp , probeErr = s .api .GetMediaProbe (mpCtx , mpItem .URL )
81169 }
170+ }
171+ if probeErr != nil {
172+ if se .Meta .Transcode {
173+ return errors .Wrap (probeErr , "failed to get probe data" )
174+ }
175+ log .WithError (probeErr ).Warn ("failed to get content probe" )
176+ } else {
82177 sc .MediaProbe = mp
83178 log .Infof ("got media probe %+v" , mp )
84- j .Done ()
85- } else {
86- if ! se .ExportMetaItem .Meta .Cache {
87- if err = s .warmUp (ctx , j , "warming up torrent client" , exportResponse .ExportItems ["download" ].URL , exportResponse .ExportItems ["torrent_client_stat" ].URL , int (exportResponse .Source .Size ), 1024 * 1024 , 500 * 1024 , "file" , true ); err != nil {
88- return
179+ }
180+ j .Done ()
181+
182+ // Step 3: Bandwidth check
183+ if downloadSpeed > 0 && sc .MediaProbe != nil {
184+ j .InProgress ("checking bandwidth" )
185+ bitrate := getVideoBitrate (sc .MediaProbe )
186+ if bitrate > 0 && downloadSpeed * 8 < float64 (bitrate )* bandwidthMultiplier {
187+ sdd := SlowDownloadData {
188+ MeasuredSpeedMbps : downloadSpeed * 8 / 1_000_000 ,
189+ RequiredSpeedMbps : float64 (bitrate ) * bandwidthMultiplier / 1_000_000 ,
190+ BitrateMbps : float64 (bitrate ) / 1_000_000 ,
191+ }
192+ if c .ApiClaims != nil && c .ApiClaims .Rate != "" {
193+ rateLimitBps := parseRateLimit (c .ApiClaims .Rate )
194+ if rateLimitBps > 0 && isRateLimited (downloadSpeed , rateLimitBps ) {
195+ sdd .IsRateLimited = true
196+ sdd .RateLimitMbps = float64 (rateLimitBps ) / 1_000_000
197+ }
198+ }
199+ if c .Claims != nil && c .Claims .Context != nil && c .Claims .Context .Tier != nil {
200+ sdd .TierName = c .Claims .Context .Tier .Name
89201 }
202+ if sdd .TierName == "" {
203+ sdd .TierName = "free"
204+ }
205+ return & SlowDownloadError {Data : sdd }
206+ }
207+ j .Done ()
208+ }
209+
210+ // Step 4: Transcoder warmup (after bandwidth check)
211+ if se .Meta .Transcode && ! se .Meta .TranscodeCache {
212+ if _ , err = s .warmUp (ctx , j , "warming up transcoder" , exportResponse .ExportItems ["stream" ].URL , exportResponse .ExportItems ["torrent_client_stat" ].URL , 0 , - 1 , - 1 , "stream" , false ); err != nil {
213+ return
90214 }
91215 }
92216 if exportResponse .Source .MediaFormat == ra .Video {
@@ -170,7 +294,7 @@ func (s *ActionScript) download(ctx context.Context, j *job.Job, c *web.Context,
170294 de := resp .ExportItems ["download" ]
171295 //url := de.URL
172296 if ! de .ExportMetaItem .Meta .Cache {
173- if err := s .warmUp (ctx , j , "warming up torrent client" , resp .ExportItems ["download" ].URL , resp .ExportItems ["torrent_client_stat" ].URL , int (resp .Source .Size ), 1024 * 1024 , 0 , "" , true ); err != nil {
297+ if _ , err := s .warmUp (ctx , j , "warming up torrent client" , resp .ExportItems ["download" ].URL , resp .ExportItems ["torrent_client_stat" ].URL , int (resp .Source .Size ), 1024 * 1024 , 0 , "" , true ); err != nil {
174298 return err
175299 }
176300 }
@@ -191,7 +315,7 @@ func (s *ActionScript) download(ctx context.Context, j *job.Job, c *web.Context,
191315 return
192316}
193317
194- func (s * ActionScript ) warmUp (ctx context.Context , j * job.Job , m string , u string , su string , size int , limitStart int , limitEnd int , tagSuff string , useStatus bool ) (err error ) {
318+ func (s * ActionScript ) warmUp (ctx context.Context , j * job.Job , m string , u string , su string , size int , limitStart int , limitEnd int , tagSuff string , useStatus bool ) (downloadSpeed float64 , err error ) {
195319 tag := "download"
196320 if tagSuff != "" {
197321 tag += "-" + tagSuff
@@ -234,28 +358,35 @@ func (s *ActionScript) warmUp(ctx context.Context, j *job.Job, m string, u strin
234358
235359 b , err := s .api .DownloadWithRange (warmupCtx , u , 0 , limitStart )
236360 if err != nil {
237- return errors .Wrap (err , "failed to start download" )
361+ return 0 , errors .Wrap (err , "failed to start download" )
238362 }
239363 defer func (b io.ReadCloser ) {
240364 _ = b .Close ()
241365 }(b )
242366
243- _ , err = io .Copy (io .Discard , b )
367+ fbr := & firstByteReader {r : b }
368+ n , err := io .Copy (io .Discard , fbr )
369+ if fbr .started {
370+ elapsed := time .Since (fbr .firstByte )
371+ if elapsed > 0 && n > 0 {
372+ downloadSpeed = float64 (n ) / elapsed .Seconds ()
373+ }
374+ }
244375
245376 if limitEnd > 0 {
246377 b2 , err := s .api .DownloadWithRange (warmupCtx , u , size - limitEnd , - 1 )
247378 if err != nil {
248- return errors .Wrap (err , "failed to start download" )
379+ return 0 , errors .Wrap (err , "failed to start download" )
249380 }
250381 defer func (b2 io.ReadCloser ) {
251382 _ = b2 .Close ()
252383 }(b2 )
253384 _ , err = io .Copy (io .Discard , b2 )
254385 }
255386 if errors .Is (errors .Cause (err ), context .DeadlineExceeded ) {
256- return errors .Wrap (err , fmt .Sprintf ("failed to download within %v minutes" , s .warmupTimeoutMin ))
387+ return 0 , errors .Wrap (err , fmt .Sprintf ("failed to download within %v minutes" , s .warmupTimeoutMin ))
257388 } else if err != nil {
258- return errors .Wrap (err , "failed to download" )
389+ return 0 , errors .Wrap (err , "failed to download" )
259390 }
260391
261392 j .Done ()
@@ -299,6 +430,17 @@ type ErrorWrapperScript struct {
299430
300431func (s * ErrorWrapperScript ) Run (ctx context.Context , j * job.Job ) (err error ) {
301432 err = s .Script .Run (ctx , j )
433+ if sde , ok := err .(* SlowDownloadError ); ok {
434+ tpl := s .tb .Build ("action/errors/slow_download" ).WithLayoutBody (`{{ template "main" . }}` )
435+ str , terr := tpl .ToString (s .c .WithData (& sde .Data ))
436+ if terr != nil {
437+ return terr
438+ }
439+ log .WithError (err ).WithField ("data" , sde .Data ).Warn ("bandwidth check failed" )
440+ j .Fail ()
441+ j .Custom ("action/errors/slow_download" , strings .TrimSpace (str ))
442+ return nil
443+ }
302444 if errors .Is (errors .Cause (err ), context .DeadlineExceeded ) {
303445 tpl := s .tb .Build ("action/errors/no_peers" ).WithLayoutBody (`{{ template "main" . }}` )
304446 str , terr := tpl .ToString (s .c )
0 commit comments