
Added an automated spark config
rwalk committed Oct 15, 2015
1 parent 10d6e23 commit c07412b
Showing 6 changed files with 235 additions and 75 deletions.
102 changes: 102 additions & 0 deletions aws_config/configure/configure_spark.py
@@ -0,0 +1,102 @@
#!/usr/bin/python3
#
# Configure Spark on ec2 instances
#

import boto3, os, sys
from botocore.exceptions import ClientError as BotoClientError
from time import sleep
sys.path.append("..")
from create_clusters import get_tag, keyfile

# configuration
my_instances_filters = [{ 'Name': 'instance-state-name', 'Values': ['running']}, {'Name':'tag-value', 'Values':[get_tag('spark-node')]}]

if __name__ == "__main__":

    # find all the host nodes
    ec2 = boto3.resource('ec2')
    hosts = []
    private_ips = []
    reservations = ec2.instances.filter(Filters=my_instances_filters)
    for instance in reservations:
        print("ID: {0:<15}\tIP: {1:<15}".format(instance.instance_id, instance.public_ip_address))
        hosts.append(instance.public_ip_address)
        private_ips.append(instance.private_ip_address)

    if len(hosts) != len(private_ips):
        raise(RuntimeError("Host and private IP lists are not consistent!"))

    if len(hosts) == 0:
        raise(RuntimeError("No hosts found."))

    # identify the master node
    master = hosts[0]

    #######################################################################
    # Spark's start scripts require passwordless SSH from the master
    #######################################################################
    cmd_str = []

    # generate a key on the master
    cmd_str.append("ssh -i {0} ubuntu@{1} \"sudo apt-get -y install ssh rsync && ssh-keygen -f ~/.ssh/id_rsa -t rsa -P \'\' \"".format(keyfile, master))

    # download the master's public key temporarily
    cmd_str.append("scp -i {0} ubuntu@{1}:.ssh/id_rsa.pub {2}".format(keyfile, master, "templates/key.tmp"))

    # authorize the public key on all hosts
    for h in hosts:
        cmd_str.append("scp -i {0} {1} ubuntu@{2}:".format(keyfile, "templates/key.tmp", h))
        cmd_str.append("ssh -i {0} ubuntu@{1} \"cat key.tmp >> ~/.ssh/authorized_keys\"".format(keyfile, h))

    for cmd in cmd_str:
        print(cmd)
        res = os.system(cmd)
        if res != 0:
            raise(RuntimeError("Something went wrong executing {0}. Got exit code {1}".format(cmd, res)))

    #######################################################################
    # Spark configuration
    #######################################################################
    print("Starting Spark configuration...")
    for i, h in enumerate(hosts):
        cmd_str = []
        with open("templates/spark-env.sh.tmp", "w") as tmpfile:
            with open("templates/spark-env.sh", "r") as f:
                # copy over the template
                for l in f:
                    tmpfile.write(l)

            # advertise this host's private IP
            tmpfile.write("export SPARK_PUBLIC_DNS={0}\n".format(private_ips[i]))

        # add commands to the queue
        cmd_str.append("scp -i {0} {1} ubuntu@{2}:".format(keyfile, tmpfile.name, h))
        cmd_str.append("ssh -i {0} ubuntu@{1} sudo mv spark-env.sh.tmp /usr/local/spark/conf/spark-env.sh".format(keyfile, h))

        # execute the remote commands
        for cmd in cmd_str:
            print(cmd)
            res = os.system(cmd)
            if res != 0:
                raise(RuntimeError("Something went wrong executing {0}. Got exit code {1}".format(cmd, res)))

    # send the slaves file to the master; every host except the master is a worker
    cmd_str = []
    with open("templates/slaves.tmp", "w") as tmpfile:
        for ip in private_ips[1:]:
            tmpfile.write("{0}\n".format(ip))

    # add commands to the queue
    cmd_str.append("scp -i {0} {1} ubuntu@{2}:".format(keyfile, tmpfile.name, master))
    cmd_str.append("ssh -i {0} ubuntu@{1} sudo mv slaves.tmp /usr/local/spark/conf/slaves".format(keyfile, master))

    # start Spark on the master
    cmd_str.append("ssh -i {0} ubuntu@{1} /usr/local/spark/sbin/start-all.sh".format(keyfile, master))

    for cmd in cmd_str:
        print(cmd)
        res = os.system(cmd)
        if res != 0:
            raise(RuntimeError("Something went wrong executing {0}. Got exit code {1}".format(cmd, res)))
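
Note: the script drives ssh/scp through os.system and checks exit codes by hand. A minimal sketch of the same remote-command pattern using subprocess.run is below (run_remote is a hypothetical helper, not part of this commit; it assumes the same keyfile and ubuntu user as above):

import subprocess

def run_remote(cmd):
    """Print and run a shell command, raising if it exits non-zero."""
    print(cmd)
    # check=True raises CalledProcessError on a non-zero exit status,
    # mirroring the RuntimeError raised around os.system above.
    subprocess.run(cmd, shell=True, check=True)

# Example usage with the key-generation command from the script:
# run_remote("ssh -i {0} ubuntu@{1} 'ssh-keygen -f ~/.ssh/id_rsa -t rsa -P \"\"'".format(keyfile, master))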



54 changes: 54 additions & 0 deletions aws_config/configure/templates/spark-env.sh
@@ -0,0 +1,54 @@
#!/usr/bin/env bash

# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.

# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
# - SPARK_CLASSPATH, default classpath entries to append

# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_CLASSPATH, default classpath entries to append
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos

# Options read in YARN client mode
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)
# - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
# - SPARK_DRIVER_MEMORY, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)
# - SPARK_YARN_APP_NAME, The name of your application (Default: Spark)
# - SPARK_YARN_QUEUE, The hadoop queue to use for allocation requests (Default: ‘default’)
# - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job.
# - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be distributed with the job.

# Options for the daemons used in the standalone deploy mode
# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

# Generic options for the daemons used in the standalone deploy mode
# - SPARK_CONF_DIR Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - SPARK_LOG_DIR Where log files are stored. (Default: ${SPARK_HOME}/logs)
# - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp)
# - SPARK_IDENT_STRING A string representing this instance of spark. (Default: $USER)
# - SPARK_NICENESS The scheduling priority for daemons. (Default: 0)

export JAVA_HOME=/usr
export SPARK_WORKER_CORES=$(echo $(nproc)*3 | bc)
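
Note: the last line oversubscribes the CPU, setting SPARK_WORKER_CORES to three times the core count nproc reports. As a rough worked example in Python (assuming a 4-vCPU node such as the m4.xlarge used for the Storm nodes in create_clusters.py; the actual value depends on the instance type):

import os

vcpus = os.cpu_count() or 1     # analogous to $(nproc)
print(vcpus * 3)                # 12 worker cores on a 4-vCPU node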
57 changes: 49 additions & 8 deletions aws_config/create_clusters.py
@@ -34,12 +34,13 @@
 elasticsearch_initfile = os.path.join(path, "elasticsearch_install.sh")
 storm_initfile = os.path.join(path, "storm_install.sh")
 flask_initfile = os.path.join(path, "flask_install.sh")
+spark_initfile = os.path.join(path, "spark_install.sh")
 
 # base AWS settings
 base_aws_image = 'ami-5189a661'
 
 # services
-services = ['kafka', 'elasticsearch', 'storm', 'flask']
+services = ['kafka', 'elasticsearch', 'storm', 'flask', 'spark']
 
 ###############################
 # helper methods
@@ -329,20 +330,20 @@ def get_tag(name):
             v.create_tags(Tags=[{'Key':'Name', 'Value':get_tag(tag)}])
             print("SERVICE: {0:<15}\tID: {1:<15}\tIP: {2:<15}\tDNS: {3:<15}".format(tag, v.instance_id, v.public_ip_address, v.public_dns_name))
 
-    if args.service.lower() in ['all', 'flask']:
+    if args.service.lower() in ['all', 'spark']:
         #########################################
-        # flask webserver
+        # Spark
         #########################################
-        print("Creating Flask webserver...")
+        print("Creating Spark Cluster...")
         #
         # EC2 Instances
         #
-        shellcodefile=os.path.abspath(flask_initfile)
+        shellcodefile=os.path.abspath(spark_initfile)
         shellfile = open(shellcodefile,'r').read()
         pemfile =os.path.abspath(keyfile)
         instances = ec2.create_instances(
-            MinCount=1,
-            MaxCount=1,
+            MinCount=4,
+            MaxCount=4,
             UserData=shellfile,
             KeyName=pemkey,
             ImageId=base_aws_image,
@@ -361,10 +362,50 @@ def get_tag(name):
         )
 
         # tag instances and assign a public ip
-        tag='flask-node'
+        tag='spark-node'
        print("Sleep 60 seconds to give instances time to configure...")
         sleep(60)
         for v in instances:
             v.create_tags(Tags=[{'Key':'Name', 'Value':get_tag(tag)}])
             print("SERVICE: {0:<15}\tID: {1:<15}\tIP: {2:<15}\tDNS: {3:<15}".format(tag, v.instance_id, v.public_ip_address, v.public_dns_name))
 
+    if args.service.lower() in ['all', 'storm']:
+        #########################################
+        # STORM CLUSTER
+        #########################################
+        print("Creating a Storm cluster...")
+        #
+        # EC2 Instances
+        #
+        shellcodefile=os.path.abspath(storm_initfile)
+        shellfile = open(shellcodefile,'r').read()
+        pemfile =os.path.abspath(keyfile)
+        instances = ec2.create_instances(
+            MinCount=storm_instances,
+            MaxCount=storm_instances,
+            UserData=shellfile,
+            KeyName=pemkey,
+            ImageId=base_aws_image,
+            InstanceType='m4.xlarge',
+            NetworkInterfaces=[{'SubnetId': subnet.id, 'DeviceIndex':0, 'Groups':[security_group.id], 'AssociatePublicIpAddress':True}],
+            BlockDeviceMappings=[
+                {
+                    'VirtualName': 'ephemeral0',
+                    'DeviceName': '/dev/sda1',
+                    'Ebs': {
+                        'VolumeSize': 64,
+                        'VolumeType': 'gp2' # standard for magnetic, gp2 for SSD
+                    }
+                }
+            ]
+        )
+
+        # tag instances and assign a public ip
+        tag='storm-node'
+        print("Sleep 60 seconds to give instances time to configure...")
+        sleep(60)
+        for v in instances:
+            v.create_tags(Tags=[{'Key':'Name', 'Value':get_tag(tag)}])
+            print("SERVICE: {0:<15}\tID: {1:<15}\tIP: {2:<15}\tDNS: {3:<15}".format(tag, v.instance_id, v.public_ip_address, v.public_dns_name))
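
Note: both cluster blocks above sleep a fixed 60 seconds before tagging and printing addresses. A rough alternative sketch using boto3 waiters (not part of this commit; instances and get_tag are the names used in the script):

# Block until each new instance reports "running", then reload it so
# public_ip_address and public_dns_name are populated before tagging.
for v in instances:
    v.wait_until_running()
    v.reload()
    v.create_tags(Tags=[{'Key': 'Name', 'Value': get_tag(tag)}])

This only covers instance state, not the user-data install scripts, so a further wait for the software itself may still be needed.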


29 changes: 29 additions & 0 deletions aws_config/host_install_scripts/spark_install.sh
@@ -0,0 +1,29 @@
#!/bin/bash

### Agree to stupid oracle license nonsense
### See http://stackoverflow.com/questions/19275856/auto-yes-to-the-license-agreement-on-sudo-apt-get-y-install-oracle-java7-instal
echo debconf shared/accepted-oracle-license-v1-1 select true | sudo debconf-set-selections
echo debconf shared/accepted-oracle-license-v1-1 seen true | sudo debconf-set-selections

### Install Java 8
apt-get update
apt-get install -y python-software-properties
add-apt-repository -y ppa:webupd8team/java
apt-get update
apt-get install -y oracle-java8-installer

### Install Scala
apt-get install -y scala

### Install sbt
wget https://dl.bintray.com/sbt/debian/sbt-0.13.7.deb -P ~/Downloads
dpkg -i ~/Downloads/sbt-0.13.7.deb
apt-get install -y sbt

# Install Spark
wget http://apache.mirrors.tds.net/spark/spark-1.4.1/spark-1.4.1-bin-hadoop2.4.tgz -P ~/Downloads
tar zxvf ~/Downloads/spark-1.4.1-bin-hadoop2.4.tgz -C /usr/local
sudo mv /usr/local/spark-1.4.1-bin-hadoop2.4 /usr/local/spark
sudo chown -R ubuntu /usr/local/spark
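
Note: this script runs as EC2 user data, so instances can be reachable before Spark is fully unpacked. A rough sketch of how configure_spark.py could wait for the install to finish before pushing configuration (wait_for_spark is a hypothetical helper; it assumes the same keyfile and ubuntu user):

import subprocess, time

def wait_for_spark(host, keyfile, timeout=600):
    """Poll a node over SSH until /usr/local/spark exists or the timeout passes."""
    cmd = ["ssh", "-i", keyfile, "-o", "StrictHostKeyChecking=no",
           "ubuntu@" + host, "test", "-d", "/usr/local/spark"]
    deadline = time.time() + timeout
    while time.time() < deadline:
        if subprocess.run(cmd).returncode == 0:
            return True
        time.sleep(15)
    return False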


2 changes: 1 addition & 1 deletion aws_config/straw_service_config.sh
@@ -2,5 +2,5 @@
 #Source this file to set environment vars for config
 export AWS_PEM_FILE=/home/ryan/projects/insight/accounts/rwalker.pem
 export PEM_KEY=rwalker
-export TAG_PREFIX=straw
+export TAG_PREFIX=rwalker

66 changes: 0 additions & 66 deletions util/tweet_collect.py

This file was deleted.
