Skip to content

Instantly share code, notes, and snippets.

/**
 * An RDD that applies the provided function to every partition of the parent RDD.
 *
 * @param prev the parent RDD this RDD is computed from; declared `var` so the
 *             reference can be cleared elsewhere (presumably on checkpointing —
 *             not visible in this excerpt, TODO confirm)
 * @param f per-partition transform; invoked with the TaskContext, the partition
 *          index, and an iterator over the partition's elements
 * @param preservesPartitioning whether `f` keeps keys unchanged, so the parent's
 *                              partitioner can be carried over (defaults to false)
 */
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
extends RDD[U](prev) {
// NOTE(review): class body elided in this excerpt.
...
}
/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 *
 * @param f predicate; elements for which it returns true are kept
 */
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanedPredicate = sc.clean(f)
  // Filtering never touches the elements themselves, so any existing
  // partitioner remains valid — hence preservesPartitioning = true.
  new MapPartitionsRDD[T, T](
    this,
    (_, _, elements) => elements.filter(cleanedPredicate),
    preservesPartitioning = true)
}
/**
 * Pass each value in the key-value pair RDD through a map function without changing the keys;
 * this also retains the original RDD's partitioning.
 *
 * @param f function applied to each value; keys are left untouched
 */
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
  val valueMapper = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](
    self,
    // Keys pass through unchanged, so the parent's partitioner stays correct.
    (_, _, pairs) => pairs.map { pair => (pair._1, valueMapper(pair._2)) },
    preservesPartitioning = true)
}
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 *
 * @param f transform applied to every element
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  // preservesPartitioning is left at its default (false): an arbitrary
  // element transform may change keys, invalidating the partitioner.
  new MapPartitionsRDD[U, T](this, (_, _, elements) => elements.map(cleanedF))
}
traceroute to 8.8.8.8 (8.8.8.8), 64 hops max, 52 byte packets
1 192.168.1.254 (192.168.1.254) 1.853 ms 1.047 ms 0.864 ms
2 bas75-2-88-170-200-254.fbx.proxad.net (88.170.200.254) 8.597 ms 13.748 ms 20.132 ms
3 213.228.4.254 (213.228.4.254) 7.894 ms 9.763 ms 8.298 ms
4 bzn-crs16-1-be1024.intf.routers.proxad.net (212.27.56.149) 19.415 ms 7.696 ms 10.763 ms
5 bzn-crs16-2-be1005.routers.proxad.net (78.254.249.77) 8.272 ms 9.363 ms 9.688 ms
6 72.14.211.26 (72.14.211.26) 11.785 ms 10.035 ms 9.851 ms
7 72.14.239.205 (72.14.239.205) 9.241 ms
72.14.239.145 (72.14.239.145) 7.893 ms 10.883 ms
8 216.239.47.89 (216.239.47.89) 8.495 ms
// Excerpt of the core RDD abstraction; most members are elided below.
abstract class RDD[T: ClassTag](...) extends Serializable with Logging {
...
/** Optionally overridden by subclasses to specify how they are partitioned. */
// @transient: the partitioner is excluded from Java serialization of the RDD.
@transient val partitioner: Option[Partitioner] = None
...
}
@cerisier
cerisier / lsi.sh
Created June 15, 2016 12:12
LSI MegaRaid CLI - taken from calomel.org
#!/bin/bash
#
# Calomel.org
# https://calomel.org/megacli_lsi_commands.html
# LSI MegaRaid CLI
# lsi.sh @ Version 0.05
#
# description: MegaCLI script to configure and monitor LSI raid cards.
# Full path to the MegaRaid CLI binary
@cerisier
cerisier / install-py3-dataproc.sh
Last active July 2, 2021 21:56
Dataproc initialization action script for installing python3
#!/bin/bash
# Dataproc initialization action: install Python 3 and configure Spark to use it.
# from https://gist.githubusercontent.com/nehalecky/9258c01fb2077f51545a/raw/789f08141dc681cf1ad5da05455c2cd01d1649e8/install-py3-dataproc.sh

# Profile scripts and Spark env files that should export the Python settings.
# Left unquoted at expansion time so the /etc/*bashrc glob expands, as before.
profile_targets="/etc/profile.d/spark_config.sh /etc/*bashrc /usr/lib/spark/conf/spark-env.sh"

apt-get install -y python3

# Point PySpark at python3 for login shells and for Spark itself.
echo "export PYSPARK_PYTHON=python3" | tee -a $profile_targets

echo "Adding PYTHONHASHSEED=0 to profiles and spark-defaults.conf..."
# NOTE(review): presumably pins Python's hash seed so hashing is consistent
# across driver and executors — confirm against Spark/Python docs.
echo "export PYTHONHASHSEED=0" | tee -a $profile_targets
echo "spark.executorEnv.PYTHONHASHSEED=0" >> /etc/spark/conf/spark-defaults.conf
@cerisier
cerisier / check.sh
Created March 25, 2018 19:37
Ensure no unterminated quotes in cldr strings
#!/bin/bash
# Ensure no unterminated single quotes in CLDR string files: for every
# strings_cldr.xml under the current directory, keep only single quotes and
# newlines, then report the line number of any line with an odd quote count.
set -eux
# Fix: the original iterated `for i in \`find ...\``, which word-splits on
# whitespace in file names (and used a useless `cat`). -print0 with a
# null-delimited read handles arbitrary file names safely.
find . -iname strings_cldr.xml -print0 | while IFS= read -r -d '' file; do
  tr -cd "'\n" < "$file" | awk 'length%2==1 {print NR, $0}'
done
@cerisier
cerisier / HackyProtobufAvroParquetExample.kt
Last active November 18, 2022 21:42
Hackish example of writing protobuf DynamicMessage objects to parquet via Avro using AvroParquetWriter.
package org.example
import com.google.protobuf.Descriptors
import com.google.protobuf.DynamicMessage
import org.apache.avro.LogicalTypes
import org.apache.avro.Schema
import org.apache.avro.protobuf.ProtobufData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter