Skip to content

Instantly share code, notes, and snippets.

@lukebeer
Created August 25, 2015 18:47
Show Gist options
  • Save lukebeer/5a60e7ff07e6936026da to your computer and use it in GitHub Desktop.
Save lukebeer/5a60e7ff07e6936026da to your computer and use it in GitHub Desktop.
Command-line source code search using the searchcode.com API
import asyncio
import aiohttp
import urllib
import json
import tqdm
import math
import sys
import ssl
from urllib import request
from urllib import parse
@asyncio.coroutine
def get(*args, **kwargs):
    """Issue a GET request through aiohttp and return the response body.

    Every positional and keyword argument is forwarded unchanged to
    ``aiohttp.request`` after the ``'GET'`` method argument.  The body is
    read with ``read_and_close(decode=True)``.

    NOTE(review): the result is presumably the decoded JSON document, since
    ``do_request`` indexes it like a dict -- confirm against the aiohttp
    version in use.
    """
    reply = yield from aiohttp.request('GET', *args, **kwargs)
    body = yield from reply.read_and_close(decode=True)
    return body
@asyncio.coroutine
def wait_with_progress(coros):
    """Wait for every awaitable in ``coros`` behind a tqdm progress bar.

    The awaitables are consumed in completion order via
    ``asyncio.as_completed``; the bar's total is ``len(coros)``, so
    ``coros`` has to be a sized collection.  Results are discarded.
    """
    in_completion_order = asyncio.as_completed(coros)
    progress = tqdm.tqdm(in_completion_order, total=len(coros))
    for finished in progress:
        yield from finished
@asyncio.coroutine
def do_request(url):
    """Download one result page from ``url`` and print its results.

    The download runs while holding the module-level semaphore ``sem``.
    The ``results`` entry of the response is printed only when its
    ``total`` entry is greater than zero; nothing is returned.
    """
    with (yield from sem):
        payload = yield from get(url, compress=True, verify_ssl=False)

    # Nothing to show for an empty result set.
    if not payload['total'] > 0:
        return
    print(payload['results'])
def chunk_request(query, url, perpage=100, pages=49):
    """Plan the query strings needed to page through all hits for ``query``.

    One probe request is sent to ``url`` to read the overall hit count
    (``total``) and the per-language hit counts (``language_filters``).
    The list of query strings is then built as follows:

    * fewer than ``pages * perpage`` hits overall: plain paging;
    * otherwise one set of pages per language; a language that alone has
      more than ``pages * perpage`` hits is split further into
      lines-of-code windows (``loc``/``loc2``) covering 0..10000.

    Args:
        query: Raw search term.  It is URL-encoded here, so pass it
            unescaped.
        url: Prefix the query string is appended to verbatim (for an HTTP
            endpoint it should therefore end in ``?``).
        perpage: Number of results requested per page (``per_page``).
        pages: Maximum number of pages requested for one result set.

    Returns:
        A list of query strings without the ``url`` prefix, e.g.
        ``'q=foo&p=0&per_page=100'``.  The list is also printed.

    Raises:
        urllib.error.URLError: If the probe request fails.
        ValueError: If the probe response is not valid JSON.
        KeyError: If the response lacks ``total``, ``language_filters`` or
            a language's ``id``/``count``.
    """
    # Upper bound of the loc/loc2 filters; the original code divided this
    # same range into windows.
    max_loc = 10000
    quoted = parse.quote_plus(query)
    capacity = pages * perpage

    # NOTE(security): certificate verification is disabled through a private
    # ssl helper.  Kept so existing setups keep working, but the probe
    # response is not authenticated -- review before trusting its content.
    context = ssl._create_unverified_context()
    probe = '%sq=%s&p=0&per_page=%s' % (url, quoted, perpage)
    with request.urlopen(probe, context=context) as r:
        charset = r.info().get_param('charset') or 'utf-8'
        data = json.loads(r.read().decode(charset))

    def page_count(hits):
        # Pages are numbered from 0; never plan more than ``pages`` of them.
        return min(pages, math.ceil(hits / perpage))

    urls = []
    if data['total'] < capacity:
        urls.extend('q=%s&p=%s&per_page=%s' % (quoted, page, perpage)
                    for page in range(page_count(data['total'])))
        print(urls)
        return urls

    for lang in data['language_filters']:
        if lang['count'] > capacity:
            # Too many hits for one language: slice it by lines of code so
            # each slice can be paged through separately.
            linechunks = math.ceil(lang['count'] / capacity)
            linesteps = math.ceil(max_loc / linechunks)
            for chunk in range(linechunks):
                minloc = chunk * linesteps
                # Rounding up the step can overshoot the valid range.
                maxloc = min(minloc + linesteps, max_loc)
                urls.extend(
                    'q=%s&p=%s&per_page=%s&lan=%s&loc=%s&loc2=%s'
                    % (quoted, page, perpage, lang['id'], minloc, maxloc)
                    for page in range(pages))
        else:
            urls.extend(
                'q=%s&p=%s&per_page=%s&lan=%s'
                % (quoted, page, perpage, lang['id'])
                for page in range(page_count(lang['count'])))
    print(urls)
    return urls
# Script entry: plan every result page for the query (or file of queries)
# given as the first command-line argument, then download them concurrently.
baseurl = 'https://searchcode.com/api/codesearch_I/?'
# Limits how many do_request() downloads run at the same time.
sem = asyncio.Semaphore(20)
loop = asyncio.get_event_loop()
urls = []

if len(sys.argv) < 2:
    sys.exit('usage: %s <query | file-with-one-query-per-line>' % sys.argv[0])

# The argument names a file holding one query per line; when no such file
# can be opened, the argument itself is the query.  Only the open() is
# guarded, so a failing search is no longer mistaken for "not a file".
try:
    fh = open(sys.argv[1], 'r')
except OSError:
    queries = [sys.argv[1]]
else:
    with fh:
        queries = [line.strip() for line in fh]
    # Blank lines would otherwise be sent as empty searches.
    queries = [q for q in queries if q]

for query in queries:
    urls.extend(chunk_request(query, baseurl))

# asyncio.wait() raises on an empty collection, so skip it when no page
# was planned.
if urls:
    f = asyncio.wait([do_request(baseurl + u) for u in urls])
    loop.run_until_complete(f)
# chunk_request('a', baseurl)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment