@hrichardlee
Last active August 18, 2022 02:32
Meadowrun-wikipedia-demo
3M
A. O. Smith
Abbott
AbbVie
Abiomed
Accenture
Activision Blizzard
ADM
Adobe
ADP
Advance Auto Parts
AES
Aflac
Agilent Technologies
AIG
Air Products
Akamai
Alaska Air Group
Albemarle
Alexandria
Align
Allegion
Alliant Energy
Allstate
Alphabet
Altria
Amazon
Amcor
AMD
Ameren
American Airlines Group
American Electric Power
American Express
American Tower
American Water
Ameriprise Financial
AmerisourceBergen
Ametek
Amgen
Amphenol
Analog Devices
Ansys
Anthem
Aon
APA Corporation
Apple
Applied Materials
Aptiv
Arista
Assurant
AT&T
Atmos Energy
Autodesk
AutoZone
AvalonBay Communities
Avery Dennison
Baker Hughes
Ball
Bank of America
Bath & Body Works
Baxter
Becton Dickinson
Berkley
Best Buy
Bio-Rad
Bio-Techne
Biogen
BlackRock
BNY Mellon
Boeing
Booking Holdings
BorgWarner
Boston Properties
Boston Scientific
Bristol Myers Squibb
Broadcom
Broadridge
Brown & Brown
C.H. Robinson
Cadence
Caesars Entertainment
Camden
Campbell's
Capital One
Cardinal Health
CarMax
Carnival
Carrier
Catalent
Caterpillar
Cboe
CBRE
CDW
Celanese
Centene
CenterPoint Energy
Ceridian
CF Industries
Charles River
Charles Schwab
Charter Communications
Chevron
Chipotle Mexican Grill
Chubb
Church & Dwight
Cigna
Cincinnati Financial
Cintas
Cisco
Citigroup
Citizens
Citrix
Clorox
CME Group
CMS Energy
Coca-Cola
Cognizant
Colgate-Palmolive
Comcast
Comerica
Conagra Brands
ConocoPhillips
Con Edison
Constellation Brands
Constellation Energy
CooperCompanies
Copart
Corning
Corteva
Costco
Coterra
Crown Castle
CSX
Cummins
CVS Health
D.R. Horton
Danaher
Darden
DaVita
Deere & Co.
Delta Air Lines
Dentsply Sirona
Devon
DexCom
Diamondback
Digital Realty
Discover
Dish
Disney
Dollar General
Dollar Tree
Dominion Energy
Domino's
Dover
Dow
DTE
Duke Energy
Duke Realty
DuPont
DXC Technology
Eastman
Eaton
eBay
Ecolab
Edison International
Edwards Lifesciences
Electronic Arts
Emerson
Enphase
Entergy
EOG Resources
EPAM
Equifax
Equinix
Equity Residential
Essex
Estée Lauder Companies
Etsy
Everest
Evergy
Eversource
Exelon
Expedia Group
Expeditors
Extra Space Storage
ExxonMobil
F5
FactSet
Fastenal
Federal Realty
FedEx
Fifth Third Bank
First Republic
FirstEnergy
FIS
Fiserv
Fleetcor
FMC
Ford
Fortinet
Fortive
Fortune Brands
Fox Corporation
Franklin Templeton
Freeport-McMoRan
Gallagher
Garmin
Gartner
GE
Generac
General Dynamics
General Mills
Genuine Parts
Gilead
Globe Life
Global Payments
GM
Goldman Sachs
Grainger
Halliburton
Hartford
Hasbro
HCA Healthcare
Healthpeak
Henry Schein
Hershey's
Hess
Hewlett Packard Enterprise
Hilton
Hologic
Home Depot
Honeywell
Hormel
Host Hotels & Resorts
Howmet Aerospace
HP
Humana
Huntington Ingalls Industries
Huntington National Bank
IDEX
Idexx Laboratories
Illinois Tool Works
Illumina
Incyte
Ingersoll Rand
Intel
Intercontinental Exchange
IBM
International Paper
Interpublic Group
International Flavors & Fragrances
Intuit
Intuitive Surgical
Invesco
IPG Photonics
IQVIA
Iron Mountain
J.B. Hunt
Jack Henry & Associates
Jacobs
Johnson & Johnson
Johnson Controls
JPMorgan Chase
Juniper Networks
Kellogg's
KeyCorp
Keysight
Kimberly-Clark
Kimco Realty
Kinder Morgan
KLA
Kraft Heinz
Kroger
L3Harris
LabCorp
Lam Research
Lamb Weston
Las Vegas Sands
Leidos
Lennar
Lilly
Lincoln Financial
Linde
Live Nation Entertainment
LKQ Corporation
Lockheed Martin
Loews Corporation
Lowe's
Lumen
LyondellBasell
M&T Bank
Marathon Oil
Marathon Petroleum
MarketAxess
Marriott International
Marsh & McLennan
Martin Marietta
Masco
Mastercard
Match Group
McCormick
McDonald's
McKesson
Medtronic
Merck
Meta
MetLife
Mettler Toledo
MGM Resorts
Microchip
Micron
Microsoft
Mid-America Apartments
Moderna
Mohawk Industries
Molina Healthcare
Molson Coors
Mondelez International
Monolithic Power Systems
Monster Beverage
Moody's
Morgan Stanley
Mosaic
Motorola Solutions
MSCI
Nasdaq
NetApp
Netflix
Newell Brands
Newmont
News Corp
NextEra Energy
Nielsen
Nike
NiSource
Nordson
Norfolk Southern
Northern Trust
Northrop Grumman
NortonLifeLock
Norwegian Cruise Line Holdings
NRG Energy
Nucor
Nvidia
NVR
NXP
O'Reilly Automotive
Occidental Petroleum
Old Dominion
Omnicom Group
Oneok
Oracle
Organon
Otis
Paccar
Packaging Corporation of America
Paramount
Parker
Paychex
Paycom
PayPal
Penn National Gaming
Pentair
PepsiCo
PerkinElmer
Pfizer
Philip Morris International
Phillips 66
Pinnacle West
Pioneer Natural Resources
PNC Financial Services
Pool Corporation
PPG Industries
PPL
Principal
Procter & Gamble
Progressive
Prologis
Prudential
PSEG
PTC
Public Storage
PulteGroup
PVH
Qorvo
Quanta
Qualcomm
Quest Diagnostics
Ralph Lauren
Raymond James
Raytheon Technologies
Realty Income
Regency Centers
Regeneron
Regions
Republic Services
ResMed
Robert Half
Rockwell Automation
Rollins
Roper
Ross
Royal Caribbean Group
S&P Global
Salesforce
SBA Communications
Schlumberger
Seagate
Sealed Air
Sempra
ServiceNow
Sherwin-Williams
Signature Bank
Simon
Skyworks
Smucker
Snap-on
SolarEdge
Southern Company
Southwest Airlines
Stanley Black & Decker
Starbucks
State Street
Steris
Stryker
SVB Financial
Synchrony
Synopsys
Sysco
T-Mobile
T. Rowe Price
Take-Two Interactive
Tapestry
Target
TE Connectivity
Teledyne
Teleflex
Teradyne
Tesla
Texas Instruments
Textron
Thermo Fisher Scientific
TJX Companies
Tractor Supply
Trane Technologies
TransDigm
Travelers
Trimble
Truist
Twitter
Tyler Technologies
Tyson
U.S. Bank
UDR
Ulta Beauty
Under Armour
Union Pacific
United Airlines
United Parcel Service
United Rentals
UnitedHealth Group
Universal Health Services
Valero
Ventas
Verisign
Verisk
Verizon
Vertex
VF Corporation
Viatris
Vici
Visa
Vornado Realty Trust
Vulcan Materials
Wabtec
Walgreens Boots Alliance
Walmart
Warner Bros. Discovery
Waste Management
Waters
WEC Energy Group
Wells Fargo
Welltower
West Pharmaceutical Services
Western Digital
WestRock
Weyerhaeuser
Whirlpool
Williams
Willis Towers Watson
Wynn Resorts
Xcel Energy
Xylem
Yum! Brands
Zebra
Zimmer Biomet
Zions Bancorp
Zoetis
# company_names.py
import smart_open


def company_names_regex():
    # one company name per line; the names are joined into a single alternation
    with smart_open.open(
        "s3://wikipedia-meadowrun-demo/companies.txt"
    ) as companies_file:
        companies = companies_file.read().splitlines()
    return "|".join(companies)
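Several names in the list above contain regex metacharacters (for example the `.` in "C.H. Robinson"), and `"|".join(companies)` passes them through unescaped, so `.` matches any character. A minimal sketch of an escaped variant follows. `build_company_regex` is a hypothetical helper, not part of the gist, and it is only exercised against the standard library `re` module here; whether the escaped pattern is accepted unchanged by hyperscan is an assumption that would need checking.

```python
import re


def build_company_regex(companies):
    """Hypothetical helper: join names into one alternation, escaping metacharacters."""
    # Skip blank lines: an empty alternative would match the empty string everywhere.
    escaped = [re.escape(name) for name in companies if name.strip()]
    return "|".join(escaped)


names = ["A. O. Smith", "Yum! Brands", "C.H. Robinson"]
pattern = re.compile(build_company_regex(names), re.IGNORECASE)

# The unescaped pattern treats "." as a wildcard; the escaped one does not.
loose = re.compile("|".join(names), re.IGNORECASE)
print(bool(loose.search("CxHy Robinson")), bool(pattern.search("CxHy Robinson")))
```

The trade-off is that escaping makes the match strictly literal, which may or may not be what the demo wants for a throughput benchmark.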
# convert_to_tar.py
import io
import tarfile
import time

import smart_open

from unzip_wikipedia_articles import iterate_articles_chunk


def convert_articles_chunk_to_tar_gz(articles_offset, num_articles):
    """
    Reads the specified articles from the S3 bucket and writes them back to a tar.gz
    file where the filenames in the tar archive are the titles and the contents of the
    files are the contents of the articles.
    """
    t0 = time.perf_counter()
    # smart_open infers gzip compression from the .gz extension, so writing a plain
    # tar stream into raw_file produces a tar.gz object
    with smart_open.open(
        f"s3://wikipedia-meadowrun-demo/extracted-{articles_offset}.tar.gz", "wb"
    ) as raw_file, tarfile.TarFile(fileobj=raw_file, mode="w") as tar_file:
        for title, text in iterate_articles_chunk(
            "s3://wikipedia-meadowrun-demo/enwiki-latest-pages-articles-multistream-index.txt.bz2",
            "s3://wikipedia-meadowrun-demo/enwiki-latest-pages-articles-multistream.xml.bz2",
            articles_offset,
            num_articles,
        ):
            # skip empty articles and redirects
            if text is not None and not text.startswith("#REDIRECT"):
                bs = text.encode("utf-8")
                ti = tarfile.TarInfo(title)
                ti.size = len(bs)
                with io.BytesIO(bs) as text_buffer:
                    tar_file.addfile(ti, text_buffer)

    print(f"Processed {articles_offset}, {num_articles} in {time.perf_counter() - t0}")
import bz2

# count the lines in the index file; each line describes one article
with bz2.open(
    "enwiki-latest-pages-articles-multistream-index.txt.bz2", "rt", encoding="utf-8"
) as index_file:
    i = 0
    for _ in index_file:
        i += 1
    print(f"{i:,d} total articles")
# pyproject.toml
[tool.poetry]
name = "meadowrun_hyperscan_demo"
version = "0.1.0"
description = ""
authors = ["Hyunho Richard Lee <hrichardlee@gmail.com>"]

[tool.poetry.dependencies]
python = "^3.9"
meadowrun = "0.1.11"
smart-open = {extras = ["s3"], version = "^6.0.0"}
pyre2 = "^0.3.6"
hyperscan = {version = "^0.3.2", platform = "linux"}

[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
# read_articles_extract.py
import tarfile

import smart_open


def iterate_extract(tar_file):
    """
    Yields (article title, article content) for each article in the specified tar file
    """
    # compression="disable" hands tarfile the raw gzip bytes; "r|gz" then reads the
    # archive as a forward-only stream
    with smart_open.open(
        tar_file, "rb", compression="disable"
    ) as raw_file, tarfile.open(fileobj=raw_file, mode="r|gz") as tar_file_handle:
        for member_file_info in tar_file_handle:
            member_file_handle = tar_file_handle.extractfile(member_file_info)
            if member_file_handle:
                member_file_content = member_file_handle.read()
                yield member_file_info.name, member_file_content
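The write path in `convert_articles_chunk_to_tar_gz` and the read path in `iterate_extract` can be tried locally without S3. The sketch below is an illustration under stated assumptions: `io.BytesIO` plus `gzip.GzipFile` stand in for the `smart_open` S3 stream, and `write_articles` / `read_articles` are hypothetical names, not functions from the gist.

```python
import gzip
import io
import tarfile


def write_articles(articles):
    """Pack (title, text) pairs into tar.gz bytes, one archive member per article."""
    raw = io.BytesIO()
    with gzip.GzipFile(fileobj=raw, mode="wb") as gz_file, tarfile.TarFile(
        fileobj=gz_file, mode="w"
    ) as tar_file:
        for title, text in articles:
            data = text.encode("utf-8")
            info = tarfile.TarInfo(title)
            info.size = len(data)  # the size must be set before addfile
            tar_file.addfile(info, io.BytesIO(data))
    return raw.getvalue()


def read_articles(tar_gz_bytes):
    """Stream (name, content bytes) pairs back out, reading members in order."""
    with tarfile.open(fileobj=io.BytesIO(tar_gz_bytes), mode="r|gz") as tar_file:
        for member in tar_file:
            handle = tar_file.extractfile(member)
            if handle:
                yield member.name, handle.read()


archive = write_articles([("Alpha", "first article"), ("Beta", "second article")])
for name, content in read_articles(archive):
    print(name, len(content))
```

Stream mode (`"r|gz"`) only supports reading members in order, which is why each member is read inside the loop before moving to the next one.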
# scan_wikipedia_hyperscan.py
import asyncio
import itertools
import time

import meadowrun

from company_names import company_names_regex
from read_articles_extract import iterate_extract


def scan_hyperscan(extract_file, take_first_n, print_matches):
    # imported here so the module can be loaded on platforms without hyperscan
    import hyperscan

    i = 0

    def on_match(match_id, from_index, to_index, flags, context=None):
        nonlocal i
        # print out a little context around a sample of matches
        if i % 100000 == 0 and print_matches:
            article_title, article_content = context
            print(
                article_title
                + ": "
                # clamp at 0 so a match near the start does not become a negative index
                + str(article_content[max(from_index - 50, 0) : to_index + 50])
            )
        i += 1

    db = hyperscan.Database()
    patterns = (
        # expression, id, flags
        (
            company_names_regex().encode("utf-8"),
            1,
            hyperscan.HS_FLAG_CASELESS | hyperscan.HS_FLAG_SOM_LEFTMOST,
        ),
    )
    expressions, ids, flags = zip(*patterns)
    db.compile(expressions=expressions, ids=ids, elements=len(patterns), flags=flags)

    bytes_scanned = 0
    t0 = time.perf_counter()
    for article_title, article_content in itertools.islice(
        iterate_extract(extract_file), take_first_n
    ):
        db.scan(
            article_content,
            match_event_handler=on_match,
            context=(article_title, article_content),
        )
        bytes_scanned += len(article_content)

    time_taken = time.perf_counter() - t0
    print(
        f"Scanned {bytes_scanned:,d} bytes in {time_taken:.2f} seconds "
        f"({bytes_scanned / time_taken:,.0f} B/s)"
    )


async def scan_hyperscan_ec2():
    return await meadowrun.run_function(
        lambda: scan_hyperscan(
            "s3://wikipedia-meadowrun-demo/extracted-200000.tar.gz", 10000, True
        ),
        meadowrun.AllocCloudInstance("EC2"),
        meadowrun.Resources(
            logical_cpu=1,
            memory_gb=2,
            max_eviction_rate=80,
        ),
        await meadowrun.Deployment.mirror_local(),
    )


if __name__ == "__main__":
    asyncio.run(scan_hyperscan_ec2())
import asyncio
import sys

import meadowrun

from scan_wikipedia_hyperscan import scan_hyperscan


async def scan_hyperscan_ec2_full():
    total_articles = 22_114_834
    chunk_size = 100_000

    await meadowrun.run_map(
        lambda i: scan_hyperscan(
            f"s3://wikipedia-meadowrun-demo/extracted-{i}.tar.gz", sys.maxsize, False
        ),
        # note: the list of chunk offsets is concatenated with itself, so every
        # extract is scanned twice
        [i * chunk_size for i in range(total_articles // chunk_size + 1)]
        + [i * chunk_size for i in range(total_articles // chunk_size + 1)],
        meadowrun.AllocCloudInstance("EC2"),
        meadowrun.Resources(
            logical_cpu=1,
            memory_gb=2,
            max_eviction_rate=80,
        ),
        await meadowrun.Deployment.mirror_local(),
        num_concurrent_tasks=64,
    )


if __name__ == "__main__":
    asyncio.run(scan_hyperscan_ec2_full())
import asyncio
import itertools
import re

# import re2 as re
import time

import meadowrun

from company_names import company_names_regex
from read_articles_extract import iterate_extract


def scan_re(extract_file):
    compiled_re = re.compile(company_names_regex(), re.IGNORECASE)

    t0 = time.perf_counter()
    bytes_scanned = 0
    i = 0
    # only the first 100 articles are scanned
    for article_title, article_content in itertools.islice(
        iterate_extract(extract_file), 100
    ):
        for match in compiled_re.finditer(article_content.decode("utf-8")):
            # print out a little context around a sample of matches
            if i % 100000 == 0:
                print(
                    f"{article_title}: "
                    + " ".join(
                        # clamp at 0 so a match near the start does not become a
                        # negative index
                        match.string[
                            max(match.start() - 50, 0) : match.end() + 50
                        ].split()
                    )
                )
            i += 1

        bytes_scanned += len(article_content)

    time_taken = time.perf_counter() - t0
    print(
        f"Scanned {bytes_scanned:,d} bytes in {time_taken:.2f} seconds "
        f"({bytes_scanned / time_taken:,.0f} B/s)"
    )


async def scan_re_ec2():
    return await meadowrun.run_function(
        lambda: scan_re("s3://wikipedia-meadowrun-demo/extracted-200000.tar.gz"),
        meadowrun.AllocCloudInstance("EC2"),
        meadowrun.Resources(
            logical_cpu=1,
            memory_gb=2,
            max_eviction_rate=80,
        ),
        await meadowrun.Deployment.mirror_local(),
    )


if __name__ == "__main__":
    asyncio.run(scan_re_ec2())
import asyncio

import meadowrun

from convert_to_tar import convert_articles_chunk_to_tar_gz


async def unzip_all_articles():
    total_articles = 22_114_834
    chunk_size = 100_000

    await meadowrun.run_map(
        lambda i: convert_articles_chunk_to_tar_gz(i, chunk_size),
        # one task per chunk; each argument is the offset of the chunk's first article
        [i * chunk_size for i in range(total_articles // chunk_size + 1)],
        meadowrun.AllocCloudInstance("EC2"),
        meadowrun.Resources(
            logical_cpu=1,
            memory_gb=2,
            max_eviction_rate=80,
        ),
        await meadowrun.Deployment.mirror_local(),
        num_concurrent_tasks=64,
    )


if __name__ == "__main__":
    asyncio.run(unzip_all_articles())
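The offset list passed to `run_map` above is easy to check in isolation. The sketch below reproduces the same expression in a hypothetical helper (`chunk_offsets`, not part of the gist) and adds a ceiling-division variant for comparison, since `total // chunk_size + 1` yields one extra, empty chunk whenever the total is an exact multiple of the chunk size.

```python
def chunk_offsets(total_articles, chunk_size):
    """Same expression as in the gist: floor division plus one."""
    return [i * chunk_size for i in range(total_articles // chunk_size + 1)]


def chunk_offsets_exact(total_articles, chunk_size):
    """Hypothetical variant using ceiling division, so no empty trailing chunk."""
    num_chunks = -(-total_articles // chunk_size)
    return [i * chunk_size for i in range(num_chunks)]


offsets = chunk_offsets(22_114_834, 100_000)
print(len(offsets), offsets[0], offsets[-1])
```

For the article count used in this gist both variants agree, so the difference only matters if the numbers change.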
import time

from unzip_wikipedia_articles import iterate_articles_chunk

n = 1000
t0 = time.perf_counter()
bytes_read = 0
for title, text in iterate_articles_chunk(
    "enwiki-latest-pages-articles-multistream-index.txt.bz2",
    "enwiki-latest-pages-articles-multistream.xml.bz2",
    0,
    n,
):
    # text can be None for an empty <text> element, so fall back to an empty string
    bytes_read += len(title) + len(text or "")

print(
    f"Read ~{bytes_read:,d} bytes from {n} articles in {time.perf_counter() - t0:.2f}s"
)
# unzip_wikipedia_articles.py
import bz2
import xml.etree.ElementTree

import smart_open


def _read_articles_in_section(data_file_section, article_ids):
    """
    Reads articles from the given data file section, only returns articles that are in
    article_ids, and keeps trying to read until all article_ids have been read.

    Assumes that data_file_section is a file-like object that contains content like:
    <page>
        <id>1234</id>
        <title>Title Of Article</title>
        <text>Text of article</text>
    </page>
    <page>
        ...
    </page>
    ...
    """
    while article_ids:
        # we're only interested in seeing complete tags (i.e. the "end" of a tag)
        parser = xml.etree.ElementTree.XMLPullParser(["end"])
        current_article_title = None
        current_article_should_be_returned = False
        is_first_id = True
        for line in data_file_section:
            parser.feed(line)
            done = False
            for event_type, xml_node in parser.read_events():
                if xml_node.tag == "id":
                    article_id = xml_node.text
                    if is_first_id:
                        # this is pretty hacky, we're just assuming that the page's id
                        # element comes before any other sub-ids
                        current_article_should_be_returned = article_id in article_ids
                        article_ids.discard(article_id)
                        is_first_id = False
                elif xml_node.tag == "title":
                    current_article_title = xml_node.text
                elif xml_node.tag == "text":
                    if current_article_should_be_returned:
                        # assumes there is an id and title tag before this tag, and
                        # those are the correct id/title tags
                        yield current_article_title, xml_node.text
                elif xml_node.tag == "page":
                    done = True
            if done:
                # if we're done with the page element, we need to start a new XML parser
                break
        else:
            # the section ran out of lines before every id was found; stop here
            # instead of looping forever on an exhausted file
            return


def iterate_articles_chunk(index_file, multistream_file, articles_offset, num_articles):
    """
    Iterates through the articles in the wikipedia dump. Skips the first
    articles_offset articles, and then iterates through the next num_articles articles.
    """
    with smart_open.open(
        index_file, "rt", encoding="utf-8"
    ) as index_file, smart_open.open(
        multistream_file, "rb", compression="disable"
    ) as compressed_data_file:
        # Our strategy will be to group all the requested articles in each section, and
        # then call _read_articles_in_section once for each section. These two variables
        # will keep track of the "current" section
        current_section_offset = None
        current_article_ids = None

        for i, line in enumerate(index_file):
            # skip articles_offset lines
            if i < articles_offset:
                continue
            # and stop when we get to articles_offset + num_articles lines
            if i >= articles_offset + num_articles:
                break

            # parse the line, which has the form offset:article_id:article_title
            offset_string, sep, remainder = line[:-1].partition(":")
            offset = int(offset_string)
            article_id, sep, article_title = remainder.partition(":")

            if current_section_offset == offset:
                # if we're "in the same section", just add our article_id to the set
                current_article_ids.add(article_id)
            else:
                # if we're in a new section, then call _read_articles_in_section on the
                # previous section
                if current_article_ids:
                    compressed_data_file.seek(current_section_offset)
                    with bz2.open(
                        compressed_data_file, "rt", encoding="utf-8"
                    ) as data_file_section:
                        yield from _read_articles_in_section(
                            data_file_section, current_article_ids
                        )

                # start keeping track of a new section
                current_section_offset = offset
                current_article_ids = {article_id}

        # finally, if we have an unprocessed section, read that
        if current_article_ids:
            compressed_data_file.seek(current_section_offset)
            with bz2.open(
                compressed_data_file, "rt", encoding="utf-8"
            ) as data_file_section:
                yield from _read_articles_in_section(
                    data_file_section, current_article_ids
                )
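The grouping strategy described in the comments of `iterate_articles_chunk` (collect the article ids that share a byte offset, then read each section once) can be shown without the dump files. This is a minimal sketch: the sample index lines are made up for illustration, and `parse_index_line` and `group_by_section` are hypothetical helpers rather than functions from the gist.

```python
from itertools import groupby


def parse_index_line(line):
    """Split an 'offset:article_id:title' index line; the title may contain colons."""
    offset_string, _, remainder = line.rstrip("\n").partition(":")
    article_id, _, title = remainder.partition(":")
    return int(offset_string), article_id, title


def group_by_section(index_lines):
    """Return [(section_offset, {article_ids})], relying on lines sorted by offset."""
    parsed = (parse_index_line(line) for line in index_lines)
    return [
        (offset, {article_id for _, article_id, _ in group})
        for offset, group in groupby(parsed, key=lambda item: item[0])
    ]


# Made-up sample lines in the offset:id:title format
sample = [
    "600:10:First Article\n",
    "600:12:Second Article\n",
    "627974:615:Title: With Colon\n",
]
for offset, ids in group_by_section(sample):
    print(offset, sorted(ids))
```

`groupby` only merges adjacent lines, which matches the gist's approach of comparing each line's offset with the current section's offset.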
aws s3 mb s3://wikipedia-meadowrun-demo
aws s3 cp enwiki-latest-pages-articles-multistream-index.txt.bz2 s3://wikipedia-meadowrun-demo
aws s3 cp enwiki-latest-pages-articles-multistream.xml.bz2 s3://wikipedia-meadowrun-demo
poetry run meadowrun-manage-ec2 grant-permission-to-s3-bucket wikipedia-meadowrun-demo