
dineshdharme / Windows12HourSumPerRow.py
Created May 17, 2024 14:03
A clever way to sum all departures within a window before current row arrival.
https://stackoverflow.com/questions/78490654/calculate-rolling-counts-from-two-different-time-series-columns-in-pyspark/78495948#78495948
Here's a clever way to figure out all departures before the current row's arrival.
Label the corresponding times with an arrival flag, i.e. `"A"`, or a departure flag, `"D"`.
Now union these two dataframes.
Order the unioned dataframe by time, irrespective of label.
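Below is a minimal sketch of that event-stream trick, assuming a toy dataframe with `arrival` and `departure` timestamp columns and a 12-hour lookback; the column names, sample data, and window length are illustrative assumptions, not the gist's actual code.

```python
from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()

# Toy data: each row has an arrival and a departure timestamp (made-up values).
df = (spark.createDataFrame(
        [("2024-05-17 08:00:00", "2024-05-17 09:30:00"),
         ("2024-05-17 10:00:00", "2024-05-17 11:00:00"),
         ("2024-05-17 21:00:00", "2024-05-17 22:00:00")],
        ["arrival", "departure"])
      .withColumn("arrival", F.to_timestamp("arrival"))
      .withColumn("departure", F.to_timestamp("departure")))

# Label each time with its flag and union into a single event stream.
arrivals = df.select(F.col("arrival").alias("event_time"), F.lit("A").alias("flag"))
departures = df.select(F.col("departure").alias("event_time"), F.lit("D").alias("flag"))
events = arrivals.unionByName(departures)

# Rolling count of departures in the 12 hours up to each event, ordered by time only.
twelve_hours = 12 * 3600
w = Window.orderBy(F.col("event_time").cast("long")).rangeBetween(-twelve_hours, 0)
events = events.withColumn(
    "departures_last_12h",
    F.sum(F.when(F.col("flag") == "D", 1).otherwise(0)).over(w),
)

# The arrival rows now carry the rolling departure count.
events.filter(F.col("flag") == "A").orderBy("event_time").show(truncate=False)
```

Using a range window keyed on epoch seconds means the count covers the preceding 12 hours of wall-clock time rather than a fixed number of rows.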
dineshdharme / InLineManager.py
Last active May 11, 2024 09:11
Find the inline manager list all the way up to the CEO, i.e. the top, using the graph-tool library
I'm working with hierarchical data in PySpark where each employee has a manager, and I need to find all the inline managers for each employee. An inline manager is defined as the manager of the manager, and so on, until we reach the top-level manager (CEO) who does not have a manager.
Is it necessary that you use PySpark in Databricks?
If yes, this answer could help you. It does exactly that.
https://stackoverflow.com/a/77627393/3238085
Here is another solution which can help you. Unfortunately, the person who asked the question has since deleted it.
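The gist itself uses graph-tool; as a hedged alternative sketch of the same idea, the snippet below broadcasts the employee-to-manager map and walks up the chain in a UDF. The `employee`/`manager` column names and sample rows are assumptions for illustration.

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.getOrCreate()

# Toy hierarchy (made-up): (employee, manager); the CEO has a null manager.
df = spark.createDataFrame(
    [("alice", None), ("bob", "alice"), ("carol", "bob"), ("dave", "bob")],
    "employee string, manager string",
)

# Collect the employee -> manager mapping and broadcast it to the executors.
manager_of = dict(df.rdd.map(lambda r: (r["employee"], r["manager"])).collect())
bc = spark.sparkContext.broadcast(manager_of)

@F.udf(ArrayType(StringType()))
def inline_managers(employee):
    # Walk up the chain until we reach someone without a manager (the CEO).
    chain, current = [], bc.value.get(employee)
    while current is not None:
        chain.append(current)
        current = bc.value.get(current)
    return chain

df.withColumn("inline_managers", inline_managers("employee")).show(truncate=False)
```

This only makes sense when the whole hierarchy fits comfortably on the driver; for very large hierarchies a graph library, as in the gist, is the better route.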
dineshdharme / BatchWriteParquetFromStructuredStreaming.py
Created April 28, 2024 19:24
BatchWriteParquetFromStructuredStreaming.py
This is not a perfect solution, but since a streaming solution would be more suitable, I'm providing it as an option.
Adapted from the socket example below:
https://github.com/abulbasar/pyspark-examples/blob/master/structured-streaming-socket.py
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html (search for 'socket' in this webpage)
To figure out whether processing is finished, just check for this line in the logs.
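A minimal sketch of that streaming option, assuming lines arrive on a local socket (start one with `nc -lk 9999`) and each micro-batch is written out as parquet; the output and checkpoint paths under `/tmp` are placeholders, not the gist's actual locations.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("socket-to-parquet").getOrCreate()

# Read lines from a local socket (e.g. started with: nc -lk 9999).
lines = (spark.readStream
              .format("socket")
              .option("host", "localhost")
              .option("port", 9999)
              .load())

# Minimal transformation so the parquet files carry more than one column.
events = lines.withColumn("ingested_at", F.current_timestamp())

# Write each micro-batch as parquet; a checkpoint location is required.
query = (events.writeStream
               .format("parquet")
               .option("path", "/tmp/stream_output")
               .option("checkpointLocation", "/tmp/stream_checkpoint")
               .outputMode("append")
               .trigger(processingTime="10 seconds")
               .start())

# Blocks until the stream is stopped manually.
query.awaitTermination()
```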
dineshdharme / MissingValuesInterpolationAndWindowIdentification.py
Created April 11, 2024 12:28
Interpolate missing values in a timeseries
https://stackoverflow.com/questions/78304441/how-can-i-interpolate-missing-values-based-on-the-sum-of-the-gap-using-pyspark/
This was a nice fun problem to solve.
In PySpark, you can populate a column over a window specification with the first or last non-null value.
Then we can also identify the groups of nulls which occur together as a bunch
and rank over them.
Once we have those two pieces of information, calculating the interpolated values is straightforward.
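A minimal sketch of the fill-forward/fill-backward part, using `last(..., ignorenulls=True)` and `first(..., ignorenulls=True)` over backward- and forward-looking windows; the toy `t`/`value` columns are assumptions, and a real job would add a `partitionBy` per series.

```python
from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()

# Toy timeseries with gaps (made-up data): t is the time index, value has nulls.
df = spark.createDataFrame(
    [(1, 10.0), (2, None), (3, None), (4, 40.0), (5, None), (6, 70.0)],
    "t int, value double",
)

# Backward- and forward-looking windows over the whole (single) series.
w_back = Window.orderBy("t").rowsBetween(Window.unboundedPreceding, 0)
w_fwd = Window.orderBy("t").rowsBetween(0, Window.unboundedFollowing)

filled = (
    df.withColumn("prev_val", F.last("value", ignorenulls=True).over(w_back))
      .withColumn("next_val", F.first("value", ignorenulls=True).over(w_fwd))
      .withColumn("prev_t", F.last(F.when(F.col("value").isNotNull(), F.col("t")), ignorenulls=True).over(w_back))
      .withColumn("next_t", F.first(F.when(F.col("value").isNotNull(), F.col("t")), ignorenulls=True).over(w_fwd))
      .withColumn(
          "value_interp",
          F.when(F.col("value").isNotNull(), F.col("value")).otherwise(
              # Linear interpolation between the surrounding non-null points.
              F.col("prev_val")
              + (F.col("next_val") - F.col("prev_val"))
              * (F.col("t") - F.col("prev_t"))
              / (F.col("next_t") - F.col("prev_t"))
          ),
      )
)
filled.show()
```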
dineshdharme / MaximalBipartiteMatchingGraphProblem.py
Last active April 9, 2024 16:21
A maximum bipartite matching algorithm solution.
https://stackoverflow.com/questions/78294920/select-unique-pairs-from-pyspark-dataframe
As @Abdennacer Lachiheb mentioned in the comments, this is indeed a bipartite matching problem, and it is unlikely to get solved correctly in PySpark or with GraphFrames. The best approach would be to solve it with a graph algorithm library's `hopcroft_karp_matching`, such as `NetworkX`'s, or to use `scipy.optimize.linear_sum_assignment`.
`hopcroft_karp_matching`: pure Python code, runs in O(E√V) time, where E is the number of edges and V is the number of vertices in the graph.
`scipy.optimize.linear_sum_assignment`: O(n^3) complexity, but written in C++.
So only practical testing can determine which works better on your data sizes.
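A minimal NetworkX sketch of the matching step, assuming the candidate pairs have already been collected from the dataframe (the pair values below are made up):

```python
import networkx as nx
from networkx.algorithms.bipartite import hopcroft_karp_matching

# Hypothetical candidate pairs (left_id, right_id) collected from the dataframe.
pairs = [("a1", "b1"), ("a1", "b2"), ("a2", "b2"), ("a3", "b3")]

# Build the bipartite graph.
G = nx.Graph()
left = {l for l, _ in pairs}
G.add_nodes_from(left, bipartite=0)
G.add_nodes_from({r for _, r in pairs}, bipartite=1)
G.add_edges_from(pairs)

# Maximum matching; the result maps nodes from both sides to their partner.
matching = hopcroft_karp_matching(G, top_nodes=left)
unique_pairs = [(l, r) for l, r in matching.items() if l in left]
print(unique_pairs)
```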
dineshdharme / DynamicJsonFormatting.py
Created April 8, 2024 13:43
Dynamic Json Formatting in Pyspark using schema_of_json function.
https://stackoverflow.com/questions/78290764/flatten-dynamic-json-payload-string-using-pyspark/
There is a nifty function `schema_of_json` in PySpark which derives the schema of a JSON string; that schema can then be applied to the whole column.
So the method to handle dynamic JSON payloads is as follows (see the sketch after these steps):
- First, take the `json_payload` of the first row of the dataframe
- Create a schema from that `json_payload` using `schema_of_json`
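A minimal sketch of those steps, with a made-up `json_payload` column; note that the derived schema only covers fields that appear in the sampled row.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical dataframe whose json_payload column holds dynamic JSON strings.
df = spark.createDataFrame(
    [('{"id": 1, "tags": ["a", "b"], "meta": {"source": "web"}}',),
     ('{"id": 2, "tags": ["c"], "meta": {"source": "app"}}',)],
    ["json_payload"],
)

# Derive the schema from the first row's payload...
sample_payload = df.select("json_payload").first()[0]
schema = df.select(F.schema_of_json(F.lit(sample_payload))).first()[0]

# ...then apply it to the whole column and flatten the resulting struct.
parsed = df.withColumn("parsed", F.from_json("json_payload", schema)).select("parsed.*")
parsed.show(truncate=False)
```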
dineshdharme / ParallelAPICallsInPysparkUDF_Example.py
Created April 5, 2024 13:44
A demo pyspark script to show how to invoke parallel api calls using spark.
Here's a helpful example of using DataFrames to make parallel API calls.
import json
import sys

import requests
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
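The rest of the gist isn't shown above; as a self-contained sketch of the same idea, here is one way to fan the HTTP calls out across executors with a UDF. The URLs, column names, and partition count are placeholders, not the gist's actual values.

```python
import requests
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# Hypothetical input: one URL per row; a real job would build these from its own data.
urls = spark.createDataFrame(
    [("https://httpbin.org/get?id=%d" % i,) for i in range(8)], ["url"]
)

def call_api(url):
    # Each executor task runs this function, so the calls happen in parallel
    # across partitions rather than sequentially on the driver.
    try:
        return requests.get(url, timeout=10).text
    except requests.RequestException as exc:
        return f"ERROR: {exc}"

call_api_udf = F.udf(call_api, StringType())

result = urls.repartition(4).withColumn("response", call_api_udf("url"))
result.show(truncate=40)
```

Repartitioning first controls how many tasks, and therefore how many concurrent API calls, run at once.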
dineshdharme / ParsingBooleanExpressionUsingLark.py
Created April 4, 2024 15:50
Parsing Boolean Expression Using Lark.
https://stackoverflow.com/questions/78272962/split-strings-containing-nested-brackets-in-spark-sql
It is very easy to do with the `lark` Python library.
$ `pip install lark --upgrade`
Then you need to create a grammar which is able to parse your expressions.
Following is the script:
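The gist's actual grammar and script aren't reproduced here; as a rough sketch, a small Lark grammar for AND/OR expressions with nested parentheses could look like this (the grammar rules and sample expression are illustrative assumptions):

```python
from lark import Lark

# A minimal grammar for AND/OR expressions with nested parentheses; a real
# grammar would be tailored to the exact strings being parsed.
grammar = r"""
    ?expr: expr "OR" term   -> or_
         | term
    ?term: term "AND" atom  -> and_
         | atom
    ?atom: NAME
         | "(" expr ")"
    NAME: /[A-Za-z_][A-Za-z0-9_]*/
    %import common.WS
    %ignore WS
"""

parser = Lark(grammar, start="expr")

# Parse a nested expression and print the resulting tree.
tree = parser.parse("(A AND (B OR C)) OR D")
print(tree.pretty())
```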
dineshdharme / Suggestion.py
Created March 31, 2024 08:20
Rooted Minimum Spanning Tree in a Directed Graph.
https://stackoverflow.com/questions/78244909/graphframes-pyspark-route-compaction/78248893#78248893
You can possibly use NetworkX's implementation of Edmonds' algorithm to find a minimum spanning arborescence rooted at a particular node in a given directed graph.
https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.tree.branchings.Edmonds.html
In graph theory, an arborescence is a directed graph having a distinguished vertex u (called the root) such that, for any other vertex v, there is exactly one directed path from u to v.
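A minimal sketch with NetworkX, on a made-up weighted digraph; dropping the root's incoming edges is one possible way to force the arborescence to root at a chosen node, and is not necessarily what the linked answer does.

```python
import networkx as nx

# Hypothetical weighted directed graph; edges are (src, dst, weight).
edges = [
    ("A", "B", 1.0), ("A", "C", 4.0), ("B", "C", 1.0),
    ("C", "D", 2.0), ("B", "D", 5.0), ("D", "A", 3.0),
]
G = nx.DiGraph()
G.add_weighted_edges_from(edges)

root = "A"
# Drop the root's incoming edges so every node must be reached from the root.
G.remove_edges_from(list(G.in_edges(root)))

# Edmonds' algorithm under the hood: minimum spanning arborescence.
arborescence = nx.minimum_spanning_arborescence(G, attr="weight")
print(sorted(arborescence.edges(data=True)))
```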
dineshdharme / ClusteringNamesUsingMinHashLSH.py
Created March 20, 2024 11:29
Clustering similar text using minhashing and lsh.
https://stackoverflow.com/questions/78186018/fuzzy-logic-to-match-the-records-in-a-dataframe/78192904#78192904
Here's another implementation which does the same thing, this time using MinHash and LSH.
Here's an article which explains this.
https://spotintelligence.com/2023/01/02/minhash/
First, install `datasketch` and `networkx`.
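After installing both (`pip install datasketch networkx`), a minimal sketch of the MinHash + LSH clustering idea could look like the following; the sample names, character-trigram shingling, and threshold are all illustrative assumptions.

```python
import networkx as nx
from datasketch import MinHash, MinHashLSH

# Hypothetical names to cluster; a real job would read them from a dataframe.
names = ["John Smith", "Jon Smith", "Jane Doe", "Jane M Doe", "Alice Brown"]

def minhash(text, num_perm=128):
    m = MinHash(num_perm=num_perm)
    # Character trigrams work reasonably well for short, noisy strings.
    for i in range(len(text) - 2):
        m.update(text[i:i + 3].lower().encode("utf8"))
    return m

# Index every name in the LSH structure.
lsh = MinHashLSH(threshold=0.4, num_perm=128)
hashes = {name: minhash(name) for name in names}
for name, mh in hashes.items():
    lsh.insert(name, mh)

# Build a graph of candidate matches and take connected components as clusters.
G = nx.Graph()
G.add_nodes_from(names)
for name, mh in hashes.items():
    for match in lsh.query(mh):
        if match != name:
            G.add_edge(name, match)

print(list(nx.connected_components(G)))
```

LSH only proposes candidate pairs; taking connected components over those candidates is what turns pairs into clusters.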