Skip to content

Instantly share code, notes, and snippets.

name animal age color
fido dog 4 brown
annabelle cat 15 white
fred bear 29 brown
julie parrot 1 brown
gus fish 1 gold
daisy iguana 2 green
name animal age color
fido 1 4 1
annabelle 2 15 2
fred 3 29 1
julie 4 1 1
gus 5 1 4
daisy 6 2 5
package tv.spotx.scala.monitoring.listeners
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.scheduler._
import org.joda.time.DateTime
import tv.spotx.scala.dbutils.{ConnectionPool, InfluxDBWriter, MySQLConnection}
/**
* :: SpotXSparkStreamingListener ::
* A simple StreamingListener that logs summary statistics across Spark Streaming batches; inherits from DeveloperAPI.
package tv.spotx.scala.dbutils
import java.sql.{Connection, DriverManager}
import java.util.Properties
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.logging.log4j.scala.Logging
package tv.spotx.scala.dbutils
import org.apache.logging.log4j.scala.Logging
import scalaj.http.{Http, HttpOptions, HttpResponse}
case class InfluxConfig(hostname: String = "console",
port: Int = 8086, // scalastyle:off magic.number
database: String = "devtest",
ssl: Boolean = false,
username: Option[String] = None,
package hadoopsters.spark.scala.monitoring.listeners
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.scheduler._
import org.joda.time.DateTime
/**
* :: ExampleStreamingListener ::
* A simple StreamingListener that accesses summary statistics across Spark Streaming batches; inherits from DeveloperAPI.
*
@hadoopsters
hadoopsters / joining_streaming_and_static_datasets.scala
Last active February 4, 2021 11:44
A simple way to join static and streaming datasets using the transform() function of a DStream.
/** A transaction event arriving on the streaming side of the join.
  *
  * Marked `final`: case classes should not be extended by other case classes.
  *
  * @param ts             transaction timestamp (epoch seconds, presumably — TODO confirm units with the producer)
  * @param customer_id    id of the customer who made the transaction
  * @param transaction_id unique identifier for this transaction
  * @param amount         monetary amount of the transaction
  */
final case class Transaction(
  ts: Int,
  customer_id: Int,
  transaction_id: String,
  amount: Double
)
// create a case class to represent a TransactionDetail (from static)
case class TransactionDetail(
@hadoopsters
hadoopsters / CassandraLoader.scala
Last active January 19, 2020 19:16
Loading Data from Cassandra into Hadoop
import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.cassandra._
object CassandraLoader extends Serializable {
/** Represents one row of the some_keyspace.some_table Cassandra table.
  * The `type` field name shadows a Scala keyword, hence the backticks.
  * Marked `final`: case classes should not be extended by other case classes.
  */
final case class MyCassandraTable(user_id: String, `type`: Int, key: String, value: String)
def main(args: Array[String]) { // scalastyle:off method.length
@hadoopsters
hadoopsters / streamingLogLevel.scala
Last active September 18, 2018 12:21
How to Change Log Level for Spark Streaming
// Build a Spark configuration for cluster deployment (appName must be in scope).
val conf = new SparkConf().setAppName(appName) // run on cluster
// Streaming context with 5-second micro-batch intervals.
val ssc = new StreamingContext(conf, Seconds(5))
// Grab the underlying SparkContext — log level is set there, not on the StreamingContext.
val sc = ssc.sparkContext
// Suppress INFO/WARN chatter; only ERROR-level messages will be logged.
sc.setLogLevel("ERROR")
@hadoopsters
hadoopsters / hiveToCsv_1.sh
Last active April 26, 2018 07:27
Export Hive Table to CSV: Method 1
#!/bin/bash
# Export a Hive table to CSV, method 1:
# Step 1 — have Hive write the table as comma-delimited text files into a local directory.
# NOTE(review): 'insert overwrite local directory' replaces the contents of
# /path/in/local/ — confirm that path is safe to clobber before running.
hive -e "insert overwrite local directory '/path/in/local/'
row format delimited fields terminated by ','
select * from my_database.my_table"
# Step 2 — concatenate the (possibly multiple) part-files into a single CSV file.
cat /path/in/local/* > /another/path/in/local/my_table.csv