mirror of
https://github.com/alexta69/metube.git
synced 2026-06-13 16:40:05 +00:00
implement tests
This commit is contained in:
+5
-5
@@ -623,14 +623,14 @@ def get_custom_dirs():
|
||||
return result
|
||||
|
||||
@routes.get(config.URL_PREFIX)
|
||||
def index(request):
|
||||
async def index(request):
|
||||
response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/browser/index.html'))
|
||||
if 'metube_theme' not in request.cookies:
|
||||
response.set_cookie('metube_theme', config.DEFAULT_THEME)
|
||||
return response
|
||||
|
||||
@routes.get(config.URL_PREFIX + 'robots.txt')
|
||||
def robots(request):
|
||||
async def robots(request):
|
||||
if config.ROBOTS_TXT:
|
||||
response = web.FileResponse(os.path.join(config.BASE_DIR, config.ROBOTS_TXT))
|
||||
else:
|
||||
@@ -640,7 +640,7 @@ def robots(request):
|
||||
return response
|
||||
|
||||
@routes.get(config.URL_PREFIX + 'version')
|
||||
def version(request):
|
||||
async def version(request):
|
||||
return web.json_response({
|
||||
"yt-dlp": yt_dlp_version,
|
||||
"version": os.getenv("METUBE_VERSION", "dev")
|
||||
@@ -648,11 +648,11 @@ def version(request):
|
||||
|
||||
if config.URL_PREFIX != '/':
|
||||
@routes.get('/')
|
||||
def index_redirect_root(request):
|
||||
async def index_redirect_root(request):
|
||||
return web.HTTPFound(config.URL_PREFIX)
|
||||
|
||||
@routes.get(config.URL_PREFIX[:-1])
|
||||
def index_redirect_dir(request):
|
||||
async def index_redirect_dir(request):
|
||||
return web.HTTPFound(config.URL_PREFIX)
|
||||
|
||||
routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE)
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
"""Pytest configuration: set env and filesystem layout before importing ``main``."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def _ensure_test_env() -> None:
|
||||
if os.environ.get("METUBE_TEST_ENV_READY"):
|
||||
return
|
||||
tmp = tempfile.mkdtemp(prefix="metube-pytest-")
|
||||
base = Path(tmp)
|
||||
browser = base / "ui" / "dist" / "metube" / "browser"
|
||||
browser.mkdir(parents=True)
|
||||
(browser / "index.html").write_text("<html><body></body></html>", encoding="utf-8")
|
||||
dl = base / "downloads"
|
||||
st = base / "state"
|
||||
dl.mkdir(parents=True)
|
||||
st.mkdir(parents=True)
|
||||
os.environ["DOWNLOAD_DIR"] = str(dl)
|
||||
os.environ["STATE_DIR"] = str(st)
|
||||
os.environ["TEMP_DIR"] = str(dl)
|
||||
os.environ["YTDL_OPTIONS"] = "{}"
|
||||
os.environ["YTDL_OPTIONS_FILE"] = ""
|
||||
os.environ["BASE_DIR"] = str(base)
|
||||
os.environ["LOGLEVEL"] = "INFO"
|
||||
os.environ["METUBE_TEST_ENV_READY"] = "1"
|
||||
|
||||
|
||||
_ensure_test_env()
|
||||
@@ -0,0 +1,207 @@
|
||||
"""HTTP handler tests for ``main`` using mocked ``web.Request`` (no TestServer)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
from aiohttp import web
|
||||
|
||||
import main
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_dqueue(monkeypatch):
|
||||
d = MagicMock()
|
||||
d.initialize = AsyncMock(return_value=None)
|
||||
d.add = AsyncMock(return_value={"status": "ok"})
|
||||
d.cancel = AsyncMock(return_value={"status": "ok"})
|
||||
d.start_pending = AsyncMock(return_value={"status": "ok"})
|
||||
d.cancel_add = MagicMock()
|
||||
d.queue = MagicMock()
|
||||
d.done = MagicMock()
|
||||
d.pending = MagicMock()
|
||||
d.queue.saved_items = MagicMock(return_value=[])
|
||||
d.done.saved_items = MagicMock(return_value=[])
|
||||
d.pending.saved_items = MagicMock(return_value=[])
|
||||
d.get = MagicMock(return_value=([], []))
|
||||
monkeypatch.setattr(main, "dqueue", d)
|
||||
return d
|
||||
|
||||
|
||||
def _valid_video_add_body(**kwargs):
|
||||
base = {
|
||||
"url": "https://example.com/watch?v=1",
|
||||
"download_type": "video",
|
||||
"codec": "auto",
|
||||
"format": "any",
|
||||
"quality": "best",
|
||||
}
|
||||
base.update(kwargs)
|
||||
return base
|
||||
|
||||
|
||||
def _json_request(body: dict | None):
|
||||
req = MagicMock(spec=web.Request)
|
||||
req.json = AsyncMock(return_value=body)
|
||||
return req
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_ok(mock_dqueue):
|
||||
req = _json_request(_valid_video_add_body())
|
||||
resp = await main.add(req)
|
||||
assert resp.status == 200
|
||||
text = resp.text
|
||||
data = json.loads(text)
|
||||
assert data["status"] == "ok"
|
||||
mock_dqueue.add.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_missing_url_returns_400(mock_dqueue):
|
||||
req = _json_request({"download_type": "video", "quality": "best", "format": "any"})
|
||||
with pytest.raises(web.HTTPBadRequest):
|
||||
await main.add(req)
|
||||
mock_dqueue.add.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_invalid_download_type(mock_dqueue):
|
||||
req = _json_request(_valid_video_add_body(download_type="invalid"))
|
||||
with pytest.raises(web.HTTPBadRequest):
|
||||
await main.add(req)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_invalid_video_quality(mock_dqueue):
|
||||
req = _json_request(_valid_video_add_body(quality="9999"))
|
||||
with pytest.raises(web.HTTPBadRequest):
|
||||
await main.add(req)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_invalid_subtitle_language(mock_dqueue):
|
||||
req = _json_request(
|
||||
{
|
||||
"url": "https://example.com/v",
|
||||
"download_type": "captions",
|
||||
"codec": "auto",
|
||||
"format": "srt",
|
||||
"quality": "best",
|
||||
"subtitle_language": "bad language!",
|
||||
}
|
||||
)
|
||||
with pytest.raises(web.HTTPBadRequest):
|
||||
await main.add(req)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_custom_name_prefix_path_traversal(mock_dqueue):
|
||||
req = _json_request(_valid_video_add_body(custom_name_prefix="../evil"))
|
||||
with pytest.raises(web.HTTPBadRequest):
|
||||
await main.add(req)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_chapter_template_path_traversal(mock_dqueue):
|
||||
req = _json_request(
|
||||
_valid_video_add_body(
|
||||
split_by_chapters=True,
|
||||
chapter_template="/etc/passwd%(title)s",
|
||||
)
|
||||
)
|
||||
with pytest.raises(web.HTTPBadRequest):
|
||||
await main.add(req)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_invalid_json_body(mock_dqueue):
|
||||
req = MagicMock(spec=web.Request)
|
||||
req.json = AsyncMock(side_effect=json.JSONDecodeError("msg", "", 0))
|
||||
with pytest.raises(web.HTTPBadRequest):
|
||||
await main.add(req)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_missing_ids(mock_dqueue):
|
||||
req = _json_request({"where": "queue"})
|
||||
with pytest.raises(web.HTTPBadRequest):
|
||||
await main.delete(req)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_queue_calls_cancel(mock_dqueue):
|
||||
req = _json_request({"where": "queue", "ids": ["http://x"]})
|
||||
resp = await main.delete(req)
|
||||
assert resp.status == 200
|
||||
mock_dqueue.cancel.assert_awaited_once_with(["http://x"])
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_pending(mock_dqueue):
|
||||
req = _json_request({"ids": ["a"]})
|
||||
resp = await main.start(req)
|
||||
assert resp.status == 200
|
||||
mock_dqueue.start_pending.assert_awaited_once_with(["a"])
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_history_shape(mock_dqueue):
|
||||
mock_dqueue.queue.saved_items.return_value = []
|
||||
mock_dqueue.done.saved_items.return_value = []
|
||||
mock_dqueue.pending.saved_items.return_value = []
|
||||
req = MagicMock(spec=web.Request)
|
||||
resp = await main.history(req)
|
||||
assert resp.status == 200
|
||||
data = json.loads(resp.text)
|
||||
assert set(data.keys()) == {"done", "queue", "pending"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_version_json(mock_dqueue):
|
||||
req = MagicMock(spec=web.Request)
|
||||
resp = await main.version(req)
|
||||
assert resp.status == 200
|
||||
body = json.loads(resp.text)
|
||||
assert "yt-dlp" in body and "version" in body
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cookie_status(mock_dqueue):
|
||||
req = MagicMock(spec=web.Request)
|
||||
resp = await main.cookie_status(req)
|
||||
assert resp.status == 200
|
||||
data = json.loads(resp.text)
|
||||
assert data.get("status") == "ok"
|
||||
assert "has_cookies" in data
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_options_add_cors(mock_dqueue):
|
||||
req = MagicMock(spec=web.Request)
|
||||
resp = await main.add_cors(req)
|
||||
assert resp.status == 200
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_upload_cookies_missing_field(mock_dqueue):
|
||||
req = MagicMock(spec=web.Request)
|
||||
reader = MagicMock()
|
||||
field = MagicMock()
|
||||
field.name = "wrongname"
|
||||
reader.next = AsyncMock(side_effect=[field, None])
|
||||
req.multipart = AsyncMock(return_value=reader)
|
||||
resp = await main.upload_cookies(req)
|
||||
assert resp.status == 400
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_legacy_format_migrated(mock_dqueue):
|
||||
req = _json_request({"url": "https://example.com/v", "format": "m4a", "quality": "best"})
|
||||
resp = await main.add(req)
|
||||
assert resp.status == 200
|
||||
call = mock_dqueue.add.await_args
|
||||
assert call is not None
|
||||
assert call.args[1] == "audio"
|
||||
@@ -0,0 +1,78 @@
|
||||
"""Tests for ``Config`` (env parsing, yt-dlp options, frontend_safe)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
from main import Config
|
||||
|
||||
|
||||
def _base_env(**overrides: str) -> dict[str, str]:
|
||||
env = {k: str(v) for k, v in Config._DEFAULTS.items()}
|
||||
env.update(overrides)
|
||||
return env
|
||||
|
||||
|
||||
class ConfigTests(unittest.TestCase):
|
||||
def test_url_prefix_gets_trailing_slash(self):
|
||||
with patch.dict(os.environ, _base_env(URL_PREFIX="foo"), clear=False):
|
||||
c = Config()
|
||||
self.assertEqual(c.URL_PREFIX, "foo/")
|
||||
|
||||
def test_ytdl_options_json_loaded(self):
|
||||
opts = {"quiet": True, "no_warnings": True}
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
_base_env(YTDL_OPTIONS=json.dumps(opts)),
|
||||
clear=False,
|
||||
):
|
||||
c = Config()
|
||||
self.assertEqual(c.YTDL_OPTIONS["quiet"], True)
|
||||
|
||||
def test_invalid_ytdl_options_exits(self):
|
||||
with patch.dict(os.environ, _base_env(YTDL_OPTIONS="not-json"), clear=False):
|
||||
with self.assertRaises(SystemExit):
|
||||
Config()
|
||||
|
||||
def test_invalid_boolean_env_exits(self):
|
||||
with patch.dict(os.environ, _base_env(CUSTOM_DIRS="maybe"), clear=False):
|
||||
with self.assertRaises(SystemExit):
|
||||
Config()
|
||||
|
||||
def test_frontend_safe_excludes_secrets(self):
|
||||
with patch.dict(os.environ, _base_env(), clear=False):
|
||||
c = Config()
|
||||
safe = c.frontend_safe()
|
||||
self.assertNotIn("YTDL_OPTIONS", safe)
|
||||
self.assertNotIn("HOST", safe)
|
||||
|
||||
def test_runtime_override_roundtrip(self):
|
||||
with patch.dict(os.environ, _base_env(), clear=False):
|
||||
c = Config()
|
||||
c.set_runtime_override("cookiefile", "/tmp/c.txt")
|
||||
self.assertEqual(c.YTDL_OPTIONS.get("cookiefile"), "/tmp/c.txt")
|
||||
c.remove_runtime_override("cookiefile")
|
||||
self.assertIsNone(c.YTDL_OPTIONS.get("cookiefile"))
|
||||
|
||||
def test_ytdl_options_file_merges(self):
|
||||
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
|
||||
json.dump({"extractor_args": {"youtube": {"player_client": ["web"]}}}, f)
|
||||
path = f.name
|
||||
try:
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
_base_env(YTDL_OPTIONS="{}", YTDL_OPTIONS_FILE=path),
|
||||
clear=False,
|
||||
):
|
||||
c = Config()
|
||||
self.assertIn("extractor_args", c.YTDL_OPTIONS)
|
||||
finally:
|
||||
os.unlink(path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,6 +1,16 @@
|
||||
"""Tests for ``app.dl_formats`` format selectors and yt-dlp option mapping."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
import unittest
|
||||
|
||||
from app.dl_formats import get_format, get_opts
|
||||
from app.dl_formats import (
|
||||
_normalize_caption_mode,
|
||||
_normalize_subtitle_language,
|
||||
get_format,
|
||||
get_opts,
|
||||
)
|
||||
|
||||
|
||||
class DlFormatsTests(unittest.TestCase):
|
||||
@@ -16,6 +26,114 @@ class DlFormatsTests(unittest.TestCase):
|
||||
opts = get_opts("audio", "auto", "mp3", "best", {})
|
||||
self.assertTrue(opts.get("writethumbnail"))
|
||||
|
||||
def test_custom_format_passthrough(self):
|
||||
self.assertEqual(get_format("video", "auto", "custom:bestvideo+bestaudio", "best"), "bestvideo+bestaudio")
|
||||
|
||||
def test_thumbnail_and_captions_format_strings(self):
|
||||
self.assertEqual(get_format("thumbnail", "auto", "jpg", "best"), "bestaudio/best")
|
||||
self.assertEqual(get_format("captions", "auto", "srt", "best"), "bestaudio/best")
|
||||
|
||||
def test_audio_formats(self):
|
||||
for fmt in ("m4a", "mp3", "opus", "wav", "flac"):
|
||||
with self.subTest(fmt=fmt):
|
||||
self.assertIn(f"ext={fmt}", get_format("audio", "auto", fmt, "best"))
|
||||
|
||||
def test_video_unknown_format_raises(self):
|
||||
with self.assertRaises(ValueError):
|
||||
get_format("video", "auto", "mkv", "best")
|
||||
|
||||
def test_unknown_download_type_raises(self):
|
||||
with self.assertRaises(ValueError):
|
||||
get_format("unknown", "auto", "any", "best")
|
||||
|
||||
def test_video_any_mp4_ios_with_height_quality(self):
|
||||
self.assertIn("height<=1080", get_format("video", "auto", "any", "1080"))
|
||||
self.assertNotIn("height<=", get_format("video", "auto", "any", "best"))
|
||||
self.assertNotIn("height<=", get_format("video", "auto", "any", "worst"))
|
||||
|
||||
def test_video_codec_filters(self):
|
||||
self.assertIn("h264", get_format("video", "h264", "any", "best"))
|
||||
self.assertIn("hevc", get_format("video", "h265", "any", "best"))
|
||||
self.assertIn("av0?1", get_format("video", "av1", "any", "best"))
|
||||
self.assertIn("vp0?9", get_format("video", "vp9", "any", "best"))
|
||||
|
||||
def test_video_mp4_includes_m4a_audio(self):
|
||||
s = get_format("video", "auto", "mp4", "720")
|
||||
self.assertIn("[ext=m4a]", s)
|
||||
|
||||
def test_video_ios_selector_contains_avc_pattern(self):
|
||||
s = get_format("video", "auto", "ios", "best")
|
||||
self.assertIn("h26[45]", s)
|
||||
|
||||
def test_get_opts_deepcopy_does_not_mutate_input(self):
|
||||
base = {"postprocessors": [{"key": "Existing"}]}
|
||||
orig = copy.deepcopy(base)
|
||||
get_opts("audio", "auto", "mp3", "best", base)
|
||||
self.assertEqual(base, orig)
|
||||
|
||||
def test_get_opts_audio_m4a_postprocessors(self):
|
||||
opts = get_opts("audio", "auto", "m4a", "best", {})
|
||||
keys = [p["key"] for p in opts["postprocessors"]]
|
||||
self.assertIn("FFmpegExtractAudio", keys)
|
||||
|
||||
def test_get_opts_audio_mp3_quality_not_best(self):
|
||||
opts = get_opts("audio", "auto", "mp3", "192", {})
|
||||
ext = next(p for p in opts["postprocessors"] if p["key"] == "FFmpegExtractAudio")
|
||||
self.assertEqual(ext["preferredquality"], "192")
|
||||
|
||||
def test_get_opts_thumbnail_skip_download(self):
|
||||
opts = get_opts("thumbnail", "auto", "jpg", "best", {})
|
||||
self.assertTrue(opts.get("skip_download"))
|
||||
self.assertTrue(opts.get("writethumbnail"))
|
||||
|
||||
def test_get_opts_captions_manual_only(self):
|
||||
opts = get_opts(
|
||||
"captions", "auto", "vtt", "best", {}, subtitle_language="fr", subtitle_mode="manual_only"
|
||||
)
|
||||
self.assertTrue(opts.get("writesubtitles"))
|
||||
self.assertFalse(opts.get("writeautomaticsub"))
|
||||
self.assertEqual(opts["subtitleslangs"], ["fr"])
|
||||
|
||||
def test_get_opts_captions_auto_only(self):
|
||||
opts = get_opts(
|
||||
"captions", "auto", "srt", "best", {}, subtitle_language="de", subtitle_mode="auto_only"
|
||||
)
|
||||
self.assertFalse(opts.get("writesubtitles"))
|
||||
self.assertTrue(opts.get("writeautomaticsub"))
|
||||
self.assertEqual(opts["subtitleslangs"], ["de-orig", "de"])
|
||||
|
||||
def test_get_opts_captions_prefer_auto(self):
|
||||
opts = get_opts(
|
||||
"captions", "auto", "srt", "best", {}, subtitle_language="es", subtitle_mode="prefer_auto"
|
||||
)
|
||||
self.assertTrue(opts.get("writesubtitles"))
|
||||
self.assertTrue(opts.get("writeautomaticsub"))
|
||||
self.assertEqual(opts["subtitleslangs"], ["es-orig", "es"])
|
||||
|
||||
def test_get_opts_captions_prefer_manual_default_branch(self):
|
||||
opts = get_opts(
|
||||
"captions", "auto", "srt", "best", {}, subtitle_language="it", subtitle_mode="prefer_manual"
|
||||
)
|
||||
self.assertEqual(opts["subtitleslangs"], ["it", "it-orig"])
|
||||
|
||||
def test_get_opts_captions_txt_maps_to_srt_format(self):
|
||||
opts = get_opts("captions", "auto", "txt", "best", {})
|
||||
self.assertEqual(opts["subtitlesformat"], "srt")
|
||||
|
||||
def test_get_opts_merges_existing_postprocessors(self):
|
||||
opts = get_opts("audio", "auto", "opus", "best", {"postprocessors": [{"key": "SponsorBlock"}]})
|
||||
keys = [p["key"] for p in opts["postprocessors"]]
|
||||
self.assertIn("SponsorBlock", keys)
|
||||
self.assertIn("FFmpegExtractAudio", keys)
|
||||
|
||||
def test_normalize_caption_mode_invalid_defaults(self):
|
||||
self.assertEqual(_normalize_caption_mode(""), "prefer_manual")
|
||||
self.assertEqual(_normalize_caption_mode("not_a_mode"), "prefer_manual")
|
||||
|
||||
def test_normalize_subtitle_language_empty_defaults_en(self):
|
||||
self.assertEqual(_normalize_subtitle_language(""), "en")
|
||||
self.assertEqual(_normalize_subtitle_language(" "), "en")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -0,0 +1,146 @@
|
||||
"""Tests for ``DownloadQueue`` with mocked yt-dlp extraction."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from ytdl import DownloadQueue
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dq_env():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
dl = os.path.join(tmp, "downloads")
|
||||
st = os.path.join(tmp, "state")
|
||||
os.makedirs(dl, exist_ok=True)
|
||||
os.makedirs(st, exist_ok=True)
|
||||
cfg = MagicMock()
|
||||
cfg.STATE_DIR = st
|
||||
cfg.DOWNLOAD_DIR = dl
|
||||
cfg.AUDIO_DOWNLOAD_DIR = dl
|
||||
cfg.TEMP_DIR = dl
|
||||
cfg.MAX_CONCURRENT_DOWNLOADS = "3"
|
||||
cfg.YTDL_OPTIONS = {}
|
||||
cfg.CUSTOM_DIRS = True
|
||||
cfg.CREATE_CUSTOM_DIRS = True
|
||||
cfg.CLEAR_COMPLETED_AFTER = "0"
|
||||
cfg.DELETE_FILE_ON_TRASHCAN = False
|
||||
cfg.OUTPUT_TEMPLATE = "%(title)s.%(ext)s"
|
||||
cfg.OUTPUT_TEMPLATE_CHAPTER = "%(title)s.%(ext)s"
|
||||
cfg.OUTPUT_TEMPLATE_PLAYLIST = ""
|
||||
cfg.OUTPUT_TEMPLATE_CHANNEL = ""
|
||||
yield cfg
|
||||
|
||||
|
||||
def test_cancel_add_increments_generation(dq_env):
|
||||
notifier = MagicMock()
|
||||
dq = DownloadQueue(dq_env, notifier)
|
||||
before = dq._add_generation
|
||||
dq.cancel_add()
|
||||
assert dq._add_generation == before + 1
|
||||
|
||||
|
||||
def test_get_returns_tuple_of_lists(dq_env):
|
||||
notifier = MagicMock()
|
||||
dq = DownloadQueue(dq_env, notifier)
|
||||
q, done = dq.get()
|
||||
assert q == [] and done == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_single_video_goes_to_pending_when_auto_start_false(dq_env):
|
||||
notifier = AsyncMock()
|
||||
|
||||
def fake_extract(self, url):
|
||||
return {
|
||||
"_type": "video",
|
||||
"id": "vid1",
|
||||
"title": "Test Video",
|
||||
"url": url,
|
||||
"webpage_url": url,
|
||||
}
|
||||
|
||||
dq = DownloadQueue(dq_env, notifier)
|
||||
with patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_extract):
|
||||
result = await dq.add(
|
||||
"https://example.com/watch?v=1",
|
||||
"video",
|
||||
"auto",
|
||||
"any",
|
||||
"best",
|
||||
"",
|
||||
"",
|
||||
0,
|
||||
auto_start=False,
|
||||
)
|
||||
assert result["status"] == "ok"
|
||||
assert dq.pending.exists("https://example.com/watch?v=1")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cancel_removes_from_pending(dq_env):
|
||||
notifier = AsyncMock()
|
||||
|
||||
def fake_extract(self, url):
|
||||
return {
|
||||
"_type": "video",
|
||||
"id": "vid1",
|
||||
"title": "Test Video",
|
||||
"url": url,
|
||||
"webpage_url": url,
|
||||
}
|
||||
|
||||
dq = DownloadQueue(dq_env, notifier)
|
||||
with patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_extract):
|
||||
await dq.add(
|
||||
"https://example.com/pending",
|
||||
"video",
|
||||
"auto",
|
||||
"any",
|
||||
"best",
|
||||
"",
|
||||
"",
|
||||
0,
|
||||
auto_start=False,
|
||||
)
|
||||
url = "https://example.com/pending"
|
||||
await dq.cancel([url])
|
||||
assert not dq.pending.exists(url)
|
||||
notifier.canceled.assert_awaited()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_pending_moves_to_queue(dq_env):
|
||||
notifier = AsyncMock()
|
||||
|
||||
def fake_extract(self, url):
|
||||
return {
|
||||
"_type": "video",
|
||||
"id": "vid1",
|
||||
"title": "Test Video",
|
||||
"url": url,
|
||||
"webpage_url": url,
|
||||
}
|
||||
|
||||
dq = DownloadQueue(dq_env, notifier)
|
||||
with patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_extract):
|
||||
await dq.add(
|
||||
"https://example.com/startme",
|
||||
"video",
|
||||
"auto",
|
||||
"any",
|
||||
"best",
|
||||
"",
|
||||
"",
|
||||
0,
|
||||
auto_start=False,
|
||||
)
|
||||
url = "https://example.com/startme"
|
||||
# Starting will spawn real download — cancel immediately before worker runs much
|
||||
with patch.object(DownloadQueue, "_DownloadQueue__start_download", AsyncMock()):
|
||||
await dq.start_pending([url])
|
||||
assert not dq.pending.exists(url)
|
||||
@@ -0,0 +1,105 @@
|
||||
"""Tests for pure helpers in ``main`` (legacy API migration, logging, JSON serializer)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import unittest
|
||||
|
||||
import main
|
||||
|
||||
|
||||
class MigrateLegacyRequestTests(unittest.TestCase):
|
||||
def test_already_new_schema_unchanged(self):
|
||||
post = {"download_type": "video", "codec": "h264", "format": "mp4", "quality": "1080"}
|
||||
before = post.copy()
|
||||
self.assertIs(main._migrate_legacy_request(post), post)
|
||||
self.assertEqual(post, before)
|
||||
|
||||
def test_legacy_audio_m4a(self):
|
||||
post = {"format": "m4a", "quality": "best"}
|
||||
main._migrate_legacy_request(post)
|
||||
self.assertEqual(post["download_type"], "audio")
|
||||
self.assertEqual(post["codec"], "auto")
|
||||
self.assertEqual(post["format"], "m4a")
|
||||
|
||||
def test_legacy_thumbnail(self):
|
||||
post = {"format": "thumbnail", "quality": "best"}
|
||||
main._migrate_legacy_request(post)
|
||||
self.assertEqual(post["download_type"], "thumbnail")
|
||||
self.assertEqual(post["format"], "jpg")
|
||||
self.assertEqual(post["quality"], "best")
|
||||
|
||||
def test_legacy_captions_with_subtitle_format(self):
|
||||
post = {"format": "captions", "subtitle_format": "vtt", "quality": "best"}
|
||||
main._migrate_legacy_request(post)
|
||||
self.assertEqual(post["download_type"], "captions")
|
||||
self.assertEqual(post["format"], "vtt")
|
||||
|
||||
def test_legacy_video_best_ios(self):
|
||||
post = {"format": "any", "quality": "best_ios", "video_codec": "auto"}
|
||||
main._migrate_legacy_request(post)
|
||||
self.assertEqual(post["download_type"], "video")
|
||||
self.assertEqual(post["format"], "ios")
|
||||
self.assertEqual(post["quality"], "best")
|
||||
|
||||
def test_legacy_video_quality_audio_maps_to_m4a(self):
|
||||
post = {"format": "mp4", "quality": "audio", "video_codec": "h264"}
|
||||
main._migrate_legacy_request(post)
|
||||
self.assertEqual(post["download_type"], "audio")
|
||||
self.assertEqual(post["format"], "m4a")
|
||||
self.assertEqual(post["quality"], "best")
|
||||
|
||||
def test_legacy_video_default(self):
|
||||
post = {"format": "mp4", "quality": "1080", "video_codec": "h265"}
|
||||
main._migrate_legacy_request(post)
|
||||
self.assertEqual(post["download_type"], "video")
|
||||
self.assertEqual(post["codec"], "h265")
|
||||
self.assertEqual(post["format"], "mp4")
|
||||
self.assertEqual(post["quality"], "1080")
|
||||
|
||||
|
||||
class ParseLogLevelTests(unittest.TestCase):
|
||||
def test_valid_levels(self):
|
||||
self.assertEqual(main.parseLogLevel("INFO"), logging.INFO)
|
||||
self.assertEqual(main.parseLogLevel("debug"), logging.DEBUG)
|
||||
|
||||
def test_invalid_returns_none(self):
|
||||
self.assertIsNone(main.parseLogLevel("not_a_level"))
|
||||
self.assertIsNone(main.parseLogLevel(123))
|
||||
|
||||
|
||||
class ObjectSerializerTests(unittest.TestCase):
|
||||
def test_dict_like_object(self):
|
||||
class Obj:
|
||||
def __init__(self):
|
||||
self.a = 1
|
||||
|
||||
ser = main.ObjectSerializer()
|
||||
self.assertEqual(json.loads(ser.encode(Obj())), {"a": 1})
|
||||
|
||||
def test_generator_becomes_list(self):
|
||||
ser = main.ObjectSerializer()
|
||||
|
||||
def gen():
|
||||
yield 1
|
||||
yield 2
|
||||
|
||||
self.assertEqual(json.loads(ser.encode(gen())), [1, 2])
|
||||
|
||||
def test_string_not_split_to_chars(self):
|
||||
ser = main.ObjectSerializer()
|
||||
self.assertEqual(json.loads(ser.encode("hello")), "hello")
|
||||
|
||||
|
||||
class FrontendSafeTests(unittest.TestCase):
|
||||
def test_only_expected_keys(self):
|
||||
safe = main.config.frontend_safe()
|
||||
for key in main.Config._FRONTEND_KEYS:
|
||||
self.assertIn(key, safe)
|
||||
self.assertNotIn("YTDL_OPTIONS", safe)
|
||||
self.assertNotIn("DOWNLOAD_DIR", safe)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,76 @@
|
||||
"""Integration tests for ``PersistentQueue`` (shelve-backed storage)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
from ytdl import DownloadInfo, PersistentQueue
|
||||
|
||||
|
||||
class _FakeDownload:
|
||||
__slots__ = ("info",)
|
||||
|
||||
def __init__(self, info: DownloadInfo):
|
||||
self.info = info
|
||||
|
||||
|
||||
def _make_info(url: str = "https://example.com/v") -> DownloadInfo:
|
||||
return DownloadInfo(
|
||||
id="id1",
|
||||
title="Title",
|
||||
url=url,
|
||||
quality="best",
|
||||
download_type="video",
|
||||
codec="auto",
|
||||
format="any",
|
||||
folder="",
|
||||
custom_name_prefix="",
|
||||
error=None,
|
||||
entry=None,
|
||||
playlist_item_limit=0,
|
||||
split_by_chapters=False,
|
||||
chapter_template="",
|
||||
)
|
||||
|
||||
|
||||
class PersistentQueueTests(unittest.TestCase):
|
||||
def test_put_get_delete_roundtrip(self):
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
path = os.path.join(tmp, "queue")
|
||||
pq = PersistentQueue("queue", path)
|
||||
dl = _FakeDownload(_make_info("http://a.example"))
|
||||
pq.put(dl)
|
||||
self.assertTrue(pq.exists("http://a.example"))
|
||||
self.assertFalse(pq.empty())
|
||||
got = pq.get("http://a.example")
|
||||
self.assertEqual(got.info.url, "http://a.example")
|
||||
pq.delete("http://a.example")
|
||||
self.assertFalse(pq.exists("http://a.example"))
|
||||
|
||||
def test_saved_items_sorted_by_timestamp(self):
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
path = os.path.join(tmp, "queue")
|
||||
pq = PersistentQueue("queue", path)
|
||||
a = _FakeDownload(_make_info("http://first.example"))
|
||||
b = _FakeDownload(_make_info("http://second.example"))
|
||||
a.info.timestamp = 100
|
||||
b.info.timestamp = 200
|
||||
pq.put(a)
|
||||
pq.put(b)
|
||||
keys = [k for k, _ in pq.saved_items()]
|
||||
self.assertEqual(keys, ["http://first.example", "http://second.example"])
|
||||
|
||||
def test_load_restores_from_shelve(self):
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
path = os.path.join(tmp, "queue")
|
||||
pq1 = PersistentQueue("queue", path)
|
||||
pq1.put(_FakeDownload(_make_info("http://load.example")))
|
||||
pq2 = PersistentQueue("queue", path)
|
||||
pq2.load()
|
||||
self.assertTrue(pq2.exists("http://load.example"))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,146 @@
|
||||
"""Tests for pure helpers and migration logic in ``ytdl``."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
from ytdl import (
|
||||
DownloadInfo,
|
||||
_convert_generators_to_lists,
|
||||
_convert_srt_to_txt_file,
|
||||
_outtmpl_substitute_field,
|
||||
_sanitize_path_component,
|
||||
)
|
||||
|
||||
|
||||
class SanitizePathComponentTests(unittest.TestCase):
|
||||
def test_replaces_windows_invalid_chars(self):
|
||||
self.assertEqual(_sanitize_path_component('a:b*c?d"e<f>g|h'), "a_b_c_d_e_f_g_h")
|
||||
|
||||
def test_non_string_passthrough(self):
|
||||
self.assertIs(_sanitize_path_component(None), None)
|
||||
self.assertEqual(_sanitize_path_component(42), 42)
|
||||
|
||||
|
||||
class OuttmplSubstituteFieldTests(unittest.TestCase):
|
||||
def test_simple_substitution(self):
|
||||
self.assertEqual(_outtmpl_substitute_field("%(title)s", "title", "Hello"), "Hello")
|
||||
|
||||
def test_format_spec_int(self):
|
||||
self.assertEqual(_outtmpl_substitute_field("%(idx)02d", "idx", 3), "03")
|
||||
|
||||
def test_missing_field_unchanged(self):
|
||||
self.assertEqual(_outtmpl_substitute_field("%(other)s", "title", "x"), "%(other)s")
|
||||
|
||||
|
||||
class ConvertGeneratorsToListsTests(unittest.TestCase):
|
||||
def test_nested(self):
|
||||
def g():
|
||||
yield 1
|
||||
|
||||
obj = {"a": g(), "b": [g()]}
|
||||
out = _convert_generators_to_lists(obj)
|
||||
self.assertEqual(out, {"a": [1], "b": [[1]]})
|
||||
|
||||
def test_plain(self):
|
||||
self.assertEqual(_convert_generators_to_lists(5), 5)
|
||||
|
||||
|
||||
class ConvertSrtToTxtTests(unittest.TestCase):
|
||||
def test_basic_conversion(self):
|
||||
srt = """1
|
||||
00:00:01,000 --> 00:00:02,000
|
||||
Hello <b>world</b>
|
||||
|
||||
2
|
||||
00:00:03,000 --> 00:00:04,000
|
||||
Second line
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
path = Path(tmp) / "sub.srt"
|
||||
path.write_text(srt, encoding="utf-8")
|
||||
txt_path = _convert_srt_to_txt_file(str(path))
|
||||
self.assertIsNotNone(txt_path)
|
||||
self.assertTrue(txt_path.endswith(".txt"))
|
||||
content = Path(txt_path).read_text(encoding="utf-8")
|
||||
self.assertIn("Hello world", content)
|
||||
self.assertIn("Second line", content)
|
||||
|
||||
|
||||
class DownloadInfoSetstateTests(unittest.TestCase):
|
||||
def _base_state(self, **kwargs):
|
||||
base = {
|
||||
"id": "id1",
|
||||
"title": "t",
|
||||
"url": "http://example.com/v",
|
||||
"folder": "",
|
||||
"custom_name_prefix": "",
|
||||
"error": None,
|
||||
"entry": None,
|
||||
"playlist_item_limit": 0,
|
||||
"split_by_chapters": False,
|
||||
"chapter_template": "",
|
||||
"msg": None,
|
||||
"percent": None,
|
||||
"speed": None,
|
||||
"eta": None,
|
||||
"status": "pending",
|
||||
"size": None,
|
||||
"timestamp": 0,
|
||||
}
|
||||
base.update(kwargs)
|
||||
return base
|
||||
|
||||
def test_migrates_old_audio_format(self):
|
||||
state = self._base_state(format="m4a", quality="best")
|
||||
di = DownloadInfo.__new__(DownloadInfo)
|
||||
di.__setstate__(state)
|
||||
self.assertEqual(di.download_type, "audio")
|
||||
self.assertEqual(di.codec, "auto")
|
||||
|
||||
def test_migrates_thumbnail(self):
|
||||
state = self._base_state(format="thumbnail", quality="best")
|
||||
di = DownloadInfo.__new__(DownloadInfo)
|
||||
di.__setstate__(state)
|
||||
self.assertEqual(di.download_type, "thumbnail")
|
||||
self.assertEqual(di.format, "jpg")
|
||||
|
||||
def test_migrates_captions(self):
|
||||
state = self._base_state(format="captions", subtitle_format="vtt", quality="best")
|
||||
di = DownloadInfo.__new__(DownloadInfo)
|
||||
di.__setstate__(state)
|
||||
self.assertEqual(di.download_type, "captions")
|
||||
self.assertEqual(di.format, "vtt")
|
||||
|
||||
def test_migrates_best_ios(self):
|
||||
state = self._base_state(
|
||||
format="any", quality="best_ios", video_codec="auto"
|
||||
)
|
||||
di = DownloadInfo.__new__(DownloadInfo)
|
||||
di.__setstate__(state)
|
||||
self.assertEqual(di.format, "ios")
|
||||
self.assertEqual(di.quality, "best")
|
||||
|
||||
def test_migrates_quality_audio(self):
|
||||
state = self._base_state(format="mp4", quality="audio", video_codec="h264")
|
||||
di = DownloadInfo.__new__(DownloadInfo)
|
||||
di.__setstate__(state)
|
||||
self.assertEqual(di.download_type, "audio")
|
||||
self.assertEqual(di.format, "m4a")
|
||||
|
||||
def test_new_state_has_subtitle_files(self):
|
||||
state = self._base_state(
|
||||
download_type="video",
|
||||
codec="auto",
|
||||
format="any",
|
||||
quality="best",
|
||||
)
|
||||
di = DownloadInfo.__new__(DownloadInfo)
|
||||
di.__setstate__(state)
|
||||
self.assertEqual(di.subtitle_files, [])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user