Use Spark to read, analyse, and store rosbag files on a MinIO server
from functools import partial
from io import BytesIO

import boto3
import cv2
import numpy as np
from PIL import Image
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("ADAS")
sc = SparkContext(conf=conf)

# S3A credentials and endpoint for the MinIO server
# (hadoop-aws 2.8 uses fs.s3a.access.key / fs.s3a.secret.key)
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "minio")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "minio123")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://localhost:9000")
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# Read the bag file through the Rosbag Hadoop input format; the .idx.bin
# index must be generated from the bag beforehand (see the comments below)
fin = sc.newAPIHadoopFile(
    path="s3a://spark/HMB_1.bag",
    inputFormatClass="de.valtech.foss.RosbagMapInputFormat",
    keyClass="org.apache.hadoop.io.LongWritable",
    valueClass="org.apache.hadoop.io.MapWritable",
    conf={"RosbagInputFormat.chunkIdx": "/home/nitish/dev/rosbag/data/HMB_1.bag.idx.bin"})
# boto3 resource for uploading extracted images back to MinIO
s3 = boto3.resource('s3',
                    endpoint_url='http://localhost:9000',
                    aws_access_key_id='minio',
                    aws_secret_access_key='minio123',
                    region_name='us-east-1')
# In the rosbag 2.0 format, op == 2 marks a message-data record and
# op == 7 a connection record
def msg_map(r, func=str, conn={}):
    """Deserialize message-data records belonging to `conn` and yield func(msg)."""
    from collections import namedtuple
    from rosbag.bag import _get_message_type

    if r[1]['header']['op'] == 2 and r[1]['header']['conn'] == conn['header']['conn']:
        c = conn['data']
        c['datatype'] = str(c['type'])
        c['msg_def'] = str(c['message_definition'])
        c['md5sum'] = str(c['md5sum'])
        c = namedtuple('GenericDict', c.keys())(**c)
        msg_type = _get_message_type(c)
        msg = msg_type()
        msg.deserialize(r[1]['data'])
        yield func(msg)
# Collect the connection records (op == 7) and key them by topic name,
# so message-data records can be matched to their topic
conn_a = fin.filter(lambda r: r[1]['header']['op'] == 7).map(lambda r: r[1])
# conn_a.coalesce(1).saveAsTextFile('s3a://spark/kv1.txt')
conn_d = {str(k['header']['topic']): k for k in conn_a.collect()}
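With conn_d in hand, the topics recorded in the bag can be listed, and messages from any topic decoded through msg_map (a minimal sketch; the steering topic name below is an assumption about this particular bag):

# Sketch: list the recorded topics, then decode a few messages as strings
# ('/vehicle/steering_report' is assumed to exist in this bag)
print(sorted(conn_d.keys()))
sample = fin.flatMap(
    partial(msg_map, func=str, conn=conn_d['/vehicle/steering_report'])
).take(3)
for m in sample:
    print(m)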
# Extraction of compressed images from the bag
topics_compressed = ['/center_camera/image_color/compressed']

def f(msg):
    # keep only the raw compressed image bytes
    return msg.data

res = fin.flatMap(
    partial(msg_map, func=f, conn=conn_d[topics_compressed[0]])
).take(50)  # pull the first 50 frames to the driver
## Convert the compressed image bytes to CV2 format & upload to MinIO as PNG
count = 0
for r in res:
    # np.frombuffer replaces the deprecated np.fromstring and the redundant
    # BytesIO round-trip; cv2 decodes to BGR, so convert to RGB for PIL
    img = cv2.imdecode(np.frombuffer(r, np.uint8), cv2.IMREAD_COLOR)
    img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    memfile = BytesIO()
    img_pil.save(memfile, 'PNG')
    memfile.seek(0)
    s3.Bucket('spark').put_object(Key="image/" + str(count), Body=memfile)
    count += 1
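To confirm the frames landed in MinIO, the uploaded objects can be listed back through the same boto3 resource (a minimal sanity-check sketch, not part of the original gist):

# Sketch: list the uploaded frames under the 'image/' prefix
for obj in s3.Bucket('spark').objects.filter(Prefix='image/'):
    print(obj.key, obj.size)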
@nitisht (author) commented Jun 21, 2019:
Pre-requisites:

Run the file with spark-submit, passing the hadoop-aws, AWS SDK, and rosbaginputformat jars:

./bin/spark-submit --master local --jars "../bin/hadoop-aws-2.8.2.jar,../bin/httpclient-4.5.3.jar,../bin/aws-java-sdk-core-1.11.234.jar,../bin/aws-java-sdk-kms-1.11.234.jar,../bin/aws-java-sdk-1.11.234.jar,../bin/aws-java-sdk-s3-1.11.234.jar,../bin/joda-time-2.9.9.jar,../bin/rosbaginputformat.jar,../bin/rosbaginputformat_2.11-0.9.8.jar,../bin/scala-library-2.11.8.jar,../bin/protobuf-java-3.3.0.jar" /home/nitish/dev/rosbag/rosbag-MinIO.py
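Note: the HMB_1.bag.idx.bin index passed to the job has to be generated from the bag file beforehand with the rosbaginputformat jar from the ros_hadoop project. The exact flags depend on the jar version (the invocation below is an assumption; check the project README):

java -jar lib/rosbaginputformat.jar -f /home/nitish/dev/rosbag/data/HMB_1.bag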

@matthewji commented:
Thank you for the great information!
