mirror of
https://github.com/alexta69/metube.git
synced 2026-06-13 16:40:05 +00:00
1403 lines
56 KiB
Python
1403 lines
56 KiB
Python
import os
|
|
import shutil
|
|
import yt_dlp
|
|
import collections
|
|
import collections.abc
|
|
import copy
|
|
import pickle
|
|
from collections import OrderedDict
|
|
import time
|
|
import asyncio
|
|
import multiprocessing
|
|
from functools import partial
|
|
import logging
|
|
import re
|
|
import types
|
|
from typing import Any, Optional
|
|
|
|
import yt_dlp.networking.impersonate
|
|
from yt_dlp.utils import STR_FORMAT_RE_TMPL, STR_FORMAT_TYPES
|
|
from dl_formats import get_format, get_opts, AUDIO_FORMATS
|
|
from datetime import datetime
|
|
from state_store import AtomicJsonStore, from_json_compatible, read_legacy_shelf, to_json_compatible
|
|
from subscriptions import _entry_id
|
|
|
|
log = logging.getLogger('ytdl')
|
|
|
|
# Bounds (in seconds) on how long the live monitor waits between two probes of
# a scheduled stream: never less than the first value, never more than the second.
_LIVE_CHECK_INTERVAL = 60
_LIVE_MAX_CHECK_INTERVAL = 3600
# Number of back-to-back probe failures (network blips, rate limits, transient
# extractor errors) tolerated before a scheduled live download is given up on
# and reported as errored.
_LIVE_PROBE_MAX_FAILURES = 5
|
|
|
|
|
|
# Characters that Windows/NTFS rejects inside a single path component.  Titles
# substituted into output templates are scrubbed of these up front so that
# downloads keep working on NTFS-mounted volumes and Windows Docker hosts.
_WINDOWS_INVALID_PATH_CHARS = re.compile(r'[\\:*?"<>|]')


def _sanitize_path_component(value: Any) -> Any:
    """Return *value* with every Windows-invalid path character turned into '_'.

    Only ``str`` values are rewritten.  Anything else (int, float, None, ...)
    is handed back untouched, which keeps numeric format specs such as
    ``%(playlist_index)02d`` working.
    """
    if isinstance(value, str):
        return _WINDOWS_INVALID_PATH_CHARS.sub('_', value)
    return value
|
|
|
|
|
|
# Pattern for yt-dlp output-template field references such as ``%(title)s`` or
# ``%(playlist_index)03d``.  It is derived from yt-dlp's ``STR_FORMAT_RE_TMPL``
# so it follows upstream changes to the template syntax.
_OUTTMPL_FIELD_RE = re.compile(
    STR_FORMAT_RE_TMPL.format('[^)]+', f'[{STR_FORMAT_TYPES}ljhqBUDS]')
)


def _resolve_outtmpl_fields(template: str, info_dict: dict, prefixes: tuple[str, ...]) -> str:
    """Pre-evaluate selected field references of a yt-dlp output template.

    A reference is evaluated only when the leading identifier of its key
    starts with one of *prefixes*; every other reference stays in the template
    verbatim so yt-dlp can resolve it during the real download.

    Evaluation is delegated to ``YoutubeDL.evaluate_outtmpl`` one reference at
    a time, so whatever template syntax yt-dlp accepts inside a reference is
    honoured.

    Args:
        template: Output template to process.
        info_dict: Metadata the targeted references are evaluated against.
        prefixes: Accepted prefixes for the root field name.

    Returns:
        The template with the targeted references replaced by their values.
    """
    found = list(_OUTTMPL_FIELD_RE.finditer(template))
    if not found:
        return template

    with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
        # Go right-to-left so the offsets of not-yet-processed matches remain
        # valid after each substitution.
        for field in found[::-1]:
            key = field.group('key')
            if key is None:
                continue
            root = re.match(r'\w+', key)
            targeted = root is not None and root.group(0).startswith(prefixes)
            if not targeted:
                continue
            resolved = ydl.evaluate_outtmpl(field.group(0), info_dict)
            template = template[:field.start()] + resolved + template[field.end():]

    return template
|
|
|
|
# Recursion guard: anything nested deeper than this is replaced by None.
_MAX_ENTRY_SANITIZE_DEPTH = 64


def _sanitize_entry_for_pickle(obj, _depth=0):
    """Recursively normalize yt-dlp ``info_dict`` data so it can be stored/pickled.

    Live streams and newer yt-dlp versions may nest generators, iterators, sets
    or non-serializable objects (e.g. locks) inside the extracted metadata.

    Conversion rules:
      * ``None`` and bool/int/float/str/bytes are returned unchanged.
      * Generators and other iterators are materialized into lists (an
        iterator that fails while being consumed becomes ``None``).
      * Mappings become plain ``dict`` objects with sanitized values.
      * Exact ``list``/``tuple`` objects keep their type; subclasses (for
        example namedtuples) are normalized to the plain builtin, because a
        subclass constructor is not guaranteed to accept a single iterable.
      * Sets, frozensets and deques become lists.
      * Any other object is kept only if ``pickle.dumps`` accepts it,
        otherwise it is replaced by ``None``.

    Args:
        obj: Value to sanitize.
        _depth: Current recursion depth (internal).

    Returns:
        A sanitized copy of *obj*, or ``None`` when it cannot be represented
        or the nesting exceeds ``_MAX_ENTRY_SANITIZE_DEPTH``.
    """
    if _depth > _MAX_ENTRY_SANITIZE_DEPTH:
        return None
    if obj is None or isinstance(obj, (bool, int, float, str, bytes)):
        return obj
    if isinstance(obj, types.GeneratorType):
        return _sanitize_entry_for_pickle(list(obj), _depth + 1)
    if isinstance(obj, collections.abc.Mapping):
        return {k: _sanitize_entry_for_pickle(v, _depth + 1) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        items = [_sanitize_entry_for_pickle(x, _depth + 1) for x in obj]
        # Do not call type(obj)(...) here: a namedtuple takes one positional
        # argument per field, so it would either raise TypeError or wrap the
        # whole iterable in its first field.
        return tuple(items) if isinstance(obj, tuple) else items
    if isinstance(obj, (set, frozenset, collections.deque)):
        return [_sanitize_entry_for_pickle(x, _depth + 1) for x in obj]
    if isinstance(obj, collections.abc.Iterator):
        try:
            return _sanitize_entry_for_pickle(list(obj), _depth + 1)
        except Exception:
            return None
    try:
        pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
        return obj
    except Exception:
        return None
|
|
|
|
|
|
def _convert_srt_to_txt_file(subtitle_path: str) -> Optional[str]:
    """Convert an SRT subtitle file into plain text by stripping cue numbers/timestamps.

    The text is written next to the input, using the same base name with a
    ``.txt`` extension.  Each cue becomes one output line: its text lines are
    stripped of ``<...>`` markup and joined with single spaces.

    Args:
        subtitle_path: Path of the subtitle file to read.

    Returns:
        The path of the written ``.txt`` file, or ``None`` if reading or
        writing failed with an ``OSError`` (a warning is logged).
    """
    txt_path = os.path.splitext(subtitle_path)[0] + ".txt"
    try:
        # "utf-8-sig" drops a leading byte-order mark.  With plain "utf-8" the
        # BOM stays glued to the first cue number, which then no longer
        # matches ``\d+`` and leaks into the converted text.
        with open(subtitle_path, "r", encoding="utf-8-sig", errors="replace") as infile:
            content = infile.read()

        # Normalize newlines so cue splitting is consistent across platforms.
        content = content.replace("\r\n", "\n").replace("\r", "\n")
        cues = []
        for block in re.split(r"\n{2,}", content):
            lines = [line.strip() for line in block.split("\n") if line.strip()]
            if not lines:
                continue
            # Drop the numeric cue index and the timing line heading the cue.
            if re.fullmatch(r"\d+", lines[0]):
                lines = lines[1:]
            if lines and "-->" in lines[0]:
                lines = lines[1:]

            text_lines = []
            for line in lines:
                # Skip any further timing lines inside the block.
                if "-->" in line:
                    continue
                clean_line = re.sub(r"<[^>]+>", "", line).strip()
                if clean_line:
                    text_lines.append(clean_line)
            if text_lines:
                cues.append(" ".join(text_lines))

        with open(txt_path, "w", encoding="utf-8") as outfile:
            if cues:
                outfile.write("\n".join(cues))
                outfile.write("\n")
        return txt_path
    except OSError as exc:
        log.warning(f"Failed to convert subtitle file {subtitle_path} to txt: {exc}")
        return None
|
|
|
|
class DownloadQueueNotifier:
    """Callback interface for download-queue events.

    Every hook is a coroutine that raises ``NotImplementedError`` here, so a
    concrete notifier has to override each of them.
    """

    async def added(self, dl):
        """Hook for a download that was added."""
        raise NotImplementedError

    async def updated(self, dl):
        """Hook for a download whose state changed."""
        raise NotImplementedError

    async def completed(self, dl):
        """Hook for a download that reached a final state."""
        raise NotImplementedError

    async def canceled(self, id):
        """Hook for a download that was canceled."""
        raise NotImplementedError

    async def cleared(self, id):
        """Hook for a download that was cleared."""
        raise NotImplementedError
|
|
|
|
class DownloadInfo:
    """State of one download: the request options plus mutable progress fields.

    Instances are persisted, so ``__setstate__`` also migrates records written
    by older versions and fills in attributes that were added later.
    """

    def __init__(
        self,
        id,
        title,
        url,
        quality,
        download_type,
        codec,
        format,
        folder,
        custom_name_prefix,
        error,
        entry,
        playlist_item_limit,
        split_by_chapters,
        chapter_template,
        subtitle_language="en",
        subtitle_mode="prefer_manual",
        ytdl_options_presets=None,
        ytdl_options_overrides=None,
        clip_start=None,
        clip_end=None,
        live_status=None,
        live_release_timestamp=None,
    ):
        """Create a new download in the ``pending`` state.

        When *custom_name_prefix* is non-empty it is prepended (followed by a
        dot) to both ``id`` and ``title``.  ``None`` is treated like an empty
        prefix.  *entry* is sanitized so it contains only storable values.
        """
        # A missing prefix (None) means "no prefix", matching the default that
        # __setstate__ applies, instead of crashing on len(None).
        custom_name_prefix = custom_name_prefix or ""
        self.id = f'{custom_name_prefix}.{id}' if custom_name_prefix else id
        self.title = f'{custom_name_prefix}.{title}' if custom_name_prefix else title
        self.url = url
        self.quality = quality
        self.download_type = download_type
        self.codec = codec
        self.format = format
        self.folder = folder
        self.custom_name_prefix = custom_name_prefix
        self.msg = self.percent = self.speed = self.eta = None
        self.status = "pending"
        self.size = None
        self.timestamp = time.time_ns()
        self.error = error
        # Strip non-pickleable values (generators, iterators, locks, etc.)
        self.entry = _sanitize_entry_for_pickle(entry) if entry is not None else None
        self.playlist_item_limit = playlist_item_limit
        self.split_by_chapters = split_by_chapters
        self.chapter_template = chapter_template
        self.subtitle_language = subtitle_language
        self.subtitle_mode = subtitle_mode
        # Copy the containers so later mutation does not leak to the caller.
        self.ytdl_options_presets = list(ytdl_options_presets or [])
        self.ytdl_options_overrides = dict(ytdl_options_overrides or {})
        self.clip_start = clip_start
        self.clip_end = clip_end
        self.live_status = live_status
        self.live_release_timestamp = live_release_timestamp
        self.subtitle_files = []

    def __setstate__(self, state):
        """BACKWARD COMPATIBILITY: migrate old DownloadInfo from persistent queue files."""
        self.__dict__.update(state)
        if 'download_type' not in state:
            # Legacy records encoded the download kind in format/quality.
            old_format = state.get('format', 'any')
            old_video_codec = state.get('video_codec', 'auto')
            old_quality = state.get('quality', 'best')
            old_subtitle_format = state.get('subtitle_format', 'srt')

            if old_format in AUDIO_FORMATS:
                self.download_type = 'audio'
                self.codec = 'auto'
            elif old_format == 'thumbnail':
                self.download_type = 'thumbnail'
                self.codec = 'auto'
                self.format = 'jpg'
            elif old_format == 'captions':
                self.download_type = 'captions'
                self.codec = 'auto'
                self.format = old_subtitle_format
            else:
                self.download_type = 'video'
                self.codec = old_video_codec
                if old_quality == 'best_ios':
                    self.format = 'ios'
                    self.quality = 'best'
                elif old_quality == 'audio':
                    self.download_type = 'audio'
                    self.codec = 'auto'
                    self.format = 'm4a'
                    self.quality = 'best'
            self.__dict__.pop('video_codec', None)
            self.__dict__.pop('subtitle_format', None)

        # An empty or missing codec falls back to "auto".
        if not getattr(self, "codec", None):
            self.codec = "auto"

        # The singular "ytdl_options_preset" is the legacy spelling; it is
        # always removed and only used when the plural attribute is absent.
        legacy_preset = self.__dict__.pop("ytdl_options_preset", None)
        if "ytdl_options_presets" not in self.__dict__:
            if isinstance(legacy_preset, str) and legacy_preset.strip():
                self.ytdl_options_presets = [legacy_preset.strip()]
            elif isinstance(legacy_preset, list):
                self.ytdl_options_presets = [str(x).strip() for x in legacy_preset if str(x).strip()]
            else:
                self.ytdl_options_presets = []

        # Attributes added over time.  The literal is rebuilt on every call so
        # the mutable defaults are never shared between instances.
        defaults = {
            "folder": "",
            "custom_name_prefix": "",
            "playlist_item_limit": 0,
            "split_by_chapters": False,
            "chapter_template": "",
            "subtitle_language": "en",
            "subtitle_mode": "prefer_manual",
            "ytdl_options_overrides": {},
            "entry": None,
            "subtitle_files": [],
            "chapter_files": [],
            "clip_start": None,
            "clip_end": None,
            "live_status": None,
            "live_release_timestamp": None,
        }
        for name, default in defaults.items():
            if not hasattr(self, name):
                setattr(self, name, default)
|
|
|
|
|
|
# DownloadInfo attributes copied into a persisted record, in this order.
# Attributes that are missing or None are omitted when a record is built.
_PERSISTED_DOWNLOAD_FIELDS = (
    # identity and request options
    "id", "title", "url",
    "quality", "download_type", "codec", "format",
    "folder", "custom_name_prefix",
    "playlist_item_limit", "split_by_chapters", "chapter_template",
    "subtitle_language", "subtitle_mode",
    "ytdl_options_presets", "ytdl_options_overrides",
    "clip_start", "clip_end",
    "live_status", "live_release_timestamp",
    # state and results
    "status", "timestamp", "error", "msg",
    "filename", "size", "chapter_files",
)
|
|
|
|
|
|
# Entry keys kept in addition to those starting with "playlist" or "channel".
_COMPACT_ENTRY_EXTRA_KEYS = frozenset(("n_entries", "__last_playlist_index"))


def _compact_persisted_entry(entry: Any) -> Optional[dict[str, Any]]:
    """Reduce an entry dict to the keys worth persisting.

    A key survives when it starts with ``playlist`` or ``channel`` or is
    listed in ``_COMPACT_ENTRY_EXTRA_KEYS``.

    Returns:
        The reduced dict, or ``None`` when *entry* is not a dict or nothing
        survives the filter.
    """
    if not isinstance(entry, dict):
        return None

    kept: dict[str, Any] = {}
    for name, value in entry.items():
        if name.startswith(("playlist", "channel")) or name in _COMPACT_ENTRY_EXTRA_KEYS:
            kept[name] = value
    return kept if kept else None
|
|
|
|
|
|
def _download_info_to_record(
    info: DownloadInfo,
    *,
    include_entry: bool,
) -> dict[str, Any]:
    """Build the persisted record for *info*.

    Every attribute named in ``_PERSISTED_DOWNLOAD_FIELDS`` that exists and is
    not ``None`` is converted with ``to_json_compatible``.  When
    *include_entry* is true, the compacted ``entry`` is stored under
    ``"entry"`` if anything remains after compaction.
    """
    record: dict[str, Any] = {
        field: to_json_compatible(value)
        for field in _PERSISTED_DOWNLOAD_FIELDS
        if (value := getattr(info, field, None)) is not None
    }
    if not include_entry:
        return record

    slim_entry = _compact_persisted_entry(getattr(info, "entry", None))
    if slim_entry is not None:
        record["entry"] = to_json_compatible(slim_entry)
    return record
|
|
|
|
|
|
def _download_info_from_record(record: dict[str, Any]) -> DownloadInfo:
    """Rebuild a ``DownloadInfo`` from a persisted record.

    ``__init__`` is bypassed: the decoded values are applied through
    ``__setstate__``, after which the runtime attributes that the record did
    not carry receive their initial values.
    """
    restored = DownloadInfo.__new__(DownloadInfo)
    decoded = {name: from_json_compatible(raw) for name, raw in record.items()}
    restored.__setstate__(decoded)

    runtime_defaults = (
        ("msg", None),
        ("percent", None),
        ("speed", None),
        ("eta", None),
        ("status", "pending"),
        ("size", None),
        ("error", None),
    )
    for attr, initial in runtime_defaults:
        if not hasattr(restored, attr):
            setattr(restored, attr, initial)
    return restored
|
|
|
|
class Download:
    """One yt-dlp download, run in a child process and tracked from asyncio.

    ``start`` launches ``_download`` in a ``multiprocessing.Process``.  The
    child reports progress through ``status_queue`` and ``update_status``
    applies those reports to ``self.info`` in the parent.
    """

    # Shared multiprocessing manager, created lazily by the first ``start``.
    manager = None

    @classmethod
    def shutdown_manager(cls):
        """Shut down the shared multiprocessing manager, if one exists."""
        if cls.manager is not None:
            cls.manager.shutdown()
            cls.manager = None

    def __init__(self, download_dir, temp_dir, output_template, output_template_chapter, quality, format, ytdl_opts, info):
        """Store the paths/templates and derive the yt-dlp format and options from *info*."""
        self.download_dir = download_dir
        self.temp_dir = temp_dir
        self.output_template = output_template
        self.output_template_chapter = output_template_chapter
        self.info = info
        self.format = get_format(
            getattr(info, 'download_type', 'video'),
            getattr(info, 'codec', 'auto'),
            format,
            quality,
        )
        self.ytdl_opts = get_opts(
            getattr(info, 'download_type', 'video'),
            getattr(info, 'codec', 'auto'),
            format,
            quality,
            ytdl_opts,
            subtitle_language=getattr(info, 'subtitle_language', 'en'),
            subtitle_mode=getattr(info, 'subtitle_mode', 'prefer_manual'),
        )
        if "impersonate" in self.ytdl_opts:
            # The option arrives as a string; yt-dlp expects an ImpersonateTarget.
            self.ytdl_opts["impersonate"] = yt_dlp.networking.impersonate.ImpersonateTarget.from_str(self.ytdl_opts["impersonate"])
        self.canceled = False
        self.tmpfilename = None
        self.status_queue = None
        self.proc = None
        self.loop = None
        self.notifier = None

    def _download(self):
        """Run the yt-dlp download; used as the target of the child process.

        Progress and postprocessor events are forwarded as dicts on
        ``self.status_queue``.  A final ``finished``/``error`` status is always
        queued unless a non-yt-dlp exception escapes.
        """
        log.info(f"Starting download for: {self.info.title} ({self.info.url})")
        try:
            debug_logging = logging.getLogger().isEnabledFor(logging.DEBUG)

            def put_status(st):
                # Forward only the keys that update_status consumes.
                self.status_queue.put({k: v for k, v in st.items() if k in (
                    'tmpfilename',
                    'filename',
                    'status',
                    'msg',
                    'total_bytes',
                    'total_bytes_estimate',
                    'downloaded_bytes',
                    'speed',
                    'eta',
                )})

            def put_status_postprocessor(d):
                if d['postprocessor'] == 'MoveFiles' and d['status'] == 'finished':
                    filepath = d['info_dict']['filepath']
                    if '__finaldir' in d['info_dict']:
                        finaldir = d['info_dict']['__finaldir']
                        filename = os.path.join(finaldir, os.path.basename(filepath))
                    else:
                        filename = filepath
                    self.status_queue.put({'status': 'finished', 'filename': filename})
                    # For captions-only downloads, yt-dlp may still report a media-like
                    # filepath in MoveFiles. Capture subtitle outputs explicitly so the
                    # UI can link to real caption files.
                    if getattr(self.info, 'download_type', '') == 'captions':
                        requested_subtitles = d.get('info_dict', {}).get('requested_subtitles', {}) or {}
                        for subtitle in requested_subtitles.values():
                            if isinstance(subtitle, dict) and subtitle.get('filepath'):
                                self.status_queue.put({'subtitle_file': subtitle['filepath']})

                # Capture all chapter files when SplitChapters finishes
                elif d.get('postprocessor') == 'SplitChapters' and d.get('status') == 'finished':
                    chapters = d.get('info_dict', {}).get('chapters', [])
                    if chapters:
                        for chapter in chapters:
                            if isinstance(chapter, dict) and 'filepath' in chapter:
                                log.info(f"Captured chapter file: {chapter['filepath']}")
                                self.status_queue.put({'chapter_file': chapter['filepath']})
                    else:
                        log.warning("SplitChapters finished but no chapter files found in info_dict")

            ytdl_params = {
                'quiet': not debug_logging,
                'verbose': debug_logging,
                'no_color': True,
                'paths': {"home": self.download_dir, "temp": self.temp_dir},
                'outtmpl': { "default": self.output_template, "chapter": self.output_template_chapter },
                'format': self.format,
                'socket_timeout': 30,
                'ignore_no_formats_error': True,
                'progress_hooks': [put_status],
                'postprocessor_hooks': [put_status_postprocessor],
                # Derived/user options come last so they override the defaults above.
                **self.ytdl_opts,
            }

            # Add chapter splitting options if enabled
            if self.info.split_by_chapters:
                ytdl_params['outtmpl']['chapter'] = self.info.chapter_template
                if 'postprocessors' not in ytdl_params:
                    ytdl_params['postprocessors'] = []
                ytdl_params['postprocessors'].append({
                    'key': 'FFmpegSplitChapters',
                    'force_keyframes': False
                })

            clip_start = getattr(self.info, 'clip_start', None)
            clip_end = getattr(self.info, 'clip_end', None)
            if clip_start is not None or clip_end is not None:
                # An open-ended clip runs from 0 and/or to infinity.
                start = float(clip_start) if clip_start is not None else 0.0
                end = float(clip_end) if clip_end is not None else float('inf')
                ytdl_params['download_ranges'] = yt_dlp.utils.download_range_func(
                    None,
                    [(start, end)],
                )

            ret = yt_dlp.YoutubeDL(params=ytdl_params).download([self.info.url])
            self.status_queue.put({'status': 'finished' if ret == 0 else 'error'})
            log.info(f"Finished download for: {self.info.title}")
        except yt_dlp.utils.YoutubeDLError as exc:
            log.error(f"Download error for {self.info.title}: {str(exc)}")
            self.status_queue.put({'status': 'error', 'msg': str(exc)})

    async def start(self, notifier):
        """Launch the child process and wait until it and the status pump finish."""
        log.info(f"Preparing download for: {self.info.title}")
        if Download.manager is None:
            Download.manager = multiprocessing.Manager()
        self.status_queue = Download.manager.Queue()
        self.proc = multiprocessing.Process(target=self._download)
        self.proc.start()
        self.loop = asyncio.get_running_loop()
        self.notifier = notifier
        self.info.status = 'preparing'
        await self.notifier.updated(self.info)
        self.status_task = asyncio.create_task(self.update_status())
        await self.loop.run_in_executor(None, self.proc.join)
        # Signal update_status to stop and wait for it to finish
        # so that all status updates (including MoveFiles with correct
        # file size) are processed before _post_download_cleanup runs.
        if self.status_queue is not None:
            self.status_queue.put(None)
            await self.status_task

    def cancel(self):
        """Kill the child process if it is running and stop the status pump."""
        log.info(f"Cancelling download: {self.info.title}")
        if self.running():
            try:
                self.proc.kill()
            except Exception as e:
                log.error(f"Error killing process for {self.info.title}: {e}")
        self.canceled = True
        if self.status_queue is not None:
            # The None sentinel unblocks update_status.
            self.status_queue.put(None)

    def close(self):
        """Release the process object if the download was ever started."""
        log.info(f"Closing download process for: {self.info.title}")
        if self.started():
            self.proc.close()

    def running(self):
        """Return True while the child process exists and is alive."""
        try:
            return self.proc is not None and self.proc.is_alive()
        except ValueError:
            # is_alive() raises ValueError on a process object that was closed.
            return False

    def started(self):
        """Return True once a process object has been created."""
        return self.proc is not None

    async def update_status(self):
        """Apply queued status dicts to ``self.info`` until told to stop.

        Stops on the ``None`` sentinel or once the download is canceled.
        Chapter and subtitle file notices are recorded on ``self.info``
        without notifying; every other status is applied and pushed through
        ``self.notifier.updated``.
        """
        while True:
            status = await self.loop.run_in_executor(None, self.status_queue.get)
            if status is None:
                log.info(f"Status update finished for: {self.info.title}")
                return
            if self.canceled:
                log.info(f"Download {self.info.title} is canceled; stopping status updates.")
                return
            # Keep the last reported temp file.  Statuses without the key (such
            # as the final 'error') must not reset it to None, otherwise the
            # caller's cleanup can no longer find the partial file to delete.
            if 'tmpfilename' in status:
                self.tmpfilename = status['tmpfilename']
            if 'filename' in status:
                fileName = status.get('filename')
                rel_name = os.path.relpath(fileName, self.download_dir)
                # For captions mode, ignore media-like placeholders and let subtitle_file
                # statuses define the final file shown in the UI.
                if getattr(self.info, 'download_type', '') == 'captions':
                    requested_subtitle_format = str(getattr(self.info, 'format', '')).lower()
                    allowed_caption_exts = ('.txt',) if requested_subtitle_format == 'txt' else ('.vtt', '.srt', '.sbv', '.scc', '.ttml', '.dfxp')
                    if not rel_name.lower().endswith(allowed_caption_exts):
                        continue
                self.info.filename = rel_name
                self.info.size = os.path.getsize(fileName) if os.path.exists(fileName) else None
                if getattr(self.info, 'download_type', '') == 'thumbnail':
                    self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename)

            # Handle chapter files
            log.debug(f"Update status for {self.info.title}: {status}")
            if 'chapter_file' in status:
                chapter_file = status.get('chapter_file')
                if not hasattr(self.info, 'chapter_files'):
                    self.info.chapter_files = []
                rel_path = os.path.relpath(chapter_file, self.download_dir)
                file_size = os.path.getsize(chapter_file) if os.path.exists(chapter_file) else None
                # Postprocessor hook called multiple times with chapters. Only insert if not already present.
                existing = next((cf for cf in self.info.chapter_files if cf['filename'] == rel_path), None)
                if not existing:
                    self.info.chapter_files.append({'filename': rel_path, 'size': file_size})
                # Skip the rest of status processing for chapter files
                continue

            if 'subtitle_file' in status:
                subtitle_file = status.get('subtitle_file')
                if not subtitle_file:
                    continue
                subtitle_output_file = subtitle_file

                # txt mode is derived from SRT by stripping cue metadata.
                if getattr(self.info, 'download_type', '') == 'captions' and str(getattr(self.info, 'format', '')).lower() == 'txt':
                    converted_txt = _convert_srt_to_txt_file(subtitle_file)
                    if converted_txt:
                        subtitle_output_file = converted_txt
                        if converted_txt != subtitle_file:
                            try:
                                os.remove(subtitle_file)
                            except OSError as exc:
                                log.debug(f"Could not remove temporary SRT file {subtitle_file}: {exc}")

                rel_path = os.path.relpath(subtitle_output_file, self.download_dir)
                file_size = os.path.getsize(subtitle_output_file) if os.path.exists(subtitle_output_file) else None
                existing = next((sf for sf in self.info.subtitle_files if sf['filename'] == rel_path), None)
                if not existing:
                    self.info.subtitle_files.append({'filename': rel_path, 'size': file_size})
                # Prefer first subtitle file as the primary result link in captions mode.
                if getattr(self.info, 'download_type', '') == 'captions' and (
                    not getattr(self.info, 'filename', None) or
                    str(getattr(self.info, 'format', '')).lower() == 'txt'
                ):
                    self.info.filename = rel_path
                    self.info.size = file_size
                continue

            self.info.status = status['status']
            self.info.msg = status.get('msg')
            if 'downloaded_bytes' in status:
                total = status.get('total_bytes') or status.get('total_bytes_estimate')
                if total:
                    self.info.percent = status['downloaded_bytes'] / total * 100
            self.info.speed = status.get('speed')
            self.info.eta = status.get('eta')
            log.debug(f"Updating status for {self.info.title}: {status}")
            await self.notifier.updated(self.info)
|
|
|
|
class PersistentQueue:
    """Insertion-ordered ``url -> Download`` mapping mirrored to a JSON state file.

    Every mutation is written through ``AtomicJsonStore``.  If the write fails
    the in-memory mapping is rolled back and the error is re-raised.
    """

    def __init__(self, name, path):
        """Set up the store at ``<path>.json``; *path* itself is the legacy file.

        The parent directory of *path* is created when missing.
        """
        self.identifier = name
        pdir = os.path.dirname(path)
        if pdir:
            # makedirs also creates missing ancestors and tolerates a directory
            # that already exists (or appears concurrently).  A bare file name
            # has no parent to create.
            os.makedirs(pdir, exist_ok=True)
        self.legacy_path = path
        self.path = f"{path}.json"
        self.store = AtomicJsonStore(self.path, kind=f"persistent_queue:{name}")
        self.dict = OrderedDict()

    def load(self):
        """Populate the in-memory mapping from the saved items."""
        for k, v in self.saved_items():
            self.dict[k] = Download(None, None, None, None, getattr(v, 'quality', 'best'), getattr(v, 'format', 'any'), {}, v)

    def exists(self, key):
        """Return True if *key* is in the in-memory mapping."""
        return key in self.dict

    def get(self, key):
        """Return the download stored under *key* (``KeyError`` if absent)."""
        return self.dict[key]

    def items(self):
        """Return a view of the in-memory ``(key, download)`` pairs."""
        return self.dict.items()

    def saved_items(self):
        """Return the persisted ``(key, DownloadInfo)`` pairs sorted by timestamp."""
        items = [
            (item["key"], _download_info_from_record(item["info"]))
            for item in self._load_state_items()
        ]
        return sorted(items, key=lambda item: item[1].timestamp)

    def _should_persist_entry(self) -> bool:
        """The ``completed`` queue stores records without their entry."""
        return self.identifier != "completed"

    def _serialize_items(self):
        """Return the in-memory mapping as a list of JSON-ready item dicts."""
        return [
            {
                "key": key,
                "info": _download_info_to_record(
                    download.info,
                    include_entry=self._should_persist_entry(),
                ),
            }
            for key, download in self.dict.items()
        ]

    def _save_dict(self):
        """Write the current in-memory mapping to the store."""
        self.store.save({"items": self._serialize_items()})

    def _load_state_items(self):
        """Load the raw item dicts, normalizing or migrating the state on the way.

        A JSON state is round-tripped through ``DownloadInfo`` and saved again
        when its schema version or contents differ from the normalized form.
        Without a JSON state, the legacy file is read and, if present,
        converted and saved.
        """
        payload = self.store.load()
        if payload is not None:
            items = payload.get("items")
            if isinstance(items, list):
                compact_items = [
                    {
                        "key": item["key"],
                        "info": _download_info_to_record(
                            _download_info_from_record(item["info"]),
                            include_entry=self._should_persist_entry(),
                        ),
                    }
                    for item in items
                    if isinstance(item, dict) and "key" in item and "info" in item
                ]
                if payload.get("schema_version") != self.store.schema_version or compact_items != items:
                    self.store.save({"items": compact_items})
                return compact_items
            log.warning("PersistentQueue:%s state file did not contain an items list", self.identifier)
            return []

        legacy_items = read_legacy_shelf(self.legacy_path)
        if legacy_items is None:
            return []

        items = [
            {
                "key": key,
                "info": _download_info_to_record(
                    value,
                    include_entry=self._should_persist_entry(),
                ),
            }
            for key, value in sorted(legacy_items, key=lambda item: item[1].timestamp)
        ]
        self.store.save({"items": items})
        return items

    def put(self, value):
        """Insert or replace *value* under its URL and persist the change."""
        key = value.info.url
        old = self.dict.get(key)
        self.dict[key] = value
        try:
            self._save_dict()
        except Exception:
            # Roll back.  Replacing a value keeps its position, so restoring
            # the old one does not disturb the order.
            if old is None:
                del self.dict[key]
            else:
                self.dict[key] = old
            raise

    def delete(self, key):
        """Remove *key* (if present) and persist the change."""
        if key in self.dict:
            # Snapshot the pairs so a failed save can restore the original
            # order; re-inserting the removed item would move it to the end.
            snapshot = list(self.dict.items())
            del self.dict[key]
            try:
                self._save_dict()
            except Exception:
                self.dict.clear()
                self.dict.update(snapshot)
                raise

    def next(self):
        """Return the oldest ``(key, download)`` pair (``StopIteration`` if empty)."""
        k, v = next(iter(self.dict.items()))
        return k, v

    def empty(self):
        """Return True when the in-memory mapping holds no items."""
        return not bool(self.dict)
|
|
|
|
class DownloadQueue:
|
|
def __init__(self, config, notifier):
    """Create the three persistent queues and the scheduling state.

    Only the ``completed`` queue is loaded here.
    """
    self.config = config
    self.notifier = notifier

    # One state file per queue, all under STATE_DIR.
    self.queue = PersistentQueue("queue", self.config.STATE_DIR + '/queue')
    self.done = PersistentQueue("completed", self.config.STATE_DIR + '/completed')
    self.pending = PersistentQueue("pending", self.config.STATE_DIR + '/pending')

    self.active_downloads = set()
    # Caps how many downloads run at the same time.
    self.semaphore = asyncio.Semaphore(int(self.config.MAX_CONCURRENT_DOWNLOADS))
    self.done.load()

    self._add_generation = 0
    self._canceled_urls = set()  # URLs canceled during current playlist add

    # Live-stream scheduling: next probe time and failure streak per URL.
    self._scheduled_probe_at: dict[str, float] = {}
    self._scheduled_probe_failures: dict[str, int] = {}
    self._live_monitor_task: Optional[asyncio.Task] = None
    self._live_monitor_wakeup = asyncio.Event()
|
|
|
|
def cancel_add(self):
    """Bump the add-generation counter and log the cancellation."""
    self._add_generation = self._add_generation + 1
    log.info('Playlist add operation canceled by user')
|
|
|
|
async def __import_queue(self):
    """Re-add every saved item of the ``queue`` store, passing ``True`` as second argument."""
    for _url, saved_info in self.queue.saved_items():
        await self.__add_download(saved_info, True)
|
|
|
|
async def __import_pending(self):
    """Re-add every saved item of the ``pending`` store, passing ``False`` as second argument."""
    for _url, saved_info in self.pending.saved_items():
        await self.__add_download(saved_info, False)
|
|
|
|
async def initialize(self):
    """Start the live monitor and schedule the import of both saved queues."""
    log.info("Initializing DownloadQueue")
    self._start_live_monitor()
    # The imports run as background tasks; this coroutine does not wait for them.
    for importer in (self.__import_queue, self.__import_pending):
        asyncio.create_task(importer())
|
|
|
|
def _start_live_monitor(self) -> None:
    """Start the live monitor task unless one is already running."""
    current = self._live_monitor_task
    if current is not None and not current.done():
        return

    def _report_failure(t):
        # Log only when the loop ended with an exception (not on cancellation).
        if not t.cancelled() and t.exception():
            log.error("Live monitor loop failed: %s", t.exception())

    self._live_monitor_task = asyncio.create_task(self._live_monitor_loop())
    self._live_monitor_task.add_done_callback(_report_failure)
|
|
|
|
def _register_scheduled(self, download: Download) -> None:
    """Mark *download* as due for an immediate probe and wake the monitor."""
    url = download.info.url
    # A probe time of 0 is always in the past, so the next pass picks it up.
    self._scheduled_probe_at[url] = 0
    self._scheduled_probe_failures.pop(url, None)
    self._start_live_monitor()
    self._wake_live_monitor()
|
|
|
|
def _unregister_scheduled(self, url: str) -> None:
    """Forget the probe time and failure streak of *url* (no-op if unknown)."""
    for table in (self._scheduled_probe_at, self._scheduled_probe_failures):
        table.pop(url, None)
|
|
|
|
def _wake_live_monitor(self) -> None:
    """Set the wake-up event of the live monitor, ignoring ``RuntimeError``."""
    wakeup = self._live_monitor_wakeup
    try:
        wakeup.set()
    except RuntimeError:
        # Best effort: a failed wake-up is not an error for the caller.
        return
|
|
|
|
def _probe_interval_seconds(self, release_timestamp: Any) -> float:
    """Return how long to wait before probing an upcoming stream again.

    For a release time in the future, the remaining time is clamped to the
    range ``_LIVE_CHECK_INTERVAL`` .. ``_LIVE_MAX_CHECK_INTERVAL``.  A missing,
    unparsable or already passed release time yields ``_LIVE_CHECK_INTERVAL``.
    """
    fallback = float(_LIVE_CHECK_INTERVAL)
    if release_timestamp is None:
        return fallback
    try:
        remaining = float(release_timestamp) - time.time()
    except (TypeError, ValueError):
        return fallback
    if remaining > 0:
        capped = min(remaining, _LIVE_MAX_CHECK_INTERVAL)
        return max(_LIVE_CHECK_INTERVAL, capped)
    return fallback
|
|
|
|
def _seconds_until_next_probe(self) -> Optional[float]:
    """Return the delay until the earliest scheduled probe.

    ``None`` means nothing is scheduled; an overdue probe yields ``0.0``.
    """
    probe_times = self._scheduled_probe_at
    if not probe_times:
        return None
    earliest = min(probe_times.values())
    return max(0.0, earliest - time.time())
|
|
|
|
async def _live_monitor_loop(self) -> None:
    """Probe scheduled downloads whenever they fall due; runs forever.

    Each pass sleeps until the earliest probe time or an explicit wake-up,
    collects the URLs that are due and still scheduled, and probes them one
    after the other.
    """
    while True:
        wait_for_seconds = self._seconds_until_next_probe()
        try:
            await asyncio.wait_for(self._live_monitor_wakeup.wait(), timeout=wait_for_seconds)
        except asyncio.TimeoutError:
            pass
        self._live_monitor_wakeup.clear()

        now = time.time()
        due: list[Download] = []
        # Iterate over a copy: stale URLs are unregistered while scanning.
        for url, probe_at in list(self._scheduled_probe_at.items()):
            if now < probe_at:
                continue
            if not self.queue.exists(url):
                self._unregister_scheduled(url)
                continue
            candidate = self.queue.get(url)
            if candidate.canceled or candidate.info.status != 'scheduled':
                self._unregister_scheduled(url)
            else:
                due.append(candidate)

        for download in due:
            try:
                await self._probe_scheduled_download(download)
            except Exception as exc:
                # Defensive: _probe_scheduled_download handles its own errors,
                # but never let an unexpected failure leave probe_at in the past
                # (which would spin this loop) or kill the monitor task.
                log.exception("Scheduled live probe crashed for %s: %s", download.info.url, exc)
                if download.info.url in self._scheduled_probe_at:
                    self._scheduled_probe_at[download.info.url] = time.time() + _LIVE_CHECK_INTERVAL
|
|
|
|
async def _probe_scheduled_download(self, download: Download) -> None:
    """Re-extract a scheduled download and act on its live status.

    * Extraction failed: retry after ``_LIVE_CHECK_INTERVAL``; after
      ``_LIVE_PROBE_MAX_FAILURES`` consecutive failures the download is moved
      to the completed queue with status ``error``.
    * Still ``is_upcoming``: schedule the next probe.
    * Anything else: switch to ``pending`` and start the download.
    """
    info = download.info
    url = info.url
    if download.canceled or info.status != 'scheduled':
        self._unregister_scheduled(url)
        return

    try:
        # The extraction runs in the default executor.
        entry = await asyncio.get_running_loop().run_in_executor(
            None,
            partial(
                self.__extract_info,
                url,
                getattr(info, 'ytdl_options_presets', None),
                getattr(info, 'ytdl_options_overrides', {}) or {},
            ),
        )
    except Exception as exc:
        # Treat all probe failures (transient network blips, rate limits,
        # extractor errors) as recoverable up to a point: retry on the next
        # interval and only give up after repeated consecutive failures so a
        # momentary glitch doesn't abandon a stream the user is waiting for.
        failures = self._scheduled_probe_failures.get(url, 0) + 1
        self._scheduled_probe_failures[url] = failures
        if failures < _LIVE_PROBE_MAX_FAILURES:
            log.warning(
                "Scheduled live probe failed for %s (attempt %d/%d), will retry: %s",
                info.title, failures, _LIVE_PROBE_MAX_FAILURES, exc,
            )
            self._scheduled_probe_at[url] = time.time() + _LIVE_CHECK_INTERVAL
            return

        log.warning(
            "Giving up on scheduled live probe for %s after %d consecutive failures: %s",
            info.title, failures, exc,
        )
        reason = str(exc)
        info.status = 'error'
        info.msg = reason
        if not info.error:
            info.error = reason
        self._unregister_scheduled(url)
        self.queue.delete(url)
        self.done.put(download)
        await self.notifier.completed(info)
        return

    # Successful probe resets the transient-failure streak.
    self._scheduled_probe_failures.pop(url, None)

    release_ts = entry.get('release_timestamp')
    live_status = entry.get('live_status')
    if release_ts is not None:
        info.live_release_timestamp = release_ts
    if live_status is not None:
        info.live_status = live_status

    if live_status == 'is_upcoming':
        # Not live yet: check again later.
        self._scheduled_probe_at[url] = time.time() + self._probe_interval_seconds(release_ts)
        await self.notifier.updated(info)
        return

    self._unregister_scheduled(url)
    info.status = 'pending'
    # Clear the "scheduled to start at ..." placeholder now that the stream
    # is live and a real download is about to begin.
    info.error = None
    info.msg = None
    await self.notifier.updated(info)
    asyncio.create_task(self.__start_download(download))
|
|
|
|
def _schedule_upcoming_download(self, download: Download) -> None:
    """Park *download* in the queue as a not-yet-live, scheduled item.

    The download's info is flagged ``'scheduled'`` first, then the download
    is stored in ``self.queue`` and handed to ``self._register_scheduled``.
    """
    info = download.info
    info.status = 'scheduled'
    queue = self.queue
    queue.put(download)
    self._register_scheduled(download)
|
|
|
|
def _force_start_scheduled(self, download: Download) -> None:
    """Start a scheduled download immediately instead of waiting for a probe.

    The download is unregistered from the scheduled set, reset to
    ``'pending'`` with its ``error``/``msg`` placeholders cleared, and a task
    running ``__start_download`` is created for it.
    """
    info = download.info
    self._unregister_scheduled(info.url)
    info.status = 'pending'
    # Same order as two separate assignments: ``error`` first, then ``msg``.
    info.error = info.msg = None
    start_coro = self.__start_download(download)
    asyncio.create_task(start_coro)
|
|
|
|
async def __start_download(self, download):
|
|
if download.canceled:
|
|
log.info(f"Download {download.info.title} was canceled, skipping start.")
|
|
return
|
|
async with self.semaphore:
|
|
if download.canceled:
|
|
log.info(f"Download {download.info.title} was canceled, skipping start.")
|
|
return
|
|
await download.start(self.notifier)
|
|
self._post_download_cleanup(download)
|
|
|
|
def _post_download_cleanup(self, download):
|
|
if download.info.status != 'finished':
|
|
if download.tmpfilename and os.path.isfile(download.tmpfilename):
|
|
try:
|
|
os.remove(download.tmpfilename)
|
|
except OSError:
|
|
pass
|
|
download.info.status = 'error'
|
|
download.close()
|
|
if self.queue.exists(download.info.url):
|
|
self.queue.delete(download.info.url)
|
|
if download.canceled:
|
|
asyncio.create_task(self.notifier.canceled(download.info.url))
|
|
else:
|
|
self.done.put(download)
|
|
asyncio.create_task(self.notifier.completed(download.info))
|
|
try:
|
|
clear_after = int(self.config.CLEAR_COMPLETED_AFTER)
|
|
except ValueError:
|
|
log.error(f'CLEAR_COMPLETED_AFTER is set to an invalid value "{self.config.CLEAR_COMPLETED_AFTER}", expected an integer number of seconds')
|
|
clear_after = 0
|
|
if clear_after > 0:
|
|
task = asyncio.create_task(self.__auto_clear_after_delay(download.info.url, clear_after))
|
|
task.add_done_callback(lambda t: log.error(f'Auto-clear task failed: {t.exception()}') if not t.cancelled() and t.exception() else None)
|
|
|
|
async def __auto_clear_after_delay(self, url, delay_seconds):
|
|
await asyncio.sleep(delay_seconds)
|
|
if self.done.exists(url):
|
|
log.debug(f'Auto-clearing completed download: {url}')
|
|
await self.clear([url])
|
|
|
|
def _build_ytdl_options(self, ytdl_options_presets=None, ytdl_options_overrides=None):
|
|
"""Merge global options, presets (in order), and per-download overrides."""
|
|
opts = dict(self.config.YTDL_OPTIONS)
|
|
for preset_name in ytdl_options_presets or []:
|
|
opts.update(self.config.YTDL_OPTIONS_PRESETS.get(preset_name, {}))
|
|
opts.update(ytdl_options_overrides or {})
|
|
return opts
|
|
|
|
def __extract_info(self, url, ytdl_options_presets=None, ytdl_options_overrides=None):
    """Extract metadata for *url* with yt-dlp without downloading anything.

    The merged user options (see ``_build_ytdl_options``) are used as a
    base; the fixed probe settings below are applied on top of them and
    therefore win over same-named user options. yt-dlp output is quiet
    unless the root logger is enabled for ``DEBUG``.
    """
    debug_logging = logging.getLogger().isEnabledFor(logging.DEBUG)
    user_opts = self._build_ytdl_options(ytdl_options_presets, ytdl_options_overrides)

    params = dict(user_opts)
    params.update({
        'quiet': not debug_logging,
        'verbose': debug_logging,
        'no_color': True,
        'extract_flat': True,
        'ignore_no_formats_error': True,
        'noplaylist': True,
        'paths': {"home": self.config.DOWNLOAD_DIR, "temp": self.config.TEMP_DIR},
    })

    # The user-facing option is a string; convert it to an ImpersonateTarget.
    target = user_opts.get('impersonate')
    if target is not None:
        params['impersonate'] = yt_dlp.networking.impersonate.ImpersonateTarget.from_str(target)

    ydl = yt_dlp.YoutubeDL(params=params)
    return ydl.extract_info(url, download=False)
|
|
|
|
def __calc_download_path(self, download_type, folder):
|
|
base_directory = self.config.AUDIO_DOWNLOAD_DIR if download_type == 'audio' else self.config.DOWNLOAD_DIR
|
|
if folder:
|
|
if not self.config.CUSTOM_DIRS:
|
|
return None, {'status': 'error', 'msg': 'A folder for the download was specified but CUSTOM_DIRS is not true in the configuration.'}
|
|
dldirectory = os.path.realpath(os.path.join(base_directory, folder))
|
|
real_base_directory = os.path.realpath(base_directory)
|
|
if not dldirectory.startswith(real_base_directory):
|
|
return None, {'status': 'error', 'msg': f'Folder "{folder}" must resolve inside the base download directory "{real_base_directory}"'}
|
|
if not os.path.isdir(dldirectory):
|
|
if not self.config.CREATE_CUSTOM_DIRS:
|
|
return None, {'status': 'error', 'msg': f'Folder "{folder}" for download does not exist inside base directory "{real_base_directory}", and CREATE_CUSTOM_DIRS is not true in the configuration.'}
|
|
os.makedirs(dldirectory, exist_ok=True)
|
|
else:
|
|
dldirectory = base_directory
|
|
return dldirectory, None
|
|
|
|
async def __add_download(self, dl, auto_start):
|
|
dldirectory, error_message = self.__calc_download_path(dl.download_type, dl.folder)
|
|
if error_message is not None:
|
|
return error_message
|
|
output = self.config.OUTPUT_TEMPLATE if len(dl.custom_name_prefix) == 0 else f'{dl.custom_name_prefix}.{self.config.OUTPUT_TEMPLATE}'
|
|
output_chapter = self.config.OUTPUT_TEMPLATE_CHAPTER
|
|
entry = getattr(dl, 'entry', None)
|
|
if entry is not None and entry.get('playlist_index') is not None:
|
|
if len(self.config.OUTPUT_TEMPLATE_PLAYLIST):
|
|
output = self.config.OUTPUT_TEMPLATE_PLAYLIST
|
|
sanitized = {k: _sanitize_path_component(v) for k, v in entry.items()}
|
|
output = _resolve_outtmpl_fields(output, sanitized, ('playlist',))
|
|
if entry is not None and entry.get('channel_index') is not None:
|
|
if len(self.config.OUTPUT_TEMPLATE_CHANNEL):
|
|
output = self.config.OUTPUT_TEMPLATE_CHANNEL
|
|
sanitized = {k: _sanitize_path_component(v) for k, v in entry.items()}
|
|
output = _resolve_outtmpl_fields(output, sanitized, ('channel',))
|
|
ytdl_options = self._build_ytdl_options(
|
|
getattr(dl, 'ytdl_options_presets', None),
|
|
getattr(dl, 'ytdl_options_overrides', {}) or {},
|
|
)
|
|
playlist_item_limit = getattr(dl, 'playlist_item_limit', 0)
|
|
if playlist_item_limit > 0:
|
|
log.info(f'playlist limit is set. Processing only first {playlist_item_limit} entries')
|
|
ytdl_options['playlistend'] = playlist_item_limit
|
|
download = Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, dl.quality, dl.format, ytdl_options, dl)
|
|
is_upcoming = (
|
|
getattr(dl, 'live_status', None) == 'is_upcoming'
|
|
or getattr(dl, 'status', None) == 'scheduled'
|
|
)
|
|
if auto_start is True:
|
|
if is_upcoming:
|
|
self._schedule_upcoming_download(download)
|
|
else:
|
|
self.queue.put(download)
|
|
asyncio.create_task(self.__start_download(download))
|
|
else:
|
|
self.pending.put(download)
|
|
await self.notifier.added(dl)
|
|
|
|
async def __add_entry(
|
|
self,
|
|
entry,
|
|
download_type,
|
|
codec,
|
|
format,
|
|
quality,
|
|
folder,
|
|
custom_name_prefix,
|
|
playlist_item_limit,
|
|
auto_start,
|
|
split_by_chapters,
|
|
chapter_template,
|
|
subtitle_language,
|
|
subtitle_mode,
|
|
ytdl_options_presets,
|
|
ytdl_options_overrides,
|
|
clip_start,
|
|
clip_end,
|
|
already,
|
|
_add_gen=None,
|
|
):
|
|
if not entry:
|
|
return {'status': 'error', 'msg': "Invalid/empty data was given."}
|
|
|
|
error = None
|
|
if "live_status" in entry and "release_timestamp" in entry and entry.get("live_status") == "is_upcoming":
|
|
dt_ts = datetime.fromtimestamp(entry.get("release_timestamp")).strftime('%Y-%m-%d %H:%M:%S %z')
|
|
error = f"Live stream is scheduled to start at {dt_ts}"
|
|
else:
|
|
if "msg" in entry:
|
|
error = entry["msg"]
|
|
|
|
etype = entry.get('_type') or 'video'
|
|
|
|
if etype.startswith('url'):
|
|
log.debug('Processing as a url')
|
|
return await self.add(
|
|
entry['url'],
|
|
download_type,
|
|
codec,
|
|
format,
|
|
quality,
|
|
folder,
|
|
custom_name_prefix,
|
|
playlist_item_limit,
|
|
auto_start,
|
|
split_by_chapters,
|
|
chapter_template,
|
|
subtitle_language,
|
|
subtitle_mode,
|
|
ytdl_options_presets,
|
|
ytdl_options_overrides,
|
|
clip_start,
|
|
clip_end,
|
|
already,
|
|
_add_gen,
|
|
)
|
|
elif etype == 'playlist' or etype == 'channel':
|
|
log.debug(f'Processing as a {etype}')
|
|
entries = entry['entries']
|
|
# Convert generator to list if needed (for len() and slicing operations)
|
|
if isinstance(entries, types.GeneratorType):
|
|
entries = list(entries)
|
|
total_entries = len(entries)
|
|
log.info(f'{etype} detected with {total_entries} entries')
|
|
index_digits = len(str(total_entries))
|
|
results = []
|
|
if playlist_item_limit > 0:
|
|
log.info(f'Item limit is set. Processing only first {playlist_item_limit} entries')
|
|
entries = entries[:playlist_item_limit]
|
|
for index, etr in enumerate(entries, start=1):
|
|
if _add_gen is not None and self._add_generation != _add_gen:
|
|
log.info(f'Playlist add canceled after processing {len(already)} entries')
|
|
return {'status': 'ok', 'msg': f'Canceled - added {len(already)} items before cancel'}
|
|
if "id" not in etr:
|
|
etr["id"] = _entry_id(etr)
|
|
etr["_type"] = "video"
|
|
etr[etype] = entry.get("id") or entry.get("channel_id") or entry.get("channel")
|
|
etr[f"{etype}_index"] = '{{0:0{0:d}d}}'.format(index_digits).format(index)
|
|
etr[f"{etype}_count"] = total_entries
|
|
etr[f"{etype}_autonumber"] = index
|
|
# n_entries: standard yt-dlp field for total count (used by template engine)
|
|
# __last_playlist_index: yt-dlp internal field for auto-padding autonumber
|
|
etr["n_entries"] = total_entries
|
|
etr["__last_playlist_index"] = total_entries
|
|
for property in ("id", "title", "uploader", "uploader_id"):
|
|
if property in entry:
|
|
etr[f"{etype}_{property}"] = entry[property]
|
|
results.append(
|
|
await self.__add_entry(
|
|
etr,
|
|
download_type,
|
|
codec,
|
|
format,
|
|
quality,
|
|
folder,
|
|
custom_name_prefix,
|
|
playlist_item_limit,
|
|
auto_start,
|
|
split_by_chapters,
|
|
chapter_template,
|
|
subtitle_language,
|
|
subtitle_mode,
|
|
ytdl_options_presets,
|
|
ytdl_options_overrides,
|
|
clip_start,
|
|
clip_end,
|
|
already,
|
|
_add_gen,
|
|
)
|
|
)
|
|
if any(res['status'] == 'error' for res in results):
|
|
return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)}
|
|
return {'status': 'ok'}
|
|
elif etype == 'video' or (etype.startswith('url') and 'id' in entry and 'title' in entry):
|
|
log.debug('Processing as a video')
|
|
key = entry.get('webpage_url') or entry['url']
|
|
if key in self._canceled_urls:
|
|
log.info(f'Skipping canceled URL: {entry.get("title") or key}')
|
|
return {'status': 'ok'}
|
|
if not self.queue.exists(key):
|
|
dl = DownloadInfo(
|
|
id=entry['id'],
|
|
title=entry.get('title') or entry['id'],
|
|
url=key,
|
|
quality=quality,
|
|
download_type=download_type,
|
|
codec=codec,
|
|
format=format,
|
|
folder=folder,
|
|
custom_name_prefix=custom_name_prefix,
|
|
error=error,
|
|
entry=entry,
|
|
playlist_item_limit=playlist_item_limit,
|
|
split_by_chapters=split_by_chapters,
|
|
chapter_template=chapter_template,
|
|
subtitle_language=subtitle_language,
|
|
subtitle_mode=subtitle_mode,
|
|
ytdl_options_presets=ytdl_options_presets,
|
|
ytdl_options_overrides=ytdl_options_overrides,
|
|
clip_start=clip_start,
|
|
clip_end=clip_end,
|
|
live_status=entry.get('live_status'),
|
|
live_release_timestamp=entry.get('release_timestamp'),
|
|
)
|
|
await self.__add_download(dl, auto_start)
|
|
return {'status': 'ok'}
|
|
return {'status': 'error', 'msg': f'Unsupported resource "{etype}"'}
|
|
|
|
async def add(
    self,
    url,
    download_type,
    codec,
    format,
    quality,
    folder,
    custom_name_prefix,
    playlist_item_limit,
    auto_start=True,
    split_by_chapters=False,
    chapter_template=None,
    subtitle_language="en",
    subtitle_mode="prefer_manual",
    ytdl_options_presets=None,
    ytdl_options_overrides=None,
    clip_start=None,
    clip_end=None,
    already=None,
    _add_gen=None,
):
    """Probe *url* with yt-dlp and add what it resolves to.

    A call without *already* is a top-level request: it captures the
    current add generation and resets the set of canceled URLs. Nested
    calls pass the shared *already* set, which prevents processing the same
    URL twice.

    :returns: a status dict; ``{'status': 'error', 'msg': ...}`` when the
        probe raises ``YoutubeDLError``, otherwise the result of
        ``__add_entry``.
    """
    if ytdl_options_presets is None:
        ytdl_options_presets = []
    log.info(
        f'adding {url}: {download_type=} {codec=} {format=} {quality=} {already=} {folder=} {custom_name_prefix=} '
        f'{playlist_item_limit=} {auto_start=} {split_by_chapters=} {chapter_template=} '
        f'{subtitle_language=} {subtitle_mode=} {ytdl_options_presets=} {clip_start=} {clip_end=}'
    )

    if already is None:
        _add_gen = self._add_generation
        self._canceled_urls.clear()
        already = set()
    if url in already:
        log.info('recursion detected, skipping')
        return {'status': 'ok'}
    already.add(url)

    # The probe is blocking, so it runs in the loop's default executor.
    loop = asyncio.get_running_loop()
    probe = partial(self.__extract_info, url, ytdl_options_presets, ytdl_options_overrides)
    try:
        entry = await loop.run_in_executor(None, probe)
    except yt_dlp.utils.YoutubeDLError as exc:
        return {'status': 'error', 'msg': str(exc)}

    forwarded = (
        download_type,
        codec,
        format,
        quality,
        folder,
        custom_name_prefix,
        playlist_item_limit,
        auto_start,
        split_by_chapters,
        chapter_template,
        subtitle_language,
        subtitle_mode,
        ytdl_options_presets,
        ytdl_options_overrides,
        clip_start,
        clip_end,
        already,
        _add_gen,
    )
    return await self.__add_entry(entry, *forwarded)
|
|
|
|
async def add_entry(
    self,
    entry,
    download_type,
    codec,
    format,
    quality,
    folder,
    custom_name_prefix,
    playlist_item_limit,
    auto_start=True,
    split_by_chapters=False,
    chapter_template=None,
    subtitle_language="en",
    subtitle_mode="prefer_manual",
    ytdl_options_presets=None,
    ytdl_options_overrides=None,
    clip_start=None,
    clip_end=None,
):
    """Add an already-extracted info dict without probing its URL.

    A dict *entry* is deep-copied first so the caller's object is not
    modified by the processing; any other value is passed through as is.
    Processing starts with a fresh ``already`` set and no add generation.
    """
    if ytdl_options_presets is None:
        ytdl_options_presets = []

    if isinstance(entry, dict):
        normalized_entry = copy.deepcopy(entry)
    else:
        normalized_entry = entry

    forwarded = (
        download_type,
        codec,
        format,
        quality,
        folder,
        custom_name_prefix,
        playlist_item_limit,
        auto_start,
        split_by_chapters,
        chapter_template,
        subtitle_language,
        subtitle_mode,
        ytdl_options_presets,
        ytdl_options_overrides,
        clip_start,
        clip_end,
    )
    return await self.__add_entry(normalized_entry, *forwarded, set(), None)
|
|
|
|
async def start_pending(self, ids):
    """Start the downloads identified by *ids*.

    * An id found in ``self.pending`` is moved out of it and either
      scheduled (upcoming live stream) or queued and started.
    * An id found in ``self.queue`` with status ``'scheduled'`` is
      force-started.
    * Any other id only produces a warning in the log.

    Always returns ``{'status': 'ok'}``.
    """
    for key in ids:
        if self.pending.exists(key):
            held = self.pending.get(key)
            self.pending.delete(key)
            if getattr(held.info, 'live_status', None) != 'is_upcoming':
                self.queue.put(held)
                asyncio.create_task(self.__start_download(held))
            else:
                self._schedule_upcoming_download(held)
            continue
        if self.queue.exists(key):
            queued = self.queue.get(key)
            if queued.info.status == 'scheduled':
                self._force_start_scheduled(queued)
                continue
        log.warning(f'requested start for non-existent download {key}')
    return {'status': 'ok'}
|
|
|
|
async def cancel(self, ids):
    """Cancel the downloads identified by *ids*.

    Pending items are dropped and reported as canceled right away. Queued
    items that have already started are asked to cancel; queued items that
    have not started are flagged as canceled, removed from the queue and
    reported. Unknown ids are logged. Always returns ``{'status': 'ok'}``.
    """
    for key in ids:
        # Track URL so playlist add loop won't re-queue it
        self._canceled_urls.add(key)

        if self.pending.exists(key):
            self.pending.delete(key)
            await self.notifier.canceled(key)
            continue

        if not self.queue.exists(key):
            log.warning(f'requested cancel for non-existent download {key}')
            continue

        target = self.queue.get(key)
        if target.info.status == 'scheduled':
            self._unregister_scheduled(key)

        if not target.started():
            target.canceled = True
            self.queue.delete(key)
            await self.notifier.canceled(key)
        else:
            target.cancel()
    return {'status': 'ok'}
|
|
|
|
async def clear(self, ids):
    """Remove the finished downloads identified by *ids* from ``self.done``.

    When ``DELETE_FILE_ON_TRASHCAN`` is enabled the downloaded file is
    deleted as well; a failure to delete it is logged and does not prevent
    the entry from being cleared. Unknown ids are logged and skipped.
    Always returns ``{'status': 'ok'}``.
    """
    for key in ids:
        if not self.done.exists(key):
            log.warning(f'requested delete for non-existent download {key}')
            continue

        if self.config.DELETE_FILE_ON_TRASHCAN:
            finished = self.done.get(key)
            try:
                dldirectory, _ = self.__calc_download_path(finished.info.download_type, finished.info.folder)
                target_file = os.path.join(dldirectory, finished.info.filename)
                os.remove(target_file)
            except Exception as e:
                log.warning(f'deleting file for download {key} failed with error message {e!r}')

        self.done.delete(key)
        await self.notifier.cleared(key)
    return {'status': 'ok'}
|
|
|
|
def get(self):
    """Return a snapshot of all downloads as two lists of ``(key, info)``.

    The first list holds the items of ``self.queue`` followed by those of
    ``self.pending``; the second list holds the items of ``self.done``.
    """
    def snapshot(store):
        # Pair every key with the info object of its download.
        return [(key, item.info) for key, item in store.items()]

    active = snapshot(self.queue) + snapshot(self.pending)
    return (active, snapshot(self.done))
|