Ecosistema — Librerías utilizadas
Problemas resueltos en producción
| Problema | Técnica |
|---|---|
| Leak de threads nativos AWS CRT | Interrupción manual por nombre de thread |
| SSH tunneling sin port-forwarding | Reverse tunnel + authorized_keys restringido + challenge DSA |
| Provisioning JITR async | Secuencia MQTT multi-step con hook pattern |
| Protocolo binario Z-Wave | Bitmasks, endianness, lookup tables EDN |
| CLI Zigbee en tiempo real | go-loop sobre stdout, parser regex de frames |
Lenguaje
| Tema | Nivel |
|---|---|
| core.async — concurrencia CSP | go-loop, alt!!, sliding-buffer, señalización entre componentes |
| Hook pattern — request/response sobre pub/sub | Correlación MQTT async expuesta como API HTTP sync |
| Atoms y gestión de estado | Estado mutable, swap!, reset!, transformaciones nested |
| reify — interfaces Java inline | Integración con SDKs JVM sin clases separadas |
| add-watch — persistencia reactiva | Side-effects reactivos sin mezclar I/O con lógica |
| Interop Java | Métodos, constructores, threads, ProcessBuilder |
| Lazy sequences y transformaciones funcionales | Pipelines con ->> , filter, map, reduce |
| Macros y DSLs | defjob, defroutes, middleware chain con -> |
| EDN como base de datos | Persistencia con atom + spit/slurp |
| Destructuring | Args, let, doseq, regex groups, nested |
| Var quoting para hot-reload | #‘var para recargar handler sin reiniciar servidor |
Ver Arquitectura del sistema para el contexto completo.
| Librería | Versión | Uso |
|---|---|---|
| http-kit | 2.8.1 | Servidor HTTP async, pool de 16 threads configurable, queue size 20480 |
| Compojure | 1.7.2 | DSL de routing sobre Ring |
| Ring | 1.15.3 | Abstracción HTTP estándar, middleware chain |
| ring-json | 0.5.1 | Middleware para parsear y serializar JSON automáticamente |
| ring-cors | 0.1.13 | CORS headers en middleware |
| ring-codec | 1.3.0 | Codecs para query params |
| Librería | Versión | Uso |
|---|---|---|
| aws-iot-device-sdk | 1.27.4 | MQTT5 sobre WebSocket con SigV4/mTLS |
| aws-crt | 0.38.13 | AWS Common Runtime — bindings a librería C nativa para el stack de red |
| aws-sts | 2.33.10 | Credentials temporales |
| Faraday | 1.12.3 | Cliente DynamoDB (consulta estado de gateways online) |
| Librería | Versión | Uso |
|---|---|---|
| core.async | 1.8.741 | Channels, go-loops, alt!!, timeout — motor de concurrencia principal |
| Librería | Versión | Uso |
|---|---|---|
| data.json | 2.5.1 | Serialización JSON pura Clojure |
| Jackson databind | 2.16.1 | Serialización Java para interop con SDKs |
| Librería | Versión | Uso |
|---|---|---|
| buddy-core | 1.12.0 | DSA signing para challenge/response en túneles SSH |
| jbcrypt | 0.4 | Hashing de passwords de usuarios |
| Librería | Versión | Uso |
|---|---|---|
| Quartzite | 2.1.0 | Jobs cron-style con Quartz JVM (polling de dispositivos) |
| Librería | Versión | Uso |
|---|---|---|
| zipgateway-codec | 2.0.0 | SDK propietario SilLabs — frames binarios Z-Wave |
| zipparser | 1.0.0-SNAPSHOT | Parser del protocolo ZIP para Z-Wave |
| Librería | Versión | Uso |
|---|---|---|
| Timbre | 6.8.0 | Logging estructurado con formatters y appenders configurables en runtime |
| nREPL | 1.4.0 | REPL remoto TCP (puerto 7779) para debugging en producción |
| commons-lang | 2.6 | String utilities del ecosistema Java |
El estado mutable del sistema vive exclusivamente en atoms. No se usan refs ni agents.
(def db (atom {}))
(def hooks (atom {}))
(def mqtt-client (atom nil))
@db
(deref hooks)
(reset! mqtt-client new-client)
(swap! db assoc-in [:devices device-id] device-data)
(swap! db update :count inc)
(swap! hooks assoc-in [gw-id packet-id] {:matches filter :chan ch})
(swap! hooks #(update-in % [gw-id] dissoc packet-id))
Dónde se usa: base de datos de dispositivos, registro de hooks MQTT, pool de puertos SSH, estado de conexión MQTT, store de tokens de auth.
Se usa add-watch para disparar side-effects en cada cambio del atom de dispositivos, sin mezclar I/O con la lógica de transformación:
(add-watch db :persist
(fn [_key _ref _old-val new-val]
(spit "devices.db" (pr-str new-val))))
Equivalente a un “computed property” reactivo pero para I/O. Evita el patrón imperativo de llamar a save! manualmente después de cada mutación.
El motor de toda la comunicación async entre componentes.
(go-loop []
(when-let [line (<! input-chan)]
(dispatch-frame (parse-frame line))
(recur)))
(go-loop []
(let [frame (receive-udp-frame!)]
(handle-frame frame)
(recur)))
El patrón más importante del sistema. Convierte operaciones async en sync para la API HTTP:
(defn send-command-wait [gw command filter]
(let [ch (async/chan (async/sliding-buffer 10))
id (register-hook! gw ch filter)] ; registrar ANTES de enviar
(publish-mqtt! gw command id)
(let [result (alt!!
ch ([val] val)
(timeout 10000) ([_] {:error :timeout}))]
(unregister-hook! gw id)
result)))
Por qué sliding-buffer: si el HTTP handler ya timeouted y descartó el resultado, los mensajes subsiguientes del dispositivo deben descartarse en lugar de bloquear.
(def conn-signal (async/chan (async/sliding-buffer 1)))
(reify Mqtt5ClientOptions$LifecycleEvents
(onConnectionSuccess [this client ret]
(>!! conn-signal :connected)))
(alt!!
conn-signal ([_] (log/info "MQTT conectado"))
(timeout 15000) ([_] (throw (ex-info "MQTT timeout" {}))))
Permite implementar interfaces Java inline sin crear clases separadas. Fundamental para integrar con SDKs del ecosistema JVM:
(reify Mqtt5ClientOptions$LifecycleEvents
(onAttemptingConnect [this client] (log/info "Conectando..."))
(onConnectionSuccess [this client ret]
(reset! connected? true)
(>!! conn-signal :connected))
(onConnectionFailure [this client err]
(log/error "Falló conexión" (.toString err)))
(onDisconnection [this client data]
(reset! connected? false)
(schedule-reconnect!))
(onStopped [this client data] (log/info "Detenido")))
(reify Mqtt5ClientOptions$PublishEvents
(onMessageReceived [this client publish-return]
(let [topic (.getTopic (.getPublishPacket publish-return))
payload (parse-payload publish-return)]
(dispatch-to-hooks! topic payload))))
El patrón arquitectónico más complejo y reutilizable del proyecto. Resuelve el problema de exponer operaciones async (MQTT fire-and-forget) como síncronas desde una API HTTP.
(def hooks (atom {}))
(defn register-hook! [gw filter]
(let [id (str (random-uuid))
ch (async/chan (async/sliding-buffer 10))]
(swap! hooks assoc-in [gw id] {:matches filter :chan ch})
[id ch]))
(defn dispatch-to-hooks! [gw event]
(doseq [[id {:keys [matches chan]}] (get @hooks gw)]
(when (matches-filter? matches event)
(>!! chan event))))
;; Usa clojure.data/diff — si la diferencia izquierda es nil, el filtro matchea
(defn matches-filter? [filter data]
(let [[only-in-filter _ _] (data/diff filter data)]
(nil? only-in-filter)))
(defn send-and-wait [gw command expected-event-filter]
(let [[id ch] (register-hook! gw expected-event-filter)]
(publish-mqtt! gw (assoc command :packet_id id))
(let [result (alt!!
ch ([v] v)
(timeout 10000) ([_] {:error :timeout}))]
(swap! hooks #(update-in % [gw] dissoc id))
result)))
(->> (vals (:devices @db))
(filter #(= (:protocol %) "zwave"))
(map #(select-keys % [:id :name :type]))
(sort-by :name))
(->> (shell/sh "bash" "-c" "lsof -i TCP -n -P")
:out
str/split-lines
(map parse-lsof-line)
(filter #(= (:proc %) "sshd")))
;; Convierte lista de días a bitmask
(reduce
(fn [acc day]
(assoc acc (.indexOf days-array day) 1))
[0 0 0 0 0 0 0]
["monday" "wednesday"])
;; => [0 1 0 1 0 0 0]
(.getTopic publish-packet)
(.build builder)
(.toString exception)
(ZipGatewayCodec/ZipFrameBuilder.)
(import '(software.amazon.awssdk.iot.mqtt5
Mqtt5Client
Mqtt5ClientOptions$LifecycleEvents))
;; El SDK usa threads C nativos que hay que interrumpir manualmente
(->> (Thread/getAllStackTraces)
keys
(filter #(re-matches #"AwsEventLoop(.*)" (.getName %)))
(map #(.interrupt %))
doall)
(let [pb (ProcessBuilder. ["Z3Gateway" "-n" "1" "-p" "/dev/ttyUSB1"])]
(.redirectErrorStream pb true)
(let [proc (.start pb)
reader (io/reader (.getInputStream proc))]
(go-loop []
(when-let [line (.readLine reader)]
(handle-line line)
(recur)))))
(defjob PollDevicesJob [ctx]
(poll-all-devices!))
(defroutes app-routes
(POST "/login" req (handle-login req))
(GET "/:gw/devices" [gw] (list-devices gw))
(PUT "/:gw/devices/:id/lock" [gw id :as req] (lock-device gw id req))
(route/not-found {:error "not found"}))
(def app
(-> app-routes
(wrap-json-body {:keywords? true})
(wrap-json-response)
(wrap-cors :access-control-allow-origin #".*")
(wrap-params)))
(defn handle-connection [{:keys [gw-id protocol cert-path]}] ...)
(let [{:keys [proc user port open?]} connection-info] ...)
(let [{{:keys [type payload]} :data} event] ...)
(doseq [[id hook] (get @hooks gw-id)]
(when (matches-filter? (:matches hook) event)
(>!! (:chan hook) event)))
(let [[_ gw-id] (re-matches #"mqtt/hubapp/gateway_(.*)/event/v1" topic)]
(when gw-id (dispatch-event! gw-id payload)))
;; #'app pasa la var, no el valor — permite recargar el handler sin reiniciar el servidor
(server/run-server #'app {:port 9096 :thread 16})
(spit "devices.db" (pr-str @db))
(reset! db (read-string (slurp "devices.db")))
(def config (read-string (slurp "gateway.conf")))
El SDK de AWS IoT usa una librería C nativa para el stack de red. Al reconectar el cliente MQTT, los threads del event loop nativo no se limpian solos, acumulándose hasta crashear el proceso.
Solución: antes de cada reconexión, se interrumpen manualmente los threads filtrando por nombre:
(defn kill-native-threads! []
(->> (Thread/getAllStackTraces)
keys
(filter #(re-matches #"AwsEventLoop(.*)" (.getName %)))
(map #(.interrupt %))
doall))
Thread/interrupt es la única forma de señalizar threads bloqueados en I/O nativo desde el lado JVM.
Los gateways están en redes domésticas con NAT. Para acceso remoto sin configurar port-forwarding:
$aws/things/{name}/tunnels/notifyiot-<gw-id> con shell=/bin/falseauthorized_keys con restricciones duras:
no-pty,no-X11-forwarding,permitopen="localhost:9123",command="/bin/echo" ssh-rsa [KEY]
localproxy de AWS para conectar el túnelEl gateway llega a campo sin certificados permanentes. Flow completamente async vía MQTT:
$aws/certificates/create/json$aws/provisioning-templates/my-fleet/provision/jsonCada paso es un exchange MQTT distinto, con el hook pattern manejando la correlación.
Z-Wave habla UDP a localhost con frames binarios propietarios:
(bit-test bitmask (dec node-id))(reverse (partition 2 hex-str))El Z3Gateway no tiene API binaria — se interactúa vía stdin/stdout de un proceso hijo. Se implementó un parser con regex sobre el stream de líneas:
Formato de frame: "$len,dep,sep,clus,nodeid,cmd:payload[...]#"
Un go-loop lee line-seq del proceso, parsea con regex, y despacha a los hooks correspondientes.
| Aspecto | Gateway (fw) | Proxy |
|---|---|---|
| Corre en | Hardware embebido Linux | Servidor cloud |
| Protocolos físicos | Z-Wave (UDP binario), Zigbee (CLI) | Solo MQTT5 WebSocket |
| Estado | Atom + EDN a disco | Solo en memoria |
| Auth | Sin auth (red local) | BCrypt passwords + tokens con expiración |
| AWS | IoT Device SDK (mTLS) | IoT SDK (SigV4) + DynamoDB + Lightsail |
| MQTT auth | Certificados mTLS | IAM Access Key + SigV4 |
| Timeout MQTT | 30 segundos | 10 segundos |
| Buffer channels | Sin buffer especificado | sliding-buffer 10 |
Los dos comparten el mismo hook pattern para correlación MQTT — surgió en el gateway y fue evolucionado en el proxy con filtros más complejos (data/diff) y sliding buffers.