OSQA to Askbot migration script (only works for LDAP auth). 1. Dump your OSQA database content with $ python manage.py dumpdata >> data.dump
2. Load the dump into Askbot with $ python manage.py osqa_migrate data.dump
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import os
import sys
from collections import defaultdict, deque
from itertools import groupby
from pprint import pprint

import dateutil
import dateutil.parser
from django.core.exceptions import ValidationError
from django.core.management.base import BaseCommand, CommandError
from django.db.utils import IntegrityError

import askbot.deps.django_authopenid.models as askbot_openid
import askbot.models as askbot
from askbot.conf import settings as askbot_settings
# Upper bound on the per-entry retry counter used while migrating answers and
# comments: an entry whose parent node has not been migrated yet is re-queued
# until its counter exceeds this value, then it is reported and dropped.
MAX_TRIES = 10
class Command(BaseCommand):
    """Import an OSQA ``dumpdata`` JSON dump into Askbot.

    The steps run in order: users, tags, questions/answers/comments,
    revisions, and finally vote/accept actions.  Three dictionaries on the
    instance map OSQA primary keys to the objects created here so later
    steps can resolve references made by earlier ones:

    * ``self.users`` -- OSQA user pk -> Askbot user
    * ``self.tags``  -- OSQA tag pk -> Askbot tag
    * ``self.nodes`` -- OSQA node pk -> Askbot post
    """

    help = "Loads OSQA data from json database dump"

    @staticmethod
    def _announce(message):
        """Write a progress prefix without a newline.

        The outcome ("success", "error", ...) is printed afterwards and ends
        the line.  ``sys.stdout.write`` behaves the same on Python 2 and 3,
        unlike the ``print(...),`` trailing-comma form.
        """
        sys.stdout.write(message)
        sys.stdout.flush()

    def handle(self, *arg, **kwarg):
        """Load the dump named by the first positional argument and migrate it.

        Raises:
            CommandError: if no argument was given or it is not an existing file.
        """
        if not arg or not os.path.isfile(arg[0]):
            raise CommandError('Error: first argument must be a json file with the osqa forum data')

        with open(arg[0]) as f:
            data = json.load(f)

        # Bucket the records by model name.  itertools.groupby only merges
        # *adjacent* records, so with an interleaved dump all but the last run
        # of each model would be lost.  A defaultdict also yields an empty
        # list (instead of a KeyError) for a model that is absent from the dump.
        grouped = defaultdict(list)
        for record in data:
            grouped[record['model']].append(record)

        self.nodes = {}
        self.tags = {}
        self.users = {}

        askbot_settings.update('LIMIT_ONE_ANSWER_PER_USER', False)

        print('Migrating users...')
        self.migrate_users(grouped['auth.user'])
        print('done.\n\n')

        print('Migrating tags...')
        self.migrate_tags(grouped['forum.tag'])
        print('done.\n\n')

        print('Migrating questions, answers and comments...')
        self.migrate_qac(grouped['forum.node'])
        print('done.')

        print('Migrating revisions...')
        self.migrate_revisions(grouped['forum.noderevision'])
        print('done.')

        print('Migrating Actions...')
        self.migrate_actions(grouped['forum.action'])
        print('done.')

    def migrate_users(self, entries_raw):
        """Create, or reuse by username, an Askbot user per ``auth.user`` record.

        New users keep their OSQA primary key, get an unusable password and an
        ``ldap`` login association.  Every user that exists in the database
        afterwards is recorded in ``self.users`` under its OSQA pk.
        """
        for data_raw in entries_raw:
            osqa = data_raw['fields']
            try:
                user = askbot.User.objects.get(username=osqa['username'])
                print("already exist")
            except askbot.User.DoesNotExist:
                user = askbot.User()
                user.username = osqa['username']
                user.realname = "%s %s" % (osqa['first_name'], osqa['last_name'])
                user.id = data_raw['pk']
                user.email = osqa['email']
                user.reputation = 1  # it's actually re-computed
                user.last_login = osqa['last_login']
                user.last_seen = osqa['last_login']
                user.is_active = True
                user.set_unusable_password()  # we use ldap
                if osqa['is_superuser']:
                    user.set_admin_status()

                self._announce("Creating User: %s...\t" % user.username)
                try:
                    user.save()
                except IntegrityError:
                    print('error')
                    # The user was not stored, so it must not be registered:
                    # posts attributed to it could not be saved correctly.
                    continue

                try:
                    u_openid = askbot_openid.UserAssociation()
                    u_openid.provider_name = 'ldap'
                    u_openid.openid_url = "%s@ldap" % user.username
                    u_openid.user = user
                    u_openid.save()
                    print("success")
                except IntegrityError:
                    # The user itself exists; only the login association failed.
                    print('error')

            self.users[data_raw['pk']] = user

    def migrate_tags(self, entries_raw):
        """Create, or reuse by name, an Askbot tag per ``forum.tag`` record.

        The resulting tags are recorded in ``self.tags`` under their OSQA pk.
        """
        for data_raw in entries_raw:
            osqa = data_raw['fields']
            self._announce("Creating Tag: %s..." % osqa['name'])
            try:
                tag = askbot.Tag.objects.get(name=osqa['name'])
                print("already exists")
            except askbot.Tag.DoesNotExist:
                author = self.users.get(osqa['created_by'])
                if author is None:
                    # The creator was not migrated; skip instead of aborting.
                    print("error")
                    continue
                # Model fields must be passed by keyword: a positional value
                # is assigned to the model's first field, not to ``name``.
                tag = askbot.Tag(name=osqa['name'], created_by=author)
                tag.save()
                print("success")
            self.tags[data_raw['pk']] = tag

    def migrate_qac(self, entries_raw):
        """Migrate ``forum.node`` records (questions, answers and comments).

        Answers and comments whose parent has not been migrated yet are moved
        to the back of the work queue and retried later, bounded by
        ``MAX_TRIES``.  Records whose author is unknown are skipped.  A summary
        line with the number of migrated records is printed at the end.
        """
        amount = len(entries_raw)
        success = 0

        # Entries are taken from the front and re-queued at the back; a deque
        # does both in O(1), whereas list.pop(0) shifts the whole list.
        queue = deque(entries_raw)
        while queue:
            data_raw = queue.popleft()
            osqa = data_raw['fields']
            try:
                author = self.users[osqa['author']]
                added_at = dateutil.parser.parse(osqa['added_at'])
            except KeyError:
                # Unknown author (or incomplete record): nothing to attach to.
                continue

            post_type = osqa['node_type']
            if post_type == 'question':
                migrated = self._migrate_question(data_raw, author, added_at)
            elif post_type == 'answer':
                migrated = self._migrate_answer(data_raw, author, added_at, queue)
            elif post_type == 'comment':
                migrated = self._migrate_comment(data_raw, author, added_at, queue)
            else:
                print("unknown type: %s" % osqa['node_type'])
                migrated = False

            if migrated:
                success += 1

        print("Added %d of %d questions,comments or answers" % (success, amount))

    def _migrate_question(self, data_raw, author, added_at):
        """Migrate one question node; return True when it was stored."""
        osqa = data_raw['fields']
        self._announce("Creating Question: %s...\t" % osqa['title'])

        if osqa.get('state_string') == "(deleted)":
            print("skipped - (deleted question)")
            return False

        existing = self.nodes.get(osqa.get('parent'))
        if existing is not None:
            # The parent is already migrated: update that post in place.
            q = existing
            q.title = osqa['title']
            q.body_text = osqa['body']
            q.save()
        else:
            try:
                q = author.post_question(
                    title=osqa['title'],
                    body_text=osqa['body'],
                    tags=osqa['tagnames'] or None,
                    wiki=False,
                    timestamp=added_at
                )
                # view_count lives on the thread, so the thread is what has
                # to be saved for the value to persist.
                thread = q.thread
                thread.view_count = osqa['extra_count']
                thread.save()
            except ValidationError:
                print("validation error")
                return False

        self.nodes[data_raw['pk']] = q
        print('success')
        return True

    def _migrate_answer(self, data_raw, author, added_at, queue):
        """Migrate one answer node; return True when it was stored."""
        osqa = data_raw['fields']
        self._announce("Creating Answer by %s...\t" % author.username)

        question = self.nodes.get(osqa.get('parent'))
        if question is None:
            self._retry_later(data_raw, queue)
            return False

        try:
            answer = author.post_answer(
                question=question,
                body_text=osqa['body'],
                timestamp=added_at
            )
        except ValidationError:
            print("validation error")
            return False

        self.nodes[data_raw['pk']] = answer
        print('success')
        return True

    def _migrate_comment(self, data_raw, author, added_at, queue):
        """Migrate one comment node; return True when it was stored."""
        osqa = data_raw['fields']
        self._announce("Creating Comment by %s...\t" % author.username)

        parent = self.nodes.get(osqa.get('parent'))
        if parent is None:
            self._retry_later(data_raw, queue)
            return False

        try:
            # Pass the parsed datetime, as the question/answer branches do,
            # rather than the raw string from the dump.
            comment = parent.add_comment(
                comment=osqa['body'],
                added_at=added_at,
                user=author
            )
        except ValidationError:
            print("validation error")
            return False

        # Register the comment itself so that later revisions/votes for this
        # pk do not land on the parent post.
        # NOTE(review): relies on add_comment returning the new comment;
        # falls back to the parent post if it returns None -- confirm.
        self.nodes[data_raw['pk']] = comment if comment is not None else parent
        print('success')
        return True

    def _retry_later(self, data_raw, queue):
        """Re-queue an entry whose parent is not migrated yet.

        The per-entry counter is kept in ``data_raw['tried']``; once it exceeds
        ``MAX_TRIES`` the entry is dumped to stdout and dropped.
        """
        tried = data_raw.get('tried', 1)
        if tried > MAX_TRIES:
            pprint(data_raw)
            print("error %s" % (list(self.nodes),))
            return
        print("skipping")
        data_raw['tried'] = tried + 1
        queue.append(data_raw)

    def migrate_revisions(self, entries_raw):
        """Apply each ``forum.noderevision`` title/body to its migrated post.

        Revisions that refer to a node which was not migrated are reported as
        "not found" and skipped.
        """
        for data_raw in entries_raw:
            osqa = data_raw['fields']
            self._announce("Creating revision: %d...\t" % data_raw['pk'])
            p = self.nodes.get(osqa.get('node'))
            if p is None:
                print("not found")
                continue
            try:
                p.title = osqa['title']
                p.body_text = osqa['body']
            except KeyError:
                print("not found")
                continue
            p.save()
            print("success")

    def migrate_actions(self, raw):
        """Replay up-votes, down-votes and accepted answers from ``forum.action``.

        Other action types are ignored.  A failing action is reported together
        with its cause and does not stop the migration.
        """
        # OSQA action type -> name of the askbot.User method that replays it.
        valid = {'voteup': 'upvote', 'votedown': 'downvote',
                 'acceptanswer': 'accept_best_answer'}
        accepted = [x for x in raw if x['fields']['action_type'] in valid]

        # NOTE(review): every matching action is replayed as-is; if the dump
        # marks cancelled actions they are not filtered here -- confirm.
        for action in accepted:
            osqa = action['fields']
            self._announce("Adding action %s..." % osqa['action_type'])
            try:
                post = self.nodes[osqa['node']]
                user = self.users[osqa['user']]
                vote_method = getattr(askbot.User, valid[osqa['action_type']])
                vote_method(
                    user, post,
                    timestamp=dateutil.parser.parse(osqa['action_date']),
                    force=True
                )
                print("success")
            except Exception as exc:
                # Best effort: keep going, but say why, and let
                # KeyboardInterrupt/SystemExit propagate.
                print("error: %s" % (exc,))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment