Create SET-based data for MySQL testing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import random | |
import time | |
import string | |
from threading import Thread | |
from mysql.utilities.common import (database, options, server, table) | |
from subprocess import call | |
from contextlib import contextmanager | |
# Connection settings for the MySQL server the generated data targets.
server_user = 'sbuser'
server_password = 'sbpass'
server_host = '127.0.0.1'
server_port = '3306'

# "user:password@host:port" connection string built from the settings above.
server_connection = "%s:%s@%s:%s" % (
    server_user,
    server_password,
    server_host,
    server_port,
)

# Candidate values for the ``skills`` column: single members first, then
# two-member and three-member combinations in various orders.
skills = [
    'actor',
    'singer',
    'dancer',
    'actor,singer',
    'actor,dancer',
    'singer,actor',
    'singer,dancer',
    'dancer,actor',
    'dancer,singer',
    'actor,singer,dancer',
    'singer,dancer,actor',
    'dancer,actor,singer',
    'dancer,singer,actor',
]
class Movie(object):
    """Generate random ``INSERT`` statements for ``sbtest.settest``.

    Args:
        server: Server object passed on to ``database.Database`` whenever a
            database handle is requested.
        database: Name of the database used by :attr:`movie_db`.
    """

    def __init__(self,
                 server,
                 database=u"imdb"):
        self.server = server
        self.database = database

    @property
    def movie_db(self):
        """Return a database handle for ``self.database``.

        A new handle is built through :meth:`connect_db` on every access.
        """
        return self.connect_db(self.database)

    def connect_db(self, db_name):
        """Build a ``database.Database`` for *db_name* on ``self.server``.

        Args:
            db_name: Name of the database to wrap.

        Returns:
            The ``database.Database`` instance, created with the
            ``skip_grants`` option enabled.
        """
        db_options = {u'skip_grants': True}
        return database.Database(self.server, db_name, db_options)

    def rnd_user(self):
        """Print one ``INSERT`` statement with a random name and skill set.

        The name comes from :meth:`genstring` and the skill value is picked
        from the module-level ``skills`` list.
        """
        query = u"INSERT INTO sbtest.settest (name, skills) VALUES ('%(name)s','%(skill)s');"
        f_name = self.genstring(3, 9)
        # random.choice avoids hard-coding the length of ``skills``.
        skill = random.choice(skills)
        # The parenthesised single-argument form behaves the same as the
        # Python 2 print statement and as the Python 3 print function.
        print(query % {u'name': f_name, u'skill': skill})
        # NOTE: the statement is only printed; nothing is executed against
        # self.server from this method.

    def genstring(self, lim_down=3, lim_up=9):
        """Return a random capitalised name made of two syllable runs.

        The total length is drawn uniformly from ``[lim_down, lim_up]``.
        The name is the concatenation of two runs, each alternating
        consonant, vowel, consonant, ... and each starting with a
        consonant.  The first run gets the extra character when the length
        is odd, and its first letter is upper-cased.

        Args:
            lim_down: Smallest allowed length (inclusive).
            lim_up: Largest allowed length (inclusive).

        Returns:
            The generated name as a string.
        """
        length = random.randint(lim_down, lim_up)
        vowels = ['a', 'e', 'i', 'o', 'u']
        consonants = [c for c in string.ascii_lowercase if c not in vowels]

        def a_part(slen):
            """Return *slen* letters alternating consonant/vowel."""
            return ''.join(
                random.choice(consonants if i % 2 == 0 else vowels)
                for i in range(slen)
            )

        # Integer arithmetic keeps the run lengths usable with range() even
        # where "/" is true division.
        fpl = (length + 1) // 2
        lpl = length - fpl
        return "%s%s" % (a_part(fpl).capitalize(), a_part(lpl))
def main():
    """Print random ``INSERT`` statements in an endless loop.

    Every iteration obtains a server object through ``server.get_server``
    using the module-level ``server_connection`` string, wraps it in a
    ``Movie`` and calls ``Movie.rnd_user``.  When an iteration fails, the
    error is reported on stderr and the loop retries after 10 seconds.
    """
    import sys  # local import: only needed for error reporting

    while True:
        try:
            movie = Movie(server.get_server(u'localhost', server_connection, False))
            movie.rnd_user()
        except Exception as err:
            # ``Exception`` (not a bare except) lets KeyboardInterrupt and
            # SystemExit stop the script.  The message goes to stderr so
            # that stdout carries nothing but the generated SQL.
            sys.stderr.write("rnd_user failed (%s); retrying in 10s\n" % (err,))
            time.sleep(10)
# Run the generator only when executed as a script, so importing this module
# does not start the endless loop.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment