Skip to content

Instantly share code, notes, and snippets.

@mahmoudhanafy
Created July 12, 2016 12:31
Show Gist options
  • Save mahmoudhanafy/ac232a8f20389c50189443f099642a22 to your computer and use it in GitHub Desktop.
Save mahmoudhanafy/ac232a8f20389c50189443f099642a22 to your computer and use it in GitHub Desktop.
import java.io._
import java.net.URI
import java.time.LocalDate
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, _}
val startDate = LocalDate.of(2014, 10, 31)
val endDate = LocalDate.of(2015, 7, 1)
val hivePath = "/user/host/Seeloz/Data/Hive"
val aggregationPath = s"$hivePath/Aggregation"
val tmpPath = "/user/host/Seeloz/tmp"
val fs = FileSystem.get(new URI(hivePath), new Configuration())
val stores = Set(74200,74201,74179,74180,74181,74182,74183,74184,74185,74187,74188,74189,74190,74191,74192,74193,74194,74195,74196,74197,74199)
val fileNames = fs.listStatus(new Path(aggregationPath)).map(_.getPath.toString)
val reasorFiles = files.filter(s => stores.contains(s.split("/").last.split("_").head.toInt))
val requiredRange = reasorFiles.map(s => (LocalDate.parse(s.split("_")(1).substring(0, 10)), s)).filter(t => t._1.isAfter(startDate) && t._1.isBefore(endDate))
requiredRange.foreach { case (date, path) =>
val newDate = date.plusYears(1).toString
val newPath = path.split("_")(0) + newDate + path.split("_")(1).substring(10)
val rdd = sc.textFile(date)
val updatedRDD = rdd.map { line =>
val splits = line.split(",")
splits(5) = newDate
splits.mkString(",")
}
val tmpRDDPath = s"$tmpPath/tmpFile"
updatedRDD.repartition(1).saveAsTextFile(tmpRDDPath)
fs.delete(path, true)
fs.rename(tmpRDDPath, newPath)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment