Refactor telemetry pipeline into metadata-driven architecture
This commit is contained in:
@@ -74,6 +74,10 @@ public class BlockPollingService {
|
|||||||
Integer rawValue = rawValueForSensor(sensor, block, result);
|
Integer rawValue = rawValueForSensor(sensor, block, result);
|
||||||
Object value = convertValue(sensor, rawValue);
|
Object value = convertValue(sensor, rawValue);
|
||||||
|
|
||||||
|
if (value == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
telemetryCache.put(new TelemetrySnapshot(
|
telemetryCache.put(new TelemetrySnapshot(
|
||||||
sensor.getId(),
|
sensor.getId(),
|
||||||
sensor.getKey(),
|
sensor.getKey(),
|
||||||
@@ -123,21 +127,68 @@ public class BlockPollingService {
|
|||||||
return result.values().get(offset);
|
return result.values().get(offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Object convertValue(SensorDefinition sensor, Integer rawValue) {
|
private Object convertValue(
|
||||||
|
SensorDefinition sensor,
|
||||||
|
Integer rawValue
|
||||||
|
) {
|
||||||
if (sensor.getValueType() == SensorValueType.BOOLEAN) {
|
if (sensor.getValueType() == SensorValueType.BOOLEAN) {
|
||||||
|
|
||||||
Integer bitOffset = sensor.getBitOffset();
|
Integer bitOffset = sensor.getBitOffset();
|
||||||
|
|
||||||
if (bitOffset == null) {
|
if (bitOffset == null) {
|
||||||
throw new IllegalStateException("BOOLEAN sensor requires bitOffset.");
|
throw new IllegalStateException(
|
||||||
|
"BOOLEAN sensor requires bitOffset."
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ((rawValue >> bitOffset) & 1) == 1;
|
return ((rawValue >> bitOffset) & 1) == 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sensor.getValueType() == SensorValueType.DECIMAL) {
|
int decodedRawValue = decodeSignedRawValue(
|
||||||
return rawValue / Math.pow(10, sensor.getDecimalPlaces());
|
rawValue,
|
||||||
|
sensor.getSigned()
|
||||||
|
);
|
||||||
|
|
||||||
|
double scaledValue =
|
||||||
|
decodedRawValue * sensor.getScaleFactor();
|
||||||
|
|
||||||
|
if (!isWithinValidRange(sensor, scaledValue)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sensor.getValueType() == SensorValueType.INTEGER) {
|
||||||
|
return (int) Math.round(scaledValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
return scaledValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int decodeSignedRawValue(
|
||||||
|
Integer rawValue,
|
||||||
|
Boolean signed
|
||||||
|
) {
|
||||||
|
if (rawValue == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Boolean.TRUE.equals(signed) && rawValue > 32767) {
|
||||||
|
return rawValue - 65536;
|
||||||
}
|
}
|
||||||
|
|
||||||
return rawValue;
|
return rawValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isWithinValidRange(
|
||||||
|
SensorDefinition sensor,
|
||||||
|
double value
|
||||||
|
) {
|
||||||
|
Double validMin = sensor.getValidMin();
|
||||||
|
Double validMax = sensor.getValidMax();
|
||||||
|
|
||||||
|
if (validMin != null && value < validMin) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return validMax == null || value <= validMax;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
+49
-11
@@ -53,10 +53,13 @@ public class SensorTelemetryReader {
|
|||||||
|
|
||||||
Integer rawValue = result.values().getFirst();
|
Integer rawValue = result.values().getFirst();
|
||||||
|
|
||||||
Object value = convertValue(
|
Object value = convertValue(sensorDefinition, rawValue);
|
||||||
sensorDefinition,
|
|
||||||
rawValue
|
if (value == null) {
|
||||||
);
|
throw new IllegalStateException(
|
||||||
|
"Sensor value is invalid: " + sensorDefinition.getKey()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
TelemetrySnapshot snapshot = new TelemetrySnapshot(
|
TelemetrySnapshot snapshot = new TelemetrySnapshot(
|
||||||
sensorDefinition.getId(),
|
sensorDefinition.getId(),
|
||||||
@@ -80,9 +83,7 @@ public class SensorTelemetryReader {
|
|||||||
SensorDefinition sensorDefinition,
|
SensorDefinition sensorDefinition,
|
||||||
Integer rawValue
|
Integer rawValue
|
||||||
) {
|
) {
|
||||||
|
|
||||||
if (sensorDefinition.getValueType() == SensorValueType.BOOLEAN) {
|
if (sensorDefinition.getValueType() == SensorValueType.BOOLEAN) {
|
||||||
|
|
||||||
Integer bitOffset = sensorDefinition.getBitOffset();
|
Integer bitOffset = sensorDefinition.getBitOffset();
|
||||||
|
|
||||||
if (bitOffset == null) {
|
if (bitOffset == null) {
|
||||||
@@ -94,14 +95,51 @@ public class SensorTelemetryReader {
|
|||||||
return ((rawValue >> bitOffset) & 1) == 1;
|
return ((rawValue >> bitOffset) & 1) == 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sensorDefinition.getValueType() == SensorValueType.DECIMAL) {
|
int decodedRawValue = decodeSignedRawValue(
|
||||||
|
rawValue,
|
||||||
|
sensorDefinition.getSigned()
|
||||||
|
);
|
||||||
|
|
||||||
return rawValue / Math.pow(
|
double scaledValue =
|
||||||
10,
|
decodedRawValue * sensorDefinition.getScaleFactor();
|
||||||
sensorDefinition.getDecimalPlaces()
|
|
||||||
);
|
if (!isWithinValidRange(sensorDefinition, scaledValue)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sensorDefinition.getValueType() == SensorValueType.INTEGER) {
|
||||||
|
return (int) Math.round(scaledValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
return scaledValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int decodeSignedRawValue(
|
||||||
|
Integer rawValue,
|
||||||
|
Boolean signed
|
||||||
|
) {
|
||||||
|
if (rawValue == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Boolean.TRUE.equals(signed) && rawValue > 32767) {
|
||||||
|
return rawValue - 65536;
|
||||||
}
|
}
|
||||||
|
|
||||||
return rawValue;
|
return rawValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isWithinValidRange(
|
||||||
|
SensorDefinition sensorDefinition,
|
||||||
|
double value
|
||||||
|
) {
|
||||||
|
Double validMin = sensorDefinition.getValidMin();
|
||||||
|
Double validMax = sensorDefinition.getValidMax();
|
||||||
|
|
||||||
|
if (validMin != null && value < validMin) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return validMax == null || value <= validMax;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -49,6 +49,18 @@ public class SensorDefinition {
|
|||||||
@Column(name = "created_at", nullable = false)
|
@Column(name = "created_at", nullable = false)
|
||||||
private Instant createdAt;
|
private Instant createdAt;
|
||||||
|
|
||||||
|
@Column(name = "scale_factor", nullable = false)
|
||||||
|
private Double scaleFactor;
|
||||||
|
|
||||||
|
@Column(name = "signed", nullable = false)
|
||||||
|
private Boolean signed;
|
||||||
|
|
||||||
|
@Column(name = "valid_min")
|
||||||
|
private Double validMin;
|
||||||
|
|
||||||
|
@Column(name = "valid_max")
|
||||||
|
private Double validMax;
|
||||||
|
|
||||||
protected SensorDefinition() {
|
protected SensorDefinition() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -63,7 +75,11 @@ public class SensorDefinition {
|
|||||||
Integer decimalPlaces,
|
Integer decimalPlaces,
|
||||||
SensorSourceType sourceType,
|
SensorSourceType sourceType,
|
||||||
Integer pollingIntervalSeconds,
|
Integer pollingIntervalSeconds,
|
||||||
Boolean enabled
|
Boolean enabled,
|
||||||
|
Double scaleFactor,
|
||||||
|
Boolean signed,
|
||||||
|
Double validMin,
|
||||||
|
Double validMax
|
||||||
) {
|
) {
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.name = name;
|
this.name = name;
|
||||||
@@ -77,6 +93,17 @@ public class SensorDefinition {
|
|||||||
this.pollingIntervalSeconds = pollingIntervalSeconds;
|
this.pollingIntervalSeconds = pollingIntervalSeconds;
|
||||||
this.enabled = enabled;
|
this.enabled = enabled;
|
||||||
this.createdAt = Instant.now();
|
this.createdAt = Instant.now();
|
||||||
|
this.scaleFactor = scaleFactor != null
|
||||||
|
? scaleFactor
|
||||||
|
: Math.pow(10, -decimalPlaces);
|
||||||
|
|
||||||
|
this.signed = signed != null
|
||||||
|
? signed
|
||||||
|
: false;
|
||||||
|
|
||||||
|
this.validMin = validMin;
|
||||||
|
|
||||||
|
this.validMax = validMax;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getId() {
|
public Integer getId() {
|
||||||
@@ -178,4 +205,36 @@ public class SensorDefinition {
|
|||||||
public void setCreatedAt(Instant createdAt) {
|
public void setCreatedAt(Instant createdAt) {
|
||||||
this.createdAt = createdAt;
|
this.createdAt = createdAt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Double getScaleFactor(){
|
||||||
|
return scaleFactor;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setScaleFactor(Double scaleFactor) {
|
||||||
|
this.scaleFactor = scaleFactor;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getSigned() {
|
||||||
|
return signed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSigned(Boolean signed) {
|
||||||
|
this.signed = signed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Double getValidMin() {
|
||||||
|
return validMin;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setValidMin(Double validMin) {
|
||||||
|
this.validMin = validMin;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Double getValidMax() {
|
||||||
|
return validMax;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setValidMax(Double validMax) {
|
||||||
|
this.validMax = validMax;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -8,6 +8,10 @@ public record SensorDefinitionConfig(
|
|||||||
String valueType,
|
String valueType,
|
||||||
String unit,
|
String unit,
|
||||||
Integer decimalPlaces,
|
Integer decimalPlaces,
|
||||||
|
Double scaleFactor,
|
||||||
|
Boolean signed,
|
||||||
|
Double validMin,
|
||||||
|
Double validMax,
|
||||||
Integer pollingIntervalSeconds,
|
Integer pollingIntervalSeconds,
|
||||||
Boolean enabled
|
Boolean enabled
|
||||||
) {
|
) {
|
||||||
|
|||||||
+6
-1
@@ -55,7 +55,12 @@ public class SensorDefinitionImportService {
|
|||||||
config.decimalPlaces(),
|
config.decimalPlaces(),
|
||||||
SensorSourceType.MODBUS,
|
SensorSourceType.MODBUS,
|
||||||
config.pollingIntervalSeconds(),
|
config.pollingIntervalSeconds(),
|
||||||
config.enabled()
|
config.enabled(),
|
||||||
|
|
||||||
|
config.scaleFactor(),
|
||||||
|
config.signed(),
|
||||||
|
config.validMin(),
|
||||||
|
config.validMax()
|
||||||
);
|
);
|
||||||
|
|
||||||
repository.save(sensorDefinition);
|
repository.save(sensorDefinition);
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -17,6 +17,14 @@ CREATE TABLE sensor_definition (
|
|||||||
|
|
||||||
decimal_places INTEGER NOT NULL DEFAULT 0,
|
decimal_places INTEGER NOT NULL DEFAULT 0,
|
||||||
|
|
||||||
|
scale_factor REAL NOT NULL DEFAULT 1.0,
|
||||||
|
|
||||||
|
signed BOOLEAN NOT NULL DEFAULT FALSE,
|
||||||
|
|
||||||
|
valid_min REAL,
|
||||||
|
|
||||||
|
valid_max REAL,
|
||||||
|
|
||||||
source_type VARCHAR(50) NOT NULL,
|
source_type VARCHAR(50) NOT NULL,
|
||||||
|
|
||||||
polling_interval_seconds INTEGER NOT NULL DEFAULT 1,
|
polling_interval_seconds INTEGER NOT NULL DEFAULT 1,
|
||||||
|
|||||||
Reference in New Issue
Block a user