Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix contract_tpid_fkey-related race condition #11330

Merged
merged 8 commits into from
Oct 22, 2021
Prev Previous commit
Next Next commit
sequential simulation
  • Loading branch information
S11001001 committed Oct 21, 2021
commit a725fc430c3049a76a0df30d29beeedb51c3e8fc
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ import org.scalatest.{Assertion, AsyncTestSuite, BeforeAndAfterAll, Inside}
import org.scalatest.matchers.should.Matchers
import scalaz.std.list._

import java.util.concurrent.CyclicBarrier
import java.util.concurrent.TimeUnit.SECONDS
import scala.collection.compat._
import scala.concurrent.Future
import scala.util.Try

abstract class AbstractDatabaseIntegrationTest extends AsyncFreeSpecLike with BeforeAndAfterAll {
this: AsyncTestSuite with Matchers with Inside =>
Expand Down Expand Up @@ -215,24 +212,28 @@ abstract class AbstractDatabaseIntegrationTest extends AsyncFreeSpecLike with Be
"doesn't cache uncommitted template IDs" in {
import dbbackend.Queries.DBContract, spray.json.{JsObject, JsNull, JsValue},
spray.json.DefaultJsonProtocol._
import cats.syntax.apply._
import dao.logHandler, dao.jdbcDriver.q.queries,
queries.{insertContracts, surrogateTemplateId}

val tpId = TemplateId("pkg", "mod", "UncomCollision")
val barrier = new CyclicBarrier(2)
def waitForBarrier() = fconn.async[Int] { k =>
k(Try(barrier.await(5, SECONDS)).toEither)
}

def anUpdateThread(cid: String, first: Boolean) = instanceUUIDLogCtx { implicit lc =>
def stid = surrogateTemplateId(tpId.packageId, tpId.moduleName, tpId.entityName)
val simulation = instanceUUIDLogCtx { implicit lc =>
def stid = {
println(s"s11 starting stid")
surrogateTemplateId(tpId.packageId, tpId.moduleName, tpId.entityName)
.map { i => println(s"s11 completed stid"); i }
}

for {
tpid <- if (first) stid <* waitForBarrier() else waitForBarrier().flatMap(_ => stid)
_ <- queries.dropAllTablesIfExist
_ <- queries.initDatabase
_ <- stid
_ <- fconn.rollback // as with when we conflict and retry
tpid <- stid
_ <- insertContracts(
List(
DBContract(
contractId = cid,
contractId = "foo",
templateId = tpid,
key = JsNull: JsValue,
keyHash = None,
Expand All @@ -243,21 +244,13 @@ abstract class AbstractDatabaseIntegrationTest extends AsyncFreeSpecLike with Be
)
)
)
_ <- if (first) waitForBarrier() *> fconn.commit else fconn.commit <* waitForBarrier()
} yield true
_ <- fconn.commit
} yield succeed
}

def actuallyAsync[A](ca: fconn.ConnectionIO[A]) =
Future(()).flatMap(_ => dao.transact(ca).unsafeToFuture())

dao
.transact(queries.dropAllTablesIfExist.flatMap(_ => queries.initDatabase))
.transact(simulation)
.unsafeToFuture()
.flatMap(_ =>
actuallyAsync(anUpdateThread("foo", true))
zip actuallyAsync(anUpdateThread("bar", false))
map (_ shouldBe ((true, true)))
)
}
}

Expand Down