diff --git a/app/tests/test_download_queue.py b/app/tests/test_download_queue.py index a078403..71632c8 100644 --- a/app/tests/test_download_queue.py +++ b/app/tests/test_download_queue.py @@ -114,6 +114,49 @@ async def test_cancel_removes_from_pending(dq_env): notifier.canceled.assert_awaited() +@pytest.mark.asyncio +async def test_cancel_before_start_marks_download_canceled(dq_env): + """Regression test for the race condition where cancel() arrives after the + download has been placed in the queue and ``__start_download`` has been + scheduled via ``asyncio.create_task`` but has not yet executed. Without the + fix, the pending task would run ``download.start()`` despite the user + cancelling, because its ``download.canceled`` guard was never flipped.""" + notifier = AsyncMock() + + def fake_extract(self, url, ytdl_options_presets=None, ytdl_options_overrides=None): + return { + "_type": "video", + "id": "vid1", + "title": "Test Video", + "url": url, + "webpage_url": url, + } + + dq = DownloadQueue(dq_env, notifier) + url = "https://example.com/race" + start_mock = AsyncMock() + with patch.object(DownloadQueue, "_DownloadQueue__extract_info", fake_extract), \ + patch.object(DownloadQueue, "_DownloadQueue__start_download", start_mock): + await dq.add( + url, + "video", + "auto", + "any", + "best", + "", + "", + 0, + auto_start=True, + ) + assert dq.queue.exists(url) + download = dq.queue.get(url) + assert download.canceled is False + await dq.cancel([url]) + assert not dq.queue.exists(url) + assert download.canceled is True + notifier.canceled.assert_awaited_with(url) + + @pytest.mark.asyncio async def test_start_pending_moves_to_queue(dq_env): notifier = AsyncMock() diff --git a/ui/src/app/app.html b/ui/src/app/app.html index 2b294fd..9efb6a9 100644 --- a/ui/src/app/app.html +++ b/ui/src/app/app.html @@ -608,10 +608,12 @@ @if (batchImportStatus) { {{ batchImportStatus }} } - @if (importInProgress) { + @if (batchImportTotal > 0) {
-
diff --git a/ui/src/app/app.ts b/ui/src/app/app.ts index 9394680..cd88283 100644 --- a/ui/src/app/app.ts +++ b/ui/src/app/app.ts @@ -1,7 +1,7 @@ import { AsyncPipe, DatePipe, KeyValuePipe, NgTemplateOutlet } from '@angular/common'; import { HttpClient } from '@angular/common/http'; import { AfterViewInit, ChangeDetectionStrategy, ChangeDetectorRef, Component, DestroyRef, ElementRef, viewChild, inject, OnDestroy, OnInit } from '@angular/core'; -import { Observable, Subscription, map, distinctUntilChanged, finalize } from 'rxjs'; +import { Observable, Subject, Subscription, from, map, distinctUntilChanged, finalize, mergeMap, takeUntil, tap } from 'rxjs'; import { FormsModule } from '@angular/forms'; import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; import { FontAwesomeModule } from '@fortawesome/angular-fontawesome'; @@ -106,8 +106,13 @@ export class App implements AfterViewInit, OnInit, OnDestroy { batchImportStatus = ''; batchImportCount = 0; batchImportTotal = 0; + batchImportFailures = 0; importInProgress = false; - cancelImportFlag = false; + private batchImportCancel$ = new Subject(); + // Maximum number of /add requests to have in-flight at once during a batch + // import. Keeps the server from being hit with hundreds of simultaneous + // yt-dlp metadata extractions when a user pastes a huge URL list. + private static readonly BATCH_IMPORT_CONCURRENCY = 4; ytDlpOptionsUpdateTime: string | null = null; ytDlpVersion: string | null = null; metubeVersion: string | null = null; @@ -1175,8 +1180,10 @@ export class App implements AfterViewInit, OnInit, OnDestroy { this.batchImportModalOpen = true; this.batchImportText = ''; this.batchImportStatus = ''; + this.batchImportCount = 0; + this.batchImportTotal = 0; + this.batchImportFailures = 0; this.importInProgress = false; - this.cancelImportFlag = false; setTimeout(() => { const textarea = document.getElementById('batch-import-textarea'); if (textarea instanceof HTMLTextAreaElement) { @@ -1203,39 +1210,62 @@ export class App implements AfterViewInit, OnInit, OnDestroy { } this.importInProgress = true; this.batchImportCount = 0; + this.batchImportFailures = 0; this.batchImportTotal = urls.length; - this.batchImportStatus = `Sending ${urls.length} URLs to server...`; + this.updateBatchImportStatus(); - const promises = urls.map(url => - new Promise((resolve) => { - this.downloads.add(this.buildAddPayload({ url })).subscribe({ - next: (status: Status) => { - if (status.status === 'error') { - console.error(`Error adding URL ${url}: ${status.msg}`); - } - this.batchImportCount++; - resolve(); - }, - error: (err) => { - console.error(`Error importing URL ${url}:`, err); - this.batchImportCount++; - resolve(); + from(urls).pipe( + mergeMap( + url => this.downloads.add(this.buildAddPayload({ url })).pipe( + // downloads.add() already catches HTTP errors and emits a single + // Status value, so `tap` (not `finalize`) is the right place to + // count. This avoids incrementing the counter when an in-flight + // request is aborted by cancellation. + tap((status: Status) => { + if (status.status === 'error') { + this.batchImportFailures++; + console.error(`Error adding URL ${url}: ${status.msg}`); } - }); - }) - ); - - Promise.all(promises).then(() => { - this.batchImportStatus = `All ${urls.length} URLs sent to server.`; - this.importInProgress = false; - }); + this.batchImportCount++; + this.updateBatchImportStatus(); + this.cdr.markForCheck(); + }), + ), + App.BATCH_IMPORT_CONCURRENCY, + ), + takeUntil(this.batchImportCancel$), + takeUntilDestroyed(this.destroyRef), + finalize(() => { + this.importInProgress = false; + this.updateBatchImportStatus(true); + this.cdr.markForCheck(); + }), + ).subscribe(); } - // Cancel the batch import process + private updateBatchImportStatus(done = false): void { + const parts: string[] = []; + if (done) { + const processed = this.batchImportCount; + if (processed < this.batchImportTotal) { + parts.push(`Import cancelled after ${processed} of ${this.batchImportTotal} URLs.`); + } else { + parts.push(`Finished importing ${this.batchImportTotal} URLs.`); + } + } else { + parts.push(`Importing ${this.batchImportCount} of ${this.batchImportTotal} URLs...`); + } + if (this.batchImportFailures > 0) { + parts.push(`${this.batchImportFailures} failed.`); + } + this.batchImportStatus = parts.join(' '); + } + + // Cancel the batch import process: aborts in-flight and pending requests + // immediately via the cancellation Subject wired into the pipeline. cancelBatchImport(): void { if (this.importInProgress) { - this.cancelImportFlag = true; - this.batchImportStatus += ' Cancelling...'; + this.batchImportCancel$.next(); } }