Skip to content

Instantly share code, notes, and snippets.

@rokroskar
Created November 6, 2015 16:23
Show Gist options
  • Save rokroskar/bc0b4713214bb9b1e5ed to your computer and use it in GitHub Desktop.
Save rokroskar/bc0b4713214bb9b1e5ed to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import numpy as np"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Make a custom structured dtype whose records hold three little-endian 32-bit floats (`x`, `y`, `z`):"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Three little-endian 32-bit float fields. Scalar fields need no shape entry\n",
"# (passing a shape of 1 is deprecated in newer NumPy).\n",
"dt = np.dtype([('x', '<f4'), ('y', '<f4'), ('z', '<f4')])"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"dtype([('x', '<f4'), ('y', '<f4'), ('z', '<f4')])"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Inspect the structured dtype: three little-endian float32 fields.\n",
"dt"
]
},
{
"cell_type": "code",
"execution_count": 61,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Fix the seed so Restart & Run All reproduces the same data.\n",
"np.random.seed(42)\n",
"arr = np.random.rand(100 * 3)  # 300 float64 values in [0, 1): 100 records x 3 fields"
]
},
{
"cell_type": "code",
"execution_count": 62,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"array([ 0.45313089, 0.33825112, 0.69725428, 0.97353836, 0.61312493,\n",
" 0.41893263, 0.21935282, 0.2584598 , 0.47569436, 0.74854242,\n",
" 0.07829222, 0.07778259, 0.59431165, 0.74552305, 0.53854664,\n",
" 0.53203663, 0.54238514, 0.28069276, 0.70110043, 0.82141481,\n",
" 0.46863893, 0.19000203, 0.94062347, 0.56337202, 0.2738124 ,\n",
" 0.18619099, 0.09986266, 0.63342246, 0.09899456, 0.20546126])"
]
},
"execution_count": 62,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# First 30 raw values, i.e. the first 10 (x, y, z) triples.\n",
"arr[:30]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Reinterpret the 300 random numbers as 100 `(x, y, z)` records of type `dt`, then write the records to disk as raw binary:"
]
},
{
"cell_type": "code",
"execution_count": 70,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Cast to little-endian float32 (matching dt's '<f4' fields), then reinterpret\n",
"# each consecutive triple of floats as one (x, y, z) record: 300 floats -> 100 records.\n",
"# astype() makes a fresh contiguous copy, so the view is writable, and no\n",
"# deprecated np.fromstring / bytes round trip is needed.\n",
"obj_arr = arr.astype('<f4').view(dt)"
]
},
{
"cell_type": "code",
"execution_count": 71,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"array([(0.45313090085983276, 0.33825111389160156, 0.6972543001174927),\n",
" (0.973538339138031, 0.6131249070167542, 0.41893264651298523),\n",
" (0.2193528264760971, 0.25845980644226074, 0.47569435834884644),\n",
" (0.7485424280166626, 0.07829221338033676, 0.07778258621692657),\n",
" (0.5943116545677185, 0.7455230355262756, 0.538546621799469),\n",
" (0.5320366621017456, 0.5423851609230042, 0.2806927561759949),\n",
" (0.7011004090309143, 0.8214148283004761, 0.46863892674446106),\n",
" (0.19000202417373657, 0.9406234622001648, 0.563372015953064),\n",
" (0.2738124132156372, 0.18619099259376526, 0.09986265748739243),\n",
" (0.6334224343299866, 0.09899456053972244, 0.2054612636566162)], \n",
" dtype=[('x', '<f4'), ('y', '<f4'), ('z', '<f4')])"
]
},
"execution_count": 71,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# First 10 records: record i holds arr[3*i], arr[3*i+1], arr[3*i+2] as float32.\n",
"obj_arr[:10]"
]
},
{
"cell_type": "code",
"execution_count": 72,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Dump the records' raw bytes (no header); the readers below decode them with dt.\n",
"with open('/var/tmp/obj_arr_dump', 'wb') as fh:\n",
"    obj_arr.tofile(fh)"
]
},
{
"cell_type": "code",
"execution_count": 73,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"records_per_block = 10  # records per batch read back from the dump\n",
"data_len = dt.itemsize  # bytes per (x, y, z) record: 3 fields x 4 bytes = 12\n",
"block_len = data_len * records_per_block  # bytes per batch"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Read back the first block of 10 records to check the round trip (the Spark section below reads every block in parallel):"
]
},
{
"cell_type": "code",
"execution_count": 74,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Binary mode: the dump holds raw float32 bytes, not text.\n",
"with open('/var/tmp/obj_arr_dump', 'rb') as f:\n",
"    # frombuffer replaces the deprecated binary mode of np.fromstring; the result\n",
"    # is a read-only view of the bytes just read, which is fine for inspection.\n",
"    buf = np.frombuffer(f.read(block_len), dtype=dt)"
]
},
{
"cell_type": "code",
"execution_count": 75,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"array([(0.45313090085983276, 0.33825111389160156, 0.6972543001174927),\n",
" (0.973538339138031, 0.6131249070167542, 0.41893264651298523),\n",
" (0.2193528264760971, 0.25845980644226074, 0.47569435834884644),\n",
" (0.7485424280166626, 0.07829221338033676, 0.07778258621692657),\n",
" (0.5943116545677185, 0.7455230355262756, 0.538546621799469),\n",
" (0.5320366621017456, 0.5423851609230042, 0.2806927561759949),\n",
" (0.7011004090309143, 0.8214148283004761, 0.46863892674446106),\n",
" (0.19000202417373657, 0.9406234622001648, 0.563372015953064),\n",
" (0.2738124132156372, 0.18619099259376526, 0.09986265748739243),\n",
" (0.6334224343299866, 0.09899456053972244, 0.2054612636566162)], \n",
" dtype=[('x', '<f4'), ('y', '<f4'), ('z', '<f4')])"
]
},
"execution_count": 75,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Expected to equal obj_arr[:10] (compare with the output above) if the file round trip is lossless.\n",
"buf"
]
},
{
"cell_type": "code",
"execution_count": 76,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.sql import SQLContext\n",
"\n",
"# NOTE(review): `sc` is not defined anywhere in this notebook; presumably it is the\n",
"# SparkContext injected by the pyspark shell/kernel -- confirm.\n",
"# `sqc` is not referenced again; presumably the context is created because in\n",
"# Spark 1.x constructing an SQLContext is what attaches RDD.toDF (used below).\n",
"sqc = SQLContext(sc)"
]
},
{
"cell_type": "code",
"execution_count": 77,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark.sql.types import FloatType, StructType, StructField, Row\n",
"\n",
"# One nullable FloatType column per field of dt (all '<f4'), in dt's field order,\n",
"# so the Spark schema cannot drift out of sync with the on-disk record layout.\n",
"schema = StructType(fields=[StructField(name, FloatType(), True)\n",
"                            for name in dt.names])"
]
},
{
"cell_type": "code",
"execution_count": 89,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def read_batches(iterator):\n",
"    \"\"\"Yield one pyspark Row per record for every batch index in `iterator`.\n",
"\n",
"    Batch ``b`` is the ``block_len`` bytes starting at offset ``b * block_len``\n",
"    of the dump file, decoded with the structured dtype ``dt``. Intended for\n",
"    ``RDD.mapPartitions``: the file is opened once per partition and closed\n",
"    when this generator finishes.\n",
"    \"\"\"\n",
"    # Binary mode: the dump holds raw float32 bytes, not text.\n",
"    with open('/var/tmp/obj_arr_dump', 'rb') as f:\n",
"        for batch_id in iterator:\n",
"            f.seek(batch_id * block_len, 0)\n",
"            buf = np.frombuffer(f.read(block_len), dtype=dt)\n",
"            names = buf.dtype.names\n",
"            for element in buf:\n",
"                # Plain Python floats rather than numpy scalars for the FloatType schema.\n",
"                yield Row(**{name: float(element[name]) for name in names})"
]
},
{
"cell_type": "code",
"execution_count": 90,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import os\n",
"\n",
"# One batch index per block_len-sized chunk of the dump, rounded up so a trailing\n",
"# partial block is still read; this is 10 for the 100-record file written above.\n",
"n_batches = -(-os.path.getsize('/var/tmp/obj_arr_dump') // block_len)\n",
"batch_rdd = sc.parallelize(range(n_batches))"
]
},
{
"cell_type": "code",
"execution_count": 91,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Each partition opens the dump once and turns its batch indices into Rows;\n",
"# the explicit schema fixes the column names and types.\n",
"df = batch_rdd.mapPartitions(read_batches).toDF(schema)"
]
},
{
"cell_type": "code",
"execution_count": 92,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[Row(x=0.45313090085983276, y=0.33825111389160156, z=0.6972543001174927),\n",
" Row(x=0.973538339138031, y=0.6131249070167542, z=0.41893264651298523),\n",
" Row(x=0.2193528264760971, y=0.25845980644226074, z=0.47569435834884644),\n",
" Row(x=0.7485424280166626, y=0.07829221338033676, z=0.07778258621692657),\n",
" Row(x=0.5943116545677185, y=0.7455230355262756, z=0.538546621799469),\n",
" Row(x=0.5320366621017456, y=0.5423851609230042, z=0.2806927561759949),\n",
" Row(x=0.7011004090309143, y=0.8214148283004761, z=0.46863892674446106),\n",
" Row(x=0.19000202417373657, y=0.9406234622001648, z=0.563372015953064),\n",
" Row(x=0.2738124132156372, y=0.18619099259376526, z=0.09986265748739243),\n",
" Row(x=0.6334224343299866, y=0.09899456053972244, z=0.2054612636566162)]"
]
},
"execution_count": 92,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# First 10 rows; these match buf, i.e. batch 0 read back above.\n",
"df.take(10)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment