Ecosystem: Libraries Used
Problems solved in production
| Problem | Technique |
|---|---|
| Transparent hot+cold telemetry | thenCombine with routing by date range |
| Hadoop in 10 MB (no Kerberos/Netty/Jetty) | 14 Gradle exclusions + CVE constraints |
| In-memory Parquet without a temp file | ByteArrayOutputFile implementing the OutputFile interface |
| 10 MB Matter WebSocket without CloseStatus 1009 | Buffer override + exponential backoff + watchdog timer |
| IPv6 normalization for Z-Wave | InetAddress.getByName() as normalizer |
| EDN parsing without the Clojure library | JSON/EDN auto-detection + minimal regex parser |
| Ownership checks without N+1 queries or JWT bloat | JOIN FETCH in a single query per request |
Language
| Topic | Level |
|---|---|
| CompletableFuture | thenCombine, orTimeout, allOf, MQTT request/response without reactive |
| Hook pattern with ConcurrentHashMap | Async correlation with predicates and automatic cleanup |
| Streams and functional programming | filter, map, toList(), reduce for bitmasks, pipelines |
| Spring annotations | @Async, @Scheduled, @ConditionalOnProperty, @Transactional |
| Thread safety | AtomicBoolean, ConcurrentHashMap, synchronized, CountDownLatch |
| Switch expressions (Java 14+) | Exhaustive dispatch by protocol, enum, Athena state |
| ProcessBuilder | Subprocesses without a shell, so no injection |
| NIO and POSIX permissions | Atomic writes with ATOMIC_MOVE, PosixFilePermission |
| Records (Java 14+) | Immutable value types for DTOs and internals |
| Pattern matching for instanceof (Java 16+) | Type-safe access without explicit casts |
| Generics and TypeReference | JSON deserialization to generic types without unchecked warnings |
| Enums with behavior | Switch expression inside the enum |
| try-with-resources | Parquet writer, streams, connections |
Design patterns
| Pattern | Implementation |
|---|---|
| Facade | GatewayApiService, the single entry point for all protocols |
| Event-Driven ReportHandlers | Z-Wave/Zigbee/Matter → DeviceService → TelemetryBuffer |
| Multi-Tier Storage | DynamoDB (hot, 48 h) + S3/Athena (cold) with transparent routing |
| Factory with Classpath Fallback | Device descriptors: filesystem override → classpath default |
| Defense in depth | Firewall + OpenSSH restrict + restricted user |
See System Architecture for layer diagrams and command flow.
| Library | Version | Use |
|---|---|---|
| Spring Boot | 4.0.2 | Main framework: DI, REST, scheduling, lifecycle |
| Spring Data JPA | (BOM) | ORM on top of Hibernate, auto-generated repositories |
| Spring Security | (BOM) | Auth filters, JWT, public/protected routes |
| Spring WebSocket | (BOM) | WebSocket client for the Matter server (gateway-side) |
| Hibernate ORM | 7.x (BOM) | JPA provider; PostgreSQL (cloud) and SQLite (gateway) |
| Lombok | 1.18.36 | Generates getters/setters/@Slf4j at compile time |
| SpringDoc OpenAPI | 3.0.1 | Swagger UI + OpenAPI 3.0 autodocs |
| Library | Version | Where | Use |
|---|---|---|---|
| PostgreSQL JDBC | (latest) | cloud-side | Relational database for users/gateways/tunnels |
| HikariCP | (BOM) | cloud-side | Connection pooling (bundled with Spring Data JPA) |
| Flyway | 1.x | cloud-side | Versioned schema migrations (V1–V3 SQL) |
| SQLite JDBC | 3.47.2.0 | gateway-side | Embedded DB for device state (max 1 connection) |
| Library | Version | Use |
|---|---|---|
| AWS IoT Device SDK | 1.30.0 | MQTT5 with mTLS (gateway) and WebSocket+SigV4 (cloud) |
| aws-crt | 0.38.13 | Native C bindings for the AWS networking stack |
| DynamoDB | 2.41.23 | Hot telemetry (48 h TTL), async client |
| S3 | 2.41.23 | Cold telemetry archive in Parquet |
| Athena | 2.41.23 | SQL over cold data in S3, async client |
| Lightsail | 2.41.23 | Firewall management for SSH tunnels |
| STS | 2.33.10 | Temporary credentials |
| Library | Version | Use |
|---|---|---|
| Jackson | 2.18.2 | Primary JSON library; TypeReference<> for polymorphic deserialization |
| JSON.org | 20240303 | Alternative parsing on the gateway side |
| Apache Parquet | 1.15.1 | Columnar format for the cold archive in S3 |
| Apache Avro | 1.12.0 | Schema for the Parquet files |
| Apache Hadoop | 3.4.1 | hadoop-common only; 14 dependency exclusions to minimize footprint |
| Library | Version | Use |
|---|---|---|
| JJWT | 0.12.6 | JWT generation and validation (HS256 on gateway, RS256 in cloud) |
| jjwt-api / jjwt-impl / jjwt-jackson | 0.12.6 | The three parts of the JJWT stack |
| Library | Source | Use |
|---|---|---|
| zipgateway-codec | Local JARs in libs/ | Z-Wave binary frames (Z/IP protocol) |
| zipparser-java | Local JARs in libs/ | ZIP protocol parser for Z-Wave |
| Library | Use |
|---|---|
| JUnit 5 | Unit and integration tests |
| Mockito | Mocking services and repositories |
| AssertJ | Fluent assertions |
| Testcontainers | Real PostgreSQL in a container for integration tests |
| MockMvc | HTTP controller tests |
The central concurrency mechanism in both projects. No RxJava, no WebFlux.

```java
public CompletableFuture<Map<String, Object>> sendAsync(String gwId, Map<String, Object> command) {
    String correlationId = UUID.randomUUID().toString();
    CompletableFuture<String> future = new CompletableFuture<>();
    pending.put(correlationId, future); // register BEFORE publishing
    publishMqtt(gwId, correlationId, command);
    return future
        .orTimeout(30, TimeUnit.SECONDS)
        .thenApply(json -> {
            try {
                return jackson.readValue(json, new TypeReference<Map<String, Object>>() {});
            } catch (JsonProcessingException e) {
                throw new CompletionException(e); // rethrow unchecked so exceptionally() sees it
            }
        })
        .exceptionally(ex -> Map.of("error", String.valueOf(ex.getMessage())))
        .whenComplete((r, ex) -> pending.remove(correlationId));
}

// Parallel hot+cold merge
CompletableFuture<List<TelemetryRecord>> hot = dynamoService.query(gwId, from, to);
CompletableFuture<List<TelemetryRecord>> cold = athenaService.query(gwId, from, to);
return hot.thenCombine(cold, (h, c) -> merge(h, c));

// Multiple gateways in parallel
List<CompletableFuture<GwSummary>> futures = gateways.stream()
    .map(gw -> CompletableFuture.supplyAsync(() -> summarize(gw)))
    .toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
```
Async request/response correlation: the same pattern as the Clojure hook, implemented with CompletableFuture:

```java
private final ConcurrentHashMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

public CompletableFuture<String> register(String id) {
    var future = new CompletableFuture<String>();
    pending.put(id, future);
    return future;
}

public void complete(String id, String payload) {
    var future = pending.remove(id);
    if (future != null) future.complete(payload);
}

// Fallback cleanup every 60 s (futures already expired via orTimeout)
@Scheduled(fixedDelay = 60_000)
public void cleanupExpired() {
    pending.entrySet().removeIf(e -> e.getValue().isDone());
}

// gateway-side: same idea, with a Predicate for content-based matching
record PendingHook(Predicate<Map<String, Object>> matcher, CompletableFuture<Map<String, Object>> future) {}
private final ConcurrentHashMap<String, PendingHook> hooks = new ConcurrentHashMap<>();
```
```java
record LoginRequest(String username, String password) {}
record LoginResponse(String token, List<String> gateways) {}
record RegisterRequest(String username, String password, String adminKey) {}
record PendingHook(Predicate<Map<String, Object>> matcher, CompletableFuture<Map<String, Object>> future) {}
record ShellResult(int exitCode, String stdout, String stderr) {}
record AggregatedPoint(String timestamp, double value) {}
```
```java
// Nested inside an outer switch expression, hence yield
yield switch (dev.getProtocol()) {
    case "zwave" -> zwaveController.parseDevice(dev);
    case "zigbee" -> zigbeeController.parseDevice(dev);
    case "matter" -> matterController.parseDevice(dev);
    default -> Map.of("id", dev.getId(), "protocol", dev.getProtocol());
};

double result = switch (fn) {
    case AVG -> values.stream().mapToDouble(Double::doubleValue).average().orElse(0);
    case MIN -> values.stream().mapToDouble(Double::doubleValue).min().orElse(0);
    case MAX -> values.stream().mapToDouble(Double::doubleValue).max().orElse(0);
    case SUM -> values.stream().mapToDouble(Double::doubleValue).sum();
    case COUNT -> values.size();
};

switch (status.state()) {
    case SUCCEEDED -> future.complete(results);
    case FAILED, CANCELLED -> future.completeExceptionally(new RuntimeException(reason));
    default -> scheduleNextCheck();
}
```
```java
if (payload instanceof Map<?, ?> m && m.containsKey("nodeId")) {
    processNode((String) m.get("nodeId"));
}

if (event.get("event_type") instanceof String type && type.equals("node_added")) {
    handleNodeAdded(event);
}

List<Device> locks = deviceService.findAll().stream()
    .filter(d -> d.getType().equals("lock"))
    .filter(d -> d.getProtocol().equals("zwave"))
    .sorted(Comparator.comparing(Device::getName))
    .toList();

boolean owns = user.getGateways().stream()
    .anyMatch(gw -> gw.getId().equals(requestedGwId));

// Reduce to a weekday bitmask
int bitmask = days.stream()
    .mapToInt(day -> 1 << DAY_ORDER.indexOf(day))
    .reduce(0, (a, b) -> a | b);

List<SshTunnel> tunnels = Arrays.stream(lsofOutput.split("\n"))
    .skip(1)
    .map(this::parseLsofLine)
    .filter(t -> t.proc().equals("sshd"))
    .filter(t -> t.port() >= PORT_POOL_START)
    .toList();
```
```java
// Lifecycle
@PostConstruct // MQTT client init
@PreDestroy    // executor cleanup

// Concurrency
@Async("gatewayExecutor") // dedicated thread pool (core=10, max=20, queue=50)
@Scheduled(cron = "0 0 2 * * *") // daily at 2 AM UTC
@Scheduled(fixedDelay = 60_000)

// Conditional beans
@ConditionalOnProperty(name = "athena.enabled", havingValue = "true")
@ConditionalOnProperty(name = "archive.enabled", havingValue = "true")

// Security
@AuthenticationPrincipal UserDetails user

// JPA
@Transactional(readOnly = true)
@Column(columnDefinition = "TEXT") // JSON columns in SQLite

// Testing
@Nested @DisplayName("when the gateway does not exist")
@MockitoBean // Spring Boot 3.4+ replacement for @MockBean
```
```java
// Atomic write of authorized_keys (avoids a race with sshd)
Path tmp = Files.createTempFile(parent, ".authorized_keys", ".tmp");
Files.writeString(tmp, newContent);
Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);

// POSIX permissions for the private key (0400)
Path keyFile = Files.createFile(path,
    PosixFilePermissions.asFileAttribute(
        Set.of(PosixFilePermission.OWNER_READ)));
try {
    setPosixPermissions(keyFile);
} catch (UnsupportedOperationException e) {
    log.warn("POSIX permissions not supported on this OS");
}

Map<String, Object> payload = jackson.readValue(
    json,
    new TypeReference<Map<String, Object>>() {}
);
```
```java
ConcurrentHashMap<String, ConcurrentHashMap<String, PendingHook>> hooks =
    new ConcurrentHashMap<>();

public enum AggregationFn {
    AVG, MIN, MAX, SUM, COUNT;

    public double apply(List<Double> values) {
        return switch (this) {
            case AVG -> values.stream().mapToDouble(d -> d).average().orElse(0);
            case MIN -> values.stream().mapToDouble(d -> d).min().orElse(0);
            case MAX -> values.stream().mapToDouble(d -> d).max().orElse(0);
            case SUM -> values.stream().mapToDouble(d -> d).sum();
            case COUNT -> values.size();
        };
    }
}

@Enumerated(EnumType.STRING)
private GatewayStatus status;
```
```java
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
        .<GenericRecord>builder(outputFile)
        .withSchema(SCHEMA)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()) {
    for (TelemetryRecord record : records) {
        writer.write(toAvroRecord(record));
    }
}
```
```java
// No shell → no injection
ProcessBuilder pb = new ProcessBuilder(
    "ssh", "-o", "StrictHostKeyChecking=no",
    "-R", port + ":localhost:" + localPort,
    user + "@" + host
);
pb.redirectErrorStream(true);
Process proc = pb.start();

Thread reader = new Thread(() -> {
    try (BufferedReader br = new BufferedReader(
            new InputStreamReader(proc.getInputStream()))) {
        br.lines().forEach(line -> log.info("SSH: {}", line));
    } catch (IOException e) {
        log.warn("SSH output stream closed: {}", e.getMessage());
    }
});
reader.setDaemon(true);
reader.start();
```
```java
private final AtomicBoolean connected = new AtomicBoolean(false);
private final AtomicLong messageIdCounter = new AtomicLong(0);

// WebSocketSession is not thread-safe
public void send(WebSocketSession session, String message) throws IOException {
    synchronized (session) {
        session.sendMessage(new TextMessage(message));
    }
}

// Block startup until MQTT connects (enclosing method declares throws InterruptedException)
CountDownLatch latch = new CountDownLatch(1);
mqttClient.connect(callbacks.onSuccess(latch::countDown));
if (!latch.await(15, TimeUnit.SECONDS)) {
    throw new RuntimeException("MQTT connection timeout at startup");
}
```
Users query DynamoDB (last 48 h), S3/Athena (older than 48 h), or both when the range crosses the boundary:

```java
if (from.isBefore(cutoff) && to.isAfter(cutoff)) {
    var hot = telemetryService.query(gwId, cutoff, to);
    var cold = athenaService.query(gwId, from, cutoff);
    return hot.thenCombine(cold, (h, c) -> merge(h, c))
        .thenApply(ResponseEntity::ok);
} else if (to.isBefore(cutoff)) {
    return athenaService.query(gwId, from, to).thenApply(ResponseEntity::ok);
} else {
    return telemetryService.query(gwId, from, to).thenApply(ResponseEntity::ok);
}
```
The ArchiveService needs to write Parquet to S3 without touching /tmp. ByteArrayOutputFile implements Parquet's OutputFile interface but writes to a ByteArrayOutputStream:

```java
public class ByteArrayOutputFile implements OutputFile {
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();

    @Override
    public PositionOutputStream create(long blockSizeHint) {
        return new ByteArrayPositionOutputStream(baos);
    }

    // createOrOverwrite(), supportsBlockSize() and defaultBlockSize() omitted for brevity

    public byte[] toByteArray() { return baos.toByteArray(); }
}

ByteArrayOutputFile output = new ByteArrayOutputFile();
writeParquet(records, output);
s3Client.putObject(req, RequestBody.fromBytes(output.toByteArray()));
```
Parquet requires hadoop-common, but the full dependency tree weighs 500+ MB and breaks on a VPS with 415 MB of RAM. Solution: 14 exclusions in build.gradle:

```groovy
implementation("org.apache.hadoop:hadoop-common:3.4.1") {
    exclude group: "org.apache.hadoop", module: "hadoop-auth"
    exclude group: "io.netty", module: "netty-all"
    exclude group: "org.eclipse.jetty"
    exclude group: "com.sun.jersey"
    exclude group: "org.apache.kerby"
    // ... 9 more
}

constraints {
    implementation("com.google.guava:guava:33.4.0-jre") // CVE-2023-2976
    implementation("org.apache.commons:commons-compress:1.27.1") // CVE-2024-25710
}
```
python-matter-server responds to start_listening with a full dump of every node, which can exceed 10 MB. Tomcat's default text buffer is 8 KB → CloseStatus 1009 "message too big".

```java
@PostConstruct
public void init() {
    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
    container.setDefaultMaxTextMessageBufferSize(10 * 1024 * 1024);
    connectWithBackoff();
}

private void connectWithBackoff() {
    int delay = BASE_DELAY;
    while (!connected.get()) {
        try {
            session = container.connectToServer(this, new URI(wsUrl));
            return;
        } catch (Exception e) {
            int jitter = (int) (delay * 0.2 * Math.random());
            try {
                Thread.sleep(delay + jitter);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return;
            }
            delay = Math.min(delay * 2, MAX_DELAY); // 5s → 10s → 20s → ... → 300s
        }
    }
}

// Watchdog: detects TCP drops without a FIN
@Scheduled(fixedDelay = 300_000)
public void checkConnection() {
    if (session == null || !session.isOpen()) reconnect();
}
```
fd00:bbbb::3a and fd00:bbbb:0:0:0:0:0:3a are the same node but were treated as different keys in the hooks:

```java
String normalized = InetAddress.getByName(rawAddress).getHostAddress();
```
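A minimal, self-contained illustration of the idea; the class and method names here are illustrative, not the project's:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class Ipv6KeyNormalizer {
    // Collapse any textual IPv6 form to one canonical hook key.
    // getByName() on an address literal parses it locally; no DNS lookup happens.
    static String normalize(String raw) {
        try {
            return InetAddress.getByName(raw).getHostAddress();
        } catch (UnknownHostException e) {
            return raw; // fall back to the raw form for unparseable input
        }
    }

    public static void main(String[] args) {
        String compact = normalize("fd00:bbbb::3a");
        String expanded = normalize("fd00:bbbb:0:0:0:0:0:3a");
        System.out.println(compact.equals(expanded)); // both collapse to the same key
    }
}
```

Running the normalizer on every incoming address before it touches the hooks map is what makes the compact and expanded spellings hit the same entry.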
Older gateways stored credentials in EDN format. The Java gateway had to support both formats without pulling in the Clojure library:

```java
public Credentials loadCreds(String content) {
    String trimmed = content.stripLeading();
    return trimmed.startsWith("{")
        ? parseJson(trimmed)
        : parseEdn(trimmed);
}

private String extractEdnValue(String content, String key) {
    Pattern p = Pattern.compile(":" + key + "\\s+\"((?:[^\"\\\\]|\\\\.)*)\"");
    Matcher m = p.matcher(content);
    return m.find() ? m.group(1).replace("\\\"", "\"").replace("\\n", "\n") : null;
}
```
The JWT cannot carry the gateway list (gateways are added post-login and the token is never refreshed). A Redis cache was ruled out (415 MB VPS). Solution: one JOIN FETCH per request:

```java
@Query("SELECT u FROM User u JOIN FETCH u.gateways WHERE u.username = :username")
Optional<User> findByUsernameWithGateways(String username);

boolean owns = user.getGateways().stream()
    .anyMatch(gw -> gw.getId().equals(gwId));
if (!owns) response.sendError(403); else chain.doFilter(request, response);
```
Single entry point for all REST and MQTT commands. The protocol controllers (ZWave, Zigbee, Matter) are never exposed directly. The facade handles validation, dispatch to the right protocol based on device.protocol, and normalization of responses to a common format.
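A minimal sketch of that dispatch, with a hypothetical ProtocolController interface standing in for the real protocol controllers:

```java
import java.util.Map;

public class FacadeSketch {
    // Hypothetical minimal protocol interface; the real controllers are richer.
    interface ProtocolController {
        Map<String, Object> execute(String deviceId, Map<String, Object> command);
    }

    static class GatewayFacade {
        private final Map<String, ProtocolController> controllers;

        GatewayFacade(Map<String, ProtocolController> controllers) {
            this.controllers = controllers;
        }

        // Validate the protocol, dispatch, normalize to a common response shape.
        Map<String, Object> send(String protocol, String deviceId, Map<String, Object> cmd) {
            ProtocolController c = controllers.get(protocol);
            if (c == null) {
                return Map.of("ok", false, "error", "unknown protocol: " + protocol);
            }
            return Map.of("ok", true, "protocol", protocol,
                          "result", c.execute(deviceId, cmd));
        }
    }

    public static void main(String[] args) {
        GatewayFacade facade = new GatewayFacade(Map.of(
            "zwave", (id, cmd) -> Map.of("device", id, "status", "sent")));
        System.out.println(facade.send("zwave", "node-3", Map.of("cmd", "on")));
        System.out.println(facade.send("thread", "x", Map.of())); // unknown protocol → error map
    }
}
```

Callers only ever see the normalized `{ok, protocol, result}` shape, which is what keeps REST and MQTT clients decoupled from per-protocol quirks.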
Telemetry request
        ↓
Does from/to cross the 48 h boundary?
├── Yes → run DynamoDB + Athena in parallel → merge
├── to older than 48 h → Athena only (cold)
└── from within the last 48 h → DynamoDB only (hot)
Z-Wave unsolicited UDP → ZWaveReportHandler → DeviceService.update() → TelemetryBuffer
Zigbee Z3Gateway stdout → ZigbeeReportHandler → DeviceService.update() → TelemetryBuffer
Matter WebSocket event → MatterReportHandler → DeviceService.update() → TelemetryBuffer
        ↓
MQTT publish every 60 s (batched, cap 500)
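A sketch of the buffering end of that pipeline, assuming a simple synchronized list; the document does not show the real TelemetryBuffer internals, so the names here are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

public class TelemetryBufferSketch {
    record Point(String deviceId, String metric, double value) {}

    static class TelemetryBuffer {
        private static final int BATCH_CAP = 500; // cap per MQTT publish
        private final List<Point> buffer = new ArrayList<>();

        // Handlers on any protocol thread push normalized points here.
        synchronized void add(Point p) { buffer.add(p); }

        // The 60 s scheduled publisher drains at most BATCH_CAP points per run.
        synchronized List<Point> drainBatch() {
            int n = Math.min(buffer.size(), BATCH_CAP);
            List<Point> batch = new ArrayList<>(buffer.subList(0, n));
            buffer.subList(0, n).clear();
            return batch;
        }
    }

    public static void main(String[] args) {
        TelemetryBuffer buf = new TelemetryBuffer();
        buf.add(new Point("node-3", "temperature", 21.5));
        buf.add(new Point("node-7", "power", 42.0));
        System.out.println(buf.drainBatch().size()); // prints 2
        System.out.println(buf.drainBatch().size()); // prints 0
    }
}
```

Draining rather than clearing means points that arrive between publishes are never lost; anything over the cap simply waits for the next 60 s tick.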
DeviceDescriptorService loads JSON device templates:
- Filesystem (/root/devices/) → operator overrides
- Classpath (/resources/devices/) → bundled defaults

SSH hardening: authorized_keys entries use restrict,port-forwarding,permitlisten="0.0.0.0:PORT"; the tunnel user gets shell=/bin/false and a home directory containing only .ssh/authorized_keys.
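A sketch of that lookup order with the override directory passed in rather than hard-coded; the class name, method name, and classpath resource path are illustrative assumptions:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class DescriptorLoaderSketch {
    private final Path overrideDir;

    public DescriptorLoaderSketch(Path overrideDir) {
        this.overrideDir = overrideDir;
    }

    // Filesystem override wins; otherwise fall back to the bundled classpath default.
    public String load(String deviceType) throws IOException {
        Path override = overrideDir.resolve(deviceType + ".json");
        if (Files.exists(override)) {
            return Files.readString(override);
        }
        try (InputStream in = getClass().getResourceAsStream("/devices/" + deviceType + ".json")) {
            if (in == null) {
                throw new IOException("no descriptor found for " + deviceType);
            }
            return new String(in.readAllBytes());
        }
    }
}
```

With an override file present, load() never touches the classpath, so an operator can patch a descriptor on a live gateway without rebuilding the JAR.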