Spark on Cloud Foundry

@Amit-PivotalLabs · Last active December 19, 2020
This document describes one means of running a simple Apache Spark cluster on Cloud Foundry. It makes heavy use of Cloud Foundry's container networking features.

You can see an example running at http://spark-ui-proxy.184.73.108.92.xip.io.

Deploy BOSH-Lite on AWS

This cluster was deployed using BOSH-Lite on AWS. Note that this Director cannot be targeted with the new BOSH CLI (see cloudfoundry-attic/bosh-lite#424), but you can use the "old" Ruby CLI just fine. You can use the new CLI for local workflows like manifest interpolation, and the "old" CLI for remote workflows like deploying and SSH'ing.

My BOSH CLIs are as follows:

$ which bosh
/Users/amitgupta/.rubies/ruby-2.4.0/bin/bosh

$ bosh --version
BOSH 1.3262.26.0

$ /usr/local/bin/bosh --version
version 2.0.1-74fad57-2017-02-15T20:17:00Z

Succeeded

So, throughout this document, bosh commands refer to the Ruby CLI, and /usr/local/bin/bosh to the Golang CLI.
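
The Ruby CLI also needs to be pointed at the BOSH-Lite Director before the upload and deploy steps below will work. A minimal sketch, assuming the stock BOSH-Lite credentials (admin/admin) and the Director's default port:

$ bosh target <BOSH_LITE_ELASTIC_IP>:25555
$ bosh login
$ bosh status --uuid    # this UUID feeds the director_uuid interpolation later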

Set up to deploy Cloud Foundry with Container Networking

Adapting from the cf-networking-release documentation on BOSH-Lite deploys, you must do the following:

$ ssh -i <BOSH_LITE_SSH_KEY_PATH> ubuntu@<BOSH_LITE_ELASTIC_IP>
> sudo modprobe br_netfilter

$ bosh upload stemcell "https://bosh.io/d/stemcells/bosh-warden-boshlite-ubuntu-trusty-go_agent?v=3363.12"

$ bosh upload release "https://bosh.io/d/github.com/cloudfoundry-incubator/cf-networking-release?v=0.18.0"
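
To confirm both uploads landed on the Director, you can list its inventory with the Ruby CLI (output omitted here):

$ bosh stemcells
$ bosh releases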

Generate the Cloud Foundry manifest

We use cf-deployment with a few tweaks so that:

  • it's tailored to BOSH-Lite, but without switching the control plane's database from MySQL to Postgres (see cloudfoundry/cf-deployment#96);
  • it adds a director_uuid to the manifest so that it works with the old Ruby BOSH CLI; and
  • it supports Container Networking.
$ cd ~/workspace/cf-deployment
$ git checkout bfa7cc261ac492924c4c6773b3dcfaf8bf2ab74c
$ /usr/local/bin/bosh interpolate \
  -v system_domain=<BOSH_LITE_ELASTIC_IP>.xip.io \
  -v director_uuid=$(bosh status --uuid) \
  -o operations/bosh-lite.yml \
  -o operations/director-uuid.yml \
  -o operations/c2c-networking.yml \
  --vars-store cf-vars-store.yml \
  cf-deployment.yml > cf.yml

The ops files used here are included in this Gist.

  • bosh-lite.yml is based on this, but with the parts that remove "unnecessary" NATS IPs and switch all the MySQL stuff to Postgres taken out.
  • c2c-networking.yml is based on this but removes the hardcoded NATS properties since these aren't correct in the BOSH-Lite context, and are handled implicitly by BOSH Links. That improvement is already covered by an existing PR.
  • director-uuid.yml is new, and exists just to make things work with the old Ruby BOSH CLI.
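
To sanity-check that the ops files actually landed in the rendered manifest, the Go CLI can extract individual paths; for example (paths taken from the ops files included below):

$ /usr/local/bin/bosh int cf.yml --path /director_uuid
$ /usr/local/bin/bosh int cf.yml --path /releases/name=cf-networking/version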

Note the use of xip.io for the system domain. This service is known to be very flaky, so expect a few bumps, especially when doing anything like tailing cf logs.
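
When things fail mysteriously mid-deploy, check xip.io first; any resolver should echo back the IP embedded in the hostname, along these lines:

$ host api.<BOSH_LITE_ELASTIC_IP>.xip.io
api.<BOSH_LITE_ELASTIC_IP>.xip.io has address <BOSH_LITE_ELASTIC_IP>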

Deploy and Configure Cloud Foundry

$ bosh -d cf.yml deploy
$ cf api api.<BOSH_LITE_ELASTIC_IP>.xip.io --skip-ssl-validation
$ cf auth admin $(/usr/local/bin/bosh int -l cf-vars-store.yml <(echo '((uaa_scim_users_admin_password))'))
$ cf enable-feature-flag diego_docker
$ cf create-space and-time
$ cf target -s and-time
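
Before pushing anything, it's worth a quick check that Docker support is on and the space is targeted (both are standard cf CLI commands; output abridged):

$ cf feature-flags | grep diego_docker
diego_docker   enabled
$ cf target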

Deploy the Spark cluster

This configuration is based on deploying Spark on Kubernetes.

Start the Spark Master

$ cf push spark-master \
  --docker-image gcr.io/google_containers/spark:1.5.2_v1 \
  -i 1 \
  -m 4G \
  --no-manifest \
  --health-check-type none \
  -c 'sleep 20 && /bin/bash -c ". /start-common.sh; /opt/spark/bin/spark-class org.apache.spark.deploy.master.Master --ip ${CF_INSTANCE_INTERNAL_IP} --port 7077 --webui-port 8080"'
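
The sleep 20 presumably gives the container a moment to settle before the Master binds to its address. CF_INSTANCE_INTERNAL_IP is the container-networking IP Diego assigns to each instance, and you can read it back over SSH; the Worker and proxy push commands below embed exactly this trick:

$ cf ssh spark-master -c 'echo ${CF_INSTANCE_INTERNAL_IP}'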

Push some Spark Workers

$ cf push spark-worker \
  --docker-image gcr.io/google_containers/spark:1.5.2_v1 \
  -i 3 \
  -m 1G \
  --no-manifest \
  --health-check-type none \
  -c "sleep 20 && /bin/bash -c \". /start-common.sh; /opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://$(cf ssh spark-master -c 'echo ${CF_INSTANCE_INTERNAL_IP}'):7077 --port 7077 --webui-port 8080\"" \
  --no-start

Allow access between Master and Workers

I don't understand the details of how Spark communication works, but it appears the Master and/or the Workers spin up many concurrent TCP listeners on random ports, and need to be able to connect to each other on those ports. While the current Container Networking features are great for connecting applications via a small handful of ports, they're not great when you want 2000 of something (unlike rice).
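
For reference, this is what a single policy looks like on the wire; the provided script (allow-access.rb, included below) just batches one of these per port, in both directions, into a single request against the policy server's external API. A hand-rolled sketch of one Master-to-Worker policy on port 7077 with curl, using the same endpoint and auth the script uses:

$ curl -X POST "http://api.<BOSH_LITE_ELASTIC_IP>.xip.io/networking/v0/external/policies" \
  -H "Authorization: $(cf oauth-token)" \
  -H "Content-Type: application/json" \
  -d "{\"policies\":[{\"source\":{\"id\":\"$(cf app spark-master --guid)\"},\"destination\":{\"id\":\"$(cf app spark-worker --guid)\",\"port\":7077,\"protocol\":\"tcp\"}}]}"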

Run the provided Ruby script (allow-access.rb, included at the bottom of this Gist) to allow access. This may take 3-4 minutes:

$ ruby allow-access.rb <BOSH_LITE_ELASTIC_IP>

Start the Workers

$ cf start spark-worker

Start the Spark UI Proxy

This part isn't strictly necessary, but it allows you to see Worker logs. That's how I figured out there were a lot more ports I needed to allow access to between the Master and Workers, so it's helpful to have.

$ cd ~/workspace/spark-ui-proxy
$ ls
spark-ui-proxy.py
$ touch requirements.txt
$ cf push spark-ui-proxy \
  -b python_buildpack \
  -i 1 \
  -m 1G \
  --no-manifest \
  --health-check-type none \
  -c "python spark-ui-proxy.py $(cf ssh spark-master -c 'echo ${CF_INSTANCE_INTERNAL_IP}'):8080" \
  --no-start
$ cf allow-access spark-ui-proxy spark-master --port 7077 --protocol tcp
$ cf allow-access spark-ui-proxy spark-master --port 8080 --protocol tcp
$ cf allow-access spark-ui-proxy spark-worker --port 7077 --protocol tcp
$ cf allow-access spark-ui-proxy spark-worker --port 8080 --protocol tcp
$ cf start spark-ui-proxy
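
Once the proxy is running, the Master's web UI should be reachable through the app's route; this is what backs the example link at the top of this document. A quick check:

$ curl -sL http://spark-ui-proxy.<BOSH_LITE_ELASTIC_IP>.xip.io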

Use Spark

$ cf ssh spark-master

~# /opt/spark-1.5.2-bin-hadoop2.6/bin/spark-shell \
   --master spark://${CF_INSTANCE_INTERNAL_IP}:7077 \
   --name calculate_pi

> val count = sc.parallelize(1 to 500000000).filter { _ =>
  val x = math.random
  val y = math.random
  x*x + y*y < 1
}.count()
...
17/03/19 09:34:43 INFO DAGScheduler: Job 0 finished: count at <console>:25, took 85.931528 s
count: Long = 392707353

> println(s"Pi is roughly ${4.0 * count / 500000000}")
Pi is roughly 3.141658824

NOTE: If you ever restart the Master...

... make sure to re-push the Workers and the UI Proxy, since their start commands are computed at cf push time with some embedded bash that fetches the Master's internal IP, and that IP changes every time the Master restarts.

Fun Fact

  • An AWS paravirtualized hypervisor is running...
  • a BOSH-Lite VM, which is running...
  • Garden, which is running...
  • a Linux container representing the Diego Cell, which is running...
  • Garden (again), which is running...
  • Linux containers representing the Spark Master and Workers, which are running...
  • Spark task executors.

Fun Fact #2

A single-threaded Golang calculation was much faster than this. Note that the 3 "Workers" are all really just running on the same VM, as are the Master and the UI Proxy.

$ time go run pi.go
Pi is roughly 3.1415864480
real	0m42.111s
user	0m41.764s
sys	0m0.270s
allow-access.rb

require 'json'
require 'uri'
require 'net/http'

spark_master_guid = `cf app spark-master --guid`.chomp
spark_worker_guid = `cf app spark-worker --guid`.chomp
authorization = `cf oauth-token`.chomp

# Build bidirectional TCP policies between the Master and Workers for the
# well-known Spark ports (7077, 8080) plus the high range of random ports.
policies = ([7077, 8080] + (30000..65535).to_a).flat_map do |port|
  [
    {
      "source" => { "id" => spark_master_guid },
      "destination" => { "id" => spark_worker_guid, "port" => port, "protocol" => "tcp" },
    },
    {
      "source" => { "id" => spark_worker_guid },
      "destination" => { "id" => spark_master_guid, "port" => port, "protocol" => "tcp" },
    },
  ]
end

# POST all the policies in one request to the policy server's external API.
uri = URI.parse("http://api.#{ARGV[0]}.xip.io/networking/v0/external/policies")
http = Net::HTTP.new(uri.host, uri.port)
http.read_timeout = 300

req = Net::HTTP::Post.new(uri.path, {
  'Accept' => 'application/json',
  'Content-Type' => 'application/json',
  'Authorization' => authorization,
})
req.body = { "policies" => policies }.to_json
res = http.request(req)
puts res.code
bosh-lite.yml

---
# --- Set Router Static IP ---
- type: replace
  path: /instance_groups/name=router/networks
  value:
  - name: private
    static_ips: [10.244.0.34]

# --- Add Bosh Lite Security Groups ---
- type: replace
  path: /instance_groups/name=api/jobs/name=cloud_controller_ng/properties/cc/default_running_security_groups
  value:
  - public_networks
  - dns
  - load_balancer
- type: replace
  path: /instance_groups/name=api/jobs/name=cloud_controller_worker/properties/cc/default_running_security_groups
  value:
  - public_networks
  - dns
  - load_balancer
- type: replace
  path: /instance_groups/name=cc-clock/jobs/name=cloud_controller_clock/properties/cc/default_running_security_groups
  value:
  - public_networks
  - dns
  - load_balancer
- type: replace
  path: /instance_groups/name=api/jobs/name=cloud_controller_ng/properties/cc/security_group_definitions/-
  value:
    name: load_balancer
    rules:
    - destination: 10.244.0.34
      protocol: all
- type: replace
  path: /instance_groups/name=api/jobs/name=cloud_controller_worker/properties/cc/security_group_definitions/-
  value:
    name: load_balancer
    rules:
    - destination: 10.244.0.34
      protocol: all
- type: replace
  path: /instance_groups/name=cc-clock/jobs/name=cloud_controller_clock/properties/cc/security_group_definitions/-
  value:
    name: load_balancer
    rules:
    - destination: 10.244.0.34
      protocol: all

# ----- Move SSH Proxy from diego-brain to router ----
- type: remove
  path: /instance_groups/name=diego-brain/vm_extensions
- type: remove
  path: /instance_groups/name=diego-brain/jobs/name=ssh_proxy
- type: replace
  path: /instance_groups/name=router/vm_extensions
  value:
  - ssh-proxy-and-router-lb
- type: replace
  path: /instance_groups/name=router/jobs/-
  value:
    name: ssh_proxy
    release: diego
    properties:
      diego:
        ssl:
          skip_cert_verify: true
        ssh_proxy:
          enable_cf_auth: true
          host_key: "((diego_ssh_proxy_host_key.private_key))"
          uaa_secret: "((uaa_clients_ssh-proxy_secret))"
          uaa_token_url: "https://uaa.((system_domain))/oauth/token"
          bbs:
            ca_cert: "((diego_bbs_client.ca))"
            client_cert: "((diego_bbs_client.certificate))"
            client_key: "((diego_bbs_client.private_key))"

# ----- Scale Down ------
- type: replace
  path: /instance_groups/name=consul/instances
  value: 1
- type: replace
  path: /instance_groups/name=nats/instances
  value: 1
- type: replace
  path: /instance_groups/name=etcd/instances
  value: 1
- type: replace
  path: /instance_groups/name=etcd/jobs/name=etcd/properties/etcd/cluster
  value:
  - instances: 1
    name: etcd
- type: replace
  path: /instance_groups/name=diego-bbs/instances
  value: 1
- type: replace
  path: /instance_groups/name=uaa/instances
  value: 1
- type: replace
  path: /instance_groups/name=diego-brain/instances
  value: 1
- type: replace
  path: /instance_groups/name=diego-cell/instances
  value: 1
- type: replace
  path: /instance_groups/name=diego-cell/jobs/name=rep/properties/diego/rep/evacuation_timeout_in_seconds?
  value: 0
- type: replace
  path: /instance_groups/name=router/instances
  value: 1
- type: replace
  path: /instance_groups/name=route-emitter/azs
  value:
  - z1
- type: replace
  path: /instance_groups/name=route-emitter/instances
  value: 1
- type: replace
  path: /instance_groups/name=api/instances
  value: 1
- type: replace
  path: /instance_groups/name=cc-bridge/instances
  value: 1
- type: replace
  path: /instance_groups/name=doppler/instances
  value: 1
- type: replace
  path: /instance_groups/name=log-api/instances
  value: 1
c2c-networking.yml

---
# add cf-networking release
- type: replace
  path: /releases/name=cf-networking?/version
  value: latest

# add network policy db to cf mysql
- type: replace
  path: /instance_groups/name=mysql/jobs/name=mysql/properties/cf_mysql/mysql/seeded_databases/-
  value:
    name: network_policy
    username: network_policy
    password: "((cf_mysql_mysql_seeded_databases_network_policy_password))"

# add users and client scopes
- type: replace
  path: /instance_groups/name=uaa/jobs/name=uaa/properties/uaa/scim/users/name=admin/groups/-
  value: network.admin
- type: replace
  path: /instance_groups/name=uaa/jobs/name=uaa/properties/uaa/clients/cf/scope?
  value: network.admin,network.write,cloud_controller.read,cloud_controller.write,openid,password.write,cloud_controller.admin,scim.read,scim.write,doppler.firehose,uaa.user,routing.router_groups.read,routing.router_groups.write
- type: replace
  path: /instance_groups/name=uaa/jobs/name=uaa/properties/uaa/clients/network-policy?
  value:
    authorities: uaa.resource,cloud_controller.admin_read_only
    authorized-grant-types: client_credentials,refresh_token
    secret: "((uaa_clients_network_policy_secret))"
- type: replace
  path: /instance_groups/name=uaa/jobs/name=uaa/properties/uaa/clients/datadog-firehose-nozzle?
  value:
    access-token-validity: 1209600
    authorized-grant-types: authorization_code,client_credentials,refresh_token
    override: true
    secret: "((uaa_clients_datadog_firehose_nozzle_secret))"
    scope: openid,oauth.approvals,doppler.firehose
    authorities: oauth.login,doppler.firehose

# point garden to external networker
- type: replace
  path: /instance_groups/name=diego-cell/jobs/name=garden/properties/garden/network_plugin?
  value: /var/vcap/packages/runc-cni/bin/garden-external-networker
- type: replace
  path: /instance_groups/name=diego-cell/jobs/name=garden/properties/garden/network_plugin_extra_args?/-
  value: --configFile=/var/vcap/jobs/garden-cni/config/adapter.json

# c2c jobs on the diego cell
- type: replace
  path: /instance_groups/name=diego-cell/jobs/-
  value:
    name: garden-cni
    release: cf-networking
    properties:
      cf_networking:
        garden_external_networker:
          cni_plugin_dir: /var/vcap/packages/flannel/bin
          cni_config_dir: /var/vcap/jobs/cni-flannel/config/cni
- type: replace
  path: /instance_groups/name=diego-cell/jobs/-
  value:
    name: cni-flannel
    release: cf-networking
    properties:
      cf_networking:
        plugin:
          etcd_endpoints:
          - cf-etcd.service.cf.internal
          etcd_client_cert: "((etcd_client.certificate))"
          etcd_client_key: "((etcd_client.private_key))"
          etcd_ca_cert: "((etcd_client.ca))"
- type: replace
  path: /instance_groups/name=diego-cell/jobs/-
  value:
    name: netmon
    release: cf-networking
- type: replace
  path: /instance_groups/name=diego-cell/jobs/-
  value:
    name: vxlan-policy-agent
    release: cf-networking
    properties:
      cf_networking:
        vxlan_policy_agent:
          policy_server_url: https://policy-server.service.cf.internal:4003
          ca_cert: "((network_policy_client.ca))"
          client_cert: "((network_policy_client.certificate))"
          client_key: "((network_policy_client.private_key))"

# policy server vm
- type: replace
  path: /instance_groups/-
  value:
    name: policy-server
    azs:
    - z1
    - z2
    instances: 2
    vm_type: t2.small
    stemcell: default
    networks:
    - name: private
    jobs:
    - name: policy-server
      release: cf-networking
      properties:
        cf_networking:
          policy_server:
            uaa_client_secret: "((uaa_clients_network_policy_secret))"
            uaa_ca: "((uaa_ssl.ca))"
            ca_cert: "((network_policy_server.ca))"
            server_cert: "((network_policy_server.certificate))"
            server_key: "((network_policy_server.private_key))"
            database:
              type: mysql
              username: network_policy
              password: "((cf_mysql_mysql_seeded_databases_network_policy_password))"
              host: sql-db.service.cf.internal
              port: 3306
              name: network_policy
    - name: route_registrar
      release: routing
      properties:
        route_registrar:
          routes:
          - name: policy-server
            port: 4002
            registration_interval: 20s
            uris:
            - "api.((system_domain))/networking"
    - name: consul_agent
      release: consul
      consumes:
        consul: {from: consul_server}
      properties:
        consul:
          agent:
            services:
              policy-server:
                name: policy-server
                check:
                  interval: 5s
                  script: /bin/true
    - name: metron_agent
      release: loggregator
      properties:
        syslog_daemon_config:
          enable: false
        metron_agent:
          deployment: "((system_domain))"
          etcd:
            client_cert: "((etcd_client.certificate))"
            client_key: "((etcd_client.private_key))"
        metron_endpoint:
          shared_secret: "((dropsonde_shared_secret))"
        loggregator:
          tls:
            ca_cert: "((loggregator_ca.certificate))"
            metron:
              cert: "((loggregator_tls_metron.certificate))"
              key: "((loggregator_tls_metron.private_key))"
          etcd:
            require_ssl: true
            ca_cert: "((etcd_server.ca))"
            machines:
            - cf-etcd.service.cf.internal

# add vars
- type: replace
  path: /variables/-
  value:
    name: cf_mysql_mysql_seeded_databases_network_policy_password
    type: password
- type: replace
  path: /variables/-
  value:
    name: uaa_clients_network_policy_secret
    type: password
- type: replace
  path: /variables/-
  value:
    name: uaa_clients_datadog_firehose_nozzle_secret
    type: password
- type: replace
  path: /variables/-
  value:
    name: network_policy_ca
    type: certificate
    options:
      is_ca: true
      common_name: networkPolicyCA
- type: replace
  path: /variables/-
  value:
    name: network_policy_server
    type: certificate
    options:
      ca: network_policy_ca
      common_name: policy-server.service.cf.internal
      extended_key_usage:
      - server_auth
- type: replace
  path: /variables/-
  value:
    name: network_policy_client
    type: certificate
    options:
      ca: network_policy_ca
      common_name: clientName
      extended_key_usage:
      - client_auth
director-uuid.yml

---
- type: replace
  path: /director_uuid?
  value: ((director_uuid))
spark-ui-proxy.py

# Python 2 script (uses BaseHTTPServer/urllib2).
import BaseHTTPServer
import sys
import urllib2

SPARK_MASTER_HOST = ""

class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    def do_GET(self):
        # redirect if we are hitting the home page
        if self.path == "" or self.path == "/":
            self.send_response(302)
            self.send_header("Location", "/proxy:" + SPARK_MASTER_HOST)
            self.end_headers()
            return
        self.proxyRequest(None)

    def do_POST(self):
        length = int(self.headers.getheader('content-length'))
        postData = self.rfile.read(length)
        self.proxyRequest(postData)

    def proxyRequest(self, data):
        targetHost, path = self.extractUrlDetails(self.path)
        targetUrl = "http://" + targetHost + path
        print "get: " + self.path
        print "host: " + targetHost
        print "path: " + path
        print "target: " + targetUrl
        proxiedRequest = urllib2.urlopen(targetUrl, data)
        resCode = proxiedRequest.getcode()
        if resCode == 200:
            page = proxiedRequest.read()
            page = self.rewriteLinks(page, targetHost)
            self.send_response(200)
            self.end_headers()
            self.wfile.write(page)
        elif resCode == 302:
            self.send_response(302)
            self.send_header("Location", "/proxy:" + SPARK_MASTER_HOST)
            self.end_headers()
        else:
            raise Exception("Unsupported response: " + str(resCode))

    def extractUrlDetails(self, path):
        # paths look like /proxy:<host>:<port>/<path>; anything else is
        # proxied straight to the Spark Master
        if path.startswith("/proxy:"):
            idx = path.find("/", 7)
            targetHost = path[7:] if idx == -1 else path[7:idx]
            path = "" if idx == -1 else path[idx:]
        else:
            targetHost = SPARK_MASTER_HOST
        return (targetHost, path)

    def rewriteLinks(self, page, targetHost):
        page = page.replace("href=\"/", "href=\"/proxy:{0}/".format(targetHost))
        page = page.replace("href=\"log", "href=\"/proxy:{0}/log".format(targetHost))
        page = page.replace("href=\"http://", "href=\"/proxy:")
        page = page.replace("src=\"/", "src=\"/proxy:{0}/".format(targetHost))
        page = page.replace("action=\"", "action=\"/proxy:{0}/".format(targetHost))
        return page

if __name__ == '__main__':
    SPARK_MASTER_HOST = sys.argv[1]
    BaseHTTPServer.HTTPServer(("0.0.0.0", 8080), ProxyHandler).serve_forever()