Skip to content

Instantly share code, notes, and snippets.

@danield137
Last active January 20, 2023 05:35
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save danield137/f4d6c7cb045e027fb011206e06972749 to your computer and use it in GitHub Desktop.
Save danield137/f4d6c7cb045e027fb011206e06972749 to your computer and use it in GitHub Desktop.
Kusto Github Demo
from pprint import pprint
import requests
import json
import os
import sys
import time
import datetime
import traceback
from azure.eventhub import EventHubClient, Sender, EventData, EventHubError
import logging
# Event Hub clients that run() creates one buffered sender for.
# Replace the <namespace>, <ehname>, <keyname> and <key> placeholders with real values.
eventhubs = [
    EventHubClient(
        "sb://<namespace>.servicebus.windows.net/<ehname>",
        username="<keyname>",
        password="<key>",
    )
]
# GitHub token that run() sends in the Authorization header (placeholder).
token = "<gh_token>"
# GitHub events feed polled by run(); asks for 100 events per page.
ENDPOINT = "https://api.github.com/events?per_page=100"
def log(m):
    """Print ``m`` prefixed with the local time, truncated to milliseconds."""
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    # %f yields microseconds; dropping the last three digits leaves milliseconds.
    millis_stamp = stamp[:-3]
    print("{} | {}".format(millis_stamp, m))
def debug(message):
    """Debug logging hook; currently a no-op.

    Uncomment the call below to print DEBUG lines through ``log``.
    """
    # log("{} | {}".format("DEBUG", message))
    return None
def info(message):
    """Log ``message`` tagged as INFO."""
    line = "{} | {}".format("INFO", message)
    log(line)
def error(message, err):
    """Log ``message`` followed by ``err``, tagged as ERROR."""
    line = "{} | {} | {}".format("ERROR", message, err)
    log(line)
class Monitor:
    """Counts sent events and issued requests and reports them periodically.

    Callers increment ``events_sent`` and ``requests_issued`` directly and call
    ``report()`` as often as they like; a report is emitted only once at least
    ``interval`` seconds have passed since the last reset, after which the
    counters start again from zero.
    """

    # Minimum number of seconds between two reports. Can be overridden per
    # instance or per subclass.
    interval = 60

    def __init__(self, report_cb=None):
        """Create a monitor.

        Args:
            report_cb: callable receiving the report message; defaults to ``print``.
        """
        self.report_cb = report_cb or print
        self.reset()

    def reset(self):
        """Zero the counters and start a new reporting window."""
        # Time of the last reset, i.e. the start of the current window.
        self.lut = time.time()
        self.events_sent = 0
        self.requests_issued = 0

    def report(self):
        """Emit a report and reset if the current window is at least ``interval`` seconds old."""
        if time.time() - self.lut >= self.interval:
            self._report()
            self.reset()

    def _report(self):
        """Format the current counters and hand them to ``report_cb``."""
        msg = "Monitor: events sent : {} calls made :{}".format(self.events_sent, self.requests_issued)
        self.report_cb(msg)
class SlidingCache:
    """Two-generation membership cache with bounded memory.

    Items are added to the ``current`` set. When ``current`` is full it becomes
    ``prev`` (the old ``prev`` is discarded) and a fresh ``current`` is started,
    so an item is remembered for between ``max_size`` and ``2 * max_size``
    subsequent distinct additions.
    """

    def __init__(self, max_size=500):
        """Create an empty cache whose generations hold at most ``max_size`` items each."""
        self.prev = set()
        self.current = set()
        self.max_size = max_size

    def add(self, item):
        """Remember ``item``; a no-op when it is already in either generation."""
        if item not in self:
            # ``>=`` (not ``>``) so a generation never grows past max_size.
            if len(self.current) >= self.max_size:
                self.prev = self.current
                self.current = set()
            self.current.add(item)

    def __contains__(self, item):
        """Return True when ``item`` is in the current or the previous generation."""
        return item in self.current or item in self.prev
class BufferedEventHubSender:
    """Accumulates serialized items in one string and sends them as a single event.

    Items are serialized, terminated with ``item_seperator`` and appended to
    ``buffer``. When appending the next item would push the UTF-8 size of the
    buffer past ``max_size``, the buffer is sent first via ``sender.send``.
    """

    # eventhub max size is 262144, just to be on the safe side, reduce this to 250000
    def __init__(self, sender, flush_cb, serializer=json.dumps, item_seperator="\n", max_size=250000):
        """Create a buffered sender.

        Args:
            sender: object whose ``send`` method receives an ``EventData``.
            flush_cb: called after a successful send as ``flush_cb(seconds_taken, item_count)``.
            serializer: turns an item into a string; defaults to ``json.dumps``.
            item_seperator: string appended after every serialized item.
            max_size: maximum UTF-8 size, in bytes, of one sent buffer.
        """
        self.max_size = max_size
        self.buffer = ""
        self.item_count = 0
        self.sender = sender
        self.flush_cb = flush_cb
        self.item_seperator = item_seperator
        self.serializer = serializer

    def push(self, item):
        """Append ``item`` to the buffer, flushing first when it would not fit.

        Raises:
            EventHubError: if the item alone exceeds ``max_size`` (it is not
                buffered), or if the flush triggered by this push fails (the
                item is not buffered in that case either).
        """
        _item = "{}{}".format(self.serializer(item), self.item_seperator)
        # Encode once; the UTF-8 size of a concatenation is the sum of its parts.
        item_size = len(_item.encode("utf-8"))
        if item_size > self.max_size:
            raise EventHubError(
                "Item {} is to big ({}) where as limit is {}. Ignoring.".format(_item, item_size, self.max_size)
            )
        if len(self.buffer.encode("utf-8")) + item_size > self.max_size:
            self.flush()
        self.buffer += _item
        self.item_count += 1

    def flush(self):
        """Send the buffered items as one event and empty the buffer.

        Does nothing when the buffer is empty. When sending (or ``flush_cb``)
        fails, the buffer is left untouched, so a later flush sends it again.

        Raises:
            EventHubError: wrapping whatever exception the send raised.
        """
        if not self.buffer:
            # Nothing to send; avoid emitting an empty event.
            return
        try:
            start = time.time()
            self.sender.send(EventData(self.buffer))
            self.flush_cb(time.time() - start, self.item_count)
        except Exception as e:
            # Chain the original error so the root cause stays in the traceback.
            raise EventHubError(
                "lost the following {} records:\n {}".format(self.item_count, self.buffer)
            ) from e
        self.buffer = ""
        self.item_count = 0
def run():
    """Poll the GitHub events endpoint forever and push unseen events to every Event Hub.

    Each cycle fetches ENDPOINT, skips events whose id is already in a
    SlidingCache, pushes the rest through one BufferedEventHubSender per client
    in ``eventhubs``, then sleeps so that cycles start at least
    ``seconds_per_request`` apart. HTTP errors and Event Hub errors are logged
    and the loop continues; any other exception is logged and the process
    kills itself.

    Raises:
        EventHubError: if a client's ``run()`` reports a failure during startup.
    """
    info("STARTED github to eventhub")
    headers = {"Authorization": "token {}".format(token)}
    monitor = Monitor(info)
    senders = []
    for eh_client in eventhubs:
        senders.append(
            BufferedEventHubSender(
                eh_client.add_sender(),
                # Bind the name as a default argument: a plain closure over ``eh_client``
                # is evaluated late and would report the last client for every sender.
                flush_cb=lambda took, count, eh_name=eh_client.eh_name: debug(
                    "EVENTHUB REQUEST | took: {} sec, sent {} records to {}.".format(took, count, eh_name)
                ),
            )
        )
        failed = eh_client.run()
        if failed:
            raise EventHubError("Couldn't connect to EH {}".format(eh_client.eh_name))
    seconds_per_request = round(
        1.0 / (5000 / 60 / 60), 2
    )  # requests / minutes / seconds = requests per sec, ^-1=secs per request
    cache = SlidingCache()
    while True:
        loop_start_time = time.time()
        monitor.report()
        try:
            resp = requests.get(ENDPOINT, headers=headers)
            resp.raise_for_status()
            monitor.requests_issued += 1
            data = sorted(resp.json(), key=lambda x: x["id"])
            debug("GITHUB REQUEST | took {} sec, got {} events.".format(resp.elapsed.total_seconds(), len(data)))
            for d in data:
                if d["id"] not in cache:
                    for buffered_sender in senders:
                        try:
                            buffered_sender.push(d)
                        except EventHubError as e:
                            # A failure on one hub must not stop delivery to the others.
                            error("EventHubError", e.message)
                    monitor.events_sent += 1
                    cache.add(d.get("id"))
            cycle_took = time.time() - loop_start_time
            delay = seconds_per_request - cycle_took
            debug("CYCLE DONE | took {}, waiting for {}".format(cycle_took, max(delay, 0)))
            if delay > 0:
                time.sleep(delay)
        except requests.HTTPError as e:
            if resp.status_code in [429, 403]:
                # The reset header is compared with the current epoch time. time.time() is
                # used because utcnow().timestamp() treats the naive UTC value as local time
                # and is off by the UTC offset. Without the header, pause for 60 seconds.
                now = time.time()
                time_to_wait = int(float(resp.headers.get("X-RateLimit-Reset", now + 60)) - now)
                info("waiting for {}".format(time_to_wait))
                if time_to_wait > 0:
                    time.sleep(time_to_wait)
            error("HTTP EXCEPTION", repr(e))
        except EventHubError as e:
            error("Failed to send events to eventhub, skipping", repr(e))
        except Exception as e:
            error("UNEXPECTED ERROR", repr(e))
            traceback.print_exc()
            # Hard-kill the process on anything unexpected.
            os.kill(os.getpid(), 9)
# Start the crawler only when this file is executed as a script.
if __name__ == "__main__":
    run()
.create table GithubEvent ( Id:int64, Type: string, Actor: dynamic, Repo: dynamic, Payload: dynamic, Public:bool, CreatedAt: datetime)
import requests
import sys
import os
from azure.kusto.data.request import KustoConnectionStringBuilder
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, DataFormat
from azure.storage.common import CloudStorageAccount
# Azure Storage and Kusto settings. The <...> values are placeholders, not valid
# Python: replace each with a quoted string before running the script.
account_name=<azure_storage_account>
account_key=<azure_storage_account_key>
container_name=<container_name>
kusto_url=<kusto_url>
app_id=<app_id>
app_key=<app_key>
auth_id=<authority_id>
# Kusto ingest client built from the AAD application key settings above.
client = KustoIngestClient(
    KustoConnectionStringBuilder.with_aad_application_key_authentication(
        kusto_url,app_id,app_key,auth_id
    )
)
# JSON ingestion mapping: each GithubEvent table column and the JSONPath of the
# event field that fills it.
mapping = [
    {"column": "Id", "path": "$.id"},
    {"column": "Type", "path": "$.type"},
    {"column": "Actor", "path": "$.actor"},
    {"column": "Repo", "path": "$.repo"},
    {"column": "Payload", "path": "$.payload"},
    {"column": "Public", "path": "$.public"},
    # The event field is "created_at"; "$.create_at" matched nothing.
    {"column": "CreatedAt", "path": "$.created_at"},
]
def download_locally(domain, path):
    """Download ``domain + path`` into the local file ``path`` unless it already exists.

    Args:
        domain: URL prefix, concatenated with ``path`` to form the download URL.
        path: remote file name, also used as the local file name.

    Returns:
        True when the file is available locally (already present or just
        downloaded), False when the server answered with an HTTP error status.
    """
    if os.path.exists(path):
        print("{} exists on local fs, skipping...".format(path))
        return True

    uri = domain + path
    # The context manager releases the streamed connection on every exit path.
    with requests.get(uri, stream=True) as response:
        try:
            # Throw an error for bad status codes
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                print("{} not found, skipping.".format(uri))
            else:
                print("{} erred status {}".format(uri, e.response.status_code))
            return False
        # Write under a temporary name so an interrupted download never leaves a
        # truncated file that a later run would mistake for a complete one.
        partial_path = path + ".part"
        with open(partial_path, "wb") as handle:
            for block in response.iter_content(1024):
                handle.write(block)
    os.replace(partial_path, path)
    return True
def missing_file(path):
    """Return True when listing the container's blobs for ``path`` yields nothing.

    NOTE(review): ``path`` is passed as the second positional argument of
    ``list_blobs``, presumably a blob-name prefix - confirm against the SDK.
    """
    account = CloudStorageAccount(account_name=account_name, account_key=account_key)
    service = account.create_block_blob_service()
    matches = list(service.list_blobs(container_name, path))
    return not matches
def upload_to_azure_storage(path):
    """Upload the local file ``path`` as a blob named ``path``, unless a listing finds it already."""
    account = CloudStorageAccount(account_name=account_name, account_key=account_key)
    service = account.create_block_blob_service()
    already_there = list(service.list_blobs(container_name, path))
    if already_there:
        print("{} exists on azure storage, skipping...".format(path))
        return
    service.create_blob_from_path(container_name=container_name, blob_name=path, file_path=path)
    print("uploaded to storage {}".format(path))
def main():
    """Copy every hourly archive file for 2016-2018 that is not yet in blob storage.

    For each hour, the file is downloaded locally and uploaded when storage has
    no blob for it; otherwise a "found" line is printed.
    """
    import calendar  # local import: only this function needs it

    print("current folder is: {}".format(os.getcwd()))
    url_domain = "http://data.gharchive.org/"
    # Month and day are zero-padded, the hour is not.
    url_path = "{y}-{m:02d}-{d:02d}-{h}.json.gz"
    for y in range(2016, 2019):
        for m in range(1, 13):
            # Only real calendar days: iterating 1..31 for every month wasted a
            # storage lookup and a 404 download on dates such as 02-30.
            days_in_month = calendar.monthrange(y, m)[1]
            for d in range(1, days_in_month + 1):
                for h in range(24):
                    p = url_path.format(y=y, m=m, d=d, h=h)
                    if missing_file(path=p):
                        if download_locally(domain=url_domain, path=p):
                            upload_to_azure_storage(p)
                    else:
                        print('{} found. SKIPPING...'.format(p))
# Run the bulk copy only when this file is executed as a script.
if __name__ == "__main__":
    main()
import requests
import datetime
import sys
import os
from azure.storage.common import CloudStorageAccount
# Azure Storage credentials and target container. `<>` is a placeholder, not
# valid Python: replace each with a quoted string before running the script.
storage_account = <>
storage_key = <>
container_name = <>
# URL prefix the hourly files are downloaded from.
url_domain = "http://data.gharchive.org/"
# Hourly file name: zero-padded month and day, hour without padding.
URL_PATH_FORMAT = "{y}-{m:02d}-{d:02d}-{h}.json.gz"
def download_locally(domain, path):
    """Download ``domain + path`` into the local file ``path`` unless it already exists.

    Args:
        domain: URL prefix, concatenated with ``path`` to form the download URL.
        path: remote file name, also used as the local file name.

    Returns:
        True when the file is available locally (already present or just
        downloaded), False when the server answered with an HTTP error status.
    """
    if os.path.exists(path):
        print("{} exists on local fs, skipping...".format(path))
        return True

    uri = domain + path
    # The context manager releases the streamed connection on every exit path.
    with requests.get(uri, stream=True) as response:
        try:
            # Throw an error for bad status codes
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                print("{} not found, skipping.".format(uri))
            else:
                print("{} erred status {}".format(uri, e.response.status_code))
            return False
        # Write under a temporary name so an interrupted download never leaves a
        # truncated file that a later run would mistake for a complete one.
        partial_path = path + ".part"
        with open(partial_path, "wb") as handle:
            for block in response.iter_content(1024):
                handle.write(block)
    os.replace(partial_path, path)
    return True
def upload_to_azure_storage(path):
    """Upload the local file ``path`` to the container as a blob named ``path``."""
    account = CloudStorageAccount(account_name=storage_account, account_key=storage_key)
    service = account.create_block_blob_service()
    service.create_blob_from_path(
        container_name=container_name,
        blob_name=path,
        file_path=path,
    )
    print("uploaded to storage {}".format(path))
def last_existing_time(since):
    """Return the latest hour, at or before ``since``, whose blob exists in the container.

    Starts at ``since`` (or the current UTC time when ``since`` is falsy) and
    steps back one hour at a time until ``blob_service.exists`` reports a blob
    named after that hour. The loop has no other exit, so it does not terminate
    when no such blob exists.
    """
    account = CloudStorageAccount(account_name=storage_account, account_key=storage_key)
    service = account.create_block_blob_service()
    one_hour = datetime.timedelta(hours=1)
    candidate = since if since else datetime.datetime.utcnow()
    while True:
        blob_name = URL_PATH_FORMAT.format(
            y=candidate.year, m=candidate.month, d=candidate.day, h=candidate.hour
        )
        if service.exists(container_name, blob_name):
            return candidate
        candidate -= one_hour
def download_time_period(from_date, to_date):
    """Download and upload every hourly file from ``from_date`` up to, not including, ``to_date``.

    Args:
        from_date: datetime of the first hour to fetch.
        to_date: datetime at which to stop; that hour itself is not fetched.
    """
    one_hour = datetime.timedelta(hours=1)
    current = from_date
    # ``<`` rather than ``!=``: the loop also ends when the two dates are not a
    # whole number of hours apart, or when from_date is already past to_date.
    while current < to_date:
        p = URL_PATH_FORMAT.format(y=current.year, m=current.month, d=current.day, h=current.hour)
        if download_locally(domain=url_domain, path=p):
            upload_to_azure_storage(p)
        current += one_hour
def main():
    """Fetch the hours between the newest blob already in storage and the current UTC time."""
    print("current folder is: {}".format(os.getcwd()))
    until = datetime.datetime.utcnow()
    resume_from = last_existing_time(since=until)
    download_time_period(resume_from, until)
# Run the incremental sync only when this file is executed as a script.
if __name__ == "__main__":
    main()
import requests
import sys
import os
from azure.kusto.data.request import KustoConnectionStringBuilder
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, DataFormat, BlobDescriptor
from azure.storage.common import CloudStorageAccount
# Kusto and Azure Storage settings. `<>` is a placeholder, not valid Python:
# replace each with a quoted string before running the script.
kusto_url = <>
app_id = <>
app_key = <>
auth_id = <>
storage_account_name = <>
storage_account_key = <>
storage_container = <>
# Kusto ingest client built from the AAD application key settings above.
client = KustoIngestClient(
    KustoConnectionStringBuilder.with_aad_application_key_authentication(
        kusto_url,app_id,app_key,auth_id
    )
)
# JSON ingestion mapping handed to IngestionProperties: one entry per table
# column, pairing it with the JSONPath of the event field that fills it.
mapping = [
    {"column": column, "path": path}
    for column, path in (
        ("Id", "$.id"),
        ("Type", "$.type"),
        ("Actor", "$.actor"),
        ("Repo", "$.repo"),
        ("Payload", "$.payload"),
        ("Public", "$.public"),
        ("CreatedAt", "$.created_at"),
    )
]
def ingest(file, size):
    """Queue one blob for ingestion into the GitHub.GithubEvent table.

    The blob is tagged with its own URL (``ingestIfNotExists`` / ``ingestByTags``)
    and with the date part of its name (``dropByTags``).

    Args:
        file: blob URL, optionally followed by ``;<storage key>``; the blob name
            is expected to start with ``YYYY-MM-DD``.
        size: size in bytes passed to ``BlobDescriptor``.
    """
    # Derive the day tag from the blob name rather than from a fixed offset into
    # the URL, which only worked for one combined account/container name length.
    # Split off the ";<key>" suffix first, since the key itself may contain "/".
    blob_name = file.split(";", 1)[0].rsplit("/", 1)[-1]
    day_tag = blob_name[:10]  # "YYYY-MM-DD"
    props = IngestionProperties(
        database="GitHub",
        table="GithubEvent",
        dataFormat=DataFormat.json,
        mapping=mapping,
        ingestIfNotExists=[file],
        ingestByTags=[file],
        dropByTags=[day_tag],
    )
    client.ingest_from_blob(BlobDescriptor(file, size), props)
    print("ingested {}".format(file))
def main():
    """Queue every hourly archive blob for 2016-2018 for ingestion, once each.

    An empty marker file ``<blob name>_done`` is written in the working directory
    after each blob is queued; blobs whose marker exists are skipped, so the
    script can be re-run after an interruption.
    """
    import calendar  # local import: only this function needs it

    # Month and day are zero-padded, the hour is not.
    url_path = "{y}-{m:02d}-{d:02d}-{h}.json.gz"
    for y in range(2016, 2019):
        for m in range(1, 13):
            # Only real calendar days: iterating 1..31 for every month queued
            # blobs for dates such as 02-30.
            days_in_month = calendar.monthrange(y, m)[1]
            for d in range(1, days_in_month + 1):
                for h in range(24):
                    p = url_path.format(y=y, m=m, d=d, h=h)
                    if os.path.exists(p + "_done"):
                        print("{} ingested. skipping...".format(p))
                        continue
                    ingest(
                        # Blob URL with the storage key appended after ";".
                        "https://{}.blob.core.windows.net/{}/{};{}".format(
                            storage_account_name, storage_container, p, storage_account_key
                        ),
                        # Every blob is declared as 50 MiB.
                        50 * 1024 * 1024,
                    )
                    with open(p + "_done", "w+") as f:
                        f.write(" ")
# Run the bulk ingestion only when this file is executed as a script.
if __name__ == "__main__":
    main()
// Data set:
// xx Rows
// Schema
GithubEvent
| getschema
| project ColumnName, ColumnType
// XX records
GithubEvent
| count
// Event types and counts
GithubEvent
| summarize Events=count() by Type
| order by Events
// Observing several push events
GithubEvent
| where Type == "PushEvent"
| take 10
GithubEvent
| where Type == "PushEvent"
| take 10
| evaluate bag_unpack(Payload)
| getschema
| project ColumnName, ColumnType
// Top 10 - popular repos (by push events)
GithubEvent
| where Type == "PushEvent"
| summarize PushEvents=count() by RepoName = tostring(Repo.name)
| top 10 by PushEvents
// Top 10 - popular repos (by watch events)
GithubEvent
| where Type == "WatchEvent"
| summarize WatchEvents=count() by RepoName = tostring(Repo.name)
| top 10 by WatchEvents
// Top 10 - popular repos (by fork events)
GithubEvent
| where Type == "ForkEvent"
| summarize ForkEvents=count() by RepoName = tostring(Repo.name)
| top 10 by ForkEvents
// Top 10 - popular repos (by fork, watch and push events combined)
GithubEvent
| where Type in ("ForkEvent", "WatchEvent", "PushEvent")
| summarize Events=count() by RepoName = tostring(Repo.name)
| top 10 by Events
// Daily watch-event trend of the top 10 most-watched repos
let popularRepos=
GithubEvent
| where Type == "WatchEvent"
| summarize WatchEvent=count() by RepoName = tostring(Repo.name)
| top 10 by WatchEvent
| project RepoName;
GithubEvent
| extend RepoName = tostring(Repo.name)
| where RepoName in (popularRepos)
| where Type == "WatchEvent"
| summarize count() by RepoName, bin(CreatedAt, 1d)
| render timechart
// Top 10 - popular repos (by a weighted score: 80% watch events, 20% issue events)
let watchedRepos = GithubEvent
| where Type == "WatchEvent"
| summarize WatchEvents=count() by RepoName = tostring(Repo.name);
let issuesRepos = GithubEvent
| where Type in ("IssueCommentEvent", "IssuesEvent")
| summarize IssueEvents=count() by RepoName = tostring(Repo.name);
watchedRepos
| join
(
issuesRepos
)
on RepoName
| extend Popularity=(WatchEvents * 0.8) + (IssueEvents*0.2)
| top 10 by Popularity
// ----------------------------More Queries-----------------
// Comparing between two languages TypeScript vs CoffeeScript.
// You can spot one language getting more popular over time and the other trending down
GithubEvent
| where Type == "PullRequestEvent"
| where Payload.pull_request.head.repo.language == 'TypeScript' or
Payload.pull_request.head.repo.language == 'CoffeeScript'
| extend language=tostring(Payload.pull_request.head.repo.language)
| summarize count() by language, bin(CreatedAt, 30d)
| render timechart with (ysplit = axes)
// Languages to learn
let languages =
GithubEvent
| where Type == "PullRequestEvent"
| extend language=tostring(Payload.pull_request.head.repo.language);
let lastYear = languages | where CreatedAt between(datetime(2017-08-01) .. 30d) | summarize LastYearCount=count() by language;
let thisYear = languages | where CreatedAt between(datetime(2018-08-01) .. 30d) | summarize ThisYearCount=count() by language;
lastYear
| join (thisYear) on language
| extend diff = (ThisYearCount - LastYearCount) * 100.0 / LastYearCount
| where diff > 50.0 and ThisYearCount > 2000
| top 5 by ThisYearCount
| project-away language1
// Top 5 - popular repos in June 2018 (by watch events)
// And their weekly trend over the past 2 years
let popularRepos=
GithubEvent
| where CreatedAt between(datetime(2018-06-01)..30d)
| where Type == "WatchEvent"
| summarize WatchEvent=count() by RepoName = tostring(Repo.name)
| top 5 by WatchEvent
| project RepoName;
GithubEvent
| extend RepoName = tostring(Repo.name)
| where RepoName in (popularRepos)
| where Type == "WatchEvent"
| summarize count() by RepoName, startofweek(CreatedAt)
| render timechart
// Full-text search across all columns for comments (over 3TB data)
GithubEvent
| where * has 'This is nonsense'
| summarize NonsenseUsers=dcount(tostring(Actor.login)), NonSenseComments=count()
// Top technology that mentions 'Microsoft', 'Google', 'Amazon', 'Facebook' or 'Apple' in any comments (issues, commits or pull-requests)
let CommentsWith = (keyword:string)
{
GithubEvent
| where Payload.comment.body has keyword
| summarize Mentions=count() by Repo=tostring(Repo.name), Keyword=keyword
| top 1 by Mentions
};
union
(CommentsWith('Microsoft')),
(CommentsWith('Google')),
(CommentsWith('Amazon')),
(CommentsWith('Facebook')),
(CommentsWith('Apple'))
| project Keyword, Repo, Mentions
| order by Keyword
// Look at Microsoft repos that are trending up, based on month-over-month Watch events.
// The query keeps repos with more than 150 watches in a 30-day bin (absolute number) and a more than 30% increase in Watch events
GithubEvent
| where CreatedAt >= ago(180d)
| where Type == "WatchEvent"
| where (Repo.name startswith 'Azure' or Repo.name startswith 'Microsoft')
| summarize Count=count() by CreatedAt=bin(CreatedAt, 30d), Repo=tostring(Repo.name)
| order by Repo asc, CreatedAt asc
| serialize
| extend PrevCount = prev(Count), PrevRepo = prev(Repo)
| extend DiffRatio = iif(Repo == PrevRepo, (Count - PrevCount)*100.0 / PrevCount, 0.0)
| where DiffRatio > 30.0 and Count > 150
| summarize arg_max(CreatedAt, DiffRatio, PrevCount, Count) by Repo
| order by DiffRatio
// Look for string patterns 'is the best X ever' and try to find out what are the top 10 subjects.
GithubEvent
| where Payload.comment.body has 'is the best'
| extend whatWasGood = extract(@'best (.+) ever', 1, tostring(Payload.comment.body))
| where isnotempty(whatWasGood)
| summarize count() by whatWasGood=substring(whatWasGood, 0, 27)
| top 10 by count_
// Full-text search across the whole data corpus for the 'Kusto' string, finding the top 5 repos that mention it.
GithubEvent
| where * has 'Kusto'
| summarize count() by tostring(Repo.name)
| top 5 by count_
// I'll conclude my GitHub events data exploration exercise
// with light weight sentiment analysis on these three technologies we examined
// I'm using Kusto text analytics capabilities to scan through all events with comment payload
// searching for happy and sad emoji :-) and :-(
// Count comments containing a happy or a sad emoticon for three Microsoft repos.
// The review-comment event type is "PullRequestReviewCommentEvent"; the previous
// spelling "PullReviewRequestCommentEvent" never matched any event.
GithubEvent
| where Repo.name == "Microsoft/vscode" or Repo.name == "Microsoft/TypeScript" or Repo.name == "Microsoft/CNTK"
| where Type in ("CommitCommentEvent", "IssueCommentEvent", "PullRequestReviewCommentEvent")
| extend comment = tostring(Payload.comment.body)
| extend hasHappyEmoji = (comment matches regex @'[;:]-?\)'),
         hasSadEmoji = comment matches regex @'[;:]-?\('
| summarize happy=countif(hasHappyEmoji), sad=countif(hasSadEmoji) by RepoName = tostring(Repo.name)
| render columnchart
// As you can see, while only vscode popularity is trending up, all three, exhibit positive sentiment
// Big data can be surprising... Let's try to use it to detect technology hypes.
// Here are a few keywords: 'Spectre', 'GDPR', 'Blockchain', and 'Quantum'.
// All are quite famous, as they made the news in the past 12 months.
// Let's look through all the terabytes of Payload data to check how people on GitHub were affected.
let HypeOrNot = (_what:string)
{
GithubEvent
| where Payload has _what
| project Repo = tostring(Repo.name), CreatedAt
| summarize Mentions=count() by bin(CreatedAt, 7d), What=_what
};
union
(HypeOrNot('Spectre')),
(HypeOrNot('GDPR')),
(HypeOrNot('Blockchain')),
(HypeOrNot('Quantum'))
| render timechart
// Look for the string pattern 'the best <SOMETHING>' and try to find out what the top 10 subjects are.
// Let's find out _what_ is referenced as 'THE BEST' by people using GitHub, over 2 years of comment history.
GithubEvent
| where Payload.comment.body has 'the best'
| extend TheBest = extract(@'the best (.+?)[.,]', 1, tostring(Payload.comment.body))
| where isnotempty(TheBest) and strlen(TheBest) > 4
| extend TheBest = trim(@'\.', TheBest)
| summarize Appearances=count() by TheBest=substring(TheBest, 0, 50)
| top 10 by Appearances
// Now, we are not just looking for THE BEST. We are going for THE BEST EVER!
GithubEvent
| where Payload.comment.body has 'the best'
| extend TheBestEver = extract(@'the best (.+) ever', 1, tostring(Payload.comment.body))
| where isnotempty(TheBestEver) and strlen(TheBestEver) > 4
| summarize Appearances=count() by TheBestEver=substring(TheBestEver, 0, 25)
| top 5 by Appearances
// Isn't it a needle in the GitHub haystack of data?
@mitulmk
Copy link

mitulmk commented Jan 23, 2020

Daniel, I came here from your medium article, great read! One question - do you have an updated version of crawl_send_eventhub.py - that works with the latest version of EventHub SDK?
Thanks,
Mitul.

@danield137
Copy link
Author

> Daniel, I came here from your medium article, great read! One question - do you have an updated version of crawl_send_eventhub.py - that works with the latest version of EventHub SDK?
> Thanks,
> Mitul.

I was unaware eventhub changed their sdk 😄
It shouldn't be that hard to achieve.

I might at some point republish this as an open source package, but there are no plans for that yet.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment