-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathcore.cljc
283 lines (259 loc) · 10.4 KB
/
core.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
(ns konserve.core
(:refer-clojure :exclude [get get-in update update-in assoc assoc-in exists? dissoc keys])
(:require [clojure.core.async :refer [chan put! poll!]]
[hasch.core :as hasch]
[konserve.protocols :refer [-exists? -get-meta -get-in -assoc-in
-update-in -dissoc -bget -bassoc
-keys]]
[konserve.utils :refer [meta-update #?(:clj async+sync) *default-sync-translation*]
#?@(:cljs [:refer-macros [async+sync]])]
[superv.async :refer [go-try- <?-]]
[taoensso.timbre :refer [trace #?(:cljs debug)]])
#?(:cljs (:require-macros [konserve.core :refer [go-locked locked]])))
;; ACID
;; atomic
;; consistent
;; isolated
;; durable
(defn get-lock [{:keys [locks] :as _store} key]
(or (clojure.core/get @locks key)
(let [c (chan)]
(put! c :unlocked)
(clojure.core/get (swap! locks (fn [old]
(trace "creating lock for: " key)
(if (old key) old
(clojure.core/assoc old key c))))
key))))
(defn wait [lock]
#?(:clj (while (not (poll! lock))
(Thread/sleep (long (rand-int 20))))
:cljs (when-not (some-> lock poll!)
(debug "WARNING: konserve lock is not active. Only use the synchronous variant with the memory store in JavaScript."))))
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(defmacro locked [store key & code]
`(let [l# (get-lock ~store ~key)]
(try
(wait l#)
(trace "acquired spin lock for " ~key)
~@code
(finally
(trace "releasing spin lock for " ~key)
(put! l# :unlocked)))))
(defmacro go-locked [store key & code]
`(go-try-
(let [l# (get-lock ~store ~key)]
(try
(<?- l#)
(trace "acquired go-lock for: " ~key)
~@code
(finally
(trace "releasing go-lock for: " ~key)
(put! l# :unlocked))))))
(defn exists?
"Checks whether value is in the store."
([store key]
(exists? store key {:sync? false}))
([store key opts]
(trace "exists? on key " key)
(async+sync (:sync? opts)
*default-sync-translation*
(go-locked
store key
(<?- (-exists? store key opts))))))
(defn get-in
"Returns the value stored described by key. Returns nil if the key
is not present, or the not-found value if supplied."
([store key-vec]
(get-in store key-vec nil))
([store key-vec not-found]
(get-in store key-vec not-found {:sync? false}))
([store key-vec not-found opts]
(trace "get-in on key " key-vec)
(async+sync (:sync? opts)
*default-sync-translation*
(go-locked
store (first key-vec)
(<?- (-get-in store key-vec not-found opts))))))
(defn get
"Returns the value stored described by key. Returns nil if the key
is not present, or the not-found value if supplied."
([store key]
(get store key nil))
([store key not-found]
(get store key not-found {:sync? false}))
([store key not-found opts]
(get-in store [key] not-found opts)))
(defn get-meta
"Returns the value stored described by key. Returns nil if the key
is not present, or the not-found value if supplied."
([store key]
(get-meta store key nil))
([store key not-found]
(get-meta store key not-found {:sync? false}))
([store key not-found opts]
(trace "get-meta on key " key)
(async+sync (:sync? opts)
*default-sync-translation*
(go-locked
store key
(let [a (<?- (-get-meta store key opts))]
(if (some? a)
a
not-found))))))
(defn update-in
"Updates a position described by key-vec by applying up-fn and storing
the result atomically. Returns a vector [old new] of the previous
value and the result of applying up-fn (the newly stored value)."
([store key-vec up-fn]
(update-in store key-vec up-fn {:sync? false}))
([store key-vec up-fn opts]
(trace "update-in on key " key-vec)
(async+sync (:sync? opts)
*default-sync-translation*
(go-locked
store (first key-vec)
(<?- (-update-in store key-vec (partial meta-update (first key-vec) :edn) up-fn opts))))))
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(defn update
"Updates a position described by key by applying up-fn and storing
the result atomically. Returns a vector [old new] of the previous
value and the result of applying up-fn (the newly stored value)."
([store key fn]
(update store key fn {:sync? false}))
([store key fn opts]
(trace "update on key " key)
(update-in store [key] fn opts)))
(defn assoc-in
"Associates the key-vec to the value, any missing collections for
the key-vec (nested maps and vectors) are newly created."
([store key-vec val]
(assoc-in store key-vec val {:sync? false}))
([store key-vec val opts]
(trace "assoc-in on key " key)
(async+sync (:sync? opts)
*default-sync-translation*
(go-locked
store (first key-vec)
(<?- (-assoc-in store key-vec (partial meta-update (first key-vec) :edn) val opts))))))
(defn assoc
"Associates the key-vec to the value, any missing collections for
the key-vec (nested maps and vectors) are newly created."
([store key val]
(assoc store key val {:sync? false}))
([store key val opts]
(trace "assoc on key " key)
(assoc-in store [key] val opts)))
(defn dissoc
"Removes an entry from the store. "
([store key]
(dissoc store key {:sync? false}))
([store key opts]
(trace "dissoc on key " key)
(async+sync (:sync? opts)
*default-sync-translation*
(go-locked
store key
(<?- (-dissoc store key opts))))))
(defn append
"Append the Element to the log at the given key or create a new append log there.
This operation only needs to write the element and pointer to disk and hence is useful in write-heavy situations."
([store key elem]
(append store key elem {:sync? false}))
([store key elem opts]
(trace "append on key " key)
(async+sync (:sync? opts)
*default-sync-translation*
(go-locked
store key
(let [head (<?- (-get-in store [key] nil opts))
[append-log? last-id first-id] head
new-elem {:next nil
:elem elem}
id (hasch/uuid)]
(when (and head (not= append-log? :append-log))
(throw (ex-info "This is not an append-log." {:key key})))
(<?- (-update-in store [id] (partial meta-update key :append-log) (fn [_] new-elem) opts))
(when first-id
(<?- (-update-in store [last-id :next] (partial meta-update key :append-log) (fn [_] id) opts)))
(<?- (-update-in store [key] (partial meta-update key :append-log) (fn [_] [:append-log id (or first-id id)]) opts))
[first-id id])))))
(defn log
"Loads the whole append log stored at key."
([store key]
(log store key {:sync? false}))
([store key opts]
(trace "log on key " key)
(async+sync (:sync? opts)
*default-sync-translation*
(go-try-
(let [head (<?- (get store key nil opts))
[append-log? _last-id first-id] head]
(when (and head (not= append-log? :append-log))
(throw (ex-info "This is not an append-log." {:key key})))
(when first-id
(loop [{:keys [next elem]} (<?- (get store first-id nil opts))
hist []]
(if next
(recur (<?- (get store next nil opts))
(conj hist elem))
(conj hist elem)))))))))
(defn reduce-log
"Loads the append log and applies reduce-fn over it."
([store key reduce-fn acc]
(reduce-log store key reduce-fn acc {:sync? false}))
([store key reduce-fn acc opts]
(trace "reduce-log on key " key)
(async+sync (:sync? opts)
*default-sync-translation*
(go-try-
(let [head (<?- (get store key nil opts))
[append-log? last-id first-id] head]
(when (and head (not= append-log? :append-log))
(throw (ex-info "This is not an append-log." {:key key})))
(if first-id
(loop [id first-id
acc acc]
(let [{:keys [next elem]} (<?- (get store id nil opts))]
(if (and next (not= id last-id))
(recur next (reduce-fn acc elem))
(reduce-fn acc elem))))
acc))))))
(defn bget
"Calls locked-cb with a platform specific binary representation inside
the lock, e.g. wrapped InputStream on the JVM and Blob in
JavaScript. You need to properly close/dispose the object when you
are done!
You have to do all work in locked-cb, e.g.
(fn [{is :input-stream}]
(let [tmp-file (io/file \"/tmp/my-private-copy\")]
(io/copy is tmp-file)))
When called asynchronously (by default or w/ {:sync? false}), the locked-cb
must synchronously return a channel."
([store key locked-cb]
(bget store key locked-cb {:sync? false}))
([store key locked-cb opts]
(trace "bget on key " key)
(async+sync (:sync? opts)
*default-sync-translation*
(go-locked
store key
(<?- (-bget store key locked-cb opts))))))
(defn bassoc
"Copies given value (InputStream, Reader, File, byte[] or String on
JVM, Blob in JavaScript) under key in the store."
([store key val]
(bassoc store key val {:sync? false}))
([store key val opts]
(trace "bassoc on key " key)
(async+sync (:sync? opts)
*default-sync-translation*
(go-locked
store key
(<?- (-bassoc store key (partial meta-update key :binary) val opts))))))
(defn keys
"Return a channel that will yield all top-level keys currently in the store."
([store]
(keys store {:sync? false}))
([store opts]
(trace "fetching keys")
(-keys store opts)))