Skip to content

Instantly share code, notes, and snippets.

@TkTech
Created July 15, 2013 14:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TkTech/6000549 to your computer and use it in GitHub Desktop.
Save TkTech/6000549 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""clean_db.py
Helps to immunize registration spam from a MediaWiki installation.
Usage:
clean_db.py scan <dbstring> [options]
clean_db.py bans <dbstring>
Options:
-h, --help Show this message.
--multiple-accounts Only consider IPs that have signed up
for multiple accounts.
--days-ago=<days> Only consider IPs that are at least
<days> old.
--ban Bans matching IPs.
"""
import sys
import datetime
from collections import defaultdict
from docopt import docopt
from sqlalchemy import *
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import (
declarative_base,
DeferredReflection
)
from sqlalchemy.orm.exc import MultipleResultsFound
def to_mw_ts(dt):
    """Format a datetime as a 14-digit MediaWiki timestamp (YYYYMMDDHHMMSS).

    :param dt: A ``datetime.datetime`` instance.
    :returns: The timestamp as a string, e.g. ``'20130715144900'``.
    """
    return dt.strftime('%Y%m%d%H%M%S')


def from_mw_ts(ts):
    """Parse a 14-digit MediaWiki timestamp (YYYYMMDDHHMMSS) into a datetime.

    :param ts: The timestamp as text, or as ASCII ``bytes`` (database
        drivers may hand back binary timestamp columns undecoded).
    :returns: A naive ``datetime.datetime``.
    :raises ValueError: If ``ts`` does not match the expected format.
    """
    if isinstance(ts, bytes):
        # strptime() only accepts text on Python 3.
        ts = ts.decode('ascii')
    return datetime.datetime.strptime(ts, '%Y%m%d%H%M%S')
def ip_to_hex(ip):
    """Return the hex form of a dotted-quad IPv4 address.

    The result is eight upper-case, zero-padded hex digits (two per octet),
    e.g. ``'192.168.0.1'`` -> ``'C0A80001'``. Upper case is used because
    MediaWiki writes its block range columns in upper-case hex and those
    columns are compared as plain strings.

    :param ip: IPv4 address in dotted-quad notation.
    :returns: The eight character hexadecimal string.
    :raises ValueError: If ``ip`` is not four dot-separated integers in the
        range 0-255 (this includes IPv6 addresses).
    """
    octets = [int(part) for part in ip.split('.')]
    if len(octets) != 4 or any(not 0 <= octet <= 255 for octet in octets):
        raise ValueError(
            'Not a dotted-quad IPv4 address: {0!r}'.format(ip)
        )
    return ''.join('{0:02X}'.format(octet) for octet in octets)
# Declarative base mixed with DeferredReflection: the mapped classes below
# only name their tables, and the table definitions are filled in from the
# database when Base.prepare(engine) is called (see main()).
Base = declarative_base(cls=DeferredReflection)
class User(Base):
    """ORM mapping for the ``wiki_user`` table.

    Only ``user_id`` is declared explicitly; the other attributes used by
    this script (``user_editcount``, ``user_registration``, ``user_name``)
    are expected to come from table reflection.
    """
    __tablename__ = 'wiki_user'

    user_id = Column('user_id', Integer, primary_key=True)

    def user_groups(self, session):
        """Return the ``ug_group`` rows recorded for this user.

        :param session: An active SQLAlchemy session.
        :returns: A list of result rows, each exposing ``ug_group``.
        """
        return session.query(UserGroups.ug_group).filter(
            UserGroups.ug_user == self.user_id
        ).all()

    @property
    def registered(self):
        """The registration time parsed from ``user_registration``."""
        return from_mw_ts(self.user_registration)

    def ip(self, session):
        """Return the IP recorded for this user in recent changes.

        :param session: An active SQLAlchemy session.
        :returns: The ``rc_ip`` value when exactly one recent-changes row
            exists for the user; ``None`` when there is no row, or when
            there are several rows and the answer would be ambiguous.
        """
        try:
            return session.query(RecentChanges.rc_ip).filter(
                RecentChanges.rc_user == self.user_id
            ).scalar()
        except MultipleResultsFound:
            return None
class RecentChanges(Base):
    """ORM mapping for the ``wiki_recentchanges`` table.

    No columns are declared here; the script reads ``rc_ip`` and
    ``rc_user`` from it.
    """
    __tablename__ = 'wiki_recentchanges'
class UserGroups(Base):
    """ORM mapping for the ``wiki_user_groups`` table (user/group pairs)."""
    __tablename__ = 'wiki_user_groups'

    # Both columns are flagged as primary key, giving the mapper a
    # composite (ug_group, ug_user) identity.
    ug_group = Column('ug_group', VARBINARY, primary_key=True)
    ug_user = Column(
        'ug_user',
        Integer,
        ForeignKey('wiki_user.user_id'),
        primary_key=True
    )
class IPBlocks(Base):
    """ORM mapping for the ``wiki_ipblocks`` table.

    No columns are declared here; the script reads ``ipb_address`` and
    writes the ``ipb_*`` fields when inserting a new block.
    """
    __tablename__ = 'wiki_ipblocks'
def _do_scan(args, session):
    """Find the IPs behind never-edited accounts and optionally ban them.

    Users with an edit count of zero are grouped by the IP returned from
    ``User.ip()``. Users in the ``confirmed`` group, users without a usable
    IP and (with ``--days-ago``) users registered too recently are ignored.

    Without ``--ban`` the matching IPs are printed. With ``--ban`` a block
    row is inserted and committed for every matching IP that is not already
    blocked.

    :param args: The docopt argument dictionary. Reads ``--days-ago``,
        ``--multiple-accounts`` and ``--ban``.
    :param session: An active SQLAlchemy session.
    :returns: A dict mapping each matching IP to
        ``{'count': <int>, 'users': [<user_name>, ...]}``.
    :raises ValueError: If ``--days-ago`` is not an integer.
    """
    # MediaWiki stores its timestamps in UTC, so the age check and the block
    # timestamp use UTC too. utcnow() is kept (rather than an aware
    # datetime) because from_mw_ts() yields naive datetimes.
    now = datetime.datetime.utcnow()
    # Parse the option once instead of once per user.
    min_age_days = int(args['--days-ago']) if args['--days-ago'] else None

    # Get all of the users that haven't previously made an edit.
    user_q = session.query(User).filter(
        User.user_editcount == 0
    )

    ips = defaultdict(lambda: dict(count=0, users=list()))
    for user in user_q:
        # Make sure this isn't a confirmed user. ug_group is a VARBINARY
        # column, so the value may arrive as bytes rather than text.
        if any(
            g.ug_group in ('confirmed', b'confirmed')
            for g in user.user_groups(session)
        ):
            continue

        # Make sure the account is at least X days old.
        if min_age_days is not None:
            if (now - user.registered).days < min_age_days:
                continue

        ip = user.ip(session)
        if not ip:
            continue

        ips[ip]['users'].append(user.user_name)
        ips[ip]['count'] += 1

    if args['--multiple-accounts']:
        final_ips = dict(
            (k, v)
            for k, v in ips.items()
            if v['count'] > 1
        )
    else:
        final_ips = dict(ips)

    if not args['--ban']:
        # A plain scan would otherwise throw its findings away unseen.
        for ip in sorted(final_ips):
            print('{ip}: {count} account(s)'.format(
                ip=ip,
                count=final_ips[ip]['count']
            ))
        return final_ips

    for ip in final_ips:
        already_banned = session.query(IPBlocks).filter(
            IPBlocks.ipb_address == ip
        ).first()
        if already_banned is not None:
            # This IP is already banned, we can forget about it.
            print('Skipping {ip}.'.format(ip=ip))
            continue

        try:
            hex_ip = ip_to_hex(ip)
        except ValueError:
            # ip_to_hex() only understands dotted-quad IPv4; don't let one
            # odd address abort the remaining bans.
            print('Skipping {ip} (not an IPv4 address).'.format(ip=ip))
            continue

        session.add(IPBlocks(
            ipb_address=ip,
            ipb_user=0,
            ipb_by=1,
            ipb_by_text='TkTech',
            ipb_reason='Spam',
            ipb_timestamp=to_mw_ts(datetime.datetime.utcnow()),
            ipb_auto=0,
            ipb_anon_only=0,
            ipb_create_account=1,
            ipb_enable_autoblock=0,
            ipb_expiry='infinity',
            # A single address is stored as a range of one.
            ipb_range_start=hex_ip,
            ipb_range_end=hex_ip,
            ipb_deleted=0,
            ipb_block_email=0,
            ipb_allow_usertalk=0,
            ipb_parent_block_id=None
        ))
        # Commit per IP so earlier bans survive a later failure.
        session.commit()
        print('Banned {ip}'.format(ip=ip))

    return final_ips
def _do_bans(args, session):
    """Print the set of distinct addresses stored in the IP block table.

    :param args: The docopt argument dictionary (not used here).
    :param session: An active SQLAlchemy session.
    """
    rows = session.query(IPBlocks.ipb_address).all()
    addresses = {row[0] for row in rows}
    print(addresses)
def main(argv):
    """Parse the command line and run the requested sub-command.

    :param argv: The full argument vector, program name first (as in
        ``sys.argv``). Everything after the first element is handed to
        docopt.
    :returns: ``None``, which ``sys.exit`` treats as success.
    """
    # Honour the argv that was passed in instead of reaching for sys.argv.
    args = docopt(__doc__, argv=argv[1:])

    engine = create_engine(args['<dbstring>'])
    # Reflect the table definitions for every mapped class.
    Base.prepare(engine)

    Session = sessionmaker(bind=engine, autocommit=False)
    session = Session()
    try:
        if args['scan']:
            _do_scan(args, session)
        elif args['bans']:
            _do_bans(args, session)
    finally:
        # Always hand the connection back, even if a command fails.
        session.close()
# Run the command-line interface when executed as a script; whatever main()
# returns is passed to sys.exit() as the process exit status.
if __name__ == '__main__':
    sys.exit(main(sys.argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment