Skip to content

Instantly share code, notes, and snippets.

@theSage21
Created May 6, 2016 05:24
Show Gist options
  • Save theSage21/2f408836a66507528d5293a659af0f64 to your computer and use it in GitHub Desktop.
Save theSage21/2f408836a66507528d5293a659af0f64 to your computer and use it in GitHub Desktop.
# We use a class since, well it makes more sense this way and
# is easier to maintain in the future
class DB:
'''
BelonQuery Database
'''
def __init__(self, columns_and_types):
'''
columns must be list of (colname, type)
'''
self.columns = tuple((i for i,j in columns_and_types)) # Stores column names and order
self.typemap = {i: j for i, j in columns_and_types} # Typemap
self.data = [] # Data storage
self.default = type(None) # What is the default type
def save(self, fname_base='db'):
'save to file'
import json
import pickle
with open(fname_base+'.db', 'w') as fl:
fl.write(json.dumps(self.data))
with open(fname_base+'.map', 'wb') as fl:
pickle.dump(self.typemap, fl) # Json has trouble writing class bool to file
with open(fname_base+'.cols', 'w') as fl:
fl.write(json.dumps(self.columns))
def load(self, fname_base='db'):
'load from files'
import json
import pickle
with open(fname_base+'.db', 'r') as fl:
self.data = json.loads(fl.read())
with open(fname_base+'.map', 'rb') as fl:
self.typemap = pickle.load(fl) # Json has trouble writing class bool to file
with open(fname_base+'.cols', 'r') as fl:
self.columns = json.loads(fl.read())
def __valid_row(self, values):
'is this a valid row'
assert len(self.columns) == len(values)
x = [(i, self.typemap[j])
for i, j in zip(values, self.columns)]
y = [isinstance(i, j) for i, j in x]
assert all(y)
def add_row(self, values):
'add a row at the end'
self.__valid_row(values)
self.data.append(values)
def update_row(self, new_values, index):
'update a row by index'
self.__valid_row(new_values)
assert 0 <= index <= len(self.data), 'Out of index'
self.data[index] = new_values
def rm_row(self, index):
'remove by index'
assert 0 <= index <= len(self.data), 'Out of index'
self.data.pop(index)
def _search(self, on_column, *args):
"Search on a columns"
assert on_column in self.columns
ctype = self.typemap[on_column]
# What to do?
ops = {type(''): self.__strsearch,
type(1): self.__numbersearch,
type(True): self.__boolsearch,}
# Do it
return ops[ctype](on_column, *args)
def __boolsearch(self, col, *args):
'Provide one variable for T and F'
# not really needed but to keep consistency with the rest
# of the search styles we use a qtype and a query
qtype, query, *_ = args
index = self.columns.index(col)
if qtype == '==':
return [query == i[index] for i in self.data]
elif qtype == '!=':
return [query != i[index] for i in self.data]
def __strsearch(self, col, *args):
'''
Provides these searches:
- substring
- equals
*args must be querytype, query
querytype must be one of 'in', 'eq'
query is the argument needed
'''
qtype, query, *_ = args
index = self.columns.index(col)
if qtype == 'in':
return [query in i[index] for i in self.data]
elif qtype == 'eq':
return [query == i[index] for i in self.data]
else:
raise Exception("Unknown text query type recieved")
def __numbersearch(self, col, *args):
'''
Provides operator searches
- <
- >
- ==
- <=
- >=
- !=
*args must be querytype, query
'''
qtype, query, *_ = args
index = self.columns.index(col)
if qtype.strip() == '<':
return [query > i[index] for i in self.data]
elif qtype.strip() == '>':
return [query < i[index] for i in self.data]
elif qtype.strip() == '==':
return [query == i[index] for i in self.data]
elif qtype.strip() == '<=':
return [query >= i[index] for i in self.data]
elif qtype.strip() == '>=':
return [query <= i[index] for i in self.data]
elif qtype.strip() == '!=':
return [query != i[index] for i in self.data]
else:
raise Exception('Unknown query type received')
def search(self, *args):
'''
Used for multiple query searches on the database
search format is :
(colname, search_type, query) combiner (.., .., ..)
combiners are one of ['+', '.'] corresponding to [OR, AND]
'''
results, op = None, None
while args:
col, qtype, q, *temp = args # get the current column search
args = temp
r = self._search(col, qtype, q)
if results is None: # this is the first search
results = r
else:
# We have previous searches + operator
prev, this = results, r
# Combine the results as per operator
if op == '+':
results = [i or j for i, j in zip(prev, this)]
elif op == '.':
results = [i and j for i, j in zip(prev, this)]
# See if there is an operator and set it if so
if args:
op, *temp = args
args = temp
return results
def __str__(self):
'Print the db'
string = ','.join(self.columns) + '\n'
string += '-'*len(string) + '\n'
for row in self.data:
string += ','.join(list(map(str, row))) + '\n'
return string
def get(self, series_index):
'Get as per boolean index'
assert len(series_index) == len(self.data)
result = [i for j, i in zip(series_index, self.data) if j]
return result
def getsearch(self, *args):
'Search and return a new DB with the query satisfied'
index = self.search(*args)
data = self.get(index)
# Make new DB
d = DB([(i, self.typemap[i]) for i in self.columns])
for row in data:
d.add_row(row)
return d
def test():
'Testing stuff'
print('Creating database')
db = DB([('srno',int),
('name',str),
('age', float),
('member', bool)
])
print('Adding data')
db.add_row((1,'arjoonn',21.1,True))
db.add_row((2,'arjoonn2',31.3,False))
db.add_row((3,'arjoonn3',41.1,True))
db.add_row((4,'arjoonn4',5.11,False))
db.add_row((5,'arjoonn5',61.1,True))
print(db)
print('Remove row')
db.rm_row(2)
print(db)
query = (15,'arjoonn_new',9.11,True)
print('Update row with new data {}'.format(str(query)))
db.update_row(query, 3)
print(db)
query = ('srno', '==', 2)
print('Search and return index {}'.format(str(query)))
sr_index = db.search(*query)
print(sr_index)
print('Get by index method')
print(db.get(sr_index))
query = ('srno', '<=', 2, '+', 'member', '==', True)
print('Boolean joined query, {}'.format(str(query)))
new_db = db.getsearch(*query)
print(new_db)
print('Save to disk')
db.save('myfile')
print('Load from disk')
new_db.load('myfile')
print(db)
print(new_db)
test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment