Skip to content

Instantly share code, notes, and snippets.

@kolommik
Created August 20, 2020 21:16
Show Gist options
  • Save kolommik/98bfd52caec167795a05eb52a7936316 to your computer and use it in GitHub Desktop.
Save kolommik/98bfd52caec167795a05eb52a7936316 to your computer and use it in GitHub Desktop.
# Product master data: a single CSV file whose fields are separated by chr(1) ("\x01").
# NOTE(review): the path has no leading "/", so it is resolved relative to the
# filesystem's working directory; presumably the "/mnt/..." mount is meant — confirm.
PRODUCT_CATALOG_PATH = "mnt/PRODGEN2/OUTPUT/RUSSIA_DATA_FOUNDATION/UNIVERSALCATALOG/MASTERDATA/MARS_UNIVERSAL_PETCARE_MATERIALS.csv"

# The explicit schema (product_catalogSchema) was deliberately dropped: the source
# schema changes from time to time, and a stale schema made the load stop picking
# up new records. With no schema (and inferSchema left off), all columns load as strings.
product_catalog = spark.read.csv(PRODUCT_CATALOG_PATH, header=True, sep=chr(1))
# Keep exactly one row per material (MATNR): the one with the latest VMSTD date.
#
# row_number() picks an arbitrary row among rows that tie on VMSTD, and that pick
# can differ between runs. Ordering by every remaining column as well makes the
# result deterministic (fully identical rows are interchangeable anyway).
# Backticks stop column names containing dots/spaces being parsed as nested paths.
_latest_first = [F.col("VMSTD").desc()] + [
    F.col(f"`{name}`").desc()
    for name in product_catalog.columns
    if name not in ("MATNR", "VMSTD", "rn")
]

product_catalog = (
    product_catalog
    # Parse the date columns from their string form. VMSTD uses dots as
    # separators; START_DATE and END_DATE use dashes.
    .withColumn("VMSTD", F.to_date(F.col("VMSTD"), "yyyy.MM.dd"))
    .withColumn("START_DATE", F.to_date(F.col("START_DATE"), "yyyy-MM-dd"))
    .withColumn("END_DATE", F.to_date(F.col("END_DATE"), "yyyy-MM-dd"))
    # Number the rows within each material, newest VMSTD first
    # (desc() sorts null dates after all real dates).
    .withColumn(
        "rn",
        F.row_number().over(Window.partitionBy("MATNR").orderBy(*_latest_first)),
    )
    # Keep only the newest row per material, then drop the helper column.
    .where(F.col("rn") == 1)
    .drop("rn")
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment