Channels in Clojure

I have been kicking the tires on Google’s Go. I was especially impressed with its RPC framework example. I wanted to implement a similar framework in Clojure.

Let’s start with the client interface:

(ns rpc-example
  (import [java.util.concurrent BlockingQueue

(defn rpc [#^BlockingQueue q f & args]
  "Performs a blocking call using the given queue."
  (let [result (promise)]
    (.put q {:fn f :args args :result result})

While Clojure does not currently provide anything like Go’s channels, Java’s BlockingQueue is close enough1. A request is simply a map of a function, a list of arguments, and a promise2. This map is put on the queue for a waiting handler.

The handler function takes requests off the queue, processes them, and delivers the result:

(defn handler [#^BlockingQueue q]
  "Repeatedly processes requests on the given queue."
  (doseq [req (repeatedly #(.take q))]
    (let [result (apply (:fn req) (:args req))]
      (deliver (:result req) result))))

Finally, I define some functions to manage the execution of handlers. Like the Go example, I want the ability to run handlers concurrently. For that, I use future3.

(defn serve 
  "Create n handlers for the given queue."
  [#^BlockingQueue q n]
  (doall (for [_ (range n)]
           (future (handler q)))))

(defn quit [server]
  "Cancel all handlers for the given server."
  (doall (map future-cancel server)))

Putting the pieces together:

(def q (LinkedBlockingQueue. 10))
; #'rpc-example/q
(def rpc-call (future (rpc q + 1 2 3)))
; #'rpc-example/rpc-call
; #<Object$IDeref$Future$2b9be1f9@13c4c09: :pending>
(def server (serve q 3))
; #'rpc-example/server
; #<Object$IDeref$Future$2b9be1f9@13c4c09: 6>
(quit server)
; (true true true)

PS. An asynchronous client interface:

(defn rpc-async [#^BlockingQueue q callback f & args]
  (future (callback (apply rpc q f args))))

  1. Like channels, BlockingQueue provides both blocking (put and take) and non-blocking (offer and poll) queue operations. However, unlike Java’s poll that uses null to signal an empty queue, Go’s channels signal receive failures out-of-band, thus allowing them to transmit nil values. ↩︎

  2. Introduced in Clojure’s 1.1 release, a promise is a dataflow variable: it can be bound to a value once, and computations that rely on its value block until the value is provided. ↩︎

  3. Also a Clojure 1.1 feature, a future represents an asynchronous computation. The body is executed on another thread. Once created, requests for the result of the computation will block. However, in this case the computation is infinite and yields no value, so I use future-cancel to terminate it. ↩︎

All Posts
© 2022 Dave Jack. All rights reserved.