Parse a unix timestamp with proper handling for milliseconds
from __future__ import unicode_literals

import pytest
from starscream.pipeline.stage import TransformStage
from pyspark.sql import functions as F, types as T
from starscream.contract import Contract
from starscream.utils.dataframe import as_dicts, from_dicts
contract = Contract({
    'id': {'type': T.LongType()},
    'updated_at': {'type': T.LongType()},
})

input_df = from_dicts(sc, contract, [
    {'id': 1, 'updated_at': 1596503049026},
    {'id': 2, 'updated_at': 1596503049773},
    {'id': 3, 'updated_at': 1596503049066},
    {'id': 4, 'updated_at': 1596503049999},
])

input_df.select(['id', 'updated_at']).show()
class MyStage(TransformStage):
    """
    Proper handling of a unix timestamp with milliseconds:
    1) Take the input unix timestamp with milliseconds, e.g. 1596503049026
    2) Split it into the seconds component '1596503049' and the milliseconds component '026'
    3) Convert the seconds component to a normal timestamp, i.e. '1596503049' --> '2020-08-04 01:04:09'
    4) Add the milliseconds component back via string concatenation, '2020-08-04 01:04:09' + '.' + '026'
    5) Convert that string back to a timestamp, '2020-08-04 01:04:09.026'
    """
    OUTPUT = Contract({
        'id': {'type': T.LongType()},
        'updated_at': {'type': T.LongType()},             # original unix ts in ms: 1596503049026
        'updated_at_ts_ms': {'type': T.TimestampType()},  # timestamp with milliseconds, i.e. 2020-08-04 01:04:09.026
    })

    def apply(self, sc, my_input_df):
        # TODO: add an assert that updated_at must be a 13-digit long
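        # One possible way to do that check (a sketch, not part of the original gist):
        # fail fast if any updated_at value is not a 13-digit millisecond timestamp.
        # bad_rows = my_input_df.filter(
        #     F.length(F.col('updated_at').cast(T.StringType())) != 13
        # ).count()
        # assert bad_rows == 0, 'updated_at must be a 13-digit unix timestamp in milliseconds'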
        return (
            my_input_df
            .withColumn('updated_at_unix_ms_str', F.col('updated_at').cast(T.StringType()))
            .withColumn('updated_at_unix_s', F.substring(F.col('updated_at_unix_ms_str'), 1, 10).cast(T.LongType()))
            .withColumn('updated_at_ms_str', F.substring(F.col('updated_at_unix_ms_str'), -3, 3))
            .withColumn('updated_at_ts_s_str', F.from_unixtime('updated_at_unix_s'))
            .withColumn('updated_at_ts_ms_str', F.concat(F.col('updated_at_ts_s_str'), F.lit('.'), F.col('updated_at_ms_str')))
            .withColumn('updated_at_ts_ms', F.to_timestamp(F.col('updated_at_ts_ms_str')))
            .select(self.OUTPUT.keys())
        )
output_df = MyStage().apply(sc, input_df)
output_df.show()
output_df.toPandas()
Well, I way over-engineered this... apparently you can just do:

(F.col('updated_at') / 1000).astype('timestamp')

and Spark will automagically parse it as a proper timestamp with millisecond precision. Same caveat as above: your timestamp column ('updated_at') must be 13 digits.
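For example, the whole apply method above collapses to something like this (a sketch reusing the same column names and OUTPUT contract as the gist):

    def apply(self, sc, my_input_df):
        # Dividing the millisecond epoch by 1000 gives fractional seconds;
        # casting that double to timestamp keeps the millisecond precision.
        return (
            my_input_df
            .withColumn('updated_at_ts_ms', (F.col('updated_at') / 1000).astype('timestamp'))
            .select(self.OUTPUT.keys())
        )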