Created
June 29, 2012 07:28
-
-
Save pniederlag/3016471 to your computer and use it in GitHub Desktop.
automate openerp setup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
''' | |
Created on 29.03.2012 | |
@author: pn | |
''' | |
import sys | |
import traceback | |
import argparse | |
import openerplib | |
from xmlrpclib import Fault | |
import csv | |
import re | |
def get_connector(): | |
# general service connector | |
conn=openerplib.get_connector('localhost', 'xmlrpc', 8089) | |
return conn | |
def get_connection(login='admin', password='admin'): | |
# authorized db connection | |
connection = openerplib.get_connection(hostname='localhost', protocol='xmlrpc', port=8089, database=db_name, login=login, password=password) | |
return connection | |
def create_database(): | |
#handle db creation | |
db_service=get_connector().get_service('db') | |
#print '{0}'.format(db_service.list()) | |
if not db_service.db_exist(db_name): | |
# none threading as we rely on the db further on | |
print 'will create database "{0}" now'.format(db_name) | |
db_service.create_database('admin', db_name, False, 'de_DE', 'admin') | |
else: | |
print 'use existing database "{0}"'.format(db_name) | |
def install_update_module(name, update=False): | |
# install/update/upgrade module | |
module_osv=connection.get_model('ir.module.module') | |
module_ids=module_osv.search([('name', '=', name)]) | |
module=module_osv.read(module_ids[0]) | |
#print '{0}'.format(module['state']) | |
if not module['state'] == 'installed': | |
print 'installing "{0}"'.format(name) | |
module_osv.button_immediate_install([module_ids[0]]) | |
elif update: | |
print 'upgrading "{0}"'.format(name) | |
module_osv.button_upgrade([module_ids[0]]) | |
upgrade_id=connection.get_model('base.module.upgrade').create({'module_info': module_ids[0]}) | |
connection.get_model('base.module.upgrade').upgrade_module(upgrade_id) | |
else: | |
print 'skipping "{0}"'.format(name) | |
def hotfix_irrule(): | |
print 'apply hotfix ir_rule' | |
model, model_id = connection.get_model('ir.model.data').get_object_reference('base', 'model_ir_values') | |
rule_osv=connection.get_model('ir.rule') | |
rule_ids=rule_osv.search([('model_id', '=', model_id)]) | |
rule=rule_osv.read(rule_ids[0]) | |
#rule['domain_force'] = "[('key','=','default'),'|',('user_id','=',user.id),('name','in',['taxes_id','supplier_taxes_id'])]" | |
rule_osv.write(rule_ids[0],{'domain_force' : "[('key','=','default'),'|',('user_id','=',user.id),('name','in',['taxes_id','supplier_taxes_id'])]"}) | |
def install_languages(): | |
print 'install/update languages/translations' | |
languages = ['el_GR'] | |
for lang in languages: | |
lid = connection.get_model('base.language.install').create({'lang' : lang}) | |
#lang_obj = connection.get_model('base.language.install').read(lid) | |
connection.get_model('base.language.install').lang_install([lid]) | |
pass | |
def create_periods(xml_ref, charts, user, pw, period='3months'): | |
# Looking for the companies | |
company_connection = get_connection(user, pw) | |
model, company_id = company_connection.get_model('ir.model.data').get_object_reference(xml_ref[0], xml_ref[1]) | |
#chart_module_ids=module_osv.search([('name', '=', 'l10n_de')]) | |
account_installer_data={ | |
'company_id': company_id, | |
'charts' : charts, | |
'period' : '3months' | |
} | |
account_installer_osv=company_connection.get_model('account.installer') | |
try: | |
#account_installer_osv.check_unconfigured_cmp() | |
if not account_installer_osv.get_unconfigured_cmp(): | |
print 'no periods to add' | |
return | |
except Fault as ex: | |
print ex.faultCode | |
return | |
try: | |
account_installer_ids=account_installer_osv.search([('company_id','=',account_installer_data['company_id'])]) | |
if account_installer_ids: | |
account_installer_id=account_installer_ids[0] | |
else: | |
account_installer_id=account_installer_osv.create(account_installer_data) | |
account_installer=account_installer_osv.read([account_installer_id]) | |
account_installer_osv.execute_simple([account_installer_id]) | |
except Fault as ex: | |
if ex.faultCode == 'cannot marshal None unless allow_none is enabled': | |
print 'catch and ignore "{0}"'.format(ex.faultCode) | |
else: | |
raise Exception(ex.faultCode) | |
def create_account_charts(xml_ref, user, pw): | |
company_connection = get_connection(user, pw) | |
account_installer_osv=company_connection.get_model('account.installer') | |
if not account_installer_osv.get_unconfigured_cmp(): | |
print 'no companies left to configure' | |
return | |
account_charts_data={ | |
'company_id': 'dummy', | |
'chart_template_id': 'dummy', | |
'bank_accounts_id': [[0,0, {'currency_id':False, 'acc_name': 'Barkasse', 'account_type': 'cash'}],[0,0, {'currency_id':False, 'acc_name': 'Bank', 'account_type': 'bank'}]], | |
'code_digits': 6, | |
'seq_journal': True, | |
'sale_tax': 'dummy', | |
'purchase_tax': 'dummy', | |
'sale_tax_rate': 19.00, | |
'purchase_tax_rate': 19.00, | |
'complete_tax_set': True | |
} | |
model, company_id = company_connection.get_model('ir.model.data').get_object_reference(xml_ref[0], xml_ref[1]) | |
if xml_ref[0] + '.' + xml_ref[1] == 'profile_sonnett.german_company': | |
model, tmpl_sale_tax_id = company_connection.get_model('ir.model.data').get_object_reference('l10n_de','tax_code_USTVA_skr03') | |
model, tmpl_purchase_tax_id = company_connection.get_model('ir.model.data').get_object_reference('l10n_de','tax_code_VST_skr03') | |
model, tmpl_account_id = company_connection.get_model('ir.model.data').get_object_reference('l10n_de','root_de') | |
elif xml_ref[0] + '.' + xml_ref[1] == 'profile_sonnett.greek_company': | |
# @todo FIXME add proper accounts for greek | |
model, tmpl_sale_tax_id = company_connection.get_model('ir.model.data').get_object_reference('l10n_gr','ivat23') # @todo check | |
model, tmpl_purchase_tax_id = company_connection.get_model('ir.model.data').get_object_reference('l10n_gr','pvat23') # @todo check | |
model, tmpl_account_id = company_connection.get_model('ir.model.data').get_object_reference('l10n_gr','l10n_gr_chart_template') # @todo check | |
print tmpl_sale_tax_id# | |
print tmpl_purchase_tax_id | |
print tmpl_account_id | |
else: | |
print 'tmpl_account_id could not be found, abort' | |
return | |
account_charts_data['company_id'] = company_id | |
account_charts_data['chart_template_id'] = tmpl_account_id | |
account_charts_data['sale_tax'] = tmpl_sale_tax_id | |
account_charts_data['purchase_tax'] = tmpl_purchase_tax_id | |
#model should be 'account.tax.code.template' | |
#print tmpl_sale_tax_id | |
#print tmpl_purchase_tax_id | |
#print tmpl_account_id | |
#print account_charts_data | |
account_chart_installer_osv=company_connection.get_model('wizard.multi.charts.accounts') | |
account_charts_installer_ids=account_chart_installer_osv.search([('company_id','=',account_charts_data['company_id'])]) | |
if account_charts_installer_ids: | |
account_chart_installer_id=account_charts_installer_ids[0] | |
else: | |
account_chart_installer_id=account_chart_installer_osv.create(account_charts_data) | |
account_chart_installer_osv.execute([account_chart_installer_id]) | |
#account_charts=connection.get_model('wizard.multi.charts.accounts').create(account_charts_data) | |
# el_GR | |
# de_DE | |
def import_product_translations(lang_key): | |
productReader = csv.DictReader(open('/path/to/someproduct-translation.csv', 'rb'), delimiter=',', quotechar='"') | |
# for some reasons we have/use some weird! whitespace characters (char(20) and char(30)?) that require the unicode flag for matching | |
#p = re.compile('([0-9]*)(?:\s)([0-9]*)',re.UNICODE) | |
for row in productReader: | |
#product_ref = ''.join(p.search(row['id']).groups(0)) | |
product_ref = row['id'] | |
#print product_ref | |
product_template_ref = 'product_product_' + product_ref + '_product_template' | |
#print product_template_ref | |
try: | |
model, product_template_id = connection.get_model('ir.model.data').get_object_reference('profile_xyz', product_template_ref) | |
except Fault as ex: | |
print ex.faultCode | |
continue | |
product_template = connection.get_model('product.template').read([product_template_id]) | |
for i, field_name in enumerate(['name','description_sale']): | |
#print ', '.join(row) | |
#print field_name | |
ir_translation = { | |
'lang': lang_key, | |
'src': product_template[0][field_name], | |
'value': row[field_name], | |
'name': 'product.template,' + field_name, | |
'res_id': product_template_id, | |
'type': 'model' | |
} | |
#print ir_translation | |
translation_id = connection.get_model('ir.translation').create(ir_translation) | |
pass | |
pass | |
def import_address(): | |
print "start address reader" | |
addressReader = csv.DictReader(open('/path/to/some/address.csv', 'rb'), delimiter=',', quotechar='"') | |
field_map = [ | |
('Tab_Adressen.ID', 'id') | |
] | |
import_data = [] | |
import_fields = ['name','comment'] | |
print field_map | |
for row in addressReader: | |
address_id = row['Tab_Adressen.ID'] | |
address_ref = 'address_' + address_id | |
#print address_ref | |
import_data.append(['xyz','nuden']) | |
#print import_data | |
print "start import" | |
import_result = connection.get_model('res.partner').import_data(import_fields, import_data, current_module='profile_xyz', filename='/tmp/foo.csv') | |
print import_result | |
pass | |
if __name__ == "__main__": | |
''' | |
you need to have a module by the name 'profile_xyz' in place | |
this module should contain some import data (.xml) with basic data | |
for our project that is two companies and a couple of users as well as product data | |
keep in mind this is not a generic solution but a work in progress for a project we work on | |
you will need to uncomment any of the lines in the try block below to really make some things happening apart from a database being created. ;) | |
''' | |
parser = argparse.ArgumentParser(description='helper to initialize OpenERP projects') | |
parser.add_argument('database_name',help='name of the database to create') | |
args = parser.parse_args() | |
db_name=args.database_name | |
create_database() | |
connection=get_connection() | |
try: | |
#install_update_module('base') | |
#install_update_module('profile_xyz') | |
#install_update_module('profile_xyz', True) #todo perm on Personal/Mitarbeiter Gruppe | |
#hotfix_irrule() | |
#install_languages() | |
# periods and accounts germany | |
#create_periods(['profile_xyz','german_company'], 'l10n_de', 'admin_de', 'admin_de') | |
#create_account_charts(['profile_xyz','german_company'], 'admin_de', 'admin_de') | |
# periods and accounts greek | |
#create_periods(['profile_xyz','greek_company'], 'l10n_gr', 'admin_gr', 'admin_gr') | |
#create_account_charts(['profile_xyz','greek_company'], 'admin_gr', 'admin_gr') | |
#import_product_translations('el_GR') | |
#import_address() | |
pass | |
except Fault as ex: | |
exc_type, exc_value, exc_traceback = sys.exc_info() | |
print u'Fehler: "{0}"'.format(ex.faultCode) | |
traceback.print_tb(exc_traceback, limit=1, file=sys.stdout) | |
except Exception as ex: | |
exc_type, exc_value, exc_traceback = sys.exc_info() | |
print u'Fehler: "{0}"'.format(ex) | |
traceback.print_tb(exc_traceback, limit=1, file=sys.stdout) | |
finally: | |
pass | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment