Skip to content

Commit

Permalink
Cassandra closeAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
pomadchin committed Jul 10, 2022
1 parent 5141587 commit a4372ef
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ object CassandraRDDReader {
}

/** Close partition session */
(result ++ Iterator({ session.close(); Seq.empty[(K, V)] })).flatten
(result ++ Iterator({ session.closeAsync(); Seq.empty[(K, V)] })).flatten
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ object CassandraRDDWriter {
.flatMap(rowToBytes)
.map(retire)
.parJoinUnbounded
.onComplete { fs2.Stream eval session.closeF[IO] }
.onComplete { fs2.Stream eval IO(session.closeAsync) }

results
.compile
Expand Down
2 changes: 1 addition & 1 deletion cassandra-spark/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ datastax-java-driver {
local-datacenter = datacenter1
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,14 @@ trait CassandraInstance extends Serializable {
/** With session close */
def withSessionDo[T](block: CqlSession => T): T = {
val session = getSession()
try block(session) finally session.close()
try block(session) finally session.closeAsync()
}

def closeAsync[F[_]: Async]: F[Unit] = session.closeF
// def closeAsync[F[_]: Async]: F[Unit] = session.closeF

def close(): Unit = session.close()
// def close(): Unit = session.close()

def closeAsync = session.closeAsync()
}

case class BaseCassandraInstance(
Expand Down Expand Up @@ -134,7 +136,7 @@ object Cassandra {
implicit def instanceToSession[T <: CassandraInstance](instance: T): CqlSession = instance.session

def withCassandraInstance[T <: CassandraInstance, K](instance: T)(block: T => K): K = block(instance)
def withCassandraInstanceDo[T <: CassandraInstance, K](instance: T)(block: T => K): K = try block(instance) finally instance.close()
def withCassandraInstanceDo[T <: CassandraInstance, K](instance: T)(block: T => K): K = try block(instance) finally instance.closeAsync()

def withBaseCassandraInstance[K](hosts: Seq[String],
username: String,
Expand All @@ -155,7 +157,7 @@ object Cassandra {
password: String,
cassandraConfig: CassandraConfig)(block: CassandraInstance => K): K = {
val instance = BaseCassandraInstance(hosts, username, password, cassandraConfig)
try block(instance) finally instance.close()
try block(instance) finally instance.closeAsync()
}
def withBaseCassandraInstanceDo[K](hosts: Seq[String],
username: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, Statement}
import java.math.BigInteger
import java.util.concurrent.CompletableFuture

package object cassandra {
package object cassandra extends Serializable {
implicit def bigToBig(i: BigInt): BigInteger = new BigInteger(i.toByteArray)

implicit class CompletableFutureOps[T](val self: CompletableFuture[T]) extends AnyVal {
Expand All @@ -35,4 +35,9 @@ package object cassandra {
def closeF[F[_]: Async]: F[Unit] = self.closeAsync().toCompletableFuture.liftTo.void
def executeF[F[_]: Async](statement: Statement[_]): F[AsyncResultSet] = self.executeAsync(statement).toCompletableFuture.liftTo
}

implicit class AsyncResultSetOps(val self: AsyncResultSet) extends AnyVal {
def nonEmpty: Boolean = self.remaining() > 0 || self.hasMorePages
def isEmpty: Boolean = !nonEmpty
}
}

0 comments on commit a4372ef

Please sign in to comment.