Created
November 3, 2011 20:17
-
-
Save matteobertozzi/1337662 to your computer and use it in GitHub Desktop.
Sql Join
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# http://en.wikipedia.org/wiki/Join_(SQL)
def crossJoin(table_a, table_b):
    """
    Build the Cartesian product of two tables.

    Every row of table_a is paired with every row of table_b and the pair is
    stored in a new JoinTable, which is returned. No predicate is applied, so
    the result holds count(A) * count(B) rows.
    """
    product = JoinTable(table_a, table_b)
    # table_b drives the outer loop, so the output is grouped by right-hand row.
    for right in table_b:
        for left in table_a:
            product.insert(left, right)
    return product
def innerJoin(table_a, table_b, predicate_func):
    """
    Join two tables, keeping only the row pairs accepted by the predicate.

    Each row of A is compared with each row of B by calling
    predicate_func(a_row, b_row), where both rows are dicts keyed by column
    name. Pairs for which the call returns a true value are combined into one
    row of the returned JoinTable; all other pairs are dropped.
    """
    joined = JoinTable(table_a, table_b)
    for left in table_a:
        for right in table_b:
            if not predicate_func(left, right):
                continue
            joined.insert(left, right)
    return joined
def equiJoin(table_a, table_b, key_a, key_b):
    """
    Inner join on equality of one column from each table.

    A pair of rows is kept when the value of column key_a in the A row equals
    (Python ==) the value of column key_b in the B row. The work is delegated
    to innerJoin.
    """
    def keys_equal(row_a, row_b):
        return row_a[key_a] == row_b[key_b]

    return innerJoin(table_a, table_b, keys_equal)
def naturalJoin(table_a, table_b):
    """
    Inner join on every column name the two tables have in common.

    The join predicate is implicit: a pair of rows is kept when the values of
    all shared columns are equal. When the tables share no column name the
    predicate is always true, so every pair is kept.
    """
    shared = set(table_a.columns).intersection(table_b.columns)

    def shared_columns_match(row_a, row_b):
        return all(row_a[name] == row_b[name] for name in shared)

    return innerJoin(table_a, table_b, shared_columns_match)
def leftOuterJoin(table_a, table_b, predicate_func):
    """
    Join two tables while keeping every row of the "left" table (A).

    For each row of A, every row of B accepted by
    predicate_func(a_row, b_row) produces one combined row in the result.
    A row of A with no accepted partner in B still appears exactly once,
    with None (the stand-in for SQL NULL) in every column that comes from B.
    A row of B that matches several rows of A is repeated once per match.
    """
    joined = JoinTable(table_a, table_b)
    # Stand-in right-hand row used when a left row has no partner in B.
    b_nulls = dict((column, None) for column in table_b.columns)
    for left in table_a:
        matches = [right for right in table_b if predicate_func(left, right)]
        # An empty match list falls back to the single all-None row.
        for right in matches or [b_nulls]:
            joined.insert(left, right)
    return joined
def rightOuterJoin(table_a, table_b, predicate_func):
    """
    A right outer join (or right join) closely resembles a left outer join,
    except with the treatment of the tables reversed.
    Every row from the "right" table (B) will appear in the joined table
    at least once. If no matching row from the "left" table (A) exists, None
    (standing in for NULL) appears in the columns from A for that row of B.

    predicate_func is always called as predicate_func(a_row, b_row), exactly
    as in the other join functions.

    The join is computed as leftOuterJoin(table_b, table_a, ...), so in the
    returned JoinTable the columns of B come first, followed by those of A.
    """
    # leftOuterJoin hands its predicate (left_row, right_row). With the tables
    # swapped that is (b_row, a_row), so flip the arguments back; otherwise a
    # predicate such as a['dept'] == b['id'] would look up the wrong columns.
    def swapped_predicate(b_row, a_row):
        return predicate_func(a_row, b_row)

    return leftOuterJoin(table_b, table_a, swapped_predicate)
def groupBy(table, key, predicate_func=None):
    """
    Return a new table with the rows of `table` clustered by column `key`.

    Rows are first filtered with predicate_func(row) when one is given (rows
    for which it returns a false value are dropped). The surviving rows are
    then written to a fresh Table, carrying the same name and columns, one
    group after another in ascending order of the group key. A group whose
    key is None is placed first. Within a group the original row order is
    kept.
    """
    groups = {}
    for row in table:
        if predicate_func is None or predicate_func(row):
            groups.setdefault(row[key], []).append(row)
    tgroups = Table(table.name, table.columns)
    # Sort on the keys alone (no need to compare the row lists) and order
    # None explicitly so the sort also works where None is not comparable
    # with other values (Python 3). Avoids the Python-2-only iteritems().
    for group_key in sorted(groups, key=lambda k: (k is not None, k)):
        tgroups.bulkInsert(groups[group_key])
    return tgroups
class Table(object):
    """
    Minimal in-memory table: a name, an ordered list of column names and a
    list of rows. Iterating over a table yields each row as a dict mapping
    column name to value.
    """

    def __init__(self, name, columns):
        self.name = name
        self.columns = list(columns)
        self.rows = []

    def count(self):
        """Return the number of rows stored in the table."""
        return len(self.rows)

    def insert(self, values):
        """
        Append one row.

        `values` is either a sequence given in column order or a dict keyed
        by column name; it must have exactly one entry per column.
        """
        assert len(values) == len(self.columns)
        row = values
        if isinstance(row, dict):
            # Re-order the dict's values to follow the table's column order.
            row = [row[column] for column in self.columns]
        self.rows.append(row)

    def bulkInsert(self, lvalues):
        """Insert every entry of the iterable `lvalues` as a row."""
        for entry in lvalues:
            self.insert(entry)

    def __iter__(self):
        for stored in self.rows:
            yield dict(zip(self.columns, stored))
class JoinTable(Table):
    """
    Result table of a join between two tables.

    Its name is '<a.name>+<b.name>' and its columns are the columns of
    table_a followed by the columns of table_b, each prefixed with the name
    of the table it comes from ('<table>.<column>').
    """

    def __init__(self, table_a, table_b):
        self.table_a = table_a
        self.table_b = table_b
        name = '%s+%s' % (table_a.name, table_b.name)
        columns_a = ['%s.%s' % (table_a.name, c) for c in table_a.columns]
        columns_b = ['%s.%s' % (table_b.name, c) for c in table_b.columns]
        super(JoinTable, self).__init__(name, columns_a + columns_b)

    def insert(self, values_a, values_b=None):
        """
        Append one joined row.

        Two-argument form: values_a and values_b are dicts keyed by the
        (unprefixed) column names of table_a and table_b; their values are
        concatenated in column order.

        One-argument form (values_b omitted): values_a is an already-joined
        row, in the same formats Table.insert accepts. This keeps the
        inherited bulkInsert(), which calls insert() with a single argument,
        usable on a JoinTable.
        """
        if values_b is None:
            return super(JoinTable, self).insert(values_a)
        assert isinstance(values_a, dict)
        assert isinstance(values_b, dict)
        row = [values_a[k] for k in self.table_a.columns]
        row += [values_b[k] for k in self.table_b.columns]
        # Table.insert verifies that the row length matches the column count.
        return super(JoinTable, self).insert(row)
if __name__ == '__main__':
    # Demo: build two small tables and print the result of each operation.
    employee = Table('employee', ('department_id', 'last_name', 'country'))
    employee.insert((31, 'Rafferty', 'Australia'))
    employee.insert((33, 'Jones', 'Australia'))
    employee.insert((33, 'Steinberg', 'Australia'))
    employee.insert((34, 'Robinson', 'United States'))
    employee.insert((34, 'Smith', 'Germany'))
    employee.insert((None, 'John', 'Germany'))

    department = Table('department', ('department_id', 'department_name'))
    department.insert((31, 'Sales'))
    department.insert((33, 'Engineering'))
    department.insert((34, 'Clerical'))
    department.insert((35, 'Marketing'))

    # Join condition shared by the outer and inner join demos below.
    def same_department(a, b):
        return a['department_id'] == b['department_id']

    # print(...) with a single argument prints the same text whether print is
    # a statement or a function, so the demo is not tied to one of the two.
    print('LEFT')
    for row in leftOuterJoin(employee, department, same_department):
        print(row)

    print('RIGHT')
    for row in rightOuterJoin(employee, department, same_department):
        print(row)

    print('Group By')
    for row in groupBy(employee, 'country'):
        print(row)

    print('Inner Join')
    for row in innerJoin(employee, department, same_department):
        print(row)

    print('Cross Join')
    for row in crossJoin(employee, department):
        print(row)

    print('Natural Join')
    for row in naturalJoin(employee, department):
        print(row)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment