(with-open [f (io/reader (io/file some-file))]
  (line-seq f))
line-seq will return a lazy seq of lines read from some-file, but if the lazy seq escapes the dynamic extent of with-open, then you will get an exception:
IOException Stream closed java.io.BufferedReader.ensureOpen (BufferedReader.java:115)
With laziness, the callee produces data, but the caller can control when data is produced. However, sometimes the data that is produced has associated resources that must be managed. Leaving the caller in control of when data is produced means the caller must know about and manage the related resources. Using a lazy sequence is like co-routines passing control back and forth between the caller and callee, but it only transfers control for each item; there is no way to run a cleanup routine after the caller has decided to stop consuming the sequence.
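Here is a minimal sketch of the failure that can be pasted into a REPL. It uses an in-memory reader instead of a file, and the helper names (escaped-lines, realized-lines, realization-error) are hypothetical, made up for this illustration.

```clojure
(import '(java.io BufferedReader StringReader))

(defn escaped-lines
  "Returns the lazy seq itself; the reader is closed before the seq is consumed."
  []
  (with-open [rdr (BufferedReader. (StringReader. "a\nb\nc"))]
    (line-seq rdr)))

(defn realized-lines
  "Forces the whole seq with doall while the reader is still open."
  []
  (with-open [rdr (BufferedReader. (StringReader. "a\nb\nc"))]
    (doall (line-seq rdr))))

(defn realization-error
  "Returns the exception thrown while realizing the escaped seq, or nil."
  []
  (try
    (doall (escaped-lines))
    nil
    (catch Exception e e)))

;; line-seq reads the first line eagerly, so the first element is available
;; even after the reader has been closed.
(first (escaped-lines))

;; Realizing the rest touches the closed reader and throws.
(realization-error)

;; Realizing inside with-open works, at the cost of holding every line in memory.
(realized-lines)
```

Forcing the seq with doall avoids the exception, but it also gives up the laziness that made line-seq attractive in the first place.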
A Tempting Solution
One might immediately think about putting the resource control into the lazy seq:
(defn my-line-seq* [rdr [line & lines]]
  (if line
    (cons line (lazy-seq (my-line-seq* rdr lines)))
    (do (.close rdr)
        nil)))
(defn my-line-seq [some-file]
  (let [rdr (io/reader (io/file some-file))
        lines (line-seq rdr)]
    (my-line-seq* rdr lines)))
This way the caller can consume the sequence how it wants, but the callee remains in control of the resources. The problem with this approach is the caller is not guaranteed to fully consume the sequence, and unless the caller fully consumes the sequence the file reader will never get closed.
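To see the leak, here is a small sketch that swaps the file for an in-memory reader whose close call is recorded. tracking-reader and closed-after are hypothetical helpers invented for this demonstration; my-line-seq* is the function from above with a type hint added.

```clojure
(import '(java.io BufferedReader StringReader))

(defn tracking-reader
  "Hypothetical test helper: a BufferedReader over `text` that sets the
  `closed?` atom to true when .close is called. It only records the call."
  [text closed?]
  (proxy [BufferedReader] [(StringReader. text)]
    (close []
      (reset! closed? true))))

(defn my-line-seq* [^BufferedReader rdr [line & lines]]
  (if line
    (cons line (lazy-seq (my-line-seq* rdr lines)))
    (do (.close rdr)
        nil)))

(defn closed-after
  "Consumes the first n lines (all of them when n is nil) and reports
  whether the reader was closed as a result."
  [n]
  (let [closed? (atom false)
        rdr     (tracking-reader "a\nb\nc" closed?)
        lines   (my-line-seq* rdr (line-seq rdr))]
    (dorun (if n (take n lines) lines))
    @closed?))

(closed-after 2)   ; => false, partial consumption leaves the reader open
(closed-after nil) ; => true, only full consumption reaches the .close call
```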
An Actual Solution
There is a way to fix this. You can require the caller to pass in a function to consume the generated data; the callee can then manage the resource and execute the function. It might look something like:
(defn process-the-file [some-file some-fn]
  (with-open [f (io/reader (io/file some-file))]
    (doall (some-fn (line-seq f)))))
(process-the-file my-file-name do-the-things)
Once upon a time clojure.java.jdbc used to have a with-query-results macro that would expose a lazy seq of query results, and you had these resource management issues. Then it was changed to use this second approach where you pass in functions.
There is a hitch to this approach. Now the callee has to know more about how the caller's logic works. For instance, in the above code you are assuming that some-fn returns a sequence that you can pass to doall, but what if some-fn reduces the sequence of lines down to a scalar value? Perhaps process-the-file could take two functions, seq-fn and item-fn:
(defn process-the-file [some-file item-fn seq-fn]
  (with-open [f (io/reader (io/file some-file))]
    (seq-fn (map item-fn (line-seq f)))))
(process-the-file my-file-name do-a-thing identity)
That's better? I still see two problems:
- The caller is back to having to know/worry about resource management, because it could pass a seq-fn that does not fully realize the lazy seq before it escapes the with-open.
- The logic hooks that process-the-file provides may never be quite right. What about a hook for when the file is open? How about when it is closed?
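The first problem can be reproduced with a short sketch. It reuses the two-function process-the-file from above; temp-file-with, sample, and escaped-error are hypothetical names introduced only for this demonstration.

```clojure
(require '[clojure.java.io :as io]
         '[clojure.string :as string])

(defn process-the-file [some-file item-fn seq-fn]
  (with-open [f (io/reader (io/file some-file))]
    (seq-fn (map item-fn (line-seq f)))))

(defn temp-file-with
  "Hypothetical helper: writes `text` to a temp file and returns the File."
  [text]
  (doto (java.io.File/createTempFile "lines" ".txt")
    (.deleteOnExit)
    (spit text)))

(def sample (temp-file-with "a\nb\nc"))

;; vec realizes every line before with-open closes the reader.
(process-the-file sample string/upper-case vec)

(defn escaped-error
  "Returns the exception thrown when the seq is consumed after the reader
  has been closed, or nil if nothing was thrown."
  []
  (try
    ;; identity hands back the unrealized lazy seq, so the read happens too late.
    (dorun (process-the-file sample string/upper-case identity))
    nil
    (catch Exception e e)))

(escaped-error)
```

Whether the call is safe depends entirely on which seq-fn the caller picks, which is exactly the knowledge the callee was supposed to hide.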
An additional design consequence is that you are inverting control from what it was in the lazy seq case. Whereas before the caller had control over when the data is consumed, now the callee does. You have to break your logic up into small chunks that can be passed into process-the-file, which can make the code a bit harder to follow, and you must put your sharded logic close to the callsite for process-the-file (i.e. you cannot take a lazy sequence from process-the-file and pass it to another part of your code for processing). There are advantages and disadvantages to this consequence, so it is not necessarily bad; it is just something you have to consider.
Another Solution
We can also solve this by using a different mechanism in Clojure: reduction. Normally you would think of the reduction process as taking a collection and producing a scalar value:
(defn process-the-file [some-file some-fn]
  (with-open [f (io/reader (io/file some-file))]
    (reduce (fn [a v] (conj a (some-fn v))) [] (line-seq f))))
(process-the-file my-file-name do-a-thing)
While this may look very similar to our first attempt, we have some options for improving it. Ideally we'd like to push the resource management into the reduction process and pull the logic out. We can do this by reifying a couple of Clojure interfaces, and by taking advantage of transducers.
If we can wrap a stream in an object that is reducible, then it can manage its own resources. The reduction process puts the collection in control of how it is reduced, so it can clean up resources even in the case of early termination. When we also make use of transducers, we can keep our logic together as a single transformation pipeline, but pass the logic into the reduction process.
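To make the idea concrete, here is a rough sketch of such a wrapper built by reifying clojure.lang.IReduceInit. It is not the implementation of any particular library. reducible-lines, open-tracked, and close-count are hypothetical names, and opening a fresh reader on every reduction is a simplifying assumption of this sketch.

```clojure
(import '(java.io BufferedReader StringReader)
        '(clojure.lang IReduceInit))

(defn reducible-lines
  "Sketch: returns a reducible whose reduction opens a reader via the
  zero-arg fn `open-reader`, feeds each line to the reducing fn, and
  closes the reader when the reduction ends, early or not."
  [open-reader]
  (reify IReduceInit
    (reduce [_ f init]
      (with-open [^BufferedReader rdr (open-reader)]
        (loop [acc init]
          (if (reduced? acc)
            @acc                          ; early termination: unwrap and stop
            (if-let [line (.readLine rdr)]
              (recur (f acc line))
              acc)))))))                  ; end of input

;; Demonstration only: count how often .close is called.
(def close-count (atom 0))

(defn open-tracked
  "Hypothetical helper: an in-memory reader whose .close bumps close-count."
  []
  (proxy [BufferedReader] [(StringReader. "a\nbb\nccc\ndddd")]
    (close [] (swap! close-count inc))))

;; take stops the reduction after two lines, yet with-open still runs .close.
(def first-two (into [] (take 2) (reducible-lines open-tracked)))
```

After this runs, first-two holds the first two lines and close-count has been incremented once, even though the input was only partially consumed. The caller supplies the logic as a transducer and never touches the reader.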
I have created a library called pjstadig/reducible-stream, which will create this wrapper object around a stream. There are several functions that will fuse an input stream, a decoding process, and resource management into a reducible object. Let's take a look at them:
- decode-lines! will take an input stream and produce a reducible collection of the lines from that stream.
- decode-edn! will take an input stream and produce a reducible collection of the objects read from that stream (using clojure.edn/read).
- decode-clojure! will take an input stream and produce a reducible collection of the objects read from that stream (using clojure.core/read).
- decode-transit! will take an input stream and produce a reducible collection of the objects read from that stream.
There is also a decode! function that encapsulates the general abstraction, and can be used for some other kind of decoding process. Here is an example of the use of decode-lines!:
(into []
      (comp (filter (comp odd? count))
            (take-while (complement #(string/starts-with? % "1"))))
      (decode-lines! (io/input-stream (io/file "/etc/hosts"))))
This code will parse /etc/hosts into lines, keeping only lines with an odd number of characters, until it finds a line that starts with the number '1'. Whether the process consumes the entire file or not, the input stream will be closed.
Advantages:
- This reducible object can be created and passed around to other bits of code until it is ready to be consumed.
- When the object is consumed either partially or fully the related resources will be cleaned up.
- Logic can be defined separately and in total (as a transducer), and can be applied to other sources like channels, collections, etc.
Disadvantages:
- This object can only be consumed once. If you try to consume it again, you will get an exception because the stream is already closed.
- If you treat this object like a sequence, it will fully consume the input stream and fully realize the decoded data in memory. In certain use cases this may be an acceptable tradeoff for having the resources automatically managed for you.
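The point about defining logic separately as a transducer can be sketched without any stream at all. Below, the same pipeline from the /etc/hosts example is given a name and applied to a plain vector. odd-until-1 and sample-lines are hypothetical names chosen for this illustration.

```clojure
(require '[clojure.string :as string])

;; The transformation from the earlier example, defined once on its own.
(def odd-until-1
  (comp (filter (comp odd? count))
        (take-while (complement #(string/starts-with? % "1")))))

(def sample-lines ["abc" "ab" "x" "127.0.0.1 localhost" "zzz"])

;; Collect the surviving lines into a vector.
(into [] odd-until-1 sample-lines)
;; => ["abc" "x"]

;; Reuse the same logic to count the surviving lines instead.
(transduce odd-until-1 (completing (fn [n _] (inc n))) 0 sample-lines)
;; => 2
```

Because the pipeline does not mention its source, the same odd-until-1 could be handed to a reducible stream, a collection, or a channel.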
Summary
Clojure affords you several different tools for deciding how to construct your logic and manage resources when you are processing collections. Laziness is one tool and it has advantages and disadvantages. Its main disadvantage is around managing resources.
By making use of transducers and the reduction process in a smart way, we can produce an object that can manage its own resources while also allowing collection processing logic to be defined externally. The library pjstadig/reducible-stream provides a way to construct these reducible wrappers with decoding and resource management fused to a stream.
Acknowledgments
Special hat tip to hiredman. His treatise on reducers is well worth the read. Many moons ago it got me started thinking about these things, and I think with transducers on the scene, the idea of a collection managing its own resources during reduction is even more interesting.