BulkVS bulk number download tool
#!/usr/bin/env python
# downloads a list of BulkVS phone numbers and writes them out as a CSV to stdout
# developed on python3.11, but should work in any modern python3
# before running, install deps:
# virtualenv venv
# source ./venv/bin/activate
# pip install --upgrade aiohttp aiohttp-client-cache
# supply your credentials via the BULKVS_EMAIL and BULKVS_APIKEY env vars
# delete "cache.sqlite" to clear the HTTP cache, otherwise it's indefinite
# this takes hours to run, downloads 500MB to 30GB, and makes 30k+ network requests to BulkVS
# run it sparingly!
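# example invocation (illustrative; "bulkvs_dump.py" is a hypothetical filename,
# use whatever you saved this gist as):
#   BULKVS_EMAIL=you@example.com BULKVS_APIKEY=0123abcd python bulkvs_dump.py > numbers.csv 2> progress.log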
# License: MIT
# Copyright (c) 2024 Eric Swanson
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import logging
import os
import sys
import asyncio
import aiohttp

# default to '' so a missing env var trips the asserts below
# instead of raising a ValueError inside BasicAuth
AUTH = aiohttp.BasicAuth(os.getenv('BULKVS_EMAIL', ''), os.getenv('BULKVS_APIKEY', ''))
assert AUTH[0], "Supply your BulkVS account email in BULKVS_EMAIL env var"
assert AUTH[1], "Supply your BulkVS api key in BULKVS_APIKEY env var"
CONCURRENCY = 5 # max concurrent NPAs processing at a time
MAX_LIMIT = 5000 # this is max they will actually return
N_DIGITS = '23456789' # valid first digits of NPA or NXX
# used to track bandwidth and network calls into BulKVS
call_count = 0
bytes_downloaded = 0
# cache library is optional, so make code that works without it too
try:
    import aiohttp_client_cache
except ImportError:
    print("Warning: unable to import aiohttp_client_cache", file=sys.stderr)

    def make_session():
        return aiohttp.ClientSession()
else:
    cache = aiohttp_client_cache.SQLiteBackend(
        cache_name='cache',
        allowable_codes=[200, 400, 404],
        expire_after=-1,  # never expire; delete cache.sqlite to clear
        autoclose=False,
    )

    def make_session():
        return aiohttp_client_cache.CachedSession(cache=cache)
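# illustrative use of make_session() (same shape either way; with the cache
# backend installed, an identical GET on a later run is served from
# cache.sqlite instead of hitting the network):
#     async with make_session() as session:
#         async with session.get(url, params=params) as resp:
#             js = await resp.json()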
def deduplicate_list(l, key=lambda x: x):  # preserves order
    seen = set()
    out = []
    for x in l:
        if key(x) in seen:
            continue
        out.append(x)
        seen.add(key(x))
    return out
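# e.g. (illustrative):
#   deduplicate_list([{'TN': '2125550100'}, {'TN': '2125550100'}], key=lambda x: x['TN'])
# keeps only one record; the first occurrence wins and input order is preserved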
# ask BulkVS for numbers in an NPA/NXX
async def query(npa, nxx=None):
    global call_count, bytes_downloaded
    call_count += 1
    npa = int(npa)
    if nxx:
        nxx = int(nxx)
    assert npa >= 200 and npa < 1000, "NPA invalid"
    assert nxx is None or (nxx >= 200 and nxx < 1000), "NXX invalid"
    params = {
        'Limit': MAX_LIMIT,
        'Npa': npa,
    }
    if nxx:
        params['Nxx'] = nxx
    url = 'https://portal.bulkvs.com/api/v1.0/orderTn'
    async with make_session() as session:
        async with session.get(
            url,
            auth=AUTH,
            params=params,
            timeout=31,
        ) as resp:
            if resp.status == 404:
                return []  # empty list for 404
            resp.raise_for_status()
            js = await resp.json()
            # content_length is None on chunked responses, so guard with "or 0"
            bytes_downloaded += resp.content_length or 0
    for x in js:
        # TF numbers are given as 18XXNXXXXXX for some reason, so remove the
        # leading country code 1 (removeprefix strips at most one copy)
        x['TN'] = x['TN'].removeprefix('1')
    # weird bug: they don't actually filter to NPA, so do it for them
    # note, they also don't filter to NXX, but we take advantage of that later
    js = [x for x in js if x['TN'].startswith(str(npa))]
    return js
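# illustrative shape of a query() result (field values are invented, not real data;
# the keys match what handle_job_results() below reads):
#   await query(212) -> [{'TN': '2125550100', 'Tier': '0', 'Rate Center': 'NWYRCYZN01',
#                         'State': 'NY', 'Per Minute Rate': '0.0000',
#                         'Mrc': '0.00', 'Nrc': '0.00'}, ...]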
# find the area codes and/or exchanges with a given string prefix
async def find_npas(prefix):
    # note: prefix is a string, may have any number of digits between 1 and 6 inclusive
    global call_count, bytes_downloaded
    call_count += 1
    url = 'https://portal.bulkvs.com/tninv/'
    params = {'q': prefix}
    async with make_session() as session:
        async with session.get(
            url,
            auth=AUTH,
            params=params,
            timeout=91,
        ) as resp:
            resp.raise_for_status()
            js = await resp.json()
            bytes_downloaded += resp.content_length or 0
    area_codes = {}
    exchanges = {}
    # "js" will be a list of objects, each of which has two subobjects, header and data
    # header has title, num, and limit
    # if num >= limit, we missed some data and need a tighter prefix search
    # title will be "Area Code" or "Exchanges"
    # data is a list of more objects, each of which has properties:
    #   primary: the NPA for Area Code OR NPANXX for Exchanges
    #   secondary: string like "1043 Numbers available"
    found_all_exchanges = found_all_area_codes = None
    for obj in js:
        if not obj['header']:
            continue
        title = obj['header']['title']
        data = obj['data']
        if title == 'Area Code':
            found_all_area_codes = obj['header']['num'] < obj['header']['limit']
            for item in data:
                area_codes[item['primary']] = int(item['secondary'].split(' ')[0])
        elif title == 'Exchanges':
            found_all_exchanges = obj['header']['num'] < obj['header']['limit']
            for item in data:
                exchanges[item['primary']] = int(item['secondary'].split(' ')[0])
    if not found_all_area_codes and area_codes:
        print("Warning: missed some area codes with prefix", prefix, file=sys.stderr)
    if not found_all_exchanges and exchanges:
        print("Warning: missed some exchanges with prefix", prefix, file=sys.stderr)
    return dict(
        area_codes=area_codes,
        exchanges=exchanges,
        found_all_area_codes=found_all_area_codes,
        found_all_exchanges=found_all_exchanges,
    )
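# illustrative return shape for find_npas('21') (all counts invented):
#   {'area_codes': {'212': 1043, '213': 87, ...},
#    'exchanges': {'212555': 12, ...},
#    'found_all_area_codes': True, 'found_all_exchanges': True}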
# handle a given NPA - this is our unit of parallelism
async def _process_npa(npa, expected):
    our_numbers = list()  # list of JSON objects
    numbers_set = set()  # set of just the telephone number strings
    # "expected" is the number of available telephone numbers in this NPA
    print("Scraping NPA", npa, "with", expected, "expected telephone numbers", file=sys.stderr)
    # if the number of available numbers is < MAX_LIMIT, we can query in one shot
    res = await query(npa=npa)
    our_numbers.extend(res)
    numbers_set.update(set(x['TN'] for x in res))
    # XXX seems like expectations don't match for some NPAs
    # XXX tried hitting this code path every time but it didn't help
    if expected >= MAX_LIMIT:
        # otherwise we must query individual exchanges in that area code, so find them all
        exchanges = {}
        res = await find_npas(npa)
        exchanges.update(res['exchanges'])
        # possibly an area code contains more exchanges than we can actually get in one query
        # if that happens, we'll try a one-digit tighter prefix
        # theoretically this could happen even with an NPA-N prefix, but the limit is 250 so it won't
        if not res['found_all_exchanges']:
            for digit in N_DIGITS:
                res = await find_npas(npa + digit)
                exchanges.update(res['exchanges'])
        # strip the NPA from the front of the exchange
        assert all(x.startswith(npa) for x in exchanges)
        exchanges = {x[3:]: y for x, y in exchanges.items()}
        # now query each NPA/NXX to get numbers
        for nxx in sorted(exchanges):
            prefix = str(npa) + str(nxx)
            nxx_num = exchanges[nxx]
            # set comprehension: all numbers we already know in this NPA/NXX
            known_numbers = {x for x in numbers_set if x.startswith(prefix)}
            # in theory one NXX could have 10,000 numbers and then we'd be hooped, as max limit is 5000
            # for now just throw a warning; not sure what else to do
            if nxx_num >= MAX_LIMIT:
                print("Warning: expecting", nxx_num, "numbers for", npa, nxx, file=sys.stderr)
            # we already found the expected number or more, so skip scraping this NXX explicitly
            if len(known_numbers) >= nxx_num:
                print("Scraped NPA", npa, "exchange", nxx, "with", nxx_num,
                      "expected numbers, already know", len(known_numbers), file=sys.stderr)
                continue
            # scrape the NPA/NXX pair, which can return numbers from other NXXes
            # don't filter NXX on our side; see if we can resolve more than one at a time
            res = await query(npa=npa, nxx=nxx)
            our_numbers.extend(res)
            numbers_set.update(set(x['TN'] for x in res))
            known_numbers = {x for x in numbers_set if x.startswith(prefix)}
            print("Scraped NPA", npa, "exchange", nxx, "with", nxx_num,
                  "expected numbers, got", len(known_numbers), file=sys.stderr)
    # uniquify the numbers, as we might get multiple copies when we scrape multiple NXXes
    our_numbers = deduplicate_list(our_numbers, key=lambda x: x['TN'])
    print("Finished NPA", npa, "with", len(our_numbers), "numbers of expected", expected, file=sys.stderr)
    return our_numbers
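# worked example of the fan-out above (all counts invented): if NPA 212 reports
# 12000 available numbers, then 12000 >= MAX_LIMIT, so we enumerate its exchanges
# via find_npas('212') and walk them in order with query(npa='212', nxx=nnn),
# skipping any NXX whose expected count was already covered by earlier
# over-broad responses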
async def process_npa(semaphore, npa, expected):
    # manage the concurrency limit right here with this semaphore
    async with semaphore:
        try:
            return await _process_npa(npa, expected)
        except Exception as exc:
            print("EXCEPTION: Something went wrong with NPA", npa, repr(exc), file=sys.stderr)
            return []  # empty result so handle_job_results can still iterate
num_completed = 0
num_npas = 0

def handle_job_results(results):
    global num_completed
    try:
        num_completed += 1
        npa = None
        if results:
            npa = results[0]['TN'][:3]
        print("*** Completed NPA {} #{:3d} of {}".format(npa, num_completed, num_npas), file=sys.stderr)
        for rec in results:
            try:
                print(",".join([
                    rec['TN'],
                    rec['Tier'],
                    rec['Rate Center'],
                    rec['State'],
                    rec['Per Minute Rate'] or "",
                    rec['Mrc'] or "",
                    rec['Nrc'] or "",
                ]))
            except Exception:
                print("EXCEPTION: Skipping record", rec, file=sys.stderr)
    except Exception as exc:
        print("EXCEPTION: Something went wrong with results!", repr(exc), file=sys.stderr)
async def main():
    logging.basicConfig(level=logging.WARNING)  # we don't really use this... oh well
    global num_npas
    # find all area codes and the number of available numbers in each
    area_codes = {}
    for digit in N_DIGITS:
        res = await find_npas(digit)
        area_codes.update(res['area_codes'])
    total_expected_numbers = sum(area_codes.values())
    print("We expect to find", total_expected_numbers, "total telephone numbers in",
          len(area_codes), "distinct NPAs", file=sys.stderr)
    num_npas = len(area_codes)
    todo = sorted(area_codes.items())
    # print the CSV header before either path so the sequential path gets it too
    print("number,tier,rate center,state,per minute rate,mrc,nrc")
    if CONCURRENCY > 1:
        # the faster path
        sem = asyncio.Semaphore(CONCURRENCY)
        jobs = [
            process_npa(sem, npa, expected)
            for npa, expected in todo
        ]
        for job in asyncio.as_completed(jobs):
            handle_job_results(await job)
            print("*** DOWNLOADED {} bytes with {} requests so far".format(bytes_downloaded, call_count), file=sys.stderr)
    else:
        # the more easily debuggable path for development
        for npa, expected in todo:
            try:
                results = await _process_npa(npa, expected)
                handle_job_results(results)
                print("*** DOWNLOADED {} bytes with {} requests so far".format(bytes_downloaded, call_count), file=sys.stderr)
            except Exception as exc:
                print("EXCEPTION: Something went wrong processing one NPA", repr(exc), file=sys.stderr)

if __name__ == "__main__":
    asyncio.run(main())