Programación concurrente en Clojure: agentes
Clojure es un lenguaje de programación pensado por y para la concurrencia. Las características funcionales del lenguaje, como la inmutabilidad de sus estructuras de datos, contribuyen a que mantener varios hilos de ejecución sea sencillo, cada uno con su propia copia inmutable de los datos, ya que de esta manera, es imposible que un hilo modifique el estado de la computación de otro hilo.
Sin embargo, hay veces que es necesario compartir el estado entre varios hilos o, dado que Clojure se ejecuta sobre la máquina virtual de Java y es posible acceder a todos los objetos y clases de Java, trabajamos con estructuras de datos no inmutables.
Clojure ofrece diferentes formas para tratar estas situaciones automáticamente, de forma que se eliminen los problemas de concurrencia que aparecen en estas circunstancias, los agentes son una de ellas.
Un agente es una abstracción que mantiene el estado de una computación, actualizándolo mediante la ejecución de forma asíncrona de funciones sobre ese estado que son enviadas por otros hilos mediante la función (send). Estas funciones se van almacenando en una estructura FIFO y van siendo aplicadas secuencialmente por el agente en su propio pool de hilos.
Si habéis trabajado con Erlang, podéis pensar en los agentes Clojure como una versión recortada de los servidores Erlang.
Un agente clojure se declara con la función (agent):
(agent initial-state)
Donde initial-state puede ser cualquier estructura de datos. Por ejemplo, vamos a construir un logger asíncrono y thread-safe. Nuestro logger mantendrá pues un nivel de logging y un sitio donde escribir el log, por defecto le daremos el nivel "1" y la salida estándar:
(def *logger* (agent {:level 1 :output *out*}))
Para loguear los mensajes, necesitamos una función (log), que recibirá un nivel de logging y algo que loguear, como usar números para los niveles de log queda algo feo, definamos una función que transforma claves para los niveles de log en enteros:
(defn level-name-to-int
"Returns a numeric identifier for the levels of the logger"
([level]
(let [levels {:debug 0
:info 1
:warning 2
:error 3
:critical 4}]
(let [to-return (level levels)]
(if (nil? to-return) 6 to-return)))))
Ahora que tenemos niveles de log, necesitamos alguna forma de cambiar el nivel de logging actual almacenado en el agente del logger. Como ya mencionamos, para cambiar el estado de un agente, necesitamos una función que será enviada al agente para que la ejecute en su pool de hilos sobre el estado actual almacenado. La función de actualización debe recibir como primer argumento el estado del agente, una serie de parámetros opcionales y devolver un estado del agente modificado. Usando notación Haskell:
updateFunction :: AgentState -> [Args] -> AgentState
En nuestro caso, nuestra función para modificar el nivel de logging del agente puede ser:
(defn reset-logger-level
"Initializes the logger with a given level"
([level]
(send *logger* (fn [a l] {:level (level-name-to-int l) :output (:output a)}) level)))
Como se puede ver, enviamos al agente *logger* una función lambda, donde se transforma la estructura de dats almacenada en el agente, sustituyendo el viejo nivel de logeo con el suministrado en el argumento "level".
Ahora, necesitamos la función de logueo. Crearemos dos versiones, una que recibe un nivel y el código a loguear, y si el nivel del agente logger es adecuado loguea el resultado del código pasado a la función de log y lo devuelve tal cual. La segunda, recibirá además una descripción que logueará junto al resultado de ejecutar el códigos suministrado como argumento:
(defn log "logs something in the logger with the provided level of log" ([level to-log] (do (send *logger* (fn [a level msg] (do (when (<= (:level a) (level-name-to-int level)) (let [to-write (str (. (keyword-to-string level) toUpperCase) " " (. (new java.util.Date) toString) " => "(str msg) "\n") writer (:output a)] (do (. writer (write to-write 0 (. to-write length))) (. writer flush)))) a)) level to-log) to-log)) ([level desc to-log] (do (send *logger* (fn [a level msg] (do (when (<= (:level a) (level-name-to-int level)) (let [to-write (str (. (keyword-to-string level) toUpperCase) " " (. (new java.util.Date) toString) " => " desc " " (str msg) "\n") writer (:output a)] (do (. writer (write to-write 0 (. to-write length))) (. writer flush)))) a)) level to-log) to-log)))
En este caso, como en el anterior, enviamos una función lambda al agente con send para que haga el trabajo, esta función loguea, el resultado de la ejecución comprobando el nivel de log y añadiendo alguna información extra como la hora.
Es interesante notar que la función log no es bloqueante, en cuanto se ha enviado la función lambda al agente, retorna y la ejecución de la computación continúa, se haya logueado el mensaje o no, además como la ejecución de funciones en el pool del agente es FIFO, los mensajes se van a loguear en el orden de ejecución, sin que se "cuelen" mensajes en el orden de ejecución (la función send es atómica).
Aquí se puede comprobar como funciona en ejecución nuestro flamante logger:
user=> (reset-logger-level :debug) (reset-logger-level :debug) #=(clojure.lang.Agent. "clojure.lang.Agent@219009") user=> (log :info "hola?" (+ 1 2)) (log :info "hola?" (+ 1 2)) 3 user=> INFO Mon Jan 12 00:25:54 CET 2009 => hola? 3 (reset-logger-level :error) (reset-logger-level :error) #=(clojure.lang.Agent. "clojure.lang.Agent@219009") user=> (log :info "ahora no... " (* 3 3)) (log :info "ahora no... " (* 3 3)) 9 user=>
Como se puede ver, se puede meter llamadas a log en medio de la ejecución del código ya que es transparente para una llamada a función, además se puede comprobarla ejecución asíncrona del logger, comprobando como se cuela el logueo de la llamada para (log :info "hola?" (+ 1 2)) antes de que la shell de clojure muestre el cursor para la siguiente entrada de datos por parte del usuario.
Para terminar, vamos a hacer que en vez de loguear en la salida estándar, mande el log a un fichero. Como ya hemos dicho, Clojure es 100% java, y la variable especial de salida estándar *out*, no es más que un objeto java.io.OutputStreamWriter, así que vamos a hacer un par de funciones, una para cambiar el destino de logueo almacenado en el estado del agente, y otro para poder iniciarlo con un fichero de salida:
(defn reset-logger
"Initializes the logger with a given level and output writer"
([level writer]
(send *logger* (fn [a l w] {:level (level-name-to-int l) :output w}) level writer)))
(defn reset-logger-with-file-output
"Resets the logger setting the output to the file whose path is provided as an argument
with the level specified"
([level file-path]
(reset-logger level (new FileWriter (new File file-path) true))))
Ya tenemos terminado nuestro logger, de ultimísima tecnología, con el que no tendremos problemas como el que te puedes encontrar en Merb, cuando el tiempo que tarda en mostrar el log de tu aplicación web acaba provocando un timeout por parte del cliente. ;)
Por último, queda comentar sobre los agentes que les ocurre cuando la función de actualización lanza una excepción: intentemos loguear en null, algo que seguro no es una buena idea:
(send *logger* (fn [_] {:output nil :level 1}))
(send *logger* (fn [_] {:output nil :level 1}))
#=(clojure.lang.Agent. "clojure.lang.Agent@219009")
user=>
(log :error "????")
"????"
El mensaje no ha sido loguead, como esperábamos, pero tampoco parece que haya explotado nada, intentemos loguear algo más
user=> (log :error "are you alive?") (log :error "are you alive?") java.lang.Exception: Agent has errors (NO_SOURCE_FILE:0) user=>
Vaya, parece que nuestro agente se ha quedado en un estado de error. Este es el comportamiento de los agentes Clojure ante los errores, se quedan suspendidos hasta que reseteemos el estado del mismo con la función (clear-agent-errors)
user=> (clear-agent-errors *logger*) (clear-agent-errors *logger*) nil
¡Listo para usar otra vez!
El código, como siempre, lo podéis descargar de Github
