Lyuben Todorov (lyubent)
// parallelize a sequence of 3-tuples
val rdd1 = sc.parallelize(Seq(("Lora", 41, 6), ("Jack", 12, 3), ("Smith", 41, 2)), 2)
// rdd1: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:29
// map over the rdd we created, adding a fourth field that is the sum of fields 2 and 3.
val rddExtraCol = rdd1.map(e => (e._1, e._2, e._3, e._2+e._3))
// rddExtraCol: org.apache.spark.rdd.RDD[(String, Int, Int, Int)] = MapPartitionsRDD[10] at map at <console>:31
// check first item in original rdd
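// (assumed continuation; the gist preview is cut off here)
rdd1.first
// (String, Int, Int) = (Lora,41,6)
// the mapped RDD carries the extra summed column (41 + 6 = 47)
rddExtraCol.first
// (String, Int, Int, Int) = (Lora,41,6,47)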
// Start Spark in local mode with 2 cores.
// $ cd $SPARK_HOME
// $ ./bin/spark-shell --master local[2]
// Create the Hive table; it needs to exist before we write to Hive.
sqlContext.sql("CREATE TABLE IF NOT EXISTS test (key int, value String ) STORED AS TEXTFILE")
// create an RDD from an in-memory collection of pair data.
// Notice the pairs here are Int:String, matching the table schema we defined in Hive.
val kvRDD = sc.parallelize(List((1, "data"), (2, "more data")))
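// (assumed continuation; a hedged sketch assuming Spark 1.4+ where sqlContext is a HiveContext)
// Convert the pair RDD to a DataFrame matching the table schema and insert it into "test".
import sqlContext.implicits._
val kvDF = kvRDD.toDF("key", "value")
kvDF.write.insertInto("test")
// read it back to confirm the write reached Hive
sqlContext.sql("SELECT * FROM test").show()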
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object ConnTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("AaA")
      .setMaster("spark://localhost:7077")
    // configuration to help point the driver to the master
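    // (assumed continuation of main; a hedged sketch, not the original gist body)
    // advertise a host the master and executors can reach back to
    conf.set("spark.driver.host", "localhost")
    val sc = new SparkContext(conf)
    // run a trivial job to confirm the connection to the standalone master works
    println("count = " + sc.parallelize(1 to 100).count())
    sc.stop()
  }
}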
lyubent / spark-env.sh
Last active August 29, 2015 14:18
Minimalistic Spark configuration
#!/usr/bin/env bash
# Directory to use for "scratch" space in Spark, including map output files
# and RDDs that get stored on disk. This should be on a fast, local disk in
# your system. It can also be a comma-separated list of multiple directories
# on different disks.
export SPARK_LOCAL_DIRS=/spark/data/spark-data
# IP address of the machine to bind to
export SPARK_LOCAL_IP=localhost
# Bind the master to a specific IP address
export SPARK_MASTER_IP=localhost
lyubent / QueryRecorder.diff
Created May 17, 2014 20:26
QueryRecorder diff prior to the package change.
@@ -19,130 +19,134 @@ package org.apache.cassandra.cql3.recording;
import java.io.*;
import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
Output format with the current patch: logger.info("config:[\n " + Joiner.on("\n ").join(configMap.entrySet()) + "\n]");
INFO 18:21:10 config:[
authenticator=AllowAllAuthenticator
authorizer=AllowAllAuthorizer
auto_snapshot=true
cas_contention_timeout_in_ms=1000
client_encryption_options=<REDACTED>
cluster_name=Test Cluster
column_index_size_in_kb=64
commitlog_directory=/var/lib/cassandra/commitlog
// at DataTracker#newSSTables
// todo 5351
// In a cluster = [A, B, C], this assert fails every time after one node (let's say B) is repaired
// and then a repair is attempted on another node (let's say C). If the repair is then repeated on
// node C or B the assert passes; however, if it's attempted on A (as A hasn't been repaired)
// it will fail. This cycle repeats every time new data is added.
// System.out.println("newSSTables.size() = " + newSSTables.size());
// assert newSSTables.size() == newSSTablesSize : String.format("Expecting new size of %d, got %d while replacing %s by %s in %s", newSSTablesSize, newSSTables.size(), oldSSTables, replacements, this);
if (newSSTables.size() != newSSTablesSize)
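    // (assumed continuation: report the size mismatch instead of tripping the assert)
    System.out.println(String.format("Expecting new size of %d, got %d while replacing %s by %s in %s",
                                     newSSTablesSize, newSSTables.size(), oldSSTables, replacements, this));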
lyubent / gist:7651310
Created November 26, 2013 00:19
CASSANDRA-5818 Test cases for dirs.
Test scenarios
1) /var/lib/cassandra doesn't exist
2) /var/lib/cassandra is a file and not a directory
3) Have only X permissions
4) Have only W permissions
5) Have only R permissions
6) Have only RW permissions
7) Have only RX permissions
8) Have only XW permissions
lyubent / gist:7564180
Last active December 28, 2015 21:19
Cassandra 1.2.8: querying on a secondary index with 2M rows inserted. Machine specs (for clarity): Core 2 Duo @ 1.4GHz, 4GB RAM, 128GB flash drive.
cqlsh> TRACING ON;
Now tracing requests.
cqlsh> SELECT * FROM test.usertask where ts=6 limit 3000000;
tid | content | ts
--------------------------------------+------------------------------------------------------------------------+----
f00a1bbc-b656-4e04-87e8-f45c12cf5f04 | dYZQOYKmzynXENjVPXJHZDBFXUUePbEDcXzFtTDUfrSCzghRxopMZNYNBhKHPPZtfbCUci | 6
ecd42378-8c78-4350-9dc4-4a25e56aac8e | bQrAspzdzRNSAzwMndVCcGzqIswFaGfcOYcfwqpzyFxpizcjiyddBOEItVYNTnbUTIWfFL | 6
fdab359a-7fde-465c-9b87-0aac2b157f89 | ssGrrZTPbxqPCHKskOhrrNEQDhaTqIYsYFZYkdDkBfZrPVkHTRtPftRmHeRYMTerpGmAvU | 6
lyubent / 6316.sh
Last active December 28, 2015 00:39
CASSANDRA-6316 verification.
# Procedure:
# 1) Start Cassandra
./cassandra -f
# 2) Create the schema, insert 2 rows, then delete one.
./cqlsh
CREATE KEYSPACE test_range_tombstones WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
CREATE TABLE test_range_tombstones.tbl (a INT, b text, c INT, d TIMESTAMP, e BOOLEAN, PRIMARY KEY(a, b, c)) WITH compaction = { 'class' : 'LeveledCompactionStrategy' };
insert into test_range_tombstones.tbl (a, b, c, d, e) values (1, 'fdsaf', 0, 4, false);
insert into test_range_tombstones.tbl (a, b, c, d, e) values (3, 'opir', 8, 8, true);
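# (assumed continuation of step 2: delete one of the two rows just inserted)
delete from test_range_tombstones.tbl where a=1 and b='fdsaf' and c=0;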