Files
metube/app/subscriptions.py

839 lines
30 KiB
Python

"""Channel/playlist subscriptions: periodic yt-dlp flat extract + queue new videos."""
from __future__ import annotations
import asyncio
import copy
import logging
import os
import re
import time
import types
import uuid
from dataclasses import dataclass, field, fields
from typing import Any, Optional
import yt_dlp
import yt_dlp.networking.impersonate
from state_store import AtomicJsonStore, read_legacy_shelf
log = logging.getLogger("subscriptions")
VIDEO_ONLY_MSG = (
"This URL points to a single video, not a channel or playlist. Use Download instead."
)
_MEDIA_HINT_FIELDS = (
"duration",
"timestamp",
"release_timestamp",
"upload_date",
"view_count",
"live_status",
"availability",
)
def _impersonate_opt(ytdl_options: dict) -> dict:
opts = dict(ytdl_options)
if "impersonate" in opts:
opts["impersonate"] = yt_dlp.networking.impersonate.ImpersonateTarget.from_str(
opts["impersonate"]
)
return opts
def _build_ydl_params(config, *, playlistend: Optional[int] = None) -> dict:
params: dict[str, Any] = {
"quiet": not logging.getLogger().isEnabledFor(logging.DEBUG),
"verbose": logging.getLogger().isEnabledFor(logging.DEBUG),
"no_color": True,
"extract_flat": True,
"ignore_no_formats_error": True,
"lazy_playlist": True,
"paths": {"home": config.DOWNLOAD_DIR, "temp": config.TEMP_DIR},
**config.YTDL_OPTIONS,
}
params = _impersonate_opt(params)
if playlistend is not None and playlistend > 0:
params["playlistend"] = playlistend
return params
def _is_media_entry(entry: Any) -> bool:
if not isinstance(entry, dict):
return False
etype = str(entry.get("_type") or "")
if etype in ("playlist", "multi_video", "channel"):
return False
if entry.get("entries"):
return False
url = _entry_video_url(entry)
if not url:
return False
ie_key = str(entry.get("ie_key") or entry.get("extractor_key") or "").lower()
if any(token in ie_key for token in ("playlist", "channel", "tab")):
return any(entry.get(field) is not None for field in _MEDIA_HINT_FIELDS)
return True
def extract_flat_playlist(config, url: str, playlistend: int, *, _depth: int = 0):
"""Return (info_dict, entries_list) for playlist/channel URLs."""
params = _build_ydl_params(config, playlistend=playlistend)
with yt_dlp.YoutubeDL(params=params) as ydl:
info = ydl.extract_info(url, download=False)
if not info:
return None, []
etype = info.get("_type") or "video"
if etype == "video":
return info, []
if etype in ("playlist", "channel"):
entries = info.get("entries") or []
if isinstance(entries, types.GeneratorType):
entries = list(entries)
# Drop None placeholders from incomplete flat playlists
entries = [e for e in entries if e]
media_entries = [e for e in entries if _is_media_entry(e)]
if media_entries:
return info, media_entries
if _depth < 1:
for ent in entries[:5]:
nested_url = _entry_video_url(ent)
if not nested_url:
continue
nested_info, nested_entries = extract_flat_playlist(
config,
nested_url,
playlistend,
_depth=_depth + 1,
)
if nested_entries:
return nested_info, nested_entries
return info, entries
if etype.startswith("url") and info.get("url"):
# Single nested URL without playlist wrapper — treat as non-subscribable
return info, []
return info, []
def _entry_video_url(entry: dict) -> Optional[str]:
return entry.get("webpage_url") or entry.get("url")
def _entry_id(entry: dict) -> Optional[str]:
eid = entry.get("id")
if eid is not None:
return str(eid)
url = _entry_video_url(entry)
return url
def _is_subscriber_only_entry(entry: dict) -> bool:
"""True when yt-dlp marks the entry as channel member-only (subscriber_only availability)."""
return str(entry.get("availability") or "") == "subscriber_only"
def coerce_optional_bool(value: Any, *, default: bool = False, field_name: str = "value") -> bool:
"""Parse optional JSON booleans for subscription settings."""
if value is None:
return default
try:
return _coerce_bool(value)
except ValueError as exc:
raise ValueError(f"{field_name} must be a boolean") from exc
@dataclass
class SubscriptionInfo:
id: str
name: str
url: str
enabled: bool = True
check_interval_minutes: int = 60
download_type: str = "video"
codec: str = "auto"
format: str = "any"
quality: str = "best"
folder: str = ""
custom_name_prefix: str = ""
auto_start: bool = True
playlist_item_limit: int = 0
split_by_chapters: bool = False
chapter_template: str = ""
subtitle_language: str = "en"
subtitle_mode: str = "prefer_manual"
ytdl_options_presets: list[str] = field(default_factory=list)
ytdl_options_overrides: dict[str, Any] = field(default_factory=dict)
title_regex: str = ""
skip_subscriber_only: bool = False
last_checked: Optional[float] = None
seen_ids: list[str] = field(default_factory=list)
error: Optional[str] = None
timestamp: float = field(default_factory=time.time)
def seen_set(self) -> set[str]:
return set(self.seen_ids)
def to_public_dict(self) -> dict:
return {
"id": self.id,
"name": self.name,
"url": self.url,
"enabled": self.enabled,
"check_interval_minutes": self.check_interval_minutes,
"download_type": self.download_type,
"codec": self.codec,
"format": self.format,
"quality": self.quality,
"folder": self.folder,
"title_regex": self.title_regex,
"skip_subscriber_only": self.skip_subscriber_only,
"last_checked": self.last_checked,
"seen_count": len(self.seen_ids),
"error": self.error,
}
def _subscription_to_record(sub: SubscriptionInfo) -> dict[str, Any]:
return {
"id": sub.id,
"name": sub.name,
"url": sub.url,
"enabled": sub.enabled,
"check_interval_minutes": sub.check_interval_minutes,
"download_type": sub.download_type,
"codec": sub.codec,
"format": sub.format,
"quality": sub.quality,
"folder": sub.folder,
"custom_name_prefix": sub.custom_name_prefix,
"auto_start": sub.auto_start,
"playlist_item_limit": sub.playlist_item_limit,
"split_by_chapters": sub.split_by_chapters,
"chapter_template": sub.chapter_template,
"subtitle_language": sub.subtitle_language,
"subtitle_mode": sub.subtitle_mode,
"ytdl_options_presets": list(sub.ytdl_options_presets),
"ytdl_options_overrides": sub.ytdl_options_overrides,
"title_regex": sub.title_regex,
"skip_subscriber_only": sub.skip_subscriber_only,
"last_checked": sub.last_checked,
"seen_ids": list(sub.seen_ids),
"error": sub.error,
}
def _normalize_subscription_record(rec: dict[str, Any]) -> dict[str, Any]:
"""Migrate legacy ytdl_options_preset (str) to ytdl_options_presets (list)."""
out = dict(rec)
if "ytdl_options_presets" not in out:
old = out.pop("ytdl_options_preset", None)
if old is None:
out["ytdl_options_presets"] = []
elif isinstance(old, list):
out["ytdl_options_presets"] = [str(x).strip() for x in old if str(x).strip()]
elif isinstance(old, str):
out["ytdl_options_presets"] = [old.strip()] if old.strip() else []
else:
out["ytdl_options_presets"] = []
else:
out.pop("ytdl_options_preset", None)
return out
def _subscription_from_record(record: Any) -> Optional[SubscriptionInfo]:
field_names = {f.name for f in fields(SubscriptionInfo)}
if isinstance(record, SubscriptionInfo):
return record
if isinstance(record, dict):
try:
normalized = _normalize_subscription_record(dict(record))
return SubscriptionInfo(**{k: v for k, v in normalized.items() if k in field_names})
except TypeError:
return None
return None
def _normalize_title_regex_value(value: Any) -> str:
if value is None:
return ""
if isinstance(value, str):
return value.strip()
return str(value).strip()
def validate_title_regex(value: Any) -> str:
"""Return stored title regex string; non-empty values must compile (re.error on failure)."""
s = _normalize_title_regex_value(value)
if s:
re.compile(s)
return s
def _coerce_bool(value: Any) -> bool:
"""Accept JSON booleans and common string forms used by API clients."""
if isinstance(value, bool):
return value
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in {"true", "1", "on"}:
return True
if lowered in {"false", "0", "off"}:
return False
raise ValueError("enabled must be a boolean")
class SubscriptionNotifier:
"""Hook for Socket.IO / UI updates."""
async def subscription_added(self, sub: SubscriptionInfo) -> None:
raise NotImplementedError
async def subscription_updated(self, sub: SubscriptionInfo) -> None:
raise NotImplementedError
async def subscription_removed(self, sub_id: str) -> None:
raise NotImplementedError
async def subscriptions_all(self, subs: list[SubscriptionInfo]) -> None:
raise NotImplementedError
class SubscriptionManager:
def __init__(self, config, download_queue, notifier: SubscriptionNotifier):
self.config = config
self.dqueue = download_queue
self.notifier = notifier
pdir = config.STATE_DIR
if not os.path.isdir(pdir):
os.makedirs(pdir, exist_ok=True)
self._legacy_path = os.path.join(pdir, "subscriptions")
self._path = os.path.join(pdir, "subscriptions.json")
self._store = AtomicJsonStore(self._path, kind="subscriptions")
self._subs: dict[str, SubscriptionInfo] = {}
self._url_index: dict[str, str] = {} # normalized url -> id
self._pending_urls: set[str] = set()
self._lock = asyncio.Lock()
self._loop_task: Optional[asyncio.Task] = None
self._load_all()
def close(self) -> None:
# No persistent shelf handle to close.
return
def _normalize_url(self, url: str) -> str:
return (url or "").strip()
def _normalize_seen_ids(self, seen_ids: list[str]) -> list[str]:
max_seen = int(getattr(self.config, "SUBSCRIPTION_MAX_SEEN_IDS", 50000))
normalized = [str(sid) for sid in dict.fromkeys(seen_ids)]
if len(normalized) > max_seen:
normalized = normalized[:max_seen]
return normalized
def _load_all(self) -> None:
payload = self._store.load()
loaded_from_legacy = False
if payload is not None:
records = payload.get("items") or []
else:
legacy_items = read_legacy_shelf(self._legacy_path)
records = [raw for _key, raw in legacy_items] if legacy_items else []
if records:
loaded_from_legacy = True
loaded_subs = self._iter_valid_subs(records)
compact_records = []
for sub in loaded_subs:
sub.seen_ids = self._normalize_seen_ids(sub.seen_ids)
self._subs[sub.id] = sub
self._url_index[self._normalize_url(sub.url)] = sub.id
compact_records.append(_subscription_to_record(sub))
if loaded_from_legacy or (
payload is not None
and (
payload.get("schema_version") != self._store.schema_version
or compact_records != records
)
):
self._store.save({"items": compact_records})
def _iter_valid_subs(self, records: list[Any]) -> list[SubscriptionInfo]:
subs: list[SubscriptionInfo] = []
for record in records:
sub = _subscription_from_record(record)
if sub is not None:
subs.append(sub)
return subs
def _save_locked(self) -> None:
self._store.save({"items": [_subscription_to_record(sub) for sub in self._subs.values()]})
async def _queue_subscription_entries(
self,
entries: list[dict],
*,
download_type: str,
codec: str,
format: str,
quality: str,
folder: str,
custom_name_prefix: str,
playlist_item_limit: int,
auto_start: bool,
split_by_chapters: bool,
chapter_template: str,
subtitle_language: str,
subtitle_mode: str,
ytdl_options_presets: Optional[list[str]] = None,
ytdl_options_overrides: Optional[dict[str, Any]] = None,
) -> tuple[list[str], list[str]]:
queued_ids: list[str] = []
queue_errors: list[str] = []
presets = list(ytdl_options_presets or [])
for ent in entries:
eid = _entry_id(ent)
vurl = _entry_video_url(ent)
if not eid or not vurl:
continue
queue_entry = dict(ent)
if "id" not in queue_entry:
queue_entry["id"] = eid
queue_entry["_type"] = "video"
queue_entry["webpage_url"] = vurl
result = await self.dqueue.add_entry(
queue_entry,
download_type,
codec,
format,
quality,
folder or None,
custom_name_prefix,
playlist_item_limit,
auto_start,
split_by_chapters,
chapter_template or None,
subtitle_language,
subtitle_mode,
presets,
ytdl_options_overrides,
)
if isinstance(result, dict) and result.get("status") == "error":
msg = str(result.get("msg") or f"Queueing failed for {vurl}")
queue_errors.append(msg)
log.warning("Subscription queueing failed for %s: %s", vurl, msg)
continue
queued_ids.append(eid)
return queued_ids, queue_errors
def list_all(self) -> list[SubscriptionInfo]:
return list(self._subs.values())
def get(self, sub_id: str) -> Optional[SubscriptionInfo]:
return self._subs.get(sub_id)
def start_background_loop(self) -> None:
if self._loop_task is not None and not self._loop_task.done():
return
self._loop_task = asyncio.create_task(self._periodic_loop())
self._loop_task.add_done_callback(
lambda t: log.error("Subscription loop failed: %s", t.exception())
if not t.cancelled() and t.exception()
else None
)
async def _periodic_loop(self) -> None:
while True:
await asyncio.sleep(60)
try:
await self.run_due_checks()
except Exception as e:
log.exception("Subscription periodic check error: %s", e)
async def run_due_checks(self) -> None:
now = time.time()
due: list[SubscriptionInfo] = []
async with self._lock:
for sub in list(self._subs.values()):
if not sub.enabled:
continue
interval_sec = max(60, int(sub.check_interval_minutes) * 60)
if sub.last_checked is None:
due.append(sub)
continue
if now - sub.last_checked < interval_sec:
continue
due.append(sub)
for sub in due:
await self._check_one_unlocked(sub)
async def add_subscription(
self,
url: str,
*,
check_interval_minutes: int,
download_type: str,
codec: str,
format: str,
quality: str,
folder: str,
custom_name_prefix: str,
auto_start: bool,
playlist_item_limit: int,
split_by_chapters: bool,
chapter_template: str,
subtitle_language: str,
subtitle_mode: str,
ytdl_options_presets: Optional[list[str]] = None,
ytdl_options_overrides: Optional[dict[str, Any]] = None,
title_regex: Any = None,
skip_subscriber_only: Any = None,
) -> dict:
url = self._normalize_url(url)
if not url:
return {"status": "error", "msg": "Missing URL"}
try:
title_regex_stored = validate_title_regex(title_regex)
except re.error as exc:
return {"status": "error", "msg": f"Invalid title_regex: {exc}"}
try:
skip_so = coerce_optional_bool(
skip_subscriber_only,
default=False,
field_name="skip_subscriber_only",
)
except ValueError as exc:
return {"status": "error", "msg": str(exc)}
async with self._lock:
if url in self._url_index or url in self._pending_urls:
return {"status": "error", "msg": "This URL is already subscribed"}
self._pending_urls.add(url)
try:
scan_first = max(int(getattr(self.config, "SUBSCRIPTION_SCAN_PLAYLIST_END", 50)), 1)
try:
info, entries = extract_flat_playlist(self.config, url, scan_first)
except yt_dlp.utils.YoutubeDLError as exc:
return {"status": "error", "msg": str(exc)}
if not info:
return {"status": "error", "msg": "Could not resolve URL"}
etype = info.get("_type") or "video"
if etype not in ("playlist", "channel"):
return {"status": "error", "msg": VIDEO_ONLY_MSG}
name = (
info.get("title")
or info.get("channel")
or info.get("playlist_title")
or info.get("uploader")
or url
)
seen_entries = [ent for ent in entries if _is_media_entry(ent)]
all_ids: list[str] = []
for ent in seen_entries:
if ent.get("live_status") == "is_upcoming":
continue # Don't mark scheduled streams as seen; queue them when they go live
eid = _entry_id(ent)
if eid:
all_ids.append(eid)
sub = SubscriptionInfo(
id=str(uuid.uuid4()),
name=str(name),
url=url,
enabled=True,
check_interval_minutes=max(1, int(check_interval_minutes)),
download_type=download_type,
codec=codec,
format=format,
quality=quality,
folder=folder or "",
custom_name_prefix=custom_name_prefix or "",
auto_start=bool(auto_start),
playlist_item_limit=int(playlist_item_limit),
split_by_chapters=bool(split_by_chapters),
chapter_template=chapter_template or "",
subtitle_language=subtitle_language,
subtitle_mode=subtitle_mode,
ytdl_options_presets=list(ytdl_options_presets or []),
ytdl_options_overrides=dict(ytdl_options_overrides or {}),
title_regex=title_regex_stored,
skip_subscriber_only=skip_so,
last_checked=time.time(),
seen_ids=list(dict.fromkeys(all_ids)),
error=None,
)
async with self._lock:
if url in self._url_index:
return {"status": "error", "msg": "This URL is already subscribed"}
self._subs[sub.id] = sub
self._url_index[url] = sub.id
try:
self._save_locked()
except Exception:
self._subs.pop(sub.id, None)
self._url_index.pop(url, None)
raise
await self.notifier.subscription_added(sub)
return {"status": "ok", "subscription": sub.to_public_dict()}
finally:
async with self._lock:
self._pending_urls.discard(url)
async def delete_subscriptions(self, ids: list[str]) -> dict:
removed: list[str] = []
async with self._lock:
previous_subs = self._subs.copy()
previous_index = self._url_index.copy()
for sid in ids:
sub = self._subs.pop(sid, None)
if sub:
normalized_url = self._normalize_url(sub.url)
self._url_index.pop(normalized_url, None)
removed.append(sid)
if removed:
try:
self._save_locked()
except Exception:
self._subs = previous_subs
self._url_index = previous_index
raise
for sid in removed:
await self.notifier.subscription_removed(sid)
return {"status": "ok"}
async def update_subscription(self, sub_id: str, changes: dict) -> dict:
validated_tr: Optional[str] = None
if "title_regex" in changes:
try:
validated_tr = validate_title_regex(changes["title_regex"])
except re.error as exc:
return {"status": "error", "msg": f"Invalid title_regex: {exc}"}
skip_so_set = False
validated_skip_so = False
if "skip_subscriber_only" in changes:
try:
validated_skip_so = coerce_optional_bool(
changes["skip_subscriber_only"],
field_name="skip_subscriber_only",
)
skip_so_set = True
except ValueError as exc:
return {"status": "error", "msg": str(exc)}
async with self._lock:
sub = self._subs.get(sub_id)
if not sub:
return {"status": "error", "msg": "Subscription not found"}
previous = copy.deepcopy(sub)
old_enabled = sub.enabled
if "enabled" in changes:
sub.enabled = _coerce_bool(changes["enabled"])
if "check_interval_minutes" in changes:
sub.check_interval_minutes = max(1, int(changes["check_interval_minutes"]))
if "name" in changes and changes["name"]:
sub.name = str(changes["name"])
if validated_tr is not None:
sub.title_regex = validated_tr
if skip_so_set:
sub.skip_subscriber_only = validated_skip_so
try:
self._save_locked()
except Exception:
self._subs[sub_id] = previous
raise
updated = sub
if "enabled" in changes and updated.enabled != old_enabled:
log.info(
"Subscription %s %s",
updated.name,
"resumed" if updated.enabled else "paused",
)
await self.notifier.subscription_updated(updated)
return {"status": "ok", "subscription": updated.to_public_dict()}
async def check_now(self, ids: Optional[list[str]] = None) -> dict:
async with self._lock:
targets = (
[self._subs[i] for i in ids if i in self._subs]
if ids
else [s for s in self._subs.values() if s.enabled]
)
log.info(
"Manual subscription check requested for %d subscription(s)",
len(targets),
)
for sub in targets:
await self._check_one_unlocked(sub)
return {"status": "ok"}
async def _check_one_unlocked(self, sub: SubscriptionInfo) -> None:
sid = sub.id
scan = int(getattr(self.config, "SUBSCRIPTION_SCAN_PLAYLIST_END", 50))
log.info("Checking subscription: %s", sub.name)
try:
info, entries = extract_flat_playlist(self.config, sub.url, scan)
except yt_dlp.utils.YoutubeDLError as exc:
async with self._lock:
cur = self._subs.get(sid)
if cur:
previous = copy.deepcopy(cur)
cur.error = str(exc)
try:
self._save_locked()
except Exception:
self._subs[sid] = previous
raise
sub = cur
log.warning("Subscription check failed for %s: %s", sub.name, exc)
await self.notifier.subscription_updated(sub)
return
entries = [ent for ent in entries if _is_media_entry(ent)]
etype = (info or {}).get("_type") or "video"
if etype == "video" or not entries:
async with self._lock:
cur = self._subs.get(sid)
if cur:
previous = copy.deepcopy(cur)
cur.error = VIDEO_ONLY_MSG
try:
self._save_locked()
except Exception:
self._subs[sid] = previous
raise
sub = cur
log.warning("Subscription %s no longer resolves to a subscribable feed", sub.name)
await self.notifier.subscription_updated(sub)
return
async with self._lock:
cur = self._subs.get(sid)
if not cur:
return
seen = cur.seen_set()
seen_ids_snapshot = list(cur.seen_ids)
dl_type = cur.download_type
dl_codec = cur.codec
dl_format = cur.format
dl_quality = cur.quality
dl_folder = cur.folder
dl_prefix = cur.custom_name_prefix
dl_plimit = cur.playlist_item_limit
dl_autostart = cur.auto_start
dl_split = cur.split_by_chapters
dl_chapter = cur.chapter_template
dl_sublang = cur.subtitle_language
dl_submode = cur.subtitle_mode
dl_ytdl_presets = list(cur.ytdl_options_presets)
dl_ytdl_overrides = dict(cur.ytdl_options_overrides)
dl_title_regex = cur.title_regex or ""
dl_skip_subscriber_only = bool(cur.skip_subscriber_only)
new_entries: list[dict] = []
for ent in entries:
eid = _entry_id(ent)
if not eid:
continue
if eid in seen and ent.get("live_status") != "is_live":
continue
new_entries.append(ent)
pattern_re: Optional[re.Pattern[str]] = None
if dl_title_regex:
try:
pattern_re = re.compile(dl_title_regex)
except re.error:
log.warning(
"Invalid stored title_regex on subscription %s, ignoring filter",
sub.name,
)
queue_entries: list[dict] = []
filtered_ids: list[str] = []
for ent in new_entries:
eid = _entry_id(ent)
if pattern_re is not None:
title = str(ent.get("title") or "")
if not pattern_re.search(title):
if eid:
filtered_ids.append(eid)
continue
queue_entries.append(ent)
subscriber_filtered_ids: list[str] = []
if dl_skip_subscriber_only:
kept_entries: list[dict] = []
for ent in queue_entries:
eid = _entry_id(ent)
if _is_subscriber_only_entry(ent):
if eid:
subscriber_filtered_ids.append(eid)
continue
kept_entries.append(ent)
queue_entries = kept_entries
queued_ids, queue_errors = await self._queue_subscription_entries(
queue_entries,
download_type=dl_type,
codec=dl_codec,
format=dl_format,
quality=dl_quality,
folder=dl_folder,
custom_name_prefix=dl_prefix,
playlist_item_limit=dl_plimit,
auto_start=dl_autostart,
split_by_chapters=dl_split,
chapter_template=dl_chapter or "",
subtitle_language=dl_sublang,
subtitle_mode=dl_submode,
ytdl_options_presets=dl_ytdl_presets,
ytdl_options_overrides=dl_ytdl_overrides,
)
log.info(
"Subscription check finished for %s: %d new, %d filtered, %d subscriber_skipped, %d queued, %d failed",
sub.name,
len(new_entries),
len(filtered_ids),
len(subscriber_filtered_ids),
len(queued_ids),
len(queue_errors),
)
merged = list(
dict.fromkeys(
queued_ids + filtered_ids + subscriber_filtered_ids + seen_ids_snapshot
)
)
max_seen = int(getattr(self.config, "SUBSCRIPTION_MAX_SEEN_IDS", 50000))
if len(merged) > max_seen:
merged = merged[:max_seen]
async with self._lock:
cur = self._subs.get(sid)
if not cur:
return
previous = copy.deepcopy(cur)
cur.seen_ids = merged
cur.last_checked = time.time()
cur.error = "; ".join(queue_errors[:3]) if queue_errors else None
try:
self._save_locked()
except Exception:
self._subs[sid] = previous
raise
sub = cur
await self.notifier.subscription_updated(sub)
async def emit_all(self) -> None:
await self.notifier.subscriptions_all(self.list_all())