Files
metube/app/tests/test_ytdl_utils.py
copilot-swe-agent[bot] 981e6c1003 Propagate missing playlist context fields (playlist_count, playlist_autonumber, n_entries, __last_playlist_index)
The playlist/channel processing loop now sets playlist_count,
playlist_autonumber, n_entries, and __last_playlist_index on each
video entry so that templates like %(playlist_autonumber)s,
%(playlist_count)s, and %(playlist_index&{} - |)s resolve correctly
instead of showing NA.

Also updates _compact_persisted_entry to preserve n_entries and
__last_playlist_index across restarts.

Fixes #692

Agent-Logs-Url: https://github.com/alexta69/metube/sessions/b5aeb55a-3197-4a14-b8b4-96c9a67796e8

Co-authored-by: alexta69 <7450369+alexta69@users.noreply.github.com>
2026-04-01 19:59:32 +00:00

314 lines
11 KiB
Python

"""Tests for pure helpers and migration logic in ``ytdl``."""
from __future__ import annotations
import pickle
import sys
import tempfile
import threading
import types
import unittest
from pathlib import Path
fake_yt_dlp = types.ModuleType("yt_dlp")
fake_networking = types.ModuleType("yt_dlp.networking")
fake_impersonate = types.ModuleType("yt_dlp.networking.impersonate")
fake_utils = types.ModuleType("yt_dlp.utils")
class _ImpersonateTarget:
@staticmethod
def from_str(value):
return value
fake_impersonate.ImpersonateTarget = _ImpersonateTarget
fake_networking.impersonate = fake_impersonate
# The inner ``key`` group mirrors the real ``STR_FORMAT_RE_TMPL`` so that
# ``_OUTTMPL_FIELD_RE`` (compiled at import time) has the named group that
# ``_resolve_outtmpl_fields`` reads via ``match.group('key')``.
fake_utils.STR_FORMAT_RE_TMPL = r"(?P<prefix>)%\((?P<has_key>(?P<key>{}))\)(?P<format>[-0-9.]*{})"
fake_utils.STR_FORMAT_TYPES = "diouxXeEfFgGcrsa"
fake_yt_dlp.networking = fake_networking
fake_yt_dlp.utils = fake_utils
sys.modules.setdefault("yt_dlp", fake_yt_dlp)
sys.modules.setdefault("yt_dlp.networking", fake_networking)
sys.modules.setdefault("yt_dlp.networking.impersonate", fake_impersonate)
sys.modules.setdefault("yt_dlp.utils", fake_utils)
from ytdl import (
DownloadInfo,
_compact_persisted_entry,
_convert_srt_to_txt_file,
_resolve_outtmpl_fields,
_sanitize_entry_for_pickle,
_sanitize_path_component,
)
# Detect whether the real yt-dlp is loaded (as opposed to the minimal fake
# shim above). _resolve_outtmpl_fields needs YoutubeDL at runtime.
_has_real_ytdlp = hasattr(sys.modules.get("yt_dlp"), "YoutubeDL")
class SanitizePathComponentTests(unittest.TestCase):
def test_replaces_windows_invalid_chars(self):
self.assertEqual(_sanitize_path_component('a:b*c?d"e<f>g|h'), "a_b_c_d_e_f_g_h")
def test_non_string_passthrough(self):
self.assertIs(_sanitize_path_component(None), None)
self.assertEqual(_sanitize_path_component(42), 42)
@unittest.skipUnless(_has_real_ytdlp, "requires real yt-dlp")
class ResolveOuttmplFieldsTests(unittest.TestCase):
"""Tests for _resolve_outtmpl_fields (delegates to yt-dlp's template engine)."""
def test_simple_playlist_substitution(self):
info = {"playlist_title": "My PL", "playlist_index": "03"}
result = _resolve_outtmpl_fields("%(playlist_title)s/%(title)s.%(ext)s", info, ("playlist",))
self.assertEqual(result, "My PL/%(title)s.%(ext)s")
def test_format_spec_int(self):
info = {"playlist_index": "3"}
result = _resolve_outtmpl_fields("%(playlist_index)02d-%(title)s", info, ("playlist",))
self.assertEqual(result, "03-%(title)s")
def test_non_targeted_fields_unchanged(self):
info = {"playlist_title": "PL"}
result = _resolve_outtmpl_fields("%(title)s/%(ext)s", info, ("playlist",))
self.assertEqual(result, "%(title)s/%(ext)s")
def test_default_value(self):
info = {"playlist_index": "1"}
result = _resolve_outtmpl_fields("%(playlist_title|Unknown)s/%(playlist_index)s", info, ("playlist",))
self.assertEqual(result, "Unknown/1")
def test_channel_prefix(self):
info = {"channel": "MyChan", "channel_index": "05"}
result = _resolve_outtmpl_fields("%(channel)s/%(channel_index)02d-%(title)s", info, ("channel",))
self.assertEqual(result, "MyChan/05-%(title)s")
def test_math_operation(self):
info = {"playlist_index": "3"}
result = _resolve_outtmpl_fields("%(playlist_index+100)d", info, ("playlist",))
self.assertEqual(result, "103")
def test_playlist_count_and_autonumber(self):
info = {
"playlist_title": "My PL",
"playlist_index": "03",
"playlist_count": 10,
"playlist_autonumber": 3,
"n_entries": 10,
"__last_playlist_index": 10,
}
result = _resolve_outtmpl_fields(
"%(playlist_title)s/%(playlist_autonumber)s of %(playlist_count)s - %(title)s.%(ext)s",
info,
("playlist",),
)
# playlist_autonumber is auto-padded by yt-dlp using __last_playlist_index
self.assertEqual(result, "My PL/03 of 10 - %(title)s.%(ext)s")
def test_conditional_playlist_index(self):
info = {
"playlist_index": "5",
"playlist_count": 10,
}
result = _resolve_outtmpl_fields(
"%(playlist_index&{} - |)s%(title)s.%(ext)s",
info,
("playlist",),
)
self.assertEqual(result, "5 - %(title)s.%(ext)s")
class SanitizeEntryForPickleTests(unittest.TestCase):
def test_nested(self):
def g():
yield 1
obj = {"a": g(), "b": [g()]}
out = _sanitize_entry_for_pickle(obj)
self.assertEqual(out, {"a": [1], "b": [[1]]})
pickle.dumps(out)
def test_plain(self):
self.assertEqual(_sanitize_entry_for_pickle(5), 5)
def test_set_converted_to_list(self):
obj = {"s": {1, 2}}
out = _sanitize_entry_for_pickle(obj)
self.assertEqual(sorted(out["s"]), [1, 2])
pickle.dumps(out)
def test_map_iterator(self):
out = _sanitize_entry_for_pickle({"m": map(int, ["1", "2"])})
self.assertEqual(out, {"m": [1, 2]})
def test_lock_replaced_with_none(self):
lock = threading.Lock()
out = _sanitize_entry_for_pickle({"k": lock})
self.assertIsNone(out["k"])
pickle.dumps(out)
def test_ordered_dict(self):
from collections import OrderedDict
od = OrderedDict([("z", 1), ("a", 2)])
out = _sanitize_entry_for_pickle(od)
self.assertEqual(out, {"z": 1, "a": 2})
class ConvertSrtToTxtTests(unittest.TestCase):
def test_basic_conversion(self):
srt = """1
00:00:01,000 --> 00:00:02,000
Hello <b>world</b>
2
00:00:03,000 --> 00:00:04,000
Second line
"""
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "sub.srt"
path.write_text(srt, encoding="utf-8")
txt_path = _convert_srt_to_txt_file(str(path))
self.assertIsNotNone(txt_path)
self.assertTrue(txt_path.endswith(".txt"))
content = Path(txt_path).read_text(encoding="utf-8")
self.assertIn("Hello world", content)
self.assertIn("Second line", content)
class DownloadInfoSetstateTests(unittest.TestCase):
def _base_state(self, **kwargs):
base = {
"id": "id1",
"title": "t",
"url": "http://example.com/v",
"folder": "",
"custom_name_prefix": "",
"error": None,
"entry": None,
"playlist_item_limit": 0,
"split_by_chapters": False,
"chapter_template": "",
"msg": None,
"percent": None,
"speed": None,
"eta": None,
"status": "pending",
"size": None,
"timestamp": 0,
}
base.update(kwargs)
return base
def test_migrates_old_audio_format(self):
state = self._base_state(format="m4a", quality="best")
di = DownloadInfo.__new__(DownloadInfo)
di.__setstate__(state)
self.assertEqual(di.download_type, "audio")
self.assertEqual(di.codec, "auto")
def test_migrates_thumbnail(self):
state = self._base_state(format="thumbnail", quality="best")
di = DownloadInfo.__new__(DownloadInfo)
di.__setstate__(state)
self.assertEqual(di.download_type, "thumbnail")
self.assertEqual(di.format, "jpg")
def test_migrates_captions(self):
state = self._base_state(format="captions", subtitle_format="vtt", quality="best")
di = DownloadInfo.__new__(DownloadInfo)
di.__setstate__(state)
self.assertEqual(di.download_type, "captions")
self.assertEqual(di.format, "vtt")
def test_migrates_best_ios(self):
state = self._base_state(
format="any", quality="best_ios", video_codec="auto"
)
di = DownloadInfo.__new__(DownloadInfo)
di.__setstate__(state)
self.assertEqual(di.format, "ios")
self.assertEqual(di.quality, "best")
def test_migrates_quality_audio(self):
state = self._base_state(format="mp4", quality="audio", video_codec="h264")
di = DownloadInfo.__new__(DownloadInfo)
di.__setstate__(state)
self.assertEqual(di.download_type, "audio")
self.assertEqual(di.format, "m4a")
def test_new_state_has_subtitle_files(self):
state = self._base_state(
download_type="video",
codec="auto",
format="any",
quality="best",
)
di = DownloadInfo.__new__(DownloadInfo)
di.__setstate__(state)
self.assertEqual(di.subtitle_files, [])
def test_missing_optional_fields_are_defaulted(self):
state = self._base_state(
download_type="video",
codec="auto",
format="any",
quality="best",
)
state.pop("folder")
state.pop("custom_name_prefix")
state.pop("playlist_item_limit")
state.pop("split_by_chapters")
state.pop("chapter_template")
di = DownloadInfo.__new__(DownloadInfo)
di.__setstate__(state)
self.assertEqual(di.folder, "")
self.assertEqual(di.custom_name_prefix, "")
self.assertEqual(di.playlist_item_limit, 0)
self.assertFalse(di.split_by_chapters)
self.assertEqual(di.chapter_template, "")
class CompactPersistedEntryTests(unittest.TestCase):
def test_keeps_only_playlist_and_channel_keys(self):
entry = {
"playlist_index": "01",
"playlist_title": "Playlist",
"playlist_count": 10,
"playlist_autonumber": 1,
"channel_index": "02",
"channel_title": "Channel",
"n_entries": 10,
"__last_playlist_index": 10,
"formats": [{"id": "huge"}],
"description": "big blob",
}
compact = _compact_persisted_entry(entry)
self.assertEqual(
compact,
{
"playlist_index": "01",
"playlist_title": "Playlist",
"playlist_count": 10,
"playlist_autonumber": 1,
"channel_index": "02",
"channel_title": "Channel",
"n_entries": 10,
"__last_playlist_index": 10,
},
)
def test_returns_none_when_no_restart_relevant_keys_exist(self):
self.assertIsNone(_compact_persisted_entry({"id": "x", "title": "y"}))
if __name__ == "__main__":
unittest.main()