Anish Chakraborty anish749

anish749 / pyspark_python3.md
Created August 4, 2019 20:02
Pyspark with Python3 on EMR

Set PySpark to use Python 3 on AWS EMR.

$ sudo sed -i -e '$a\export PYSPARK_PYTHON=/usr/bin/python3' /etc/spark/conf/spark-env.sh

Install boto3 if needed:

$ sudo python3 -m pip install boto3
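The `sed` command above appends the export line each time it is run, so repeating it leaves duplicate lines in `spark-env.sh`. A minimal sketch of an idempotent alternative follows. The helper name `append_once` is hypothetical, and the demo writes to a scratch file instead of the real `/etc/spark/conf/spark-env.sh`.

```shell
#!/bin/sh
# append_once is a hypothetical helper, not part of Spark or EMR.
# It appends LINE to FILE only if an identical line is not already there.
append_once() {
    line=$1
    file=$2
    # -q: quiet, -x: match the whole line, -F: fixed string (no regex)
    grep -qxF -e "$line" "$file" 2>/dev/null || printf '%s\n' "$line" >> "$file"
}

# Demonstrate on a scratch file; calling it twice still leaves a single line.
demo_env=$(mktemp)
append_once 'export PYSPARK_PYTHON=/usr/bin/python3' "$demo_env"
append_once 'export PYSPARK_PYTHON=/usr/bin/python3' "$demo_env"
cat "$demo_env"
```

On a real cluster the target file is owned by root, so the function body would need to run under `sudo` (for example from a root shell).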
anish749 / Java8.md
Last active August 4, 2019 17:46
Quickly switch to Java 8

Weird errors with Apache Spark / Beam / Scio?

For most of 2019 and early 2020, the most likely cause is running the wrong Java version.

As of August 2019, most big data frameworks are written for, and compatible with, Java 8.

Check your Java version:

$ java -version
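Java 8 reports its version as `1.8.0_…`, while Java 9 and later report the major number first (for example `11.0.4`), so a quick check has to handle both schemes. The sketch below is one way to do that. `java_major` is a hypothetical helper name, and the parsing assumes the quoted version string sits on the first line of the `java -version` output, which may not hold if the JVM prints extra notices first.

```shell
#!/bin/sh
# java_major is a hypothetical helper: it maps a version string to a major number.
# Legacy scheme: "1.8.0_222" -> 8. Scheme used since Java 9: "11.0.4" -> 11.
java_major() {
    case $1 in
        1.*) v=${1#1.}; printf '%s\n' "${v%%.*}" ;;   # drop "1.", keep up to next dot
        *)   printf '%s\n' "${1%%[.+-]*}" ;;          # keep up to first ".", "+" or "-"
    esac
}

# `java -version` writes to stderr; the version is the quoted token on line 1.
if command -v java >/dev/null 2>&1; then
    raw=$(java -version 2>&1 | head -n 1 | cut -d'"' -f2)
    echo "Detected Java major version: $(java_major "$raw")"
else
    echo "java not found on PATH"
fi
```

If the reported major version is not 8, that is a good first suspect for the errors described above.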
package qscio
import java.util.UUID
import com.spotify.scio.values.SCollection
import com.spotify.scio.{ScioContext, ScioMetrics, ScioResult}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.beam.runners.dataflow.DataflowRunner
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.apache.beam.runners.direct.DirectRunner
anish749 / BloomFilterAggregator.scala
Last active May 14, 2019 09:48
Immutable BF Aggregator backed by private mutable BitSet.
// An Algebird immutable Bloom Filter Aggregator backed by a private mutable.BitSet.
final case class FBloomFilterAggregator[A](numHashes: Int, width: Int)(
    implicit hash: Hash128[A]
) extends Aggregator[A, mutable.BitSet, BF[A]] {
  private val hasher = BFHash(numHashes, width)(hash)
  def prepare(value: A): mutable.BitSet = {
    val hashes = hasher(value)
    val b = new mutable.BitSet()
anish749 / SparkDataSetPartitionUtils.scala
Created October 22, 2017 10:57
Utility functions for extending Spark Datasets for exploring partitions
package org.anish.spark.skew
import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{Dataset, SaveMode}
/**
 * A few utility functions for extending Spark Datasets to explore partitions
* Created by anish on 22/10/17.
package org.anish.akka
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
anish749 / recursive_find_replace_mac.sh
Created August 7, 2017 07:05
Recursive find and replace using sed in Mac OS X
find . -type f -name '*.java' -exec sed -i '' 's/find/replace/g' {} +
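# find . -type f       : find regular files under the current directory
# -name '*.java'       : whose names match *.java
# -exec sed -i         : run sed on them, editing in place
# ''                   : empty backup suffix, so no backup files are created (BSD/macOS sed syntax)
# 's/find/replace/g'   : the sed command that does the actual replacement
# {} +                 : the matched file names are batched and passed to sed as arguments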
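The `sed -i ''` form is specific to BSD sed as shipped with macOS. GNU sed treats the empty string as the script instead. A portable sketch is shown below, assuming a short-lived backup file is acceptable: `-i.bak` is accepted by both implementations, and the backups are deleted afterwards. `replace_in_tree` is a hypothetical helper name, and the demo runs in a scratch directory.

```shell
#!/bin/sh
# replace_in_tree is a hypothetical helper.
# Usage: replace_in_tree DIR NAME_GLOB SED_SCRIPT
replace_in_tree() {
    # -i.bak works with both BSD (macOS) and GNU sed; it leaves FILE.bak backups.
    find "$1" -type f -name "$2" -exec sed -i.bak "$3" {} +
    # Remove the backups created by the step above.
    find "$1" -type f -name "$2.bak" -exec rm -f {} +
}

# Demonstrate on a scratch directory with one matching and one non-matching file.
demo_dir=$(mktemp -d)
mkdir -p "$demo_dir/src/pkg"
printf 'find me\n' > "$demo_dir/src/pkg/A.java"
printf 'find me\n' > "$demo_dir/src/notes.txt"
replace_in_tree "$demo_dir" '*.java' 's/find/replace/g'
cat "$demo_dir/src/pkg/A.java"   # the .java file is rewritten
cat "$demo_dir/src/notes.txt"    # the .txt file is left alone
```

The trade-off is a second `find` pass and temporary `.bak` files, in exchange for one command line that behaves the same on macOS and Linux.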
anish749 / MostCommonValue.scala
Last active February 3, 2023 11:50
Spark UDAF to calculate the most common element in a column, i.e. the statistical mode of that column. Written and tested with Spark 2.1.0.
package org.anish.spark.mostcommonvalue
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scalaz.Scalaz._
/**
* Spark User Defined Aggregate Function to calculate the most frequent value in a column. This is similar to
anish749 / ParquetCommonMetaDataGenerator.scala
Created May 15, 2017 19:42
Generate Parquet Common MetaData separately using a Spark Submit
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter}
/**
anish749 / SparkParquetGcOutOfMemory.scala
Created May 15, 2017 15:39
Code to reproduce the "GC overhead limit exceeded" OutOfMemory error when appending data to Parquet files from Apache Spark
package org.anish.spark.gcissue
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random
/**
 * Sample code written to reproduce a GC OutOfMemory issue with Spark and Parquet. More details on my blog at <anish749.github.io>
*