Clojure — Experiencia Técnica

EcosistemaLibrerías utilizadas

Problemas resueltos en producción

ProblemaTécnica
Leak de threads nativos AWS CRTInterrupción manual por nombre de thread
SSH tunneling sin port-forwardingReverse tunnel + authorized_keys restringido + challenge DSA
Provisioning JITR asyncSecuencia MQTT multi-step con hook pattern
Protocolo binario Z-WaveBitmasks, endianness, lookup tables EDN
CLI Zigbee en tiempo realgo-loop sobre stdout, parser regex de frames

Lenguaje

TemaNivel
core.async — concurrencia CSPgo-loop, alt!!, sliding-buffer, señalización entre componentes
Hook pattern — request/response sobre pub/subCorrelación MQTT async expuesta como API HTTP sync
Atoms y gestión de estadoEstado mutable, swap!, reset!, transformaciones nested
reify — interfaces Java inlineIntegración con SDKs JVM sin clases separadas
add-watch — persistencia reactivaSide-effects reactivos sin mezclar I/O con lógica
Interop JavaMétodos, constructores, threads, ProcessBuilder
Lazy sequences y transformaciones funcionalesPipelines con ->> , filter, map, reduce
Macros y DSLsdefjob, defroutes, middleware chain con ->
EDN como base de datosPersistencia con atom + spit/slurp
DestructuringArgs, let, doseq, regex groups, nested
Var quoting para hot-reload#‘var para recargar handler sin reiniciar servidor

Ver Arquitectura del sistema para el contexto completo.


Librerías Utilizadas

Web / HTTP

LibreríaVersiónUso
http-kit2.8.1Servidor HTTP async, pool de 16 threads configurable, queue size 20480
Compojure1.7.2DSL de routing sobre Ring
Ring1.15.3Abstracción HTTP estándar, middleware chain
ring-json0.5.1Middleware para parsear y serializar JSON automáticamente
ring-cors0.1.13CORS headers en middleware
ring-codec1.3.0Codecs para query params

AWS / Cloud

LibreríaVersiónUso
aws-iot-device-sdk1.27.4MQTT5 sobre WebSocket con SigV4/mTLS
aws-crt0.38.13AWS Common Runtime — bindings a librería C nativa para el stack de red
aws-sts2.33.10Credentials temporales
Faraday1.12.3Cliente DynamoDB (consulta estado de gateways online)

Async / Concurrencia

LibreríaVersiónUso
core.async1.8.741Channels, go-loops, alt!!, timeout — motor de concurrencia principal

Datos / Serialización

LibreríaVersiónUso
data.json2.5.1Serialización JSON pura Clojure
Jackson databind2.16.1Serialización Java para interop con SDKs

Seguridad / Auth

LibreríaVersiónUso
buddy-core1.12.0DSA signing para challenge/response en túneles SSH
jbcrypt0.4Hashing de passwords de usuarios

Scheduling / Jobs

LibreríaVersiónUso
Quartzite2.1.0Jobs cron-style con Quartz JVM (polling de dispositivos)

Protocolo IoT

LibreríaVersiónUso
zipgateway-codec2.0.0SDK propietario SilLabs — frames binarios Z-Wave
zipparser1.0.0-SNAPSHOTParser del protocolo ZIP para Z-Wave

Utilidades

LibreríaVersiónUso
Timbre6.8.0Logging estructurado con formatters y appenders configurables en runtime
nREPL1.4.0REPL remoto TCP (puerto 7779) para debugging en producción
commons-lang2.6String utilities del ecosistema Java

Cobertura del Lenguaje Clojure

Atoms y Gestión de Estado

El estado mutable del sistema vive exclusivamente en atoms. No se usan refs ni agents.

(def db (atom {}))
(def hooks (atom {}))
(def mqtt-client (atom nil))

@db
(deref hooks)

(reset! mqtt-client new-client)

(swap! db assoc-in [:devices device-id] device-data)
(swap! db update :count inc)

(swap! hooks assoc-in [gw-id packet-id] {:matches filter :chan ch})
(swap! hooks #(update-in % [gw-id] dissoc packet-id))

Dónde se usa: base de datos de dispositivos, registro de hooks MQTT, pool de puertos SSH, estado de conexión MQTT, store de tokens de auth.

add-watch — Persistencia Reactiva

Se usa add-watch para disparar side-effects en cada cambio del atom de dispositivos, sin mezclar I/O con la lógica de transformación:

(add-watch db :persist
  (fn [_key _ref _old-val new-val]
    (spit "devices.db" (pr-str new-val))))

Equivalente a un “computed property” reactivo pero para I/O. Evita el patrón imperativo de llamar a save! manualmente después de cada mutación.

core.async — Concurrencia por Canales (CSP)

El motor de toda la comunicación async entre componentes.

go-loop — Listeners de larga duración

(go-loop []
  (when-let [line (<! input-chan)]
    (dispatch-frame (parse-frame line))
    (recur)))

(go-loop []
  (let [frame (receive-udp-frame!)]
    (handle-frame frame)
    (recur)))

alt!! — Race entre respuesta y timeout

El patrón más importante del sistema. Convierte operaciones async en sync para la API HTTP:

(defn send-command-wait [gw command filter]
  (let [ch (async/chan (async/sliding-buffer 10))
        id (register-hook! gw ch filter)]  ; registrar ANTES de enviar
    (publish-mqtt! gw command id)
    (let [result (alt!!
                   ch              ([val] val)
                   (timeout 10000) ([_] {:error :timeout}))]
      (unregister-hook! gw id)
      result)))

Por qué sliding-buffer: si el HTTP handler ya timeouted y descartó el resultado, los mensajes subsiguientes del dispositivo deben descartarse en lugar de bloquear.

Channels para señalización de conexión

(def conn-signal (async/chan (async/sliding-buffer 1)))

(reify Mqtt5ClientOptions$LifecycleEvents
  (onConnectionSuccess [this client ret]
    (>!! conn-signal :connected)))

(alt!!
  conn-signal ([_] (log/info "MQTT conectado"))
  (timeout 15000) ([_] (throw (ex-info "MQTT timeout" {}))))

reify — Implementar Interfaces Java

Permite implementar interfaces Java inline sin crear clases separadas. Fundamental para integrar con SDKs del ecosistema JVM:

(reify Mqtt5ClientOptions$LifecycleEvents
  (onAttemptingConnect [this client] (log/info "Conectando..."))
  (onConnectionSuccess [this client ret]
    (reset! connected? true)
    (>!! conn-signal :connected))
  (onConnectionFailure [this client err]
    (log/error "Falló conexión" (.toString err)))
  (onDisconnection [this client data]
    (reset! connected? false)
    (schedule-reconnect!))
  (onStopped [this client data] (log/info "Detenido")))

(reify Mqtt5ClientOptions$PublishEvents
  (onMessageReceived [this client publish-return]
    (let [topic   (.getTopic (.getPublishPacket publish-return))
          payload (parse-payload publish-return)]
      (dispatch-to-hooks! topic payload))))

Hook Pattern — Request/Response sobre Pub/Sub

El patrón arquitectónico más complejo y reutilizable del proyecto. Resuelve el problema de exponer operaciones async (MQTT fire-and-forget) como síncronas desde una API HTTP.

(def hooks (atom {}))

(defn register-hook! [gw filter]
  (let [id  (str (random-uuid))
        ch  (async/chan (async/sliding-buffer 10))]
    (swap! hooks assoc-in [gw id] {:matches filter :chan ch})
    [id ch]))

(defn dispatch-to-hooks! [gw event]
  (doseq [[id {:keys [matches chan]}] (get @hooks gw)]
    (when (matches-filter? matches event)
      (>!! chan event))))

;; Usa clojure.data/diff — si la diferencia izquierda es nil, el filtro matchea
(defn matches-filter? [filter data]
  (let [[only-in-filter _ _] (data/diff filter data)]
    (nil? only-in-filter)))

(defn send-and-wait [gw command expected-event-filter]
  (let [[id ch] (register-hook! gw expected-event-filter)]
    (publish-mqtt! gw (assoc command :packet_id id))
    (let [result (alt!!
                   ch           ([v] v)
                   (timeout 10000) ([_] {:error :timeout}))]
      (swap! hooks #(update-in % [gw] dissoc id))
      result)))

Lazy Sequences y Transformaciones Funcionales

(->> (vals (:devices @db))
     (filter #(= (:protocol %) "zwave"))
     (map #(select-keys % [:id :name :type]))
     (sort-by :name))

(->> (shell/sh "bash" "-c" "lsof -i TCP -n -P")
     :out
     str/split-lines
     (map parse-lsof-line)
     (filter #(= (:proc %) "sshd")))

;; Convierte lista de días a bitmask
(reduce
  (fn [acc day]
    (assoc acc (.indexOf days-array day) 1))
  [0 0 0 0 0 0 0]
  ["monday" "wednesday"])
;; => [0 1 0 1 0 0 0]

Interop Java

(.getTopic publish-packet)
(.build builder)
(.toString exception)

(ZipGatewayCodec/ZipFrameBuilder.)

(import '(software.amazon.awssdk.iot.mqtt5
          Mqtt5Client
          Mqtt5ClientOptions$LifecycleEvents))

;; El SDK usa threads C nativos que hay que interrumpir manualmente
(->> (Thread/getAllStackTraces)
     keys
     (filter #(re-matches #"AwsEventLoop(.*)" (.getName %)))
     (map #(.interrupt %))
     doall)

(let [pb (ProcessBuilder. ["Z3Gateway" "-n" "1" "-p" "/dev/ttyUSB1"])]
  (.redirectErrorStream pb true)
  (let [proc   (.start pb)
        reader (io/reader (.getInputStream proc))]
    (go-loop []
      (when-let [line (.readLine reader)]
        (handle-line line)
        (recur)))))

Macros y DSLs

(defjob PollDevicesJob [ctx]
  (poll-all-devices!))

(defroutes app-routes
  (POST "/login" req (handle-login req))
  (GET  "/:gw/devices" [gw] (list-devices gw))
  (PUT  "/:gw/devices/:id/lock" [gw id :as req] (lock-device gw id req))
  (route/not-found {:error "not found"}))

(def app
  (-> app-routes
      (wrap-json-body {:keywords? true})
      (wrap-json-response)
      (wrap-cors :access-control-allow-origin #".*")
      (wrap-params)))

Destructuring

(defn handle-connection [{:keys [gw-id protocol cert-path]}] ...)

(let [{:keys [proc user port open?]} connection-info] ...)

(let [{{:keys [type payload]} :data} event] ...)

(doseq [[id hook] (get @hooks gw-id)]
  (when (matches-filter? (:matches hook) event)
    (>!! (:chan hook) event)))

(let [[_ gw-id] (re-matches #"mqtt/hubapp/gateway_(.*)/event/v1" topic)]
  (when gw-id (dispatch-event! gw-id payload)))

Var Quoting para Hot-Reload

;; #'app pasa la var, no el valor — permite recargar el handler sin reiniciar el servidor
(server/run-server #'app {:port 9096 :thread 16})

EDN como Base de Datos

(spit "devices.db" (pr-str @db))

(reset! db (read-string (slurp "devices.db")))

(def config (read-string (slurp "gateway.conf")))

Problemas Técnicos Resueltos

1. Leak de Threads Nativos (AWS CRT)

El SDK de AWS IoT usa una librería C nativa para el stack de red. Al reconectar el cliente MQTT, los threads del event loop nativo no se limpian solos, acumulándose hasta crashear el proceso.

Solución: antes de cada reconexión, se interrumpen manualmente los threads filtrando por nombre:

(defn kill-native-threads! []
  (->> (Thread/getAllStackTraces)
       keys
       (filter #(re-matches #"AwsEventLoop(.*)" (.getName %)))
       (map #(.interrupt %))
       doall))

Thread/interrupt es la única forma de señalizar threads bloqueados en I/O nativo desde el lado JVM.

2. SSH Tunneling Seguro sin Abrir Puertos del Router

Los gateways están en redes domésticas con NAT. Para acceso remoto sin configurar port-forwarding:

  1. Gateway recibe notificación via MQTT: $aws/things/{name}/tunnels/notify
  2. Proxy crea usuario de sistema iot-<gw-id> con shell=/bin/false
  3. Proxy escribe authorized_keys con restricciones duras:
    no-pty,no-X11-forwarding,permitopen="localhost:9123",command="/bin/echo" ssh-rsa [KEY]
  4. Abre el puerto en AWS Lightsail firewall via CLI
  5. Gateway spawnea localproxy de AWS para conectar el túnel
  6. Proxy monitorea la conexión con challenge/response DSA firmado

3. Provisioning JITR (Just-In-Time Registration)

El gateway llega a campo sin certificados permanentes. Flow completamente async vía MQTT:

  1. Conectar con certificado efímero de bootstrapping
  2. Publicar en $aws/certificates/create/json
  3. AWS responde con certificado nuevo en topic de respuesta
  4. Guardar a disco, desconectar, reconectar con cert permanente
  5. Publicar en $aws/provisioning-templates/my-fleet/provision/json
  6. AWS registra el “thing” y responde con configuración final

Cada paso es un exchange MQTT distinto, con el hook pattern manejando la correlación.

4. Parsing de Protocolo Binario Z-Wave

Z-Wave habla UDP a localhost con frames binarios propietarios:

  • Bitmask operations para listas de nodos: (bit-test bitmask (dec node-id))
  • Little-endian ↔ big-endian: (reverse (partition 2 hex-str))
  • Lookup tables EDN para command classes
  • Secuencias multi-step: GetPINCode → GetWeekdaySchedule → GetYearDaySchedule para provisionar cerraduras

5. Parsing de CLI Zigbee en Tiempo Real

El Z3Gateway no tiene API binaria — se interactúa vía stdin/stdout de un proceso hijo. Se implementó un parser con regex sobre el stream de líneas:

Formato de frame: "$len,dep,sep,clus,nodeid,cmd:payload[...]#"

Un go-loop lee line-seq del proceso, parsea con regex, y despacha a los hooks correspondientes.


Arquitectura: Gateway vs Proxy

AspectoGateway (fw)Proxy
Corre enHardware embebido LinuxServidor cloud
Protocolos físicosZ-Wave (UDP binario), Zigbee (CLI)Solo MQTT5 WebSocket
EstadoAtom + EDN a discoSolo en memoria
AuthSin auth (red local)BCrypt passwords + tokens con expiración
AWSIoT Device SDK (mTLS)IoT SDK (SigV4) + DynamoDB + Lightsail
MQTT authCertificados mTLSIAM Access Key + SigV4
Timeout MQTT30 segundos10 segundos
Buffer channelsSin buffer especificadosliding-buffer 10

Los dos comparten el mismo hook pattern para correlación MQTT — surgió en el gateway y fue evolucionado en el proxy con filtros más complejos (data/diff) y sliding buffers.