import os, sys
from datetime import date
from glob import glob
from os.path import abspath

import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructField, StructType, IntegerType, BinaryType, StringType, TimestampType, DateType

# petastorm pieces used by the snippets below
from petastorm import make_batch_reader
from petastorm.reader import Reader
from petastorm.pytorch import DataLoader
from petastorm.tf_utils import tf_tensors
from petastorm.spark_utils import dataset_as_rdd
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
from petastorm.codecs import ScalarCodec, CompressedImageCodec
from petastorm.etl.dataset_metadata import materialize_dataset

DATASET_SIZE = 100000  # 3000
JSON_PATH = "data.json"
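
# Assumed setup, not part of the original gist: a local SparkSession plus a
# Unischema mirroring petastorm's hello_world example, so that the `spark`,
# `sc` and `HelloWorldSchema` names used by the snippets below are defined.
spark = SparkSession.builder \
    .master('local[2]') \
    .appName('petastorm_hello_world') \
    .getOrCreate()
sc = spark.sparkContext

HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
])
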
# Count the rows with SparkSQL, directly against the parquet store
number_of_rows = spark.sql(
    'SELECT count(id) '
    'FROM parquet.`file:///tmp/hello_world_dataset`').collect()

# Create a dataframe object from a parquet file
dataset_url = 'file:///tmp/hello_world_dataset'
dataframe = spark.read.parquet(dataset_url)

# Show the schema
dataframe.printSchema()

# Count all rows
dataframe.count()

# Show a single column
dataframe.select('id').show()

# Read the dataset as a Spark RDD of schema rows
rdd = dataset_as_rdd('file:///tmp/hello_world_dataset', spark,
                     [HelloWorldSchema.id, HelloWorldSchema.image1])
print(rdd.first().id)

# Read the dataset into PyTorch via petastorm's DataLoader
with DataLoader(Reader('file:///tmp/hello_world_dataset')) as train_loader:
    sample = next(iter(train_loader))
    print(sample['id'])
    plt.plot(sample['image1'])

# Read the dataset as TensorFlow tensors (TF 1.x-style Session API)
with Reader('file:///tmp/hello_world_dataset') as reader:
    tensor = tf_tensors(reader)
    with tf.Session() as sess:
        sample = sess.run(tensor)
        print(sample.id)
        plt.imshow(sample.image1)

with Reader('file:///tmp/hello_world_dataset') as reader:
    # Pure python
    for sample in reader:
        print(sample.id)
        plt.imshow(sample.image1)
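
# Assumed helpers for the write snippet below, sketched after the hello_world
# example: a row generator whose dict keys match HelloWorldSchema, plus the
# output location and row-group size that materialize_dataset expects.
def row_generator(x):
    """Return one row as a dict keyed by HelloWorldSchema field names."""
    return {'id': x,
            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3))}

output_url = 'file:///tmp/hello_world_dataset'
rowgroup_size_mb = 256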

# Generate `rows_count` rows and write them out under the materialize_dataset
# context manager, which adds petastorm metadata to the parquet store.
rows_count = 10
with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):
    rows_rdd = sc.parallelize(range(rows_count)) \
        .map(row_generator) \
        .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

    spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
        .write \
        .parquet('file:///tmp/hello_world_dataset')
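
# make_batch_reader (imported above but unused so far) can also scan the same
# files as a plain Parquet store, yielding column batches rather than decoded
# rows. A minimal sketch, assuming the dataset written above exists:
with make_batch_reader('file:///tmp/hello_world_dataset') as batch_reader:
    for batch in batch_reader:
        print(batch.id)
        break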