Skip to content

Commit

Permalink
Fix location tags request counter
Browse files Browse the repository at this point in the history
  • Loading branch information
ellykits committed Sep 3, 2021
1 parent ad9b5c6 commit 7faf4c5
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,29 @@ package org.smartregister.dataimport.opensrp

import io.vertx.core.buffer.Buffer
import io.vertx.core.eventbus.Message
import io.vertx.core.http.HttpMethod
import io.vertx.core.json.JsonArray
import io.vertx.ext.web.client.HttpResponse
import io.vertx.kotlin.coroutines.await
import io.vertx.kotlin.coroutines.awaitEvent
import io.vertx.kotlin.coroutines.awaitResult
import io.vertx.kotlin.coroutines.dispatcher
import kotlinx.coroutines.launch
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.smartregister.dataimport.openmrs.OpenMRSLocationVerticle
import org.smartregister.dataimport.shared.*
import org.smartregister.dataimport.shared.model.LocationTag
import kotlin.math.ceil

/**
* Subclass of [BaseVerticle] which is the base class for all classes with OpenSRP related code
*/
abstract class BaseOpenSRPVerticle : BaseVerticle() {

protected var locationTagsMap = mapOf<String, LocationTag>()

/**
* This function consumes the messaged in the event bus from the [countAddress]. Once A count has been received,
* the function request for data from OpenMRS periodically (as configured via the [requestInterval] property)
Expand Down Expand Up @@ -54,6 +61,7 @@ abstract class BaseOpenSRPVerticle : BaseVerticle() {
eventBus.consumer<String>(EventBusAddress.OPENMRS_TASK_COMPLETE).handler {
when (DataItem.valueOf(it.body())) {
DataItem.LOCATION_TAGS -> launch(vertx.dispatcher()) {
retrieveLocationTags()
deployVerticle(OpenMRSLocationVerticle(), poolName = OPENMRS_LOCATIONS)
}
DataItem.KEYCLOAK_USERS -> vertx.eventBus().send(EventBusAddress.OPENMRS_KEYCLOAK_USERS_GROUP_ASSIGN, true)
Expand Down Expand Up @@ -90,4 +98,21 @@ abstract class BaseOpenSRPVerticle : BaseVerticle() {
val payload = JsonArray(jsonEncoder().encodeToString(data))
vertx.eventBus().send(address, payload)
}

protected suspend fun retrieveLocationTags() {
val locationTags =
awaitResult<HttpResponse<Buffer>?> {
webRequest(
method = HttpMethod.GET,
url = config.getString("opensrp.rest.location.tag.url"),
handler = it
)
}
?.body()
if (locationTags != null && config.getString(SOURCE_FILE).isNullOrBlank()) {
locationTagsMap =
Json.decodeFromString<List<LocationTag>>(locationTags.toString()).associateBy { it.name }
}
}

}
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package org.smartregister.dataimport.opensrp.location

import io.vertx.core.buffer.Buffer
import io.vertx.core.eventbus.Message
import io.vertx.core.http.HttpMethod
import io.vertx.core.json.JsonArray
import io.vertx.core.json.JsonObject
import io.vertx.ext.web.client.HttpResponse
import io.vertx.kotlin.coroutines.await
import io.vertx.kotlin.coroutines.awaitEvent
import io.vertx.kotlin.coroutines.awaitResult
import io.vertx.kotlin.coroutines.dispatcher
import kotlinx.coroutines.launch
import org.smartregister.dataimport.openmrs.OpenMRSLocationTagVerticle
import org.smartregister.dataimport.opensrp.BaseOpenSRPVerticle
import org.smartregister.dataimport.shared.*
import org.smartregister.dataimport.shared.EventBusAddress.OPENMRS_LOCATION_TAGS_LOAD

/**
* Subclass of [BaseOpenSRPVerticle] responsible for posting OpenSRP location tags and for deploying [OpenSRPLocationVerticle]
Expand All @@ -24,12 +28,26 @@ class OpenSRPLocationTagVerticle : BaseOpenSRPVerticle() {

if (sourceFile.isNullOrBlank()) {
deployVerticle(OpenMRSLocationTagVerticle(), OPENMRS_LOCATION_TAGS)
consumeOpenMRSData(
dataItem = DataItem.LOCATION_TAGS,
countAddress = EventBusAddress.OPENMRS_LOCATION_TAGS_COUNT,
loadAddress = EventBusAddress.OPENMRS_LOCATION_TAGS_LOAD,
action = this::postLocationTags
)
vertx.eventBus().consumer<Int>(EventBusAddress.OPENMRS_LOCATION_TAGS_COUNT).handler { countMessage ->
try {
var offset = 0
val count = countMessage.body()
launch(vertx.dispatcher()) {
startVertxCounter(DataItem.LOCATION_TAGS, count.toLong(), true)
while (offset <= count) {
awaitEvent<Long> { vertx.setTimer(getRequestInterval(DataItem.LOCATION_TAGS), it) }
val message = awaitResult<Message<JsonArray>> { handler ->
vertx.eventBus().request(OPENMRS_LOCATION_TAGS_LOAD, offset, handler)
}
postLocationTags(message.body())
offset += limit
}
}
} catch (throwable: Throwable) {
vertx.exceptionHandler().handle(throwable)
}
}

} else {
val skipLocationsTags = config.getBoolean(SKIP_LOCATION_TAGS)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import io.vertx.kotlin.coroutines.await
import io.vertx.kotlin.coroutines.awaitResult
import io.vertx.kotlin.coroutines.dispatcher
import kotlinx.coroutines.launch
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.smartregister.dataimport.opensrp.BaseOpenSRPVerticle
Expand All @@ -27,8 +26,6 @@ import java.util.*
*/
class OpenSRPLocationVerticle : BaseOpenSRPVerticle() {

private var locationTagsMap = mapOf<String, LocationTag>()

private var locationIdsMap = TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER)

private var organizationUsers = mapOf<String, List<KeycloakUser>>()
Expand Down Expand Up @@ -56,7 +53,6 @@ class OpenSRPLocationVerticle : BaseOpenSRPVerticle() {

try {
if (sourceFile.isNullOrBlank()) {
retrieveLocationTags()
consumeOpenMRSData(
dataItem = DataItem.LOCATIONS,
countAddress = EventBusAddress.OPENMRS_LOCATIONS_COUNT,
Expand Down Expand Up @@ -203,13 +199,16 @@ class OpenSRPLocationVerticle : BaseOpenSRPVerticle() {
locations.forEach { location ->
// Delete locationTags attributes for locations without tags
if (location is JsonObject && location.containsKey(LOCATION_TAGS)) {
val locationTags = location.getJsonArray(LOCATION_TAGS)
val locationTags = location.getJsonArray(LOCATION_TAGS, JsonArray())
val cleanedTags = JsonArray()
locationTags.forEach { tag ->
if (tag is JsonObject) {
when {
tag.getString(ID) == null -> location.remove(LOCATION_TAGS)
tag.getString(ID) != null -> JsonObject(Json.encodeToString(locationTagsMap[tag.getString(NAME)]))
tag.getString(ID) != null -> {
val newTag = JsonObject(Json.encodeToString(locationTagsMap[tag.getString(NAME)]))
cleanedTags.add(newTag)
}
}
}
}
Expand Down Expand Up @@ -331,22 +330,6 @@ class OpenSRPLocationVerticle : BaseOpenSRPVerticle() {
}
}

private suspend fun retrieveLocationTags() {
val locationTags =
awaitResult<HttpResponse<Buffer>?> {
webRequest(
method = HttpMethod.GET,
url = config.getString("opensrp.rest.location.tag.url"),
handler = it
)
}
?.body()
if (locationTags != null && !sourceFile.isNullOrBlank()) {
locationTagsMap =
Json.decodeFromString<List<LocationTag>>(locationTags.toString()).associateBy { it.name }
}
}

private fun validateHeaders(
headers: Array<String>,
promise: Promise<List<List<Location>>>
Expand Down

0 comments on commit 7faf4c5

Please sign in to comment.