Skip to content

Commit

Permalink
Tweak stream-to-stream fault tolerance stress test timeout and events…
Browse files Browse the repository at this point in the history
… count (#24548)

Fixes #24537.

Non equi-join stress test is more heavyweight, so I cut joining count of
events twice
  • Loading branch information
Fly-Style authored May 14, 2023
1 parent f1f6e5e commit e1727aa
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.HazelcastSerialParametersRunnerFactory;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelJVMTest;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down Expand Up @@ -60,16 +61,17 @@

@RunWith(HazelcastParametrizedRunner.class)
@UseParametersRunnerFactory(HazelcastSerialParametersRunnerFactory.class)
@Category(NightlyTest.class)
@Category({NightlyTest.class, ParallelJVMTest.class})
public class SqlSTSInnerEquiJoinFaultToleranceStressTest extends SqlTestSupport {
private static final int EVENTS_PER_SINK = 500;
private static final int SINK_COUNT = 400;
protected static final int EVENTS_TO_PROCESS = EVENTS_PER_SINK * SINK_COUNT;
protected static final int SNAPSHOT_TIMEOUT_SECONDS = 30;

protected static final String JOB_NAME = "s2s_join";
protected static final String EXACTLY_ONCE = "exactlyOnce";
protected static final String AT_LEAST_ONCE = "atLeastOnce";

protected final int eventsPerSink = 500;
protected int sinkCount = 400;
protected int eventsToProcess = eventsPerSink * sinkCount;

private static KafkaTestSupport kafkaTestSupport;
private volatile Throwable ex;

Expand All @@ -80,9 +82,9 @@ public class SqlSTSInnerEquiJoinFaultToleranceStressTest extends SqlTestSupport
protected String sinkTopic;
private JobRestarter jobRestarter;

protected int expectedEventsCount = EVENTS_TO_PROCESS;
protected int expectedEventsCount = eventsToProcess;
protected int firstItemId = 1;
protected int lastItemId = EVENTS_TO_PROCESS;
protected int lastItemId = eventsToProcess;

@Parameter(value = 0)
public String processingGuarantee;
Expand Down Expand Up @@ -168,7 +170,7 @@ public void after() throws InterruptedException {
}
}

@Test
@Test(timeout = 1_200_000L)
public void stressTest() throws Exception {
sqlService.execute(setupFetchingQuery());

Expand Down Expand Up @@ -262,15 +264,15 @@ public void finish() {
private void createTopicData(SqlService sqlService, String topicName) {
try {
int itemsSank = 0;
for (int sink = 1; sink <= SINK_COUNT; sink++) {
for (int sink = 1; sink <= sinkCount; sink++) {
StringBuilder queryBuilder = new StringBuilder("INSERT INTO " + topicName + " VALUES ");
for (int i = 0; i < EVENTS_PER_SINK; ++i) {
for (int i = 0; i < eventsPerSink; ++i) {
++itemsSank;
queryBuilder.append("(").append(itemsSank).append(", 'value-").append(itemsSank).append("'),");
}
queryBuilder.setLength(queryBuilder.length() - 1);

assertEquals(itemsSank, EVENTS_PER_SINK * sink);
assertEquals(itemsSank, eventsPerSink * sink);
sqlService.execute(queryBuilder.toString());
logger.info("Items sank " + itemsSank);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,26 @@
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.HazelcastSerialParametersRunnerFactory;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelJVMTest;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(HazelcastParametrizedRunner.class)
@Parameterized.UseParametersRunnerFactory(HazelcastSerialParametersRunnerFactory.class)
@Category(NightlyTest.class)
@Category({NightlyTest.class, ParallelJVMTest.class})
public class SqlSTSInnerNonEquiJoinFaultToleranceStressTest extends SqlSTSInnerEquiJoinFaultToleranceStressTest {

public SqlSTSInnerNonEquiJoinFaultToleranceStressTest() {
super();
this.sinkCount = 200;
}

@Override
protected String setupFetchingQuery() {
expectedEventsCount = EVENTS_TO_PROCESS - 1; // we do expected fewer items for query below
expectedEventsCount = eventsToProcess - 1; // we do expected fewer items for query below
firstItemId = 2; // we do expect first item to be [1, value-2]
lastItemId = EVENTS_TO_PROCESS;
lastItemId = eventsToProcess;
return "CREATE JOB " + JOB_NAME +
" OPTIONS (" +
" 'processingGuarantee'='" + processingGuarantee + "', 'snapshotIntervalMillis' = '1000') " +
Expand Down

0 comments on commit e1727aa

Please sign in to comment.