diff --git a/app/tests/test_ytdl_utils.py b/app/tests/test_ytdl_utils.py index 54c9400..06d6d50 100644 --- a/app/tests/test_ytdl_utils.py +++ b/app/tests/test_ytdl_utils.py @@ -24,7 +24,7 @@ class _ImpersonateTarget: fake_impersonate.ImpersonateTarget = _ImpersonateTarget fake_networking.impersonate = fake_impersonate -fake_utils.STR_FORMAT_RE_TMPL = r"(?P<prefix>)%\((?P<has_key>{})\)(?P<format>[-0-9.]*{})" +fake_utils.STR_FORMAT_RE_TMPL = r"(?P<prefix>)%\((?P<has_key>(?P<key>{}))\)(?P<format>[-0-9.]*{})" fake_utils.STR_FORMAT_TYPES = "diouxXeEfFgGcrsa" fake_yt_dlp.networking = fake_networking fake_yt_dlp.utils = fake_utils @@ -37,11 +37,15 @@ from ytdl import ( DownloadInfo, _compact_persisted_entry, _convert_srt_to_txt_file, - _outtmpl_substitute_field, + _resolve_outtmpl_fields, _sanitize_entry_for_pickle, _sanitize_path_component, ) +# Detect whether the real yt-dlp is loaded (as opposed to the minimal fake +# shim above). _resolve_outtmpl_fields needs YoutubeDL at runtime. +_has_real_ytdlp = hasattr(sys.modules.get("yt_dlp"), "YoutubeDL") + class SanitizePathComponentTests(unittest.TestCase): def test_replaces_windows_invalid_chars(self): @@ -52,15 +56,39 @@ class SanitizePathComponentTests(unittest.TestCase): self.assertEqual(_sanitize_path_component(42), 42) -class OuttmplSubstituteFieldTests(unittest.TestCase): - def test_simple_substitution(self): - self.assertEqual(_outtmpl_substitute_field("%(title)s", "title", "Hello"), "Hello") +@unittest.skipUnless(_has_real_ytdlp, "requires real yt-dlp") +class ResolveOuttmplFieldsTests(unittest.TestCase): + """Tests for _resolve_outtmpl_fields (delegates to yt-dlp's template engine).""" + + def test_simple_playlist_substitution(self): + info = {"playlist_title": "My PL", "playlist_index": "03"} + result = _resolve_outtmpl_fields("%(playlist_title)s/%(title)s.%(ext)s", info, ("playlist",)) + self.assertEqual(result, "My PL/%(title)s.%(ext)s") def test_format_spec_int(self): - self.assertEqual(_outtmpl_substitute_field("%(idx)02d", "idx", 3), "03") + info = 
{"playlist_index": "3"} + result = _resolve_outtmpl_fields("%(playlist_index)02d-%(title)s", info, ("playlist",)) + self.assertEqual(result, "03-%(title)s") - def test_missing_field_unchanged(self): - self.assertEqual(_outtmpl_substitute_field("%(other)s", "title", "x"), "%(other)s") + def test_non_targeted_fields_unchanged(self): + info = {"playlist_title": "PL"} + result = _resolve_outtmpl_fields("%(title)s/%(ext)s", info, ("playlist",)) + self.assertEqual(result, "%(title)s/%(ext)s") + + def test_default_value(self): + info = {"playlist_index": "1"} + result = _resolve_outtmpl_fields("%(playlist_title|Unknown)s/%(playlist_index)s", info, ("playlist",)) + self.assertEqual(result, "Unknown/1") + + def test_channel_prefix(self): + info = {"channel": "MyChan", "channel_index": "05"} + result = _resolve_outtmpl_fields("%(channel)s/%(channel_index)02d-%(title)s", info, ("channel",)) + self.assertEqual(result, "MyChan/05-%(title)s") + + def test_math_operation(self): + info = {"playlist_index": "3"} + result = _resolve_outtmpl_fields("%(playlist_index+100)d", info, ("playlist",)) + self.assertEqual(result, "103") class SanitizeEntryForPickleTests(unittest.TestCase): diff --git a/app/ytdl.py b/app/ytdl.py index 6aac00d..15f42f1 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -13,7 +13,6 @@ import logging import re import types from typing import Any, Optional -from functools import lru_cache import yt_dlp.networking.impersonate from yt_dlp.utils import STR_FORMAT_RE_TMPL, STR_FORMAT_TYPES @@ -24,13 +23,6 @@ from state_store import AtomicJsonStore, from_json_compatible, read_legacy_shelf log = logging.getLogger('ytdl') -@lru_cache(maxsize=None) -def _compile_outtmpl_pattern(field: str) -> re.Pattern: - """Compile a regex pattern to match a specific field in an output template, including optional format specifiers.""" - conversion_types = f"[{re.escape(STR_FORMAT_TYPES)}]" - return re.compile(STR_FORMAT_RE_TMPL.format(re.escape(field), conversion_types)) - - # Characters 
that are invalid in Windows/NTFS path components. These are pre- # sanitised when substituting playlist/channel titles into output templates so # that downloads do not fail on NTFS-mounted volumes or Windows Docker hosts. @@ -41,44 +33,51 @@ def _sanitize_path_component(value: Any) -> Any: """Replace characters that are invalid in Windows path components with '_'. Non-string values (int, float, None, …) are passed through unchanged so - that ``_outtmpl_substitute_field`` can still coerce them with format specs - (e.g. ``%(playlist_index)02d``). Only string values are sanitised because - Windows-invalid characters are only a concern for human-readable strings - (titles, channel names, etc.) that may end up as directory names. + that numeric format specs (e.g. ``%(playlist_index)02d``) still work. + Only string values are sanitised because Windows-invalid characters are + only a concern for human-readable strings (titles, channel names, etc.) + that may end up as directory names. """ if not isinstance(value, str): return value return _WINDOWS_INVALID_PATH_CHARS.sub('_', value) -def _outtmpl_substitute_field(template: str, field: str, value: Any) -> str: - """Substitute a single field in an output template, applying any format specifiers to the value.""" - pattern = _compile_outtmpl_pattern(field) +# Regex matching yt-dlp output-template field references, e.g. ``%(title)s`` +# or ``%(playlist_index)03d``. Built from yt-dlp's own ``STR_FORMAT_RE_TMPL`` +# so that it stays in sync with upstream changes to the template syntax. 
+_OUTTMPL_FIELD_RE = re.compile( + STR_FORMAT_RE_TMPL.format('[^)]+', f'[{STR_FORMAT_TYPES}ljhqBUDS]') +) - def replacement(match: re.Match) -> str: - if match.group("has_key") is None: - return match.group(0) - prefix = match.group("prefix") or "" - format_spec = match.group("format") +def _resolve_outtmpl_fields(template: str, info_dict: dict, prefixes: tuple[str, ...]) -> str: + """Resolve specific fields in an output template using yt-dlp's template engine. - if not format_spec: - return f"{prefix}{value}" + Only field references whose root name starts with one of *prefixes* are + evaluated. All other references are left untouched so that yt-dlp can + resolve them later during the actual download. - conversion_type = format_spec[-1] - try: - if conversion_type in "diouxX": - coerced_value = int(value) - elif conversion_type in "eEfFgG": - coerced_value = float(value) - else: - coerced_value = value + This delegates to ``YoutubeDL.evaluate_outtmpl`` for each targeted field + reference, giving access to the full yt-dlp template syntax (defaults, + conditional formatting, math operations, datetime formatting, etc.). 
+ """ + matches = list(_OUTTMPL_FIELD_RE.finditer(template)) + if not matches: + return template - return f"{prefix}{('%' + format_spec) % coerced_value}" - except (ValueError, TypeError): - return f"{prefix}{value}" + with yt_dlp.YoutubeDL({'quiet': True}) as ydl: + for match in reversed(matches): + key = match.group('key') + if key is None: + continue + root = re.match(r'\w+', key) + if root is None or not root.group(0).startswith(prefixes): + continue + resolved = ydl.evaluate_outtmpl(match.group(0), info_dict) + template = template[:match.start()] + resolved + template[match.end():] - return pattern.sub(replacement, template) + return template _MAX_ENTRY_SANITIZE_DEPTH = 64 @@ -818,15 +817,13 @@ class DownloadQueue: if entry is not None and entry.get('playlist_index') is not None: if len(self.config.OUTPUT_TEMPLATE_PLAYLIST): output = self.config.OUTPUT_TEMPLATE_PLAYLIST - for property, value in entry.items(): - if property.startswith("playlist"): - output = _outtmpl_substitute_field(output, property, _sanitize_path_component(value)) + sanitized = {k: _sanitize_path_component(v) for k, v in entry.items()} + output = _resolve_outtmpl_fields(output, sanitized, ('playlist',)) if entry is not None and entry.get('channel_index') is not None: if len(self.config.OUTPUT_TEMPLATE_CHANNEL): output = self.config.OUTPUT_TEMPLATE_CHANNEL - for property, value in entry.items(): - if property.startswith("channel"): - output = _outtmpl_substitute_field(output, property, _sanitize_path_component(value)) + sanitized = {k: _sanitize_path_component(v) for k, v in entry.items()} + output = _resolve_outtmpl_fields(output, sanitized, ('channel',)) ytdl_options = dict(self.config.YTDL_OPTIONS) playlist_item_limit = getattr(dl, 'playlist_item_limit', 0) if playlist_item_limit > 0: