Java — Experiencia Técnica

EcosistemaLibrerías utilizadas

Problemas resueltos en producción

ProblemaTécnica
Telemetría hot+cold transparentethenCombine con routing por rango de fechas
Hadoop en 10 MB (sin Kerberos/Netty/Jetty)14 exclusiones en Gradle + CVE constraints
Parquet en memoria sin disco temporalByteArrayOutputFile implementando interfaz OutputFile
WebSocket Matter 10 MB sin CloseStatus 1009Buffer override + exponential backoff + watchdog timer
Normalización IPv6 para Z-WaveInetAddress.getByName() como normalizador
Parsing EDN sin librería ClojureAuto-detección JSON/EDN + parser regex mínimo
Ownership sin N+1 ni JWT bloatJOIN FETCH en una sola query por request

Lenguaje

TemaNivel
CompletableFuturethenCombine, orTimeout, allOf, request/response MQTT sin reactive
Hook pattern con ConcurrentHashMapCorrelación async con predicados y cleanup automático
Streams y functional programmingfilter, map, toList(), reduce para bitmasks, pipelines
Annotations Spring@Async, @Scheduled, @ConditionalOnProperty, @Transactional
Thread safetyAtomicBoolean, ConcurrentHashMap, synchronized, CountDownLatch
Switch expressions (Java 14+)Dispatch exhaustivo por protocolo, enum, estado Athena
ProcessBuilderSubprocesos sin shell — sin injection
NIO y permisos POSIXEscritura atómica ATOMIC_MOVE, PosixFilePermission
Records (Java 14+)Value types inmutables para DTOs e internos
Pattern matching instanceof (Java 16+)Type-safe casting sin cast explícito
Generics y TypeReferenceDeserialización JSON a tipos genéricos sin unchecked warnings
Enums con comportamientoSwitch expression dentro del enum
try-with-resourcesParquet writer, streams, conexiones

Patrones de diseño

PatrónImplementación
FacadeGatewayApiService — punto único para todos los protocolos
Event-Driven ReportHandlersZ-Wave/Zigbee/Matter → DeviceService → TelemetryBuffer
Multi-Tier StorageDynamoDB (hot 48h) + S3/Athena (cold) con routing transparente
Factory con Fallback ClasspathDevice descriptors: filesystem override → classpath default
Seguridad en profundidadFirewall + OpenSSH restrict + usuario restringido

Ver Arquitectura del sistema para diagramas de capas y flujo de comandos.


Librerías Utilizadas

Framework Base

LibreríaVersiónUso
Spring Boot4.0.2Framework principal — DI, REST, scheduling, lifecycle
Spring Data JPA(BOM)ORM sobre Hibernate — repos auto-generados
Spring Security(BOM)Filtros de auth, JWT, rutas públicas/protegidas
Spring WebSocket(BOM)Cliente WebSocket para Matter server (gateway-side)
Hibernate ORM7.x (BOM)JPA provider; PostgreSQL (cloud) y SQLite (gateway)
Lombok1.18.36Genera getters/setters/@Slf4j en compile-time
SpringDoc OpenAPI3.0.1Swagger UI + OpenAPI 3.0 autodocs

Persistencia

LibreríaVersiónDóndeUso
PostgreSQL JDBC(latest)cloud-sideBase de datos relacional para users/gateways/tunnels
HikariCP(BOM)cloud-sideConnection pooling (bundled con Spring Data JPA)
Flyway1.xcloud-sideMigraciones de schema versionadas (V1–V3 SQL)
SQLite JDBC3.47.2.0gateway-sideBD embebida para device state (max 1 conexión)

AWS SDK v2

LibreríaVersiónUso
AWS IoT Device SDK1.30.0MQTT5 con mTLS (gateway) y WebSocket+SigV4 (cloud)
aws-crt0.38.13Bindings C nativos para el stack de red AWS
DynamoDB2.41.23Telemetría hot (48h TTL) — cliente async
S32.41.23Archivo frío de telemetría en Parquet
Athena2.41.23SQL sobre datos fríos en S3 — cliente async
Lightsail2.41.23Gestión de firewall para túneles SSH
STS2.33.10Credentials temporales

Datos / Serialización

LibreríaVersiónUso
Jackson2.18.2JSON principal — TypeReference<> para deserialización polimórfica
JSON.org20240303Parsing alternativo en gateway-side
Apache Parquet1.15.1Formato columnar para archivo frío en S3
Apache Avro1.12.0Schema para los archivos Parquet
Apache Hadoop3.4.1Solo hadoop-common — 14 exclusiones de deps para minimizar footprint

Seguridad

LibreríaVersiónUso
JJWT0.12.6Generación y validación de tokens JWT (HS256 en gateway, RS256 en cloud)
jjwt-api / jjwt-impl / jjwt-jackson0.12.6Las tres partes del stack JJWT

Protocolo IoT (gateway-side)

LibreríaFuenteUso
zipgateway-codecJARs locales en libs/Frames binarios Z-Wave (Z/IP protocol)
zipparser-javaJARs locales en libs/Parser del protocolo ZIP para Z-Wave

Testing

LibreríaUso
JUnit 5Tests unitarios e integración
MockitoMocking de servicios y repos
AssertJAssertions fluidas
TestcontainersPostgreSQL real en contenedor para tests de integración
MockMvcTests de controllers HTTP

Cobertura del Lenguaje Java

CompletableFuture — Async sin Reactive

El mecanismo de concurrencia central en ambos proyectos. Sin RxJava, sin WebFlux.

public CompletableFuture<Map> sendAsync(String gwId, Map command) {
    String correlationId = UUID.randomUUID().toString();
    CompletableFuture<String> future = new CompletableFuture<>();
    pending.put(correlationId, future);  // registrar ANTES de publicar
    publishMqtt(gwId, correlationId, command);
    return future
        .orTimeout(30, TimeUnit.SECONDS)
        .thenApply(json -> jackson.readValue(json, Map.class))
        .exceptionally(ex -> Map.of("error", ex.getMessage()))
        .whenComplete((r, ex) -> pending.remove(correlationId));
}

// Merge paralelo hot+cold
CompletableFuture<List> hot  = dynamoService.query(gwId, from, to);
CompletableFuture<List> cold = athenaService.query(gwId, from, to);
return hot.thenCombine(cold, (h, c) -> merge(h, c));

// Múltiples gateways en paralelo
List<CompletableFuture<GwSummary>> futures = gateways.stream()
    .map(gw -> CompletableFuture.supplyAsync(() -> summarize(gw)))
    .toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

Hook Pattern con ConcurrentHashMap

Correlación request/response async — el mismo patrón del hook de Clojure, implementado con CompletableFuture:

private final ConcurrentHashMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

public CompletableFuture<String> register(String id) {
    var future = new CompletableFuture<String>();
    pending.put(id, future);
    return future;
}

public void complete(String id, String payload) {
    var future = pending.remove(id);
    if (future != null) future.complete(payload);
}

// Cleanup fallback cada 60s (futures ya expiradas por orTimeout)
@Scheduled(fixedDelay = 60_000)
public void cleanupExpired() {
    pending.entrySet().removeIf(e -> e.getValue().isDone());
}

// gateway-side: misma idea con Predicate para matching por contenido
record PendingHook(Predicate<Map<String, Object>> matcher, CompletableFuture<Map<String, Object>> future) {}
private final ConcurrentHashMap<String, PendingHook> hooks = new ConcurrentHashMap<>();

Records (Java 14+)

record LoginRequest(String username, String password) {}
record LoginResponse(String token, List<String> gateways) {}
record RegisterRequest(String username, String password, String adminKey) {}

record PendingHook(Predicate<Map<String, Object>> matcher, CompletableFuture<Map<String, Object>> future) {}
record ShellResult(int exitCode, String stdout, String stderr) {}
record AggregatedPoint(String timestamp, double value) {}

Switch Expressions (Java 14+)

yield switch (dev.getProtocol()) {
    case "zwave"  -> zwaveController.parseDevice(dev);
    case "zigbee" -> zigbeeController.parseDevice(dev);
    case "matter" -> matterController.parseDevice(dev);
    default       -> Map.of("id", dev.getId(), "protocol", dev.getProtocol());
};

double result = switch (fn) {
    case AVG   -> values.stream().mapToDouble(Double::doubleValue).average().orElse(0);
    case MIN   -> values.stream().mapToDouble(Double::doubleValue).min().orElse(0);
    case MAX   -> values.stream().mapToDouble(Double::doubleValue).max().orElse(0);
    case SUM   -> values.stream().mapToDouble(Double::doubleValue).sum();
    case COUNT -> values.size();
};

switch (status.state()) {
    case SUCCEEDED -> future.complete(results);
    case FAILED, CANCELLED -> future.completeExceptionally(new RuntimeException(reason));
    default -> scheduleNextCheck();
}

Pattern Matching instanceof (Java 16+)

if (payload instanceof Map<?, ?> m && m.containsKey("nodeId")) {
    processNode((String) m.get("nodeId"));
}

if (event.get("event_type") instanceof String type && type.equals("node_added")) {
    handleNodeAdded(event);
}

Streams y Functional Programming

List<Device> locks = deviceService.findAll().stream()
    .filter(d -> d.getType().equals("lock"))
    .filter(d -> d.getProtocol().equals("zwave"))
    .sorted(Comparator.comparing(Device::getName))
    .toList();

boolean owns = user.getGateways().stream()
    .anyMatch(gw -> gw.getId().equals(requestedGwId));

// Reduce para bitmask de días de la semana
int bitmask = days.stream()
    .mapToInt(day -> 1 << DAY_ORDER.indexOf(day))
    .reduce(0, (a, b) -> a | b);

List<SshTunnel> tunnels = Arrays.stream(lsofOutput.split("\n"))
    .skip(1)
    .map(this::parseLsofLine)
    .filter(t -> t.proc().equals("sshd"))
    .filter(t -> t.port() >= PORT_POOL_START)
    .toList();

Annotations del Ecosistema Spring

// Lifecycle
@PostConstruct   // init del cliente MQTT
@PreDestroy      // cleanup de executors

// Concurrencia
@Async("gatewayExecutor")               // thread pool dedicado (core=10, max=20, queue=50)
@Scheduled(cron = "0 0 2 * * *")       // daily a las 2AM UTC
@Scheduled(fixedDelay = 60_000)

// Conditional beans
@ConditionalOnProperty(name = "athena.enabled", havingValue = "true")
@ConditionalOnProperty(name = "archive.enabled", havingValue = "true")

// Security
@AuthenticationPrincipal UserDetails user

// JPA
@Transactional(readOnly = true)
@Column(columnDefinition = "TEXT")      // JSON columns en SQLite

// Testing
@Nested @DisplayName("cuando el gateway no existe")
@MockitoBean                            // Spring Boot 3.4+ reemplaza @MockBean

NIO y Manejo de Archivos

// Escritura atómica de authorized_keys (evita race condition con sshd)
Path tmp = Files.createTempFile(parent, ".authorized_keys", ".tmp");
Files.writeString(tmp, newContent);
Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);

// Permisos POSIX para private key (0400)
Path keyFile = Files.createFile(path,
    PosixFilePermissions.asFileAttribute(
        Set.of(PosixFilePermission.OWNER_READ)));

try {
    setPosixPermissions(keyFile);
} catch (UnsupportedOperationException e) {
    log.warn("POSIX permissions not supported on this OS");
}

Generics y TypeReference

Map<String, Object> payload = jackson.readValue(
    json,
    new TypeReference<Map<String, Object>>() {}
);

ConcurrentHashMap<String, ConcurrentHashMap<String, PendingHook>> hooks =
    new ConcurrentHashMap<>();

Enums con Comportamiento

public enum AggregationFn {
    AVG, MIN, MAX, SUM, COUNT;

    public double apply(List<Double> values) {
        return switch (this) {
            case AVG   -> values.stream().mapToDouble(d -> d).average().orElse(0);
            case MIN   -> values.stream().mapToDouble(d -> d).min().orElse(0);
            case MAX   -> values.stream().mapToDouble(d -> d).max().orElse(0);
            case SUM   -> values.stream().mapToDouble(d -> d).sum();
            case COUNT -> values.size();
        };
    }
}

@Enumerated(EnumType.STRING)
private GatewayStatus status;

try-with-resources

try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
        .<GenericRecord>builder(outputFile)
        .withSchema(SCHEMA)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()) {
    for (TelemetryRecord record : records) {
        writer.write(toAvroRecord(record));
    }
}

ProcessBuilder (sin shell)

// Sin shell → sin injection
ProcessBuilder pb = new ProcessBuilder(
    "ssh", "-o", "StrictHostKeyChecking=no",
    "-R", port + ":localhost:" + localPort,
    user + "@" + host
);
pb.redirectErrorStream(true);
Process proc = pb.start();

Thread reader = new Thread(() -> {
    try (BufferedReader br = new BufferedReader(
            new InputStreamReader(proc.getInputStream()))) {
        br.lines().forEach(line -> log.info("SSH: {}", line));
    }
});
reader.setDaemon(true);
reader.start();

Thread Safety Explícito

private final AtomicBoolean connected = new AtomicBoolean(false);
private final AtomicLong messageIdCounter = new AtomicLong(0);

// WebSocket no es thread-safe
public void send(WebSocketSession session, String message) throws IOException {
    synchronized (session) {
        session.sendMessage(new TextMessage(message));
    }
}

CountDownLatch latch = new CountDownLatch(1);
mqttClient.connect(callbacks.onSuccess(latch::countDown));
if (!latch.await(15, TimeUnit.SECONDS)) {
    throw new RuntimeException("MQTT connection timeout at startup");
}

Problemas Técnicos Difíciles Resueltos

1. Telemetría Hot+Cold Transparente

Los usuarios consultan datos en DynamoDB (últimas 48h) o S3/Athena (más de 48h), o ambos si el rango cruza el límite:

if (from.isBefore(cutoff) && to.isAfter(cutoff)) {
    var hot  = telemetryService.query(gwId, cutoff, to);
    var cold = athenaService.query(gwId, from, cutoff);
    return hot.thenCombine(cold, (h, c) -> merge(h, c))
              .thenApply(ResponseEntity::ok);
} else if (to.isBefore(cutoff)) {
    return athenaService.query(gwId, from, to).thenApply(ResponseEntity::ok);
} else {
    return telemetryService.query(gwId, from, to).thenApply(ResponseEntity::ok);
}

3. Parquet en Memoria sin Disco Temporal

El ArchiveService necesita escribir Parquet a S3 sin usar /tmp. Se implementó ByteArrayOutputFile que implementa la interfaz OutputFile de Parquet pero escribe a un ByteArrayOutputStream:

public class ByteArrayOutputFile implements OutputFile {
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();

    @Override
    public PositionOutputStream create(long blockSizeHint) {
        return new ByteArrayPositionOutputStream(baos);
    }

    public byte[] toByteArray() { return baos.toByteArray(); }
}

ByteArrayOutputFile output = new ByteArrayOutputFile();
writeParquet(records, output);
s3Client.putObject(req, RequestBody.fromBytes(output.toByteArray()));

2. Hadoop en 10 MB (sin Kerberos, sin Netty, sin Jetty)

Parquet requiere hadoop-common pero el JAR completo son 500+ MB con dependencias que rompen en un VPS de 415 MB de RAM. Solución: 14 exclusiones en build.gradle:

implementation("org.apache.hadoop:hadoop-common:3.4.1") {
    exclude group: "org.apache.hadoop", module: "hadoop-auth"
    exclude group: "io.netty", module: "netty-all"
    exclude group: "org.eclipse.jetty"
    exclude group: "com.sun.jersey"
    exclude group: "org.apache.kerby"
    // ... 9 más
}
constraints {
    implementation("com.google.guava:guava:33.4.0-jre")          // CVE-2023-2976
    implementation("org.apache.commons:commons-compress:1.27.1") // CVE-2024-25710
}

4. Buffer de 10 MB para WebSocket de Matter

El python-matter-server responde al start_listening con el dump completo de todos los nodos (puede superar 10 MB). Buffer default de Tomcat: 8 KB → CloseStatus 1009 "message too big".

@PostConstruct
public void init() {
    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
    container.setDefaultMaxTextMessageBufferSize(10 * 1024 * 1024);
    connectWithBackoff();
}

private void connectWithBackoff() {
    int delay = BASE_DELAY;
    while (!connected.get()) {
        try {
            session = container.connectToServer(this, new URI(wsUrl));
            return;
        } catch (Exception e) {
            int jitter = (int)(delay * 0.2 * Math.random());
            Thread.sleep(delay + jitter);
            delay = Math.min(delay * 2, MAX_DELAY);  // 5s → 10s → 20s → ... → 300s
        }
    }
}

// Watchdog: detecta TCP drops sin FIN
@Scheduled(fixedDelay = 300_000)
public void checkConnection() {
    if (!session.isOpen()) reconnect();
}

5. Normalización de Direcciones IPv6 para Z-Wave

fd00:bbbb::3a y fd00:bbbb:0:0:0:0:0:3a son el mismo nodo pero se tratan como keys diferentes en los hooks:

String normalized = InetAddress.getByName(rawAddress).getHostAddress();

6. Parsing de Credenciales EDN (sin librería Clojure)

Los gateways más viejos tenían credenciales en formato EDN. El gateway Java debía soportar ambos formatos sin incluir la librería Clojure:

public Credentials loadCreds(String content) {
    String trimmed = content.stripLeading();
    return trimmed.startsWith("{")
        ? parseJson(trimmed)
        : parseEdn(trimmed);
}

private String extractEdnValue(String content, String key) {
    Pattern p = Pattern.compile(":" + key + "\\s+\"((?:[^\"\\\\]|\\\\.)*)\"");
    Matcher m = p.matcher(content);
    return m.find() ? m.group(1).replace("\\\"", "\"").replace("\\n", "\n") : null;
}

7. Gateway Ownership sin N+1 ni JWT bloat

El JWT no puede contener la lista de gateways (se agregan post-login y el token no se refresca). Cache Redis descartado (VPS de 415 MB). Solución: un JOIN FETCH por request:

@Query("SELECT u FROM User u JOIN FETCH u.gateways WHERE u.username = :username")
Optional<User> findByUsernameWithGateways(String username);

boolean owns = user.getGateways().stream()
    .anyMatch(gw -> gw.getId().equals(gwId));
if (!owns) response.sendError(403); else chain.doFilter(request, response);

Patrones de Diseño Usados

Facade (GatewayApiService)

Punto de entrada único para todos los comandos REST y MQTT. Los controllers de protocolo (ZWave, Zigbee, Matter) no se exponen directamente. El facade maneja validación, dispatch al protocolo correcto según device.protocol, y normalización de respuesta a formato común.

Multi-Tier Storage con Routing Transparente

Request de telemetría

¿from/to cruza las 48h?
    ├── Sí → lanzar DynamoDB + Athena en paralelo → merge
    ├── to < 48h → solo Athena (cold)
    └── from > 48h → solo DynamoDB (hot)

Event-Driven con Report Handlers

Z-Wave unsolicited UDP  → ZWaveReportHandler  → DeviceService.update() → TelemetryBuffer
Zigbee Z3Gateway stdout → ZigbeeReportHandler → DeviceService.update() → TelemetryBuffer
Matter WebSocket event  → MatterReportHandler → DeviceService.update() → TelemetryBuffer

                                                      MQTT publish cada 60s (batch, cap 500)

Factory con Fallback Classpath

DeviceDescriptorService carga templates JSON de dispositivos:

  1. Busca en filesystem (/root/devices/) → overrides del operador
  2. Si no encuentra, busca en classpath (/resources/devices/) → defaults bundleados
  3. Cache en memoria por protocolo

Seguridad en Profundidad para SSH

  1. Firewall: puerto abierto en Lightsail solo mientras el túnel está activo
  2. OpenSSH: authorized_keys con restrict,port-forwarding,permitlisten="0.0.0.0:PORT"
  3. Usuario de sistema: shell=/bin/false, home dir solo con .ssh/authorized_keys