Like Refs, Agents provide shared access to mutable state. Where Refs support coordinated, synchronous change of multiple locations, Agents provide independent, asynchronous change of individual locations. Agents are bound to a single storage location for their lifetime, and only allow mutation of that location (to a new state) to occur as a result of an action. Actions are functions (and, optionally, additional arguments) that are asynchronously applied to an Agent's state and whose return value becomes the Agent's new state. Because actions are functions they can also be multimethods and therefore actions are potentially polymorphic. Also, because the set of functions is open, the set of actions supported by an Agent is also open, a sharp contrast to pattern matching message handling loops.
Clojure's Agents are reactive, not autonomous - there is no imperative message loop and no blocking receive. The state of an Agent should be itself immutable (preferably an instance of one of the persistent collections), and the state of an Agent is always immediately available for reading by any thread (using the deref function, reader macro @) without any messages, i.e. observation does not require cooperation/coordination.
Agent action dispatches take the form (send agent fn args*). send always returns immediately. At some point later, in another thread, the fn will be applied to the state of the Agent and the args, if any were supplied. The return value of the function will become the new state of the Agent. If during the function execution any other dispatches are made (directly or indirectly), they will be held until after the state of the Agent has been changed. If any exceptions are thrown by an action function, no nested dispatches will occur, and the exception will be cached in the Agent itself. When an Agent has cached errors any subsequent interactions will immediately throw an exception, until the agent's errors are cleared. Agent errors can be examined with agent-errors and cleared with clear-agent-errors.
The actions of all Agents get interleaved amongst threads in a thread pool. At any point in time, at most one action for each Agent is being executed. Actions dispatched to an agent from another single agent or thread will occur in the order they were sent, potentially interleaved with actions dispatched to the same agent from other sources.
Agents are integrated with the STM - any dispatches made in a transaction are held until it commits, and are discarded if it is retried or aborted.
As with all of Clojure's concurrency support, no user-code locking is involved.
(agent init-state)
Creates and returns an agent with an initial value of init-state.
(deref agent)
@agent
Returns the state of the agent. Ok to call at any time.
(send agent action-fn args*)
Dispatch an action to an agent. Returns the agent immediately. Subsequently, in a thread in a thread pool, the state of the agent will be set to the value of:
(apply action-fn state-of-agent args)
(send-off agent action-fn args*)
Dispatch a potentially blocking action to an agent. Returns the
agent immediately. Subsequently, in a separate thread, the state of
the agent will be set to the value of:
(apply action-fn state-of-agent args)
(agent-errors agent)
Returns a sequence of the exceptions thrown during asynchronous actions of the agent.
(clear-agent-errors agent)
Clears any exceptions thrown during asynchronous actions of the agent, allowing subsequent actions to occur.
(await agents+)
Blocks the current thread (indefinitely!) until all actions dispatched thus far (from this thread or agent) to the agent(s) have occurred.
(await-for timeout-ms agents+)
Blocks the current thread until all actions dispatched thus far (from this thread or agent) to the agents have occurred, or the timeout (in milliseconds) has elapsed. Returns nil if returning due to timeout, non-nil otherwise.
This example is an implementation of the send-a-message-around-a-ring test. A chain of n agents is created, then a sequence of m actions are dispatched to the head of the chain and relayed through it.
(defn setup [n next]
(if (zero? n)
next
(recur (dec n) (agent {:next next}))))
(defn relay [x m]
(when (:next x)
(send (:next x) relay m))
(when (and (zero? m) (:report-queue x))
(. (:report-queue x) (put m)))
x)
(defn run [m n]
(let [q (new java.util.concurrent.SynchronousQueue)
tl (agent {:report-queue q})
hd (setup (dec n) tl)]
(doseq m (reverse (range m))
(send hd relay m))
(. q (take))))
;1 million message sends
(time (run 1000 1000))
->"Elapsed time: 2959.254 msecs"