Multi Subject subscription fails with startAtTime option #9
Open
Description
Hi,
In Spark Streaming job, when I connect to multiple subjects, with startAtTime option, some subjects get subscribed, and some timeout, there is no retry, and no other subscriptions occur after that error, the ones which were connected works well.
val stream: DStream[(String, String)] = NatsToSparkConnector.receiveFromNatsStreaming(
classOf[String],
StorageLevel.MEMORY_AND_DISK,
clusterId
)
.withNatsURL()
.withSubjects("sub1", "sub2", "sub3")
.withNatsQueue()
.startAtTime(1570473000)
.withProperties()
.asStreamOfKeyValue()
Can see the following error in one of the executors:
com.logimethods.connector.nats.to_spark.OmnipotentNatsStreamingToSparkConnector - io.nats.streaming.StreamingConnectionImpl@49005cb7.subscribe(sub1, stream-queue, com.logimethods.connector.nats.to_spark.NatsStreamingToKeyValueSparkConnectorImpl$1@13f22a1b, SubscriptionOptions{durableName='null', maxInFlight=1024, ackWait=PT30S, startAt=TimeDeltaStart, startSequence=0, startTime=2019-10-07T18:30:00Z, manualAcks=false}) PRODUCES stan: subscribe request timeout
[Thread-12] ERROR com.logimethods.connector.nats.to_spark.NatsToSparkConnector - Cannot start the connector:
java.io.IOException: stan: subscribe request timeout
at io.nats.streaming.StreamingConnectionImpl.subscribe(StreamingConnectionImpl.java:480)
at com.logimethods.connector.nats.to_spark.OmnipotentNatsStreamingToSparkConnector.receive(OmnipotentNatsStreamingToSparkConnector.java:333)
at com.logimethods.connector.nats.to_spark.NatsToSparkConnector$1.run(NatsToSparkConnector.java:182)
Metadata
Assignees
Labels
No labels