Improve log streaming performance by using Coroutines instead of platform threads

Refactor log streaming to use a replay which eliminates the need to read the log file each time for a new subscriber and instead caches the elements directly in the sink
Fix bug in log tailing (was running with two threads instead of one)
This commit is contained in:
grimsi
2024-09-21 14:37:59 +02:00
parent 75feb614e6
commit 9219b6747a
6 changed files with 91 additions and 41 deletions
@@ -8,19 +8,17 @@ import {Button, Code, Divider, Tooltip} from "@nextui-org/react";
import {ArrowUDownLeft, SortAscending} from "@phosphor-icons/react";
function LogManagementLayout({getConfig, formik}: any) {
const [logEntries, setLogEntries] = useState<string[]>([]);
const [autoScroll, setAutoScroll] = useState(true);
const [softWrap, setSoftWrap] = useState(false);
const subscribed = useRef(false);
const logEndRef = useRef<null | HTMLDivElement>(null);
useEffect(() => {
if (subscribed.current) return;
LogEndpoint.getApplicationLogs().onNext((newEntry: string | undefined) =>
const sub = LogEndpoint.getApplicationLogs().onNext((newEntry: string | undefined) =>
setLogEntries((currentEntries) => [...currentEntries, newEntry as string])
);
subscribed.current = true;
return () => sub.cancel();
}, []);
useEffect(() => {
@@ -29,4 +29,5 @@ const menuItems: MenuItem[] = [
}
]
export const AdministrationView = withSideMenu(menuItems);
export const AdministrationView = withSideMenu(menuItems);
export default AdministrationView;
+2 -1
View File
@@ -19,4 +19,5 @@ const menuItems = [
}
]
export const ProfileView = withSideMenu(menuItems);
export const ProfileView = withSideMenu(menuItems);
export default ProfileView;
@@ -19,7 +19,6 @@ class LogEndpoint(
// FIXME: see https://vaadin.com/forum/t/can-only-access-flux-endpoint-with-anonymousallowed/167117
@AnonymousAllowed
fun getApplicationLogs(): Flux<String> {
return logService.getInitialLogs()
.concatWith(logService.streamLogs())
return logService.streamLogs()
}
}
@@ -4,9 +4,8 @@ import ch.qos.logback.classic.LoggerContext
import ch.qos.logback.classic.joran.JoranConfigurator
import de.grimsi.gameyfin.config.ConfigProperties
import de.grimsi.gameyfin.config.ConfigService
import de.grimsi.gameyfin.logs.util.AsyncFileTailer
import io.github.oshai.kotlinlogging.KotlinLogging
import org.apache.commons.io.input.Tailer
import org.apache.commons.io.input.TailerListenerAdapter
import org.slf4j.LoggerFactory
import org.springframework.boot.context.event.ApplicationStartedEvent
import org.springframework.boot.logging.LogLevel
@@ -15,10 +14,11 @@ import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import java.io.InputStream
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.time.Duration
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
@Service
class LogService(
@@ -28,14 +28,15 @@ class LogService(
companion object {
private const val LOG_CONFIG_TEMPLATE = "log-config-template.xml"
private const val LOG_FILE_NAME = "gameyfin"
private val LOG_REFRESH_INTERVAL = Duration.ofSeconds(5)
private val LOG_REFRESH_INTERVAL = 5.seconds
private val LOG_STREAM_RETENTION = 1.days
}
private val log = KotlinLogging.logger {}
private var logFilePath: Path = Paths.get(config.get(ConfigProperties.LogsFolder)!!, "$LOG_FILE_NAME.log")
private val sink: Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer()
private var logFilePath: Path? = null
private val sink: Sinks.Many<String> = Sinks.many().replay().limit(LOG_STREAM_RETENTION.toJavaDuration())
private var tailer: AsyncFileTailer? = null
@EventListener(ApplicationStartedEvent::class)
fun configureFileLogging() {
@@ -46,21 +47,6 @@ class LogService(
)
}
init {
val tailer = Tailer.builder()
.setFile(logFilePath.toFile())
.setTailerListener(object : TailerListenerAdapter() {
override fun handle(line: String) {
sink.tryEmitNext(line)
}
})
.setDelayDuration(LOG_REFRESH_INTERVAL)
.setTailFromEnd(true)
.get()
Thread(tailer).start()
}
fun configureFileLogging(folder: String, maxHistoryDays: Int, level: LogLevel) {
val context = LoggerFactory.getILoggerFactory() as LoggerContext
val configurator = JoranConfigurator()
@@ -71,7 +57,15 @@ class LogService(
log.info { "Setting log level to $level" }
log.info { "Setting log retention to $maxHistoryDays days" }
configurator.doConfigure(it)
logFilePath = Paths.get(config.get(ConfigProperties.LogsFolder)!!, "$LOG_FILE_NAME.log")
val newLogFilePath = Paths.get(folder, "$LOG_FILE_NAME.log")
if (newLogFilePath != logFilePath) {
logFilePath = newLogFilePath
tailer?.stopTailing()
tailer = AsyncFileTailer(newLogFilePath.toFile(), LOG_REFRESH_INTERVAL, sink)
tailer?.startTailing()
}
}
}
@@ -79,20 +73,13 @@ class LogService(
return sink.asFlux()
}
fun getInitialLogs(): Flux<String> {
return Flux.fromStream(Files.lines(logFilePath))
}
private fun generateLogConfigXml(
folder: String,
maxHistoryDays: Int,
level: LogLevel
): InputStream {
val template = javaClass.classLoader.getResourceAsStream(LOG_CONFIG_TEMPLATE)
if (template == null) {
throw IllegalStateException("Log config template not found")
}
?: throw IllegalStateException("Log config template not found")
val templateString = template.bufferedReader().use { it.readText() }
return templateString
@@ -0,0 +1,64 @@
package de.grimsi.gameyfin.logs.util
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import org.apache.commons.io.input.Tailer
import org.apache.commons.io.input.TailerListenerAdapter
import reactor.core.publisher.Sinks
import java.io.File
import kotlin.time.Duration
import kotlin.time.toJavaDuration
/**
* Wraps Tailer from Apache Commons IO to tail a file asynchronously using Kotlin Coroutines.
* Results are emitted to a sink
*
* @param file The file to tail
* @param interval The interval to check for new lines
* @param sink The sink to emit new lines to
* @see Tailer
*/
class AsyncFileTailer(
private val file: File,
interval: Duration,
private val sink: Sinks.Many<String>
) {
private val log = KotlinLogging.logger {}
private var tailerJob: Job? = null
private val tailer = Tailer.builder()
.setFile(file)
.setTailerListener(object : TailerListenerAdapter() {
override fun handle(line: String?) {
line?.let { sink.tryEmitNext(it) }
}
})
// Who tf thought it was a good idea to start a thread by default?
.setStartThread(false)
.setDelayDuration(interval.toJavaDuration())
.get()
fun startTailing() {
if (tailerJob == null || tailerJob?.isCancelled == true) {
tailerJob = CoroutineScope(Dispatchers.IO).launch {
tailer.run()
}
log.info { "Started tailing the file: ${file.name}" }
} else {
log.error { "File tailing for file ${file.name} is already running!" }
}
}
fun stopTailing() {
tailerJob?.let {
it.cancel()
tailerJob = null
log.info { "Stopped tailing the file: ${file.name}" }
}
}
}