Skip to content

Instantly share code, notes, and snippets.

@granturing
Last active November 14, 2022 04:28
Show Gist options
  • Star 29 You must be signed in to star a gist
  • Fork 13 You must be signed in to fork a gist
  • Save granturing/a09aed4a302a7367be92 to your computer and use it in GitHub Desktop.
Save granturing/a09aed4a302a7367be92 to your computer and use it in GitHub Desktop.
Sample reactive Leaflet code for Zeppelin
<!-- place this in an %angular paragraph -->
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/leaflet/0.7.5/leaflet.css" />
<div id="map" style="height: 800px; width: 100%"></div>
<script type="text/javascript">
function initMap() {
var map = L.map('map').setView([30.00, -30.00], 3);
L.tileLayer('http://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
attribution: 'Map data &copy; <a href="http://openstreetmap.org">OpenStreetMap</a> contributors',
maxZoom: 12,
minZoom: 3
}).addTo(map);
var geoMarkers = L.layerGroup().addTo(map);
var el = angular.element($('#map').parent('.ng-scope'));
angular.element(el).ready(function() {
window.locationWatcher = el.scope().compiledScope.$watch('locations', function(newValue, oldValue) {
// geoMarkers.clearLayers(); -- if you want to only show new data clear the layer first
angular.forEach(newValue, function(tweet) {
var marker = L.marker([ tweet.loc.lat, tweet.loc.lon ])
.bindPopup(tweet.user + ": " + tweet.tweet)
.addTo(geoMarkers);
});
})
});
}
if (window.locationWatcher) {
// clear existing watcher otherwise we'll have duplicates
window.locationWatcher();
}
// ensure we only load the script once, seems to cause issues otherwise
if (window.L) {
initMap();
} else {
console.log('Loading Leaflet library');
var sc = document.createElement('script');
sc.type = 'text/javascript';
sc.src = 'https://cdnjs.cloudflare.com/ajax/libs/leaflet/0.7.5/leaflet.js';
sc.onload = initMap;
sc.onerror = function(err) { alert(err); }
document.getElementsByTagName('head')[0].appendChild(sc);
}
</script>
// place this in a Spark paragraph
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.StreamingListener
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted
import org.apache.spark.streaming.twitter._
case class Loc(lat: Double, lon: Double)
case class Tweet(timestamp: java.util.Date, user: String, tweet: String, loc: Loc)
val ssc = new StreamingContext(sc, Seconds(2))
val input = TwitterUtils.createStream(ssc, None)
input.foreachRDD(rdd => {
val df = rdd
.filter(_.getGeoLocation() != null)
.map(s => Tweet(
s.getCreatedAt,
s.getUser.getName,
s.getText,
Loc(s.getGeoLocation.getLatitude, s.getGeoLocation.getLongitude)))
var items = df.collect
z.angularBind("locations", items) // this is what sends the data to the frontend
})
ssc.start()
@sbelyankin
Copy link

WOW, great work!

@vak
Copy link

vak commented Sep 22, 2017

oh, similarly as @sameerast mentioned: twitter isn't available at EMR Zeppelin as for now :-/
but the map is atleast shown, it would be nice to get an idea how to place markers on a map now and how to show some custome routes :)

@alisator
Copy link

I used as dependency
%spark2.dep
z.reset()
z.addRepo("MavenCentral").url("https://mvnrepository.com/")
..
z.load("org.apache.bahir:spark-streaming-twitter_2.11:2.3.0")
https://bahir.apache.org/docs/spark/2.3.0/spark-streaming-twitter/ for new Spark 2.3 This solves error: object twitter is not a member of package org.apache.spark.streaming import org.apache.spark.streaming.twitter._
If you use twitter original reference for old spark -> it will lead to another issue, wrong paths to classes and so on.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment