Skip to content

Commit

Permalink
Make string interning available at ingestion [DPP-711] (#11645)
Browse files Browse the repository at this point in the history
We need to support mapping to internal ids at ingestion.

* Wire StringInterning to Schema
* Wire StringInterning to IngestionStorageBackend
* Fixing tests
* Renames MockStringInterning (post-review work from different PR)

changelog_begin
changelog_end
  • Loading branch information
nmarton-da authored Nov 11, 2021
1 parent 7296ba4 commit 0f0a865
Show file tree
Hide file tree
Showing 14 changed files with 254 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,12 @@ private[platform] case class ParallelIndexerSubscription[DB_BATCH](
metrics,
),
batchingParallelism = batchingParallelism,
batcher = batcherExecutor.execute(batcher(ingestionStorageBackend.batch, metrics)),
batcher = batcherExecutor.execute(
batcher(ingestionStorageBackend.batch(_, stringInterningView), metrics)
),
ingestingParallelism = ingestionParallelism,
ingester = ingester(ingestionStorageBackend.insertBatch, dbDispatcher, metrics),
tailer = tailer(ingestionStorageBackend.batch(Vector.empty)),
tailer = tailer(ingestionStorageBackend.batch(Vector.empty, stringInterningView)),
tailingRateLimitPerSecond = tailingRateLimitPerSecond,
ingestTail =
ingestTail[DB_BATCH](parameterStorageBackend.updateLedgerEnd, dbDispatcher, metrics),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private[appendonlydao] case class SequentialWriteDaoImpl[DB_BATCH](
}

dbDtosWithStringInterning
.pipe(ingestionStorageBackend.batch)
.pipe(ingestionStorageBackend.batch(_, stringInterningView))
.pipe(ingestionStorageBackend.insertBatch(connection, _))

parameterStorageBackend.updateLedgerEnd(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.daml.platform.store.backend

import java.sql.Connection

import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.configuration.Configuration
Expand All @@ -20,9 +21,10 @@ import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeP
import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
import com.daml.platform.store.interfaces.LedgerDaoContractsReader.KeyState
import com.daml.platform.store.interning.StringInterning
import com.daml.scalautil.NeverEqualsOverride

import javax.sql.DataSource

import scala.annotation.unused
import scala.util.Try

Expand Down Expand Up @@ -55,9 +57,10 @@ trait IngestionStorageBackend[DB_BATCH] {
* This should be pure CPU logic without IO.
*
* @param dbDtos is a collection of DbDto from which the batch is formed
* @param stringInterning will be used to switch ingested strings to the internal integers
* @return the database-specific batch DTO, which can be inserted via insertBatch
*/
def batch(dbDtos: Vector[DbDto]): DB_BATCH
def batch(dbDtos: Vector[DbDto], stringInterning: StringInterning): DB_BATCH

/** Using a JDBC connection, a batch will be inserted into the database.
* No significant CPU load, mostly blocking JDBC communication with the database backend.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package com.daml.platform.store.backend.common
import java.lang
import java.sql.PreparedStatement

import com.daml.platform.store.interning.StringInterning

import scala.reflect.ClassTag

/** @tparam FROM is an arbitrary type from which we can extract the data of interest for the particular column
Expand All @@ -19,13 +21,16 @@ import scala.reflect.ClassTag
private[backend] abstract class Field[FROM, TO, CONVERTED](implicit
classTag: ClassTag[CONVERTED]
) {
def extract: FROM => TO
def extract: StringInterning => FROM => TO
def convert: TO => CONVERTED
def selectFieldExpression(inputFieldName: String): String = inputFieldName

final def toArray(input: Vector[FROM]): Array[CONVERTED] =
final def toArray(
input: Vector[FROM],
stringInterning: StringInterning,
): Array[CONVERTED] =
input.view
.map(extract andThen convert)
.map(extract(stringInterning) andThen convert)
.toArray(classTag)

final def prepareData(preparedStatement: PreparedStatement, index: Int, value: Any): Unit =
Expand Down Expand Up @@ -53,51 +58,67 @@ private[backend] trait TrivialOptionalField[FROM, TO >: Null <: AnyRef]
override def convert: Option[TO] => TO = _.orNull
}

private[backend] case class StringField[FROM](extract: FROM => String)
private[backend] case class StringField[FROM](extract: StringInterning => FROM => String)
extends TrivialField[FROM, String]

private[backend] case class StringOptional[FROM](extract: FROM => Option[String])
private[backend] case class StringOptional[FROM](extract: StringInterning => FROM => Option[String])
extends TrivialOptionalField[FROM, String]

private[backend] case class Bytea[FROM](extract: FROM => Array[Byte])
private[backend] case class Bytea[FROM](extract: StringInterning => FROM => Array[Byte])
extends TrivialField[FROM, Array[Byte]]

private[backend] case class ByteaOptional[FROM](extract: FROM => Option[Array[Byte]])
extends TrivialOptionalField[FROM, Array[Byte]]
private[backend] case class ByteaOptional[FROM](
extract: StringInterning => FROM => Option[Array[Byte]]
) extends TrivialOptionalField[FROM, Array[Byte]]

private[backend] case class Integer[FROM](extract: FROM => Int) extends TrivialField[FROM, Int]
private[backend] case class Integer[FROM](extract: StringInterning => FROM => Int)
extends TrivialField[FROM, Int]

private[backend] case class IntOptional[FROM](extract: FROM => Option[Int])
private[backend] case class IntOptional[FROM](extract: StringInterning => FROM => Option[Int])
extends Field[FROM, Option[Int], java.lang.Integer] {
override def convert: Option[Int] => java.lang.Integer = _.map(Int.box).orNull
}

private[backend] case class Bigint[FROM](extract: FROM => Long) extends TrivialField[FROM, Long]
private[backend] case class Bigint[FROM](extract: StringInterning => FROM => Long)
extends TrivialField[FROM, Long]

private[backend] case class BigintOptional[FROM](extract: FROM => Option[Long])
private[backend] case class BigintOptional[FROM](extract: StringInterning => FROM => Option[Long])
extends Field[FROM, Option[Long], java.lang.Long] {
override def convert: Option[Long] => java.lang.Long = _.map(Long.box).orNull
}

private[backend] case class SmallintOptional[FROM](extract: FROM => Option[Int])
private[backend] case class SmallintOptional[FROM](extract: StringInterning => FROM => Option[Int])
extends Field[FROM, Option[Int], java.lang.Integer] {
override def convert: Option[Int] => java.lang.Integer = _.map(Int.box).orNull
}

private[backend] case class BooleanField[FROM](extract: FROM => Boolean)
private[backend] case class BooleanField[FROM](extract: StringInterning => FROM => Boolean)
extends TrivialField[FROM, Boolean]

private[backend] case class BooleanOptional[FROM](extract: FROM => Option[Boolean])
extends Field[FROM, Option[Boolean], java.lang.Boolean] {
private[backend] case class BooleanOptional[FROM](
extract: StringInterning => FROM => Option[Boolean]
) extends Field[FROM, Option[Boolean], java.lang.Boolean] {
override def convert: Option[Boolean] => lang.Boolean = _.map(Boolean.box).orNull
}

private[backend] case class StringArray[FROM](extract: FROM => Iterable[String])
private[backend] case class StringArray[FROM](extract: StringInterning => FROM => Iterable[String])
extends Field[FROM, Iterable[String], Array[String]] {
override def convert: Iterable[String] => Array[String] = _.toArray
}

private[backend] case class StringArrayOptional[FROM](extract: FROM => Option[Iterable[String]])
extends Field[FROM, Option[Iterable[String]], Array[String]] {
private[backend] case class StringArrayOptional[FROM](
extract: StringInterning => FROM => Option[Iterable[String]]
) extends Field[FROM, Option[Iterable[String]], Array[String]] {
override def convert: Option[Iterable[String]] => Array[String] = _.map(_.toArray).orNull
}

private[backend] case class IntArray[FROM](extract: StringInterning => FROM => Iterable[Int])
extends Field[FROM, Iterable[Int], Array[Int]] {
override def convert: Iterable[Int] => Array[Int] = _.toArray
}

private[backend] case class IntArrayOptional[FROM](
extract: StringInterning => FROM => Option[Iterable[Int]]
) extends Field[FROM, Option[Iterable[Int]], Array[Int]] {
override def convert: Option[Iterable[Int]] => Array[Int] = _.map(_.toArray).orNull
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import java.sql.Connection

import anorm.{SQL, SqlQuery}
import com.daml.platform.store.backend.{DbDto, IngestionStorageBackend, ParameterStorageBackend}
import com.daml.platform.store.interning.StringInterning

private[backend] class IngestionStorageBackendTemplate(schema: Schema[DbDto])
extends IngestionStorageBackend[AppendOnlySchema.Batch] {
Expand Down Expand Up @@ -48,6 +49,9 @@ private[backend] class IngestionStorageBackendTemplate(schema: Schema[DbDto])
): Unit =
schema.executeUpdate(dbBatch, connection)

override def batch(dbDtos: Vector[DbDto]): AppendOnlySchema.Batch =
schema.prepareData(dbDtos)
override def batch(
dbDtos: Vector[DbDto],
stringInterning: StringInterning,
): AppendOnlySchema.Batch =
schema.prepareData(dbDtos, stringInterning)
}
Loading

0 comments on commit 0f0a865

Please sign in to comment.