Add live telemetry WebSocket broadcasting
This commit is contained in:
+1
-1
@@ -8,7 +8,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
|||||||
@Configuration
|
@Configuration
|
||||||
public class AcquisitionSchedulerConfig {
|
public class AcquisitionSchedulerConfig {
|
||||||
|
|
||||||
@Bean
|
@Bean(name = "acquisitionTaskScheduler")
|
||||||
public TaskScheduler acquisitionTaskScheduler() {
|
public TaskScheduler acquisitionTaskScheduler() {
|
||||||
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
|
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
|
||||||
scheduler.setPoolSize(1);
|
scheduler.setPoolSize(1);
|
||||||
|
|||||||
+8
-1
@@ -2,7 +2,9 @@ package com.litoralregas.backend.acquisition.scheduler;
|
|||||||
|
|
||||||
import com.litoralregas.backend.acquisition.polling.AcquisitionPollResult;
|
import com.litoralregas.backend.acquisition.polling.AcquisitionPollResult;
|
||||||
import com.litoralregas.backend.acquisition.block.BlockPollingService;
|
import com.litoralregas.backend.acquisition.block.BlockPollingService;
|
||||||
|
import com.litoralregas.backend.websocket.telemetry.TelemetryWebSocketPublisher;
|
||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.scheduling.TaskScheduler;
|
import org.springframework.scheduling.TaskScheduler;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
@@ -15,6 +17,7 @@ public class AcquisitionSchedulerService {
|
|||||||
private final BlockPollingService blockPollingService;
|
private final BlockPollingService blockPollingService;
|
||||||
private final AcquisitionSchedulerProperties properties;
|
private final AcquisitionSchedulerProperties properties;
|
||||||
private final TaskScheduler taskScheduler;
|
private final TaskScheduler taskScheduler;
|
||||||
|
private final TelemetryWebSocketPublisher telemetryWebSocketPublisher;
|
||||||
|
|
||||||
private final AcquisitionRuntimeStatus runtimeStatus = new AcquisitionRuntimeStatus();
|
private final AcquisitionRuntimeStatus runtimeStatus = new AcquisitionRuntimeStatus();
|
||||||
|
|
||||||
@@ -23,11 +26,13 @@ public class AcquisitionSchedulerService {
|
|||||||
public AcquisitionSchedulerService(
|
public AcquisitionSchedulerService(
|
||||||
BlockPollingService blockPollingService,
|
BlockPollingService blockPollingService,
|
||||||
AcquisitionSchedulerProperties properties,
|
AcquisitionSchedulerProperties properties,
|
||||||
TaskScheduler taskScheduler
|
@Qualifier("acquisitionTaskScheduler") TaskScheduler taskScheduler,
|
||||||
|
TelemetryWebSocketPublisher telemetryWebSocketPublisher
|
||||||
) {
|
) {
|
||||||
this.blockPollingService = blockPollingService;
|
this.blockPollingService = blockPollingService;
|
||||||
this.properties = properties;
|
this.properties = properties;
|
||||||
this.taskScheduler = taskScheduler;
|
this.taskScheduler = taskScheduler;
|
||||||
|
this.telemetryWebSocketPublisher = telemetryWebSocketPublisher;
|
||||||
}
|
}
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
@@ -65,6 +70,8 @@ public class AcquisitionSchedulerService {
|
|||||||
runtimeStatus.setLastSuccessfulReads(result.successfulReads());
|
runtimeStatus.setLastSuccessfulReads(result.successfulReads());
|
||||||
runtimeStatus.setLastFailedReads(result.failedReads());
|
runtimeStatus.setLastFailedReads(result.failedReads());
|
||||||
|
|
||||||
|
telemetryWebSocketPublisher.publishLatestTelemetry();
|
||||||
|
|
||||||
} catch (Exception exception) {
|
} catch (Exception exception) {
|
||||||
runtimeStatus.setLastError(exception.getMessage());
|
runtimeStatus.setLastError(exception.getMessage());
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,25 @@
|
|||||||
|
package com.litoralregas.backend.websocket.config;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
|
||||||
|
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
|
||||||
|
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
|
||||||
|
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@EnableWebSocketMessageBroker
|
||||||
|
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configureMessageBroker(MessageBrokerRegistry registry) {
|
||||||
|
registry.enableSimpleBroker("/topic");
|
||||||
|
|
||||||
|
registry.setApplicationDestinationPrefixes("/app");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerStompEndpoints(StompEndpointRegistry registry) {
|
||||||
|
registry.addEndpoint("/ws")
|
||||||
|
.setAllowedOriginPatterns("*");
|
||||||
|
}
|
||||||
|
}
|
||||||
+13
@@ -0,0 +1,13 @@
|
|||||||
|
package com.litoralregas.backend.websocket.telemetry;
|
||||||
|
|
||||||
|
import com.litoralregas.backend.acquisition.telemetry.TelemetrySnapshot;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
public record TelemetryBroadcastMessage(
|
||||||
|
Instant timestamp,
|
||||||
|
Integer sensorCount,
|
||||||
|
Collection<TelemetrySnapshot> snapshots
|
||||||
|
) {
|
||||||
|
}
|
||||||
+37
@@ -0,0 +1,37 @@
|
|||||||
|
package com.litoralregas.backend.websocket.telemetry;
|
||||||
|
|
||||||
|
import com.litoralregas.backend.acquisition.telemetry.TelemetryCache;
|
||||||
|
import org.springframework.messaging.simp.SimpMessagingTemplate;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class TelemetryWebSocketPublisher {
|
||||||
|
|
||||||
|
private static final String DESTINATION = "/topic/telemetry/latest";
|
||||||
|
|
||||||
|
private final SimpMessagingTemplate messagingTemplate;
|
||||||
|
private final TelemetryCache telemetryCache;
|
||||||
|
|
||||||
|
public TelemetryWebSocketPublisher(
|
||||||
|
SimpMessagingTemplate messagingTemplate,
|
||||||
|
TelemetryCache telemetryCache
|
||||||
|
) {
|
||||||
|
this.messagingTemplate = messagingTemplate;
|
||||||
|
this.telemetryCache = telemetryCache;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void publishLatestTelemetry() {
|
||||||
|
TelemetryBroadcastMessage message = new TelemetryBroadcastMessage(
|
||||||
|
Instant.now(),
|
||||||
|
telemetryCache.getAll().size(),
|
||||||
|
telemetryCache.getAll()
|
||||||
|
);
|
||||||
|
|
||||||
|
messagingTemplate.convertAndSend(
|
||||||
|
DESTINATION,
|
||||||
|
message
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user