Add buffer to websocket event queues

This commit is contained in:
grimsi
2025-05-26 10:19:46 +02:00
parent daa8b7ee6c
commit f44993af42
12 changed files with 111 additions and 75 deletions
@@ -5,7 +5,7 @@ import ConfigUpdateDto from "Frontend/generated/de/grimsi/gameyfin/config/dto/Co
import {Subscription} from "@vaadin/hilla-frontend";
type ConfigState = {
subscription?: Subscription<ConfigUpdateDto>;
subscription?: Subscription<ConfigUpdateDto[]>;
isLoaded: boolean;
state: Record<string, ConfigEntryDto>;
config: NestedConfig;
@@ -32,12 +32,14 @@ export async function initializeConfigState() {
});
// Subscribe to real-time updates
configState.subscription = ConfigEndpoint.subscribe().onNext((updateDto: ConfigUpdateDto) => {
Object.entries(updateDto.updates).forEach(([key, value]) => {
if (configState.state[key]) {
configState.state[key].value = value;
}
});
configState.subscription = ConfigEndpoint.subscribe().onNext((updateDtos: ConfigUpdateDto[]) => {
updateDtos.forEach((updateDto: ConfigUpdateDto) => {
Object.entries(updateDto.updates).forEach(([key, value]) => {
if (configState.state[key]) {
configState.state[key].value = value;
}
});
})
});
}
+15 -13
View File
@@ -6,7 +6,7 @@ import GameDto from "Frontend/generated/de/grimsi/gameyfin/games/dto/GameDto";
import Rand from "rand-seed";
type GameState = {
subscription?: Subscription<GameEvent>;
subscription?: Subscription<GameEvent[]>;
isLoaded: boolean;
state: Record<number, GameDto>;
games: GameDto[];
@@ -125,18 +125,20 @@ export async function initializeGameState() {
});
// Subscribe to real-time updates
gameState.subscription = GameEndpoint.subscribe().onNext((gameEvent) => {
switch (gameEvent.type) {
case "created":
case "updated":
//@ts-ignore
gameState.state[gameEvent.game.id] = gameEvent.game;
break;
case "deleted":
//@ts-ignore
delete gameState.state[gameEvent.gameId];
break;
}
gameState.subscription = GameEndpoint.subscribe().onNext((gameEvents: GameEvent[]) => {
gameEvents.forEach((gameEvent: GameEvent) => {
switch (gameEvent.type) {
case "created":
case "updated":
//@ts-ignore
gameState.state[gameEvent.game.id] = gameEvent.game;
break;
case "deleted":
//@ts-ignore
delete gameState.state[gameEvent.gameId];
break;
}
})
});
return gameState;
@@ -5,7 +5,7 @@ import LibraryDto from "Frontend/generated/de/grimsi/gameyfin/libraries/dto/Libr
import LibraryEvent from "Frontend/generated/de/grimsi/gameyfin/libraries/dto/LibraryEvent";
type LibraryState = {
subscription?: Subscription<LibraryEvent>;
subscription?: Subscription<LibraryEvent[]>;
isLoaded: boolean;
state: Record<number, LibraryDto>;
libraries: LibraryDto[];
@@ -39,18 +39,20 @@ export async function initializeLibraryState() {
});
// Subscribe to real-time updates
libraryState.subscription = LibraryEndpoint.subscribe().onNext((libraryEvent) => {
switch (libraryEvent.type) {
case "created":
case "updated":
//@ts-ignore
libraryState.state[libraryEvent.library.id] = libraryEvent.library;
break;
case "deleted":
//@ts-ignore
delete libraryState.state[libraryEvent.libraryId];
break;
}
libraryState.subscription = LibraryEndpoint.subscribe().onNext((libraryEvents: LibraryEvent[]) => {
libraryEvents.forEach((libraryEvent: LibraryEvent) => {
switch (libraryEvent.type) {
case "created":
case "updated":
//@ts-ignore
libraryState.state[libraryEvent.library.id] = libraryEvent.library;
break;
case "deleted":
//@ts-ignore
delete libraryState.state[libraryEvent.libraryId];
break;
}
})
});
return libraryState;
+12 -10
View File
@@ -5,7 +5,7 @@ import {proxy} from "valtio/index";
import {PluginEndpoint} from "Frontend/generated/endpoints";
type PluginState = {
subscription?: Subscription<PluginUpdateDto>;
subscription?: Subscription<PluginUpdateDto[]>;
isLoaded: boolean;
state: Record<string, PluginDto>;
plugins: PluginDto[];
@@ -36,15 +36,17 @@ export async function initializePluginState() {
});
// Subscribe to real-time updates
pluginState.subscription = PluginEndpoint.subscribe().onNext((updateDto: PluginUpdateDto) => {
// Make sure the plugin exists in the state
if (pluginState.state[updateDto.id]) {
// Update the existing plugin by merging the new data using destructuring
pluginState.state[updateDto.id] = {
...pluginState.state[updateDto.id],
...updateDto
};
}
pluginState.subscription = PluginEndpoint.subscribe().onNext((updateDtos: PluginUpdateDto[]) => {
updateDtos.forEach((updateDto: PluginUpdateDto) => {
// Make sure the plugin exists in the state
if (pluginState.state[updateDto.id]) {
// Update the existing plugin by merging the new data using destructuring
pluginState.state[updateDto.id] = {
...pluginState.state[updateDto.id],
...updateDto
};
}
})
});
}
@@ -24,9 +24,9 @@ class ConfigEndpoint(
/** CRUD endpoints for admins **/
@PermitAll
fun subscribe(): Flux<ConfigUpdateDto> {
fun subscribe(): Flux<List<ConfigUpdateDto>> {
val user = SecurityContextHolder.getContext().authentication.principal as UserDetails
return if (user.isAdmin()) configService.subscribe()
return if (user.isAdmin()) ConfigService.subscribe()
else Flux.empty()
}
@@ -10,6 +10,8 @@ import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import java.io.Serializable
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.toJavaDuration
@Service
class ConfigService(
@@ -17,17 +19,23 @@ class ConfigService(
) {
companion object {
private val log = KotlinLogging.logger {}
}
private val configUpdates = Sinks.many().multicast().onBackpressureBuffer<ConfigUpdateDto>(1024, false)
/* Websockets */
private val configUpdates = Sinks.many().multicast().onBackpressureBuffer<ConfigUpdateDto>(1024, false)
fun subscribe(): Flux<ConfigUpdateDto> {
log.debug { "New subscription for configUpdates (#${configUpdates.currentSubscriberCount()})" }
return configUpdates.asFlux()
.doOnSubscribe { log.debug { "Subscriber added to configUpdates [${configUpdates.currentSubscriberCount()}]" } }
.doFinally {
log.debug { "Subscriber removed from configUpdates with signal type $it [${configUpdates.currentSubscriberCount()}]" }
}
fun subscribe(): Flux<List<ConfigUpdateDto>> {
log.debug { "New subscription for configUpdates (#${configUpdates.currentSubscriberCount()})" }
return configUpdates.asFlux()
.buffer(100.milliseconds.toJavaDuration())
.doOnSubscribe { log.debug { "Subscriber added to configUpdates [${configUpdates.currentSubscriberCount()}]" } }
.doFinally {
log.debug { "Subscriber removed from configUpdates with signal type $it [${configUpdates.currentSubscriberCount()}]" }
}
}
fun emit(update: ConfigUpdateDto) {
configUpdates.tryEmitNext(update)
}
}
/**
@@ -135,7 +143,7 @@ class ConfigService(
set(key, value)
}
}
configUpdates.tryEmitNext(update)
emit(update)
}
/**
@@ -18,9 +18,9 @@ class PluginEndpoint(
) {
@PermitAll
fun subscribe(): Flux<PluginUpdateDto> {
fun subscribe(): Flux<List<PluginUpdateDto>> {
val user = SecurityContextHolder.getContext().authentication.principal as UserDetails
return if (user.isAdmin()) pluginService.subscribe()
return if (user.isAdmin()) PluginService.subscribe()
else Flux.empty()
}
@@ -20,6 +20,8 @@ import org.springframework.data.repository.findByIdOrNull
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.toJavaDuration
@Service
class PluginService(
@@ -29,26 +31,34 @@ class PluginService(
) {
companion object {
private val log = KotlinLogging.logger {}
/* Websockets */
private val pluginUpdates = Sinks.many().multicast().onBackpressureBuffer<PluginUpdateDto>(1024, false)
fun subscribe(): Flux<List<PluginUpdateDto>> {
log.debug { "New subscription for pluginUpdates" }
return pluginUpdates.asFlux()
.buffer(100.milliseconds.toJavaDuration())
.doOnSubscribe { log.debug { "Subscriber added to pluginUpdates [${pluginUpdates.currentSubscriberCount()}]" } }
.doFinally {
log.debug { "Subscriber removed from pluginUpdates with signal type $it [${pluginUpdates.currentSubscriberCount()}]" }
}
}
fun emit(update: PluginUpdateDto) {
pluginUpdates.tryEmitNext(update)
}
}
private val pluginUpdates = Sinks.many().multicast().onBackpressureBuffer<PluginUpdateDto>(1024, false)
private val pluginConfigValidationCache = mutableMapOf<String, PluginConfigValidationResult>()
init {
pluginManager.addPluginStateListener { event ->
pluginUpdates.tryEmitNext(PluginUpdateDto(id = event.plugin.pluginId, state = event.pluginState))
val update = PluginUpdateDto(id = event.plugin.pluginId, state = event.pluginState)
emit(update)
}
}
fun subscribe(): Flux<PluginUpdateDto> {
log.debug { "New subscription for pluginUpdates" }
return pluginUpdates.asFlux()
.doOnSubscribe { log.debug { "Subscriber added to pluginUpdates [${pluginUpdates.currentSubscriberCount()}]" } }
.doFinally {
log.debug { "Subscriber removed from pluginUpdates with signal type $it [${pluginUpdates.currentSubscriberCount()}]" }
}
}
fun getSupportedPluginTypes(): List<String> {
return pluginManager.plugins
.flatMap { pluginManager.getExtensionTypes(it.pluginId) }
@@ -113,7 +123,7 @@ class PluginService(
// Emit update event
val update = PluginUpdateDto(pluginId, config = config, configValidation = result)
pluginUpdates.tryEmitNext(update)
emit(update)
}
fun validatePluginConfig(pluginId: String, forceRevalidation: Boolean = false): PluginConfigValidationResult {
@@ -14,7 +14,7 @@ import reactor.core.publisher.Flux
class GameEndpoint(
private val gameService: GameService
) {
fun subscribe(): Flux<GameEvent> {
fun subscribe(): Flux<List<GameEvent>> {
return GameService.subscribe()
}
@@ -29,6 +29,8 @@ import org.springframework.transaction.annotation.Transactional
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import java.nio.file.Path
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.toJavaDuration
@Service
class GameService(
@@ -44,10 +46,13 @@ class GameService(
/* Websockets */
private val gameEvents = Sinks.many().multicast().onBackpressureBuffer<GameEvent>(1024, false)
fun subscribe(): Flux<GameEvent> {
fun subscribe(): Flux<List<GameEvent>> {
log.debug { "New subscription for gameUpdates" }
return gameEvents.asFlux()
.doOnSubscribe { log.debug { "Subscriber added to gameEvents [${gameEvents.currentSubscriberCount()}]" } }
.buffer(100.milliseconds.toJavaDuration())
.doOnSubscribe {
log.debug { "Subscriber added to gameEvents [${gameEvents.currentSubscriberCount()}]" }
}
.doFinally {
log.debug { "Subscriber removed from gameEvents with signal type $it [${gameEvents.currentSubscriberCount()}]" }
}
@@ -15,7 +15,7 @@ import reactor.core.publisher.Flux
class LibraryEndpoint(
private val libraryService: LibraryService
) {
fun subscribe(): Flux<LibraryEvent> {
fun subscribe(): Flux<List<LibraryEvent>> {
return LibraryService.subscribe()
}
@@ -15,7 +15,9 @@ import java.util.concurrent.Callable
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.measureTimedValue
import kotlin.time.toJavaDuration
@Service
class LibraryService(
@@ -32,10 +34,13 @@ class LibraryService(
/* Websockets */
private val libraryEvents = Sinks.many().multicast().onBackpressureBuffer<LibraryEvent>(1024, false)
fun subscribe(): Flux<LibraryEvent> {
fun subscribe(): Flux<List<LibraryEvent>> {
log.debug { "New subscription for libraryEvents" }
return libraryEvents.asFlux()
.doOnSubscribe { log.debug { "Subscriber added to libraryEvents [${libraryEvents.currentSubscriberCount()}]" } }
.buffer(100.milliseconds.toJavaDuration())
.doOnSubscribe {
log.debug { "Subscriber added to libraryEvents [${libraryEvents.currentSubscriberCount()}]" }
}
.doFinally {
log.debug { "Subscriber removed from libraryEvents with signal type $it [${libraryEvents.currentSubscriberCount()}]" }
}