Skip to content

Commit

Permalink
Merge branch 'STORM-2084' of https://github.com/revans2/incubator-storm
Browse files Browse the repository at this point in the history
… into STORM-2084-merge
HeartSaVioR committed Sep 28, 2017
2 parents 99969a5 + 78cb243 commit 5c05bb7
Showing 17 changed files with 1,627 additions and 1,908 deletions.
2 changes: 1 addition & 1 deletion conf/defaults.yaml
Original file line number Diff line number Diff line change
@@ -123,7 +123,7 @@ supervisor.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore"
supervisor.blobstore.download.thread.count: 5
supervisor.blobstore.download.max_retries: 3
supervisor.localizer.cache.target.size.mb: 10240
supervisor.localizer.cleanup.interval.ms: 600000
supervisor.localizer.cleanup.interval.ms: 30000

nimbus.blobstore.class: "org.apache.storm.blobstore.LocalFsBlobStore"
nimbus.blobstore.expiration.secs: 600
2 changes: 1 addition & 1 deletion storm-client/src/jvm/org/apache/storm/StormTimer.java
Original file line number Diff line number Diff line change
@@ -225,7 +225,7 @@ private void checkActive() {
*/

@Override
public void close() throws Exception {
public void close() throws InterruptedException {
if (this.task.isActive()) {
this.task.setActive(false);
this.task.interrupt();
44 changes: 31 additions & 13 deletions storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
Original file line number Diff line number Diff line change
@@ -65,21 +65,21 @@
(reify TestJob
(^void run [this ^ILocalCluster cluster]
(let [topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
{"spout" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
{"2" (Thrift/prepareBoltDetails
{(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
{(GlobalStreamId. "spout" Utils/DEFAULT_STREAM_ID)
(Thrift/prepareFieldsGrouping ["word"])}
(TestWordCounter.) (Integer. 4))
"3" (Thrift/prepareBoltDetails
{(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
{(GlobalStreamId. "spout" Utils/DEFAULT_STREAM_ID)
(Thrift/prepareGlobalGrouping)}
(TestGlobalCount.))
"4" (Thrift/prepareBoltDetails
{(GlobalStreamId. "2" Utils/DEFAULT_STREAM_ID)
(Thrift/prepareGlobalGrouping)}
(TestAggregatesCounter.))})
mocked-sources (doto (MockedSources.)
(.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
(.addMockData "spout" (into-array Values [(Values. (into-array ["nathan"]))
(Values. (into-array ["bob"]))
(Values. (into-array ["joey"]))
(Values. (into-array ["nathan"]))])
@@ -93,7 +93,7 @@
topology
complete-topology-param)]
(is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
(Testing/readTuples results "1")))
(Testing/readTuples results "spout")))
(is (Testing/multiseteq [["nathan" (int 1)] ["nathan" (int 2)] ["bob" (int 1)] ["joey" (int 1)]]
(Testing/readTuples results "2")))
(is (= [[1] [2] [3] [4]]
@@ -102,18 +102,36 @@
(Testing/readTuples results "4")))
))))

(deftest test-complete-topology
(doseq [zmq-on? [true false]
:let [daemon-conf (doto (Config.)
(.put STORM-LOCAL-MODE-ZMQ zmq-on?))
mk-cluster-param (doto (MkClusterParam.)
(.setSupervisors (int 4))
(.setDaemonConf daemon-conf))]]
(deftest test-complete-topology-netty-simulated
(let [daemon-conf (doto (Config.)
(.put STORM-LOCAL-MODE-ZMQ true))
mk-cluster-param (doto (MkClusterParam.)
(.setSupervisors (int 4))
(.setDaemonConf daemon-conf))]
(Testing/withSimulatedTimeLocalCluster
mk-cluster-param complete-topology-testjob )
mk-cluster-param complete-topology-testjob)))

(deftest test-complete-topology-netty
  ;; Runs complete-topology-testjob on a non-simulated-time local cluster with
  ;; four supervisors and STORM-LOCAL-MODE-ZMQ set to true in the daemon conf.
  (let [daemon-conf (Config.)
        _ (.put daemon-conf STORM-LOCAL-MODE-ZMQ true)
        cluster-param (MkClusterParam.)]
    (.setSupervisors cluster-param (int 4))
    (.setDaemonConf cluster-param daemon-conf)
    (Testing/withLocalCluster cluster-param complete-topology-testjob)))

(deftest test-complete-topology-local
  ;; Runs complete-topology-testjob on a local cluster with four supervisors,
  ;; leaving the daemon conf unset on the cluster parameters.
  (let [cluster-param (MkClusterParam.)]
    (.setSupervisors cluster-param (int 4))
    (Testing/withLocalCluster cluster-param complete-topology-testjob)))

(deftest test-complete-topology-local-simulated
  ;; Same job as test-complete-topology-local, but executed through
  ;; Testing/withSimulatedTimeLocalCluster instead of Testing/withLocalCluster.
  (let [cluster-param (MkClusterParam.)]
    (.setSupervisors cluster-param (int 4))
    (Testing/withSimulatedTimeLocalCluster cluster-param complete-topology-testjob)))

(deftest test-with-tracked-cluster
(Testing/withTrackedCluster
(reify TestJob
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.localizer.ILocalizer;
import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Time;
@@ -60,7 +60,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
private final AtomicInteger readRetry = new AtomicInteger(0);
private final String assignmentId;
private final ISupervisor iSuper;
private final ILocalizer localizer;
private final AsyncLocalizer localizer;
private final ContainerLauncher launcher;
private final String host;
private final LocalState localState;
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.localizer.ILocalizer;
import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.LocalState;
@@ -79,7 +79,7 @@ static enum MachineState {
}

static class StaticState {
public final ILocalizer localizer;
public final AsyncLocalizer localizer;
public final long hbTimeoutMs;
public final long firstHbTimeoutMs;
public final long killSleepMs;
@@ -90,10 +90,10 @@ static class StaticState {
public final ISupervisor iSupervisor;
public final LocalState localState;

StaticState(ILocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
long killSleepMs, long monitorFreqMs,
ContainerLauncher containerLauncher, String host, int port,
ISupervisor iSupervisor, LocalState localState) {
StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
long killSleepMs, long monitorFreqMs,
ContainerLauncher containerLauncher, String host, int port,
ISupervisor iSupervisor, LocalState localState) {
this.localizer = localizer;
this.hbTimeoutMs = hbTimeoutMs;
this.firstHbTimeoutMs = firstHbTimeoutMs;
@@ -684,12 +684,12 @@ static DynamicState handleEmpty(DynamicState dynamicState, StaticState staticSta
private volatile DynamicState dynamicState;
private final AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments;

public Slot(ILocalizer localizer, Map<String, Object> conf,
ContainerLauncher containerLauncher, String host,
int port, LocalState localState,
IStormClusterState clusterState,
ISupervisor iSupervisor,
AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
public Slot(AsyncLocalizer localizer, Map<String, Object> conf,
ContainerLauncher containerLauncher, String host,
int port, LocalState localState,
IStormClusterState clusterState,
ISupervisor iSupervisor,
AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
super("SLOT_"+port);

this.cachedCurrentAssignments = cachedCurrentAssignments;
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.daemon.supervisor;

import java.io.File;
@@ -25,7 +26,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

@@ -39,19 +39,15 @@
import org.apache.storm.daemon.DaemonCommon;
import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
import org.apache.storm.event.EventManager;
import org.apache.storm.event.EventManagerImp;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.localizer.ILocalizer;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.messaging.IContext;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.LocalState;
@@ -78,8 +74,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
private final StormTimer heartbeatTimer;
private final StormTimer eventTimer;
private final StormTimer blobUpdateTimer;
private final Localizer localizer;
private final AsyncLocalizer asyncLocalizer;
private EventManager eventManager;
private ReadClusterState readState;
@@ -110,10 +104,11 @@ public Supervisor(Map<String, Object> conf, IContext sharedContext, ISupervisor
throw Utils.wrapInRuntime(e);
}

this.currAssignment = new AtomicReference<>(new HashMap<>());

try {
this.localState = ServerConfigUtils.supervisorState(conf);
this.localizer = ServerUtils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf));
this.asyncLocalizer = new AsyncLocalizer(conf, this.localizer);
this.asyncLocalizer = new AsyncLocalizer(conf, currAssignment, localState.getLocalAssignmentsMap());
} catch (IOException e) {
throw Utils.wrapInRuntime(e);
}
@@ -126,13 +121,9 @@ public Supervisor(Map<String, Object> conf, IContext sharedContext, ISupervisor
throw Utils.wrapInRuntime(e);
}

this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>());

this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());

this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());

this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
}

public String getId() {
@@ -178,12 +169,8 @@ public String getHostName() {
/**
 * Returns the assignment reference held by this object.
 *
 * @return the same {@code AtomicReference} instance stored in the
 *     {@code currAssignment} field (not a copy)
 */
public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
    return this.currAssignment;
}

/**
 * Returns the localizer held by this object.
 *
 * @return the instance stored in the {@code localizer} field
 */
public Localizer getLocalizer() {
    return this.localizer;
}

ILocalizer getAsyncLocalizer() {
AsyncLocalizer getAsyncLocalizer() {
return asyncLocalizer;
}

@@ -199,8 +186,6 @@ public void launch() throws Exception {
String path = ServerConfigUtils.supervisorTmpDir(conf);
FileUtils.cleanDirectory(new File(path));

Localizer localizer = getLocalizer();

SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
hb.run();
// should synchronize supervisor so it doesn't launch anything after being down (optimization)
@@ -209,36 +194,14 @@ public void launch() throws Exception {

this.eventManager = new EventManagerImp(false);
this.readState = new ReadClusterState(this);

Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf);
Map<Integer, LocalAssignment> portToAssignments = localState.getLocalAssignmentsMap();
if (portToAssignments != null) {
Map<String, LocalAssignment> assignments = new HashMap<>();
for (LocalAssignment la : localState.getLocalAssignmentsMap().values()) {
assignments.put(la.get_topology_id(), la);
}
for (String topoId : downloadedTopoIds) {
LocalAssignment la = assignments.get(topoId);
if (la != null) {
SupervisorUtils.addBlobReferences(localizer, topoId, conf, la.get_owner());
} else {
LOG.warn("Could not find an owner for topo {}", topoId);
}
}
}
// do this after adding the references so we don't try to clean things being used
localizer.startCleaner();

UpdateBlobs updateBlobsThread = new UpdateBlobs(this);
asyncLocalizer.start();

if ((Boolean) conf.get(DaemonConfig.SUPERVISOR_ENABLE)) {
// This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
// to date even if callbacks don't all work exactly right
eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager));

// Blob update thread. Starts with 30 seconds delay, every 30 seconds
blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, eventManager));

// supervisor health check
eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this));
}
@@ -282,15 +245,13 @@ public void close() {
this.active = false;
heartbeatTimer.close();
eventTimer.close();
blobUpdateTimer.close();
if (eventManager != null) {
eventManager.close();
}
if (readState != null) {
readState.close();
}
asyncLocalizer.shutdown();
localizer.shutdown();
asyncLocalizer.close();
getStormClusterState().disconnect();
} catch (Exception e) {
LOG.error("Error Shutting down", e);
Original file line number Diff line number Diff line change
@@ -19,9 +19,7 @@

import org.apache.storm.Config;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.localizer.LocalResource;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
@@ -33,14 +31,11 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SupervisorUtils {

@@ -95,34 +90,6 @@ public static List<LocalResource> blobstoreMapToLocalresources(Map<String, Map<S
return localResourceList;
}

/**
 * Adds references to the blobs that a downloaded topology is using. This is used to
 * reconstruct the cache on restart.
 *
 * <p>The topology conf is read through {@code ConfigUtils.readSupervisorStormConf}; its
 * {@code Config.TOPOLOGY_BLOBSTORE_MAP} entry is converted to local resources and, when
 * that entry is present, registered on the localizer under the given user and the
 * topology's {@code Config.TOPOLOGY_NAME}.
 *
 * @param localizer the localizer that receives the references
 * @param stormId the id of the topology whose conf is read
 * @param conf the supervisor configuration used to locate the topology conf
 * @param user the user passed to {@code localizer.addReferences}
 * @throws IOException declared by this method; presumably raised when the topology conf
 *     cannot be read -- confirm against {@code ConfigUtils.readSupervisorStormConf}
 */
static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf, String user) throws IOException {
    final Map<String, Object> topologyConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
    final Map<String, Map<String, Object>> blobs =
        (Map<String, Map<String, Object>>) topologyConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
    final String topologyName = (String) topologyConf.get(Config.TOPOLOGY_NAME);
    // The conversion runs before the null check, so it is invoked even when no
    // blobstore map is configured.
    final List<LocalResource> resources = SupervisorUtils.blobstoreMapToLocalresources(blobs);
    if (blobs == null) {
        return;
    }
    localizer.addReferences(resources, user, topologyName);
}

/**
 * Collects the URL-decoded names of the entries found under the supervisor's storm dist root.
 *
 * @param conf the configuration passed to {@code ConfigUtils.supervisorStormDistRoot}
 * @return a new mutable set holding one decoded name per directory entry
 * @throws IOException declared by this method's signature
 */
public static Set<String> readDownloadedTopologyIds(Map<String, Object> conf) throws IOException {
    final String distRoot = ConfigUtils.supervisorStormDistRoot(conf);
    final Set<String> topologyIds = new HashSet<>();
    for (String encodedName : ConfigUtils.readDirContents(distRoot)) {
        // NOTE(review): the single-argument decode is deprecated and uses the platform's
        // default encoding; it is kept as-is so decoding stays symmetric with however the
        // directory names were encoded -- confirm before switching to an explicit charset.
        topologyIds.add(URLDecoder.decode(encodedName));
    }
    return topologyIds;
}

public static Collection<String> supervisorWorkerIds(Map<String, Object> conf) {
String workerRoot = ConfigUtils.workerRoot(conf);
return ConfigUtils.readDirContents(workerRoot);
Loading
Oops, something went wrong.

0 comments on commit 5c05bb7

Please sign in to comment.