mirror of
https://github.com/alexta69/metube.git
synced 2026-06-13 16:40:05 +00:00
630 lines
19 KiB
Python
630 lines
19 KiB
Python
"""Tests for ``DownloadQueue`` with mocked yt-dlp extraction."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import tempfile
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
import time
|
|
|
|
from ytdl import DownloadInfo, DownloadQueue
|
|
|
|
|
|
@pytest.fixture
def dq_env():
    """Yield a mocked config object whose directories live in a throwaway tree.

    A temporary directory is created with ``downloads`` and ``state``
    sub-directories; the download, audio-download and temp settings all point
    at ``downloads``. The tree is removed when the fixture is torn down.
    """
    with tempfile.TemporaryDirectory() as root:
        download_dir = os.path.join(root, "downloads")
        state_dir = os.path.join(root, "state")
        for path in (download_dir, state_dir):
            os.makedirs(path, exist_ok=True)

        settings = {
            "STATE_DIR": state_dir,
            "DOWNLOAD_DIR": download_dir,
            "AUDIO_DOWNLOAD_DIR": download_dir,
            "TEMP_DIR": download_dir,
            "MAX_CONCURRENT_DOWNLOADS": "3",
            "YTDL_OPTIONS": {},
            "YTDL_OPTIONS_PRESETS": {},
            "CUSTOM_DIRS": True,
            "CREATE_CUSTOM_DIRS": True,
            "CLEAR_COMPLETED_AFTER": "0",
            "DELETE_FILE_ON_TRASHCAN": False,
            "OUTPUT_TEMPLATE": "%(title)s.%(ext)s",
            "OUTPUT_TEMPLATE_CHAPTER": "%(title)s.%(ext)s",
            "OUTPUT_TEMPLATE_PLAYLIST": "",
            "OUTPUT_TEMPLATE_CHANNEL": "",
        }
        config = MagicMock()
        for option, value in settings.items():
            setattr(config, option, value)
        yield config
|
|
|
|
|
|
def test_cancel_add_increments_generation(dq_env):
    """``cancel_add`` raises the queue's ``_add_generation`` counter by one."""
    download_queue = DownloadQueue(dq_env, MagicMock())
    expected = download_queue._add_generation + 1
    download_queue.cancel_add()
    assert download_queue._add_generation == expected
|
|
|
|
|
|
def test_get_returns_tuple_of_lists(dq_env):
    """``get`` on a freshly built queue unpacks into two empty lists."""
    download_queue = DownloadQueue(dq_env, MagicMock())
    queued, finished = download_queue.get()
    assert queued == []
    assert finished == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_add_single_video_goes_to_pending_when_auto_start_false(dq_env):
    """Adding a video with ``auto_start=False`` reports ok and lands it in ``pending``."""
    video_url = "https://example.com/watch?v=1"

    def fake_extract(self, url, ytdl_options_presets=None, ytdl_options_overrides=None):
        # Stand-in for the private extractor: echoes the URL back as a video entry.
        info = {"_type": "video", "id": "vid1", "title": "Test Video"}
        info["url"] = url
        info["webpage_url"] = url
        return info

    download_queue = DownloadQueue(dq_env, AsyncMock())
    with patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_extract):
        outcome = await download_queue.add(
            video_url, "video", "auto", "any", "best", "", "", 0, auto_start=False
        )

    assert outcome["status"] == "ok"
    assert download_queue.pending.exists(video_url)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_cancel_removes_from_pending(dq_env):
    """Cancelling a pending download removes it and awaits ``notifier.canceled``."""
    target = "https://example.com/pending"
    notify = AsyncMock()

    def fake_extract(self, url, ytdl_options_presets=None, ytdl_options_overrides=None):
        # Stand-in for the private extractor: a minimal single-video entry.
        return {"_type": "video", "id": "vid1", "title": "Test Video", "url": url, "webpage_url": url}

    download_queue = DownloadQueue(dq_env, notify)
    with patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_extract):
        await download_queue.add(
            target, "video", "auto", "any", "best", "", "", 0, auto_start=False
        )

    await download_queue.cancel([target])

    assert not download_queue.pending.exists(target)
    notify.canceled.assert_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_cancel_before_start_marks_download_canceled(dq_env):
    """Guard against cancel() racing with a not-yet-run ``__start_download`` task.

    Scenario: the download already sits in the queue and ``__start_download``
    has been scheduled through ``asyncio.create_task`` but has not executed
    when cancel() arrives. Before the fix, that pending task still ran
    ``download.start()`` after the user cancelled, because the
    ``download.canceled`` guard it relies on was never flipped.
    """
    race_url = "https://example.com/race"
    notify = AsyncMock()
    starter = AsyncMock()

    def fake_extract(self, url, ytdl_options_presets=None, ytdl_options_overrides=None):
        # Stand-in for the private extractor: a minimal single-video entry.
        return {"_type": "video", "id": "vid1", "title": "Test Video", "url": url, "webpage_url": url}

    download_queue = DownloadQueue(dq_env, notify)
    extract_patch = patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_extract)
    start_patch = patch.object(DownloadQueue, "_DownloadQueue__start_download", starter)
    with extract_patch, start_patch:
        await download_queue.add(
            race_url, "video", "auto", "any", "best", "", "", 0, auto_start=True
        )

        # Queued and not yet flagged as canceled.
        assert download_queue.queue.exists(race_url)
        queued = download_queue.queue.get(race_url)
        assert queued.canceled is False

        await download_queue.cancel([race_url])

        # Gone from the queue, flagged, and the notifier was told about it.
        assert not download_queue.queue.exists(race_url)
        assert queued.canceled is True
        notify.canceled.assert_awaited_with(race_url)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_start_pending_moves_to_queue(dq_env):
    """``start_pending`` takes a download out of ``pending`` and puts it in ``queue``.

    The download is first added with ``auto_start=False`` so that it waits in
    ``pending``; ``start_pending`` is then invoked for its URL.
    """
    notifier = AsyncMock()
    url = "https://example.com/startme"

    def fake_extract(self, url, ytdl_options_presets=None, ytdl_options_overrides=None):
        # Stand-in for the private extractor: a minimal single-video entry.
        return {
            "_type": "video",
            "id": "vid1",
            "title": "Test Video",
            "url": url,
            "webpage_url": url,
        }

    dq = DownloadQueue(dq_env, notifier)
    with patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_extract):
        await dq.add(
            url,
            "video",
            "auto",
            "any",
            "best",
            "",
            "",
            0,
            auto_start=False,
        )

    # Precondition: the download is waiting in ``pending`` before it is started.
    assert dq.pending.exists(url)

    # ``__start_download`` is replaced with a mock, so no real download is spawned.
    with patch.object(DownloadQueue, "_DownloadQueue__start_download", AsyncMock()):
        await dq.start_pending([url])

    assert not dq.pending.exists(url)
    # NOTE(review): assumes start_pending places the download in ``queue`` (as the
    # test name states) — confirm against DownloadQueue.start_pending.
    assert dq.queue.exists(url)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_add_entry_queues_single_video_without_reextracting(dq_env):
    """``add_entry`` accepts an already-extracted entry without calling the extractor."""
    video_url = "https://example.com/watch?v=1"
    entry = {
        "_type": "video",
        "id": "vid1",
        "title": "Test Video",
        "url": video_url,
        "webpage_url": video_url,
        "playlist_index": "01",
        "playlist_title": "Playlist",
    }
    download_queue = DownloadQueue(dq_env, AsyncMock())

    # Any call into the private extractor raises and therefore fails the test.
    forbid_extract = patch.object(
        DownloadQueue,
        "_DownloadQueue__extract_info",
        side_effect=AssertionError("should not re-extract"),
    )
    with forbid_extract:
        outcome = await download_queue.add_entry(
            entry, "video", "auto", "any", "best", "", "", 0, auto_start=False
        )

    assert outcome["status"] == "ok"
    assert download_queue.pending.exists(video_url)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_add_merges_global_preset_and_override_options(dq_env):
    """Global options, two presets and per-request overrides all end up in ``ytdl_opts``.

    The expectations below pin which source supplies each key: the global
    ``cookiefile`` survives, ``writesubtitles``/``ratelimit`` match "Preset B",
    and ``proxy``/``embed_thumbnail`` match the overrides.
    """
    preset_url = "https://example.com/preset"
    dq_env.YTDL_OPTIONS = {"writesubtitles": False, "cookiefile": "/tmp/global.txt"}
    dq_env.YTDL_OPTIONS_PRESETS = {
        "Preset A": {"writesubtitles": True, "proxy": "http://preset-a"},
        "Preset B": {"writesubtitles": False, "ratelimit": 1000},
    }

    def fake_extract(self, url, ytdl_options_presets=None, ytdl_options_overrides=None):
        # Stand-in for the private extractor: a minimal single-video entry.
        return {"_type": "video", "id": "vid2", "title": "Preset Video", "url": url, "webpage_url": url}

    download_queue = DownloadQueue(dq_env, AsyncMock())
    with patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_extract):
        outcome = await download_queue.add(
            preset_url,
            "video", "auto", "any", "best", "", "", 0,
            auto_start=False,
            ytdl_options_presets=["Preset A", "Preset B"],
            ytdl_options_overrides={"proxy": "http://override", "embed_thumbnail": True},
        )

    assert outcome["status"] == "ok"
    merged = download_queue.pending.get(preset_url).ytdl_opts
    assert merged["cookiefile"] == "/tmp/global.txt"
    assert merged["writesubtitles"] is False
    assert merged["ratelimit"] == 1000
    assert merged["proxy"] == "http://override"
    assert merged["embed_thumbnail"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_extract_info_preset_null_download_archive_overrides_global(dq_env):
    """A preset with ``download_archive: null`` must take effect during extract_info.

    Otherwise the globally configured archive would win first.
    """
    archive_url = "https://example.com/archive-test"
    dq_env.YTDL_OPTIONS = {"download_archive": "/tmp/archive.txt"}
    dq_env.YTDL_OPTIONS_PRESETS = {"NoArchive": {"download_archive": None}}

    seen_params: list = []

    class RecordingYoutubeDL:
        """Replacement for ``yt_dlp.YoutubeDL`` that records constructor params."""

        def __init__(self, params=None):
            seen_params.append(params)

        def extract_info(self, url, download=False):
            return {
                "_type": "video",
                "id": "vid-archive",
                "title": "Archive Test",
                "url": url,
                "webpage_url": url,
            }

    download_queue = DownloadQueue(dq_env, AsyncMock())
    with patch("ytdl.yt_dlp.YoutubeDL", RecordingYoutubeDL):
        outcome = await download_queue.add(
            archive_url,
            "video", "auto", "any", "best", "", "", 0,
            auto_start=False,
            ytdl_options_presets=["NoArchive"],
        )

    assert outcome["status"] == "ok"
    # Exactly one YoutubeDL instance is expected to have been built.
    assert len(seen_params) == 1
    used = seen_params[0]
    assert used.get("download_archive") is None
    assert used["extract_flat"] is True
    assert used["noplaylist"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_extract_info_metube_extract_keys_win_over_preset(dq_env):
    """Presets must not be able to override MeTube's flat-extract settings."""
    flat_url = "https://example.com/flat-test"
    dq_env.YTDL_OPTIONS = {}
    dq_env.YTDL_OPTIONS_PRESETS = {
        "TryOverride": {"extract_flat": False, "noplaylist": False},
    }

    seen_params: list = []

    class RecordingYoutubeDL:
        """Replacement for ``yt_dlp.YoutubeDL`` that records constructor params."""

        def __init__(self, params=None):
            seen_params.append(params)

        def extract_info(self, url, download=False):
            return {
                "_type": "video",
                "id": "vid-flat",
                "title": "Flat Test",
                "url": url,
                "webpage_url": url,
            }

    download_queue = DownloadQueue(dq_env, AsyncMock())
    with patch("ytdl.yt_dlp.YoutubeDL", RecordingYoutubeDL):
        outcome = await download_queue.add(
            flat_url,
            "video", "auto", "any", "best", "", "", 0,
            auto_start=False,
            ytdl_options_presets=["TryOverride"],
        )

    assert outcome["status"] == "ok"
    first_params = seen_params[0]
    assert first_params["extract_flat"] is True
    assert first_params["noplaylist"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_add_sets_clip_bounds_on_download_info(dq_env):
    """``clip_start``/``clip_end`` passed to ``add`` are stored on the download's info."""
    clip_url = "https://example.com/clip"

    def fake_extract(self, url, ytdl_options_presets=None, ytdl_options_overrides=None):
        # Stand-in for the private extractor: a minimal single-video entry.
        return {"_type": "video", "id": "vid1", "title": "Test Video", "url": url, "webpage_url": url}

    download_queue = DownloadQueue(dq_env, AsyncMock())
    with patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_extract):
        outcome = await download_queue.add(
            clip_url,
            "video", "auto", "any", "best", "", "", 0,
            auto_start=False,
            clip_start=10.0,
            clip_end=99.5,
        )

    assert outcome["status"] == "ok"
    info = download_queue.pending.get(clip_url).info
    assert info.clip_start == 10.0
    assert info.clip_end == 99.5
|
|
|
|
|
|
def _upcoming_entry(url: str, *, release_timestamp: float | None = None) -> dict:
|
|
return {
|
|
"_type": "video",
|
|
"id": "live1",
|
|
"title": "Upcoming Stream",
|
|
"url": url,
|
|
"webpage_url": url,
|
|
"live_status": "is_upcoming",
|
|
"release_timestamp": release_timestamp if release_timestamp is not None else time.time() + 3600,
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_add_upcoming_stream_scheduled_without_starting(dq_env):
    """An upcoming stream added with ``auto_start=True`` is queued as scheduled, not started."""
    stream_url = "https://example.com/live-upcoming"
    starter = AsyncMock()
    download_queue = DownloadQueue(dq_env, AsyncMock())

    with patch.object(DownloadQueue, "_DownloadQueue__start_download", starter):
        outcome = await download_queue.add_entry(
            _upcoming_entry(stream_url),
            "video", "auto", "any", "best", "", "", 0,
            auto_start=True,
        )

    assert outcome["status"] == "ok"
    assert download_queue.queue.exists(stream_url)

    info = download_queue.queue.get(stream_url).info
    assert info.status == "scheduled"
    assert info.live_status == "is_upcoming"
    assert info.live_release_timestamp is not None

    starter.assert_not_called()
    assert stream_url in download_queue._scheduled_probe_at
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_probe_scheduled_starts_when_live(dq_env):
    """Probing a scheduled stream that now reports ``is_live`` starts its download."""
    stream_url = "https://example.com/live-upcoming"
    starter = AsyncMock()
    download_queue = DownloadQueue(dq_env, AsyncMock())

    with patch.object(DownloadQueue, "_DownloadQueue__start_download", starter):
        await download_queue.add_entry(
            _upcoming_entry(stream_url),
            "video", "auto", "any", "best", "", "", 0,
            auto_start=True,
        )

    scheduled = download_queue.queue.get(stream_url)

    def fake_probe_extract(self, probe_url, ytdl_options_presets=None, ytdl_options_overrides=None):
        # The probe must query the very URL that was scheduled.
        assert probe_url == stream_url
        return {
            "_type": "video",
            "id": "live1",
            "title": "Live Now",
            "url": stream_url,
            "webpage_url": stream_url,
            "live_status": "is_live",
            "formats": [{"format_id": "22"}],
        }

    probe_patch = patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_probe_extract)
    start_patch = patch.object(DownloadQueue, "_DownloadQueue__start_download", starter)
    with probe_patch, start_patch:
        await download_queue._probe_scheduled_download(scheduled)

    assert stream_url not in download_queue._scheduled_probe_at
    assert scheduled.info.live_status == "is_live"
    assert scheduled.info.status == "pending"
    starter.assert_called_once_with(scheduled)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_import_scheduled_re_registers_monitor(dq_env):
    """Adding an already-scheduled ``DownloadInfo`` re-registers its probe without starting it."""
    stream_url = "https://example.com/live-restart"
    release_at = time.time() + 7200

    fields = {
        "id": "live1",
        "title": "Upcoming Stream",
        "url": stream_url,
        "quality": "best",
        "download_type": "video",
        "codec": "auto",
        "format": "any",
        "folder": "",
        "custom_name_prefix": "",
        "error": None,
        "entry": None,
        "playlist_item_limit": 0,
        "split_by_chapters": False,
        "chapter_template": "",
        "live_status": "is_upcoming",
        "live_release_timestamp": release_at,
    }
    info = DownloadInfo(**fields)
    info.status = "scheduled"

    download_queue = DownloadQueue(dq_env, AsyncMock())
    starter = AsyncMock()
    with patch.object(DownloadQueue, "_DownloadQueue__start_download", starter):
        # Goes through the private add path directly, with auto-start requested.
        await download_queue._DownloadQueue__add_download(info, True)

    assert download_queue.queue.exists(stream_url)
    assert download_queue.queue.get(stream_url).info.status == "scheduled"
    assert stream_url in download_queue._scheduled_probe_at
    starter.assert_not_called()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_probe_transient_error_retries_without_failing(dq_env):
    """One failed probe must not cause the scheduled stream to be abandoned."""
    import ytdl

    stream_url = "https://example.com/live-transient"
    notify = AsyncMock()
    starter = AsyncMock()
    download_queue = DownloadQueue(dq_env, notify)

    with patch.object(DownloadQueue, "_DownloadQueue__start_download", starter):
        await download_queue.add_entry(
            _upcoming_entry(stream_url),
            "video",
            "auto",
            "any",
            "best",
            "",
            "",
            0,
            auto_start=True,
        )
    scheduled = download_queue.queue.get(stream_url)

    def boom(self, *args, **kwargs):
        # Simulates the extractor failing during the probe.
        raise ytdl.yt_dlp.utils.YoutubeDLError("temporary network glitch")

    probe_started = time.time()
    with patch.object(DownloadQueue, "_DownloadQueue__extract_info", boom):
        await download_queue._probe_scheduled_download(scheduled)

    # The stream stays scheduled and monitored; its next probe time is not
    # earlier than the moment this probe began, and one failure is recorded.
    assert scheduled.info.status == "scheduled"
    assert stream_url in download_queue._scheduled_probe_at
    assert download_queue._scheduled_probe_at[stream_url] >= probe_started
    assert download_queue._scheduled_probe_failures[stream_url] == 1
    notify.completed.assert_not_called()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_probe_gives_up_after_max_failures(dq_env):
    """After ``_LIVE_PROBE_MAX_FAILURES`` failed probes the stream is moved to ``done`` as an error."""
    import ytdl

    stream_url = "https://example.com/live-dead"
    notify = AsyncMock()
    starter = AsyncMock()
    download_queue = DownloadQueue(dq_env, notify)

    with patch.object(DownloadQueue, "_DownloadQueue__start_download", starter):
        await download_queue.add_entry(
            _upcoming_entry(stream_url),
            "video",
            "auto",
            "any",
            "best",
            "",
            "",
            0,
            auto_start=True,
        )
    scheduled = download_queue.queue.get(stream_url)

    def boom(self, *args, **kwargs):
        # Simulates the extractor failing on every probe.
        raise ytdl.yt_dlp.utils.YoutubeDLError("stream was deleted")

    attempts = ytdl._LIVE_PROBE_MAX_FAILURES
    with patch.object(DownloadQueue, "_DownloadQueue__extract_info", boom):
        for _attempt in range(attempts):
            await download_queue._probe_scheduled_download(scheduled)

    assert stream_url not in download_queue._scheduled_probe_at
    assert not download_queue.queue.exists(stream_url)
    assert download_queue.done.exists(stream_url)
    assert scheduled.info.status == "error"
    notify.completed.assert_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_probe_recovers_after_transient_then_starts(dq_env):
    """A failed probe followed by a successful live probe should start the download."""
    import ytdl

    stream_url = "https://example.com/live-recover"
    starter = AsyncMock()
    download_queue = DownloadQueue(dq_env, AsyncMock())

    with patch.object(DownloadQueue, "_DownloadQueue__start_download", starter):
        await download_queue.add_entry(
            _upcoming_entry(stream_url),
            "video",
            "auto",
            "any",
            "best",
            "",
            "",
            0,
            auto_start=True,
        )
    scheduled = download_queue.queue.get(stream_url)
    # Adding a scheduled stream sets a placeholder error on its info.
    assert scheduled.info.error

    def boom(self, *args, **kwargs):
        # First probe: the extractor fails.
        raise ytdl.yt_dlp.utils.YoutubeDLError("temporary glitch")

    with patch.object(DownloadQueue, "_DownloadQueue__extract_info", boom):
        await download_queue._probe_scheduled_download(scheduled)
    assert download_queue._scheduled_probe_failures[stream_url] == 1

    def live_now(self, *args, **kwargs):
        # Second probe: the stream reports as live with one format available.
        return {
            "_type": "video",
            "id": "live1",
            "title": "Live Now",
            "url": stream_url,
            "webpage_url": stream_url,
            "live_status": "is_live",
            "formats": [{"format_id": "22"}],
        }

    probe_patch = patch.object(DownloadQueue, "_DownloadQueue__extract_info", live_now)
    start_patch = patch.object(DownloadQueue, "_DownloadQueue__start_download", starter)
    with probe_patch, start_patch:
        await download_queue._probe_scheduled_download(scheduled)

    assert stream_url not in download_queue._scheduled_probe_at
    assert stream_url not in download_queue._scheduled_probe_failures
    assert scheduled.info.status == "pending"
    # The placeholder error/msg is cleared once a real download is starting.
    assert scheduled.info.error is None
    assert scheduled.info.msg is None
    starter.assert_called_once_with(scheduled)
|
|
|
|
|
|
def test_seconds_until_next_probe_none_when_empty(dq_env):
    """``_seconds_until_next_probe`` returns ``None`` on a freshly built queue."""
    download_queue = DownloadQueue(dq_env, AsyncMock())
    remaining = download_queue._seconds_until_next_probe()
    assert remaining is None
|