diff --git a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java index 35010fe18d6..792ec3aa13b 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java @@ -25,24 +25,75 @@ import org.apache.storm.Config; import org.apache.storm.DaemonConfig; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.KeySequenceNumber; +import org.apache.storm.blobstore.LocalFsBlobStore; +import org.apache.storm.cluster.IStormClusterState; import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.generated.StormTopology; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.nimbus.ILeaderElector; +import org.apache.storm.nimbus.NimbusInfo; +import org.apache.storm.scheduler.INimbus; import org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategyOld; import org.apache.storm.scheduler.resource.strategies.scheduling.RoundRobinResourceAwareStrategy; +import org.apache.storm.security.auth.IGroupMappingServiceProvider; import org.apache.storm.testing.TestWordSpout; +import org.apache.storm.thrift.TException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Time; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.MockitoAnnotations; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class NimbusTest { + private static final String BLOB_FILE_KEY = "file-key"; + + @Mock + private StormMetricsRegistry metricRegistry; + @Mock + private INimbus iNimbus; + @Mock + private IStormClusterState stormClusterState; + @Mock + private NimbusInfo nimbusInfo; + @Mock + private LocalFsBlobStore localBlobStore; + @Mock + private ILeaderElector leaderElector; + @Mock + private IGroupMappingServiceProvider groupMapper; + + private Nimbus nimbus; + + @BeforeEach + public void setUp() throws Exception { + MockitoAnnotations.openMocks(this).close(); + + Map conf = Map.of(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS, 10); + nimbus = new Nimbus(conf, iNimbus, stormClusterState, nimbusInfo, localBlobStore, leaderElector, groupMapper, metricRegistry); + } -public class NimbusTest { @Test public void testMemoryLoadLargerThanMaxHeapSize() { // Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=128.0 < 129.0, @@ -112,4 +163,41 @@ public void validateNoTopoConfOverrides() { Map normalized = Nimbus.normalizeConf(conf, topoConf, topology); assertNull(normalized.get(Config.STORM_WORKERS_ARTIFACTS_DIR)); } + + @Test + void testCreateStateInZookeeper() throws TException { + nimbus.createStateInZookeeper(BLOB_FILE_KEY); + + verify(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any()); + } + + @Test + void testCreateStateInZookeeperWithoutLocalFsBlobStoreInstanceShouldNotCreate() throws Exception { + BlobStore blobStore = mock(BlobStore.class); + Map conf = Map.of(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS, 10); + nimbus = new Nimbus(conf, iNimbus, stormClusterState, nimbusInfo, blobStore, leaderElector, groupMapper, metricRegistry); + + nimbus.createStateInZookeeper(BLOB_FILE_KEY); + + verify(stormClusterState, never()).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any()); + } + + @Test + void testCreateStateInZookeeperWhenFailToSetupBlobWithRuntimeExceptionThrowsRuntimeException() { + doThrow(new RuntimeException("Failed to setup blob")).when(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any()); + + assertThrows(RuntimeException.class, () -> nimbus.createStateInZookeeper(BLOB_FILE_KEY)); + verify(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any()); + } + + @Test + void testCreateStateInZookeeperWhenKeyNotFoundHandlesException() throws Exception { + try (MockedConstruction keySequenceNumber = mockConstruction(KeySequenceNumber.class, (mock, context) -> + when(mock.getKeySequenceNumber(any())).thenThrow(new KeyNotFoundException("Failed to setup blob")))) { + nimbus.createStateInZookeeper(BLOB_FILE_KEY); + + verify(keySequenceNumber.constructed().get(0)).getKeySequenceNumber(any()); + verify(stormClusterState, never()).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any()); + } + } }