Created
May 7, 2014 12:46
-
-
Save kowey/5e874a11780fe2d95afc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# utilities for (darcs) roundup interaction | |
# $ PYTHONSTARTUP=/var/lib/roundup/trackers/darcs/pythonrc.py python | |
from pprint import pprint as pp | |
def nub(xs):
    """Return the distinct items of ``xs`` in first-seen order.

    Items must be hashable.  The result is a new list; unlike
    ``list(set(xs))`` its order is deterministic, so values built from it
    (for example a joined list of addresses) do not get reshuffled from
    one run to the next.
    """
    seen = set()
    distinct = []
    for x in xs:
        if x not in seen:
            seen.add(x)
            distinct.append(x)
    return distinct
def replaced_list(old, new, l):
    """Return a copy of ``l`` with every ``old`` swapped for ``new``.

    ``new`` appears at most once in the result: a replacement is skipped
    when ``new`` has already been emitted, and a later ``new`` that was in
    ``l`` to begin with is skipped once a replacement has been emitted.
    All other items are copied through unchanged and in order.  ``l``
    itself is not modified.
    """
    result = []
    for item in l:
        if item == old:
            item = new
        # The duplicate check has to look for `new`: `old` is never
        # appended, so testing for `old` could not prevent duplicates.
        if item == new and new in result:
            continue
        result.append(item)
    return result
from roundup.date import Date,Interval,Range | |
from roundup import instance | |
# Open the darcs tracker with the name 'admin'; every helper below goes
# through this `db` handle.  Nothing in this file calls db.commit().
db = instance.open('/var/lib/roundup/trackers/darcs').open('admin')
# Whatever db.getclasses() reports - presumably the class names of the
# tracker schema (issue, msg, user, ...); confirm against the tracker.
classes = db.getclasses()
def filter_issues(spec=None, sort=None, group=None):
    """Return ``db.issue.filter(None, spec, sort, group)``.

    ``spec`` defaults to an empty dict, ``sort`` and ``group`` to empty
    lists.  The containers are created per call so that no default object
    is shared between calls.
    """
    if spec is None:
        spec = {}
    if sort is None:
        sort = []
    if group is None:
        group = []
    return db.issue.filter(None, spec, sort, group)
#def filter_patches (spec={},sort=[],group=[]): return db.patch.filter (None,spec,sort,group) | |
def filter_files(spec=None, sort=None, group=None):
    """Return ``db.file.filter(None, spec, sort, group)``.

    ``spec`` defaults to an empty dict, ``sort`` and ``group`` to empty
    lists.  The containers are created per call so that no default object
    is shared between calls.
    """
    if spec is None:
        spec = {}
    if sort is None:
        sort = []
    if group is None:
        group = []
    return db.file.filter(None, spec, sort, group)
def filter_users(spec=None, sort=None, group=None):
    """Return ``db.user.filter(None, spec, sort, group)``.

    ``spec`` defaults to an empty dict, ``sort`` and ``group`` to empty
    lists.  The containers are created per call so that no default object
    is shared between calls.
    """
    if spec is None:
        spec = {}
    if sort is None:
        sort = []
    if group is None:
        group = []
    return db.user.filter(None, spec, sort, group)
def filter_msgs(spec=None, sort=None, group=None):
    """Return ``db.msg.filter(None, spec, sort, group)``.

    ``spec`` defaults to an empty dict, ``sort`` and ``group`` to empty
    lists.  The containers are created per call so that no default object
    is shared between calls.
    """
    if spec is None:
        spec = {}
    if sort is None:
        sort = []
    if group is None:
        group = []
    return db.msg.filter(None, spec, sort, group)
def filter_statuses(spec=None, sort=None, group=None):
    """Return ``db.status.filter(None, spec, sort, group)``.

    ``spec`` defaults to an empty dict, ``sort`` and ``group`` to empty
    lists.  The containers are created per call so that no default object
    is shared between calls.
    """
    if spec is None:
        spec = {}
    if sort is None:
        sort = []
    if group is None:
        group = []
    return db.status.filter(None, spec, sort, group)
def filter_priorities(spec=None, sort=None, group=None):
    """Return ``db.priority.filter(None, spec, sort, group)``.

    ``spec`` defaults to an empty dict, ``sort`` and ``group`` to empty
    lists.  The containers are created per call so that no default object
    is shared between calls.
    """
    if spec is None:
        spec = {}
    if sort is None:
        sort = []
    if group is None:
        group = []
    return db.priority.filter(None, spec, sort, group)
def filter_queries(spec=None, sort=None, group=None):
    """Return ``db.query.filter(None, spec, sort, group)``.

    ``spec`` defaults to an empty dict, ``sort`` and ``group`` to empty
    lists.  The containers are created per call so that no default object
    is shared between calls.
    """
    if spec is None:
        spec = {}
    if sort is None:
        sort = []
    if group is None:
        group = []
    return db.query.filter(None, spec, sort, group)
# Short per-class accessors: issue(x) is db.issue.getnode(x), and likewise
# for the other classes.  The patch class is disabled throughout this file.
issue = db.issue.getnode
#patch = db.patch.getnode
# NOTE: this rebinds the name `file`, which is a builtin on Python 2.
file = db.file.getnode
user = db.user.getnode
msg = db.msg.getnode
status = db.status.getnode
priority = db.priority.getnode
query = db.query.getnode
# One node per id, collected once when this file is loaded; the membership
# of these lists is not refreshed afterwards.
# NOTE: on Python 2 these module-level list comprehensions leave the loop
# variable `i` bound, and the `del i, u, n` near the end of the file relies
# on that - keep them as comprehensions using `i`.
issues = [issue(i) for i in db.issue.list()]
patches = [] #patch(i) for i in db.patch.list()]
files = [file(i) for i in db.file.list()]
# Users come from filter_users with sort spec ('+','creation') rather than
# from db.user.list() - presumably oldest first; confirm the sort direction.
users = [user(i) for i in filter_users({},sort=[('+','creation')])]
msgs = [msg(i) for i in db.msg.list()]
statuses = [status(i) for i in db.status.list()]
priorities = [priority(i) for i in db.priority.list()]
queries = [query(i) for i in db.query.list()]
def issues_assigned_to(id):
    """Return db.issue.find(assignedto=str(id)) for the given user id."""
    return db.issue.find(assignedto=str(id))


def nosy_issues_for(id):
    """Return db.issue.find(nosy=str(id)) for the given user id."""
    return db.issue.find(nosy=str(id))


def msgs_from(id):
    """Return db.msg.find(author=str(id)) for the given user id."""
    return db.msg.find(author=str(id))


def msgs_to(id):
    """Return db.msg.find(recipients=str(id)) for the given user id."""
    return db.msg.find(recipients=str(id))


def queries_of(id):
    """Return db.query.find(private_for=str(id)) for the given user id."""
    return db.query.find(private_for=str(id))
def user_links(id):
    """Return four find results for user ``id`` as a list.

    The order is: assigned issues, authored messages, received messages,
    private queries.  Nosy-list membership is not part of the result.
    """
    assigned = issues_assigned_to(id)
    authored = msgs_from(id)
    received = msgs_to(id)
    owned_queries = queries_of(id)
    return [assigned, authored, received, owned_queries]


def user_link_counts(id):
    """Return the length of each of the four find results, in the same order as user_links."""
    assigned = issues_assigned_to(id)
    authored = msgs_from(id)
    received = msgs_to(id)
    owned_queries = queries_of(id)
    return [len(assigned), len(authored), len(received), len(owned_queries)]
def status_named(n):
    """Return the id of the first entry in ``statuses`` whose name equals ``n``.

    Raises IndexError when no loaded status has that name.
    """
    matching_ids = [s.id for s in statuses if s.name == n]
    return matching_ids[0]


def priority_named(n):
    """Return the id of the first entry in ``priorities`` whose name equals ``n``.

    Raises IndexError when no loaded priority has that name.
    """
    matching_ids = [p.id for p in priorities if p.name == n]
    return matching_ids[0]
def reassign_issues(olduserid, newuserid):
    """Set ``assignedto`` to ``newuserid`` on every issue assigned to ``olduserid``.

    Both ids may be given as strings or integers.  The new id is converted
    with ``str()`` just like the old one is in the lookup, matching what
    readdress_nosy_issues and readdress_msgs do.  This function does not
    call db.commit().
    """
    new_id = str(newuserid)
    for issue_id in issues_assigned_to(olduserid):
        issue(issue_id).assignedto = new_id
def readdress_nosy_issues(olduserid, newuserid):
    """Swap ``olduserid`` for ``newuserid`` in the nosy list of every issue that has the old id on it.

    Both ids are compared and stored in their ``str()`` form.
    """
    old_id = str(olduserid)
    new_id = str(newuserid)
    for issue_id in nosy_issues_for(olduserid):
        current_nosy = issue(issue_id).nosy
        issue(issue_id).nosy = replaced_list(old_id, new_id, current_nosy)
def remove_nosy(userobj, issueobj):
    """Assign ``issueobj.nosy`` a list without the entries equal to ``userobj.id``.

    The remaining entries keep their order.  A list comprehension is used
    instead of ``filter()`` so that the assigned value is a real list on
    Python 3 as well (there ``filter()`` returns a lazy iterator).
    """
    unwanted = userobj.id
    issueobj.nosy = [nosy_id for nosy_id in issueobj.nosy if nosy_id != unwanted]
def remove_all_nosy(username):
    """Take the user named ``username`` off the nosy list of every issue that lists them.

    Progress is printed per issue.  Nothing is committed here: call
    db.commit() afterwards (or after interrupting with ctrl-c to keep
    partial progress).

    Raises ValueError when primary_user_named finds no user with that
    name, rather than failing later on a ``None`` user.
    """
    u = primary_user_named(username)
    if u is None:
        raise ValueError("no user named %r" % (username,))
    print("Takes a while. Remember to db.commit() when done. You may want to ctrl-c and commit partial progress.")
    issue_ids = nosy_issues_for(u.id)
    total = len(issue_ids)
    for n, issue_id in enumerate(issue_ids, 1):
        remove_nosy(u, issue(issue_id))
        # Report only once the removal has happened, so an interrupted run
        # never claims an issue that was not actually changed.
        print("removed %s from issue %s (%d of %d)" % (u.username, issue_id, n, total))
def reauthor_msgs(olduserid, newuserid):
    """Set ``author`` to ``newuserid`` on every message authored by ``olduserid``.

    Both ids may be given as strings or integers.  The new id is converted
    with ``str()`` just like the old one is in the lookup, matching what
    readdress_nosy_issues and readdress_msgs do.  This function does not
    call db.commit().
    """
    new_id = str(newuserid)
    for msg_id in msgs_from(olduserid):
        msg(msg_id).author = new_id
def readdress_msgs(olduserid, newuserid):
    """Swap ``olduserid`` for ``newuserid`` in the recipients of every message addressed to the old id.

    Both ids are compared and stored in their ``str()`` form.
    """
    old_id = str(olduserid)
    new_id = str(newuserid)
    for msg_id in msgs_to(olduserid):
        current_recipients = msg(msg_id).recipients
        msg(msg_id).recipients = replaced_list(old_id, new_id, current_recipients)
def reassign_queries(olduserid, newuserid):
    """Set ``private_for`` to ``newuserid`` on every query private to ``olduserid``.

    Both ids may be given as strings or integers.  The new id is converted
    with ``str()`` just like the old one is in the lookup, matching what
    readdress_nosy_issues and readdress_msgs do.  This function does not
    call db.commit().
    """
    new_id = str(newuserid)
    for query_id in queries_of(olduserid):
        query(query_id).private_for = new_id
def reassign_links(olduserid, newuserid):
    """Move everything that points at ``olduserid`` over to ``newuserid``.

    Runs, in this order: issue assignment, issue nosy lists, message
    authorship, message recipients, private queries.
    """
    steps = (
        reassign_issues,
        readdress_nosy_issues,
        reauthor_msgs,
        readdress_msgs,
        reassign_queries,
    )
    for step in steps:
        step(olduserid, newuserid)
# ---------------------------------------------------------------------- | |
# fixing duplicate users | |
# not yet handled: user history | |
# ---------------------------------------------------------------------- | |
# you can also use this manually if the methods below for detecting duplicates | |
# are not enough | |
def merge_users(n, extra_ns):
    """Fold the users named in ``extra_ns`` into the user named ``n``.

    For every extra user: its links are moved to the surviving user with
    reassign_links, its address and alternate addresses are collected, and
    it is retired.  The collected addresses are then stored, one per line,
    in the survivor's ``alternate_addresses`` (existing alternates first,
    no duplicates, no empty entries, and never the survivor's own primary
    address).  Nothing is committed here.

    Names are resolved with primary_user_named.  Raises ValueError, before
    anything is modified, when ``n`` or any name in ``extra_ns`` matches
    no user.
    """
    u = primary_user_named(n)
    if u is None:
        raise ValueError("no user named %r" % (n,))
    # Validate every name up front so a typo cannot leave the merge half
    # done (users retired, but their addresses never recorded).
    missing = [en for en in extra_ns if primary_user_named(en) is None]
    if missing:
        raise ValueError("no user named %s" % ', '.join([repr(en) for en in missing]))

    alts = []
    if u.alternate_addresses is not None:
        alts += u.alternate_addresses.split("\n")
    for en in extra_ns:
        # Resolved per iteration on purpose: when a name is listed twice
        # the second lookup may find a different user, or none at all,
        # once the first match has been retired.
        u2 = primary_user_named(en)
        if u2 is None:
            continue
        if u2.id == u.id:
            # Merging a user into itself would retire the survivor.
            continue
        reassign_links(u2.id, u.id)
        alts.append(u2.address)
        if u2.alternate_addresses is not None:
            alts += u2.alternate_addresses.split("\n")
        u2.retire()

    primary_address = u.address
    kept = []
    for addr in alts:
        if addr and addr != primary_address and addr not in kept:
            kept.append(addr)
    u.alternate_addresses = "\n".join(kept)
def most_active_first(dupes):
    """Return ``dupes`` as a new list ordered by descending activity.

    Activity is the sum of user_link_counts for the user's id.  Sorting
    uses that number alone as the key, so user objects are never compared
    with each other; users with equal activity keep the relative order
    they had in ``dupes``.
    """
    def activity(u):
        return sum(user_link_counts(u.id))

    # sorted() is stable, also with reverse=True.
    return sorted(dupes, key=activity, reverse=True)
# useful for notifying users of merging | |
def show_emails(names):
    """Print one line per name: the username, then its address and alternate addresses, comma separated."""
    for name in names:
        u = primary_user_named(name)
        addresses = [u.address]
        alternates = u.alternate_addresses
        if alternates is not None:
            addresses.extend(alternates.split("\n"))
        print('%-30s %s' % (u.username, ', '.join(addresses)))
# fixing duplicate usernames (first one is used) | |
# NOTE: on Python 2 these module-level list comprehensions leave `u` and `n`
# bound; the `del i, u, n` near the end of the file relies on that.
# Username of every loaded user, in the order of `users`.
usernames = [u.username for u in users]
# One (username, occurrences) pair per user, so a name shared by k users
# appears k times here.
username_counts = [(n,usernames.count(n)) for n in usernames]
# Sorted distinct usernames that occur more than once.
dupe_usernames = sorted(nub([u for (u,n) in username_counts if n > 1]))
# Every loaded user whose username is in dupe_usernames.
dupe_users = [u for u in users if u.username in dupe_usernames]
# Per duplicated user: [id, creation, username] followed by the four find
# results (assigned issues, authored msgs, received msgs, private queries).
dupe_user_links = [[u.id, u.creation, u.username, issues_assigned_to(u.id), msgs_from(u.id), msgs_to(u.id), queries_of(u.id)] for u in dupe_users]
def users_named(n):
    """Return the users whose username is exactly ``n``, in filter order.

    The candidates come from filter_users with sort spec
    ('+','creation'); each one is re-checked for an exact username match
    (presumably because the tracker's filter matches more loosely than
    equality - confirm).
    """
    matches = []
    for user_id in filter_users({'username': n}, sort=[('+', 'creation')]):
        candidate = user(user_id)
        if candidate.username == n:
            matches.append(candidate)
    return matches


def primary_user_named(n):
    """Return the first of users_named(n), or None when there is none.

    The lookup runs once, and the result is returned as-is instead of
    going through ``x and y or None``, which would also turn a falsy
    first match into None.
    """
    matches = users_named(n)
    if matches:
        return matches[0]
    return None


def secondary_users_named(n):
    """Return users_named(n) without its first entry."""
    return users_named(n)[1:]
def show_dupe_username_users():
    """Print [creation, username, address] for every user holding a duplicated username."""
    for name in dupe_usernames:
        for holder in users_named(name):
            row = [holder.creation, holder.username, holder.address]
            print(row)
def show_dupe_username_users_detail():
    """Like show_dupe_username_users, with assigned issues, authored msgs and received msgs appended to each row."""
    for name in dupe_usernames:
        for holder in users_named(name):
            row = [holder.creation, holder.username, holder.address]
            row.append(issues_assigned_to(holder.id))
            row.append(msgs_from(holder.id))
            row.append(msgs_to(holder.id))
            print(row)
def consolidate_links_to_dupe_username_users():
    """For each duplicated username, move the links of every later user to the first one.

    "First" and "later" follow the order of users_named.  The users for a
    name are looked up once per name rather than again for every
    secondary user.  Names that currently have fewer than two users are
    skipped.  Nothing is retired or committed here.
    """
    for name in dupe_usernames:
        holders = users_named(name)
        if len(holders) < 2:
            continue
        primary_id = holders[0].id
        for secondary in holders[1:]:
            reassign_links(secondary.id, primary_id)
def retire_secondary_dupe_username_users():
    """Retire every user returned by secondary_users_named for each duplicated username."""
    for name in dupe_usernames:
        extras = secondary_users_named(name)
        for extra in extras:
            extra.retire()
# fixing duplicate email addresses (most active is used) | |
# NOTE: on Python 2 these module-level list comprehensions leave `u` and `n`
# bound; the `del i, u, n` near the end of the file relies on that.
# Primary address of every loaded user, in the order of `users`.
addresses = [u.address for u in users]
# One (address, occurrences) pair per user.
address_counts = [(n,addresses.count(n)) for n in addresses]
# Sorted distinct addresses that occur more than once.  Any address value
# shared by several users qualifies - presumably including an unset one;
# check before merging.
dupe_addresseses = sorted(nub([u for (u,n) in address_counts if n > 1]))
def dupes_for_address(x):
    """Return the loaded users whose address equals ``x``, ordered by most_active_first."""
    sharing = []
    for candidate in users:
        if candidate.address == x:
            sharing.append(candidate)
    return most_active_first(sharing)
def dupes_for_realname(x):
    """Return the loaded users whose realname equals ``x``, ordered by most_active_first."""
    sharing = []
    for candidate in users:
        if candidate.realname == x:
            sharing.append(candidate)
    return most_active_first(sharing)
def show_dupe_address_users():
    """Print each duplicated address followed by the usernames sharing it, in most_active_first order."""
    for address in dupe_addresseses:
        sharing_names = []
        for holder in dupes_for_address(address):
            sharing_names.append(holder.username)
        print('%-30s %s' % (address, ' '.join(sharing_names)))
def merge_dupe_address_users():
    """Merge, per duplicated address, all users sharing it into the first of dupes_for_address.

    Empty or missing addresses are skipped: users that merely have no
    address in common are not duplicates of one another, and merging them
    would retire unrelated accounts.  Nothing is committed here.
    """
    for a in dupe_addresseses:
        if not a:
            continue
        dupes = [u.username for u in dupes_for_address(a)]
        print('merging %s' % ' '.join(dupes))
        # NOTE(review): merge_users looks users up again by username, so
        # this presumably relies on usernames being unique by now - run the
        # duplicate-username fixes first.
        merge_users(dupes[0], dupes[1:])
# fixing duplicate real names (must be done by hand via merge_users, but these will help) | |
# NOTE: on Python 2 these module-level list comprehensions leave `u` and `n`
# bound; the `del i, u, n` near the end of the file relies on that.
# Real name of every loaded user, in the order of `users`.
realnames = [ u.realname for u in users ]
# One (realname, occurrences) pair per user.
realname_counts = [ (n, realnames.count(n)) for n in realnames ]
# Sorted distinct real names that occur more than once.
dupe_realnames = sorted(nub([u for (u,n) in realname_counts if n > 1]))
# do NOT write merge_dupe_realname_users; will not be reliable | |
def show_dupe_realname_users():
    """Print, per duplicated real name, a ready-to-paste merge_users(...) call.

    The first username (in dupes_for_realname order) is the proposed
    survivor; the rest go into the list argument.  List items are
    separated by commas: separated by spaces only, adjacent quoted strings
    would be concatenated into a single name when the line is pasted into
    Python.  Usernames are quoted with repr() so that quotes or
    backslashes inside a name stay valid Python.
    """
    for a in dupe_realnames:
        quoted_users = [repr(x.username) for x in dupes_for_realname(a)]
        survivor = quoted_users[0]
        others = ', '.join(quoted_users[1:])
        print('%-30s merge_users(%s,[%s])' % (a, survivor, others))
# ---------------------------------------------------------------------- | |
# show status | |
# Remove the loop variables left bound by the module-level list
# comprehensions above, so they are not listed as utilities below.
del i, u, n
# First line: every module-level name that does not start with an
# underscore.  dir() is evaluated before `h` itself is bound.
h = "loaded roundup utilities: %s\n" % ', '.join(name for name in dir() if name[0] != '_')
# Second line: sizes of the loaded snapshots and the current Date().
h += "%d users, %d issues, %d patches, %d files, %d messages, %d statuses, %d priorities, %d queries, %d duplicated usernames, %d duplicated addresses at %s" % (
    len(users),
    len(issues),
    len(patches),
    len(files),
    len(msgs),
    len(statuses),
    len(priorities),
    len(queries),
    len(dupe_usernames),
    len(dupe_addresseses),
    Date(),
)
print(h)
# vim: ai ts=4 sts=4 et sw=4 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment