Skip to content

Commit

Permalink
Merge branch 'STORM-2792' of https://github.com/revans2/incubator-storm
Browse files Browse the repository at this point in the history
… into STORM-2792

STORM-2792: Remove RAS EvictionPolicy and cleanup

This closes apache#2400
Robert Evans committed Nov 7, 2017
2 parents b5eb86b + efd1f05 commit 06341d8
Showing 25 changed files with 911 additions and 705 deletions.
1 change: 0 additions & 1 deletion conf/defaults.yaml
Original file line number Diff line number Diff line change
@@ -274,7 +274,6 @@ topology.component.resources.offheap.memory.mb: 0.0
topology.component.cpu.pcore.percent: 10.0
topology.worker.max.heap.size.mb: 768.0
topology.scheduler.strategy: "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy"
resource.aware.scheduler.eviction.strategy: "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy"
resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"

blacklist.scheduler.tolerance.time.secs: 300
119 changes: 84 additions & 35 deletions docs/Resource_Aware_Scheduler_overview.md

Large diffs are not rendered by default.

17 changes: 8 additions & 9 deletions storm-server/src/main/java/org/apache/storm/DaemonConfig.java
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@
import static org.apache.storm.validation.ConfigValidationAnnotations.isStringList;
import static org.apache.storm.validation.ConfigValidationAnnotations.isStringOrStringList;
import static org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber;
import static org.apache.storm.validation.ConfigValidationAnnotations.isType;
import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
import static org.apache.storm.validation.ConfigValidationAnnotations.isListEntryCustom;
import static org.apache.storm.validation.ConfigValidationAnnotations.isBoolean;
@@ -37,7 +36,6 @@
import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
import org.apache.storm.scheduler.blacklist.reporters.IReporter;
import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
import org.apache.storm.validation.ConfigValidation;
@@ -878,20 +876,21 @@ public class DaemonConfig implements Validated {
valueValidatorClasses = {ConfigValidation.UserResourcePoolEntryValidator.class})
public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools";

/**
* The class that specifies the eviction strategy to use in ResourceAwareScheduler.
*/
@NotNull
@isImplementationOfClass(implementsClass = IEvictionStrategy.class)
public static final String RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY = "resource.aware.scheduler.eviction.strategy";

/**
* the class that specifies the scheduling priority strategy to use in ResourceAwareScheduler.
*/
@NotNull
@isImplementationOfClass(implementsClass = ISchedulingPriorityStrategy.class)
public static final String RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY = "resource.aware.scheduler.priority.strategy";

/**
* The maximum number of times that the RAS will attempt to schedule a topology. The default is 5.
*/
@isInteger
@isPositiveNumber
public static final String RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS =
"resource.aware.scheduler.max.topology.scheduling.attempts";

/**
* How often nimbus's background thread to sync code for missing topologies should run.
*/
Original file line number Diff line number Diff line change
@@ -134,7 +134,7 @@
import org.apache.storm.nimbus.ITopologyValidator;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.Cluster.SupervisorResources;
import org.apache.storm.scheduler.SupervisorResources;
import org.apache.storm.scheduler.DefaultScheduler;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.INimbus;
@@ -971,7 +971,7 @@ private static Map<String, Double> setResourcesDefaultIfNotSet(Map<String, Map<S
if (resourcesMap == null) {
resourcesMap = new HashMap<>();
}
ResourceUtils.checkIntialization(resourcesMap, compId, topoConf);
ResourceUtils.checkInitialization(resourcesMap, compId, topoConf);
return resourcesMap;
}

70 changes: 10 additions & 60 deletions storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.scheduler;

import com.google.common.annotations.VisibleForTesting;
@@ -44,64 +45,6 @@
public class Cluster implements ISchedulingState {
private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);

public static class SupervisorResources {
private final double totalMem;
private final double totalCpu;
private final double usedMem;
private final double usedCpu;

/**
* Constructor for a Supervisor's resources.
*
* @param totalMem the total mem on the supervisor
* @param totalCpu the total CPU on the supervisor
* @param usedMem the used mem on the supervisor
* @param usedCpu the used CPU on the supervisor
*/
public SupervisorResources(double totalMem, double totalCpu, double usedMem, double usedCpu) {
this.totalMem = totalMem;
this.totalCpu = totalCpu;
this.usedMem = usedMem;
this.usedCpu = usedCpu;
}

public double getUsedMem() {
return usedMem;
}

public double getUsedCpu() {
return usedCpu;
}

public double getTotalMem() {
return totalMem;
}

public double getTotalCpu() {
return totalCpu;
}

public double getAvailableCpu() {
return totalCpu - usedCpu;
}

public double getAvailableMem() {
return totalMem - usedMem;
}

private SupervisorResources add(WorkerResources wr) {
return new SupervisorResources(
totalMem,
totalCpu,
usedMem + wr.get_mem_off_heap() + wr.get_mem_on_heap(),
usedCpu + wr.get_cpu());
}

public SupervisorResources addMem(Double value) {
return new SupervisorResources(totalMem, totalCpu, usedMem + value, usedCpu);
}
}

/**
* key: supervisor id, value: supervisor details.
*/
@@ -839,8 +782,8 @@ public void setNetworkTopography(Map<String, List<String>> networkTopography) {
* @param topConf - the topology config
* @return the assigned memory (in MB)
*/
public static Double getAssignedMemoryForSlot(final Map<String, Object> topConf) {
Double totalWorkerMemory = 0.0;
public static double getAssignedMemoryForSlot(final Map<String, Object> topConf) {
double totalWorkerMemory = 0.0;
final Integer topologyWorkerDefaultMemoryAllocation = 768;

List<String> topologyWorkerGcChildopts = ConfigUtils.getValueAsList(
@@ -887,6 +830,13 @@ public static Double getAssignedMemoryForSlot(final Map<String, Object> topConf)
return totalWorkerMemory;
}

/**
* set scheduler status for a topology.
*/
public void setStatus(TopologyDetails td, String statusMessage) {
setStatus(td.getId(), statusMessage);
}

/**
* set scheduler status for a topology.
*/
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@

import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.scheduler.Cluster.SupervisorResources;

/** An interface that provides access to the current scheduling state. */
public interface ISchedulingState {
@@ -43,7 +42,7 @@ public interface ISchedulingState {
List<TopologyDetails> needsSchedulingTopologies();

/**
* Does the topology need scheduling?
* Does the topology need scheduling.
*
* <p>A topology needs scheduling if one of the following conditions holds:
*
@@ -103,12 +102,14 @@ public interface ISchedulingState {
Collection<ExecutorDetails> getUnassignedExecutors(TopologyDetails topology);

/**
* Get the executor to component name map for executors that need to be scheduled.
* @param topology the topology this is for
* @return a executor -> component-id map which needs scheduling in this topology.
*/
Map<ExecutorDetails, String> getNeedsSchedulingExecutorToComponents(TopologyDetails topology);

/**
* Get the component name to executor list for executors that need to be scheduled.
* @param topology the topology this is for
* @return a component-id -> executors map which needs scheduling in this topology.
*/
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.scheduler;

import org.apache.storm.generated.WorkerResources;

public class SupervisorResources {
private final double totalMem;
private final double totalCpu;
private final double usedMem;
private final double usedCpu;

/**
* Constructor for a Supervisor's resources.
*
* @param totalMem the total mem on the supervisor
* @param totalCpu the total CPU on the supervisor
* @param usedMem the used mem on the supervisor
* @param usedCpu the used CPU on the supervisor
*/
public SupervisorResources(double totalMem, double totalCpu, double usedMem, double usedCpu) {
this.totalMem = totalMem;
this.totalCpu = totalCpu;
this.usedMem = usedMem;
this.usedCpu = usedCpu;
}

public double getUsedMem() {
return usedMem;
}

public double getUsedCpu() {
return usedCpu;
}

public double getTotalMem() {
return totalMem;
}

public double getTotalCpu() {
return totalCpu;
}

public double getAvailableCpu() {
return totalCpu - usedCpu;
}

public double getAvailableMem() {
return totalMem - usedMem;
}

SupervisorResources add(WorkerResources wr) {
return new SupervisorResources(
totalMem,
totalCpu,
usedMem + wr.get_mem_off_heap() + wr.get_mem_on_heap(),
usedCpu + wr.get_cpu());
}

public SupervisorResources addMem(Double value) {
return new SupervisorResources(totalMem, totalCpu, usedMem + value, usedCpu);
}
}
Original file line number Diff line number Diff line change
@@ -139,7 +139,7 @@ private void initResourceList() {
//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
Map<String, Double> topologyResources =
ResourceUtils.parseResources(bolt.getValue().get_common().get_json_conf());
ResourceUtils.checkIntialization(topologyResources, bolt.getKey(), topologyConf);
ResourceUtils.checkInitialization(topologyResources, bolt.getKey(), topologyConf);
for (Map.Entry<ExecutorDetails, String> anExecutorToComponent :
executorToComponent.entrySet()) {
if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
@@ -153,7 +153,7 @@ private void initResourceList() {
for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
Map<String, Double> topologyResources =
ResourceUtils.parseResources(spout.getValue().get_common().get_json_conf());
ResourceUtils.checkIntialization(topologyResources, spout.getKey(), this.topologyConf);
ResourceUtils.checkInitialization(topologyResources, spout.getKey(), this.topologyConf);
for (Map.Entry<ExecutorDetails, String> anExecutorToComponent :
executorToComponent.entrySet()) {
if (spout.getKey().equals(anExecutorToComponent.getValue())) {
Loading
Oops, something went wrong.

0 comments on commit 06341d8

Please sign in to comment.