From f44993af42da029d68a138d33786de48e9497bbc Mon Sep 17 00:00:00 2001 From: grimsi <9295182+grimsi@users.noreply.github.com> Date: Mon, 26 May 2025 10:19:46 +0200 Subject: [PATCH] Add buffer to websocket event queues --- .../src/main/frontend/state/ConfigState.ts | 16 +++++---- gameyfin/src/main/frontend/state/GameState.ts | 28 ++++++++------- .../src/main/frontend/state/LibraryState.ts | 28 ++++++++------- .../src/main/frontend/state/PluginState.ts | 22 ++++++------ .../grimsi/gameyfin/config/ConfigEndpoint.kt | 4 +-- .../grimsi/gameyfin/config/ConfigService.kt | 28 +++++++++------ .../gameyfin/core/plugins/PluginEndpoint.kt | 4 +-- .../gameyfin/core/plugins/PluginService.kt | 34 ++++++++++++------- .../de/grimsi/gameyfin/games/GameEndpoint.kt | 2 +- .../de/grimsi/gameyfin/games/GameService.kt | 9 +++-- .../gameyfin/libraries/LibraryEndpoint.kt | 2 +- .../gameyfin/libraries/LibraryService.kt | 9 +++-- 12 files changed, 111 insertions(+), 75 deletions(-) diff --git a/gameyfin/src/main/frontend/state/ConfigState.ts b/gameyfin/src/main/frontend/state/ConfigState.ts index 2390090..61aab5c 100644 --- a/gameyfin/src/main/frontend/state/ConfigState.ts +++ b/gameyfin/src/main/frontend/state/ConfigState.ts @@ -5,7 +5,7 @@ import ConfigUpdateDto from "Frontend/generated/de/grimsi/gameyfin/config/dto/Co import {Subscription} from "@vaadin/hilla-frontend"; type ConfigState = { - subscription?: Subscription; + subscription?: Subscription; isLoaded: boolean; state: Record; config: NestedConfig; @@ -32,12 +32,14 @@ export async function initializeConfigState() { }); // Subscribe to real-time updates - configState.subscription = ConfigEndpoint.subscribe().onNext((updateDto: ConfigUpdateDto) => { - Object.entries(updateDto.updates).forEach(([key, value]) => { - if (configState.state[key]) { - configState.state[key].value = value; - } - }); + configState.subscription = ConfigEndpoint.subscribe().onNext((updateDtos: ConfigUpdateDto[]) => { + updateDtos.forEach((updateDto: ConfigUpdateDto) => { + Object.entries(updateDto.updates).forEach(([key, value]) => { + if (configState.state[key]) { + configState.state[key].value = value; + } + }); + }) }); } diff --git a/gameyfin/src/main/frontend/state/GameState.ts b/gameyfin/src/main/frontend/state/GameState.ts index 80f7d9c..5ae0713 100644 --- a/gameyfin/src/main/frontend/state/GameState.ts +++ b/gameyfin/src/main/frontend/state/GameState.ts @@ -6,7 +6,7 @@ import GameDto from "Frontend/generated/de/grimsi/gameyfin/games/dto/GameDto"; import Rand from "rand-seed"; type GameState = { - subscription?: Subscription; + subscription?: Subscription; isLoaded: boolean; state: Record; games: GameDto[]; @@ -125,18 +125,20 @@ export async function initializeGameState() { }); // Subscribe to real-time updates - gameState.subscription = GameEndpoint.subscribe().onNext((gameEvent) => { - switch (gameEvent.type) { - case "created": - case "updated": - //@ts-ignore - gameState.state[gameEvent.game.id] = gameEvent.game; - break; - case "deleted": - //@ts-ignore - delete gameState.state[gameEvent.gameId]; - break; - } + gameState.subscription = GameEndpoint.subscribe().onNext((gameEvents: GameEvent[]) => { + gameEvents.forEach((gameEvent: GameEvent) => { + switch (gameEvent.type) { + case "created": + case "updated": + //@ts-ignore + gameState.state[gameEvent.game.id] = gameEvent.game; + break; + case "deleted": + //@ts-ignore + delete gameState.state[gameEvent.gameId]; + break; + } + }) }); return gameState; diff --git a/gameyfin/src/main/frontend/state/LibraryState.ts b/gameyfin/src/main/frontend/state/LibraryState.ts index 668a568..f1365ce 100644 --- a/gameyfin/src/main/frontend/state/LibraryState.ts +++ b/gameyfin/src/main/frontend/state/LibraryState.ts @@ -5,7 +5,7 @@ import LibraryDto from "Frontend/generated/de/grimsi/gameyfin/libraries/dto/Libr import LibraryEvent from "Frontend/generated/de/grimsi/gameyfin/libraries/dto/LibraryEvent"; type LibraryState = { - subscription?: Subscription; + subscription?: Subscription; isLoaded: boolean; state: Record; libraries: LibraryDto[]; @@ -39,18 +39,20 @@ export async function initializeLibraryState() { }); // Subscribe to real-time updates - libraryState.subscription = LibraryEndpoint.subscribe().onNext((libraryEvent) => { - switch (libraryEvent.type) { - case "created": - case "updated": - //@ts-ignore - libraryState.state[libraryEvent.library.id] = libraryEvent.library; - break; - case "deleted": - //@ts-ignore - delete libraryState.state[libraryEvent.libraryId]; - break; - } + libraryState.subscription = LibraryEndpoint.subscribe().onNext((libraryEvents: LibraryEvent[]) => { + libraryEvents.forEach((libraryEvent: LibraryEvent) => { + switch (libraryEvent.type) { + case "created": + case "updated": + //@ts-ignore + libraryState.state[libraryEvent.library.id] = libraryEvent.library; + break; + case "deleted": + //@ts-ignore + delete libraryState.state[libraryEvent.libraryId]; + break; + } + }) }); return libraryState; diff --git a/gameyfin/src/main/frontend/state/PluginState.ts b/gameyfin/src/main/frontend/state/PluginState.ts index 16a8123..4a0198b 100644 --- a/gameyfin/src/main/frontend/state/PluginState.ts +++ b/gameyfin/src/main/frontend/state/PluginState.ts @@ -5,7 +5,7 @@ import {proxy} from "valtio/index"; import {PluginEndpoint} from "Frontend/generated/endpoints"; type PluginState = { - subscription?: Subscription; + subscription?: Subscription; isLoaded: boolean; state: Record; plugins: PluginDto[]; @@ -36,15 +36,17 @@ export async function initializePluginState() { }); // Subscribe to real-time updates - pluginState.subscription = PluginEndpoint.subscribe().onNext((updateDto: PluginUpdateDto) => { - // Make sure the plugin exists in the state - if (pluginState.state[updateDto.id]) { - // Update the existing plugin by merging the new data using destructuring - pluginState.state[updateDto.id] = { - ...pluginState.state[updateDto.id], - ...updateDto - }; - } + pluginState.subscription = PluginEndpoint.subscribe().onNext((updateDtos: PluginUpdateDto[]) => { + updateDtos.forEach((updateDto: PluginUpdateDto) => { + // Make sure the plugin exists in the state + if (pluginState.state[updateDto.id]) { + // Update the existing plugin by merging the new data using destructuring + pluginState.state[updateDto.id] = { + ...pluginState.state[updateDto.id], + ...updateDto + }; + } + }) }); } diff --git a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/config/ConfigEndpoint.kt b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/config/ConfigEndpoint.kt index 76d01aa..bdc80e8 100644 --- a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/config/ConfigEndpoint.kt +++ b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/config/ConfigEndpoint.kt @@ -24,9 +24,9 @@ class ConfigEndpoint( /** CRUD endpoints for admins **/ @PermitAll - fun subscribe(): Flux { + fun subscribe(): Flux> { val user = SecurityContextHolder.getContext().authentication.principal as UserDetails - return if (user.isAdmin()) configService.subscribe() + return if (user.isAdmin()) ConfigService.subscribe() else Flux.empty() } diff --git a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/config/ConfigService.kt b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/config/ConfigService.kt index 45c87df..818b976 100644 --- a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/config/ConfigService.kt +++ b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/config/ConfigService.kt @@ -10,6 +10,8 @@ import org.springframework.stereotype.Service import reactor.core.publisher.Flux import reactor.core.publisher.Sinks import java.io.Serializable +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.toJavaDuration @Service class ConfigService( @@ -17,17 +19,23 @@ class ConfigService( ) { companion object { private val log = KotlinLogging.logger {} - } - private val configUpdates = Sinks.many().multicast().onBackpressureBuffer(1024, false) + /* Websockets */ + private val configUpdates = Sinks.many().multicast().onBackpressureBuffer(1024, false) - fun subscribe(): Flux { - log.debug { "New subscription for configUpdates (#${configUpdates.currentSubscriberCount()})" } - return configUpdates.asFlux() - .doOnSubscribe { log.debug { "Subscriber added to configUpdates [${configUpdates.currentSubscriberCount()}]" } } - .doFinally { - log.debug { "Subscriber removed from configUpdates with signal type $it [${configUpdates.currentSubscriberCount()}]" } - } + fun subscribe(): Flux> { + log.debug { "New subscription for configUpdates (#${configUpdates.currentSubscriberCount()})" } + return configUpdates.asFlux() + .buffer(100.milliseconds.toJavaDuration()) + .doOnSubscribe { log.debug { "Subscriber added to configUpdates [${configUpdates.currentSubscriberCount()}]" } } + .doFinally { + log.debug { "Subscriber removed from configUpdates with signal type $it [${configUpdates.currentSubscriberCount()}]" } + } + } + + fun emit(update: ConfigUpdateDto) { + configUpdates.tryEmitNext(update) + } } /** @@ -135,7 +143,7 @@ class ConfigService( set(key, value) } } - configUpdates.tryEmitNext(update) + emit(update) } /** diff --git a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/core/plugins/PluginEndpoint.kt b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/core/plugins/PluginEndpoint.kt index c7dfbd5..75a7988 100644 --- a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/core/plugins/PluginEndpoint.kt +++ b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/core/plugins/PluginEndpoint.kt @@ -18,9 +18,9 @@ class PluginEndpoint( ) { @PermitAll - fun subscribe(): Flux { + fun subscribe(): Flux> { val user = SecurityContextHolder.getContext().authentication.principal as UserDetails - return if (user.isAdmin()) pluginService.subscribe() + return if (user.isAdmin()) PluginService.subscribe() else Flux.empty() } diff --git a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/core/plugins/PluginService.kt b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/core/plugins/PluginService.kt index 2ec2306..d9749ee 100644 --- a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/core/plugins/PluginService.kt +++ b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/core/plugins/PluginService.kt @@ -20,6 +20,8 @@ import org.springframework.data.repository.findByIdOrNull import org.springframework.stereotype.Service import reactor.core.publisher.Flux import reactor.core.publisher.Sinks +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.toJavaDuration @Service class PluginService( @@ -29,26 +31,34 @@ class PluginService( ) { companion object { private val log = KotlinLogging.logger {} + + /* Websockets */ + private val pluginUpdates = Sinks.many().multicast().onBackpressureBuffer(1024, false) + + fun subscribe(): Flux> { + log.debug { "New subscription for pluginUpdates" } + return pluginUpdates.asFlux() + .buffer(100.milliseconds.toJavaDuration()) + .doOnSubscribe { log.debug { "Subscriber added to pluginUpdates [${pluginUpdates.currentSubscriberCount()}]" } } + .doFinally { + log.debug { "Subscriber removed from pluginUpdates with signal type $it [${pluginUpdates.currentSubscriberCount()}]" } + } + } + + fun emit(update: PluginUpdateDto) { + pluginUpdates.tryEmitNext(update) + } } - private val pluginUpdates = Sinks.many().multicast().onBackpressureBuffer(1024, false) private val pluginConfigValidationCache = mutableMapOf() init { pluginManager.addPluginStateListener { event -> - pluginUpdates.tryEmitNext(PluginUpdateDto(id = event.plugin.pluginId, state = event.pluginState)) + val update = PluginUpdateDto(id = event.plugin.pluginId, state = event.pluginState) + emit(update) } } - fun subscribe(): Flux { - log.debug { "New subscription for pluginUpdates" } - return pluginUpdates.asFlux() - .doOnSubscribe { log.debug { "Subscriber added to pluginUpdates [${pluginUpdates.currentSubscriberCount()}]" } } - .doFinally { - log.debug { "Subscriber removed from pluginUpdates with signal type $it [${pluginUpdates.currentSubscriberCount()}]" } - } - } - fun getSupportedPluginTypes(): List { return pluginManager.plugins .flatMap { pluginManager.getExtensionTypes(it.pluginId) } @@ -113,7 +123,7 @@ class PluginService( // Emit update event val update = PluginUpdateDto(pluginId, config = config, configValidation = result) - pluginUpdates.tryEmitNext(update) + emit(update) } fun validatePluginConfig(pluginId: String, forceRevalidation: Boolean = false): PluginConfigValidationResult { diff --git a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/games/GameEndpoint.kt b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/games/GameEndpoint.kt index 943311d..24a9000 100644 --- a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/games/GameEndpoint.kt +++ b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/games/GameEndpoint.kt @@ -14,7 +14,7 @@ import reactor.core.publisher.Flux class GameEndpoint( private val gameService: GameService ) { - fun subscribe(): Flux { + fun subscribe(): Flux> { return GameService.subscribe() } diff --git a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/games/GameService.kt b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/games/GameService.kt index 2a303cf..e2e303c 100644 --- a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/games/GameService.kt +++ b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/games/GameService.kt @@ -29,6 +29,8 @@ import org.springframework.transaction.annotation.Transactional import reactor.core.publisher.Flux import reactor.core.publisher.Sinks import java.nio.file.Path +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.toJavaDuration @Service class GameService( @@ -44,10 +46,13 @@ class GameService( /* Websockets */ private val gameEvents = Sinks.many().multicast().onBackpressureBuffer(1024, false) - fun subscribe(): Flux { + fun subscribe(): Flux> { log.debug { "New subscription for gameUpdates" } return gameEvents.asFlux() - .doOnSubscribe { log.debug { "Subscriber added to gameEvents [${gameEvents.currentSubscriberCount()}]" } } + .buffer(100.milliseconds.toJavaDuration()) + .doOnSubscribe { + log.debug { "Subscriber added to gameEvents [${gameEvents.currentSubscriberCount()}]" } + } .doFinally { log.debug { "Subscriber removed from gameEvents with signal type $it [${gameEvents.currentSubscriberCount()}]" } } diff --git a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/libraries/LibraryEndpoint.kt b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/libraries/LibraryEndpoint.kt index 8c174fb..c86f87d 100644 --- a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/libraries/LibraryEndpoint.kt +++ b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/libraries/LibraryEndpoint.kt @@ -15,7 +15,7 @@ import reactor.core.publisher.Flux class LibraryEndpoint( private val libraryService: LibraryService ) { - fun subscribe(): Flux { + fun subscribe(): Flux> { return LibraryService.subscribe() } diff --git a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/libraries/LibraryService.kt b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/libraries/LibraryService.kt index 690d724..6ab817b 100644 --- a/gameyfin/src/main/kotlin/de/grimsi/gameyfin/libraries/LibraryService.kt +++ b/gameyfin/src/main/kotlin/de/grimsi/gameyfin/libraries/LibraryService.kt @@ -15,7 +15,9 @@ import java.util.concurrent.Callable import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicInteger +import kotlin.time.Duration.Companion.milliseconds import kotlin.time.measureTimedValue +import kotlin.time.toJavaDuration @Service class LibraryService( @@ -32,10 +34,13 @@ class LibraryService( /* Websockets */ private val libraryEvents = Sinks.many().multicast().onBackpressureBuffer(1024, false) - fun subscribe(): Flux { + fun subscribe(): Flux> { log.debug { "New subscription for libraryEvents" } return libraryEvents.asFlux() - .doOnSubscribe { log.debug { "Subscriber added to libraryEvents [${libraryEvents.currentSubscriberCount()}]" } } + .buffer(100.milliseconds.toJavaDuration()) + .doOnSubscribe { + log.debug { "Subscriber added to libraryEvents [${libraryEvents.currentSubscriberCount()}]" } + } .doFinally { log.debug { "Subscriber removed from libraryEvents with signal type $it [${libraryEvents.currentSubscriberCount()}]" } }