from random import randint
# create a list of random numbers between 10 and 1000
my_large_list = [randint(10, 1000) for _ in range(20000000)]
# create one partition of the list
my_large_list_one_partition = sc.parallelize(my_large_list, numSlices=1)
# check number of partitions
print(my_large_list_one_partition.getNumPartitions())
# create five partitions of the list
my_large_list_with_five_partition = sc.parallelize(my_large_list, numSlices=5)
# keep only the numbers greater than or equal to 200
my_large_list_with_five_partition = my_large_list_with_five_partition.filter(lambda x: x >= 200)
%%time
# count the number of elements in the filtered list
print(my_large_list_with_five_partition.count())
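%%time
# For comparison, a sketch (not part of the original snippet) that runs the same
# filter-and-count on the single-partition RDD created above, so the wall time
# reported by %%time can be compared with the five-partition version.
print(my_large_list_one_partition.filter(lambda x: x >= 200).count())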
# import the Matrices
from pyspark.mllib.linalg import Matrices
# create a dense matrix with 3 rows and 2 columns (values are given in column-major order)
matrix_1 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])
print(matrix_1)
# >> DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], False)
print(matrix_1.toArray())
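# A sketch of the sparse counterpart (not in the original snippet): Matrices.sparse
# takes the dimensions plus column pointers, row indices and values in
# compressed sparse column (CSC) form.
# 3x2 matrix with non-zero entries (0,0)=9, (2,1)=6 and (1,1)=8
matrix_sparse = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
print(matrix_sparse.toArray())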
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
# Create an RDD of coordinate entries with the MatrixEntry class:
matrix_entries = sc.parallelize([MatrixEntry(0, 5, 2), MatrixEntry(1, 1, 1), MatrixEntry(1, 5, 4)])
# Create a CoordinateMatrix from an RDD of MatrixEntries.
c_matrix = CoordinateMatrix(matrix_entries)
# number of columns
print(c_matrix.numCols())
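# A small follow-up sketch (assumes the c_matrix defined above): the number of
# rows is inferred from the largest row index, and the underlying entries are
# exposed as an RDD of MatrixEntry objects.
print(c_matrix.numRows())
print(c_matrix.entries.collect())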
# import the libraries
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix
# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 3, [1, 2, 1, 2, 1, 2, 1, 2, 1])),
                         ((1, 1), Matrices.dense(3, 3, [3, 4, 5, 3, 4, 5, 3, 4, 5])),
                         ((2, 0), Matrices.dense(3, 3, [1, 1, 1, 1, 1, 1, 1, 1, 1]))])
# Create a BlockMatrix from the RDD of sub-matrix blocks, each of size 3x3
b_matrix = BlockMatrix(blocks, 3, 3)
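# A quick sanity-check sketch (assumes the b_matrix created above): the overall
# dimensions follow from the block indices and the per-block size, and
# toLocalMatrix() collects all blocks into a single local dense matrix.
print(b_matrix.numRows())   # >> 9
print(b_matrix.numCols())   # >> 6
print(b_matrix.toLocalMatrix())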
# read a csv file
my_data = spark.read.csv('ind-ban-comment.csv', header=True)
# see the default schema of the dataframe
my_data.printSchema()
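# Without an explicit schema every column is read as a string. A sketch of a
# quick alternative (assuming the same CSV file): let Spark infer the column
# types by scanning the data, at the cost of an extra pass over the file.
my_data_inferred = spark.read.csv('ind-ban-comment.csv', header=True, inferSchema=True)
my_data_inferred.printSchema()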
import pyspark.sql.types as tp
# define the schema (only the first six fields are shown here; the CSV has 11 columns)
my_schema = tp.StructType([
    tp.StructField(name='Batsman',      dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Batsman_Name', dataType=tp.StringType(),  nullable=True),
    tp.StructField(name='Bowler',       dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Bowler_Name',  dataType=tp.StringType(),  nullable=True),
    tp.StructField(name='Commentary',   dataType=tp.StringType(),  nullable=True),
    tp.StructField(name='Detail',       dataType=tp.StringType(),  nullable=True),
    # ... StructFields for the remaining columns go here ...
])
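# A sketch of how the schema is typically used (assuming my_schema above is
# completed for all 11 columns): pass it to spark.read.csv so the columns get
# the declared types instead of all being read as strings.
my_data = spark.read.csv('ind-ban-comment.csv', schema=my_schema, header=True)
my_data.printSchema()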
# get the dimensions of the data
(my_data.count(), len(my_data.columns))
# >> (605, 11)
# import the PySpark SQL functions module
import pyspark.sql.functions as f
# count the null values in each column
data_agg = my_data.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in my_data.columns])
data_agg.show()
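# A sketch of common follow-ups once the null counts are known (not from the
# original snippet): drop rows that are entirely null, or fill numeric nulls
# with a placeholder value such as 0.
my_data_no_nulls = my_data.na.drop(how='all')
my_data_filled = my_data.na.fill(0)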
# drop the columns that are not required
my_data = my_data.drop(*['Batsman', 'Bowler', 'Id'])
# check the remaining columns
my_data.columns
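# A quick look at the cleaned DataFrame (a sketch): show the first few rows
# without truncating the longer Commentary strings.
my_data.show(5, truncate=False)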