mirror of
https://github.com/alexta69/metube.git
synced 2026-06-13 16:40:05 +00:00
cr fixes
This commit is contained in:
@@ -114,6 +114,49 @@ async def test_cancel_removes_from_pending(dq_env):
|
|||||||
notifier.canceled.assert_awaited()
|
notifier.canceled.assert_awaited()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cancel_before_start_marks_download_canceled(dq_env):
|
||||||
|
"""Regression test for the race condition where cancel() arrives after the
|
||||||
|
download has been placed in the queue and ``__start_download`` has been
|
||||||
|
scheduled via ``asyncio.create_task`` but has not yet executed. Without the
|
||||||
|
fix, the pending task would run ``download.start()`` despite the user
|
||||||
|
cancelling, because its ``download.canceled`` guard was never flipped."""
|
||||||
|
notifier = AsyncMock()
|
||||||
|
|
||||||
|
def fake_extract(self, url, ytdl_options_presets=None, ytdl_options_overrides=None):
|
||||||
|
return {
|
||||||
|
"_type": "video",
|
||||||
|
"id": "vid1",
|
||||||
|
"title": "Test Video",
|
||||||
|
"url": url,
|
||||||
|
"webpage_url": url,
|
||||||
|
}
|
||||||
|
|
||||||
|
dq = DownloadQueue(dq_env, notifier)
|
||||||
|
url = "https://example.com/race"
|
||||||
|
start_mock = AsyncMock()
|
||||||
|
with patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_extract), \
|
||||||
|
patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock):
|
||||||
|
await dq.add(
|
||||||
|
url,
|
||||||
|
"video",
|
||||||
|
"auto",
|
||||||
|
"any",
|
||||||
|
"best",
|
||||||
|
"",
|
||||||
|
"",
|
||||||
|
0,
|
||||||
|
auto_start=True,
|
||||||
|
)
|
||||||
|
assert dq.queue.exists(url)
|
||||||
|
download = dq.queue.get(url)
|
||||||
|
assert download.canceled is False
|
||||||
|
await dq.cancel([url])
|
||||||
|
assert not dq.queue.exists(url)
|
||||||
|
assert download.canceled is True
|
||||||
|
notifier.canceled.assert_awaited_with(url)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_start_pending_moves_to_queue(dq_env):
|
async def test_start_pending_moves_to_queue(dq_env):
|
||||||
notifier = AsyncMock()
|
notifier = AsyncMock()
|
||||||
|
|||||||
+5
-3
@@ -608,10 +608,12 @@
|
|||||||
@if (batchImportStatus) {
|
@if (batchImportStatus) {
|
||||||
<small>{{ batchImportStatus }}</small>
|
<small>{{ batchImportStatus }}</small>
|
||||||
}
|
}
|
||||||
@if (importInProgress) {
|
@if (batchImportTotal > 0) {
|
||||||
<div class="progress mt-2" style="height: 20px;">
|
<div class="progress mt-2" style="height: 20px;">
|
||||||
<div class="progress-bar" role="progressbar"
|
<div class="progress-bar"
|
||||||
[style.width.%]="batchImportTotal > 0 ? (batchImportCount / batchImportTotal) * 100 : 0"
|
[class.bg-danger]="batchImportFailures > 0"
|
||||||
|
role="progressbar"
|
||||||
|
[style.width.%]="(batchImportCount / batchImportTotal) * 100"
|
||||||
[attr.aria-valuenow]="batchImportCount"
|
[attr.aria-valuenow]="batchImportCount"
|
||||||
[attr.aria-valuemin]="0"
|
[attr.aria-valuemin]="0"
|
||||||
[attr.aria-valuemax]="batchImportTotal">
|
[attr.aria-valuemax]="batchImportTotal">
|
||||||
|
|||||||
+59
-29
@@ -1,7 +1,7 @@
|
|||||||
import { AsyncPipe, DatePipe, KeyValuePipe, NgTemplateOutlet } from '@angular/common';
|
import { AsyncPipe, DatePipe, KeyValuePipe, NgTemplateOutlet } from '@angular/common';
|
||||||
import { HttpClient } from '@angular/common/http';
|
import { HttpClient } from '@angular/common/http';
|
||||||
import { AfterViewInit, ChangeDetectionStrategy, ChangeDetectorRef, Component, DestroyRef, ElementRef, viewChild, inject, OnDestroy, OnInit } from '@angular/core';
|
import { AfterViewInit, ChangeDetectionStrategy, ChangeDetectorRef, Component, DestroyRef, ElementRef, viewChild, inject, OnDestroy, OnInit } from '@angular/core';
|
||||||
import { Observable, Subscription, map, distinctUntilChanged, finalize } from 'rxjs';
|
import { Observable, Subject, Subscription, from, map, distinctUntilChanged, finalize, mergeMap, takeUntil, tap } from 'rxjs';
|
||||||
import { FormsModule } from '@angular/forms';
|
import { FormsModule } from '@angular/forms';
|
||||||
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
|
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
|
||||||
import { FontAwesomeModule } from '@fortawesome/angular-fontawesome';
|
import { FontAwesomeModule } from '@fortawesome/angular-fontawesome';
|
||||||
@@ -106,8 +106,13 @@ export class App implements AfterViewInit, OnInit, OnDestroy {
|
|||||||
batchImportStatus = '';
|
batchImportStatus = '';
|
||||||
batchImportCount = 0;
|
batchImportCount = 0;
|
||||||
batchImportTotal = 0;
|
batchImportTotal = 0;
|
||||||
|
batchImportFailures = 0;
|
||||||
importInProgress = false;
|
importInProgress = false;
|
||||||
cancelImportFlag = false;
|
private batchImportCancel$ = new Subject<void>();
|
||||||
|
// Maximum number of /add requests to have in-flight at once during a batch
|
||||||
|
// import. Keeps the server from being hit with hundreds of simultaneous
|
||||||
|
// yt-dlp metadata extractions when a user pastes a huge URL list.
|
||||||
|
private static readonly BATCH_IMPORT_CONCURRENCY = 4;
|
||||||
ytDlpOptionsUpdateTime: string | null = null;
|
ytDlpOptionsUpdateTime: string | null = null;
|
||||||
ytDlpVersion: string | null = null;
|
ytDlpVersion: string | null = null;
|
||||||
metubeVersion: string | null = null;
|
metubeVersion: string | null = null;
|
||||||
@@ -1175,8 +1180,10 @@ export class App implements AfterViewInit, OnInit, OnDestroy {
|
|||||||
this.batchImportModalOpen = true;
|
this.batchImportModalOpen = true;
|
||||||
this.batchImportText = '';
|
this.batchImportText = '';
|
||||||
this.batchImportStatus = '';
|
this.batchImportStatus = '';
|
||||||
|
this.batchImportCount = 0;
|
||||||
|
this.batchImportTotal = 0;
|
||||||
|
this.batchImportFailures = 0;
|
||||||
this.importInProgress = false;
|
this.importInProgress = false;
|
||||||
this.cancelImportFlag = false;
|
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
const textarea = document.getElementById('batch-import-textarea');
|
const textarea = document.getElementById('batch-import-textarea');
|
||||||
if (textarea instanceof HTMLTextAreaElement) {
|
if (textarea instanceof HTMLTextAreaElement) {
|
||||||
@@ -1203,39 +1210,62 @@ export class App implements AfterViewInit, OnInit, OnDestroy {
|
|||||||
}
|
}
|
||||||
this.importInProgress = true;
|
this.importInProgress = true;
|
||||||
this.batchImportCount = 0;
|
this.batchImportCount = 0;
|
||||||
|
this.batchImportFailures = 0;
|
||||||
this.batchImportTotal = urls.length;
|
this.batchImportTotal = urls.length;
|
||||||
this.batchImportStatus = `Sending ${urls.length} URLs to server...`;
|
this.updateBatchImportStatus();
|
||||||
|
|
||||||
const promises = urls.map(url =>
|
from(urls).pipe(
|
||||||
new Promise<void>((resolve) => {
|
mergeMap(
|
||||||
this.downloads.add(this.buildAddPayload({ url })).subscribe({
|
url => this.downloads.add(this.buildAddPayload({ url })).pipe(
|
||||||
next: (status: Status) => {
|
// downloads.add() already catches HTTP errors and emits a single
|
||||||
if (status.status === 'error') {
|
// Status value, so `tap` (not `finalize`) is the right place to
|
||||||
console.error(`Error adding URL ${url}: ${status.msg}`);
|
// count. This avoids incrementing the counter when an in-flight
|
||||||
}
|
// request is aborted by cancellation.
|
||||||
this.batchImportCount++;
|
tap((status: Status) => {
|
||||||
resolve();
|
if (status.status === 'error') {
|
||||||
},
|
this.batchImportFailures++;
|
||||||
error: (err) => {
|
console.error(`Error adding URL ${url}: ${status.msg}`);
|
||||||
console.error(`Error importing URL ${url}:`, err);
|
|
||||||
this.batchImportCount++;
|
|
||||||
resolve();
|
|
||||||
}
|
}
|
||||||
});
|
this.batchImportCount++;
|
||||||
})
|
this.updateBatchImportStatus();
|
||||||
);
|
this.cdr.markForCheck();
|
||||||
|
}),
|
||||||
Promise.all(promises).then(() => {
|
),
|
||||||
this.batchImportStatus = `All ${urls.length} URLs sent to server.`;
|
App.BATCH_IMPORT_CONCURRENCY,
|
||||||
this.importInProgress = false;
|
),
|
||||||
});
|
takeUntil(this.batchImportCancel$),
|
||||||
|
takeUntilDestroyed(this.destroyRef),
|
||||||
|
finalize(() => {
|
||||||
|
this.importInProgress = false;
|
||||||
|
this.updateBatchImportStatus(true);
|
||||||
|
this.cdr.markForCheck();
|
||||||
|
}),
|
||||||
|
).subscribe();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cancel the batch import process
|
private updateBatchImportStatus(done = false): void {
|
||||||
|
const parts: string[] = [];
|
||||||
|
if (done) {
|
||||||
|
const processed = this.batchImportCount;
|
||||||
|
if (processed < this.batchImportTotal) {
|
||||||
|
parts.push(`Import cancelled after ${processed} of ${this.batchImportTotal} URLs.`);
|
||||||
|
} else {
|
||||||
|
parts.push(`Finished importing ${this.batchImportTotal} URLs.`);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
parts.push(`Importing ${this.batchImportCount} of ${this.batchImportTotal} URLs...`);
|
||||||
|
}
|
||||||
|
if (this.batchImportFailures > 0) {
|
||||||
|
parts.push(`${this.batchImportFailures} failed.`);
|
||||||
|
}
|
||||||
|
this.batchImportStatus = parts.join(' ');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel the batch import process: aborts in-flight and pending requests
|
||||||
|
// immediately via the cancellation Subject wired into the pipeline.
|
||||||
cancelBatchImport(): void {
|
cancelBatchImport(): void {
|
||||||
if (this.importInProgress) {
|
if (this.importInProgress) {
|
||||||
this.cancelImportFlag = true;
|
this.batchImportCancel$.next();
|
||||||
this.batchImportStatus += ' Cancelling...';
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user