Last active
November 8, 2019 05:41
-
-
Save smallufo/00529f640ded49814edfdf993ff04747 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package destiny | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.flow.* | |
import mu.KotlinLogging | |
import org.apache.http.client.fluent.Request | |
import java.net.URLEncoder | |
import java.util.concurrent.ExecutorService | |
import java.util.concurrent.Executors | |
import kotlin.test.Test | |
interface UrlShorter { | |
suspend fun getShortUrl(longUrl: String): String? | |
} | |
val logger = KotlinLogging.logger {} | |
class IsgdImpl : UrlShorter { | |
override suspend fun getShortUrl(longUrl: String): String? { | |
logger.info("running : {}", Thread.currentThread().name) | |
val url = "https://is.gd/create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl, "UTF-8")) | |
return withContext(Dispatchers.IO) { | |
logger.info("running Dispatchers.IO : {}", Thread.currentThread().name) | |
Request.Get(url).execute().returnContent().asString().also { | |
logger.info("returning {}", it) | |
} | |
} | |
} | |
} | |
class TinyImpl : UrlShorter { | |
override suspend fun getShortUrl(longUrl: String): String? { | |
logger.info("running : {}", Thread.currentThread().name) | |
val url = "http://tinyurl.com/api-create.php?url=$longUrl" | |
return withContext(Dispatchers.IO) { | |
logger.info("running Dispatchers.IO : {}", Thread.currentThread().name) | |
Request.Get(url).execute().returnContent().asString().also { | |
logger.info("returning {}", it) | |
} | |
} | |
} | |
} | |
/** | |
* delays 10 seconds and returns null | |
*/ | |
class DumbImpl : UrlShorter { | |
override suspend fun getShortUrl(longUrl: String): String? { | |
logger.info("running : {}", Thread.currentThread().name) | |
delay(10 * 1000) | |
return null | |
} | |
} | |
/** | |
* returns null immediately | |
*/ | |
class NullImpl : UrlShorter { | |
override suspend fun getShortUrl(longUrl: String): String? { | |
logger.info("running : {}", Thread.currentThread().name) | |
return null | |
} | |
} | |
@ExperimentalCoroutinesApi | |
@FlowPreview | |
class UrlShorterService(private val impls: List<UrlShorter>) { | |
private val es: ExecutorService = Executors.newFixedThreadPool(impls.size) | |
private val esDispatcher = es.asCoroutineDispatcher() | |
suspend fun getShortUrl(longUrl: String): String { | |
return method1(longUrl) | |
} | |
private inline fun <T, R : Any> Iterable<T>.firstNotNullResult(transform: (T) -> R?): R? { | |
for (element in this) { | |
val result = transform(element) | |
if (result != null) return result | |
} | |
return null | |
} | |
/** | |
* works OK ! | |
* | |
* 13:40:21,358 INFO NullImpl - running : main @coroutine#3 | |
* 13:40:21,380 INFO DumbImpl - running : main @coroutine#4 | |
* 13:40:21,386 INFO IsgdImpl - running : main @coroutine#5 | |
* 13:40:21,402 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#5 | |
* 13:40:21,416 INFO TinyImpl - running : main @coroutine#6 | |
* 13:40:21,419 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#6 | |
* 13:40:23,029 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo | |
* 13:40:23,031 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes | |
* 13:40:23,126 INFO UrlShorterServiceTest$testHedging$1 - result = http://tinyurl.com/389lo | |
*/ | |
private suspend fun method1(longUrl: String): String { | |
return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl -> | |
flow { | |
impl.getShortUrl(longUrl)?.also { | |
emit(it) | |
} | |
}//.flowOn(esDispatcher) | |
}.first() | |
//.also { esDispatcher.cancelChildren() } // doesn't impact the result | |
} | |
/** | |
* blocked | |
* | |
* 13:22:48,343 INFO TinyImpl - running : pool-1-thread-1 @coroutine#5 | |
* 13:22:48,343 INFO NullImpl - running : pool-1-thread-2 @coroutine#2 | |
* 13:22:48,343 INFO IsgdImpl - running : pool-1-thread-4 @coroutine#4 | |
* 13:22:48,343 INFO DumbImpl - running : pool-1-thread-3 @coroutine#3 | |
* 13:22:48,356 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#5 | |
* 13:22:48,392 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#4 | |
* 13:22:50,296 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes | |
* 13:22:50,664 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo | |
* 13:22:58,359 INFO UrlShorterServiceTest$testHedging$1 - result = https://is.gd/EuvYes | |
*/ | |
private suspend fun method2(longUrl: String): String { | |
return withContext(esDispatcher) { | |
impls.map { impl -> | |
async(esDispatcher) { | |
impl.getShortUrl(longUrl) | |
} | |
}.firstNotNullResult { it.await() } ?: longUrl | |
} | |
} | |
/** | |
* blocked | |
* | |
* 13:23:56,941 INFO destiny.DumbImpl - running : pool-1-thread-2 @coroutine#3 | |
* 13:23:56,941 INFO destiny.IsgdImpl - running : pool-1-thread-3 @coroutine#4 | |
* 13:23:56,941 INFO destiny.TinyImpl - running : pool-1-thread-4 @coroutine#5 | |
* 13:23:56,941 INFO destiny.NullImpl - running : pool-1-thread-1 @coroutine#2 | |
* 13:23:56,952 INFO destiny.TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#5 | |
* 13:23:56,953 INFO destiny.IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#4 | |
* 13:23:58,506 INFO destiny.IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes | |
* 13:23:58,910 INFO destiny.TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo | |
* 13:24:06,953 INFO destiny.UrlShorterServiceTest$testHedging$1 - result = https://is.gd/EuvYes | |
*/ | |
private suspend fun method3(longUrl: String): String { | |
return coroutineScope { | |
impls.map { impl -> | |
async(esDispatcher) { | |
impl.getShortUrl(longUrl) | |
} | |
}.firstNotNullResult { it.await() } ?: longUrl | |
} | |
} | |
/** | |
* blocked | |
* | |
* 13:24:53,857 INFO DumbImpl - running : pool-1-thread-3 @coroutine#3 | |
* 13:24:53,858 INFO TinyImpl - running : pool-1-thread-1 @coroutine#5 | |
* 13:24:53,858 INFO NullImpl - running : pool-1-thread-2 @coroutine#2 | |
* 13:24:53,858 INFO IsgdImpl - running : pool-1-thread-4 @coroutine#4 | |
* 13:24:53,868 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-3 @coroutine#4 | |
* 13:24:53,868 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#5 | |
* 13:24:56,319 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes | |
* 13:24:56,949 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo | |
* 13:25:03,867 INFO UrlShorterServiceTest$testHedging$1 - result = https://is.gd/EuvYes | |
*/ | |
private suspend fun method4(longUrl: String): String { | |
return withContext(esDispatcher) { | |
impls.map { impl -> | |
async { | |
impl.getShortUrl(longUrl) | |
} | |
}.firstNotNullResult { it.await() } ?: longUrl | |
} | |
} | |
/** | |
* Exception | |
* | |
* 13:25:54,615 INFO UrlShorterService$method5$2 - channel closed | |
* 13:25:54,620 INFO IsgdImpl - running : pool-1-thread-4 @coroutine#4 | |
* 13:25:54,620 INFO DumbImpl - running : pool-1-thread-3 @coroutine#3 | |
* 13:25:54,620 INFO NullImpl - running : pool-1-thread-2 @coroutine#2 | |
* 13:25:54,621 INFO TinyImpl - running : pool-1-thread-1 @coroutine#5 | |
* 13:25:54,629 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#4 | |
* 13:25:54,629 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#5 | |
* 13:25:56,880 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo | |
* 13:25:57,214 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes | |
* | |
* kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed | |
* | |
*/ | |
private suspend fun method5(longUrl: String): String { | |
val channel = Channel<String>() | |
withContext(esDispatcher) { | |
impls.forEach { impl -> | |
launch { | |
impl.getShortUrl(longUrl)?.also { | |
channel.send(it) | |
} | |
} | |
} | |
channel.close() | |
logger.info("channel closed") | |
} | |
return channel.consumeAsFlow().first() | |
} | |
/** | |
* OK | |
* | |
* 13:27:18,505 INFO NullImpl - running : main @coroutine#2 | |
* 13:27:18,511 INFO DumbImpl - running : main @coroutine#3 | |
* 13:27:18,515 INFO IsgdImpl - running : main @coroutine#4 | |
* 13:27:18,523 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#4 | |
* 13:27:18,524 INFO TinyImpl - running : main @coroutine#5 | |
* 13:27:18,549 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#5 | |
* 13:27:20,684 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes | |
* 13:27:20,893 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo | |
* 13:27:20,895 INFO UrlShorterServiceTest$testHedging$1 - result = https://is.gd/EuvYes | |
*/ | |
private suspend fun method6(longUrl: String): String { | |
return coroutineScope { | |
val chan = Channel<String?>() | |
impls.forEach { impl -> | |
launch { | |
try { | |
impl.getShortUrl(longUrl).also { chan.send(it) } | |
} catch (e: Exception) { | |
chan.send(null) | |
} | |
} | |
} | |
(1..impls.size).forEach { _ -> | |
chan.receive()?.also { shortUrl -> | |
coroutineContext[Job]!!.cancelChildren() | |
return@coroutineScope shortUrl | |
} | |
} | |
throw Exception("All services failed") | |
} | |
} | |
} | |
/** | |
* spring flux | |
* https://github.com/spring-tips/hedging/blob/master/client/src/main/java/com/example/client/ClientApplication.java | |
*/ | |
@ExperimentalCoroutinesApi | |
@FlowPreview | |
class UrlShorterServiceTest { | |
@Test | |
fun testHedging() { | |
val impls = listOf(NullImpl(), DumbImpl(), IsgdImpl(), TinyImpl()) | |
val service = UrlShorterService(impls) | |
runBlocking { | |
service.getShortUrl("https://www.google.com").also { | |
logger.info("result = {}", it) | |
} | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment