@dineshdharme
dineshdharme / create-offline-python3.6-dependencies-repository.sh
Last active June 4, 2021 06:59
Offline installation of Python dependencies
#!/usr/bin/env bash
# This script follows the steps described in this link:
# https://stackoverflow.com/a/51646354/8808983
LIBRARIES_DIR="python3.6-wheelhouse"
REQ_FILE="requirements.txt"
PLATFORM="manylinux1_x86_64"
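# The remaining steps, following the linked Stack Overflow answer, would look roughly
# like the sketch below; the exact flags used in the full original script are an assumption.
# Build the wheelhouse on a machine with internet access:
pip download -r "${REQ_FILE}" -d "${LIBRARIES_DIR}" \
    --platform "${PLATFORM}" \
    --python-version 36 \
    --only-binary=:all:
# Then, on the offline machine, install from the downloaded wheels only:
pip install -r "${REQ_FILE}" --no-index --find-links "${LIBRARIES_DIR}"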
@dineshdharme
dineshdharme / python_code_snippets.py
Created June 4, 2021 01:30
Some useful code snippets in Python
## get the list of column names
cols_now = list(pandas_df)
# move the column to head of list using index, pop and insert
cols_now.insert(0, cols_now.pop(cols_now.index('middle_col_name')))
# select the modified column name list from the original dataframe
# thus rearranging the column names
pandas_df = pandas_df.loc[:, cols_now]
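For instance, with a small illustrative DataFrame (the column names here are placeholders):
import pandas as pd

pandas_df = pd.DataFrame({'a': [1], 'middle_col_name': [2], 'b': [3]})

cols_now = list(pandas_df)
cols_now.insert(0, cols_now.pop(cols_now.index('middle_col_name')))
pandas_df = pandas_df.loc[:, cols_now]

print(list(pandas_df))  # ['middle_col_name', 'a', 'b']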
@dineshdharme
dineshdharme / batch_generator.py
Created June 4, 2021 01:51
Generate a million rows with a large number of string columns in parallel in Python for classification, and upload them to S3
import numpy as np
from pathlib import Path
import boto3
from boto3.exceptions import S3UploadFailedError
from joblib import Parallel, delayed
import pandas as pd
import sys
from coolname import generate
import random
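A minimal sketch of how such a generator might be wired together with joblib, coolname, and boto3; the bucket name, batch sizes, column counts, and helper names below are placeholders, not the values from the original gist:
import numpy as np
import pandas as pd
import boto3
from joblib import Parallel, delayed
from coolname import generate

N_BATCHES = 10            # placeholder: 10 batches of 100k rows ~= 1M rows
ROWS_PER_BATCH = 100_000  # placeholder batch size
N_STRING_COLS = 50        # placeholder column count
BUCKET = "my-example-bucket"  # placeholder bucket name

def make_string_batch(batch_id):
    # Generate one batch of random string features plus a binary label.
    data = {
        f"str_col_{i}": ["-".join(generate(2)) for _ in range(ROWS_PER_BATCH)]
        for i in range(N_STRING_COLS)
    }
    data["label"] = np.random.randint(0, 2, size=ROWS_PER_BATCH)
    df = pd.DataFrame(data)

    local_path = f"string_batch_{batch_id}.csv"
    df.to_csv(local_path, index=False)

    # Create the client inside the worker, since boto3 clients do not pickle.
    boto3.client("s3").upload_file(local_path, BUCKET, f"string-batches/{local_path}")
    return local_path

uploaded = Parallel(n_jobs=4)(delayed(make_string_batch)(i) for i in range(N_BATCHES))
print(uploaded)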
@dineshdharme
dineshdharme / batch_generator.py
Created June 4, 2021 01:57
Generate a million rows with a large number of numeric columns in parallel in Python for classification, and upload them to S3
import numpy as np
from pathlib import Path
import boto3
from boto3.exceptions import S3UploadFailedError
from joblib import Parallel, delayed
import sys
# boto3 clients are not thread-safe, but also not serializable, so we make the client a global
s3_client = boto3.client('s3')
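A comparable sketch for the numeric case, again with placeholder sizes and bucket name; NumPy generates the whole feature matrix in one call, so the batch function is simpler:
import numpy as np
import pandas as pd
import boto3
from joblib import Parallel, delayed

def make_numeric_batch(batch_id):
    # One batch of random numeric features plus a binary label, written locally and uploaded.
    n_rows, n_cols = 100_000, 200  # placeholder sizes
    df = pd.DataFrame(
        np.random.rand(n_rows, n_cols),
        columns=[f"num_col_{i}" for i in range(n_cols)],
    )
    df["label"] = np.random.randint(0, 2, size=n_rows)

    local_path = f"numeric_batch_{batch_id}.csv"
    df.to_csv(local_path, index=False)

    # Creating the client per worker avoids serializing it with the task.
    boto3.client("s3").upload_file(local_path, "my-example-bucket", f"numeric-batches/{local_path}")
    return local_path

paths = Parallel(n_jobs=4)(delayed(make_numeric_batch)(i) for i in range(10))
print(paths)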
@dineshdharme
dineshdharme / GraphTraversalMotifsPyspark.py
Created August 29, 2023 14:07
Apache Spark, GraphX API, PySpark, GraphFrames. Solving graph traversal problems using the GraphFrames Python binding to the GraphX API of Apache Spark.
I have added helpful comments in the code at the appropriate locations.
This solution uses the GraphX API of Spark through the `graphframes` Python bindings.
from pyspark import SQLContext
from pyspark.sql.functions import *
from graphframes import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
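A small self-contained sketch of the motif-finding pattern GraphFrames exposes, assuming the graphframes Spark package is on the classpath; the vertices, edges, and motif below are illustrative and not the original problem:
from pyspark.sql import SparkSession
from graphframes import GraphFrame

spark = SparkSession.builder.appName("MotifSketch").getOrCreate()

# Illustrative vertices and edges, not the original data.
vertices = spark.createDataFrame(
    [("a", "Alice"), ("b", "Bob"), ("c", "Carol")], ["id", "name"]
)
edges = spark.createDataFrame(
    [("a", "b", "follows"), ("b", "c", "follows")], ["src", "dst", "relationship"]
)

g = GraphFrame(vertices, edges)

# Find all two-hop paths x -> y -> z using a motif.
two_hop = g.find("(x)-[e1]->(y); (y)-[e2]->(z)")
two_hop.select("x.id", "y.id", "z.id").show()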
@dineshdharme
dineshdharme / SolveRecurrenceRelationsWindowsUserDefinedPandasUDF.py
Created September 5, 2023 17:00
Calculate recurrence relation equations at each time step. The solution uses a Pandas UDF over a Window function in PySpark.
Not sure if this answer is helpful or not, since I couldn't cast your iterative equation into a normal form or find an iterative equation solver, but you can definitely use scipy's fsolve to solve non-linear equations.
EDIT: We can use a specialized Pandas UDF to do aggregation over an appropriate Window definition.
Here's an example below:
import sys
from pyspark import SQLContext
from pyspark.sql.functions import *
import pyspark.sql.functions as F
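A hedged sketch of the pattern, assuming Spark 3.x with PyArrow available; the recurrence y_t = ALPHA * y_{t-1} + x_t and the column names are placeholders, since the original question's equation isn't reproduced here:
import pandas as pd
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.appName("RecurrenceSketch").getOrCreate()

# Illustrative data: one value x per time step t, per id.
df = spark.createDataFrame(
    [("a", 1, 1.0), ("a", 2, 2.0), ("a", 3, 3.0), ("b", 1, 5.0), ("b", 2, 1.0)],
    ["id", "t", "x"],
)

ALPHA = 0.5  # placeholder coefficient of the recurrence y_t = ALPHA * y_{t-1} + x_t

@pandas_udf("double")
def recurrence(xs: pd.Series) -> float:
    # The window below hands this UDF every x from the start of the partition up to
    # the current row, so the recurrence can be replayed and the latest value returned.
    y = 0.0
    for x in xs:
        y = ALPHA * y + x
    return y

w = (
    Window.partitionBy("id")
    .orderBy("t")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df.withColumn("y", recurrence("x").over(w)).show()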
@dineshdharme
dineshdharme / Optimized_MultipleModel_Inference_MapPartitions_Pyspark.py
Created September 5, 2023 17:03
Optimized implementation of multiple-model inference on a dataframe of values, using the mapPartitions function to achieve batch inference
This need not be so complicated. Here's a concrete example of how you can do this.
First load all the models into a list and broadcast it. Access the broadcast variable's contents through its `value` attribute. You can concatenate your features into an array, as I have done below, and then run inference on the samples one by one.
You can achieve batch semantics by using the mapPartitions function on an RDD and then converting the result back to a dataframe, as shown below.
import sys
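A minimal sketch of the broadcast-then-mapPartitions pattern; the toy models, feature columns, and sizes are placeholders (scikit-learn stands in for whatever models the original answer loaded from disk):
import numpy as np
from pyspark.sql import SparkSession
from sklearn.linear_model import LogisticRegression

spark = SparkSession.builder.appName("MultiModelInference").getOrCreate()

# Placeholder models; in practice these would be loaded from disk into a list.
X_toy = np.array([[0.0, 0.0], [1.0, 1.0], [0.0, 1.0], [1.0, 0.0]])
y_toy = np.array([0, 1, 1, 0])
models = [LogisticRegression().fit(X_toy, y_toy) for _ in range(2)]

# Broadcast the list once so every executor gets a single copy.
bc_models = spark.sparkContext.broadcast(models)

df = spark.createDataFrame(
    [(1, 0.1, 0.9), (2, 0.8, 0.2), (3, 0.5, 0.5)], ["id", "f1", "f2"]
)

def predict_partition(rows):
    rows = list(rows)
    if not rows:
        return
    ids = [r["id"] for r in rows]
    features = np.array([[r["f1"], r["f2"]] for r in rows])
    # Batch semantics: each model scores the whole partition in one call.
    preds = [m.predict(features) for m in bc_models.value]
    for i, row_id in enumerate(ids):
        yield (row_id, *(float(p[i]) for p in preds))

result = spark.createDataFrame(
    df.rdd.mapPartitions(predict_partition),
    ["id", "pred_model_0", "pred_model_1"],
)
result.show()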
@dineshdharme
dineshdharme / PandasUDFType_GROUPED_AGG.py
Created September 6, 2023 14:50
An example of PandasUDFType.GROUPED_AGG in pyspark. Clearly explained.
Here's a solution using `PandasUDFType.GROUPED_AGG`, which can be used inside a groupBy clause.
from pyspark import SQLContext
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from typing import Iterator, Tuple
import pandas as pd
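A small self-contained sketch of the idea with toy data; `mean_udf` is just an illustrative aggregation. The UDF receives all values of a group as one pandas Series and returns a single scalar, so it can sit inside groupBy().agg() or over a window definition:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("GroupedAggSketch").getOrCreate()

df = spark.createDataFrame(
    [("a", 1.0), ("a", 2.0), ("b", 3.0), ("b", 5.0)], ["id", "v"]
)

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    # v is a pandas Series holding every value of the group; return one scalar.
    return float(v.mean())

# Inside a groupBy clause:
df.groupBy("id").agg(mean_udf(df["v"]).alias("mean_v")).show()

# The same UDF also works over a window definition:
w = Window.partitionBy("id")
df.withColumn("group_mean_v", mean_udf(df["v"]).over(w)).show()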
@dineshdharme
dineshdharme / CreateMapAndUseIt.py
Created September 7, 2023 02:14
A simple problem and unique solution.
My approach to solving this problem has been to create a map from each code to the minimum timestamp it appeared at, and then use that map to populate the start_time_i columns. Below is the code.
from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import col,when
sc = SparkContext('local')
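A condensed sketch of that idea with made-up data; the column names, values, and the single start_time column here are placeholders for the original's start_time_i columns:
from itertools import chain
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("CodeMinTimestampSketch").getOrCreate()

# Illustrative events: (row id, code, timestamp).
df = spark.createDataFrame(
    [(1, "A", "2023-01-03"), (2, "A", "2023-01-01"), (3, "B", "2023-01-02")],
    ["id", "code", "ts"],
)

# Build the map: each code -> the minimum timestamp it appeared at.
min_ts = {r["code"]: r["min_ts"] for r in
          df.groupBy("code").agg(F.min("ts").alias("min_ts")).collect()}

# Turn the Python dict into a Spark map column and use it to populate start_time.
mapping = F.create_map([F.lit(x) for x in chain(*min_ts.items())])
df.withColumn("start_time", mapping[F.col("code")]).show()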
@dineshdharme
dineshdharme / XMLDataWranglingExample.py
Created September 8, 2023 16:01
A simple example of data wrangling.
Here's one way to do it. Basically, I specified the Robs node to be treated as the row. From then on, it's normal data wrangling.
import sys
from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MyApp") \