Skip to content

Instantly share code, notes, and snippets.

@smallufo
Last active November 8, 2019 05:41
Show Gist options
  • Save smallufo/00529f640ded49814edfdf993ff04747 to your computer and use it in GitHub Desktop.
Save smallufo/00529f640ded49814edfdf993ff04747 to your computer and use it in GitHub Desktop.
package destiny
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import mu.KotlinLogging
import org.apache.http.client.fluent.Request
import java.net.URLEncoder
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.test.Test
interface UrlShorter {
suspend fun getShortUrl(longUrl: String): String?
}
val logger = KotlinLogging.logger {}
class IsgdImpl : UrlShorter {
override suspend fun getShortUrl(longUrl: String): String? {
logger.info("running : {}", Thread.currentThread().name)
val url = "https://is.gd/create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl, "UTF-8"))
return withContext(Dispatchers.IO) {
logger.info("running Dispatchers.IO : {}", Thread.currentThread().name)
Request.Get(url).execute().returnContent().asString().also {
logger.info("returning {}", it)
}
}
}
}
class TinyImpl : UrlShorter {
override suspend fun getShortUrl(longUrl: String): String? {
logger.info("running : {}", Thread.currentThread().name)
val url = "http://tinyurl.com/api-create.php?url=$longUrl"
return withContext(Dispatchers.IO) {
logger.info("running Dispatchers.IO : {}", Thread.currentThread().name)
Request.Get(url).execute().returnContent().asString().also {
logger.info("returning {}", it)
}
}
}
}
/**
* delays 10 seconds and returns null
*/
class DumbImpl : UrlShorter {
override suspend fun getShortUrl(longUrl: String): String? {
logger.info("running : {}", Thread.currentThread().name)
delay(10 * 1000)
return null
}
}
/**
* returns null immediately
*/
class NullImpl : UrlShorter {
override suspend fun getShortUrl(longUrl: String): String? {
logger.info("running : {}", Thread.currentThread().name)
return null
}
}
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterService(private val impls: List<UrlShorter>) {
private val es: ExecutorService = Executors.newFixedThreadPool(impls.size)
private val esDispatcher = es.asCoroutineDispatcher()
suspend fun getShortUrl(longUrl: String): String {
return method1(longUrl)
}
private inline fun <T, R : Any> Iterable<T>.firstNotNullResult(transform: (T) -> R?): R? {
for (element in this) {
val result = transform(element)
if (result != null) return result
}
return null
}
/**
* works OK !
*
* 13:40:21,358 INFO NullImpl - running : main @coroutine#3
* 13:40:21,380 INFO DumbImpl - running : main @coroutine#4
* 13:40:21,386 INFO IsgdImpl - running : main @coroutine#5
* 13:40:21,402 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#5
* 13:40:21,416 INFO TinyImpl - running : main @coroutine#6
* 13:40:21,419 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#6
* 13:40:23,029 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo
* 13:40:23,031 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes
* 13:40:23,126 INFO UrlShorterServiceTest$testHedging$1 - result = http://tinyurl.com/389lo
*/
private suspend fun method1(longUrl: String): String {
return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl ->
flow {
impl.getShortUrl(longUrl)?.also {
emit(it)
}
}//.flowOn(esDispatcher)
}.first()
//.also { esDispatcher.cancelChildren() } // doesn't impact the result
}
/**
* blocked
*
* 13:22:48,343 INFO TinyImpl - running : pool-1-thread-1 @coroutine#5
* 13:22:48,343 INFO NullImpl - running : pool-1-thread-2 @coroutine#2
* 13:22:48,343 INFO IsgdImpl - running : pool-1-thread-4 @coroutine#4
* 13:22:48,343 INFO DumbImpl - running : pool-1-thread-3 @coroutine#3
* 13:22:48,356 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#5
* 13:22:48,392 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#4
* 13:22:50,296 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes
* 13:22:50,664 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo
* 13:22:58,359 INFO UrlShorterServiceTest$testHedging$1 - result = https://is.gd/EuvYes
*/
private suspend fun method2(longUrl: String): String {
return withContext(esDispatcher) {
impls.map { impl ->
async(esDispatcher) {
impl.getShortUrl(longUrl)
}
}.firstNotNullResult { it.await() } ?: longUrl
}
}
/**
* blocked
*
* 13:23:56,941 INFO destiny.DumbImpl - running : pool-1-thread-2 @coroutine#3
* 13:23:56,941 INFO destiny.IsgdImpl - running : pool-1-thread-3 @coroutine#4
* 13:23:56,941 INFO destiny.TinyImpl - running : pool-1-thread-4 @coroutine#5
* 13:23:56,941 INFO destiny.NullImpl - running : pool-1-thread-1 @coroutine#2
* 13:23:56,952 INFO destiny.TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#5
* 13:23:56,953 INFO destiny.IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#4
* 13:23:58,506 INFO destiny.IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes
* 13:23:58,910 INFO destiny.TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo
* 13:24:06,953 INFO destiny.UrlShorterServiceTest$testHedging$1 - result = https://is.gd/EuvYes
*/
private suspend fun method3(longUrl: String): String {
return coroutineScope {
impls.map { impl ->
async(esDispatcher) {
impl.getShortUrl(longUrl)
}
}.firstNotNullResult { it.await() } ?: longUrl
}
}
/**
* blocked
*
* 13:24:53,857 INFO DumbImpl - running : pool-1-thread-3 @coroutine#3
* 13:24:53,858 INFO TinyImpl - running : pool-1-thread-1 @coroutine#5
* 13:24:53,858 INFO NullImpl - running : pool-1-thread-2 @coroutine#2
* 13:24:53,858 INFO IsgdImpl - running : pool-1-thread-4 @coroutine#4
* 13:24:53,868 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-3 @coroutine#4
* 13:24:53,868 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#5
* 13:24:56,319 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes
* 13:24:56,949 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo
* 13:25:03,867 INFO UrlShorterServiceTest$testHedging$1 - result = https://is.gd/EuvYes
*/
private suspend fun method4(longUrl: String): String {
return withContext(esDispatcher) {
impls.map { impl ->
async {
impl.getShortUrl(longUrl)
}
}.firstNotNullResult { it.await() } ?: longUrl
}
}
/**
* Exception
*
* 13:25:54,615 INFO UrlShorterService$method5$2 - channel closed
* 13:25:54,620 INFO IsgdImpl - running : pool-1-thread-4 @coroutine#4
* 13:25:54,620 INFO DumbImpl - running : pool-1-thread-3 @coroutine#3
* 13:25:54,620 INFO NullImpl - running : pool-1-thread-2 @coroutine#2
* 13:25:54,621 INFO TinyImpl - running : pool-1-thread-1 @coroutine#5
* 13:25:54,629 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#4
* 13:25:54,629 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#5
* 13:25:56,880 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo
* 13:25:57,214 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes
*
* kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
*
*/
private suspend fun method5(longUrl: String): String {
val channel = Channel<String>()
withContext(esDispatcher) {
impls.forEach { impl ->
launch {
impl.getShortUrl(longUrl)?.also {
channel.send(it)
}
}
}
channel.close()
logger.info("channel closed")
}
return channel.consumeAsFlow().first()
}
/**
* OK
*
* 13:27:18,505 INFO NullImpl - running : main @coroutine#2
* 13:27:18,511 INFO DumbImpl - running : main @coroutine#3
* 13:27:18,515 INFO IsgdImpl - running : main @coroutine#4
* 13:27:18,523 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#4
* 13:27:18,524 INFO TinyImpl - running : main @coroutine#5
* 13:27:18,549 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#5
* 13:27:20,684 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes
* 13:27:20,893 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo
* 13:27:20,895 INFO UrlShorterServiceTest$testHedging$1 - result = https://is.gd/EuvYes
*/
private suspend fun method6(longUrl: String): String {
return coroutineScope {
val chan = Channel<String?>()
impls.forEach { impl ->
launch {
try {
impl.getShortUrl(longUrl).also { chan.send(it) }
} catch (e: Exception) {
chan.send(null)
}
}
}
(1..impls.size).forEach { _ ->
chan.receive()?.also { shortUrl ->
coroutineContext[Job]!!.cancelChildren()
return@coroutineScope shortUrl
}
}
throw Exception("All services failed")
}
}
}
/**
* spring flux
* https://github.com/spring-tips/hedging/blob/master/client/src/main/java/com/example/client/ClientApplication.java
*/
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterServiceTest {
@Test
fun testHedging() {
val impls = listOf(NullImpl(), DumbImpl(), IsgdImpl(), TinyImpl())
val service = UrlShorterService(impls)
runBlocking {
service.getShortUrl("https://www.google.com").also {
logger.info("result = {}", it)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment