{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 1. Загрузить файл с логами в Spark RDD"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Intitializing Scala interpreter ..."
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"Spark Web UI available at http://192.168.0.102:4044\n",
"SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1594560636658)\n",
"SparkSession available as 'spark'\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO, 2017-03-22T20:11:49+00:00, ghtorrent-31 -- ghtorrent.rb: Added pullreq_commit 244eeac28bf419642d5d5c3b388bd2999c8c72e6 to tgstation/tgstation -> 25341\n",
"DEBUG, 2017-03-23T11:15:14+00:00, ghtorrent-30 -- retriever.rb: Commit mzvast/FlappyFrog -> 80bf5c5fde7be6274a2721422f4d9a773583f73c exists\n",
"DEBUG, 2017-03-22T20:15:48+00:00, ghtorrent-35 -- ghtorrent.rb: Parent af8451e16e077f7c6cae3f98bf43bffaca562f88 for commit 2ef393531a3cfbecc69f17d2cedcc95662fae1e6 exists\n",
"DEBUG, 2017-03-24T12:29:50+00:00, ghtorrent-49 -- ghtorrent.rb: Parent cf060bf3b789ac6391b2f7c1cdc34191c2bc773d for commit 8c924c1115e1abddcaddc27c6e7fd5806583ea90 exists\n",
"DEBUG, 2017-03-23T09:00:44+00:00, ghtorrent-8 -- retriever.rb: Commit iamtheanon/d3 -> a7caf9375fe14d7235562af541fe9decf499bbfb exists\n",
"DEBUG, 2017-03-24T10:52:47+00:00, ghtorrent-50 -- ghtorrent.rb: Repo alyuev/urantia-study-edition exists\n",
"DEBUG, 2017-03-23T11:02:10+00:00, ghtorrent-45 -- retriever.rb: Commit heycalmdown/node-confluence -> 2900cc8718deb2453ff39c909d2563d96227fb3f exists\n",
"INFO, 2017-03-23T10:31:57+00:00, ghtorrent-45 -- retriever.rb: Added commit patrickTingen/DataDigger -> d807d38fe9b9bd1325d45d69752fbabd89d73d96\n",
"INFO, 2017-03-22T20:14:31+00:00, ghtorrent-32 -- ghtorrent.rb: Added commit_assoc of cfd33b7282e5b03e4dc5c2509564e2189cd69af7 with RicardoGuzmanVelasco/Unity-Beat-Detection\n",
"DEBUG, 2017-03-23T11:03:03+00:00, ghtorrent-39 -- ghtorrent.rb: Transaction committed (618 ms)\n"
]
},
{
"data": {
"text/plain": [
"filepath: String = ghtorrent-logs.txt.gz\n",
"rdd: org.apache.spark.rdd.RDD[String] = ghtorrent-logs.txt.gz MapPartitionsRDD[1] at textFile at <console>:26\n"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val filepath = \"ghtorrent-logs.txt.gz\"\n",
"val rdd = sc.textFile(filepath)\n",
"rdd.take(10).foreach(println)"
]
},
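{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each record in the sample above follows the layout `LEVEL, timestamp, client -- program: message`. The cell below is a minimal sketch (left unexecuted) of how a single record can be split into those fields; the later tasks rely on the same idea."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"// Sketch only: split one record into its parts, assuming the\n",
"// \"LEVEL, timestamp, client -- program: message\" layout seen above\n",
"val sample = rdd.first\n",
"val Array(header, message) = sample.split(\"--\", 2).map(_.trim)\n",
"val Array(level, timestamp, client) = header.split(\"\"\",\\s\"\"\")"
]
},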
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 2. Вывести количество строк в файле"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"res1: Long = 9669788\n"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"rdd.count"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 3. Посчитать количество записей с уровнем WARNING"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"res2: Long = 132158\n"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"rdd.filter(_.startsWith(\"WARN\")).count"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"import org.apache.spark.rdd.RDD\n",
"defined object utils\n"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import org.apache.spark.rdd.RDD\n",
"\n",
"object utils {\n",
" def get_freq(rdd: RDD[String]): RDD[(String, Int)] = {\n",
" return rdd.map(it => (it, 1)).reduceByKey(_ + _);\n",
" }\n",
" \n",
" def get_max_by_freq(rdd: RDD[(String, Int)]): (String, Int) = {\n",
" return rdd.map(it => (it._2, it._1)).max().swap;\n",
" }\n",
" \n",
" def get_max_freq(rdd: RDD[String]): (String, Int) = {\n",
" val freq = get_freq(rdd);\n",
" val max_freq = get_max_by_freq(freq);\n",
" return max_freq;\n",
" }\n",
"}"
]
},
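{
"cell_type": "markdown",
"metadata": {},
"source": [
"A small usage example for `utils` (a sketch, left unexecuted): `get_max_freq` returns the most frequent element of an `RDD[String]` together with its count."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"// Sketch only: expected behaviour of utils.get_max_freq on a tiny hand-made RDD\n",
"val demo = sc.parallelize(Seq(\"a\", \"b\", \"a\", \"a\", \"b\"))\n",
"utils.get_max_freq(demo)  // -> (a,3)"
]
},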
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 4. Посчитать, сколько всего репозиториев было обработано. Следует учитывать только вызовы api_client"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"import scala.util.matching.Regex\n",
"nameRegexp: String = [\\w\\.\\-\\_]+\n",
"repoUrlRegexp: scala.util.matching.Regex = https:\\/\\/api\\.github\\.com\\/repos\\/[\\w\\.\\-\\_]+\\/[\\w\\.\\-\\_]+\n",
"repos: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:32\n",
"res3: Long = 71807\n"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import scala.util.matching.Regex\n",
"val nameRegexp = \"\"\"[\\w\\.\\-\\_]+\"\"\"\n",
"val repoUrlRegexp = \"\"\"https:\\/\\/api\\.github\\.com\\/repos\\/%s\\/%s\"\"\".format(nameRegexp, nameRegexp).r\n",
"\n",
"val repos = rdd.map(_.split(\"--\").last.trim())\n",
" .flatMap(repoUrlRegexp.findAllIn(_))\n",
"repos.cache()\n",
"\n",
"repos.distinct.count"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 5. Найти клиента, выполнившего больше всего HTTP вызовов"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"clients: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at map at <console>:31\n",
"res4: (String, Int) = (ghtorrent-13,135974)\n"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val clients = rdd.filter(_.split(\"--\").last.trim().startsWith(\"api_client\"))\n",
" .map(_.split(\"--\")(0).trim().split(\"\"\",\\s\"\"\").last)\n",
"\n",
"utils.get_max_freq(clients)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 6. Найти клиента, с наибольшим количеством FAILED HTTP вызовов"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"clients_statuses: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[14] at map at <console>:31\n",
"failed_clients: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:35\n",
"res5: (String, Int) = (ghtorrent-13,79623)\n"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val clients_statuses = rdd.filter(_.split(\"--\").last.trim().startsWith(\"api_client\"))\n",
" .map(x => (x.split(\"--\")(0).trim().split(\"\"\",\\s\"\"\").last, \n",
" x.split(\"--\").last.trim().split(\"\"\"\\.\\s|,\\s\"\"\")(0).split(\":\").last.trim))\n",
"\n",
"val failed_clients = clients_statuses.filter(_._2 == \"Failed request\")\n",
" .map(_._1)\n",
"\n",
"utils.get_max_freq(failed_clients)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 7. Найти наиболее активный по количеству вызовов час"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"import org.joda.time.DateTime\n",
"requests_times: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at map at <console>:33\n",
"requests_by_hours: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[22] at map at <console>:35\n",
"res6: (String, Int) = (11,301408)\n"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import org.joda.time.DateTime\n",
"\n",
"val requests_times = rdd.filter(_.split(\"--\").last.trim().startsWith(\"api_client\"))\n",
" .map(_.split(\"--\")(0).trim().split(\"\"\",\\s\"\"\")(1))\n",
"\n",
"val requests_by_hours = requests_times.map(DateTime.parse(_).getHourOfDay().toString)\n",
"\n",
"utils.get_max_freq(requests_by_hours)"
]
},
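{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since the timestamps are ISO-8601 strings (e.g. `2017-03-22T20:11:49+00:00`), the hour can also be read directly from the string. The sketch below (left unexecuted) is equivalent up to a leading zero in the hour key."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"// Sketch only: in an ISO-8601 timestamp the hour occupies characters 11-12,\n",
"// so it can be taken without parsing dates\n",
"utils.get_max_freq(requests_times.map(_.substring(11, 13)))"
]
},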
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 8. Найти наиболее активный репозиторий"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"res7: (String, Int) = (https://api.github.com/repos/greatfakeman/Tabchi,79524)\n"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"utils.get_max_freq(repos)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 9. Найти лидирующий по ошибкам Access Key (выполнить фильтрацию по \"Access:\")"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"accessRegexp: scala.util.matching.Regex = Access\\:\\s\\w+\n",
"access_keys: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[29] at flatMap at <console>:32\n",
"res8: (String, Int) = (Access: ac6168f8776,79623)\n"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val accessRegexp = \"\"\"Access\\:\\s\\w+\"\"\".r\n",
"val access_keys = rdd.flatMap(accessRegexp.findAllIn(_))\n",
"\n",
"utils.get_max_freq(access_keys)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 10. Посчитать количество успешных и неуспешных вызовов по интересующим репозиториям."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+------------------+\n",
"| url| status|\n",
"+--------------------+------------------+\n",
"|https://api.githu...|Successful request|\n",
"|https://api.githu...|Successful request|\n",
"|https://api.githu...|Successful request|\n",
"|https://api.githu...|Successful request|\n",
"|https://api.githu...|Successful request|\n",
"|https://api.githu...|Successful request|\n",
"|https://api.githu...|Successful request|\n",
"|https://api.githu...| Failed request|\n",
"|https://api.githu...|Successful request|\n",
"|https://api.githu...|Successful request|\n",
"+--------------------+------------------+\n",
"only showing top 10 rows\n",
"\n"
]
},
{
"data": {
"text/plain": [
"api_client_requests: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[34] at filter at <console>:32\n",
"urls_statuses: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[38] at filter at <console>:38\n",
"urls_statuses_df: org.apache.spark.sql.DataFrame = [url: string, status: string]\n"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val api_client_requests = rdd.map(_.split(\"--\").last.trim())\n",
" .filter(_.startsWith(\"api_client\"))\n",
"\n",
"val urls_statuses = api_client_requests.filter(_.contains(\"URL:\"))\n",
" .map(_.split(\"\"\"\\.\\s|,\\s\"\"\"))\n",
" .map(it => (repoUrlRegexp.findFirstIn(it(1)).getOrElse(\"\"), \n",
" it(0).split(\":\").last.trim))\n",
" .filter(_._1 != \"\")\n",
"val urls_statuses_df = urls_statuses.toDF(\"url\", \"status\")\n",
"\n",
"urls_statuses_df.show(10)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+--------------------+--------+--------------------+-----------+-------------------+-----------+-------+-------------------+\n",
"| id| url|owner_id| name| language| created_at|forked_from|deleted| updated_at|\n",
"+--------+--------------------+--------+--------------------+-----------+-------------------+-----------+-------+-------------------+\n",
"|60256709|https://api.githu...|34433644| Basic| Java| 2017-03-21 7:57:59| 35370568| 0| 1970-01-02 0:00:00|\n",
"|60250609|https://api.githu...| 6978755| nextcloud-server| PHP| 2017-03-13 6:31:16| 38091986| 0| 1970-01-02 0:00:00|\n",
"|40175831|https://api.githu...| 5114164| chronopin_node| JavaScript| 2016-06-29 2:24:43| NULL| 0| 1970-01-02 0:00:00|\n",
"|25265021|https://api.githu...| 5002066| Sn1per| PHP|2015-09-06 15:47:38| NULL| 0|2016-02-03 15:43:59|\n",
"|60258131|https://api.githu...| 1886841| markdown-cheatsheet| NULL| 2017-03-21 8:24:18| 1990178| 0| 1970-01-02 0:00:00|\n",
"|50161426|https://api.githu...|28157272| 3xp10it| NULL|2016-11-07 12:28:11| NULL| 0| 1970-01-02 0:00:00|\n",
"|60393147|https://api.githu...|30366120|springboot-docker...| Java| 2017-03-24 8:30:15| 51885577| 0| 1970-01-02 0:00:00|\n",
"|60263879|https://api.githu...|12240209| jQuery-EasyUI| JavaScript|2017-03-21 10:00:46| 29155309| 0| 1970-01-02 0:00:00|\n",
"|60249491|https://api.githu...| 9176581| BMASpinningLabel|Objective-C| 2017-03-21 5:35:58| 59678159| 0| 1970-01-02 0:00:00|\n",
"|60253584|https://api.githu...| 3360269| node| JavaScript| 2017-03-21 6:56:44| 60253585| 0| 1970-01-02 0:00:00|\n",
"+--------+--------------------+--------+--------------------+-----------+-------------------+-----------+-------+-------------------+\n",
"only showing top 10 rows\n",
"\n"
]
},
{
"data": {
"text/plain": [
"important_repos_df: org.apache.spark.sql.DataFrame = [id: string, url: string ... 7 more fields]\n",
"important_repos_urls: org.apache.spark.sql.DataFrame = [url: string]\n"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val important_repos_df = spark.read\n",
" .format(\"csv\")\n",
" .option(\"header\", \"true\")\n",
" .option(\"delimiter\", \",\")\n",
" .load(\"important-repos.csv\")\n",
"important_repos_df.show(10)\n",
"\n",
"val important_repos_urls = important_repos_df.select(\"url\")"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------------------------------------------------------------+------------------+\n",
"|url |status |\n",
"+-------------------------------------------------------------+------------------+\n",
"|https://api.github.com/repos/ghb/springBoot |Successful request|\n",
"|https://api.github.com/repos/mithro/chromium-infra |Successful request|\n",
"|https://api.github.com/repos/cuhappycorner/doorkeeper-mongodb|Successful request|\n",
"|https://api.github.com/repos/anonymous31173/nltk-trainer |Successful request|\n",
"+-------------------------------------------------------------+------------------+\n",
"only showing top 4 rows\n",
"\n",
"+------------------+\n",
"| status|\n",
"+------------------+\n",
"| Failed request|\n",
"|Successful request|\n",
"+------------------+\n",
"\n"
]
},
{
"data": {
"text/plain": [
"join_df: org.apache.spark.sql.DataFrame = [url: string, status: string]\n"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val join_df = important_repos_urls.join(urls_statuses_df, Seq(\"url\"), \"inner\")\n",
"join_df.cache()\n",
"\n",
"join_df.show(4, false)\n",
"join_df.select(\"status\").distinct.show()"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------------------------------------------------------+-------------+------------+\n",
"|url |success_count|failed_count|\n",
"+------------------------------------------------------+-------------+------------+\n",
"|https://api.github.com/repos/mithro/chromium-infra |4084 |0 |\n",
"|https://api.github.com/repos/iam4x/futureRX |305 |0 |\n",
"|https://api.github.com/repos/joomla/joomla-cms |127 |2 |\n",
"|https://api.github.com/repos/makeen-project/makeen |106 |0 |\n",
"|https://api.github.com/repos/zhouchengming1/S |105 |0 |\n",
"|https://api.github.com/repos/wireapp/wire-ios |93 |3 |\n",
"|https://api.github.com/repos/eulerien/curryWurst |84 |0 |\n",
"|https://api.github.com/repos/cdnjs/cdnjs |77 |0 |\n",
"|https://api.github.com/repos/bioconda/bioconda-recipes|76 |0 |\n",
"|https://api.github.com/repos/rulerp/rulerp.github.io |74 |0 |\n",
"+------------------------------------------------------+-------------+------------+\n",
"only showing top 10 rows\n",
"\n"
]
},
{
"data": {
"text/plain": [
"success_freq: org.apache.spark.sql.DataFrame = [url: string, success_count: bigint]\n",
"failed_freq: org.apache.spark.sql.DataFrame = [url: string, failed_count: bigint]\n",
"result: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [url: string, success_count: bigint ... 1 more field]\n"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val success_freq = join_df.filter($\"status\".contains(\"Successful\"))\n",
" .groupBy(\"url\")\n",
" .count.withColumnRenamed(\"count\", \"success_count\")\n",
"val failed_freq = join_df.filter($\"status\".contains(\"Failed\"))\n",
" .groupBy(\"url\")\n",
" .count.withColumnRenamed(\"count\", \"failed_count\")\n",
"\n",
"val result = success_freq.join(failed_freq, Seq(\"url\"), \"outer\")\n",
" .na.fill(0, Seq(\"success_count\", \"failed_count\"))\n",
" .orderBy($\"success_count\".desc)\n",
"\n",
"result.cache()\n",
"result.show(10, false)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------------+-----+\n",
"| url| status|count|\n",
"+--------------------+--------------+-----+\n",
"|https://api.githu...|Failed request| 5|\n",
"|https://api.githu...|Failed request| 3|\n",
"|https://api.githu...|Failed request| 3|\n",
"|https://api.githu...|Failed request| 2|\n",
"|https://api.githu...|Failed request| 2|\n",
"|https://api.githu...|Failed request| 2|\n",
"|https://api.githu...|Failed request| 2|\n",
"|https://api.githu...|Failed request| 2|\n",
"|https://api.githu...|Failed request| 2|\n",
"|https://api.githu...|Failed request| 2|\n",
"+--------------------+--------------+-----+\n",
"only showing top 10 rows\n",
"\n"
]
}
],
"source": [
"// или в другой форме\n",
"join_df.groupBy(\"url\", \"status\").count()\n",
" .orderBy($\"status\", $\"count\".desc)\n",
" .show(10)"
]
},
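{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same per-repository success/failure counts can also be produced with a DataFrame `pivot` (a sketch, left unexecuted; the column names come from the two status values shown above)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"// Sketch only: pivot the status values into columns and count per url,\n",
"// filling missing combinations with 0\n",
"join_df.groupBy(\"url\")\n",
"    .pivot(\"status\", Seq(\"Successful request\", \"Failed request\"))\n",
"    .count()\n",
"    .na.fill(0)\n",
"    .orderBy($\"Successful request\".desc)\n",
"    .show(10, false)"
]
},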
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "spylon-kernel",
"language": "scala",
"name": "spylon-kernel"
},
"language_info": {
"codemirror_mode": "text/x-scala",
"file_extension": ".scala",
"help_links": [
{
"text": "MetaKernel Magics",
"url": "https://metakernel.readthedocs.io/en/latest/source/README.html"
}
],
"mimetype": "text/x-scala",
"name": "scala",
"pygments_lexer": "scala",
"version": "0.4.1"
}
},
"nbformat": 4,
"nbformat_minor": 2
}