Skip to content

Instantly share code, notes, and snippets.

@mcsquaredjr
Last active December 30, 2015 08:08
Show Gist options
  • Save mcsquaredjr/7800291 to your computer and use it in GitHub Desktop.
Save mcsquaredjr/7800291 to your computer and use it in GitHub Desktop.
Create issues in a GitHub repo from a source defined in a Google spreadsheet. The order of the columns in the spreadsheet is not important, but the columns that will be mapped into fields **must** be named accordingly (case is ignored); other columns will be ignored. You need to install gdata-python-client and github3.py before trying to run the sc…
__author__ = 'Serge Boyko'
__date__ = '12/04/13'
__email__ = 'serge.boyko@gmail.com'
# Read issues from a google spreadsheet and create them in a repository
# on GitHub
# TODO:
# * add validation step to ensure GitHub won't complain about non-existing
# fields
# * add two-way synchronization
import sys
import gdata.docs
import gdata.docs.client
import gdata.docs.data
import gdata.docs.service
import gdata.spreadsheet.service
import re
import datetime
import time
from github3 import login
############################################################
# CLASS GOOGLESPREADSHEET #
############################################################
class GoogleSpreadsheet(object):
    '''Wrap up google gdata methods to access a Google spreadsheet.

    An instance logs in once (see __init__) and keeps the logged-in
    gdata SpreadsheetsService client in self.gd_client; every other
    method goes through that client.
    '''

    # Trailing digits of an A1-style cell reference, i.e. its row number.
    _ROW_NUMBER = re.compile(r'\d+$')

    def __init__(self, email, passwd):
        '''Log in to the spreadsheets service with the given credentials.

        Parameters:
        email -- Google account email
        passwd -- Google account password
        '''
        self.gd_client = gdata.spreadsheet.service.SpreadsheetsService()
        self.gd_client.email = email
        self.gd_client.password = passwd
        self.gd_client.source = 'GoogleSpreadsheet wrapper class'
        self.gd_client.ProgrammaticLogin()

    def create_spreadsheet(self, spreadsheet_title):
        '''Create a new spreadsheet, given its title, return spreadsheet
        key if successful.
        '''
        # Creating a document needs the docs client, which logs in
        # separately with the credentials stored on the spreadsheet client.
        client = gdata.docs.client.DocsClient()
        client.http_client.debug = False
        client.client_login(self.gd_client.email,
                            self.gd_client.password,
                            'GoogleSpreadsheet wrapper class')
        doc = gdata.docs.data.Resource(type='spreadsheet',
                                       title=spreadsheet_title)
        document = client.create_resource(doc)
        # The key is the part of the resource id after the encoded colon.
        return document.GetId().split("%3A")[1]

    def get_spreadsheet_key(self, spreadsheet_title):
        '''Get spreadsheet key by exact spreadsheet title.

        The first entry of the feed is used, so indexing fails with
        IndexError when the feed has no entries for that title.
        '''
        doc_query = gdata.spreadsheet.service.DocumentQuery()
        doc_query['title'] = spreadsheet_title
        doc_query['title-exact'] = 'true'
        ss_feed = self.gd_client.GetSpreadsheetsFeed(query=doc_query)
        return ss_feed.entry[0].id.text.rsplit('/', 1)[1]

    def add_worksheet(self, spreadsheet_key, worksheet_title, row_count,
                      col_count):
        '''Add a new worksheet to the spreadsheet identified by its key
        and return the new worksheet's id.
        '''
        ws = self.gd_client.AddWorksheet(worksheet_title,
                                         row_count, col_count,
                                         spreadsheet_key)
        # The id is the last path component of the entry's id URL.
        return ws.id.text.rsplit('/', 1)[1]

    def get_worksheet_id(self, spreadsheet_key, worksheet_name):
        '''Get worksheet id by spreadsheet key and worksheet name.

        Returns None (after printing the missing name) when the
        spreadsheet has no worksheet with that name.
        '''
        # worksheet_dict already fetches the worksheets feed; fetching it
        # here as well would only cost an extra round trip.
        ids_by_name = self.worksheet_dict(spreadsheet_key)
        try:
            return ids_by_name[worksheet_name]
        except KeyError as e:
            print(e)
            return None

    def worksheet_dict(self, spreadsheet_key):
        '''Create dictionary containing worksheet ids with keys equal to
        their names if spreadsheet_key is given.
        '''
        ws_feed = self.gd_client.GetWorksheetsFeed(spreadsheet_key)
        d = dict()
        for entry in ws_feed.entry:
            d[entry.title.text] = entry.id.text.split('/')[-1]
        return d

    def _get_worksheet_url(self, spreadsheet_key, worksheet_id):
        '''Return the href of the last link of the worksheet entry whose
        id matches worksheet_id, or None when no entry matches.
        '''
        ws_feed = self.gd_client.GetWorksheetsFeed(spreadsheet_key)
        for entry in ws_feed.entry:
            if entry.id.text.split('/')[-1] == worksheet_id:
                return entry.link[-1].href
        return None

    def delete_worksheet(self, spreadsheet_key, worksheet_id):
        '''Delete the worksheet identified by spreadsheet key and
        worksheet id.
        '''
        # NOTE(review): url is None when the worksheet id is unknown; it
        # is passed through unchanged, as before.
        url = self._get_worksheet_url(spreadsheet_key, worksheet_id)
        self.gd_client.DeleteWorksheet(url=url)

    def _get_num_cols(self, range):
        '''Compute the number of ROWS spanned by a range such as 'A2:R23'.

        The name is historical: the value is the difference between the
        trailing row numbers of the two corners, plus one. The parameter
        keeps its original name (it shadows the builtin inside this
        method only).

        Raises ValueError when the range is not two cell references
        separated by ':' or when a reference has no row number.
        '''
        # It's ugly, but I did not find a method in the API that does it
        first, second = range.split(':')
        row_numbers = []
        for ref in (first, second):
            match = self._ROW_NUMBER.search(ref)
            if match is None:
                raise ValueError(
                    'no row number in cell reference {0!r}'.format(ref))
            row_numbers.append(int(match.group()))
        return row_numbers[1] - row_numbers[0] + 1

    def get_range(self, spreadsheet_key, worksheet_id, rng):
        '''Return cell range as a list of tuples, so that each tuple
        represents a row of data in the spreadsheet. The range should be
        given as two corner cells, e.g. 'A2:R23'. The feed is queried
        with return-empty so that the result is a rectangular array;
        empty cells are expected to come back as None.

        Parameters:
        spreadsheet_key -- spreadsheet key
        worksheet_id -- worksheet id
        rng -- range of cells, e.g. 'A2:R23' (required)

        Raises ValueError when rng is None or cannot be parsed: without
        an explicit range the shape of the result cannot be determined.
        '''
        if rng is None:
            raise ValueError(
                'get_range needs an explicit range such as "A2:R23"')
        # Parse the range before going to the network so that a malformed
        # range fails fast.
        num_rows = self._get_num_cols(rng)
        cell_query = gdata.spreadsheet.service.CellQuery()
        cell_query['range'] = rng
        cell_query['return-empty'] = 'true'  # oh, my!
        cell_feed = self.gd_client.GetCellsFeed(spreadsheet_key,
                                                worksheet_id,
                                                query=cell_query)
        values = [entry.content.text for entry in cell_feed.entry]
        # Cells per row; floor division keeps this an int everywhere.
        row_length = len(values) // num_rows
        # Chunk the flat list into consecutive tuples of row_length items.
        return list(zip(*[iter(values)] * row_length))

    def update_cells(self, spreadsheet_key, worksheet_id, data, row=1, col=1):
        '''Update data in a worksheet specified by key and sheet id,
        starting from position specified by row and col (default 1, 1).

        Parameters:
        data -- is a list of lists, so that each element represents a row,
        data must be a rectangular array (all rows are of the same length).
        Empty data is a no-op.

        Warning: this method is painfully slow (one request per cell),
        use insert_rows whenever possible.
        '''
        if not data:
            return
        num_cols = len(data[0])
        for i, row_values in enumerate(data):
            for j in range(num_cols):
                cell = self.gd_client.UpdateCell(i + row,
                                                 j + col,
                                                 str(row_values[j]),
                                                 spreadsheet_key,
                                                 worksheet_id)
                if not isinstance(cell, gdata.spreadsheet.SpreadsheetsCell):
                    print('Error updating cell R{0}C{1}'.format(i + row,
                                                                j + col))

    def __insert_rows(self, spreadsheet_key, worksheet_id, data):
        '''Insert rows in an empty worksheet.

        data[0] is written as the header row; every following row is
        inserted keyed by the lower-cased header names. Empty data is a
        no-op.
        '''
        if not data:
            return
        header = data[0]
        self.update_cells(spreadsheet_key, worksheet_id, [header])
        # Now we may use InsertRow
        keys = [str(name).lower() for name in header]
        for i in range(1, len(data)):
            row_dict = dict()
            for j, key in enumerate(keys):
                row_dict[key] = str(data[i][j])
            entry = self.gd_client.InsertRow(row_dict, spreadsheet_key,
                                             worksheet_id)
            if not isinstance(entry, gdata.spreadsheet.SpreadsheetsList):
                print('Error inserting row #{0}'.format(i))

    def insert_rows(self, spreadsheet_key, worksheet_id, data):
        '''Replace the worksheet's rows with data. Data should be a list
        of lists, representing a rectangular array, i.e. each list has the
        same number of elements; the first list is the header row.

        Rows currently present in the list feed are deleted first.
        '''
        list_feed = self.gd_client.GetListFeed(spreadsheet_key, worksheet_id)
        for existing in list_feed.entry:
            self.gd_client.DeleteRow(existing)
        self.__insert_rows(spreadsheet_key, worksheet_id, data)

    def get_cols(self, spreadsheet_key, worksheet_id):
        '''Read the entire worksheet and return a dictionary that contains
        column names as keys and lists of column elements as values.
        '''
        rows = self.gd_client.GetListFeed(spreadsheet_key, worksheet_id)
        d = dict()
        for row in rows.entry:
            for key in row.custom:
                d.setdefault(key, []).append(row.custom[key].text)
        return d

    def get_list_query(self, query_str):
        '''Return a new, empty instance of ListQuery. You may set query
        parameters by setting up query keys on the returned object.

        NOTE(review): query_str is currently not applied to the query.

        Query keys:
        sq: for general query such as 'name=john&last=smith'
        orderby: for example: 'column:first'
        reverse: values are 'true' and 'false', note these are strings.
        '''
        list_query = gdata.spreadsheet.service.ListQuery()
        return list_query
############################################################
# CLASS ISSUES MAKER #
############################################################
class Issues_Maker(object):
    '''
    Represent a GitHub session and the methods needed to create
    issues in a GitHub repo.
    '''

    def __init__(self, user, password):
        '''Log in to GitHub; the session object is kept in self.login.'''
        self.login = login(user, password)

    def create_issues(self, repo_name, issues_dict):
        '''
        For every title in issues_dict['title'] create an issue in the
        GitHub repository identified by its name, owned by the logged-in
        user. Entries whose title is None are skipped.

        Parameters:
        repo_name -- name of the repository
        issues_dict -- dictionary of columns; 'title' is required. The
        keys 'body', 'assignee', 'milestone' and 'labels' are filled in
        with lists of empty strings when missing.

        Exits the process with status -3 when there is no 'title' column.
        '''
        try:
            titles = issues_dict['title']
        except KeyError:
            print('*** ERROR: no titles in issues.')
            sys.exit(-3)
        count = len(titles)
        for field in ('body', 'assignee', 'milestone', 'labels'):
            issues_dict.setdefault(field, [''] * count)
        # Only the title is sent for now: body, assignee, milestone and
        # labels have to be checked against the repo first (see the TODO
        # at the top of the file).
        owner = None
        for title in titles:
            if title is None:
                print('Skipping issue with empty title.')
                continue
            if owner is None:
                # Look up the authenticated user once, on first use.
                owner = self.login.user().login
            self.login.create_issue(owner, repo_name, title)
            print('Creating issue: {0}'.format(title))
############################################################
# CLASS ISSUES READER #
############################################################
class Issues_Reader(object):
    '''
    Read issues from a google spreadsheet specified by its name and
    worksheet name.
    '''

    def __init__(self, email, pswd, sheet_name, wks_name):
        '''
        Log in to Google and remember which spreadsheet and worksheet
        to read.

        Parameters:
        email -- Google account email
        pswd -- Google account password
        sheet_name -- spreadsheet title
        wks_name -- worksheet name
        '''
        self.sheet = GoogleSpreadsheet(email, pswd)
        self.sheet_name = sheet_name
        self.wks_name = wks_name

    def get_issues(self):
        '''Make a dictionary of issue columns suitable for passing to an
        instance of Issues_Maker.

        Only the columns named title, body, milestone, assignee, label
        and status are kept (case ignored); the returned keys are
        lower-cased so that they can be looked up by those names.

        Exits the process with status -1 when the spreadsheet is not
        found and -2 when the worksheet is not found.
        '''
        try:
            spr_key = self.sheet.get_spreadsheet_key(self.sheet_name)
        except IndexError:
            print('*** ERROR: Cannot find spreadsheet. Migration terminated.')
            sys.exit(-1)
        # Get worksheet id and quit if it was not found
        wks_id = self.sheet.get_worksheet_id(spr_key, self.wks_name)
        if wks_id is None:
            print('*** ERROR: Worksheet does not exist. Migration terminated.')
            sys.exit(-2)
        # Read all the data, then keep only the columns needed for issues
        columns = self.sheet.get_cols(spr_key, wks_id)
        issue_keys = ('title', 'body', 'milestone', 'assignee', 'label',
                      'status')
        issue_dict = dict()
        for name, values in columns.items():
            normalized = name.lower()
            if normalized in issue_keys:
                issue_dict[normalized] = values
        return issue_dict
if __name__ == '__main__':
    # Interactive entry point: ask for credentials and names, read the
    # issues from the worksheet, then create them on GitHub.
    # getpass is imported here because only this entry point needs it;
    # it reads the passwords without echoing them to the terminal.
    import getpass

    gh_login = raw_input('Enter your Github login:')
    gh_password = getpass.getpass('Enter your Github password: ')
    goog_login = raw_input('Enter your Google email: ')
    goog_password = getpass.getpass('Enter your Google password: ')
    repo_name = raw_input('Enter repository name: ')
    sheet_name = raw_input('Enter spreadsheet name: ')
    wks_name = raw_input('Enter worksheet name: ')
    print('=== OK. Starting migration.')
    print('Reaching Google...')
    reader = Issues_Reader(goog_login, goog_password, sheet_name, wks_name)
    print('Reading issues...')
    issues_dict = reader.get_issues()
    print('Reaching GitHub...')
    maker = Issues_Maker(gh_login, gh_password)
    maker.create_issues(repo_name, issues_dict)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment