Skip to content

Commit

Permalink
cljstyle fix
Browse files Browse the repository at this point in the history
  • Loading branch information
greglook committed Mar 22, 2024
1 parent 9c1fef3 commit 4519497
Show file tree
Hide file tree
Showing 13 changed files with 55 additions and 78 deletions.
7 changes: 3 additions & 4 deletions .cljstyle
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
;; Clojure style configs
;; vim: filetype=clojure
{:padding-lines 2
:max-consecutive-blank-lines 3
:file-ignore #{".git" "target"}
:indents {,,,}}
{:files
{:ignore #{".git" "target"}}}
2 changes: 0 additions & 2 deletions blocks-tests/src/blocks/store/tests.clj
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
blocks))



;; ## Operation Generators

;; Appease clj-kondo
Expand Down Expand Up @@ -340,7 +339,6 @@
(into [] (mapcat #(% ctx)) op-gens))))



;; ## Operation Testing

(defn- start-store
Expand Down
16 changes: 6 additions & 10 deletions src/blocks/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
(instance? Multihash x))



;; ## Block IO

(defn loaded?
Expand Down Expand Up @@ -191,7 +190,6 @@
true))



;; ## Storage API

(defn ->store
Expand Down Expand Up @@ -219,14 +217,14 @@
[store & opts]
(let [opts (args->map opts)
opts (merge
; Validate algorithm option.
;; Validate algorithm option.
(when-let [algorithm (:algorithm opts)]
(if (keyword? algorithm)
{:algorithm algorithm}
(throw (IllegalArgumentException.
(str "Option :algorithm is not a keyword: "
(pr-str algorithm))))))
; Validate 'after' boundary.
;; Validate 'after' boundary.
(when-let [after (:after opts)]
(cond
(hex-string? after)
Expand All @@ -239,7 +237,7 @@
(throw (IllegalArgumentException.
(str "Option :after is not a hex string or multihash: "
(pr-str after))))))
; Validate 'before' boundary.
;; Validate 'before' boundary.
(when-let [before (:before opts)]
(cond
(hex-string? before)
Expand All @@ -252,14 +250,14 @@
(throw (IllegalArgumentException.
(str "Option :before is not a hex string or multihash: "
(pr-str before))))))
; Validate query limit.
;; Validate query limit.
(when-let [limit (:limit opts)]
(if (pos-int? limit)
{:limit limit}
(throw (IllegalArgumentException.
(str "Option :limit is not a positive integer: "
(pr-str limit))))))
; Ensure no other options.
;; Ensure no other options.
(when-let [bad-opts (not-empty (dissoc opts :algorithm :after :before :limit))]
(throw (IllegalArgumentException.
(str "Unknown options passed to list: " (pr-str bad-opts))))))]
Expand Down Expand Up @@ -396,7 +394,6 @@
(io! (store/-delete! store id))))



;; ## Batch API

(defn get-batch
Expand Down Expand Up @@ -448,7 +445,6 @@
(d/success-deferred #{})))



;; ## Storage Utilities

(defn scan
Expand Down Expand Up @@ -483,7 +479,7 @@
(meter/measure-method
store :erase! nil
(store/-erase! store))
; TODO: should be able to parallelize this - how to communicate errors?
;; TODO: should be able to parallelize this - how to communicate errors?
(s/consume-async
(fn erase-block
[block]
Expand Down
6 changes: 2 additions & 4 deletions src/blocks/data.clj
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
content
_meta]

;:load-ns true
;; :load-ns true


java.lang.Object
Expand Down Expand Up @@ -107,7 +107,6 @@
(.write w (str v)))



;; ## Content Readers

(defprotocol ContentReader
Expand Down Expand Up @@ -160,7 +159,7 @@

(read-range
[this start end]
; Ranged open not supported for generic functions, use naive approach.
;; Ranged open not supported for generic functions, use naive approach.
(bounded-input-stream (this) start end)))


Expand All @@ -186,7 +185,6 @@
(persistent-bytes? (.content block)))



;; ## Constructors

;; Remove automatic constructor function.
Expand Down
3 changes: 0 additions & 3 deletions src/blocks/meter.clj
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
(log/warn ex "Failure while recording metric")))))



;; ## Stream Metering

(def ^:dynamic *io-report-period*
Expand Down Expand Up @@ -141,7 +140,6 @@
(.close input-stream))))))



;; ## Metered Content

(deftype MeteredContentReader
Expand Down Expand Up @@ -182,7 +180,6 @@
block)))



;; ## Method Wrappers

(defn measure-stream
Expand Down
49 changes: 23 additions & 26 deletions src/blocks/store.clj
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
value which yields when the store is erased."))



;; ## Store Construction

(defn parse-uri
Expand Down Expand Up @@ -120,7 +119,6 @@
(privatize! ~(symbol (str "map->" record-name)))))



;; ## Async Utilities

(defn ^:no-doc schedule-future!
Expand Down Expand Up @@ -167,7 +165,6 @@
result))))))



;; ## Stream Utilities

(defn preferred-block
Expand All @@ -193,30 +190,30 @@
(fn test-block
[block]
(if (instance? Throwable block)
; Propagate error on the stream.
;; Propagate error on the stream.
(do (s/put! out block)
(s/close! out)
(d/success-deferred false))
; Determine if block matches query criteria.
;; Determine if block matches query criteria.
(let [id (:id block)
hex (multihash/hex id)]
(cond
; Ignore any blocks which don't match the algorithm.
;; Ignore any blocks which don't match the algorithm.
(and algorithm (not= algorithm (:algorithm id)))
(d/success-deferred true)

; Drop blocks until an id later than `after`.
;; Drop blocks until an id later than `after`.
(and after (not (neg? (compare after hex))))
(d/success-deferred true)

; Terminate the stream if block is later than `before` or `limit`
; blocks have already been returned.
;; Terminate the stream if block is later than `before` or `limit`
;; blocks have already been returned.
(or (and before (not (pos? (compare before hex))))
(and (pos-int? limit) (< limit (swap! counter inc))))
(do (s/close! out)
(d/success-deferred false))

; Otherwise, pass the block along.
;; Otherwise, pass the block along.
:else
(s/put! out block)))))
out
Expand All @@ -243,7 +240,7 @@
out (s/stream)]
(d/loop [inputs (map vector intermediates (repeat nil))]
(d/chain
; Take the head value from each stream we don't already have.
;; Take the head value from each stream we don't already have.
(->>
inputs
(map (fn take-next
Expand All @@ -254,39 +251,39 @@
(partial vector input))
pair)))
(apply d/zip))
; Remove drained streams from consideration.
;; Remove drained streams from consideration.
(fn remove-drained
[inputs]
(remove #(identical? ::drained (second %)) inputs))
; Find the next earliest block to return.
;; Find the next earliest block to return.
(fn find-next
[inputs]
(if (empty? inputs)
; Every input is drained.
;; Every input is drained.
(s/close! out)
; Check inputs for errors.
;; Check inputs for errors.
(if-let [error (->> (map second inputs)
(filter #(instance? Throwable %))
(first))]
; Propagate error.
;; Propagate error.
(d/finally
(s/put! out error)
#(s/close! out))
; Determine the next block to output.
;; Determine the next block to output.
(let [earliest (first (sort-by :id (map second inputs)))]
(d/chain
(s/put! out earliest)
(fn check-put
[result]
(if result
; Remove any blocks matching the one emitted.
;; Remove any blocks matching the one emitted.
(d/recur (mapv (fn remove-earliest
[[input head :as pair]]
(if (= (:id earliest) (:id head))
[input nil]
pair))
inputs))
; Out was closed on us.
;; Out was closed on us.
false)))))))))
(s/source-only out))))

Expand Down Expand Up @@ -322,34 +319,34 @@
(fn compare-next
[[s d]]
(cond
; Source stream exhausted; terminate sequence.
;; Source stream exhausted; terminate sequence.
(identical? ::drained s)
(close-all!)

; Destination stream exhausted; return remaining blocks in source.
;; Destination stream exhausted; return remaining blocks in source.
(identical? ::drained d)
(-> (s/put! out s)
(d/chain
(fn [_] (s/drain-into src out)))
(d/finally close-all!))

; Source threw an error; propagate it.
;; Source threw an error; propagate it.
(instance? Throwable s)
(d/finally
(s/put! out s)
close-all!)

; Dest threw an error; propagate it.
;; Dest threw an error; propagate it.
(instance? Throwable d)
(d/finally
(s/put! out d)
close-all!)

; Block is present in both streams; drop and continue.
;; Block is present in both streams; drop and continue.
(= (:id s) (:id d))
(d/recur nil nil)

; Source has a block not in dest.
;; Source has a block not in dest.
(neg? (compare (:id s) (:id d)))
(d/chain
(s/put! out s)
Expand All @@ -358,7 +355,7 @@
(when result
(d/recur nil d))))

; Next source block comes after some dest blocks; skip forward.
;; Next source block comes after some dest blocks; skip forward.
:else
(d/recur s nil)))))
(s/source-only out)))
1 change: 0 additions & 1 deletion src/blocks/store/buffer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@
(s/reduce sum/update (sum/init))))



;; ## Constructors

(store/privatize-constructors! BufferBlockStore)
Expand Down
13 changes: 6 additions & 7 deletions src/blocks/store/cache.clj
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
(let [{:keys [priorities total-size]} @state]
(if (and (< (- size-limit total-size) target-free)
(seq priorities))
; Need to delete the next block.
;; Need to delete the next block.
(let [[id [_ size]] (peek priorities)]
(swap! state remove-block id)
(d/chain
Expand All @@ -93,7 +93,7 @@
(d/recur (if deleted?
(sum/update deleted {:id id, :size size})
deleted)))))
; Enough free space, or no more blocks to delete.
;; Enough free space, or no more blocks to delete.
deleted)))))


Expand Down Expand Up @@ -165,20 +165,20 @@
(fn recache
[block]
(cond
; Block not present in cache or primary.
;; Block not present in cache or primary.
(nil? block)
nil

; Block is already cached.
;; Block is already cached.
(::cached? (meta block))
(do (swap! state touch-block block)
block)

; Determine whether to cache the primary block.
;; Determine whether to cache the primary block.
(cacheable? this block)
(cache-block! this block)

; Non cacheable block from the primary store.
;; Non cacheable block from the primary store.
:else block))))


Expand Down Expand Up @@ -219,7 +219,6 @@
(constantly true))))



;; ## Constructors

(store/privatize-constructors! CachingBlockStore)
Expand Down
Loading

0 comments on commit 4519497

Please sign in to comment.