forked from yogthos/migratus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcore.clj
232 lines (205 loc) · 8.09 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
;;;; Copyright © 2011 Paul Stadig
;;;;
;;;; Licensed under the Apache License, Version 2.0 (the "License"); you may not
;;;; use this file except in compliance with the License. You may obtain a copy
;;;; of the License at
;;;;
;;;; http://www.apache.org/licenses/LICENSE-2.0
;;;;
;;;; Unless required by applicable law or agreed to in writing, software
;;;; distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
;;;; WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
;;;; License for the specific language governing permissions and limitations
;;;; under the License.
(ns migratus.core
(:require
[clojure.set :as set]
[clojure.string :as str]
[clojure.tools.logging :as log]
[migratus.migrations :as mig]
[migratus.protocols :as proto]
migratus.database))
(defmacro ^{:private true} assert-args
[& pairs]
`(do (when-not ~(first pairs)
(throw (IllegalArgumentException.
(str (first ~'&form) " requires " ~(second pairs) " in " ~'*ns* ":" (:line (meta ~'&form))))))
~(let [more (nnext pairs)]
(when more
(list* `assert-args more)))))
(defmacro with-store
"bindings => name init
Evaluates body in a try expression with name bound to the value
of the init, and (proto/connect name) called before body, and a
finally clause that calls (proto/disconnect name)."
([bindings & body]
(assert-args
(vector? bindings) "a vector for its binding"
(= 2 (count bindings)) "exactly 2 forms in binding vector"
(symbol? (bindings 0)) "only Symbols in bindings")
(let [form (bindings 0) init (bindings 1)]
`(let [~form ~init]
(try
(proto/connect ~form)
~@body
(finally
(proto/disconnect ~form)))))))
(defn run [store ids command]
(try
(log/info "Starting migrations")
(proto/connect store)
(command store ids)
(catch java.sql.BatchUpdateException e
(throw (or (.getNextException e) e)))
(finally
(log/info "Ending migrations")
(proto/disconnect store))))
(defn require-plugin [{:keys [store]}]
(when-not store
(throw (Exception. "Store is not configured")))
(let [plugin (symbol (str "migratus." (name store)))]
(require plugin)))
(defn completed-migrations [config store]
(let [completed? (set (proto/completed-ids store))]
(filter (comp completed? proto/id) (mig/list-migrations config))))
(defn uncompleted-migrations
"Returns a list of uncompleted migrations.
Fetch list of applied migrations from db and existing migrations from migrations dir."
[config store]
(let [completed? (set (proto/completed-ids store))]
(remove (comp completed? proto/id) (mig/list-migrations config))))
(defn migration-name [migration]
(str (proto/id migration) "-" (proto/name migration)))
(defn- up* [store migration]
(log/info "Up" (migration-name migration))
(proto/migrate-up store migration))
(defn- migrate-up* [store migrations]
(let [migrations (sort-by proto/id migrations)]
(when (seq migrations)
(log/info "Running up for" (pr-str (vec (map proto/id migrations))))
(loop [[migration & more] migrations]
(when migration
(when (Thread/interrupted)
(log/info "Thread cancellation detected. Stopping migration.")
(throw (InterruptedException. "Migration interrupted by thread cancellation.")))
(case (up* store migration)
:success (recur more)
:ignore (do
(log/info "Migration reserved by another instance. Ignoring.")
:ignore)
(do
(log/error "Stopping:" (migration-name migration) "failed to migrate")
:failure)))))))
(defn- migrate* [config store _]
(let [migrations (->> store
(uncompleted-migrations config)
(sort-by proto/id))]
(migrate-up* store migrations)))
(defn migrate
"Bring up any migrations that are not completed.
Returns nil if successful, :ignore if the table is reserved, :failure otherwise.
Supports thread cancellation."
[config]
(run (proto/make-store config) nil (partial migrate* config)))
(defn- run-up [config store ids]
(let [completed (set (proto/completed-ids store))
ids (set/difference (set ids) completed)
migrations (filter (comp ids proto/id) (mig/list-migrations config))]
(migrate-up* store migrations)))
(defn up
"Bring up the migrations identified by ids.
Any migrations that are already complete will be skipped."
[config & ids]
(run (proto/make-store config) ids (partial run-up config)))
(defn- run-down [config store ids]
(let [completed (set (proto/completed-ids store))
ids (set/intersection (set ids) completed)
migrations (filter (comp ids proto/id)
(mig/list-migrations config))
migrations (reverse (sort-by proto/id migrations))]
(when (seq migrations)
(log/info "Running down for" (pr-str (vec (map proto/id migrations))))
(doseq [migration migrations]
(log/info "Down" (migration-name migration))
(proto/migrate-down store migration)))))
(defn down
"Bring down the migrations identified by ids.
Any migrations that are not completed will be skipped."
[config & ids]
(run (proto/make-store config) ids (partial run-down config)))
(defn- rollback* [config store _]
(run-down
config
store
(->> (proto/completed-ids store)
first
vector)))
(defn- reset* [config store _]
(run-down config store (->> (proto/completed-ids store) sort)))
(defn rollback
"Rollback the last migration that was successfully applied."
[config]
(run (proto/make-store config) nil (partial rollback* config)))
(defn reset
"Reset the database by down-ing all migrations successfully
applied, then up-ing all migratinos."
[config]
(run (proto/make-store config) nil (partial reset* config))
(migrate config))
(defn init
"Initialize the data store"
[config & [name]]
(proto/init (proto/make-store config)))
(defn create
"Create a new migration with the current date"
[config & [name type]]
(mig/create config name (or type :sql)))
(defn destroy
"Destroy migration"
[config & [name]]
(mig/destroy config name))
(defn select-migrations
"List pairs of id and name for migrations selected by the selection-fn."
[config selection-fn]
(with-store [store (proto/make-store config)]
(->> store
(selection-fn config)
(mapv (juxt proto/id proto/name)))))
(defn completed-list
"List completed migrations"
[config]
(let [migrations (select-migrations config completed-migrations)]
(log/debug (apply str "You have " (count migrations) " completed migrations:\n"
(str/join "\n" migrations)))
(mapv second migrations)))
(defn pending-list
"List pending migrations"
[config]
(let [migrations (select-migrations config uncompleted-migrations)]
(log/debug (apply str "You have " (count migrations) " pending migrations:\n"
(str/join "\n" migrations)))
(mapv second migrations)))
(defn migrate-until-just-before
"Run all migrations preceding migration-id. This is useful when testing that a
migration behaves as expected on fixture data. This only considers uncompleted
migrations, and will not migrate down."
[config migration-id]
(with-store [store (proto/make-store config)]
(->> (uncompleted-migrations config store)
(map proto/id)
distinct
sort
(take-while #(< % migration-id))
(apply up config))))
(defn rollback-until-just-after
"Migrate down all migrations after migration-id. This only considers completed
migrations, and will not migrate up."
[config migration-id]
(with-store [store (proto/make-store config)]
(->> (completed-migrations config store)
(map proto/id)
distinct
sort
reverse
(take-while #(> % migration-id))
(apply down config))))