@ian-whitestone
Last active August 5, 2020
Parse a unix timestamp with proper handling for milliseconds
from __future__ import unicode_literals

from pyspark.sql import functions as F, types as T

from starscream.contract import Contract
from starscream.pipeline.stage import TransformStage
from starscream.utils.dataframe import from_dicts
contract = Contract({
    'id': {'type': T.LongType()},
    'updated_at': {'type': T.LongType()},
})

input_df = from_dicts(sc, contract, [
    {'id': 1, 'updated_at': 1596503049026},
    {'id': 2, 'updated_at': 1596503049773},
    {'id': 3, 'updated_at': 1596503049066},
    {'id': 4, 'updated_at': 1596503049999},
])
input_df.select(['id', 'updated_at']).show()
class MyStage(TransformStage):
    """
    Proper handling of a unix timestamp with milliseconds:

    1) Take the input unix timestamp with milliseconds, e.g. 1596503049026
    2) Split off the unix timestamp in seconds, '1596503049', and the milliseconds component, '026'
    3) Convert the unix timestamp in seconds to a normal timestamp, i.e. '1596503049' --> '2020-08-04 01:04:09'
    4) Add the milliseconds component back in via string concatenation: '2020-08-04 01:04:09' + '.' + '026'
    5) Convert that string back to a timestamp: '2020-08-04 01:04:09.026'
    """

    OUTPUT = Contract({
        'id': {'type': T.LongType()},
        'updated_at': {'type': T.LongType()},  # original unix ts in ms: 1596503049026
        'updated_at_ts_ms': {'type': T.TimestampType()},  # timestamp with milliseconds, e.g. 2020-08-04 01:04:09.026
    })
    def apply(self, sc, my_input_df):
        # TODO: add an assert that updated_at must be a 13 digit long
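        # A sketch of that assert (an assumption, not in the original gist): fail
        # fast if any row's updated_at is not 13 digits, since the substring logic
        # below silently produces wrong results otherwise.
        # bad = my_input_df.filter(F.length(F.col('updated_at').cast(T.StringType())) != 13)
        # assert bad.head(1) == [], 'updated_at must be a 13-digit unix timestamp in ms'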
        return (
            my_input_df
            # 1) cast the epoch-millis long to a string, e.g. '1596503049026'
            .withColumn('updated_at_unix_ms_str', F.col('updated_at').cast(T.StringType()))
            # 2) the first 10 digits are the epoch seconds...
            .withColumn('updated_at_unix_s', F.substring(F.col('updated_at_unix_ms_str'), 1, 10).cast(T.LongType()))
            #    ...and the last 3 digits are the milliseconds
            .withColumn('updated_at_ms_str', F.substring(F.col('updated_at_unix_ms_str'), -3, 3))
            # 3) render the epoch seconds as a timestamp string, e.g. '2020-08-04 01:04:09'
            .withColumn('updated_at_ts_s_str', F.from_unixtime('updated_at_unix_s'))
            # 4) append the milliseconds: '2020-08-04 01:04:09' + '.' + '026'
            .withColumn('updated_at_ts_ms_str', F.concat(F.col('updated_at_ts_s_str'), F.lit('.'), F.col('updated_at_ms_str')))
            # 5) parse the full string back into a timestamp with millisecond precision
            .withColumn('updated_at_ts_ms', F.to_timestamp(F.col('updated_at_ts_ms_str')))
            .select(self.OUTPUT.keys())
        )
output_df = MyStage().apply(sc, input_df)
output_df.show()
output_df.toPandas()
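# With the sample rows above, output_df.show() should print something like the
# following (assuming a UTC session timezone, which is what the docstring's
# '1596503049' --> '2020-08-04 01:04:09' example implies):
#
# +---+-------------+-----------------------+
# | id|   updated_at|       updated_at_ts_ms|
# +---+-------------+-----------------------+
# |  1|1596503049026|2020-08-04 01:04:09.026|
# |  2|1596503049773|2020-08-04 01:04:09.773|
# |  3|1596503049066|2020-08-04 01:04:09.066|
# |  4|1596503049999|2020-08-04 01:04:09.999|
# +---+-------------+-----------------------+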
@ian-whitestone (Author):
Well, I way over-engineered this... apparently you can just do:

(F.col('updated_at') / 1000).astype('timestamp')

and Spark will automagically parse it as a proper timestamp with millisecond precision: dividing the long by 1000 produces a double of epoch seconds with a fractional part, and casting a double to 'timestamp' keeps that fraction. Same caveat as above: your timestamp column ('updated_at') must be 13 digits.
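For reference, a minimal self-contained sketch of that one-liner (assuming a plain SparkSession instead of the starscream helpers used above):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# same sample epoch-millis values as the gist above
df = spark.createDataFrame(
    [(1, 1596503049026), (2, 1596503049773)],
    ['id', 'updated_at'],
)

# dividing by 1000 yields epoch seconds as a double; casting that double to
# 'timestamp' preserves the fractional (millisecond) part
df.withColumn(
    'updated_at_ts_ms', (F.col('updated_at') / 1000).astype('timestamp')
).show(truncate=False)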
