Skip to content

Multi Subject subscription fails with startAtTime option #9

Open
@jainnidhi703

Description

Hi,

In Spark Streaming job, when I connect to multiple subjects, with startAtTime option, some subjects get subscribed, and some timeout, there is no retry, and no other subscriptions occur after that error, the ones which were connected works well.


val stream: DStream[(String, String)] = NatsToSparkConnector.receiveFromNatsStreaming(
        classOf[String],
        StorageLevel.MEMORY_AND_DISK,
        clusterId
      )
      .withNatsURL()
      .withSubjects("sub1", "sub2", "sub3")
      .withNatsQueue()
      .startAtTime(1570473000)
      .withProperties()
      .asStreamOfKeyValue()

Can see the following error in one of the executors:


com.logimethods.connector.nats.to_spark.OmnipotentNatsStreamingToSparkConnector - io.nats.streaming.StreamingConnectionImpl@49005cb7.subscribe(sub1, stream-queue, com.logimethods.connector.nats.to_spark.NatsStreamingToKeyValueSparkConnectorImpl$1@13f22a1b, SubscriptionOptions{durableName='null', maxInFlight=1024, ackWait=PT30S, startAt=TimeDeltaStart, startSequence=0, startTime=2019-10-07T18:30:00Z, manualAcks=false}) PRODUCES stan: subscribe request timeout
[Thread-12] ERROR com.logimethods.connector.nats.to_spark.NatsToSparkConnector - Cannot start the connector: 
java.io.IOException: stan: subscribe request timeout
    at io.nats.streaming.StreamingConnectionImpl.subscribe(StreamingConnectionImpl.java:480)
    at com.logimethods.connector.nats.to_spark.OmnipotentNatsStreamingToSparkConnector.receive(OmnipotentNatsStreamingToSparkConnector.java:333)
    at com.logimethods.connector.nats.to_spark.NatsToSparkConnector$1.run(NatsToSparkConnector.java:182)

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions