Last active
January 20, 2023 05:35
-
-
Save danield137/f4d6c7cb045e027fb011206e06972749 to your computer and use it in GitHub Desktop.
Kusto Github Demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pprint import pprint | |
import requests | |
import json | |
import os | |
import sys | |
import time | |
import datetime | |
import traceback | |
from azure.eventhub import EventHubClient, Sender, EventData, EventHubError | |
import logging | |
# Target Event Hubs — replace the <...> placeholders before running.
# NOTE(review): this uses the legacy azure-eventhub SDK surface
# (EventHubClient / add_sender / Sender); newer SDK versions renamed this API.
eventhubs = [
    EventHubClient(
        "sb://<namespace>.servicebus.windows.net/<ehname>",
        username="<keyname>",
        password="<key>",
    )
]
# GitHub personal access token; authenticated requests get the 5000 req/hour limit.
token = "<gh_token>"
# Public events firehose; per_page=100 is the API maximum page size.
ENDPOINT = "https://api.github.com/events?per_page=100"
def log(m):
    """Print *m* prefixed with a millisecond-precision timestamp."""
    # strftime gives microseconds; trim the last three digits down to ms.
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    print("{} | {}".format(stamp, m))


def debug(message):
    """Debug logging is currently disabled; uncomment the line below to enable it."""
    # log("{} | {}".format("DEBUG", message))
    pass


def info(message):
    """Log *message* at INFO level."""
    log("{} | {}".format("INFO", message))


def error(message, err):
    """Log *message* and the associated error object at ERROR level."""
    log("{} | {} | {}".format("ERROR", message, err))
class Monitor:
    """Counts events sent / API calls made and reports them once per interval."""

    # Seconds between reports.
    interval = 60

    def __init__(self, report_cb=None):
        # lut = "last update time": start of the current reporting window.
        self.lut = time.time()
        self.events_sent = 0
        self.requests_issued = 0
        self.report_cb = report_cb or print

    def reset(self):
        """Zero the counters and start a fresh reporting window."""
        self.lut = time.time()
        self.events_sent = 0
        self.requests_issued = 0

    def report(self):
        """Emit a report and reset once `interval` seconds have elapsed; no-op otherwise."""
        now = time.time()
        # BUG FIX: the threshold was hard-coded to 60, silently ignoring the
        # class-level `interval` attribute; use the attribute so it is tunable.
        if now - self.lut >= self.interval:
            self._report()
            self.reset()

    def _report(self):
        msg = "Monitor: events sent : {} calls made :{}".format(self.events_sent, self.requests_issued)
        self.report_cb(msg)
class SlidingCache:
    """Approximate bounded membership cache built from two generations of sets.

    When the current generation outgrows max_size it becomes the previous
    generation and a fresh one starts, so roughly the last 2*max_size keys
    remain queryable.
    """

    def __init__(self, max_size=500):
        self.prev = set()
        self.current = set()
        self.max_size = max_size

    def __contains__(self, key):
        return key in self.current or key in self.prev

    def add(self, key):
        """Record *key*; keys already present in either generation are ignored."""
        if key in self:
            return
        if len(self.current) > self.max_size:
            # Rotate generations: current becomes prev, oldest keys are dropped.
            self.prev, self.current = self.current, set()
        self.current.add(key)
class BufferedEventHubSender:
    """Batches serialized items into a single Event Hub message of at most max_size bytes.

    Event Hub's hard message limit is 262144 bytes; the default stays below it
    to be on the safe side.
    """

    def __init__(self, sender, flush_cb, serializer=json.dumps, item_seperator="\n", max_size=250000):
        self.max_size = max_size
        self.buffer = ""
        self.item_count = 0
        self.sender = sender
        # flush_cb(elapsed_seconds, item_count) is called after each successful send.
        self.flush_cb = flush_cb
        self.item_seperator = item_seperator
        self.serializer = serializer

    def push(self, item):
        """Serialize *item* into the buffer, flushing first if it would overflow.

        Raises EventHubError when a single serialized item alone exceeds max_size.
        """
        _item = "{}{}".format(self.serializer(item), self.item_seperator)
        encoded_len = len(_item.encode("utf-8"))
        if encoded_len > self.max_size:
            raise EventHubError(
                "Item {} is too big ({}) whereas limit is {}. Ignoring.".format(
                    _item, encoded_len, self.max_size
                )
            )
        if len((self.buffer + _item).encode("utf-8")) > self.max_size:
            self.flush()
        self.buffer += _item
        self.item_count += 1

    def flush(self):
        """Send the buffered payload as one EventData message, then reset the buffer."""
        try:
            start = time.time()
            self.sender.send(EventData(self.buffer))
            self.flush_cb(time.time() - start, self.item_count)
        except Exception:
            raise EventHubError("lost the following {} records:\n {}".format(self.item_count, self.buffer))
        finally:
            # BUG FIX: the buffer used to be cleared only on success, so after a
            # failed send it kept growing past max_size forever. The records are
            # reported lost either way, so always start a fresh buffer.
            self.buffer = ""
            self.item_count = 0
def run():
    """Poll the GitHub public-events API forever, fanning new events out to all Event Hubs.

    - Dedupes events across polls with a SlidingCache keyed on event id.
    - Paces requests to the 5000 requests/hour authenticated rate limit.
    - On 429/403 sleeps until the X-RateLimit-Reset timestamp.
    - Any unexpected error hard-kills the process so a supervisor can restart it.
    """
    info("STARTED github to eventhub")
    headers = {"Authorization": "token {}".format(token)}
    monitor = Monitor(info)
    senders = []
    for eh_client in eventhubs:
        senders.append(
            BufferedEventHubSender(
                eh_client.add_sender(),
                # BUG FIX: bind eh_name eagerly as a default argument; a plain
                # closure over eh_client would make every callback report the
                # *last* client's name once there is more than one Event Hub.
                flush_cb=lambda took, count, eh_name=eh_client.eh_name: debug(
                    "EVENTHUB REQUEST | took: {} sec, sent {} records to {}.".format(took, count, eh_name)
                ),
            )
        )
        failed = eh_client.run()
        if failed:
            raise EventHubError("Couldn't connect to EH {}".format(eh_client.eh_name))
    seconds_per_request = round(
        1.0 / (5000 / 60 / 60), 2
    )  # requests / minutes / seconds = requests per sec, ^-1=secs per request
    cache = SlidingCache()
    loop = True
    while loop:
        loop_start_time = time.time()
        monitor.report()
        try:
            resp = requests.get(ENDPOINT, headers=headers)
            resp.raise_for_status()
            monitor.requests_issued += 1
            # Sort by id so the dedupe cache sees events in a stable order.
            data = sorted(resp.json(), key=lambda x: x["id"])
            debug("GITHUB REQUEST | took {} sec, got {} events.".format(resp.elapsed.total_seconds(), len(data)))
            for d in data:
                if d["id"] not in cache:
                    for buffered_sender in senders:
                        try:
                            buffered_sender.push(d)
                        except EventHubError as e:
                            # BUG FIX: Exception.message does not exist on Python 3
                            # (AttributeError); repr(e) matches the other handlers.
                            error("EventHubError", repr(e))
                    monitor.events_sent += 1
                    # Same key as the membership check above (was d.get("id")).
                    cache.add(d["id"])
            cycle_took = time.time() - loop_start_time
            delay = seconds_per_request - cycle_took
            debug("CYCLE DONE | took {}, waiting for {}".format(cycle_took, max(delay, 0)))
            if delay > 0:
                time.sleep(delay)
        except requests.HTTPError as e:
            # Rate limited / forbidden: sleep until the limit window resets.
            if resp.status_code in [429, 403]:
                time_to_wait = int(
                    float(resp.headers.get("X-RateLimit-Reset", 60)) - datetime.datetime.utcnow().timestamp()
                )
                info("waiting for {}".format(time_to_wait))
                if time_to_wait > 0:
                    time.sleep(time_to_wait)
            error("HTTP EXCEPTION", repr(e))
        except EventHubError as e:
            error("Failed to send events to eventhub, skipping", repr(e))
        except Exception as e:
            error("UNEXPECTED ERROR", repr(e))
            traceback.print_exc()
            # Hard exit so an external supervisor restarts from a clean state.
            os.kill(os.getpid(), 9)
# Entry point: stream GitHub public events into Event Hubs until the process is killed.
if __name__ == "__main__":
    run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// One row per raw GitHub event; dynamic columns keep the nested JSON objects intact.
// NOTE(review): GitHub event ids arrive as JSON strings — confirm the ingestion mapping coerces them to int64.
.create table GithubEvent ( Id:int64, Type: string, Actor: dynamic, Repo: dynamic, Payload: dynamic, Public:bool, CreatedAt: datetime)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import sys | |
import os | |
from azure.kusto.data.request import KustoConnectionStringBuilder | |
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, DataFormat | |
from azure.storage.common import CloudStorageAccount | |
# Fill in before running. BUG FIX: the bare <placeholder> tokens were syntax
# errors; quoting them keeps the script importable until real values are set.
account_name = "<azure_storage_account>"
account_key = "<azure_storage_account_key>"
container_name = "<container_name>"
kusto_url = "<kusto_url>"
app_id = "<app_id>"
app_key = "<app_key>"
auth_id = "<authority_id>"

# Queued ingestion client authenticated as an AAD application.
client = KustoIngestClient(
    KustoConnectionStringBuilder.with_aad_application_key_authentication(
        kusto_url, app_id, app_key, auth_id
    )
)

# JSON ingestion mapping: GitHub event fields -> GithubEvent table columns.
mapping = [
    {"column": "Id", "path": "$.id"},
    {"column": "Type", "path": "$.type"},
    {"column": "Actor", "path": "$.actor"},
    {"column": "Repo", "path": "$.repo"},
    {"column": "Payload", "path": "$.payload"},
    {"column": "Public", "path": "$.public"},
    # BUG FIX: was "$.create_at" — the GitHub field (and the sibling ingestion
    # script) use "created_at"; the typo left CreatedAt empty on ingestion.
    {"column": "CreatedAt", "path": "$.created_at"},
]
def download_locally(domain, path):
    """Fetch domain+path into the local file *path*.

    Returns True when the file is available locally (downloaded now or already
    present), False when the server answered with an HTTP error.
    """
    if os.path.exists(path):
        print("{} exists on local fs, skipping...".format(path))
        # BUG FIX: this branch previously fell through returning None, so the
        # caller treated an already-downloaded file as a failure and never
        # uploaded it to Azure storage.
        return True
    uri = domain + path
    response = requests.get(uri, stream=True)
    try:
        # Throw an error for bad status codes
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            print("{} not found, skipping.".format(uri))
        else:
            print("{} erred status {}".format(uri, e.response.status_code))
        return False
    # Stream the body to disk in 1 KiB chunks.
    with open(path, "wb") as handle:
        for block in response.iter_content(1024):
            handle.write(block)
    return True
def missing_file(path):
    """Return True when no blob named *path* exists in the storage container."""
    account = CloudStorageAccount(account_name=account_name, account_key=account_key)
    service = account.create_block_blob_service()
    # No blob listed under this exact prefix means the file is missing.
    return sum(1 for _ in service.list_blobs(container_name, path)) == 0
def upload_to_azure_storage(path):
    """Upload the local file *path* to the container under the same blob name, unless present."""
    account = CloudStorageAccount(account_name=account_name, account_key=account_key)
    service = account.create_block_blob_service()
    already_uploaded = len(list(service.list_blobs(container_name, path))) > 0
    if already_uploaded:
        print("{} exists on azure storage, skipping...".format(path))
        return
    service.create_blob_from_path(container_name=container_name, blob_name=path, file_path=path)
    print("uploaded to storage {}".format(path))
def main():
    """Mirror every gharchive.org hourly dump from 2016-2018 into Azure blob storage."""
    print("current folder is: {}".format(os.getcwd()))
    url_domain = "http://data.gharchive.org/"
    url_path = "{y}-{m:02d}-{d:02d}-{h}.json.gz"
    # Walk every (year, month, day, hour) combination; impossible dates such
    # as Feb 30 simply 404 on the server and download_locally skips them.
    for year in range(2016, 2019):
        for month in range(1, 13):
            for day in range(1, 32):
                for hour in range(24):
                    blob_path = url_path.format(y=year, m=month, d=day, h=hour)
                    if not missing_file(path=blob_path):
                        print('{} found. SKIPPING...'.format(blob_path))
                        continue
                    if download_locally(domain=url_domain, path=blob_path):
                        upload_to_azure_storage(blob_path)
# Entry point: backfill the full 2016-2018 gharchive data set into Azure storage.
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import datetime | |
import sys | |
import os | |
from azure.storage.common import CloudStorageAccount | |
# Fill in before running. BUG FIX: the bare <> placeholders were syntax
# errors; quoted placeholder strings keep the script importable.
storage_account = "<storage_account_name>"
storage_key = "<storage_account_key>"
container_name = "<container_name>"
url_domain = "http://data.gharchive.org/"
# gharchive hourly dump name; note the hour is intentionally NOT zero-padded.
URL_PATH_FORMAT = "{y}-{m:02d}-{d:02d}-{h}.json.gz"
def download_locally(domain, path):
    """Fetch domain+path into the local file *path*.

    Returns True when the file is available locally (downloaded now or already
    present), False when the server answered with an HTTP error.
    """
    if os.path.exists(path):
        print("{} exists on local fs, skipping...".format(path))
        # BUG FIX: this branch previously fell through returning None, so the
        # caller treated an already-downloaded file as a failure and never
        # uploaded it to Azure storage.
        return True
    uri = domain + path
    response = requests.get(uri, stream=True)
    try:
        # Throw an error for bad status codes
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            print("{} not found, skipping.".format(uri))
        else:
            print("{} erred status {}".format(uri, e.response.status_code))
        return False
    # Stream the body to disk in 1 KiB chunks.
    with open(path, "wb") as handle:
        for block in response.iter_content(1024):
            handle.write(block)
    return True
def upload_to_azure_storage(path):
    """Unconditionally upload the local file *path* to the container under the same blob name."""
    account = CloudStorageAccount(account_name=storage_account, account_key=storage_key)
    service = account.create_block_blob_service()
    service.create_blob_from_path(container_name=container_name, blob_name=path, file_path=path)
    print("uploaded to storage {}".format(path))
def last_existing_time(since):
    """Walk backwards hour by hour from *since* until an hourly dump blob exists in storage.

    Returns the datetime of the most recent hour whose dump is already uploaded.
    NOTE(review): this loops forever when the container holds no matching blob
    at all — confirm the container is seeded before the first run.
    """
    storage_client = CloudStorageAccount(account_name=storage_account, account_key=storage_key)
    blob_service = storage_client.create_block_blob_service()
    # Fall back to "now" when no starting point was supplied.
    last = since or datetime.datetime.utcnow()
    while not blob_service.exists(
        container_name, URL_PATH_FORMAT.format(y=last.year, m=last.month, d=last.day, h=last.hour)
    ):
        last -= datetime.timedelta(hours=1)
    return last
def download_time_period(from_date, to_date):
    """Download and upload every hourly dump in [from_date, to_date), stepping one hour at a time.

    BUG FIX: the loop previously ran `while current != last`, which never
    terminates when the two datetimes are not aligned on exact whole-hour
    offsets or when from_date > to_date; `<` is equivalent for the aligned
    case and safe for the rest.
    """
    current = from_date
    while current < to_date:
        p = URL_PATH_FORMAT.format(y=current.year, m=current.month, d=current.day, h=current.hour)
        if download_locally(domain=url_domain, path=p):
            upload_to_azure_storage(p)
        current += datetime.timedelta(hours=1)
def main():
    """Catch-up sync: upload every hourly dump from the last one already in storage up to now."""
    print("current folder is: {}".format(os.getcwd()))
    now = datetime.datetime.utcnow()
    # Resume from the most recent hour that is already uploaded.
    download_time_period(last_existing_time(since=now), now)
# Entry point: incremental catch-up of gharchive dumps into Azure storage.
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import sys | |
import os | |
from azure.kusto.data.request import KustoConnectionStringBuilder | |
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, DataFormat, BlobDescriptor | |
from azure.storage.common import CloudStorageAccount | |
# Fill in before running. BUG FIX: the bare <> placeholders were syntax
# errors; quoted placeholder strings keep the script importable.
kusto_url = "<kusto_url>"
app_id = "<app_id>"
app_key = "<app_key>"
auth_id = "<authority_id>"
storage_account_name = "<storage_account_name>"
storage_account_key = "<storage_account_key>"
storage_container = "<container_name>"

# Queued ingestion client authenticated as an AAD application.
client = KustoIngestClient(
    KustoConnectionStringBuilder.with_aad_application_key_authentication(
        kusto_url, app_id, app_key, auth_id
    )
)

# JSON ingestion mapping: GitHub event fields -> GithubEvent table columns.
mapping = [
    {"column": "Id", "path": "$.id"},
    {"column": "Type", "path": "$.type"},
    {"column": "Actor", "path": "$.actor"},
    {"column": "Repo", "path": "$.repo"},
    {"column": "Payload", "path": "$.payload"},
    {"column": "Public", "path": "$.public"},
    {"column": "CreatedAt", "path": "$.created_at"},
]
def ingest(file, size):
    """Queue one gharchive blob for ingestion into the GithubEvent Kusto table.

    *file* is the full blob URI ("https://<account>.blob.core.windows.net/<container>/<name>;<key>");
    *size* is the uncompressed-size hint in bytes used for ingestion batching.
    Tags make the operation idempotent (ingest-if-not-exists/ingest-by) and
    allow dropping a whole day's data at once (drop-by = YYYY-MM-DD).
    """
    # BUG FIX: the drop-by tag was file[57:67], a magic slice that only lands on
    # the date for one specific account/container name length. Derive the
    # YYYY-MM-DD prefix from the blob file name itself instead.
    blob_uri = file.split(";", 1)[0]  # strip the appended account key
    day_tag = blob_uri.rsplit("/", 1)[-1][:10]
    props = IngestionProperties(
        database="GitHub",
        table="GithubEvent",
        dataFormat=DataFormat.json,
        mapping=mapping,
        ingestIfNotExists=[file],
        ingestByTags=[file],
        dropByTags=[day_tag],
    )
    client.ingest_from_blob(BlobDescriptor(file, size), props)
    print("ingested {}".format(file))
def main():
    """Ingest every uploaded gharchive blob (2016-2018) into Kusto, tracking progress via marker files."""
    # BUG FIX: removed an unused CloudStorageAccount/blob_service pair that was
    # created here and never referenced.
    url_path = "{y}-{m:02d}-{d:02d}-{h}.json.gz"
    for y in range(2016, 2019):
        for m in range(1, 13):
            for d in range(1, 32):
                for h in range(24):
                    p = url_path.format(y=y, m=m, d=d, h=h)
                    # A local "<name>_done" marker records completed ingestion
                    # so the script can be re-run after interruption.
                    if os.path.exists(p + "_done"):
                        print("{} ingested. skipping...".format(p))
                        continue
                    ingest(
                        "https://{}.blob.core.windows.net/{}/{};{}".format(
                            storage_account_name, storage_container, p, storage_account_key
                        ),
                        50 * 1024 * 1024,  # uncompressed-size hint for ingestion batching
                    )
                    with open(p + "_done", "w+") as f:
                        f.write(" ")
# Entry point: batch-ingest the uploaded gharchive blobs into Kusto.
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Data set:
// xx Rows
// Schema
GithubEvent
| getschema
| project ColumnName, ColumnType
// XX records
GithubEvent
| count
// Event types and counts
GithubEvent
| summarize Events=count() by Type
| order by Events
// Observing several push events
GithubEvent
| where Type == "PushEvent"
| take 10
// Expand the dynamic Payload into flat columns and inspect the resulting schema
GithubEvent
| where Type == "PushEvent"
| take 10
| evaluate bag_unpack(Payload)
| getschema
| project ColumnName, ColumnType
// Top 10 - popular repos (by push events)
GithubEvent
| where Type == "PushEvent"
| summarize PushEvents=count() by RepoName = tostring(Repo.name)
| top 10 by PushEvents
// Top 10 - popular repos (by watch events)
GithubEvent
| where Type == "WatchEvent"
| summarize PushEvents=count() by RepoName = tostring(Repo.name)
| top 10 by PushEvents
// Top 10 - popular repos (by fork events)
GithubEvent
| where Type == "ForkEvent"
| summarize PushEvents=count() by RepoName = tostring(Repo.name)
| top 10 by PushEvents
// Top 10 - popular repos (fork, watch and push events combined)
GithubEvent
| where Type in ("ForkEvent", "WatchEvent", "PushEvent")
| summarize PushEvents=count() by RepoName = tostring(Repo.name)
| top 10 by PushEvents
// Top 10 - popular repos (by watch events)
let popularRepos=
GithubEvent
| where Type == "WatchEvent"
| summarize WatchEvent=count() by RepoName = tostring(Repo.name)
| top 10 by WatchEvent
| project RepoName;
// ...and their daily watch-event trend
GithubEvent
| extend RepoName = tostring(Repo.name)
| where RepoName in (popularRepos)
| where Type == "WatchEvent"
| summarize count() by RepoName, bin(CreatedAt, 1d)
| render timechart
// Top 10 - popular repos, scored 80% by watch events and 20% by issue activity
let watchedRepos = GithubEvent
| where Type == "WatchEvent"
| summarize WatchEvents=count() by RepoName = tostring(Repo.name);
let issuesRepos = GithubEvent
| where Type in ("IssueCommentEvent", "IssuesEvent")
| summarize IssueEvents=count() by RepoName = tostring(Repo.name);
watchedRepos
| join
(
    issuesRepos
)
on RepoName
| extend Popularity=(WatchEvents * 0.8) + (IssueEvents*0.2)
| top 10 by Popularity
// ----------------------------More Queries-----------------
// Comparing between two languages TypeScript vs CoffeeScript.
// You can spot one language getting more popular over time and the other trending down
GithubEvent
| where Type == "PullRequestEvent"
| where Payload.pull_request.head.repo.language == 'TypeScript' or
    Payload.pull_request.head.repo.language == 'CoffeeScript'
| extend language=tostring(Payload.pull_request.head.repo.language)
| summarize count() by language, bin(CreatedAt, 30d)
| render timechart with (ysplit = axes)
// Languages to learn: compare 30-day windows a year apart and keep the fast-growing ones
let languages =
GithubEvent
| where Type == "PullRequestEvent"
| extend language=tostring(Payload.pull_request.head.repo.language);
let lastYear = languages | where CreatedAt between(datetime(2017-08-01) .. 30d) | summarize LastYearCount=count() by language;
let thisYear = languages | where CreatedAt between(datetime(2018-08-01) .. 30d) | summarize ThisYearCount=count() by language;
lastYear
| join (thisYear) on language
| extend diff = (ThisYearCount - LastYearCount) * 100.0 / LastYearCount
| where diff > 50.0 and ThisYearCount > 2000
| top 5 by ThisYearCount
| project-away language1
// Top 5 - popular repos in June 2018 (by watch events)
// And their weekly trend over the past 2 years
let popularRepos=
GithubEvent
| where CreatedAt between(datetime(2018-06-01)..30d)
| where Type == "WatchEvent"
| summarize WatchEvent=count() by RepoName = tostring(Repo.name)
| top 5 by WatchEvent
| project RepoName;
GithubEvent
| extend RepoName = tostring(Repo.name)
| where RepoName in (popularRepos)
| where Type == "WatchEvent"
| summarize count() by RepoName, startofweek(CreatedAt)
| render timechart
// Full-text search across all columns for comments (over 3TB data)
GithubEvent
| where * has 'This is nonsense'
| summarize NonsenseUsers=dcount(tostring(Actor.login)), NonSenseComments=count()
// Top technology that mentions 'Microsoft', 'Google', 'Amazon', 'Facebook' in any comments (issues, commits or pull-requests)
let CommentsWith = (keyword:string)
{
    GithubEvent
    | where Payload.comment.body has keyword
    | summarize Mentions=count() by Repo=tostring(Repo.name), Keyword=keyword
    | top 1 by Mentions
};
union
(CommentsWith('Microsoft')),
(CommentsWith('Google')),
(CommentsWith('Amazon')),
(CommentsWith('Facebook')),
(CommentsWith('Apple'))
| project Keyword, Repo, Mentions
| order by Keyword
// Look on trending up Microsoft repos based on MoM Watch Events.
// The query defines a filter for at least 100 watches (absolute number) and 30% increase in Watch events
GithubEvent
| where CreatedAt >= ago(180d)
| where Type == "WatchEvent"
| where (Repo.name startswith 'Azure' or Repo.name startswith 'Microsoft')
| summarize Count=count() by CreatedAt=bin(CreatedAt, 30d), Repo=tostring(Repo.name)
| order by Repo asc, CreatedAt asc
| serialize
| extend PrevCount = prev(Count), PrevRepo = prev(Repo)
| extend DiffRatio = iif(Repo == PrevRepo, (Count - PrevCount)*100.0 / PrevCount, 0.0)
| where DiffRatio > 30.0 and Count > 150
| summarize arg_max(CreatedAt, DiffRatio, PrevCount, Count) by Repo
| order by DiffRatio
// Look for string patterns 'is the best X ever' and try to find out what are the top 10 subjects.
GithubEvent
| where Payload.comment.body has 'is the best'
| extend whatWasGood = extract(@'best (.+) ever', 1, tostring(Payload.comment.body))
| where isnotempty(whatWasGood)
| summarize count() by whatWasGood=substring(whatWasGood, 0, 27)
| top 10 by count_
// Full-text search across all data corpus for the 'Kusto' string and finding top-5 repos that mention it.
GithubEvent
| where * has 'Kusto'
| summarize count() by tostring(Repo.name)
| top 5 by count_
// I'll conclude my GitHub events data exploration exercise
// with light weight sentiment analysis on these three technologies we examined
// I'm using Kusto text analytics capabilities to scan through all events with comment payload
// searching for happy and sad emoji :-) and :-(
GithubEvent
| where Repo.name == "Microsoft/vscode" or Repo.name == "Microsoft/TypeScript" or Repo.name == "Microsoft/CNTK"
| where Type in ("CommitCommentEvent", "IssueCommentEvent", "PullReviewRequestCommentEvent")
| extend comment = tostring(Payload.comment.body)
| extend hasHappyEmoji = (comment matches regex @'[;:]-?\)'),
    hasSadEmoji = comment matches regex @'[;:]-?\('
| summarize happy=countif(hasHappyEmoji), sad=countif(hasSadEmoji) by RepoName = tostring(Repo.name)
| render columnchart
// As you can see, while only vscode popularity is trending up, all three exhibit positive sentiment
// Big data can be surprising... Let's try to use it to detect technology hypes.
// Here are a few keywords: 'Spectre', 'GDPR', 'Blockchain', and 'Quantum'.
// All are quite famous as they made the news in the past 12 months.
// Let's look in all terabytes of Payload data to check how people on GitHub were affected.
let HypeOrNot = (_what:string)
{
    GithubEvent
    | where Payload has _what
    | project Repo = tostring(Repo.name), CreatedAt
    | summarize Mentions=count() by bin(CreatedAt, 7d), What=_what
};
union
(HypeOrNot('Spectre')),
(HypeOrNot('GDPR')),
(HypeOrNot('Blockchain')),
(HypeOrNot('Quantum'))
| render timechart
// Look for string patterns 'the best <SOMETHING>' and try to find out what are the top 10 subjects.
// Let's find out _what_ is referenced as 'THE BEST' by people using GitHub's 2 years of comment history.
GithubEvent
| where Payload.comment.body has 'the best'
| extend TheBest = extract(@'the best (.+?)[.,]', 1, tostring(Payload.comment.body))
| where isnotempty(TheBest) and strlen(TheBest) > 4
| extend TheBest = trim(@'\.', TheBest)
| summarize Appearances=count() by TheBest=substring(TheBest, 0, 50)
| top 10 by Appearances
// Now, we are not just looking for THE BEST. We are going for THE BEST EVER!
GithubEvent
| where Payload.comment.body has 'the best'
| extend TheBestEver = extract(@'the best (.+) ever', 1, tostring(Payload.comment.body))
| where isnotempty(TheBestEver) and strlen(TheBestEver) > 4
| summarize Appearances=count() by TheBestEver=substring(TheBestEver, 0, 25)
| top 5 by Appearances
// Isn't it a needle in the GitHub haystack of data?
Daniel, I came here from your medium article, great read! One question - do you have an updated version of crawl_send_eventhub.py - that works with the latest version of EventHub SDK?
Thanks,
Mitul.
I was unaware eventhub changed their sdk 😄
It shouldn't be that hard to achieve.
I might at some point republish this as an open-source package, but there are no plans for that yet.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Daniel, I came here from your medium article, great read! One question - do you have an updated version of crawl_send_eventhub.py - that works with the latest version of EventHub SDK?
Thanks,
Mitul.