Useful snippets

Ansible

AWS and boto3

Change tags of AWS-Glue crawler

def create_or_update_glue_crawler_tags(connection, module, glue_crawler, changed):
    account_id = module.client('sts').get_caller_identity().get('Account')
    region = connection.meta.region_name
    glue_crawler_arn = (
        "arn:aws:glue:{region}:{account_id}:crawler/{glue_crawler}"
        .format(region=region, account_id=account_id, glue_crawler=glue_crawler["Name"])
    )
    current_tags = connection.get_tags(ResourceArn=glue_crawler_arn)["Tags"]

    if module.params.get("tags") is None and current_tags == dict():
        # No current tags and no tags provided in task
        return changed

    user_tags = module.params.get("tags")
    if user_tags == current_tags:
        # Current tags are the same as those provided in the task
        return changed

    # Current tags are different from those provided
    # First, remove all existing tags
    try:
        connection.untag_resource(ResourceArn=glue_crawler_arn, TagsToRemove=list(current_tags.keys()))
    except (BotoCoreError, ClientError) as e:
        module.fail_json_aws(e, "Unable to remove existing tags")
    # Now set the new tags
    if user_tags is None:
        return True
    try:
        connection.tag_resource(
            ResourceArn=glue_crawler_arn,
            TagsToAdd=user_tags
        )
    except (BotoCoreError, ClientError) as e:
        module.fail_json_aws(e, "Unable to tag crawler")
    return True

where:

  • module = AnsibleAWSModule(...)
  • connection = module.client('glue')
  • glue_crawler is the crawler definition returned by Glue (a dict containing at least the Name key)
  • BotoCoreError and ClientError are imported from botocore.exceptions
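
A minimal, hypothetical sketch of how this helper might be wired into a module's main(); the argument spec, import path, and crawler lookup are assumptions, not part of the original snippet:

# Hypothetical wiring; names and import path are assumptions
from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule


def main():
    argument_spec = dict(
        name=dict(type="str", required=True),
        tags=dict(type="dict", required=False),
    )
    module = AnsibleAWSModule(argument_spec=argument_spec)
    connection = module.client("glue")

    # get_crawler returns a dict with a "Name" key, which the helper above relies on
    glue_crawler = connection.get_crawler(Name=module.params["name"])["Crawler"]

    changed = create_or_update_glue_crawler_tags(connection, module, glue_crawler, changed=False)
    module.exit_json(changed=changed)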

Athena

Create partitioned external table

Assumptions

  • Your data is stored on S3 with the following structure: s3://my_bucket/base/path/dt=YYYY-MM-DD/
  • Each partition contains JSON files

Create the table

IMPORTANT: If you're trying to change something in the schema, don't forget to DROP the table first!

CREATE EXTERNAL TABLE IF NOT EXISTS `db_name.table_name` (
         `field1` string,
         `field2` string,
         `field3` float,
         `field4` int,
         `field5` int,
         `field6` string,
         `field7` string
)
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
         'serialization.format' = '1'
)
LOCATION 's3://my_bucket/base/path/'
TBLPROPERTIES ('has_encrypted_data'='true');

Fix the table

MSCK REPAIR TABLE `db_name.table_name`

Query the table

SELECT *
FROM db_name.table_name
WHERE dt > '2020-03-03'
LIMIT 5;
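
If you prefer to run these statements programmatically, here is a rough boto3 sketch; the region and the result location are assumptions:

import boto3

athena = boto3.client("athena", region_name="us-east-1")  # region is an assumption

response = athena.start_query_execution(
    QueryString="MSCK REPAIR TABLE `db_name.table_name`",
    QueryExecutionContext={"Database": "db_name"},
    ResultConfiguration={"OutputLocation": "s3://my_bucket/athena-results/"},
)
print(response["QueryExecutionId"])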

Docker

Pandas and numpy in alpine

FROM python:3.7.3-alpine3.9

# Install dependencies needed for building of pandas+numpy
# See: https://stackoverflow.com/a/38571314/671013
RUN apk --update add --no-cache \
    lapack-dev \
    gcc \
    freetype-dev

RUN apk add --no-cache --virtual .build-deps \
    gfortran \
    musl-dev \
    g++

RUN ln -s /usr/include/locale.h /usr/include/xlocale.h

# Install the Python dependencies needed at runtime
RUN pip3 install --no-cache-dir Cython
RUN pip3 install --no-cache-dir numpy
RUN pip3 install --no-cache-dir pandas

EC2

Mounting volumes

SSDs provided with a spawned EC2 instance are not mounted automatically. Check out this answer.

jq

Convert json-per-line to CSV

touch res.csv && echo "col1,col2,col3" > res.csv && cat input.json | jq -r '[.col1, .col2, .col3] | @csv' >> res.csv

This is correct, assuming that input.json has the form:

{"col1": 10, "col2": "foo", "col3": true}
{"col1": 11, "col2": "boo", "col3": false}
{"col1": 11, "col2": "goo", "col3": true}

Flatten nested-json-per-line

cat data.json | jq -c '. | {col1: .col1 | .[0:10], col2: .col2, nested1: .nest.foo, nested2: .nest.bar}' > flatten.json

In the example above, only the first ten characters of col1 are kept (the [0:10] slice).
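
A Python alternative for the same flattening, using pandas.json_normalize; a sketch, with the nested key names taken from the jq example:

import json

import pandas as pd

with open("data.json") as f:
    records = [json.loads(line) for line in f]

# Nested keys become dotted columns, e.g. "nest.foo" and "nest.bar"
flat = pd.json_normalize(records)
flat["col1"] = flat["col1"].str[:10]  # same truncation as the [0:10] slice in jq
flat.to_json("flatten.json", orient="records", lines=True)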

MongoDB

Multiple conditions

db.getCollection('collectionName').find(
    {
        userId: {$eq: "someID_123"},
        contentType: {$eq: "THE_CONTENT_TYPE"},
        "content.some.nestedField": {$exists: 1},
        createdDate: {$gt: ISODate("2019-05-08")}
    }
)
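
The same query from Python via pymongo might look roughly like this; the connection URI and database name are placeholders:

import datetime

import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017/")  # placeholder URI
cursor = client["myDB"]["collectionName"].find(
    {
        "userId": "someID_123",
        "contentType": "THE_CONTENT_TYPE",
        "content.some.nestedField": {"$exists": True},
        "createdDate": {"$gt": datetime.datetime(2019, 5, 8)},
    }
)
for doc in cursor:
    print(doc)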

Aggregate by day-of-the-week

db.getCollection('myCollection').aggregate([
    {
        "$project": {
            "dow": {"$dayOfWeek": "$publishDate"}
        }
    },
    {
        "$group": {
            "_id": "$dow" ,
            "count": { "$sum": 1 }
        }
    },
    {
        "$project": {
            "dowName": {
                "$cond": [ {"$eq":[1,"$_id"]}, "Sun", {
                    "$cond": [ {"$eq":[2,"$_id"]}, "Mon", {
                        "$cond": [ {"$eq":[3,"$_id"]}, "Tue", {
                            "$cond": [ {"$eq":[4,"$_id"]}, "Wed", {
                                "$cond": [ {"$eq":[5,"$_id"]}, "Thu", {
                                    "$cond": [ {"$eq":[6,"$_id"]}, "Fri", "Sat"]
                                }]
                            }]
                        }]
                    }]
                }]
            },
            "count": "$count"
        }
    },
    { "$sort" : { "_id" : 1 } }
])
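
On MongoDB 3.4+ the nested $cond chain can be replaced by a single $switch stage. A sketch of the same pipeline from pymongo, assuming collection is a pymongo Collection:

dow_names = {1: "Sun", 2: "Mon", 3: "Tue", 4: "Wed", 5: "Thu", 6: "Fri", 7: "Sat"}

pipeline = [
    {"$project": {"dow": {"$dayOfWeek": "$publishDate"}}},
    {"$group": {"_id": "$dow", "count": {"$sum": 1}}},
    {
        "$project": {
            "dowName": {
                "$switch": {
                    "branches": [
                        {"case": {"$eq": ["$_id", k]}, "then": v}
                        for k, v in dow_names.items()
                    ],
                    "default": "unknown",
                }
            },
            "count": "$count",
        }
    },
    {"$sort": {"_id": 1}},
]
results = list(collection.aggregate(pipeline))  # `collection` is assumed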

Insert JSONs from file

import datetime
import logging
import os

import jsonlines
import pymongo

LOGGER = logging.getLogger(__name__)

def insert_data_to_db(filename: str):
    """Insert data (one JSON per line) to DB from a file

    Each data point is enriched with the timestamp of insertion.

    DB's location is provided by an environment variable (APP_DB_URI)

    Parameters
    ----------
    filename : str
        Path to a json-lines files where each line corresponds to a valid
        JSON of the data
    """
    LOGGER.info(f"About to upload the data in {filename} to DB")

    app_db_uri = os.getenv("APP_DB_URI", "mongodb://localhost:27017/")
    client = pymongo.MongoClient(app_db_uri)

    db = client["myDB"]
    collection = db["myCollection"]

    with jsonlines.open(filename) as reader:
        for obj in reader:
            obj["inserted_ts"] = datetime.datetime.utcnow()
            collection.insert_one(obj)
    client.close()

Pandas

Fill missing dates in aggregation

Let us start from this dummy frame:

df = pd.DataFrame(
    [
        ["a", "2020-12-20", 10],
        ["a", "2020-12-26", 11],
        ["a", "2020-12-22", 10],
        ["b", "2020-12-25", 111],
        ["c", "2020-12-20", 20],
        ["d", "2020-12-05", 1111]
    ],
    columns=["cat", "date", "value"]
)
df["date"] = pd.to_datetime(df.date)

The objective is to get the weekly sum of values per category.

We start with:

res0 = (
    df
    .set_index("date")
    .groupby("cat")
    .resample("W")["value"].sum()
)

which yields:

cat  date
a    2020-12-20      10
     2020-12-27      21
b    2020-12-27     111
c    2020-12-20      20
d    2020-12-06    1111

But this is not good enough. For the b category we only have one value and we cannot compare it to the weeks where a has values. Let's fill the missing weeks:

res1 = (
    res0
    .unstack()
    .fillna(0)
    .stack()
    .reset_index()
)

which yields:

cat  date                    0
a    2020-12-06 00:00:00     0
a    2020-12-20 00:00:00    10
a    2020-12-27 00:00:00    21
b    2020-12-06 00:00:00     0
b    2020-12-20 00:00:00     0
b    2020-12-27 00:00:00   111
c    2020-12-06 00:00:00     0
c    2020-12-20 00:00:00    20
c    2020-12-27 00:00:00     0
d    2020-12-06 00:00:00  1111
d    2020-12-20 00:00:00     0
d    2020-12-27 00:00:00     0

Now we have aggregations for all the weeks where at least one category had some value. But what about the week of 2020-12-13? No category was active then, but it is still interesting. To capture that week we need an intermediate step:

res2 = (
    df
    .set_index("date")
    .groupby("cat")
    .resample("W")["value"].sum()
    .unstack()
    .transpose()
)

yielding:

date                     a    b    c     d
2020-12-06 00:00:00    nan  nan  nan  1111
2020-12-20 00:00:00     10  nan   20   nan
2020-12-27 00:00:00     21  111  nan   nan

Next, we re-index the frame and include the missing week:

res3 = (
    res2
    .reindex(
        pd.date_range(
            res2.index.min(),
            res2.index.max(),
            freq="W",  # This is important!!!
            name="date"
            ))
    .transpose()
    .fillna(0)
    .stack()
    .reset_index()
    .sort_values("date")
)

yielding:

cat  date                    0
a    2020-12-06 00:00:00     0
b    2020-12-06 00:00:00     0
c    2020-12-06 00:00:00     0
d    2020-12-06 00:00:00  1111
a    2020-12-13 00:00:00     0
b    2020-12-13 00:00:00     0
c    2020-12-13 00:00:00     0
d    2020-12-13 00:00:00     0
a    2020-12-20 00:00:00    10
b    2020-12-20 00:00:00     0
c    2020-12-20 00:00:00    20
d    2020-12-20 00:00:00     0
a    2020-12-27 00:00:00    21
b    2020-12-27 00:00:00   111
c    2020-12-27 00:00:00     0
d    2020-12-27 00:00:00     0

Note that for 2020-12-13 all values are zero!

Generate dummy data (taken from here)

Hidden way down in Pandas’ testing module are a number of convenient functions for quickly building quasi-realistic Series and DataFrames:

import pandas.util.testing as tm
tm.N, tm.K = 15, 3  # Module-level default rows/columns

import numpy as np
np.random.seed(444)

tm.makeTimeDataFrame(freq='M').head()
#                  A       B       C
# 2000-01-31  0.3574 -0.8804  0.2669
# 2000-02-29  0.3775  0.1526 -0.4803
# 2000-03-31  1.3823  0.2503  0.3008
# 2000-04-30  1.1755  0.0785 -0.1791
# 2000-05-31 -0.9393 -0.9039  1.1837

tm.makeDataFrame().head()
#                  A       B       C
# nTLGGTiRHF -0.6228  0.6459  0.1251
# WPBRn9jtsR -0.3187 -0.8091  1.1501
# 7B3wWfvuDA -1.9872 -1.0795  0.2987
# yJ0BTjehH1  0.8802  0.7403 -1.2154
# 0luaYUYvy1 -0.9320  1.2912 -0.2907

Preserve existing values in a map

Assume you have a series:

s = pd.Series(["foo", "bar", 1, 2])

and you want to map foo to 123 and 1 to yoyo. First, define the map as a dictionary:

map_dict = {
    "foo": 123,
    1: "yoyo"
}

Now you can try:

s.map(map_dict)

This yields:

0     123
1     NaN
2    yoyo
3     NaN
dtype: object

and introduces NaNs wherever the value is not a key of the mapping. If you want to keep the values in s that are missing from the map, define the following class:

class DictPreserveMissingKeys(dict):
    def __init__(self, *arg, **kw):
        super(DictPreserveMissingKeys, self).__init__(*arg, **kw)

    def __missing__(self, key):
        return key

and run:

s.map(DictPreserveMissingKeys(map_dict))

which yields:

0     123
1     bar
2    yoyo
3       2
dtype: object
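
Alternatively, you can skip the helper class and fall back to the original series after mapping; note that this also overwrites values that were intentionally mapped to NaN:

s.map(map_dict).fillna(s)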

Pytest tricks

Testing against S3 using Moto

Here is a minimal example (copy_artifact is the function under test):

import tempfile
from s3fs import S3FileSystem
import boto3
import json
from moto import mock_s3


@mock_s3
def test_single_file():
    conn = boto3.resource("s3", region_name="us-east-1")
    conn.create_bucket(Bucket="my_bucket")
    s3 = S3FileSystem()

    my_dict = {"foo": "bar"}

    with s3.open("s3://my_bucket/some/dir/foo.json", "w") as f:
        json.dump(my_dict, f)

    with tempfile.TemporaryDirectory() as tmp_dir:
        copy_artifact("s3://my_bucket/some/dir/foo.json", tmp_dir)
        with open(f"{tmp_dir}/foo.json", "r") as f:
            assert json.load(f) == my_dict

Alternatively, and this is especially useful if you need to use the same bucket in many tests, you can use a fixture:

import pytest
import tempfile
from s3fs import S3FileSystem
import boto3
import json
from moto import mock_s3


@pytest.yield_fixture(scope="function")
def s3():
    mocks3 = mock_s3()
    mocks3.start()

    client = boto3.client("s3")

    client.create_bucket(Bucket="my_bucket")
    s3 = S3FileSystem()
    yield s3

    mocks3.stop()


def test_single_file(s3):

    my_dict = {"foo": "bar"}

    with s3.open("s3://my_bucket/some/dir/foo.json", "w") as f:
        json.dump(my_dict, f)

    with tempfile.TemporaryDirectory() as tmp_dir:
        copy_artifact("s3://my_bucket/some/dir/foo.json", tmp_dir)
        with open(f"{tmp_dir}/foo.json", "r") as f:
            assert json.load(f) == my_dict

Plotly

Adding vertical line at specific indices of a series

See my answer here.

import pandas as pd
import numpy as np

pd.options.plotting.backend = "plotly"


df = pd.DataFrame(
    {
        "val": np.sin(np.linspace(0, 7, 100))
    }
)
df["signal"] = df.val > 0.7

fig = df.plot(y="val")
for idx in df[df.signal].index:
    fig.add_vline(idx)

Vertical hover ruler when using Pandas

import pandas as pd
pd.options.plotting.backend = "plotly"
import plotly.graph_objects as go

go.Figure({
    'data': my_df.plot().data,
    'layout': {
        'hovermode': 'x',
        'xaxis': {'showspikes': True}
    }
})

Stacking figures

from plotly.subplots import make_subplots
import plotly.express as px

story_ids = df.story_id.unique()

fig = make_subplots(
    rows=story_ids.shape[0], cols=1,
    subplot_titles=[f"Story ID: {story_id}" for story_id in story_ids]
    )

for i, story_id in enumerate(story_ids):
    fig.add_trace(
            px.bar(
                df, x="foo", y="bar", title=f"Title ID: {story_id}"
            ).data[0],
            row=i+1, col=1
        )
fig.update_layout(height=2000, width=1000, title_text="Chapter Retention Rate per Story")
fig

Poetry

Installing on Windows

  1. Create a virtual environment for your installation: python -m venv ~/.poetry. In this case, it will be created in the home directory.
  2. Next, update pip and setuptools as follows: C:\Users\<your-username>\.poetry\Scripts\python.exe -m pip install -U pip setuptools
  3. Now, you can install poetry: C:\Users\<your-username>\.poetry\Scripts\pip.exe install poetry.
  4. Lastly, it's time to add poetry to your path; add at the end of your PATH list the following: C:\Users\<your-username>\.poetry\Scripts.
  5. Test the installation by restarting your console and running: poetry config -vvv --list. This will also hint at where poetry is looking for its config file(s).

NOTE: Due to a slow network connection, I had to manually tweak the timeout constant. This can be done by editing C:\Users\<your-username>\.poetry\.\Lib\site-packages\poetry\utils\constants.py. I increased it to 1500. I needed that when installing mypy, for example. Check out this Discord message. Thanks Clinton!

R

Pairwise matrix(?) comparison

By first checking that all the NAs are aligned, you can then switch to the other values and make sure they are all equal.

elementwise_equal <- function(a, b) {
    expect_equal(length(a), length(b))
    # All NAs appear in the same places
    expect_equal(length(which((is.na(a) == is.na(b)) == FALSE)), 0)
    expect_equal(
        length(which((a == b) == T)) + length(which(is.na(a == b))),
        length(a)
    )
}

JSON loading

One object per line file

See: jeroen/jsonlite#59 (comment)

jsonlite::stream_in(file("path/to/my/file.json"))

Reading from S3

Compressed

read_gzip_json_from_s3_to_df <- function(path) {
  #' Read a single gzipped JSON file from an S3 location into a dataframe
  #'
  #' The compressed JSON should contain a single object per line
  #' with no commas or array structure wrapping the objects
  #'
  #' @param path S3 location of an object; e.g. s3://my-bucket/some/folders/file.json.gz
  raw_data <- path %>% get_object %>% rawConnection %>% gzcon %>% jsonlite::stream_in() %>% jsonlite::flatten()
  raw_data
}

Uncompressed

read_json_from_s3_to_df <- function(path) {
  #' Read a single JSON file from an S3 location into a dataframe
  #'
  #' The JSON should contain a single object per line
  #' with no commas or array structure wrapping the objects
  #'
  #' @param path S3 location of an object; e.g. s3://my-bucket/some/folders/file.json
  raw_data <- path %>% get_object %>% rawToChar %>% textConnection %>% jsonlite::stream_in() %>% jsonlite::flatten()
  raw_data
}

ZoneID to offset in seconds

zoneId_to_offset_in_seconds <- function(zoneID) {
  #' Given a ZoneID (see the Java definition), return an offset as a signed integer
  #'
  #' @param zoneID a string representing a zone ID. Should have the form "+HH:MM" or "-HH:MM".
  #'               May be a single string or a list(?) of strings
  tz_sign <- zoneID %>% substr(1,1) %>% paste0(1) %>% as.numeric()
  hours <- zoneID %>% substr(2,3) %>% as.numeric()
  minutes <- zoneID %>% substr(5,6) %>% as.numeric()

  tz_sign * (hours * 3600 + minutes * 60)
}

S3 CLI

Recursively download with wildcards

aws s3 cp s3://my_bucket/the/prefix/with/trailing/ . --recursive --exclude "*" --include "dt=2020-01-*"

Spark

Inspect a column of a DataSet/DataFrame

Java

ds.select("col_name").collectAsList();
df.select("col_name").collectAsList();

Python

df.select("col_name").distinct().collect()

Load JSON data from S3

df = sqlContext.read.json('s3://my_bucket/raw/items/dt=2019-04-*')

Enable SQL on a dataframe

# Assuming df is some dataframe/dataset
df.createOrReplaceTempView("data")
spark.sql("SELECT * FROM data WHERE userId = 'foobar'")

Count distinct values of every column

See SO

from pyspark.sql.functions import col, countDistinct
df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns))

Check file existence on S3

PySpark

This solution was pieced together from several helpful links:

def path_exists(path):
    # spark is a SparkSession
    sc = spark.sparkContext
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
        sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
        sc._jsc.hadoopConfiguration(),
    )
    return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))

Caveat: this solution requires specifying the bucket explicitly and is not generic for arbitrary S3 paths.
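
A more generic check that works for any s3:// path can be done with s3fs (already used in the Moto tests above); a sketch:

from s3fs import S3FileSystem


def path_exists_s3fs(path: str) -> bool:
    # Credentials are resolved from the usual AWS sources (env vars, profile, instance role)
    return S3FileSystem().exists(path)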

to_date behavior

Consider the following dataframe:

df = SQLContext(context).createDataFrame(
    [
        ('1997-02-28 10:30:45.123',),
        ('1997-02-28 10:30:45',),
        ("1997-02-28 10:30",),
        ("1997-02-28 10",),
        ("1997-02-28",),
    ], ['t'])
df.show()

Now, compare the results of the following:

df.select(F.to_date(df.t, 'yyyy-MM-dd HH:mm:ss.SSS').alias('date')).show()
df.select(F.to_date(df.t, 'yyyy-MM-dd HH:mm:ss').alias('date')).show()
df.select(F.to_date(df.t, 'yyyy-MM-dd HH:mm').alias('date')).show()
df.select(F.to_date(df.t, 'yyyy-MM-dd HH').alias('date')).show()
df.select(F.to_date(df.t, 'yyyy-MM-dd').alias('date')).show()
+----------+
|      date|
+----------+
|1997-02-28|
|      null|
|      null|
|      null|
|      null|
+----------+

+----------+
|      date|
+----------+
|1997-02-28|
|1997-02-28|
|      null|
|      null|
|      null|
+----------+

+----------+
|      date|
+----------+
|1997-02-28|
|1997-02-28|
|1997-02-28|
|      null|
|      null|
+----------+

+----------+
|      date|
+----------+
|1997-02-28|
|1997-02-28|
|1997-02-28|
|1997-02-28|
|      null|
+----------+

+----------+
|      date|
+----------+
|1997-02-28|
|1997-02-28|
|1997-02-28|
|1997-02-28|
|1997-02-28|
+----------+

Timezones with Pyspark

I created a small notebook here with a demo of how PySpark behaves with timezones. This blog post is also interesting.

Mapping values with PySpark
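
A minimal sketch of one common approach, using create_map; the mapping, the dataframe, and the column names are assumptions:

from itertools import chain

from pyspark.sql import functions as F

# Hypothetical mapping and column names
mapping = {"DE": "Germany", "FR": "France"}
mapping_expr = F.create_map([F.lit(x) for x in chain(*mapping.items())])

# df is assumed to have a column "country_code"; unmatched codes become null
df = df.withColumn("country", mapping_expr[F.col("country_code")])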

Unit test PySpark using pytest

Assuming your environment has pyspark installed and you know where your Java is, the following fixture provides a sqlContext that you can use in other tests.

@pytest.fixture
def sqlC():
    # For this to work I had to tweak:
    # $ export JAVA_HOME="$(/usr/libexec/java_home --version 1.8)"
    import findspark

    findspark.init()

    from pyspark.sql import SparkSession
    from pyspark.sql import SQLContext

    spark = SparkSession.builder.master("local[*]").getOrCreate()
    sc = spark.sparkContext
    return SQLContext(sc)

For example:

def test_foo(sqlC):
     assert True
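
Or, as a slightly more concrete sketch that actually exercises the fixture:

def test_small_df(sqlC):
    df = sqlC.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
    assert df.count() == 2
    assert set(df.columns) == {"id", "val"}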