Skip to content

Instantly share code, notes, and snippets.

@n2iw
Created October 17, 2017 15:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save n2iw/6244f413387cda34dbc6f860636b9008 to your computer and use it in GitHub Desktop.
Save n2iw/6244f413387cda34dbc6f860636b9008 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# Batch create users from CSV file
'''
This is one of the tools I wrote recently to batch-create users in a web
application (Knowledge Forum 6).
The program reads data from a CSV file, organizes it, and calls the server's
APIs to create users and add them to the specified community and group.
'''
'''
Prerequisites:
Python 3
Packages:
Requests:
$ pip3 install requests
Usage:
add-users.py -u 'kf6-user-name' -p 'kf6-password' -s 'kf6-server-address' users.csv
The CSV file is required; the -u, -p and -s options have default values.
Default values are saved in the variables listed below:
SERVER = 'http://localhost:9000'
USER_NAME = 'user_name'
PASSWORD = 'password'
'''
# Packages need to be installed
import requests
# Python Built-in packages
import argparse
import csv
import os
import json
import datetime
import traceback
from pprint import pprint
from operator import itemgetter
# Default settings: used when -s / -u / -p are not given on the command line
SERVER = 'http://localhost:9000'
USER_NAME = 'user_name'
PASSWORD = 'password'
# API paths; each one is appended to the server address to build a request URL
CREATE_USER_PATH = '/api/users'
LOGIN_PATH = '/auth/local'
# NOTE: the name is misspelled ("COMMUNIRY") but is referenced under this
# spelling elsewhere in the file, so it must not be renamed here alone.
ALL_COMMUNIRY_PATH = '/api/communities'
MY_USER_INFO_PATH = '/api/users/me'
ADD_AUTHOR_PATH = '/api/authors'
# Paths below are templates filled in with str.format(); the trailing comment
# names the positional values each template expects.
GROUPS_PATH = '/api/communities/{}/groups' # communityId
MY_COMMUNITIES_PATH = '/api/users/myRegistrations'
CREATE_GROUP_PATH = '/api/groups/{}' # communityId
UPDATE_OBJECT_PATH = '/api/objects/{}/{}' # communityId, objectId
class Kf6:
    '''
    Small client for the Knowledge Forum 6 (KF6) web API, bound to one user.

    The constructor only records its arguments. Everything learned from the
    server is stored later as instance attributes, so callers check for it
    with hasattr():

    token, headers  -- set by a successful login() or createNewUser()
    userId          -- set together with the token when the server returns it
    myCommunityIds  -- set by getMyCommunities()
    communityId     -- set by getMyCommunities() or getCommunity()
    authorId        -- set by getMyCommunities() or joinCommunity()
    scaffoldIds, registrationKey, managerRegistrationKey
                    -- set by getCommunity()
    groups          -- dict mapping group title to group object, set by
                       getGroups()
    '''

    def __init__(self, server, user, password, community):
        '''
        Remember the server address, the credentials and the community title.

        No request is sent here; call login() or createNewUser() first.
        '''
        self.server = server
        self.userName = user
        self.password = password
        self.communityTitle = community

    def _storeToken(self, reply):
        '''
        Adopt the auth token found in a login / sign-up reply.

        On success sets self.token and self.headers, then fetches the user's
        own record to set self.userId. Without a token the reply is printed
        and nothing is stored.
        '''
        if 'token' not in reply:
            pprint(reply)
            return
        self.token = reply['token']
        self.headers = {'Authorization': 'bearer ' + self.token, 'Content-Type': 'application/json'}
        info = self.get(MY_USER_INFO_PATH)
        if '_id' in info:
            self.userId = info['_id']
        else:
            print("Get user info failed!")

    def _jsonOrNone(self, verb, url, result):
        '''
        Return the decoded body of a 2xx response, otherwise report and
        return None.

        The response text is printed on failure because the status code alone
        rarely says what the server objected to.
        '''
        if result.status_code // 100 == 2:
            return result.json()
        print('{} {} failed with code: {}'.format(verb, url, result.status_code))
        print(result.text)
        return None

    def createNewUser(self, firstName, lastName, userName, password, email):
        '''
        Register a new account and, when the server answers with a token,
        continue as that user (same attributes as after login()).

        The account fields are passed explicitly; self.userName and
        self.password are not used for the request.
        '''
        url = self.server + CREATE_USER_PATH
        reply = requests.post(url, json={
            'firstName': firstName,
            'lastName': lastName,
            'userName': userName,
            'password': password,
            'email': email
        }).json()
        if 'token' in reply:
            print("Create user {} succeeded!".format(userName))
        self._storeToken(reply)

    def getMyCommunities(self):
        '''
        Load the communities this user is registered in.

        Sets self.myCommunityIds. If one registration's community title
        equals self.communityTitle exactly, also sets self.communityId and
        self.authorId from that registration. Terminates the program with
        status 1 when an entry has no 'communityId'.
        '''
        registrations = self.get(MY_COMMUNITIES_PATH)
        self.myCommunityIds = set()
        for entry in registrations:
            if 'communityId' not in entry:
                print("Get my communities failed!")
                pprint(entry)
                # This is a failure, so it must not exit with status 0.
                raise SystemExit(1)
            self.myCommunityIds.add(entry['communityId'])
            if self.communityTitle == entry['_community']['title']:
                self.communityId = entry['communityId']
                self.authorId = entry['_id']

    def joinCommunity(self):
        '''
        Register this user as an author of self.communityId.

        Does nothing when the user is already registered there. Requires
        userId, communityId, myCommunityIds and registrationKey to be set
        (the latter comes from getCommunity()). Sets self.authorId on
        success and prints the failure otherwise.
        '''
        if self.communityId in self.myCommunityIds:
            return
        data = {
            "userId": self.userId,
            "communityId": self.communityId,
            "registrationKey": self.registrationKey
        }
        result = self.post(ADD_AUTHOR_PATH, data)
        if result and '_id' in result:
            self.authorId = result['_id']
        else:
            print("Join community failed for: {}".format(self.userName))
            pprint(result)

    def getGroups(self):
        '''
        Load the groups of the current community into self.groups, keyed by
        group title. Does nothing while self.communityId is not known.
        '''
        if not hasattr(self, 'communityId'):
            return
        found = self.get(GROUPS_PATH.format(self.communityId))
        self.groups = {}
        if found:
            for group in found:
                self.groups[group['title']] = group

    def createGroup(self, name):
        '''
        Create a group titled `name` in the current community, with this user
        as its only author and member, and cache it in self.groups.

        Does nothing when a group with that title is already cached.
        Requires self.groups, self.communityId and self.authorId.
        '''
        if name in self.groups:
            return
        data = {
            "communityId": self.communityId,
            "type": "Group",
            "title": name,
            "authors": [self.authorId],
            "members": [self.authorId],
            "status": "active",
            "permission": "protected"
        }
        result = self.post(CREATE_GROUP_PATH.format(self.communityId), data)
        if result:
            self.groups[name] = result

    def addGroupMember(self, name, authorId):
        '''
        Make `authorId` a member of the group titled `name`, creating the
        group first when it does not exist yet.

        Works on this instance's own group cache. If the cache cannot be
        loaded or the group cannot be created, the problem is printed and
        the call returns without raising.
        '''
        if not hasattr(self, 'groups'):
            self.getGroups()
        if not hasattr(self, 'groups'):
            # getGroups() returns early while communityId is unknown.
            print("Add member to group '{}' failed: community is unknown".format(name))
            return
        if name not in self.groups:
            self.createGroup(name)
        group = self.groups.get(name)
        if group is None:
            # createGroup() leaves the cache untouched when the POST fails.
            print("Add member to group '{}' failed: group could not be created".format(name))
            return
        if authorId not in group['members']:
            group['members'].append(authorId)
            self.put(UPDATE_OBJECT_PATH.format(self.communityId, group['_id']), group)

    def isReady(self):
        '''
        Return True once token, headers and communityId have all been set.
        '''
        return all(hasattr(self, attr) for attr in ("token", "headers", "communityId"))

    def get(self, path):
        '''
        GET server + path with the auth headers and return the decoded JSON.

        Any status other than 200 is reported and terminates the program
        with status 1.
        '''
        url = self.server + path
        result = requests.get(url, headers=self.headers)
        if result.status_code == 200:
            return result.json()
        print('GET {} failed with code: {}'.format(url, result.status_code))
        raise SystemExit(1)

    def post(self, path, data):
        '''
        POST `data` as JSON to server + path.

        Returns the decoded JSON body for a 2xx status; otherwise reports
        the failure and returns None.
        '''
        url = self.server + path
        result = requests.post(url, headers=self.headers, json=data)
        return self._jsonOrNone('POST', url, result)

    def put(self, path, data):
        '''
        PUT `data` as JSON to server + path.

        Returns the decoded JSON body for a 2xx status; otherwise reports
        the failure and returns None.
        '''
        url = self.server + path
        result = requests.put(url, headers=self.headers, json=data)
        return self._jsonOrNone('PUT', url, result)

    def login(self):
        '''
        Log in with self.userName / self.password.

        On success sets token, headers and (when available) userId. On
        failure the server reply is printed and no attribute is set, so
        callers detect it with hasattr(obj, 'token').
        '''
        url = self.server + LOGIN_PATH
        reply = requests.post(url, json={'userName': self.userName, 'password': self.password}).json()
        self._storeToken(reply)

    def getCommunity(self):
        '''
        Look the community up by title among all communities on the server.

        The match is a case-insensitive substring test of self.communityTitle
        against each title. A match sets communityId, scaffoldIds,
        registrationKey and managerRegistrationKey. If a match has a
        different id than the communityId already held, the program
        terminates with status 1.

        Returns the last matching community object, or {} when none matched.
        '''
        communities = self.get(ALL_COMMUNIRY_PATH)
        wanted = self.communityTitle.lower()
        result = {}
        for community in communities:
            cTitle = community['title']
            if wanted not in cTitle.lower():
                continue
            result = community
            communityId = community['_id']
            print("Find community: " + cTitle + " : " + communityId)
            if hasattr(self, 'communityId') and self.communityId != communityId:
                print('Find Two communities, but with diffrent Ids, please refine your input1')
                raise SystemExit(1)
            self.communityId = communityId
            self.scaffoldIds = community["scaffolds"]
            self.registrationKey = community['registrationKey']
            self.managerRegistrationKey = community['managerRegistrationKey']
        return result
def getTimeStamp():
    '''
    Return the current local time as a 'YYYY-MM-DD-HHMMSS' string.
    '''
    # The first six fields of the time tuple are
    # (year, month, day, hour, minute, second).
    fields = datetime.datetime.now().timetuple()[:6]
    return "{:02d}-{:02d}-{:02d}-{:02d}{:02d}{:02d}".format(*fields)
if __name__ == '__main__':
    # Command-line entry point: read users from a CSV file and, for each row,
    # make sure the account exists, is registered in the row's community and
    # is a member of the row's group.
    parser = argparse.ArgumentParser(description="Batch create KF6 users from CSV file")
    parser.add_argument('-u', '--user', help='user name', default=USER_NAME)
    parser.add_argument('-p', '--password', help='password', default=PASSWORD)
    parser.add_argument('-s', '--server', help='server address', default=SERVER)
    parser.add_argument('csvfile', type=argparse.FileType('r'), help='Input CSV file')
    args = parser.parse_args()

    # The "master" account (from -u / -p) is the one that creates groups and
    # edits their member lists. It is kept as a module-level name on purpose
    # so that any code referring to it by name keeps working.
    masterKf6 = Kf6(args.server, args.user, args.password, "")
    masterKf6.login()
    if not hasattr(masterKf6, 'token'):
        # login() prints the server reply; without a token no later call can
        # be authenticated.
        print("Login failed for: {}".format(args.user))
        raise SystemExit(1)

    with args.csvfile as csvfile:
        reader = csv.reader(csvfile, dialect='excel')
        # Skip the header row; the default avoids StopIteration on an empty file.
        next(reader, None)
        for arr in reader:
            if not arr:
                # csv.reader yields [] for blank lines.
                continue
            if len(arr) < 6:
                print("Skipping malformed row (expected 6 columns): {}".format(arr))
                continue
            # Columns: first name, last name, user name, password, community, group.
            user = {
                'firstName': arr[0].strip(),
                'lastName': arr[1].strip(),
                'userName': arr[2].strip(),
                'password': arr[3].strip(),
                # The CSV has no e-mail column; an address is derived from
                # the user name.
                'email': arr[2].strip() + '@gmail.com',
                'community': arr[4].strip(),
                'group': arr[5].strip()
            }
            print("-" * 50)
            print("{}: {}".format(user['community'], user['userName']))

            if user['community'] != masterKf6.communityTitle:
                # Switching community: forget what was learned about the
                # previous one. Otherwise a leftover authorId makes the
                # master skip joining the new community, and groups would be
                # looked up and edited in the old one.
                for stale in ('communityId', 'authorId', 'groups'):
                    if hasattr(masterKf6, stale):
                        delattr(masterKf6, stale)
                masterKf6.communityTitle = user['community']
                masterKf6.getMyCommunities()
                masterKf6.getGroups()
                if not hasattr(masterKf6, 'authorId'):
                    # Not registered there yet: find the community and join it.
                    masterKf6.getCommunity()
                    if not masterKf6.isReady():
                        print("Can't get community: '{}'".format(masterKf6.communityTitle))
                        raise SystemExit(1)
                    masterKf6.joinCommunity()
                if not hasattr(masterKf6, 'authorId'):
                    # joinCommunity() has already printed the reason.
                    print("Can't join community: '{}'".format(masterKf6.communityTitle))
                    raise SystemExit(1)

            kf6 = Kf6(args.server, user['userName'], user['password'], user['community'])
            kf6.login()
            if not hasattr(kf6, 'token'):
                # Login failed, so assume the account does not exist yet.
                print("Creating new user")
                kf6.createNewUser(user['firstName'], user['lastName'], user['userName'], user['password'], user['email'])
            if not hasattr(kf6, 'token'):
                print("User: {} failed!".format(user['userName']))
                continue

            kf6.getMyCommunities()
            if not hasattr(kf6, 'authorId'):
                kf6.getCommunity()
                if not kf6.isReady():
                    print("Can't get community: '{}'".format(kf6.communityTitle))
                    raise SystemExit(1)
                kf6.joinCommunity()
            if not hasattr(kf6, 'authorId'):
                # Joining failed (already reported); there is no author id to
                # add to a group, so move on to the next row.
                continue
            if user['group']:
                masterKf6.addGroupMember(user['group'], kf6.authorId)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment