Created
October 17, 2017 15:16
-
-
Save n2iw/6244f413387cda34dbc6f860636b9008 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Batch create users from CSV file | |
''' | |
This is one of the tools I wrote recently, The batch create users in a web | |
application (Knowledge Forum 6). | |
This program read data from a csv file, organize data and call some APIs | |
to create users, add users to specified community and group. | |
''' | |
''' | |
Prerequisites: | |
Python 3 | |
Packages: | |
Request: | |
$ pip3 install requests | |
Usage: | |
add-users.py -u 'kf6-user-name' -p 'kf6-password' -s 'kf6-server-address' | |
All parameters have default value | |
Defualt values are saved in variables list below: | |
SERVER = 'http://localhost:9000' | |
USER_NAME = 'user_name' | |
PASSWORD = 'password' | |
''' | |
# Packages need to be installed | |
import requests | |
# Python Built-in packages | |
import argparse | |
import csv | |
import os | |
import json | |
import datetime | |
import traceback | |
from pprint import pprint | |
from operator import itemgetter | |
# Defualt Settings | |
SERVER = 'http://localhost:9000' | |
USER_NAME = 'user_name' | |
PASSWORD = 'password' | |
CREATE_USER_PATH = '/api/users' | |
LOGIN_PATH = '/auth/local' | |
ALL_COMMUNIRY_PATH = '/api/communities' | |
MY_USER_INFO_PATH = '/api/users/me' | |
ADD_AUTHOR_PATH = '/api/authors' | |
GROUPS_PATH = '/api/communities/{}/groups' # communityId | |
MY_COMMUNITIES_PATH = '/api/users/myRegistrations' | |
CREATE_GROUP_PATH = '/api/groups/{}' # communityId | |
UPDATE_OBJECT_PATH = '/api/objects/{}/{}' # communityId, objectId | |
class Kf6: | |
def __init__(self, server, user, password, community): | |
self.server = server | |
self.userName = user | |
self.password = password | |
self.communityTitle = community | |
def createNewUser(self, firstName, lastName, userName, password, email): | |
url = self.server + CREATE_USER_PATH | |
json = requests.post(url, json={ | |
'firstName': firstName, | |
'lastName': lastName, | |
'userName': userName, | |
'password': password, | |
'email': email | |
}).json() | |
if 'token' in json: | |
print("Create user {} succeeded!".format(userName)) | |
self.token = json['token'] | |
self.headers = {'Authorization': 'bearer ' + self.token, 'Content-Type': 'application/json'} | |
json2 = self.get(MY_USER_INFO_PATH) | |
if '_id' in json2: | |
self.userId = json2['_id'] | |
else: | |
print("Get user info failed!") | |
else: | |
pprint(json) | |
def getMyCommunities(self): | |
result = self.get(MY_COMMUNITIES_PATH) | |
self.myCommunityIds = set() | |
for c in result: | |
if 'communityId' in c: | |
self.myCommunityIds.add(c['communityId']) | |
if self.communityTitle == c['_community']['title']: | |
self.communityId = c['communityId'] | |
self.authorId = c['_id'] | |
else: | |
print("Get my communities failed!") | |
pprint(c) | |
exit(0) | |
def joinCommunity(self): | |
if self.communityId in self.myCommunityIds: | |
return | |
data = { | |
"userId": self.userId, | |
"communityId": self.communityId, | |
"registrationKey": self.registrationKey | |
} | |
result = self.post(ADD_AUTHOR_PATH, data) | |
if result and '_id' in result: | |
self.authorId = result['_id'] | |
else: | |
print("Join community failed for: {}".format(self.userName)) | |
pprint(result) | |
def getGroups(self): | |
if not hasattr(self, 'communityId'): | |
return | |
url = GROUPS_PATH.format(self.communityId) | |
json = self.get(url) | |
self.groups = {} | |
if json: | |
for g in json: | |
self.groups[g['title']] = g | |
def createGroup(self, name): | |
if name in self.groups: | |
return | |
data = { | |
"communityId": self.communityId, | |
"type": "Group", | |
"title": name, | |
"authors": [self.authorId], | |
"members": [self.authorId], | |
"status": "active", | |
"permission": "protected" | |
} | |
result = self.post(CREATE_GROUP_PATH.format(self.communityId), data) | |
if result: | |
self.groups[name] = result | |
def addGroupMember(self, name, authorId): | |
if not hasattr(self, 'groups'): | |
self.getGroups() | |
if name not in masterKf6.groups: | |
self.createGroup(name) | |
if authorId not in self.groups[name]['members']: | |
self.groups[name]['members'].append(authorId) | |
self.put(UPDATE_OBJECT_PATH.format(self.communityId, self.groups[name]['_id']), self.groups[name]) | |
def isReady(self): | |
if not hasattr(self, "token"): | |
return False | |
if not hasattr(self, "headers"): | |
return False | |
if not hasattr(self, "communityId"): | |
return False | |
return True | |
def get(self, path): | |
url = self.server + path | |
result = requests.get(url, headers=self.headers) | |
if result.status_code == 200: | |
return result.json() | |
else: | |
print('GET {} failed with code: {}'.format(url, result.status_code)) | |
# traceback.print_stack() | |
exit(1) | |
def post(self, path, data): | |
url = self.server + path | |
result = requests.post(url, headers=self.headers, json=data) | |
if result.status_code // 100 == 2: | |
return result.json() | |
else: | |
print('POST {} failed with code: {}'.format(url, result.status_code)) | |
# traceback.print_stack() | |
pprint(result) | |
return None | |
def put(self, path, data): | |
url = self.server + path | |
result = requests.put(url, headers=self.headers, json=data) | |
if result.status_code // 100 == 2: | |
return result.json() | |
else: | |
print('PUT {} failed with code: {}'.format(url, result.status_code)) | |
# traceback.print_stack() | |
pprint(result) | |
return None | |
def login(self): | |
url = self.server + LOGIN_PATH | |
json = requests.post(url, json={'userName': self.userName, 'password': self.password}).json() | |
if 'token' in json: | |
self.token = json['token'] | |
self.headers = {'Authorization': 'bearer ' + self.token, 'Content-Type': 'application/json'} | |
json2 = self.get(MY_USER_INFO_PATH) | |
if '_id' in json2: | |
self.userId = json2['_id'] | |
else: | |
print("Get user info failed!") | |
else: | |
pprint(json) | |
def getCommunity(self): | |
json = self.get(ALL_COMMUNIRY_PATH) | |
result = {} | |
for community in json: | |
cTitle = community['title'] | |
if self.communityTitle.lower() in cTitle.lower(): | |
result = community | |
communityId = community['_id'] | |
print("Find community: " + cTitle + " : " + communityId) | |
if hasattr(self, 'communityId') and self.communityId != communityId: | |
print('Find Two communities, but with diffrent Ids, please refine your input1') | |
exit(1) | |
self.communityId = communityId | |
self.scaffoldIds = community["scaffolds"] | |
self.registrationKey = community['registrationKey'] | |
self.managerRegistrationKey = community['managerRegistrationKey'] | |
return result | |
def getTimeStamp(): | |
d = datetime.datetime.now() | |
date = ( "%02d-%02d-%02d-%02d%02d%02d" % | |
(d.year, d.month, d.day, d.hour, d.minute, d.second) ) | |
return date | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser(description="Batch create KF6 users from CSV file") | |
parser.add_argument('-u', '--user', help='user name', default=USER_NAME) | |
parser.add_argument('-p', '--password', help='password', default=PASSWORD) | |
parser.add_argument('-s', '--server', help='server address', default=SERVER) | |
parser.add_argument('csvfile', type=argparse.FileType('r'), help='Input CSV file') | |
args = parser.parse_args() | |
reader = csv.reader(args.csvfile, dialect='excel') | |
reader.__next__() | |
masterKf6 = Kf6(args.server, args.user, args.password, "") | |
masterKf6.login() | |
count = 0 | |
for arr in reader: | |
user = { | |
'firstName': arr[0].strip(), | |
'lastName': arr[1].strip(), | |
'userName': arr[2].strip(), | |
'password': arr[3].strip(), | |
'email': arr[2].strip() + '@gmail.com', | |
'community': arr[4].strip(), | |
'group': arr[5].strip() | |
} | |
print("-" * 50) | |
print("{}: {}".format(user['community'], user['userName'])) | |
if user['community'] != masterKf6.communityTitle: | |
masterKf6.communityTitle = user['community'] | |
masterKf6.getMyCommunities() | |
masterKf6.getGroups() | |
if not hasattr(masterKf6, 'authorId'): | |
masterKf6.getCommunity() | |
if not masterKf6.isReady(): | |
print("Can't get community: '{}'".format(masterKf6.communityTitle)) | |
exit(1) | |
masterKf6.joinCommunity() | |
kf6 = Kf6(args.server, user['userName'], user['password'], user['community']) | |
kf6.login() | |
if not hasattr(kf6, 'token'): | |
print("Creating new user") | |
kf6.createNewUser(user['firstName'], user['lastName'], user['userName'], user['password'], user['email']) | |
if not hasattr(kf6, 'token'): | |
print("User: {} failed!".format(user['userName'])) | |
else: | |
kf6.getMyCommunities() | |
if not hasattr(kf6, 'authorId'): | |
kf6.getCommunity() | |
if not kf6.isReady(): | |
print("Can't get community: '{}'".format(kf6.communityTitle)) | |
exit(1) | |
kf6.joinCommunity() | |
if user['group']: | |
masterKf6.addGroupMember(user['group'], kf6.authorId) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment