From 809b79b6f6984b080bade9d980a3f253dc4c4341 Mon Sep 17 00:00:00 2001 From: litoral05 Date: Wed, 20 May 2026 09:11:24 +0100 Subject: [PATCH] Add live telemetry WebSocket broadcasting --- .../scheduler/AcquisitionSchedulerConfig.java | 2 +- .../AcquisitionSchedulerService.java | 9 ++++- .../websocket/config/WebSocketConfig.java | 25 +++++++++++++ .../telemetry/TelemetryBroadcastMessage.java | 13 +++++++ .../TelemetryWebSocketPublisher.java | 37 +++++++++++++++++++ 5 files changed, 84 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/litoralregas/backend/websocket/config/WebSocketConfig.java create mode 100644 src/main/java/com/litoralregas/backend/websocket/telemetry/TelemetryBroadcastMessage.java create mode 100644 src/main/java/com/litoralregas/backend/websocket/telemetry/TelemetryWebSocketPublisher.java diff --git a/src/main/java/com/litoralregas/backend/acquisition/scheduler/AcquisitionSchedulerConfig.java b/src/main/java/com/litoralregas/backend/acquisition/scheduler/AcquisitionSchedulerConfig.java index 9c4f87f..df75fb9 100644 --- a/src/main/java/com/litoralregas/backend/acquisition/scheduler/AcquisitionSchedulerConfig.java +++ b/src/main/java/com/litoralregas/backend/acquisition/scheduler/AcquisitionSchedulerConfig.java @@ -8,7 +8,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration public class AcquisitionSchedulerConfig { - @Bean + @Bean(name = "acquisitionTaskScheduler") public TaskScheduler acquisitionTaskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(1); diff --git a/src/main/java/com/litoralregas/backend/acquisition/scheduler/AcquisitionSchedulerService.java b/src/main/java/com/litoralregas/backend/acquisition/scheduler/AcquisitionSchedulerService.java index 81eff12..e4cf805 100644 --- a/src/main/java/com/litoralregas/backend/acquisition/scheduler/AcquisitionSchedulerService.java +++ b/src/main/java/com/litoralregas/backend/acquisition/scheduler/AcquisitionSchedulerService.java @@ -2,7 +2,9 @@ package com.litoralregas.backend.acquisition.scheduler; import com.litoralregas.backend.acquisition.polling.AcquisitionPollResult; import com.litoralregas.backend.acquisition.block.BlockPollingService; +import com.litoralregas.backend.websocket.telemetry.TelemetryWebSocketPublisher; import jakarta.annotation.PostConstruct; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Service; @@ -15,6 +17,7 @@ public class AcquisitionSchedulerService { private final BlockPollingService blockPollingService; private final AcquisitionSchedulerProperties properties; private final TaskScheduler taskScheduler; + private final TelemetryWebSocketPublisher telemetryWebSocketPublisher; private final AcquisitionRuntimeStatus runtimeStatus = new AcquisitionRuntimeStatus(); @@ -23,11 +26,13 @@ public class AcquisitionSchedulerService { public AcquisitionSchedulerService( BlockPollingService blockPollingService, AcquisitionSchedulerProperties properties, - TaskScheduler taskScheduler + @Qualifier("acquisitionTaskScheduler") TaskScheduler taskScheduler, + TelemetryWebSocketPublisher telemetryWebSocketPublisher ) { this.blockPollingService = blockPollingService; this.properties = properties; this.taskScheduler = taskScheduler; + this.telemetryWebSocketPublisher = telemetryWebSocketPublisher; } @PostConstruct @@ -65,6 +70,8 @@ public class AcquisitionSchedulerService { runtimeStatus.setLastSuccessfulReads(result.successfulReads()); runtimeStatus.setLastFailedReads(result.failedReads()); + telemetryWebSocketPublisher.publishLatestTelemetry(); + } catch (Exception exception) { runtimeStatus.setLastError(exception.getMessage()); diff --git a/src/main/java/com/litoralregas/backend/websocket/config/WebSocketConfig.java b/src/main/java/com/litoralregas/backend/websocket/config/WebSocketConfig.java new file mode 100644 index 0000000..8ce33b9 --- /dev/null +++ b/src/main/java/com/litoralregas/backend/websocket/config/WebSocketConfig.java @@ -0,0 +1,25 @@ +package com.litoralregas.backend.websocket.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.simp.config.MessageBrokerRegistry; +import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; +import org.springframework.web.socket.config.annotation.StompEndpointRegistry; +import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; + +@Configuration +@EnableWebSocketMessageBroker +public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { + + @Override + public void configureMessageBroker(MessageBrokerRegistry registry) { + registry.enableSimpleBroker("/topic"); + + registry.setApplicationDestinationPrefixes("/app"); + } + + @Override + public void registerStompEndpoints(StompEndpointRegistry registry) { + registry.addEndpoint("/ws") + .setAllowedOriginPatterns("*"); + } +} \ No newline at end of file diff --git a/src/main/java/com/litoralregas/backend/websocket/telemetry/TelemetryBroadcastMessage.java b/src/main/java/com/litoralregas/backend/websocket/telemetry/TelemetryBroadcastMessage.java new file mode 100644 index 0000000..fedcf70 --- /dev/null +++ b/src/main/java/com/litoralregas/backend/websocket/telemetry/TelemetryBroadcastMessage.java @@ -0,0 +1,13 @@ +package com.litoralregas.backend.websocket.telemetry; + +import com.litoralregas.backend.acquisition.telemetry.TelemetrySnapshot; + +import java.time.Instant; +import java.util.Collection; + +public record TelemetryBroadcastMessage( + Instant timestamp, + Integer sensorCount, + Collection snapshots +) { +} \ No newline at end of file diff --git a/src/main/java/com/litoralregas/backend/websocket/telemetry/TelemetryWebSocketPublisher.java b/src/main/java/com/litoralregas/backend/websocket/telemetry/TelemetryWebSocketPublisher.java new file mode 100644 index 0000000..5028410 --- /dev/null +++ b/src/main/java/com/litoralregas/backend/websocket/telemetry/TelemetryWebSocketPublisher.java @@ -0,0 +1,37 @@ +package com.litoralregas.backend.websocket.telemetry; + +import com.litoralregas.backend.acquisition.telemetry.TelemetryCache; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Service; + +import java.time.Instant; + +@Service +public class TelemetryWebSocketPublisher { + + private static final String DESTINATION = "/topic/telemetry/latest"; + + private final SimpMessagingTemplate messagingTemplate; + private final TelemetryCache telemetryCache; + + public TelemetryWebSocketPublisher( + SimpMessagingTemplate messagingTemplate, + TelemetryCache telemetryCache + ) { + this.messagingTemplate = messagingTemplate; + this.telemetryCache = telemetryCache; + } + + public void publishLatestTelemetry() { + TelemetryBroadcastMessage message = new TelemetryBroadcastMessage( + Instant.now(), + telemetryCache.getAll().size(), + telemetryCache.getAll() + ); + + messagingTemplate.convertAndSend( + DESTINATION, + message + ); + } +} \ No newline at end of file