Skip to content

Commit

Permalink
Merge pull request #5 from opensrp/import-muliptle-location-tags
Browse files Browse the repository at this point in the history
Import multiple location tags
  • Loading branch information
ellykits authored Sep 4, 2021
2 parents 2832854 + 9a79171 commit 6d3f461
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 257 deletions.
2 changes: 1 addition & 1 deletion README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ For example run the following command to list available options (`app.properties

```shell script

java -jar opensrp-data-import-3.0.2-SNAPSHOT-fat.jar --configs-file app.properties --help
java -jar opensrp-data-import-3.0.3-SNAPSHOT-fat.jar --configs-file app.properties --help

```

Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ plugins {
}

group = "org.smartregister"
version = "3.0.2-SNAPSHOT"
version = "3.0.3-SNAPSHOT"

repositories {
mavenCentral()
Expand Down
4 changes: 2 additions & 2 deletions src/main/kotlin/org/smartregister/dataimport/Debug.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import org.smartregister.dataimport.shared.*

fun main() {
val configs = JsonObject().apply {
put(IMPORT_OPTION, "keycloak_users")
put(IMPORT_OPTION, "locations")
// put(SOURCE_FILE, "assets/locations.csv")
put(USERS_FILE, "assets/liberia-users.csv")
// put(USERS_FILE, "assets/liberia-users.csv")
// put(SKIP_USER_GROUP, true)
// put(SKIP_LOCATION_TAGS, true)
// put(SKIP_LOCATIONS, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import io.vertx.core.buffer.Buffer
import io.vertx.core.eventbus.Message
import io.vertx.core.http.HttpMethod
import io.vertx.core.json.JsonArray
import io.vertx.core.json.JsonObject
import io.vertx.ext.web.client.HttpResponse
import io.vertx.kotlin.coroutines.await
import io.vertx.kotlin.coroutines.awaitEvent
Expand All @@ -18,22 +19,22 @@ import org.smartregister.dataimport.shared.*
import org.smartregister.dataimport.shared.model.LocationTag
import kotlin.math.ceil

/**
* Subclass of [BaseVerticle] which is the base class for all classes with OpenSRP related code
*/
/** Subclass of [BaseVerticle] which is the base class for all classes with OpenSRP related code */
abstract class BaseOpenSRPVerticle : BaseVerticle() {

protected var locationTagsMap = mapOf<String, LocationTag>()

/**
* This function consumes the messaged in the event bus from the [countAddress]. Once A count has been received,
* the function request for data from OpenMRS periodically (as configured via the [requestInterval] property)
* via event bus through [loadAddress]. This will prompt querying OpenMRS
* database for data once the data is received the callback method [action] with the data as its parameter.
* [dataItem]
* This function consumes the messages in the event bus from the [countAddress]. Once A count has
* been received, the function request for data from OpenMRS periodically via event bus through
* [loadAddress]. This will prompt querying OpenMRS database for data once the data is received
* the callback method [action] with the data as its parameter. [dataItem]
*/
protected fun consumeOpenMRSData(
dataItem: DataItem, countAddress: String, loadAddress: String, action: suspend (JsonArray) -> Unit
dataItem: DataItem,
countAddress: String,
loadAddress: String,
action: suspend (JsonArray) -> Unit
) {
val eventBus = vertx.eventBus()
eventBus.consumer<Int>(countAddress).handler { countMessage ->
Expand All @@ -46,9 +47,10 @@ abstract class BaseOpenSRPVerticle : BaseVerticle() {
startVertxCounter(dataItem = dataItem, dataSize = numberOfRequests.toLong())
while (offset <= count) {
awaitEvent<Long> { vertx.setTimer(getRequestInterval(dataItem), it) }
val message = awaitResult<Message<JsonArray>> { handler ->
eventBus.request(loadAddress, offset, handler)
}
val message =
awaitResult<Message<JsonArray>> { handler ->
eventBus.request(loadAddress, offset, handler)
}
action(message.body())
offset += limit
}
Expand All @@ -57,38 +59,96 @@ abstract class BaseOpenSRPVerticle : BaseVerticle() {
}
}
}
handleLocationTaskCompletion()
}

private fun handleLocationTaskCompletion() {
val eventBus = vertx.eventBus()
eventBus.consumer<String>(EventBusAddress.OPENMRS_TASK_COMPLETE).handler {
when (DataItem.valueOf(it.body())) {
DataItem.LOCATION_TAGS -> launch(vertx.dispatcher()) {
retrieveLocationTags()
deployVerticle(OpenMRSLocationVerticle(), poolName = OPENMRS_LOCATIONS)
}
DataItem.KEYCLOAK_USERS -> vertx.eventBus().send(EventBusAddress.OPENMRS_KEYCLOAK_USERS_GROUP_ASSIGN, true)
DataItem.LOCATION_TAGS ->
launch(vertx.dispatcher()) {
retrieveLocationTags()
deployVerticle(OpenMRSLocationVerticle(), poolName = OPENMRS_LOCATIONS)
}
DataItem.KEYCLOAK_USERS ->
vertx.eventBus().send(EventBusAddress.OPENMRS_KEYCLOAK_USERS_GROUP_ASSIGN, true)
else -> eventBus.send(EventBusAddress.APP_SHUTDOWN, true)
}
}
}

protected suspend fun consumeOpenMRSLocation(action: suspend (List<String>) -> Unit) {
handleLocationTaskCompletion()
val dataItem = DataItem.LOCATIONS
val eventBus = vertx.eventBus()
eventBus.consumer<Int>(EventBusAddress.OPENMRS_LOCATIONS_COUNT).handler { countMessage ->
var offset = 0
val count = countMessage.body()
val allLocations = JsonArray()
launch(vertx.dispatcher()) {
try {
while (offset <= count) {
awaitEvent<Long> { vertx.setTimer(getRequestInterval(dataItem), it) }
val message =
awaitResult<Message<JsonArray>> { handler ->
eventBus.request(EventBusAddress.OPENMRS_LOCATIONS_LOAD, offset, handler)
}
allLocations.addAll(message.body())
offset += limit
}

val taggedLocations: List<String> =
allLocations.asSequence()
.map { it as JsonObject }
.filter { it.containsKey(ID) }
.groupBy { it.getString(ID) }
.onEach { entry: Map.Entry<String, List<JsonObject>> ->
with(entry.value.first()) {
getJsonArray(LOCATION_TAGS).apply {
entry.value
.filter { it.containsKey(LOCATION_TAGS) }
.flatMap { it.getJsonArray(LOCATION_TAGS) }
.map { it as JsonObject }
.takeLast(entry.value.size - 1)
.forEach {
this.add(it)
}
}
updateLocationTagIds(this)
}
}.map { it.value.first().toString() }
.toList()
action(taggedLocations)
} catch (throwable: Throwable) {
vertx.exceptionHandler().handle(throwable)
}
}
}
}

/**
* Post [data] to the provided [url] keeping track of the number of responses from the server.This will help in correctly
* determining whether the task is fully completed. The counter starts off with number of requests sent to the server.
* When 0 it means that was the last request.
* Post [data] to the provided [url] keeping track of the number of responses from the server.This
* will help in correctly determining whether the task is fully completed. The counter starts off
* with number of requests sent to the server. When 0 it means that was the last request.
*/
protected suspend inline fun <reified T> postData(url: String, data: List<T>, dataItem: DataItem) {
protected suspend inline fun <reified T> postData(
url: String,
data: List<T>,
dataItem: DataItem
) {
try {
val item = dataItem.name.lowercase()
logger.info("Posting ${data.size} $item data to OpenSRP")
val json: String = jsonEncoder().encodeToString(data)
val counter = vertx.sharedData().getCounter(dataItem.name).await()

awaitResult<HttpResponse<Buffer>?> {
webRequest(url = url, payload = json, handler = it)
}?.run {
logHttpResponse()
logger.info("Posted ${data.size} $item to OpenSRP")
checkTaskCompletion(counter, dataItem)
}
awaitResult<HttpResponse<Buffer>?> { webRequest(url = url, payload = json, handler = it) }
?.run {
logHttpResponse()
logger.info("Posted ${data.size} $item to OpenSRP")
checkTaskCompletion(counter, dataItem)
}
} catch (throwable: Throwable) {
vertx.exceptionHandler().handle(throwable)
}
Expand All @@ -115,4 +175,25 @@ abstract class BaseOpenSRPVerticle : BaseVerticle() {
}
}

private fun updateLocationTagIds(location: JsonObject) {
// Delete locationTags attributes for locations without tags
if (location.containsKey(LOCATION_TAGS)) {
val locationTags = location.getJsonArray(LOCATION_TAGS, JsonArray())
val cleanedTags = JsonArray()
locationTags.forEach { tag ->
if (tag is JsonObject) {
when {
tag.getString(ID) == null -> location.remove(LOCATION_TAGS)
tag.getString(ID) != null -> {
val newTag = JsonObject(Json.encodeToString(locationTagsMap[tag.getString(NAME)]))
cleanedTags.add(newTag)
}
}
}
}
if (!cleanedTags.isEmpty) {
location.put(LOCATION_TAGS, cleanedTags)
}
}
}
}
Loading

0 comments on commit 6d3f461

Please sign in to comment.