support live streams (closes #302, closes #752, closes #978)

This commit is contained in:
Alex Shnitman
2026-06-13 17:39:14 +03:00
parent 72d60ea55a
commit 5429200fba
6 changed files with 524 additions and 14 deletions
+242 -1
View File
@@ -7,8 +7,9 @@ import tempfile
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import time
from ytdl import DownloadQueue
from ytdl import DownloadInfo, DownloadQueue
@pytest.fixture
@@ -386,3 +387,243 @@ async def test_add_sets_clip_bounds_on_download_info(dq_env):
download = dq.pending.get("https://example.com/clip")
assert download.info.clip_start == 10.0
assert download.info.clip_end == 99.5
def _upcoming_entry(url: str, *, release_timestamp: float | None = None) -> dict:
return {
"_type": "video",
"id": "live1",
"title": "Upcoming Stream",
"url": url,
"webpage_url": url,
"live_status": "is_upcoming",
"release_timestamp": release_timestamp if release_timestamp is not None else time.time() + 3600,
}
@pytest.mark.asyncio
async def test_add_upcoming_stream_scheduled_without_starting(dq_env):
notifier = AsyncMock()
url = "https://example.com/live-upcoming"
start_mock = AsyncMock()
dq = DownloadQueue(dq_env, notifier)
with patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock):
result = await dq.add_entry(
_upcoming_entry(url),
"video",
"auto",
"any",
"best",
"",
"",
0,
auto_start=True,
)
assert result["status"] == "ok"
assert dq.queue.exists(url)
download = dq.queue.get(url)
assert download.info.status == "scheduled"
assert download.info.live_status == "is_upcoming"
assert download.info.live_release_timestamp is not None
start_mock.assert_not_called()
assert url in dq._scheduled_probe_at
@pytest.mark.asyncio
async def test_probe_scheduled_starts_when_live(dq_env):
notifier = AsyncMock()
url = "https://example.com/live-upcoming"
start_mock = AsyncMock()
dq = DownloadQueue(dq_env, notifier)
with patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock):
await dq.add_entry(
_upcoming_entry(url),
"video",
"auto",
"any",
"best",
"",
"",
0,
auto_start=True,
)
download = dq.queue.get(url)
def fake_probe_extract(self, probe_url, ytdl_options_presets=None, ytdl_options_overrides=None):
assert probe_url == url
return {
"_type": "video",
"id": "live1",
"title": "Live Now",
"url": url,
"webpage_url": url,
"live_status": "is_live",
"formats": [{"format_id": "22"}],
}
with patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_probe_extract), \
patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock):
await dq._probe_scheduled_download(download)
assert url not in dq._scheduled_probe_at
assert download.info.live_status == "is_live"
assert download.info.status == "pending"
start_mock.assert_called_once_with(download)
@pytest.mark.asyncio
async def test_import_scheduled_re_registers_monitor(dq_env):
notifier = AsyncMock()
url = "https://example.com/live-restart"
release = time.time() + 7200
info = DownloadInfo(
id="live1",
title="Upcoming Stream",
url=url,
quality="best",
download_type="video",
codec="auto",
format="any",
folder="",
custom_name_prefix="",
error=None,
entry=None,
playlist_item_limit=0,
split_by_chapters=False,
chapter_template="",
live_status="is_upcoming",
live_release_timestamp=release,
)
info.status = "scheduled"
dq = DownloadQueue(dq_env, notifier)
start_mock = AsyncMock()
with patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock):
await dq._DownloadQueue__add_download(info, True)
assert dq.queue.exists(url)
assert dq.queue.get(url).info.status == "scheduled"
assert url in dq._scheduled_probe_at
start_mock.assert_not_called()
@pytest.mark.asyncio
async def test_probe_transient_error_retries_without_failing(dq_env):
"""A single probe failure must not abandon the scheduled stream."""
import ytdl
notifier = AsyncMock()
url = "https://example.com/live-transient"
start_mock = AsyncMock()
dq = DownloadQueue(dq_env, notifier)
with patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock):
await dq.add_entry(
_upcoming_entry(url),
"video", "auto", "any", "best", "", "", 0,
auto_start=True,
)
download = dq.queue.get(url)
def boom(self, *args, **kwargs):
raise ytdl.yt_dlp.utils.YoutubeDLError("temporary network glitch")
before = time.time()
with patch.object(DownloadQueue, "_DownloadQueue__extract_info", boom):
await dq._probe_scheduled_download(download)
# Still scheduled, still monitored, probe rescheduled into the future.
assert download.info.status == "scheduled"
assert url in dq._scheduled_probe_at
assert dq._scheduled_probe_at[url] >= before
assert dq._scheduled_probe_failures[url] == 1
notifier.completed.assert_not_called()
@pytest.mark.asyncio
async def test_probe_gives_up_after_max_failures(dq_env):
import ytdl
notifier = AsyncMock()
url = "https://example.com/live-dead"
start_mock = AsyncMock()
dq = DownloadQueue(dq_env, notifier)
with patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock):
await dq.add_entry(
_upcoming_entry(url),
"video", "auto", "any", "best", "", "", 0,
auto_start=True,
)
download = dq.queue.get(url)
def boom(self, *args, **kwargs):
raise ytdl.yt_dlp.utils.YoutubeDLError("stream was deleted")
with patch.object(DownloadQueue, "_DownloadQueue__extract_info", boom):
for _ in range(ytdl._LIVE_PROBE_MAX_FAILURES):
await dq._probe_scheduled_download(download)
assert url not in dq._scheduled_probe_at
assert not dq.queue.exists(url)
assert dq.done.exists(url)
assert download.info.status == "error"
notifier.completed.assert_awaited()
@pytest.mark.asyncio
async def test_probe_recovers_after_transient_then_starts(dq_env):
"""A transient failure followed by a successful live probe should start the download."""
import ytdl
notifier = AsyncMock()
url = "https://example.com/live-recover"
start_mock = AsyncMock()
dq = DownloadQueue(dq_env, notifier)
with patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock):
await dq.add_entry(
_upcoming_entry(url),
"video", "auto", "any", "best", "", "", 0,
auto_start=True,
)
download = dq.queue.get(url)
# The scheduling placeholder error is set on add.
assert download.info.error
def boom(self, *args, **kwargs):
raise ytdl.yt_dlp.utils.YoutubeDLError("temporary glitch")
with patch.object(DownloadQueue, "_DownloadQueue__extract_info", boom):
await dq._probe_scheduled_download(download)
assert dq._scheduled_probe_failures[url] == 1
def live_now(self, *args, **kwargs):
return {
"_type": "video", "id": "live1", "title": "Live Now",
"url": url, "webpage_url": url, "live_status": "is_live",
"formats": [{"format_id": "22"}],
}
with patch.object(DownloadQueue, "_DownloadQueue__extract_info", live_now), \
patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock):
await dq._probe_scheduled_download(download)
assert url not in dq._scheduled_probe_at
assert url not in dq._scheduled_probe_failures
assert download.info.status == "pending"
# Placeholder error/msg cleared now that a real download is starting.
assert download.info.error is None
assert download.info.msg is None
start_mock.assert_called_once_with(download)
def test_seconds_until_next_probe_none_when_empty(dq_env):
notifier = AsyncMock()
dq = DownloadQueue(dq_env, notifier)
assert dq._seconds_until_next_probe() is None
+203 -8
View File
@@ -24,6 +24,12 @@ from subscriptions import _entry_id
log = logging.getLogger('ytdl')
_LIVE_CHECK_INTERVAL = 60
_LIVE_MAX_CHECK_INTERVAL = 3600
# Consecutive probe failures (network blips, rate limits, transient extractor
# errors) tolerated before a scheduled live download is abandoned as errored.
_LIVE_PROBE_MAX_FAILURES = 5
# Characters that are invalid in Windows/NTFS path components. These are pre-
# sanitised when substituting playlist/channel titles into output templates so
@@ -194,6 +200,8 @@ class DownloadInfo:
ytdl_options_overrides=None,
clip_start=None,
clip_end=None,
live_status=None,
live_release_timestamp=None,
):
self.id = id if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{id}'
self.title = title if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{title}'
@@ -220,6 +228,8 @@ class DownloadInfo:
self.ytdl_options_overrides = dict(ytdl_options_overrides or {})
self.clip_start = clip_start
self.clip_end = clip_end
self.live_status = live_status
self.live_release_timestamp = live_release_timestamp
self.subtitle_files = []
def __setstate__(self, state):
@@ -292,6 +302,10 @@ class DownloadInfo:
self.clip_start = None
if not hasattr(self, "clip_end"):
self.clip_end = None
if not hasattr(self, "live_status"):
self.live_status = None
if not hasattr(self, "live_release_timestamp"):
self.live_release_timestamp = None
_PERSISTED_DOWNLOAD_FIELDS = (
@@ -313,6 +327,8 @@ _PERSISTED_DOWNLOAD_FIELDS = (
"ytdl_options_overrides",
"clip_start",
"clip_end",
"live_status",
"live_release_timestamp",
"status",
"timestamp",
"error",
@@ -757,6 +773,10 @@ class DownloadQueue:
self.done.load()
self._add_generation = 0
self._canceled_urls = set() # URLs canceled during current playlist add
self._scheduled_probe_at: dict[str, float] = {}
self._scheduled_probe_failures: dict[str, int] = {}
self._live_monitor_task: Optional[asyncio.Task] = None
self._live_monitor_wakeup = asyncio.Event()
def cancel_add(self):
self._add_generation += 1
@@ -772,9 +792,165 @@ class DownloadQueue:
async def initialize(self):
log.info("Initializing DownloadQueue")
self._start_live_monitor()
asyncio.create_task(self.__import_queue())
asyncio.create_task(self.__import_pending())
def _start_live_monitor(self) -> None:
if self._live_monitor_task is not None and not self._live_monitor_task.done():
return
self._live_monitor_task = asyncio.create_task(self._live_monitor_loop())
self._live_monitor_task.add_done_callback(
lambda t: log.error("Live monitor loop failed: %s", t.exception())
if not t.cancelled() and t.exception()
else None
)
def _register_scheduled(self, download: Download) -> None:
self._scheduled_probe_at[download.info.url] = 0
self._scheduled_probe_failures.pop(download.info.url, None)
self._start_live_monitor()
self._wake_live_monitor()
def _unregister_scheduled(self, url: str) -> None:
self._scheduled_probe_at.pop(url, None)
self._scheduled_probe_failures.pop(url, None)
def _wake_live_monitor(self) -> None:
try:
self._live_monitor_wakeup.set()
except RuntimeError:
pass
def _probe_interval_seconds(self, release_timestamp: Any) -> float:
if release_timestamp is not None:
try:
diff = float(release_timestamp) - time.time()
if diff > 0:
return max(_LIVE_CHECK_INTERVAL, min(diff, _LIVE_MAX_CHECK_INTERVAL))
except (TypeError, ValueError):
pass
return float(_LIVE_CHECK_INTERVAL)
def _seconds_until_next_probe(self) -> Optional[float]:
"""Time until the earliest scheduled probe, or None when nothing is scheduled."""
if not self._scheduled_probe_at:
return None
return max(0.0, min(self._scheduled_probe_at.values()) - time.time())
async def _live_monitor_loop(self) -> None:
while True:
timeout = self._seconds_until_next_probe()
try:
await asyncio.wait_for(self._live_monitor_wakeup.wait(), timeout=timeout)
except asyncio.TimeoutError:
pass
self._live_monitor_wakeup.clear()
now = time.time()
due: list[Download] = []
for url, probe_at in list(self._scheduled_probe_at.items()):
if now < probe_at:
continue
if not self.queue.exists(url):
self._unregister_scheduled(url)
continue
download = self.queue.get(url)
if download.info.status != 'scheduled' or download.canceled:
self._unregister_scheduled(url)
continue
due.append(download)
for download in due:
try:
await self._probe_scheduled_download(download)
except Exception as exc:
# Defensive: _probe_scheduled_download handles its own errors,
# but never let an unexpected failure leave probe_at in the past
# (which would spin this loop) or kill the monitor task.
log.exception("Scheduled live probe crashed for %s: %s", download.info.url, exc)
if download.info.url in self._scheduled_probe_at:
self._scheduled_probe_at[download.info.url] = time.time() + _LIVE_CHECK_INTERVAL
async def _probe_scheduled_download(self, download: Download) -> None:
url = download.info.url
info = download.info
if info.status != 'scheduled' or download.canceled:
self._unregister_scheduled(url)
return
try:
entry = await asyncio.get_running_loop().run_in_executor(
None,
partial(
self.__extract_info,
url,
getattr(info, 'ytdl_options_presets', None),
getattr(info, 'ytdl_options_overrides', {}) or {},
),
)
except Exception as exc:
# Treat all probe failures (transient network blips, rate limits,
# extractor errors) as recoverable up to a point: retry on the next
# interval and only give up after repeated consecutive failures so a
# momentary glitch doesn't abandon a stream the user is waiting for.
fails = self._scheduled_probe_failures.get(url, 0) + 1
self._scheduled_probe_failures[url] = fails
if fails >= _LIVE_PROBE_MAX_FAILURES:
log.warning(
"Giving up on scheduled live probe for %s after %d consecutive failures: %s",
info.title, fails, exc,
)
info.status = 'error'
info.msg = str(exc)
if not info.error:
info.error = str(exc)
self._unregister_scheduled(url)
self.queue.delete(url)
self.done.put(download)
await self.notifier.completed(info)
else:
log.warning(
"Scheduled live probe failed for %s (attempt %d/%d), will retry: %s",
info.title, fails, _LIVE_PROBE_MAX_FAILURES, exc,
)
self._scheduled_probe_at[url] = time.time() + _LIVE_CHECK_INTERVAL
return
# Successful probe resets the transient-failure streak.
self._scheduled_probe_failures.pop(url, None)
release_ts = entry.get('release_timestamp')
live_status = entry.get('live_status')
if release_ts is not None:
info.live_release_timestamp = release_ts
if live_status is not None:
info.live_status = live_status
if live_status == 'is_upcoming':
self._scheduled_probe_at[url] = time.time() + self._probe_interval_seconds(release_ts)
await self.notifier.updated(info)
return
self._unregister_scheduled(url)
info.status = 'pending'
# Clear the "scheduled to start at ..." placeholder now that the stream
# is live and a real download is about to begin.
info.error = None
info.msg = None
await self.notifier.updated(info)
asyncio.create_task(self.__start_download(download))
def _schedule_upcoming_download(self, download: Download) -> None:
download.info.status = 'scheduled'
self.queue.put(download)
self._register_scheduled(download)
def _force_start_scheduled(self, download: Download) -> None:
self._unregister_scheduled(download.info.url)
download.info.status = 'pending'
download.info.error = None
download.info.msg = None
asyncio.create_task(self.__start_download(download))
async def __start_download(self, download):
if download.canceled:
log.info(f"Download {download.info.title} was canceled, skipping start.")
@@ -886,9 +1062,16 @@ class DownloadQueue:
log.info(f'playlist limit is set. Processing only first {playlist_item_limit} entries')
ytdl_options['playlistend'] = playlist_item_limit
download = Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, dl.quality, dl.format, ytdl_options, dl)
is_upcoming = (
getattr(dl, 'live_status', None) == 'is_upcoming'
or getattr(dl, 'status', None) == 'scheduled'
)
if auto_start is True:
self.queue.put(download)
asyncio.create_task(self.__start_download(download))
if is_upcoming:
self._schedule_upcoming_download(download)
else:
self.queue.put(download)
asyncio.create_task(self.__start_download(download))
else:
self.pending.put(download)
await self.notifier.added(dl)
@@ -1036,6 +1219,8 @@ class DownloadQueue:
ytdl_options_overrides=ytdl_options_overrides,
clip_start=clip_start,
clip_end=clip_end,
live_status=entry.get('live_status'),
live_release_timestamp=entry.get('release_timestamp'),
)
await self.__add_download(dl, auto_start)
return {'status': 'ok'}
@@ -1156,13 +1341,21 @@ class DownloadQueue:
async def start_pending(self, ids):
for id in ids:
if not self.pending.exists(id):
log.warning(f'requested start for non-existent download {id}')
if self.pending.exists(id):
dl = self.pending.get(id)
self.pending.delete(id)
if getattr(dl.info, 'live_status', None) == 'is_upcoming':
self._schedule_upcoming_download(dl)
else:
self.queue.put(dl)
asyncio.create_task(self.__start_download(dl))
continue
dl = self.pending.get(id)
self.queue.put(dl)
self.pending.delete(id)
asyncio.create_task(self.__start_download(dl))
if self.queue.exists(id):
dl = self.queue.get(id)
if dl.info.status == 'scheduled':
self._force_start_scheduled(dl)
continue
log.warning(f'requested start for non-existent download {id}')
return {'status': 'ok'}
async def cancel(self, ids):
@@ -1177,6 +1370,8 @@ class DownloadQueue:
log.warning(f'requested cancel for non-existent download {id}')
continue
dl = self.queue.get(id)
if dl.info.status == 'scheduled':
self._unregister_scheduled(id)
if dl.started():
dl.cancel()
else:
+19 -4
View File
@@ -706,16 +706,31 @@
</td>
<td title="{{ download.value.filename }}">
<div class="d-flex flex-column flex-sm-row align-items-center row-gap-2 column-gap-3">
<div>{{ download.value.title }} </div>
<ngb-progressbar height="1.5rem" [showValue]="download.value.status !== 'preparing'" [striped]="download.value.status === 'preparing'" [animated]="download.value.status === 'preparing'" type="success"
[value]="download.value.status === 'preparing' ? 100 : download.value.percent" class="download-progressbar" />
<div class="d-flex align-items-center flex-wrap gap-2">
<span>{{ download.value.title }}</span>
@if (download.value.live_status === 'is_live' && download.value.status !== 'scheduled') {
<span class="badge bg-danger">LIVE</span>
}
</div>
@if (download.value.status === 'scheduled') {
<span class="badge bg-warning text-dark">
<fa-icon [icon]="faClock" />
Waiting for stream
@if (liveCountdownSeconds(download.value); as secs) {
- starts in {{ secs | eta }}
}
</span>
} @else {
<ngb-progressbar height="1.5rem" [showValue]="download.value.status !== 'preparing'" [striped]="download.value.status === 'preparing'" [animated]="download.value.status === 'preparing'" type="success"
[value]="download.value.status === 'preparing' ? 100 : download.value.percent" class="download-progressbar" />
}
</div>
</td>
<td>{{ download.value.speed | speed }}</td>
<td>{{ download.value.eta | eta }}</td>
<td>
<div class="d-flex">
@if (download.value.status === 'pending') {
@if (download.value.status === 'pending' || download.value.status === 'scheduled') {
<button type="button" class="btn btn-link" [attr.aria-label]="'Start download for ' + download.value.title" (click)="downloadItemByKey(download.key)"><fa-icon [icon]="faDownload" /></button>
}
<button type="button" class="btn btn-link" [attr.aria-label]="'Remove ' + download.value.title + ' from queue'" (click)="delDownload('queue', download.key)"><fa-icon [icon]="faTrashAlt" /></button>
+31
View File
@@ -182,6 +182,37 @@ describe('App', () => {
expect(payload.ytdlOptionsOverrides).toBe('');
});
it('shows waiting badge for scheduled live stream', () => {
downloads.queue.set('https://example.com/live', {
id: 'live1',
title: 'Upcoming Stream',
url: 'https://example.com/live',
download_type: 'video',
quality: 'best',
format: 'any',
folder: '',
custom_name_prefix: '',
playlist_item_limit: 0,
status: 'scheduled',
live_status: 'is_upcoming',
live_release_timestamp: Date.now() / 1000 + 3600,
msg: '',
percent: 0,
speed: 0,
eta: 0,
filename: '',
checked: false,
});
downloads.queueChanged.next();
const fixture = TestBed.createComponent(App);
fixture.detectChanges();
const root = fixture.nativeElement as HTMLElement;
expect(root.textContent).toContain('Waiting for stream');
expect(root.textContent).toContain('starts in');
});
it('includes titleRegex in subscribe payload', () => {
const fixture = TestBed.createComponent(App);
const app = fixture.componentInstance;
+27 -1
View File
@@ -132,6 +132,7 @@ export class App implements AfterViewInit, OnInit, OnDestroy {
lastCopiedErrorId: string | null = null;
private previousDownloadType = 'video';
private addRequestSub?: Subscription;
private liveCountdownTimer?: ReturnType<typeof setInterval>;
private selectionsByType: Record<string, {
codec: string;
format: string;
@@ -285,6 +286,7 @@ export class App implements AfterViewInit, OnInit, OnDestroy {
// Subscribe to download updates
this.downloads.queueChanged.pipe(takeUntilDestroyed(this.destroyRef)).subscribe(() => {
this.updateMetrics();
this.syncLiveCountdownTimer();
this.cdr.markForCheck();
});
this.downloads.doneChanged.pipe(takeUntilDestroyed(this.destroyRef)).subscribe(() => {
@@ -295,6 +297,7 @@ export class App implements AfterViewInit, OnInit, OnDestroy {
// Subscribe to real-time updates
this.downloads.updated.pipe(takeUntilDestroyed(this.destroyRef)).subscribe(() => {
this.updateMetrics();
this.syncLiveCountdownTimer();
this.cdr.markForCheck();
});
@@ -337,6 +340,9 @@ export class App implements AfterViewInit, OnInit, OnDestroy {
ngOnDestroy() {
this.addRequestSub?.unsubscribe();
if (this.liveCountdownTimer) {
clearInterval(this.liveCountdownTimer);
}
this.colorSchemeMediaQuery.removeEventListener('change', this.onColorSchemeChanged);
}
@@ -1106,6 +1112,26 @@ export class App implements AfterViewInit, OnInit, OnDestroy {
this.downloads.startById([id]).subscribe();
}
liveCountdownSeconds(download: Download): number | null {
const ts = download.live_release_timestamp;
if (ts == null || download.status !== 'scheduled') {
return null;
}
return Math.max(0, ts - Date.now() / 1000);
}
private syncLiveCountdownTimer() {
const hasScheduled = Array.from(this.downloads.queue.values()).some(
(download) => download.status === 'scheduled',
);
if (hasScheduled && !this.liveCountdownTimer) {
this.liveCountdownTimer = setInterval(() => this.cdr.markForCheck(), 1000);
} else if (!hasScheduled && this.liveCountdownTimer) {
clearInterval(this.liveCountdownTimer);
this.liveCountdownTimer = undefined;
}
}
retryDownload(key: string, download: Download) {
this.addDownload({
url: download.url,
@@ -1631,7 +1657,7 @@ export class App implements AfterViewInit, OnInit, OnDestroy {
speed += download.speed || 0;
} else if (download.status === 'preparing') {
active++;
} else if (download.status === 'pending') {
} else if (download.status === 'pending' || download.status === 'scheduled') {
queued++;
}
});
+2
View File
@@ -18,6 +18,8 @@ export interface Download {
ytdl_options_overrides?: Record<string, unknown>;
clip_start?: number;
clip_end?: number;
live_status?: string;
live_release_timestamp?: number;
status: string;
msg: string;
percent: number;