Skip to content

Commit

Permalink
Coroutines based database-vertx implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitrySadchikov authored and Dmitry Sadchikov committed Mar 25, 2023
1 parent 0533f33 commit fdda87f
Show file tree
Hide file tree
Showing 27 changed files with 788 additions and 47 deletions.
4 changes: 2 additions & 2 deletions common/src/main/java/ru/tinkoff/kora/common/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ public static Context current(CoroutineContext ctx) {
public static CoroutineContext inject(CoroutineContext cctx, Context context) {
var reactorContext = Reactor.inject(reactor.util.context.Context.of(Context.class, cctx), context);
var coroutineContext = (CoroutineContext) (Object) ReactorContextKt.asCoroutineContext(reactorContext);

return cctx.plus(coroutineContext).plus(asCoroutineContext(context));
var contextElement = new CoroutineContextElement(context);
return cctx.plus(contextElement).plus(coroutineContext).plus(asCoroutineContext(context));
}

public static CoroutineContext asCoroutineContext(Context ctx) {
Expand Down
2 changes: 2 additions & 0 deletions database/database-annotation-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ dependencies {
testImplementation testFixtures(project(':annotation-processor-common'))
testImplementation project(':database:database-common')
testImplementation project(':database:database-jdbc')
testImplementation project(':database:database-vertx-common')
testImplementation project(':database:database-vertx')
testImplementation project(':database:database-vertx-coroutines')
testImplementation project(':database:database-r2dbc')
testImplementation project(':database:database-cassandra')
}
Expand Down
2 changes: 2 additions & 0 deletions database/database-symbol-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ dependencies {
testImplementation testFixtures(project(':annotation-processor-common'))
testImplementation project(':database:database-common')
testImplementation project(':database:database-jdbc')
testImplementation project(':database:database-vertx-common')
testImplementation project(':database:database-vertx')
testImplementation project(':database:database-vertx-coroutines')
testImplementation project(':database:database-r2dbc')
testImplementation project(':database:database-cassandra')
testImplementation(libs.kotlin.stdlib.lib)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@ import com.google.devtools.ksp.processing.KSPLogger
import com.google.devtools.ksp.processing.Resolver
import com.google.devtools.ksp.symbol.ClassKind
import com.google.devtools.ksp.symbol.KSClassDeclaration
import com.squareup.kotlinpoet.*
import com.squareup.kotlinpoet.AnnotationSpec
import com.squareup.kotlinpoet.CodeBlock
import com.squareup.kotlinpoet.FunSpec
import com.squareup.kotlinpoet.ParameterSpec
import com.squareup.kotlinpoet.TypeSpec
import com.squareup.kotlinpoet.ksp.addOriginatingKSFile
import com.squareup.kotlinpoet.ksp.toClassName
import com.squareup.kotlinpoet.ksp.toTypeName
import org.slf4j.LoggerFactory
import ru.tinkoff.kora.database.symbol.processor.cassandra.CassandraRepositoryGenerator
import ru.tinkoff.kora.database.symbol.processor.jdbc.JdbcRepositoryGenerator
import ru.tinkoff.kora.database.symbol.processor.r2dbc.R2DbcRepositoryGenerator
import ru.tinkoff.kora.database.symbol.processor.vertx.VertxCoroutineBasedRepositoryGenerator
import ru.tinkoff.kora.database.symbol.processor.vertx.VertxRepositoryGenerator
import ru.tinkoff.kora.kora.app.ksp.extendsKeepAop
import ru.tinkoff.kora.ksp.common.KspCommonUtils.generated
Expand All @@ -27,6 +32,7 @@ class RepositoryBuilder(
private val availableGenerators = listOf(
JdbcRepositoryGenerator(resolver),
VertxRepositoryGenerator(resolver, kspLogger),
VertxCoroutineBasedRepositoryGenerator(resolver, kspLogger),
R2DbcRepositoryGenerator(resolver),
CassandraRepositoryGenerator(resolver),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package ru.tinkoff.kora.database.symbol.processor.vertx

import com.google.devtools.ksp.processing.KSPLogger
import com.google.devtools.ksp.processing.Resolver
import com.google.devtools.ksp.symbol.KSClassDeclaration
import com.google.devtools.ksp.symbol.KSFunction
import com.google.devtools.ksp.symbol.KSFunctionDeclaration
import com.squareup.kotlinpoet.AnnotationSpec
import com.squareup.kotlinpoet.ClassName
import com.squareup.kotlinpoet.CodeBlock
import com.squareup.kotlinpoet.FunSpec
import com.squareup.kotlinpoet.KModifier
import com.squareup.kotlinpoet.MemberName
import com.squareup.kotlinpoet.ParameterSpec
import com.squareup.kotlinpoet.ParameterizedTypeName.Companion.parameterizedBy
import com.squareup.kotlinpoet.PropertySpec
import com.squareup.kotlinpoet.TypeSpec
import com.squareup.kotlinpoet.ksp.toTypeName
import ru.tinkoff.kora.database.symbol.processor.DbUtils
import ru.tinkoff.kora.database.symbol.processor.DbUtils.addMapper
import ru.tinkoff.kora.database.symbol.processor.DbUtils.findQueryMethods
import ru.tinkoff.kora.database.symbol.processor.DbUtils.parseExecutorTag
import ru.tinkoff.kora.database.symbol.processor.DbUtils.queryMethodBuilder
import ru.tinkoff.kora.database.symbol.processor.DbUtils.resultMapperName
import ru.tinkoff.kora.database.symbol.processor.DbUtils.updateCount
import ru.tinkoff.kora.database.symbol.processor.Mapper
import ru.tinkoff.kora.database.symbol.processor.QueryWithParameters
import ru.tinkoff.kora.database.symbol.processor.RepositoryGenerator
import ru.tinkoff.kora.database.symbol.processor.model.QueryParameter
import ru.tinkoff.kora.database.symbol.processor.model.QueryParameterParser
import ru.tinkoff.kora.ksp.common.AnnotationUtils.findAnnotation
import ru.tinkoff.kora.ksp.common.AnnotationUtils.findValue
import ru.tinkoff.kora.ksp.common.CommonClassNames
import ru.tinkoff.kora.ksp.common.CommonClassNames.isFlow
import ru.tinkoff.kora.ksp.common.CommonClassNames.isList
import ru.tinkoff.kora.ksp.common.FieldFactory
import ru.tinkoff.kora.ksp.common.FunctionUtils.isFlow
import ru.tinkoff.kora.ksp.common.FunctionUtils.isSuspend
import ru.tinkoff.kora.ksp.common.parseMappingData

class VertxCoroutineBasedRepositoryGenerator(private val resolver: Resolver, private val kspLogger: KSPLogger) : RepositoryGenerator {
private val runBlocking = MemberName("kotlinx.coroutines", "runBlocking")
private val repositoryInterface = resolver.getClassDeclarationByName(resolver.getKSNameFromString(VertxTypes.Coroutines.repository.canonicalName))?.asStarProjectedType()
override fun repositoryInterface() = repositoryInterface

override fun generate(repositoryType: KSClassDeclaration, typeBuilder: TypeSpec.Builder, constructorBuilder: FunSpec.Builder): TypeSpec {
this.enrichWithExecutor(repositoryType, typeBuilder, constructorBuilder)
val repositoryResolvedType = repositoryType.asStarProjectedType()
val resultMappers = FieldFactory(typeBuilder, constructorBuilder, "_result_mapper_")
val parameterMappers = FieldFactory(typeBuilder, constructorBuilder, "_parameter_mapper_")
for (method in repositoryType.findQueryMethods()) {
val methodType = method.asMemberOf(repositoryResolvedType)
val parameters = QueryParameterParser.parse(listOf(VertxTypes.sqlConnection, VertxTypes.sqlClient), method, methodType)
val queryAnnotation = method.findAnnotation(DbUtils.queryAnnotation)!!
val queryString = queryAnnotation.findValue<String>("value")!!
val query = QueryWithParameters.parse(queryString, parameters)
val resultMapperName = this.parseResultMapper(method, parameters, methodType)?.let { resultMappers.addMapper(it) }
DbUtils.parseParameterMappers(method, parameters, query, VertxTypes.parameterColumnMapper) { VertxNativeTypes.findNativeType(it.toTypeName()) != null }
.forEach { parameterMappers.addMapper(it) }
val methodSpec = this.generate(method, methodType, query, parameters, resultMapperName, parameterMappers)
typeBuilder.addFunction(methodSpec)
}

return typeBuilder
.addAnnotation(
AnnotationSpec.builder(ClassName("kotlin", "OptIn"))
.addMember("%T::class", ClassName("kotlinx.coroutines", "ExperimentalCoroutinesApi"))
.build()
)
.primaryConstructor(constructorBuilder.build())
.build()
}

private fun generate(funDeclaration: KSFunctionDeclaration, function: KSFunction, query: QueryWithParameters, parameters: List<QueryParameter>, resultMapperName: String?, parameterMappers: FieldFactory): FunSpec {
var sql = query.rawQuery
query.parameters.indices.asSequence()
.map { query.parameters[it].sqlParameterName to "$" + (it + 1) }
.sortedByDescending { it.first.length }
.forEach { sql = sql.replace(":" + it.first, it.second) }

val b = funDeclaration.queryMethodBuilder(resolver)
b.addCode("val _query = %T(\n %S,\n %S\n)\n", DbUtils.queryContext, query.rawQuery, sql)
val batchParam = parameters.firstOrNull { it is QueryParameter.BatchParameter }
val isSuspend = funDeclaration.isSuspend()
val isFlow = funDeclaration.isFlow()
ParametersToTupleBuilder.generate(b, query, funDeclaration, parameters, batchParam, parameterMappers)
val connectionParameter = parameters.asSequence().filterIsInstance<QueryParameter.ConnectionParameter>().firstOrNull()?.variable?.name?.asString()

if (isSuspend) {
b.addCode("return ")
} else {
b.addCode("return %M {\n", runBlocking)
}

if (batchParam != null) {
if (connectionParameter == null) {
b.addCode("%T.awaitBatch(this.vertxConnectionFactory, _query, _batchParams)", VertxTypes.Coroutines.repositoryHelper)
} else {
b.addCode("%T.awaitBatch(%N, this.vertxConnectionFactory.telemetry(), _query, _batchParams)", VertxTypes.Coroutines.repositoryHelper, connectionParameter)
}
if (function.returnType == resolver.builtIns.unitType) {
b.addCode(" \n.let {}")
}
} else if (isFlow) {
if (connectionParameter == null) {
b.addCode("%T.flow(this.vertxConnectionFactory, _query, _tuple, %N)", VertxTypes.Coroutines.repositoryHelper, resultMapperName)
} else {
b.addCode("%T.flow(%N, this.vertxConnectionFactory.telemetry(), _query, _tuple, %N)", VertxTypes.Coroutines.repositoryHelper, connectionParameter, resultMapperName)
}
} else {
if (function.returnType == resolver.builtIns.unitType) {
if (connectionParameter == null) {
b.addCode("%T.await(this.vertxConnectionFactory, _query, _tuple)", VertxTypes.Coroutines.repositoryHelper)
} else {
b.addCode("%T.await(%N, this.vertxConnectionFactory.telemetry(), _query, _tuple)", VertxTypes.Coroutines.repositoryHelper, connectionParameter)
}
} else {
if (connectionParameter == null) {
b.addCode("%T.awaitSingleOrNull(this.vertxConnectionFactory, _query, _tuple", VertxTypes.Coroutines.repositoryHelper)
} else {
b.addCode("%T.awaitSingleOrNull(%N, this.vertxConnectionFactory.telemetry(), _query, _tuple", VertxTypes.Coroutines.repositoryHelper, connectionParameter)
}
if (function.returnType?.toTypeName() == updateCount) {
b.addCode(") { %T.extractUpdateCount(it) }", VertxTypes.rowSetMapper)
} else {
b.addCode(", %N)", resultMapperName)
}
if (function.returnType?.isMarkedNullable == false) {
b.addCode("!!")
}
}
}
b.addCode("\n")
if (!isSuspend) {
b.addCode(" }\n")
}
return b.build()
}

private fun parseResultMapper(method: KSFunctionDeclaration, parameters: List<QueryParameter>, methodType: KSFunction): Mapper? {
for (parameter in parameters) {
if (parameter is QueryParameter.BatchParameter) {
return null
}
}
val returnType = methodType.returnType!!
val mapperName = method.resultMapperName()
val mappings = method.parseMappingData()
val resultSetMapper = mappings.getMapping(VertxTypes.rowSetMapper)
val rowMapper = mappings.getMapping(VertxTypes.rowMapper)
if (returnType.isFlow()) {
val flowParam = returnType.arguments[0]
val returnTypeName = flowParam.toTypeName().copy(false)
val mapperType = VertxTypes.rowMapper.parameterizedBy(returnTypeName)
if (rowMapper != null) {
return Mapper(rowMapper, mapperType, mapperName)
}
return Mapper(mapperType, mapperName)
}
val mapperType = VertxTypes.rowSetMapper.parameterizedBy(returnType.toTypeName())
if (resultSetMapper != null) {
return Mapper(resultSetMapper, mapperType, mapperName)
}
if (rowMapper != null) {
if (returnType.isList()) {
return Mapper(rowMapper, mapperType, mapperName) {
CodeBlock.of("%T.listRowSetMapper(%L)", VertxTypes.rowSetMapper, it)
}
} else {
return Mapper(rowMapper, mapperType, mapperName) {
CodeBlock.of("%T.singleRowSetMapper(%L)", VertxTypes.rowSetMapper, it)
}
}
}
if (returnType == resolver.builtIns.unitType) {
return null
}
if (returnType.toTypeName() == updateCount) {
return null
}
return Mapper(mapperType, mapperName)
}

private fun enrichWithExecutor(repositoryElement: KSClassDeclaration, builder: TypeSpec.Builder, constructorBuilder: FunSpec.Builder) {
builder.addSuperinterface(VertxTypes.Coroutines.repository)

builder.addProperty(
PropertySpec.builder("vertxConnectionFactory", VertxTypes.Coroutines.connectionFactory, KModifier.OVERRIDE)
.build()
)
val executorTag = repositoryElement.parseExecutorTag()
if (executorTag != null) {
constructorBuilder.addParameter(
ParameterSpec.builder("_vertxConnectionFactory", VertxTypes.Coroutines.connectionFactory).addAnnotation(
AnnotationSpec.builder(CommonClassNames.tag).addMember("value = %L", executorTag).build()
).build()
)
} else {
constructorBuilder.addParameter("_vertxConnectionFactory", VertxTypes.Coroutines.connectionFactory)
}
constructorBuilder.addStatement("this.vertxConnectionFactory = _vertxConnectionFactory")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,10 @@ object VertxTypes {
val rowMapper = ClassName("ru.tinkoff.kora.database.vertx.mapper.result", "VertxRowMapper")
val resultColumnMapper = ClassName("ru.tinkoff.kora.database.vertx.mapper.result", "VertxResultColumnMapper")
val parameterColumnMapper = ClassName("ru.tinkoff.kora.database.vertx.mapper.parameter", "VertxParameterColumnMapper")

object Coroutines {
val connectionFactory = ClassName("ru.tinkoff.kora.database.vertx.coroutines", "VertxConnectionFactory")
val repository = ClassName("ru.tinkoff.kora.database.vertx.coroutines", "VertxRepository")
val repositoryHelper = ClassName("ru.tinkoff.kora.database.vertx.coroutines", "VertxRepositoryHelper")
}
}
17 changes: 17 additions & 0 deletions database/database-vertx-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
apply from: "${project.rootDir}/kotlin-plugin.gradle"

dependencies {
api project(":database:database-common")
api project(":vertx-common")
api project(":common")

api(libs.vertx.pg.client)
compileOnly(libs.kotlin.stdlib.lib)
compileOnly(libs.kotlin.coroutines.reactor)
compileOnly(libs.kotlin.coroutines.jdk8)
implementation "com.ongres.scram:common:2.1"
implementation "com.ongres.scram:client:2.1"

testImplementation project(':test:test-postgres')
testImplementation libs.reactor.test
}
19 changes: 19 additions & 0 deletions database/database-vertx-coroutines/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
apply from: "${project.rootDir}/kotlin-plugin.gradle"

dependencies {
api project(":database:database-vertx-common")
api project(":vertx-common")
api project(":common")

api(libs.vertx.pg.client)
api(libs.vertx.kotlin.coroutines)
compileOnly(libs.kotlin.stdlib.lib)
compileOnly(libs.kotlin.coroutines.reactor)
compileOnly(libs.kotlin.coroutines.reactive)
compileOnly(libs.kotlin.coroutines.jdk8)
implementation "com.ongres.scram:common:2.1"
implementation "com.ongres.scram:client:2.1"

testImplementation project(':test:test-postgres')
testImplementation libs.reactor.test
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package ru.tinkoff.kora.database.vertx.coroutines;

import com.typesafe.config.Config;
import io.netty.channel.EventLoopGroup;
import ru.tinkoff.kora.config.common.extractor.ConfigValueExtractor;
import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetryFactory;
import ru.tinkoff.kora.database.vertx.VertxDatabaseBaseModule;
import ru.tinkoff.kora.database.vertx.VertxDatabaseConfig;

public interface VertxDatabaseModule extends VertxDatabaseBaseModule {
default VertxDatabaseConfig vertxDatabaseConfig(Config config, ConfigValueExtractor<VertxDatabaseConfig> extractor) {
var value = config.getValue("db");
return extractor.extract(value);
}

default VertxDatabase vertxDatabase(VertxDatabaseConfig vertxDatabaseConfig, EventLoopGroup eventLoopGroup, DataBaseTelemetryFactory telemetryFactory) {
return new VertxDatabase(vertxDatabaseConfig, eventLoopGroup, telemetryFactory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ru.tinkoff.kora.database.vertx.coroutines

import io.vertx.sqlclient.Pool
import io.vertx.sqlclient.SqlConnection
import kotlinx.coroutines.ExperimentalCoroutinesApi
import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetry

@ExperimentalCoroutinesApi
interface VertxConnectionFactory {

suspend fun currentConnection(): SqlConnection?

suspend fun newConnection(): SqlConnection

fun pool(): Pool

fun telemetry(): DataBaseTelemetry

suspend fun <T> withConnection(callback: suspend (SqlConnection) -> T): T

suspend fun <T> inTx(callback: suspend (SqlConnection) -> T): T
}
Loading

0 comments on commit fdda87f

Please sign in to comment.