@bzimor
Last active March 22, 2017 10:34
This script crawls cctld.uz, collects information on newly added .uz domain names, and saves the data to a single SQLite database file
import urllib.request
import urllib.error
import ssl
from tqdm import tqdm
from os.path import abspath, isfile
from os import listdir, remove
from bs4 import BeautifulSoup
from sqlalchemy import create_engine, Table, Column, Integer, Float, String, ForeignKey, Boolean, desc, and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime, date, timedelta
import time
import shutil
import requests
### Global variables ###
proxyAuth = {} # Proxy settings, if needed
cdir = abspath('.').replace('\\', '/')
archpath = cdir
recheckF = cdir + "/sitelist_recheck.txt"
newF = cdir + "/sitelist_new.txt"
getnewurls="http://cctld.uz/rss/"
### Connection setting ###
context = ssl._create_unverified_context()
proxy=urllib.request.ProxyHandler(proxyAuth)
auth = urllib.request.HTTPBasicAuthHandler()
opener = urllib.request.build_opener(proxy, auth, urllib.request.HTTPHandler)
opener.addheaders = [('User-Agent', 'Mozilla/5.0')]
urllib.request.install_opener(opener)
### Preparing database ###
engine = create_engine('sqlite:///uzdomains.db')
Session = sessionmaker(bind=engine)
session = Session()
Base = declarative_base()
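# ORM models: domains (current state per domain), domaininfo (whois snapshots), domainstatus (HTTP checks), registrators and daily stats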
class Domains(Base):
__tablename__ = 'domains'
name = Column(String, primary_key=True)
lenth = Column(Integer, nullable=False)
infodate = Column(Integer, nullable=False)
dfrom = Column(Integer)
dto = Column(Integer)
sitestatus = Column(Integer)
domainexists = Column(Boolean, nullable=False)
class Domaininfo(Base):
__tablename__ = 'domaininfo'
id = Column(Integer, primary_key=True)
domain_id = Column(None, ForeignKey('domains.name'))
dfrom = Column(String, nullable=False)
dto = Column(String, nullable=False)
registrator = Column(None, ForeignKey('registrators.id'))
type = Column(String)
organization = Column(String)
status = Column(String)
nsserver = Column(String)
infodate = Column(String, nullable=False)
class Domainstatus(Base):
__tablename__ = 'domainstatus'
id = Column(Integer, primary_key=True)
domain_id = Column(None, ForeignKey('domains.name'))
code = Column(String)
sitetitle = Column(String)
redirecturl = Column(String)
infodate = Column(Integer, nullable=False)
class Registrators(Base):
__tablename__ = 'registrators'
id = Column(Integer, primary_key=True)
registratorname = Column(String)
class Stats(Base):
__tablename__ = 'stats'
id = Column(Integer, primary_key=True)
infodate = Column(Integer)
addeddomains = Column(Integer)
recheckeddomains = Column(Integer)
deleteddomains = Column(Integer)
domainstatuses = Column(Integer)
totalindb = Column(Integer)
totalincctld = Column(Integer)
discoverability = Column(Float)
Base.metadata.create_all(engine) #Creates tables above
def check_dbarchive():
# Back up the current database to uzdomains-<YYYYMMDD>.db, at most once a week
dates = []
date = dateconv(todayint, '%Y%m%d', r=True)
for db in listdir(archpath):
if db.startswith('uzdomains-') and db.endswith('.db'): # only the dated archive copies, not uzdomains.db itself
dates.append(int(db[10:18]))
if dates:
maxdate = dateconv(max(dates), '%Y%m%d')
if todayint > maxdate + 6*24*60*60:
newarch = 'uzdomains-' + date + '.db'
shutil.copy2('uzdomains.db', newarch)
print('Database was archived')
else:
newarch = 'uzdomains-' + date + '.db'
shutil.copy2('uzdomains.db', newarch)
print('Database was archived')
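# Write sitelist_recheck.txt with existing domains whose registration expired more than 30 days ago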
def get_rechecks():
if isfile(recheckF):
remove(recheckF)
recheckfile=open(recheckF, "a", encoding='utf-8')
rechecklist = []
n = todayint - 30*24*60*60
for domain, in session.query(Domains.name).filter(and_(Domains.dto<n, Domains.domainexists==1)):
rechecklist.append(domain)
session.commit()
if rechecklist:
for item in rechecklist:
s = item + '\n'
recheckfile.write(s)
recheckfile.close()
print('Recheck filelist was created')
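# Fetch the cctld.uz RSS feed and append the newly registered .uz domain names to sitelist_new.txt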
def geturls():
getlog = '%s: Getting new sites...' %(time.strftime('%d.%m.%Y %X'))
print(getlog)
erchk = True
n=0
try:
try:
page=urllib.request.urlopen(getnewurls)
except:
page=urllib.request.urlopen(getnewurls, context=context)
except Exception as err:
errlog = '%s: Error occurred while getting new sites: %s' %(time.strftime('%d.%m.%Y %X'), err)
print(errlog)
erchk = False
inputraw = input('Would you like to try again? :y/n')
if inputraw in ('Y', 'y', 'yes'):
geturls()
if erchk:
sss=page.read()
soup = BeautifulSoup(sss, 'lxml')
links = soup.find_all('a')
newsites=open(newF, "a", encoding='utf-8')
for tag in links:
link = tag.get('href', '') # default to '' so the .replace() call cannot fail when href is missing
link = link.replace(' ', '')
if link is not None:
if 'http' in link:
if '.uz' in link:
if not 'cctld' in link:
st = link[7:-3]+'\n'
newsites.write(st.lower())
n+=1
newsites.close()
gotlog = '%s: %1.0f sites were found' %(time.strftime('%d.%m.%Y %X'), n)
print(gotlog)
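# Merge the new and recheck lists, drop names already stored as existing domains and pass the rest to geturlinfo()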
def checknewurls():
new=open(newF).read().splitlines()
recheck = open(recheckF).read().splitlines()
domainlist = []
for url in list(new): # iterate over a copy so items can be removed from new safely
if not url.replace('-', '').isalnum():
print(url)
new.remove(url)
for domain, in session.query(Domains.name).filter(Domains.domainexists==1):
domainlist.append(domain)
lst = list(set(new).difference(set(domainlist)))
lst = list(set(lst + recheck))
rlstlen = len(recheck)
if lst:
dellist = []
for domain, in session.query(Domains.name).filter(Domains.domainexists==0):
dellist.append(domain)
domainlist = domainlist + dellist
geturlinfo(lst, domainlist, rlstlen, dellist)
else:
print('There were no URLs to check!')
if isfile(recheckF):
remove(recheckF)
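# Return a dict mapping registrar id (as string) to registrar name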
def updateregs():
reglist = dict()
for regcode, regnm in session.query(Registrators.id, Registrators.registratorname).order_by(Registrators.id):
reglist[str(regcode)] = regnm
return reglist
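# Return a dict mapping domain name to an expiry date recorded in the domaininfo table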
def domaininfolist():
domaindic = dict()
for name, dateto in session.query(Domaininfo.domain_id,
Domaininfo.dto).order_by(Domaininfo.domain_id):
domaindic[str(name)] = dateto
return domaindic
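# Query the cctld.uz whois page for every name in slist, parse the HTML and insert or update the ORM records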
def geturlinfo(slist, domainlist, rlstlen, dellist):
reglist = updateregs()
sitedic = domaininfolist()
ee=True
listlen = len(slist)
btime=time.time()
startlog = '%s: Crawling starts for %1.0f sites' %(time.strftime('%d.%m.%Y %X'), listlen)
print(startlog)
k=0
b=0
pr=tqdm(total=listlen)
while k<listlen:
try:
getsite="http://cctld.uz/whois/?domain=%s&zone=uz" %(slist[k])
try:
page=urllib.request.urlopen(getsite)
except:
page=urllib.request.urlopen(getsite, context=context)
if ee:
pass
else:
unblocklog = '\n%s: Server unblocked the access, continuing...' %(time.strftime('%d.%m.%Y %X'))
print(unblocklog)
ee=True
except Exception as e:
errorlog = '\n%s: Error occurred in %s: Code %s' %(time.strftime('%d.%m.%Y %X'), slist[k], e.code)
print(errorlog)
if e.code == 400:
e400log = '\n%s: Bad site name %s, trying next site...' %(time.strftime('%d.%m.%Y %X'), slist[k])
print(e400log)
k+=1
continue
elif e.code == 403:
e403log = '\n%s: Server blocked the access' %(time.strftime('%d.%m.%Y %X'))
print(e403log)
ee=False
time.sleep(10*60)
elif e.code == 407:
e407log = '\n%s: Proxy Authentication Required' %(time.strftime('%d.%m.%Y %X'))
print(e407log)
break
else:
eulog = '\n%s: Unknown error, check again' %(time.strftime('%d.%m.%Y %X'))
print(eulog)
break
sss=page.read()
b+=len(sss) # count the downloaded bytes, not the Python object size
text=sss.decode('utf8')
if text.find('Дата создания')==-1:
if slist[k] in domainlist:
olddomain = session.query(Domains).filter_by(name=slist[k]).first()
olddomain.domainexists = 0
olddomain.infodate = todayint
session.commit()
else:
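# The offsets below slice the whois HTML around its Russian field labels: 'Дата создания' = creation date,
# 'Активен до' = active until, 'Тип' = type, 'Организация' = organization, 'Статус' = status, 'Первый NS' = first NS server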
num1=text.find('Дата создания')+57
num2=text.find('Активен до')+54
num3=text.find('/reg/reginfo/?id=')+17
sym1=text.find('"', num3)
sym11=text.find('>', sym1)+1
sym12=text.find('<', sym1)
num4=text.find('Тип:')+47
num5=text.find('Организация:')+55
sym2=text.find('&n', num5)
num6=text.find('Статус:')+50
sym3=text.find('&n', num6)
num7=text.find('Первый NS')+60
sym4=text.find('&n', num7)
date_from=text[num1:num1+10]
date_to=text[num2:num2+10]
date_to=date_to.replace('-&nbsp;</t','31.12.2099')
regs=text[num3:sym1]
regname=text[sym11:sym12]
typeper=text[num4:num4+3]
organ=text[num5:sym2]
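# Replace Uzbek Cyrillic letters (қ, ҳ, ғ) with their Russian look-alikes and unescape quotes in the organization name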
spec=u'\u049b'
spec2=u'\u04b3'
spec3=u'\u0493'
organ=organ.replace(spec, 'к').replace(spec2, 'х').replace('&quot;', '"').replace(spec3, 'г')
wstatus=text[num6:sym3]
wnsserver=text[num7:sym4]
fromdate = dateconv(date_from, '%d.%m.%Y')
todate = dateconv(date_to, '%d.%m.%Y')
if slist[k] in domainlist:
olddomain = session.query(Domains).filter_by(name=slist[k]).first()
if date_to != sitedic[slist[k]]:
session.add(Domaininfo(domain_id=slist[k], infodate=todayd, dfrom=date_from, dto=date_to,
registrator=regs, type=typeper, organization=organ, status=wstatus, nsserver=wnsserver))
olddomain.dfrom = fromdate
olddomain.dto = todate
olddomain.domainexists = 1
else:
session.add(Domains(name=slist[k], lenth=len(slist[k]), domainexists=1,
dfrom=fromdate, dto=todate, infodate=todayint))
session.add(Domaininfo(domain_id=slist[k], infodate=todayd, dfrom=date_from, dto=date_to,
registrator=regs, type=typeper, organization=organ, status=wstatus, nsserver=wnsserver))
if regs not in list(reglist.keys()):
print('New registrator! %s' %(regname))
session.add(Registrators(id=regs, registratorname=regname))
print('Registrator %s was added' %(regname))
reglist = updateregs()
elif regname not in list(reglist.values()):
oldreg = session.query(Registrators).filter_by(id=regs).first()
oldreg.registratorname = regname
print('Registrator %s was changed' %(regname))
reglist = updateregs()
session.commit()
time.sleep(1)
pr.update()
k+=1
endlog = '%s: Crawling finished.' %(time.strftime('%d.%m.%Y %X'))
etime=time.time()
stime=etime-btime
mb=b/1048576
print ('It spent %1.0f seconds' %(stime))
print ('It spent %1.2f megabytes' %(mb))
sp=stime/(k+1)
print ('Overall %1.2f seconds per site' %(sp))
print(endlog)
if isfile(recheckF):
remove(recheckF)
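# Send a HEAD request to the site and return its HTTP status code, or a synthetic code on proxy/timeout/request errors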
def getsitecode(sitename):
code=''
try:
resp = requests.head(sitename, proxies=proxyAuth, timeout=3).status_code
code=resp
except requests.exceptions.ProxyError:
code = '511'
except requests.exceptions.Timeout:
code = '522'
except requests.exceptions.RequestException:
code = '520'
return code
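# Resolve http://<site>.uz, fetch the page title and, for 301/302/303 responses, the redirect target URL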
def gettitle(site):
weburl="http://" + site + ".uz"
status=str(getsitecode(weburl))
tit = ""
url = ""
if status.isnumeric():
if status == '200':
try:
item = urllib.request.urlopen(weburl)
tit2 = BeautifulSoup(item, "lxml")
tit = tit2.title.string
except urllib.error.URLError:
tit = 'url error!'
except urllib.error.HTTPError:
tit = 'http error!'
except AttributeError:
tit = 'Attribute error'
except:
tit = 'Unknown error'
elif status == '301' or status == '302' or status == '303':
try:
item = urllib.request.urlopen(weburl)
tit2 = BeautifulSoup(item, "lxml")
tit = tit2.title.string
url = item.geturl()
except urllib.error.URLError:
url = 'url error!'
except:
url = 'Unknown error!'
else:
tit = 'not active!'
ttl=' '.join(str(tit).split())
ttl=ttl.lstrip("-")
if '<' in ttl:
ttl = ''
ttl = ttl.replace(';', '').replace('\\', '')
string = site, status, str(url), ttl
return string
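# Take a batch of domains that have no sitestatus yet and store their HTTP code, title and redirect URL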
def writetitles():
j = 0
print('%s: Getting title starts...' %(time.strftime('%d.%m.%Y %X')))
domainlist = []
for domains, in session.query(Domains.name).filter_by(sitestatus=None).order_by(desc(Domains.infodate))[:301]:
domainlist.append(domains)
k = len(domainlist)
pr=tqdm(total=k)
for url in domainlist:
row = gettitle(url)
if row:
session.add(Domainstatus(domain_id=row[0], code = row[1], sitetitle = row[3],
redirecturl = row[2], infodate=todayd))
olddomain = session.query(Domains).filter_by(name=row[0]).first()
olddomain.sitestatus = int(row[1])
else:
continue
session.commit()
j += 1
pr.update()
print('Getting titles done for %s domains' %(str(k)))
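# Remove domaininfo rows that repeat the same expiry date for a domain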
def delduplicates():
    for domain, in session.query(Domains.name).order_by(Domains.name):
        seen = set()
        for item in session.query(Domaininfo).filter_by(domain_id=domain):
            if item.dto in seen:
                print(item.domain_id)
                session.delete(item)
            else:
                seen.add(item.dto)
    session.commit()
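# Convert a formatted date string to a unix timestamp, or back again with r=True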
def dateconv(date, dformat, r=False):
if not r:
newdate = int(datetime.strptime(str(date), dformat).timestamp())
else:
newdate = datetime.fromtimestamp(int(date)).strftime(dformat)
return newdate
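# Compute today's statistics (added, rechecked and deleted domains, status checks, totals, coverage) and insert or update the Stats row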
def updatestats():
a = session.query(Domains).filter(and_(Domains.domainexists==1, Domains.infodate==todayint)).count()
b = session.query(Domaininfo).filter_by(infodate=todayd).count()
c = b - a
deld = session.query(Domains).filter(and_(Domains.domainexists==0, Domains.infodate==todayint)).count()
totaldb = session.query(Domains).filter_by(domainexists=1).count()
s = session.query(Domainstatus).filter(Domainstatus.infodate==todayd).count()
try:
page=urllib.request.urlopen('https://cctld.uz/')
sss=page.read()
text=sss.decode('utf8')
num1 = text.find('activdomain"')+13
num2 = text.find('<', num1)
totaluz = int(text[num1:num2].replace(' ', ''))
except:
totaluz = 31000
d = int(totaldb/totaluz * 10000)/100
print(a, b, c, totaldb, totaluz, d)
oldstat = session.query(Stats).filter_by(infodate=todayd).first()
if not oldstat:
session.add(Stats(recheckeddomains=c, addeddomains=a, deleteddomains=deld, domainstatuses=s,
totalindb=totaldb, totalincctld=totaluz, discoverability=d, infodate=todayd))
else:
oldstat.recheckeddomains=c
oldstat.addeddomains=a
oldstat.deleteddomains=deld
oldstat.domainstatuses=s
oldstat.totalindb=totaldb
oldstat.totalincctld=totaluz
oldstat.discoverability=d
session.commit()
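# Main flow: archive the database, build the recheck and new lists, crawl whois data, fetch site titles and update statistics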
def starter():
check_dbarchive()
get_rechecks()
geturls()
checknewurls()
writetitles()
remove(newF)
#delduplicates()
updatestats()
inputraw = input("That's all!")
if len(inputraw)>0:
time.sleep(3)
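# Today's date as a dd.mm.YYYY string and as a unix timestamp, used throughout the script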
todayd = datetime.now().strftime('%d.%m.%Y')
todayint = dateconv(todayd, '%d.%m.%Y')
starter()
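# A minimal read-back sketch (not part of the original script), assuming the default
# uzdomains.db file produced above and the Domains schema defined here:
#
#   import sqlite3
#   conn = sqlite3.connect('uzdomains.db')
#   total = conn.execute("SELECT COUNT(*) FROM domains WHERE domainexists = 1").fetchone()[0]
#   print('Active domains:', total)
#   for name, infodate in conn.execute(
#           "SELECT name, infodate FROM domains ORDER BY infodate DESC LIMIT 10"):
#       print(name, infodate)
#   conn.close()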