Set PySpark to use Python 3 on AWS.
$ sudo sed -i -e '$a\export PYSPARK_PYTHON=/usr/bin/python3' /etc/spark/conf/spark-env.sh
Install boto3 if needed:
$ sudo python3 -m pip install boto3
package org.anish.spark.mostcommonvalue | |
import org.apache.spark.sql.Row | |
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} | |
import org.apache.spark.sql.types._ | |
import scalaz.Scalaz._ | |
/** | |
* Spark User Defined Aggregate Function to calculate the most frequent value in a column. This is similar to |
Set PySpark to use Python 3 on AWS.
$ sudo sed -i -e '$a\export PYSPARK_PYTHON=/usr/bin/python3' /etc/spark/conf/spark-env.sh
Install boto3 if needed:
$ sudo python3 -m pip install boto3
package qscio | |
import java.util.UUID | |
import com.spotify.scio.values.SCollection | |
import com.spotify.scio.{ScioContext, ScioMetrics, ScioResult} | |
import com.typesafe.config.{Config, ConfigFactory} | |
import org.apache.beam.runners.dataflow.DataflowRunner | |
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions | |
import org.apache.beam.runners.direct.DirectRunner |
// An Algebird immutable Bloom Filter Aggregator backed by a private mutable.BitSet. | |
final case class FBloomFilterAggregator[A](numHashes: Int, width: Int)( | |
implicit hash: Hash128[A] | |
) extends Aggregator[A, mutable.BitSet, BF[A]] { | |
private val hasher = BFHash(numHashes, width)(hash) | |
def prepare(value: A): mutable.BitSet = { | |
val hashes = hasher(value) | |
val b = new mutable.BitSet() |
package org.anish.spark.skew | |
import java.io.File | |
import org.apache.commons.io.FileUtils | |
import org.apache.spark.sql.{Dataset, SaveMode} | |
/** | |
* Few Utility functions for extending Spark Datasets for exploring partitions | |
* Created by anish on 22/10/17. |
package org.anish.akka | |
import akka.actor.{Actor, ActorLogging, ActorSystem, Props} | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ | |
import akka.http.scaladsl.model.StatusCodes | |
import akka.http.scaladsl.server.Directives._ | |
import akka.pattern.ask | |
import akka.stream.ActorMaterializer | |
import akka.util.Timeout |
find . -type f -name '*.java' -exec sed -i '' 's/find/replace/g' {} + | |
# Find files (type f) | |
# with name *.java | |
# execute sed -i (inplace) | |
# '' means no backup file is created (BSD/macOS sed requires an explicit suffix after -i)
# 's/find/replace/g' sed command to do actual replace | |
# {} + the filenames found are concatenated into one argument list and passed to a single sed invocation.
import java.net.URI | |
import org.apache.hadoop.conf.Configuration | |
import org.apache.hadoop.fs.{FileSystem, Path} | |
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter} | |
import org.apache.spark.{SparkConf, SparkContext} | |
import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter} | |
/** |
package org.anish.spark.gcissue | |
import org.apache.spark.sql.{DataFrame, SQLContext} | |
import org.apache.spark.{SparkConf, SparkContext} | |
import scala.util.Random | |
/** | |
* A random code written to produce GC OutOfMemory Issue with Spark and Parquet. More details in my blog at <anish749.github.io> | |
* |