Skip to content

Instantly share code, notes, and snippets.

@badocelot
Created September 1, 2015 20:12
Show Gist options
  • Save badocelot/0071a966425b7fca6e23 to your computer and use it in GitHub Desktop.
Save badocelot/0071a966425b7fca6e23 to your computer and use it in GitHub Desktop.
Toward an rdbms in Python...
class Heading(set):
    """Set of attribute names forming the heading of a tuple or relation.

    The names may be supplied either as separate positional arguments
    (``Heading("SNO", "CITY")``) or as a single list, tuple, set or
    frozenset of names (``Heading(["SNO", "CITY"])``).  Called without
    arguments, the heading is empty.

    Raises:
        TypeError: if any attribute name is not a ``str``.
        ValueError: if any attribute name is the empty string.
    """

    def __init__(self, *attributes):
        # A lone collection argument is taken to be the collection of names
        # rather than a single attribute.
        if len(attributes) == 1 and isinstance(
                attributes[0], (list, tuple, set, frozenset)):
            attributes = attributes[0]
        for att in attributes:
            if not isinstance(att, str):
                raise TypeError("Attribute name must be string.")
            if att == "":
                raise ValueError("Attribute name must not be empty.")
        super().__init__(attributes)

    def __repr__(self):
        # A one-element tuple would render with a trailing comma, so the
        # single-attribute case is formatted by hand.
        if len(self) == 1:
            (only,) = self
            return "Heading(%s)" % repr(only)
        return "Heading%s" % repr(tuple(self))

    def __str__(self):
        return str(tuple(self))
class Tuple(dict):
    """A relational tuple: a dict mapping attribute names to values.

    The heading (the set of attribute names) is computed from the keys when
    the tuple is built and is exposed through the ``heading`` property.
    Values can also be read as attributes, e.g. ``t.CITY`` for ``t["CITY"]``.

    Raises:
        ValueError: if the keys of ``values`` do not form a valid heading
            (``values`` is not a mapping, or a key is not a non-empty
            string).
    """

    def __init__(self, values):
        try:
            # Hand the names over as ONE collection: unpacking them would let
            # a lone tuple-valued key be mistaken for several attribute names.
            self.__heading = Heading(tuple(values.keys()))
        except (AttributeError, TypeError, ValueError) as err:
            raise ValueError("invalid tuple heading") from err
        for a in values:
            self[a] = values[a]

    @property
    def heading(self):
        """The Heading built from this tuple's attribute names."""
        return self.__heading

    def __getattr__(self, attr):
        # Only reached when normal attribute lookup fails.  A missing name
        # must surface as AttributeError (not KeyError) so that hasattr()
        # and getattr(obj, name, default) behave as documented.
        try:
            return self[attr]
        except KeyError:
            raise AttributeError(attr) from None

    def __getstate__(self):
        # Only the attribute values are pickled; the heading is rebuilt.
        return dict(self)

    def __setstate__(self, state):
        for x in state:
            self[x] = state[x]
        self.__heading = Heading(tuple(self.keys()))

    def __repr__(self):
        return "Tuple(%s)" % repr(dict(self))

    def __str__(self):
        return str(dict(self))
def Tuples(*tuples):
    """Wrap each positional argument in a Tuple and return them as a tuple."""
    built = []
    for mapping in tuples:
        built.append(Tuple(mapping))
    return tuple(built)
class Relation:
    """A relation value: a heading plus a body of tuples with that heading.

    The relational operators never modify their operands; each one returns
    a new Relation.

    Args:
        heading: a Heading, or anything Heading() accepts as a single
            argument (list/tuple/set of attribute names, or one name).
        tuples: iterable of Tuple objects whose heading equals ``heading``.

    Raises:
        ValueError: if a tuple's heading differs from ``heading``.
        TypeError: if ``heading`` is rejected by Heading (non-string name).
    """

    def __init__(self, heading, tuples):
        # Normalise first: a plain list/tuple never compares equal to a
        # tuple's (set-based) heading, and a one-shot iterable of tuples
        # must not be consumed by the validation loop.
        heading = Heading(heading)
        tuples = tuple(tuples)
        for t in tuples:
            if t.heading != heading:
                raise ValueError("invalid tuple: %s" % repr(t))
        self.__heading = heading
        self.__tuples = tuples

    @property
    def heading(self):
        """The Heading of this relation."""
        return self.__heading

    @property
    def tuples(self):
        """The body of this relation, as a tuple of Tuple objects."""
        return self.__tuples

    def _require_same_heading(self, other):
        """Raise TypeError unless ``other`` has the same heading as self."""
        if other.heading != self.heading:
            raise TypeError("incompatible headings")

    def _disambiguate(self, other):
        """Return renamed (self, other) whose headings share no attribute.

        Every shared attribute ``a`` becomes ``a + " 1"`` on the left and
        ``a + " 2"`` on the right.  Tuple order is preserved by rename().
        """
        x, y = self, other
        shared = x.heading.intersection(y.heading)
        while shared:
            for a in shared:
                x = x.rename(a, a + " 1")
                y = y.rename(a, a + " 2")
            # Recompute against the *renamed* operands: a freshly created
            # name may itself collide, and checking the original heading
            # instead would never resolve it.
            shared = x.heading.intersection(y.heading)
        return x, y

    def union(self, other):
        """Return the tuples of self followed by those of other not yet seen.

        Raises:
            TypeError: if the headings differ.
        """
        self._require_same_heading(other)
        new_tuples = list(self.tuples)
        for t in other.tuples:
            if t not in new_tuples:
                new_tuples.append(t)
        return Relation(self.heading, new_tuples)

    def intersection(self, other):
        """Return the tuples of self that also appear in other.

        Raises:
            TypeError: if the headings differ.
        """
        self._require_same_heading(other)
        return Relation(self.heading,
                        [t for t in self.tuples if t in other.tuples])

    def difference(self, other):
        """Return the tuples of self that do not appear in other.

        Raises:
            TypeError: if the headings differ.
        """
        self._require_same_heading(other)
        return Relation(self.heading,
                        [t for t in self.tuples if t not in other.tuples])

    def projection(self, new_heading):
        """Return this relation restricted to the attributes in new_heading.

        ``new_heading`` may be any iterable of attribute names; a single
        string is treated as one attribute name.  Duplicate result tuples
        are dropped.

        Raises:
            TypeError: if a requested attribute is not in this heading.
        """
        if isinstance(new_heading, str):
            new_heading = (new_heading,)
        names = tuple(new_heading)
        for att in names:
            if att not in self.heading:
                raise TypeError("invalid heading")
        new_tuples = []
        for t in self.tuples:
            new_t = Tuple({att: t[att] for att in names})
            if new_t not in new_tuples:
                new_tuples.append(new_t)
        return Relation(names, new_tuples)

    def restriction(self, condfn):
        """Return the tuples for which ``condfn(tuple)`` is truthy."""
        return Relation(self.heading, [t for t in self.tuples if condfn(t)])

    def rename(self, old_attr, new_attr):
        """Return a copy in which attribute old_attr is called new_attr.

        If old_attr is not in the heading the result has the same content.

        Raises:
            ValueError: if new_attr is already in the heading.
        """
        if new_attr in self.heading:
            raise ValueError("attribute name exists")

        def renamed(a):
            return new_attr if a == old_attr else a

        new_heading = Heading([renamed(a) for a in self.heading])
        new_tuples = [Tuple({renamed(a): t[a] for a in t})
                      for t in self.tuples]
        return Relation(new_heading, new_tuples)

    def cross_join(self, other):
        """Return every pairing of a tuple of self with a tuple of other.

        Attributes present in both headings are first renamed with the
        suffixes " 1" (self) and " 2" (other).

        Raises:
            ValueError: if a suffixed name already exists in an operand.
        """
        x, y = self._disambiguate(other)
        new_heading = Heading(x.heading.union(y.heading))
        new_tuples = []
        for t in x.tuples:
            for u in y.tuples:
                merged = dict(t)
                merged.update(u)
                new_tuples.append(Tuple(merged))
        return Relation(new_heading, new_tuples)

    def natural_join(self, other):
        """Join on equality of all attributes the two headings share.

        Duplicate result tuples are dropped.
        """
        new_heading = Heading(self.heading.union(other.heading))
        overlap = self.heading.intersection(other.heading)
        new_tuples = []
        for t in self.tuples:
            for u in other.tuples:
                if any(t[a] != u[a] for a in overlap):
                    continue
                merged = {a: t[a] for a in self.heading}
                merged.update((a, u[a]) for a in other.heading)
                new_t = Tuple(merged)
                if new_t not in new_tuples:
                    new_tuples.append(new_t)
        return Relation(new_heading, new_tuples)

    def join(self, other, wherefn):
        """Return the pairings for which ``wherefn(t, u)`` is truthy.

        ``wherefn`` receives the ORIGINAL tuples of self and other; the
        result is built from the renamed ones (shared attributes get the
        suffixes " 1" and " 2"), so the heading matches cross_join().

        Raises:
            ValueError: if a suffixed name already exists in an operand.
        """
        x, y = self._disambiguate(other)
        new_heading = Heading(x.heading.union(y.heading))
        new_tuples = []
        # rename() keeps tuple order, so zip pairs each original tuple
        # with its renamed counterpart.
        for t_orig, t_renamed in zip(self.tuples, x.tuples):
            for u_orig, u_renamed in zip(other.tuples, y.tuples):
                if wherefn(t_orig, u_orig):
                    merged = dict(t_renamed)
                    merged.update(u_renamed)
                    new_tuples.append(Tuple(merged))
        return Relation(new_heading, new_tuples)

    def __getstate__(self):
        return {"heading": self.heading, "tuples": self.tuples}

    def __setstate__(self, state):
        self.__heading = state["heading"]
        self.__tuples = state["tuples"]

    def __getattr__(self, attr):
        # Kept as an explicit hook: code that calls __getattr__ directly
        # gets an ordinary attribute lookup (AttributeError if missing).
        return self.__getattribute__(attr)

    def __len__(self):
        return len(self.tuples)

    def __repr__(self):
        return "Relation(%s, %s)" % (repr(self.heading), repr(self.tuples))

    def __str__(self):
        return "\n".join(str(t) for t in self.tuples)
class Relvar:
    """A named relation variable holding a current Relation value.

    The update operations replace the held relation with a new one.
    Attributes not defined here are looked up on the held relation, so the
    relational operators can be called on the relvar directly.

    Args:
        name: name of the variable.
        heading: heading passed on to Relation.
        tuples: initial tuples passed on to Relation.
    """

    def __init__(self, name, heading, tuples):
        self.__relation = Relation(heading, tuples)
        self.__name = name

    @property
    def heading(self):
        """Heading of the current relation."""
        return self.__relation.heading

    @property
    def name(self):
        """Name given to this relvar."""
        return self.__name

    @property
    def relation(self):
        """The current Relation value."""
        return self.__relation

    @property
    def tuples(self):
        """Tuples of the current relation."""
        return self.__relation.tuples

    def insert(self, t):
        """Add tuple ``t`` (no effect if an equal tuple is present).

        Raises:
            TypeError: if the tuple's heading differs from the relvar's.
        """
        if t.heading != self.heading:
            raise TypeError("incompatible headings")
        self.__relation = self.relation.union(Relation(self.heading, (t,)))

    def insert_many(self, tuples):
        """Add every tuple in ``tuples``; an empty iterable is a no-op.

        Raises:
            TypeError: if any tuple's heading differs from the relvar's.
        """
        # Materialise once: the input is validated and then handed on, and
        # the heading must come from the relvar, not from a loop variable
        # that does not exist when the input is empty.
        tuples = tuple(tuples)
        for t in tuples:
            if t.heading != self.heading:
                raise TypeError("incompatible headings")
        self.__relation = self.relation.union(Relation(self.heading, tuples))

    def remove(self, t):
        """Remove tuple ``t`` if present.

        Raises:
            TypeError: if the tuple's heading differs from the relvar's.
        """
        if t.heading != self.heading:
            raise TypeError("incompatible headings")
        self.__relation = self.relation.difference(
            Relation(self.heading, (t,)))

    def remove_where(self, wherefn):
        """Remove every tuple for which ``wherefn(tuple)`` is truthy."""
        self.__relation = self.relation.difference(
            self.relation.restriction(wherefn))

    def update(self, old_tuple, new_tuple):
        """Replace ``old_tuple`` with ``new_tuple``.

        Raises:
            ValueError: if ``old_tuple`` is not in the relation.
            TypeError: if ``new_tuple`` has a different heading.
        """
        self.update_many((old_tuple,), (new_tuple,))

    def update_many(self, old_tuples, new_tuples):
        """Remove all ``old_tuples`` and then add all ``new_tuples``.

        Raises:
            ValueError: if one of ``old_tuples`` is not in the relation.
            TypeError: if one of ``new_tuples`` has a different heading.
        """
        # Both inputs are walked twice (validation, then Relation), so
        # one-shot iterables are materialised up front.
        old_tuples = tuple(old_tuples)
        new_tuples = tuple(new_tuples)
        for old in old_tuples:
            if old not in self.tuples:
                raise ValueError("tuple does not exist in relation")
        for new in new_tuples:
            if new.heading != self.heading:
                raise TypeError("incompatible headings")
        self.__relation = self.relation.difference(
            Relation(self.heading, old_tuples)).union(
                Relation(self.heading, new_tuples))

    def update_where(self, wherefn):
        """Return a setter that updates the tuples matching ``wherefn``.

        The returned object has one method, ``set(attr, value)``, which
        assigns ``value`` to ``attr`` in every tuple for which
        ``wherefn(tuple)`` is truthy at the time of the call, and returns
        the setter so calls can be chained.  ``set`` raises ValueError
        ("invalid attribute") if ``attr`` is not in the heading.
        """
        relvar = self

        class Setter:
            def set(self2, attr, value):
                if attr not in relvar.heading:
                    raise ValueError("invalid attribute")
                old_tuples = relvar.relation.restriction(wherefn).tuples
                new_tuples = [
                    Tuple({a: (value if a == attr else t[a])
                           for a in t.heading})
                    for t in old_tuples
                ]
                relvar.update_many(old_tuples, new_tuples)
                return self2

        return Setter()

    @staticmethod
    def from_relation(name, rel):
        """Build a Relvar called ``name`` holding the content of ``rel``."""
        return Relvar(name, rel.heading, rel.tuples)

    @staticmethod
    def load(filename):
        """Unpickle and return the object stored in ``filename``.

        SECURITY: pickle can execute arbitrary code while loading; only
        use this on files from a trusted source.
        """
        import pickle
        with open(filename, "rb") as f:
            return pickle.load(f)

    def save(self, filename):
        """Pickle this relvar to ``filename``."""
        import pickle
        with open(filename, "wb") as f:
            pickle.dump(self, f)

    def __getattr__(self, attr):
        # Only reached when normal lookup fails.  Read the relation straight
        # from the instance dict: going through self.__relation on an
        # instance that has not been initialised yet would re-enter this
        # method without end.
        try:
            relation = self.__dict__["_Relvar__relation"]
        except KeyError:
            raise AttributeError(attr) from None
        return getattr(relation, attr)

    def __getstate__(self):
        return {"name": self.name, "relation": self.relation}

    def __setstate__(self, state):
        self.__name = state["name"]
        self.__relation = state["relation"]

    def __len__(self):
        return len(self.tuples)

    def __repr__(self):
        return "Relvar(%s, %s, %s)" % (repr(self.name), repr(self.heading),
                                       repr(self.tuples))

    def __str__(self):
        return self.name + ":\n" + "\n".join(str(t) for t in self.tuples)
# Sample relvars and constants for trying out the operators.

# Single-attribute relvar with two tuples.
A = Relvar("A", Heading("ID"), Tuples({"ID": 1}, {"ID": 2}))

# Relvar S: one tuple per SNO, with SNAME, STATUS and CITY.
S = Relvar(
    "S",
    Heading("SNO", "SNAME", "STATUS", "CITY"),
    Tuples(
        {"SNO": "S1", "SNAME": "Clark", "STATUS": 10, "CITY": "London"},
        {"SNO": "S2", "SNAME": "Duncan", "STATUS": 20, "CITY": "Athens"},
        {"SNO": "S3", "SNAME": "Michael", "STATUS": 30, "CITY": "London"},
    ),
)

# Relvar SP: (SNO, PNO) pairs with a QTY.
SP = Relvar(
    "SP",
    Heading("SNO", "PNO", "QTY"),
    Tuples(
        {"SNO": "S1", "PNO": "P1", "QTY": 20},
        {"SNO": "S1", "PNO": "P2", "QTY": 10},
        {"SNO": "S2", "PNO": "P1", "QTY": 5},
        {"SNO": "S3", "PNO": "P2", "QTY": 10},
    ),
)

# Empty heading with no tuples, and empty heading with one empty tuple.
TABLE_DUM = Relation(Heading(), ())
TABLE_DEE = Relation(Heading(), Tuples({}))
@badocelot
Copy link
Author

If TABLE_DUM and TABLE_DEE didn't give it away, this is heavily inspired by C.J. Date's work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment