Skip to content

Commit

Permalink
grpc-utils: Simpler extractors. (digital-asset#3594)
Browse files Browse the repository at this point in the history
* grpc-utils: Simpler GrpcStatus extractors when you know the code.

And don't care about the description.

* grpc-utils: Simpler GrpcException extractors when you know the code.

And don't care about the description.

* Apply suggestions from code review

Co-Authored-By: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>

* Add `new` to `SpecificGrpc{Exception,Status}` constructor calls.

Also, don't blindly apply GitHub suggestions.
  • Loading branch information
SamirTalwar authored and mergify[bot] committed Nov 22, 2019
1 parent 16b6f1b commit 1bd2ec5
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package com.digitalasset.extractor
import akka.actor.ActorSystem
import akka.stream.scaladsl.{RestartSource, Sink}
import akka.stream.{ActorMaterializer, KillSwitches}
import com.digitalasset.timer.RetryStrategy
import com.digitalasset.extractor.Types._
import com.digitalasset.extractor.config.{ExtractorConfig, SnapshotEndSetting}
import com.digitalasset.extractor.helpers.FutureUtil.toFuture
Expand All @@ -15,7 +14,7 @@ import com.digitalasset.extractor.ledger.types.TransactionTree
import com.digitalasset.extractor.ledger.types.TransactionTree._
import com.digitalasset.extractor.writers.Writer
import com.digitalasset.extractor.writers.Writer.RefreshPackages
import com.digitalasset.grpc.{GrpcException, GrpcStatus}
import com.digitalasset.grpc.GrpcException
import com.digitalasset.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset
import com.digitalasset.ledger.api.v1.transaction_filter.{Filters, TransactionFilter}
Expand All @@ -25,9 +24,9 @@ import com.digitalasset.ledger.client.configuration._
import com.digitalasset.ledger.client.services.pkg.PackageClient
import com.digitalasset.ledger.service.LedgerReader
import com.digitalasset.ledger.service.LedgerReader.PackageStore
import com.digitalasset.timer.RetryStrategy
import com.typesafe.scalalogging.StrictLogging
import io.grpc.netty.NettyChannelBuilder
import io.grpc.Status.Code.PERMISSION_DENIED
import scalaz.Scalaz._
import scalaz._
import scalaz.syntax.tag._
Expand Down Expand Up @@ -121,7 +120,7 @@ class Extractor[T](config: ExtractorConfig, target: T)(

private def keepRetryingOnPermissionDenied[A](f: () => Future[A]): Future[A] =
RetryStrategy.constant(1.second) {
case GrpcException(GrpcStatus(`PERMISSION_DENIED`, _), _) => true
case GrpcException.PERMISSION_DENIED() => true
} { (attempt, wait) =>
logger.error(s"Failed to authenticate with Ledger API on attempt $attempt, next one in $wait")
tokenHolder.foreach(_.refresh())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import com.digitalasset.extractor.config.{ExtractorConfig, SnapshotEndSetting}
import com.digitalasset.extractor.ledger.types.TransactionTree
import com.digitalasset.extractor.targets.TextPrintTarget
import com.digitalasset.extractor.writers.Writer
import com.digitalasset.grpc.{GrpcException, GrpcStatus}
import com.digitalasset.grpc.GrpcException
import com.digitalasset.jwt.domain.DecodedJwt
import com.digitalasset.jwt.{HMAC256Verifier, JwtSigner}
import com.digitalasset.ledger.api.auth.{AuthServiceJWT, AuthServiceJWTCodec, AuthServiceJWTPayload}
Expand All @@ -27,16 +27,15 @@ import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.services.{SandboxFixture, TestCommands}
import com.digitalasset.timer.Delayed
import com.google.protobuf.timestamp.Timestamp
import io.grpc.Status.Code.PERMISSION_DENIED
import org.scalatest.{AsyncFlatSpec, Matchers}
import org.slf4j.LoggerFactory
import scalaz.{OneAnd, \/}

import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

final class AuthSpec
extends AsyncFlatSpec
Expand Down Expand Up @@ -140,7 +139,7 @@ final class AuthSpec

it should "fail immediately with a PERMISSION_DENIED if no token is provided" in {
extractor(noAuth).run().failed.collect {
case GrpcException(GrpcStatus(`PERMISSION_DENIED`, _), _) => succeed
case GrpcException.PERMISSION_DENIED() => succeed
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ import com.digitalasset.ledger.api.v1.command_service.{
SubmitAndWaitForTransactionTreeResponse
}
import com.digitalasset.ledger.api.{v1 => scalaAPI}
import com.google.protobuf.{Empty => JEmpty}
import com.google.protobuf.empty.Empty
import com.google.protobuf.{Empty => JEmpty}
import com.google.rpc.Status
import com.google.rpc.code.Code.OK
import io.grpc.Metadata
import io.grpc.Status.Code.INVALID_ARGUMENT
import io.reactivex.{Flowable, Observable, Single}
import org.pcollections.{HashTreePMap, HashTreePSet}
import org.reactivestreams.{Subscriber, Subscription}
Expand Down Expand Up @@ -403,7 +402,7 @@ class BotTest extends FlatSpec with Matchers with DataLayerHelpers {
new FiltersByParty(Collections.emptyMap()),
_ => Flowable.empty())
} catch {
case GrpcException(GrpcStatus(`INVALID_ARGUMENT`, _), trailers) =>
case GrpcException(GrpcStatus.INVALID_ARGUMENT(), trailers) =>
/** the tests relies on specific implementation of the [[TransactionServiceImpl.getTransactions()]] */
fail(trailers.get(Metadata.Key.of("cause", Metadata.ASCII_STRING_MARSHALLER)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.util.{Timer, TimerTask, UUID}

import akka.stream.scaladsl.Sink
import com.digitalasset.daml.bazeltools.BazelRunfiles.rlocation
import com.digitalasset.grpc.{GrpcException, GrpcStatus}
import com.digitalasset.grpc.GrpcException
import com.digitalasset.jwt.domain.DecodedJwt
import com.digitalasset.jwt.{HMAC256Verifier, JwtSigner}
import com.digitalasset.ledger.api.auth.{AuthServiceJWT, AuthServiceJWTCodec, AuthServiceJWTPayload}
Expand Down Expand Up @@ -45,7 +45,6 @@ import com.digitalasset.ledger.api.v1.transaction_filter.{Filters, TransactionFi
import com.digitalasset.ledger.api.v1.transaction_service._
import com.digitalasset.platform.apitesting._
import com.google.protobuf.ByteString
import io.grpc.Status.Code.PERMISSION_DENIED
import io.grpc.stub.StreamObserver
import io.grpc.{Status, StatusException, StatusRuntimeException}
import org.scalatest.concurrent.AsyncTimeLimitedTests
Expand Down Expand Up @@ -814,7 +813,7 @@ class AuthorizationIT
}
def onError(t: Throwable): Unit = {
t match {
case GrpcException(GrpcStatus(`PERMISSION_DENIED`, _), _) if gotSomething =>
case GrpcException.PERMISSION_DENIED() if gotSomething =>
val _ = promise.trySuccess(())
case _ =>
val _ = promise.tryFailure(t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,41 @@

package com.digitalasset.grpc

import com.digitalasset.grpc.GrpcStatus.SpecificGrpcStatus
import io.grpc.{Metadata, Status, StatusException, StatusRuntimeException}

object GrpcException {

def unapply(arg: Exception): Option[(Status, Metadata)] =
arg match {
def unapply(exception: Exception): Option[(Status, Metadata)] =
exception match {
case e: StatusRuntimeException => Some((e.getStatus, e.getTrailers))
case e: StatusException => Some((e.getStatus, e.getTrailers))
case _ => None
}

private[grpc] final class SpecificGrpcException(status: SpecificGrpcStatus) {
def unapply(exception: Exception): Boolean =
exception match {
case e: StatusRuntimeException => status.unapply(e.getStatus)
case e: StatusException => status.unapply(e.getStatus)
case _ => false
}
}

val OK = new SpecificGrpcException(GrpcStatus.OK)
val CANCELLED = new SpecificGrpcException(GrpcStatus.CANCELLED)
val UNKNOWN = new SpecificGrpcException(GrpcStatus.UNKNOWN)
val INVALID_ARGUMENT = new SpecificGrpcException(GrpcStatus.INVALID_ARGUMENT)
val DEADLINE_EXCEEDED = new SpecificGrpcException(GrpcStatus.DEADLINE_EXCEEDED)
val NOT_FOUND = new SpecificGrpcException(GrpcStatus.NOT_FOUND)
val ALREADY_EXISTS = new SpecificGrpcException(GrpcStatus.ALREADY_EXISTS)
val PERMISSION_DENIED = new SpecificGrpcException(GrpcStatus.PERMISSION_DENIED)
val RESOURCE_EXHAUSTED = new SpecificGrpcException(GrpcStatus.RESOURCE_EXHAUSTED)
val FAILED_PRECONDITION = new SpecificGrpcException(GrpcStatus.FAILED_PRECONDITION)
val ABORTED = new SpecificGrpcException(GrpcStatus.ABORTED)
val OUT_OF_RANGE = new SpecificGrpcException(GrpcStatus.OUT_OF_RANGE)
val UNIMPLEMENTED = new SpecificGrpcException(GrpcStatus.UNIMPLEMENTED)
val INTERNAL = new SpecificGrpcException(GrpcStatus.INTERNAL)
val UNAVAILABLE = new SpecificGrpcException(GrpcStatus.UNAVAILABLE)
val DATA_LOSS = new SpecificGrpcException(GrpcStatus.DATA_LOSS)
val UNAUTHENTICATED = new SpecificGrpcException(GrpcStatus.UNAUTHENTICATED)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,42 @@

package com.digitalasset.grpc

import io.grpc.Status
import com.google.rpc.status.{Status => ProtobufStatus}
import io.grpc.Status
import io.grpc.Status.Code

object GrpcStatus {
type Description = Option[String]

def unapply(arg: Status): Some[(Status.Code, Option[String])] =
Some((arg.getCode, Option(arg.getDescription)))
def unapply(status: Status): Some[(Code, Description)] =
Some((status.getCode, Option(status.getDescription)))

def toProto(code: Status.Code, description: Option[String]): ProtobufStatus =
def toProto(code: Code, description: Description): ProtobufStatus =
ProtobufStatus(code.value, description.getOrElse(""))

def toProto(status: Status): ProtobufStatus =
toProto(status.getCode, Option(status.getDescription))

private[grpc] final class SpecificGrpcStatus(code: Code) {
def unapply(status: Status): Boolean =
status.getCode == code
}

val OK = new SpecificGrpcStatus(Code.OK)
val CANCELLED = new SpecificGrpcStatus(Code.CANCELLED)
val UNKNOWN = new SpecificGrpcStatus(Code.UNKNOWN)
val INVALID_ARGUMENT = new SpecificGrpcStatus(Code.INVALID_ARGUMENT)
val DEADLINE_EXCEEDED = new SpecificGrpcStatus(Code.DEADLINE_EXCEEDED)
val NOT_FOUND = new SpecificGrpcStatus(Code.NOT_FOUND)
val ALREADY_EXISTS = new SpecificGrpcStatus(Code.ALREADY_EXISTS)
val PERMISSION_DENIED = new SpecificGrpcStatus(Code.PERMISSION_DENIED)
val RESOURCE_EXHAUSTED = new SpecificGrpcStatus(Code.RESOURCE_EXHAUSTED)
val FAILED_PRECONDITION = new SpecificGrpcStatus(Code.FAILED_PRECONDITION)
val ABORTED = new SpecificGrpcStatus(Code.ABORTED)
val OUT_OF_RANGE = new SpecificGrpcStatus(Code.OUT_OF_RANGE)
val UNIMPLEMENTED = new SpecificGrpcStatus(Code.UNIMPLEMENTED)
val INTERNAL = new SpecificGrpcStatus(Code.INTERNAL)
val UNAVAILABLE = new SpecificGrpcStatus(Code.UNAVAILABLE)
val DATA_LOSS = new SpecificGrpcStatus(Code.DATA_LOSS)
val UNAUTHENTICATED = new SpecificGrpcStatus(Code.UNAUTHENTICATED)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import akka.http.scaladsl.settings.RoutingSettings
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.digitalasset.grpc.{GrpcException, GrpcStatus}
import com.digitalasset.grpc.GrpcException
import com.digitalasset.navigator.SessionJsonProtocol._
import com.digitalasset.navigator.config._
import com.digitalasset.navigator.graphql.GraphQLContext
Expand All @@ -29,7 +29,6 @@ import com.digitalasset.navigator.model.{Ledger, PackageRegistry}
import com.digitalasset.navigator.store.Store._
import com.digitalasset.navigator.store.platform.PlatformStore
import com.typesafe.scalalogging.LazyLogging
import io.grpc.Status.Code.PERMISSION_DENIED
import org.slf4j.LoggerFactory
import sangria.schema._
import spray.json._
Expand Down Expand Up @@ -138,7 +137,7 @@ abstract class UIBackend extends LazyLogging with ApplicationInfoJsonSupport {
_,
_,
_,
GrpcException(GrpcStatus(`PERMISSION_DENIED`, _), _)) =>
GrpcException.PERMISSION_DENIED()) =>
logger.warn("Attempt to sign in without valid token")
complete(signIn(Some(InvalidCredentials)))
case _: ApplicationStateFailed =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.grpc.GrpcException
import com.digitalasset.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
import com.digitalasset.grpc.{GrpcException, GrpcStatus}
import com.digitalasset.ledger.api.refinements.{ApiTypes, IdGenerator}
import com.digitalasset.ledger.api.tls.TlsConfiguration
import com.digitalasset.ledger.api.v1.testing.time_service.TimeServiceGrpc
Expand All @@ -31,7 +31,6 @@ import com.digitalasset.navigator.store.Store._
import com.digitalasset.navigator.time._
import com.digitalasset.navigator.util.RetryHelper
import io.grpc.Channel
import io.grpc.Status.Code.UNIMPLEMENTED
import io.grpc.netty.{GrpcSslContexts, NettyChannelBuilder}
import io.netty.handler.ssl.SslContext
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -340,10 +339,9 @@ class PlatformStore(
})
.recover({
// If the time service is not implemented, then the ledger uses UTC time.
case GrpcException(GrpcStatus(`UNIMPLEMENTED`, _), _) => {
case GrpcException.UNIMPLEMENTED() =>
log.info("Time service is not implemented")
None
}
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import java.lang.Math.floor

import akka.actor.Scheduler
import akka.pattern.after
import com.digitalasset.grpc.{GrpcException, GrpcStatus}
import com.digitalasset.grpc.GrpcException
import com.typesafe.scalalogging.LazyLogging
import io.grpc.Status.Code.PERMISSION_DENIED

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -48,7 +47,7 @@ object RetryHelper extends LazyLogging {
}

val failFastOnPermissionDenied: RetryStrategy = {
case GrpcException(GrpcStatus(`PERMISSION_DENIED`, _), _) => false
case GrpcException.PERMISSION_DENIED() => false
case NonFatal(_) => true
}

Expand Down

0 comments on commit 1bd2ec5

Please sign in to comment.