diff --git a/app/tests/test_download_queue.py b/app/tests/test_download_queue.py index 20bf863..7e07c58 100644 --- a/app/tests/test_download_queue.py +++ b/app/tests/test_download_queue.py @@ -7,8 +7,9 @@ import tempfile from unittest.mock import AsyncMock, MagicMock, patch import pytest +import time -from ytdl import DownloadQueue +from ytdl import DownloadInfo, DownloadQueue @pytest.fixture @@ -386,3 +387,243 @@ async def test_add_sets_clip_bounds_on_download_info(dq_env): download = dq.pending.get("https://example.com/clip") assert download.info.clip_start == 10.0 assert download.info.clip_end == 99.5 + + +def _upcoming_entry(url: str, *, release_timestamp: float | None = None) -> dict: + return { + "_type": "video", + "id": "live1", + "title": "Upcoming Stream", + "url": url, + "webpage_url": url, + "live_status": "is_upcoming", + "release_timestamp": release_timestamp if release_timestamp is not None else time.time() + 3600, + } + + +@pytest.mark.asyncio +async def test_add_upcoming_stream_scheduled_without_starting(dq_env): + notifier = AsyncMock() + url = "https://example.com/live-upcoming" + start_mock = AsyncMock() + + dq = DownloadQueue(dq_env, notifier) + with patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock): + result = await dq.add_entry( + _upcoming_entry(url), + "video", + "auto", + "any", + "best", + "", + "", + 0, + auto_start=True, + ) + + assert result["status"] == "ok" + assert dq.queue.exists(url) + download = dq.queue.get(url) + assert download.info.status == "scheduled" + assert download.info.live_status == "is_upcoming" + assert download.info.live_release_timestamp is not None + start_mock.assert_not_called() + assert url in dq._scheduled_probe_at + + +@pytest.mark.asyncio +async def test_probe_scheduled_starts_when_live(dq_env): + notifier = AsyncMock() + url = "https://example.com/live-upcoming" + start_mock = AsyncMock() + + dq = DownloadQueue(dq_env, notifier) + with patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock): + await dq.add_entry( + _upcoming_entry(url), + "video", + "auto", + "any", + "best", + "", + "", + 0, + auto_start=True, + ) + + download = dq.queue.get(url) + + def fake_probe_extract(self, probe_url, ytdl_options_presets=None, ytdl_options_overrides=None): + assert probe_url == url + return { + "_type": "video", + "id": "live1", + "title": "Live Now", + "url": url, + "webpage_url": url, + "live_status": "is_live", + "formats": [{"format_id": "22"}], + } + + with patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_probe_extract), \ + patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock): + await dq._probe_scheduled_download(download) + + assert url not in dq._scheduled_probe_at + assert download.info.live_status == "is_live" + assert download.info.status == "pending" + start_mock.assert_called_once_with(download) + + +@pytest.mark.asyncio +async def test_import_scheduled_re_registers_monitor(dq_env): + notifier = AsyncMock() + url = "https://example.com/live-restart" + release = time.time() + 7200 + + info = DownloadInfo( + id="live1", + title="Upcoming Stream", + url=url, + quality="best", + download_type="video", + codec="auto", + format="any", + folder="", + custom_name_prefix="", + error=None, + entry=None, + playlist_item_limit=0, + split_by_chapters=False, + chapter_template="", + live_status="is_upcoming", + live_release_timestamp=release, + ) + info.status = "scheduled" + + dq = DownloadQueue(dq_env, notifier) + start_mock = AsyncMock() + with patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock): + await dq._DownloadQueue__add_download(info, True) + + assert dq.queue.exists(url) + assert dq.queue.get(url).info.status == "scheduled" + assert url in dq._scheduled_probe_at + start_mock.assert_not_called() + + +@pytest.mark.asyncio +async def test_probe_transient_error_retries_without_failing(dq_env): + """A single probe failure must not abandon the scheduled stream.""" + import ytdl + + notifier = AsyncMock() + url = "https://example.com/live-transient" + start_mock = AsyncMock() + + dq = DownloadQueue(dq_env, notifier) + with patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock): + await dq.add_entry( + _upcoming_entry(url), + "video", "auto", "any", "best", "", "", 0, + auto_start=True, + ) + download = dq.queue.get(url) + + def boom(self, *args, **kwargs): + raise ytdl.yt_dlp.utils.YoutubeDLError("temporary network glitch") + + before = time.time() + with patch.object(DownloadQueue, "_DownloadQueue__extract_info", boom): + await dq._probe_scheduled_download(download) + + # Still scheduled, still monitored, probe rescheduled into the future. + assert download.info.status == "scheduled" + assert url in dq._scheduled_probe_at + assert dq._scheduled_probe_at[url] >= before + assert dq._scheduled_probe_failures[url] == 1 + notifier.completed.assert_not_called() + + +@pytest.mark.asyncio +async def test_probe_gives_up_after_max_failures(dq_env): + import ytdl + + notifier = AsyncMock() + url = "https://example.com/live-dead" + start_mock = AsyncMock() + + dq = DownloadQueue(dq_env, notifier) + with patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock): + await dq.add_entry( + _upcoming_entry(url), + "video", "auto", "any", "best", "", "", 0, + auto_start=True, + ) + download = dq.queue.get(url) + + def boom(self, *args, **kwargs): + raise ytdl.yt_dlp.utils.YoutubeDLError("stream was deleted") + + with patch.object(DownloadQueue, "_DownloadQueue__extract_info", boom): + for _ in range(ytdl._LIVE_PROBE_MAX_FAILURES): + await dq._probe_scheduled_download(download) + + assert url not in dq._scheduled_probe_at + assert not dq.queue.exists(url) + assert dq.done.exists(url) + assert download.info.status == "error" + notifier.completed.assert_awaited() + + +@pytest.mark.asyncio +async def test_probe_recovers_after_transient_then_starts(dq_env): + """A transient failure followed by a successful live probe should start the download.""" + import ytdl + + notifier = AsyncMock() + url = "https://example.com/live-recover" + start_mock = AsyncMock() + + dq = DownloadQueue(dq_env, notifier) + with patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock): + await dq.add_entry( + _upcoming_entry(url), + "video", "auto", "any", "best", "", "", 0, + auto_start=True, + ) + download = dq.queue.get(url) + # The scheduling placeholder error is set on add. + assert download.info.error + + def boom(self, *args, **kwargs): + raise ytdl.yt_dlp.utils.YoutubeDLError("temporary glitch") + + with patch.object(DownloadQueue, "_DownloadQueue__extract_info", boom): + await dq._probe_scheduled_download(download) + assert dq._scheduled_probe_failures[url] == 1 + + def live_now(self, *args, **kwargs): + return { + "_type": "video", "id": "live1", "title": "Live Now", + "url": url, "webpage_url": url, "live_status": "is_live", + "formats": [{"format_id": "22"}], + } + + with patch.object(DownloadQueue, "_DownloadQueue__extract_info", live_now), \ + patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock): + await dq._probe_scheduled_download(download) + + assert url not in dq._scheduled_probe_at + assert url not in dq._scheduled_probe_failures + assert download.info.status == "pending" + # Placeholder error/msg cleared now that a real download is starting. + assert download.info.error is None + assert download.info.msg is None + start_mock.assert_called_once_with(download) + + +def test_seconds_until_next_probe_none_when_empty(dq_env): + notifier = AsyncMock() + dq = DownloadQueue(dq_env, notifier) + assert dq._seconds_until_next_probe() is None diff --git a/app/ytdl.py b/app/ytdl.py index d928c0f..31bca5e 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -24,6 +24,12 @@ from subscriptions import _entry_id log = logging.getLogger('ytdl') +_LIVE_CHECK_INTERVAL = 60 +_LIVE_MAX_CHECK_INTERVAL = 3600 +# Consecutive probe failures (network blips, rate limits, transient extractor +# errors) tolerated before a scheduled live download is abandoned as errored. +_LIVE_PROBE_MAX_FAILURES = 5 + # Characters that are invalid in Windows/NTFS path components. These are pre- # sanitised when substituting playlist/channel titles into output templates so @@ -194,6 +200,8 @@ class DownloadInfo: ytdl_options_overrides=None, clip_start=None, clip_end=None, + live_status=None, + live_release_timestamp=None, ): self.id = id if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{id}' self.title = title if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{title}' @@ -220,6 +228,8 @@ class DownloadInfo: self.ytdl_options_overrides = dict(ytdl_options_overrides or {}) self.clip_start = clip_start self.clip_end = clip_end + self.live_status = live_status + self.live_release_timestamp = live_release_timestamp self.subtitle_files = [] def __setstate__(self, state): @@ -292,6 +302,10 @@ class DownloadInfo: self.clip_start = None if not hasattr(self, "clip_end"): self.clip_end = None + if not hasattr(self, "live_status"): + self.live_status = None + if not hasattr(self, "live_release_timestamp"): + self.live_release_timestamp = None _PERSISTED_DOWNLOAD_FIELDS = ( @@ -313,6 +327,8 @@ _PERSISTED_DOWNLOAD_FIELDS = ( "ytdl_options_overrides", "clip_start", "clip_end", + "live_status", + "live_release_timestamp", "status", "timestamp", "error", @@ -757,6 +773,10 @@ class DownloadQueue: self.done.load() self._add_generation = 0 self._canceled_urls = set() # URLs canceled during current playlist add + self._scheduled_probe_at: dict[str, float] = {} + self._scheduled_probe_failures: dict[str, int] = {} + self._live_monitor_task: Optional[asyncio.Task] = None + self._live_monitor_wakeup = asyncio.Event() def cancel_add(self): self._add_generation += 1 @@ -772,9 +792,165 @@ class DownloadQueue: async def initialize(self): log.info("Initializing DownloadQueue") + self._start_live_monitor() asyncio.create_task(self.__import_queue()) asyncio.create_task(self.__import_pending()) + def _start_live_monitor(self) -> None: + if self._live_monitor_task is not None and not self._live_monitor_task.done(): + return + self._live_monitor_task = asyncio.create_task(self._live_monitor_loop()) + self._live_monitor_task.add_done_callback( + lambda t: log.error("Live monitor loop failed: %s", t.exception()) + if not t.cancelled() and t.exception() + else None + ) + + def _register_scheduled(self, download: Download) -> None: + self._scheduled_probe_at[download.info.url] = 0 + self._scheduled_probe_failures.pop(download.info.url, None) + self._start_live_monitor() + self._wake_live_monitor() + + def _unregister_scheduled(self, url: str) -> None: + self._scheduled_probe_at.pop(url, None) + self._scheduled_probe_failures.pop(url, None) + + def _wake_live_monitor(self) -> None: + try: + self._live_monitor_wakeup.set() + except RuntimeError: + pass + + def _probe_interval_seconds(self, release_timestamp: Any) -> float: + if release_timestamp is not None: + try: + diff = float(release_timestamp) - time.time() + if diff > 0: + return max(_LIVE_CHECK_INTERVAL, min(diff, _LIVE_MAX_CHECK_INTERVAL)) + except (TypeError, ValueError): + pass + return float(_LIVE_CHECK_INTERVAL) + + def _seconds_until_next_probe(self) -> Optional[float]: + """Time until the earliest scheduled probe, or None when nothing is scheduled.""" + if not self._scheduled_probe_at: + return None + return max(0.0, min(self._scheduled_probe_at.values()) - time.time()) + + async def _live_monitor_loop(self) -> None: + while True: + timeout = self._seconds_until_next_probe() + try: + await asyncio.wait_for(self._live_monitor_wakeup.wait(), timeout=timeout) + except asyncio.TimeoutError: + pass + self._live_monitor_wakeup.clear() + now = time.time() + due: list[Download] = [] + for url, probe_at in list(self._scheduled_probe_at.items()): + if now < probe_at: + continue + if not self.queue.exists(url): + self._unregister_scheduled(url) + continue + download = self.queue.get(url) + if download.info.status != 'scheduled' or download.canceled: + self._unregister_scheduled(url) + continue + due.append(download) + for download in due: + try: + await self._probe_scheduled_download(download) + except Exception as exc: + # Defensive: _probe_scheduled_download handles its own errors, + # but never let an unexpected failure leave probe_at in the past + # (which would spin this loop) or kill the monitor task. + log.exception("Scheduled live probe crashed for %s: %s", download.info.url, exc) + if download.info.url in self._scheduled_probe_at: + self._scheduled_probe_at[download.info.url] = time.time() + _LIVE_CHECK_INTERVAL + + async def _probe_scheduled_download(self, download: Download) -> None: + url = download.info.url + info = download.info + if info.status != 'scheduled' or download.canceled: + self._unregister_scheduled(url) + return + + try: + entry = await asyncio.get_running_loop().run_in_executor( + None, + partial( + self.__extract_info, + url, + getattr(info, 'ytdl_options_presets', None), + getattr(info, 'ytdl_options_overrides', {}) or {}, + ), + ) + except Exception as exc: + # Treat all probe failures (transient network blips, rate limits, + # extractor errors) as recoverable up to a point: retry on the next + # interval and only give up after repeated consecutive failures so a + # momentary glitch doesn't abandon a stream the user is waiting for. + fails = self._scheduled_probe_failures.get(url, 0) + 1 + self._scheduled_probe_failures[url] = fails + if fails >= _LIVE_PROBE_MAX_FAILURES: + log.warning( + "Giving up on scheduled live probe for %s after %d consecutive failures: %s", + info.title, fails, exc, + ) + info.status = 'error' + info.msg = str(exc) + if not info.error: + info.error = str(exc) + self._unregister_scheduled(url) + self.queue.delete(url) + self.done.put(download) + await self.notifier.completed(info) + else: + log.warning( + "Scheduled live probe failed for %s (attempt %d/%d), will retry: %s", + info.title, fails, _LIVE_PROBE_MAX_FAILURES, exc, + ) + self._scheduled_probe_at[url] = time.time() + _LIVE_CHECK_INTERVAL + return + + # Successful probe resets the transient-failure streak. + self._scheduled_probe_failures.pop(url, None) + + release_ts = entry.get('release_timestamp') + live_status = entry.get('live_status') + if release_ts is not None: + info.live_release_timestamp = release_ts + if live_status is not None: + info.live_status = live_status + + if live_status == 'is_upcoming': + self._scheduled_probe_at[url] = time.time() + self._probe_interval_seconds(release_ts) + await self.notifier.updated(info) + return + + self._unregister_scheduled(url) + info.status = 'pending' + # Clear the "scheduled to start at ..." placeholder now that the stream + # is live and a real download is about to begin. + info.error = None + info.msg = None + await self.notifier.updated(info) + asyncio.create_task(self.__start_download(download)) + + def _schedule_upcoming_download(self, download: Download) -> None: + download.info.status = 'scheduled' + self.queue.put(download) + self._register_scheduled(download) + + def _force_start_scheduled(self, download: Download) -> None: + self._unregister_scheduled(download.info.url) + download.info.status = 'pending' + download.info.error = None + download.info.msg = None + asyncio.create_task(self.__start_download(download)) + async def __start_download(self, download): if download.canceled: log.info(f"Download {download.info.title} was canceled, skipping start.") @@ -886,9 +1062,16 @@ class DownloadQueue: log.info(f'playlist limit is set. Processing only first {playlist_item_limit} entries') ytdl_options['playlistend'] = playlist_item_limit download = Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, dl.quality, dl.format, ytdl_options, dl) + is_upcoming = ( + getattr(dl, 'live_status', None) == 'is_upcoming' + or getattr(dl, 'status', None) == 'scheduled' + ) if auto_start is True: - self.queue.put(download) - asyncio.create_task(self.__start_download(download)) + if is_upcoming: + self._schedule_upcoming_download(download) + else: + self.queue.put(download) + asyncio.create_task(self.__start_download(download)) else: self.pending.put(download) await self.notifier.added(dl) @@ -1036,6 +1219,8 @@ class DownloadQueue: ytdl_options_overrides=ytdl_options_overrides, clip_start=clip_start, clip_end=clip_end, + live_status=entry.get('live_status'), + live_release_timestamp=entry.get('release_timestamp'), ) await self.__add_download(dl, auto_start) return {'status': 'ok'} @@ -1156,13 +1341,21 @@ class DownloadQueue: async def start_pending(self, ids): for id in ids: - if not self.pending.exists(id): - log.warning(f'requested start for non-existent download {id}') + if self.pending.exists(id): + dl = self.pending.get(id) + self.pending.delete(id) + if getattr(dl.info, 'live_status', None) == 'is_upcoming': + self._schedule_upcoming_download(dl) + else: + self.queue.put(dl) + asyncio.create_task(self.__start_download(dl)) continue - dl = self.pending.get(id) - self.queue.put(dl) - self.pending.delete(id) - asyncio.create_task(self.__start_download(dl)) + if self.queue.exists(id): + dl = self.queue.get(id) + if dl.info.status == 'scheduled': + self._force_start_scheduled(dl) + continue + log.warning(f'requested start for non-existent download {id}') return {'status': 'ok'} async def cancel(self, ids): @@ -1177,6 +1370,8 @@ class DownloadQueue: log.warning(f'requested cancel for non-existent download {id}') continue dl = self.queue.get(id) + if dl.info.status == 'scheduled': + self._unregister_scheduled(id) if dl.started(): dl.cancel() else: diff --git a/ui/src/app/app.html b/ui/src/app/app.html index f9699fd..0bd8300 100644 --- a/ui/src/app/app.html +++ b/ui/src/app/app.html @@ -706,16 +706,31 @@
-
{{ download.value.title }}
- +
+ {{ download.value.title }} + @if (download.value.live_status === 'is_live' && download.value.status !== 'scheduled') { + LIVE + } +
+ @if (download.value.status === 'scheduled') { + + + Waiting for stream + @if (liveCountdownSeconds(download.value); as secs) { + - starts in {{ secs | eta }} + } + + } @else { + + }
{{ download.value.speed | speed }} {{ download.value.eta | eta }}
- @if (download.value.status === 'pending') { + @if (download.value.status === 'pending' || download.value.status === 'scheduled') { } diff --git a/ui/src/app/app.spec.ts b/ui/src/app/app.spec.ts index cdaf356..a77b171 100644 --- a/ui/src/app/app.spec.ts +++ b/ui/src/app/app.spec.ts @@ -182,6 +182,37 @@ describe('App', () => { expect(payload.ytdlOptionsOverrides).toBe(''); }); + it('shows waiting badge for scheduled live stream', () => { + downloads.queue.set('https://example.com/live', { + id: 'live1', + title: 'Upcoming Stream', + url: 'https://example.com/live', + download_type: 'video', + quality: 'best', + format: 'any', + folder: '', + custom_name_prefix: '', + playlist_item_limit: 0, + status: 'scheduled', + live_status: 'is_upcoming', + live_release_timestamp: Date.now() / 1000 + 3600, + msg: '', + percent: 0, + speed: 0, + eta: 0, + filename: '', + checked: false, + }); + downloads.queueChanged.next(); + + const fixture = TestBed.createComponent(App); + fixture.detectChanges(); + + const root = fixture.nativeElement as HTMLElement; + expect(root.textContent).toContain('Waiting for stream'); + expect(root.textContent).toContain('starts in'); + }); + it('includes titleRegex in subscribe payload', () => { const fixture = TestBed.createComponent(App); const app = fixture.componentInstance; diff --git a/ui/src/app/app.ts b/ui/src/app/app.ts index 5dc8709..7245ab3 100644 --- a/ui/src/app/app.ts +++ b/ui/src/app/app.ts @@ -132,6 +132,7 @@ export class App implements AfterViewInit, OnInit, OnDestroy { lastCopiedErrorId: string | null = null; private previousDownloadType = 'video'; private addRequestSub?: Subscription; + private liveCountdownTimer?: ReturnType; private selectionsByType: Record { this.updateMetrics(); + this.syncLiveCountdownTimer(); this.cdr.markForCheck(); }); this.downloads.doneChanged.pipe(takeUntilDestroyed(this.destroyRef)).subscribe(() => { @@ -295,6 +297,7 @@ export class App implements AfterViewInit, OnInit, OnDestroy { // Subscribe to real-time updates this.downloads.updated.pipe(takeUntilDestroyed(this.destroyRef)).subscribe(() => { this.updateMetrics(); + this.syncLiveCountdownTimer(); this.cdr.markForCheck(); }); @@ -337,6 +340,9 @@ export class App implements AfterViewInit, OnInit, OnDestroy { ngOnDestroy() { this.addRequestSub?.unsubscribe(); + if (this.liveCountdownTimer) { + clearInterval(this.liveCountdownTimer); + } this.colorSchemeMediaQuery.removeEventListener('change', this.onColorSchemeChanged); } @@ -1106,6 +1112,26 @@ export class App implements AfterViewInit, OnInit, OnDestroy { this.downloads.startById([id]).subscribe(); } + liveCountdownSeconds(download: Download): number | null { + const ts = download.live_release_timestamp; + if (ts == null || download.status !== 'scheduled') { + return null; + } + return Math.max(0, ts - Date.now() / 1000); + } + + private syncLiveCountdownTimer() { + const hasScheduled = Array.from(this.downloads.queue.values()).some( + (download) => download.status === 'scheduled', + ); + if (hasScheduled && !this.liveCountdownTimer) { + this.liveCountdownTimer = setInterval(() => this.cdr.markForCheck(), 1000); + } else if (!hasScheduled && this.liveCountdownTimer) { + clearInterval(this.liveCountdownTimer); + this.liveCountdownTimer = undefined; + } + } + retryDownload(key: string, download: Download) { this.addDownload({ url: download.url, @@ -1631,7 +1657,7 @@ export class App implements AfterViewInit, OnInit, OnDestroy { speed += download.speed || 0; } else if (download.status === 'preparing') { active++; - } else if (download.status === 'pending') { + } else if (download.status === 'pending' || download.status === 'scheduled') { queued++; } }); diff --git a/ui/src/app/interfaces/download.ts b/ui/src/app/interfaces/download.ts index 935d4da..50de3cd 100644 --- a/ui/src/app/interfaces/download.ts +++ b/ui/src/app/interfaces/download.ts @@ -18,6 +18,8 @@ export interface Download { ytdl_options_overrides?: Record; clip_start?: number; clip_end?: number; + live_status?: string; + live_release_timestamp?: number; status: string; msg: string; percent: number;