@dskarataev
Created January 12, 2015 14:37
files management
# -*- coding: utf-8 -*-
import logging
import os
import re

from openerp import SUPERUSER_ID
from openerp.osv import osv, fields
from openerp.tools.translate import _

_logger = logging.getLogger(__name__)


def utf8(value):
    """Return ``value`` as a UTF-8 encoded byte string."""
    if isinstance(value, unicode):
        return value.encode('utf-8')
    if isinstance(value, str):
        return value
    return str(value)


def norm_filename(filename):
    """Strip characters that are not allowed in a plain file name."""
    if not filename:
        filename = ''
    return filename.replace('/', '').replace('\\', '').replace('*', '')


def norm_path(path):
    """This function converts many different kinds of path to the format path/to/folder/file.ext,
    without slashes at the beginning and end of the path. Slashes can be added manually.
    """
    if not path:
        path = ''
    path = path.replace('\\', '/').replace('*', '')
    old_path = ''
    while old_path != path:
        old_path = path
        path = path.replace('../', '/').replace('./', '/')
    path_parts = [x for x in path.split('/') if x]
    if path_parts:
        path = os.path.join(*path_parts)
    else:
        path = ''
    return path
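
# A minimal illustration of what norm_path() produces. The sample values below are
# assumptions made for documentation only; they do not come from the original gist:
#
#   norm_path('/foo\\bar/../baz/')   -> 'foo/bar/baz'      (backslashes unified, '../' removed)
#   norm_path('docs/./2015/report*') -> 'docs/2015/report'
#   norm_path(False)                 -> ''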


def create_path(*args):
    """Concatenate several parts into one absolute path.
    If a parameter is a tuple, its second element says whether norm_path() should be
    applied to that element or not.
    """
    norm_part_list = []
    for part in args:
        if isinstance(part, tuple) and len(part) > 1 and part[1] is False:
            # if it is (x, False) we don't apply the norm_path() function to this part of the path
            norm_part_list.append(part[0])
        elif part:
            norm_part_list.append(norm_path(part))
    return os.path.join('/', *norm_part_list)
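
# A quick sketch of create_path() behaviour (illustrative values only, not from the gist):
#
#   create_path('/srv/filestorage/', 'Invoices/2015')        -> '/srv/filestorage/Invoices/2015'
#   create_path('/srv/filestorage', ('^INV_.*\\.pdf$', False)) -> '/srv/filestorage/^INV_.*\\.pdf$'
#
# The tuple form skips norm_path(), so a regex pattern used for automatic rules is kept untouched.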


class IrConfigParameter(osv.osv):
    _inherit = "ir.config_parameter"

    def get_storage_path(self, cr, uid):
        storage_path = self.get_param(cr, uid, 'files.filestorage', default=False)
        if not storage_path:
            raise osv.except_osv(_("Warning"),
                                 _("You must define the 'files.filestorage' parameter in the system configuration!"))
        # here we normalize and recreate the full path because the user may have filled it in in a wrong format
        storage_path = create_path(storage_path)
        if not os.path.exists(storage_path) or not os.path.isdir(storage_path):
            raise osv.except_osv(_("Warning"),
                                 _("The storage path defined in the system configuration does not exist!"))
        return storage_path

    def get_log_dir(self, cr, uid):
        log_dir = self.get_param(cr, uid, 'files.logdir', default=False)
        if not log_dir:
            raise osv.except_osv(_("Warning"),
                                 _("You must define the 'files.logdir' parameter in the system configuration!"))
        log_dir = create_path(log_dir)
        if not os.path.exists(log_dir) or not os.path.isdir(log_dir):
            raise osv.except_osv(_("Warning"),
                                 _("The log directory path defined in the system configuration does not exist!"))
        return log_dir


class FileType(osv.osv):
    _name = "file.type"
    _description = "File Type"

    @staticmethod
    def norm_folder_if_exists(vals):
        if 'folder' in vals:
            vals['folder'] = norm_path(vals['folder'])
        return vals

    def create(self, cr, uid, vals, context=None):
        vals = self.norm_folder_if_exists(vals)
        return super(FileType, self).create(cr, uid, vals, context=context)

    def write(self, cr, uid, ids, vals, context=None):
        vals = self.norm_folder_if_exists(vals)
        return super(FileType, self).write(cr, uid, ids, vals, context=context)

    def _check_not_root_folder(self, cr, uid, ids, context=None):
        for file_type in self.browse(cr, uid, ids, context=context):
            if not file_type.folder:
                return False
        return True

    _columns = {
        'name': fields.char("Name", required=True),
        'folder': fields.char("Folder", required=True),
    }

    _constraints = [
        (_check_not_root_folder, "You can't use the root folder as the Type folder. Please use one of its subfolders.",
         ['folder']),
    ]

    _sql_constraints = [
        ('name_uniq', 'unique(name)', 'File type name must be unique!'),
        ('folder_uniq', 'unique(folder)', 'Folder path must be unique!'),
    ]
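
# Illustrative note (the example value below is an assumption, not from the original gist):
# because create() and write() run the folder through norm_path(), a file.type created with
# folder='/Contracts/2015/' is stored as 'Contracts/2015', while folder='/' normalizes to an
# empty string and is rejected by the _check_not_root_folder constraint above.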


class FileFieldSetting(osv.osv):
    _name = "file.field.setting"
    _description = "Fields Settings"

    _columns = {
        'model_id': fields.many2one('ir.model', string="Model", required=True, ondelete='restrict'),
        'field_id': fields.many2one('ir.model.fields', string="Field", required=True, ondelete='restrict'),
    }

    def get_model_name_field_name(self, cr, uid, model_id, context=None):
        # 1. try to find a user setting for this model
        setting_ids = self.search(cr, uid, [('model_id', '=', model_id)], context=context)
        if setting_ids:
            assert len(setting_ids) == 1
            setting = self.browse(cr, uid, setting_ids[0], context=context)
            return setting.model_id.model, (setting.field_id.name, setting.field_id.field_description)
        # 2. check whether this model exists and, if it does, whether it has a char field named "code"
        model_obj = self.pool.get('ir.model')
        model_ids = model_obj.search(cr, uid, [('id', '=', model_id)], context=context)
        if model_ids:
            assert len(model_ids) == 1
            model = model_obj.browse(cr, uid, model_ids[0], context=context)
            model_fields = self.pool.get(model.model).fields_get(cr, uid, [], context=context)
            if 'code' in model_fields and model_fields['code']['type'] == 'char':
                return model.model, ('code', model_fields['code']['string'])
            # 3. if nothing was found we return False, which means we will not propose any default field
            return model.model, False
        raise osv.except_osv(_('Wrong Model'), _("A wrong model_id was passed in the context!"))

    def on_change_model_id(self, cr, uid, ids, context=None):
        return {'value': {'field_id': False}}

    def _check_field_belongs_to_model(self, cr, uid, ids, context=None):
        for setting in self.browse(cr, uid, ids, context=context):
            if setting.field_id.model_id != setting.model_id:
                return False
        return True

    _constraints = [
        (_check_field_belongs_to_model, 'Field must belong to the selected model!', ['field_id', 'model_id']),
    ]

    _sql_constraints = [
        ('model_id_uniq', 'unique(model_id)', 'For every model you can specify only one field!'),
        # in practice this situation cannot occur: the constraint above already requires the field to
        # belong to the selected model, and model_id is unique, so the same field_id should never appear
        # twice. It is kept here just in case.
        ('field_id_uniq', 'unique(field_id)',
         'Every field can be used only once! If you see this error, please contact the developers, something is wrong.'),
    ]


class FileFile(osv.osv):
    _name = "file.file"
    _description = "File"

    def warning_file_copies(self, cr, uid, type_id, file_path, context=None):
        warning_message = False
        file_copy_ids = self.search(cr, uid, [('file_path', '=', file_path), ('type_id', '=', type_id)],
                                    context=context)
        warning_list = []
        for file_copy in self.browse(cr, uid, file_copy_ids, context=context):
            model_obj = self.pool.get(file_copy.model_id.model)
            res_name = '%s %s' % (file_copy.model_id.name,
                                  model_obj.name_get(cr, uid, [file_copy.res_id], context=context)[0][1])
            warning_list.append(res_name)
        if warning_list:
            warning_message = _('A File or Automatic Rule with the same path '
                                'already exists and is attached to: %s') % ', '.join(warning_list)
        return warning_message

    def create(self, cr, uid, vals, context=None):
        if context is None:
            context = {}
        action_type = context.get('action_type')
        if not action_type:
            raise osv.except_osv(_('Warning'),
                                 _("An action type is expected in the context. Without an action type we can't "
                                   "save the file. Please contact the developers, something is wrong."))
        del context['action_type']
        dummy, file_path = self.get_type_file_abs_path(cr, uid, vals['is_auto'], vals['type_id'],
                                                       vals['file_path'], context=context)
        if action_type == 'upload_new' and os.path.exists(file_path):
            raise osv.except_osv(_('Warning'),
                                 _("A file at this path already exists. Change the file name and try again, "
                                   "or attach the existing file instead of uploading a new one."))
        elif action_type == 'attach_existed_manual' and (not os.path.exists(file_path) or not os.path.isfile(file_path)):
            raise osv.except_osv(_('Warning'),
                                 _('The file at this path does not exist. Please try again with a different path.'))
        return super(FileFile, self).create(cr, uid, vals, context=context)

    def name_get(self, cr, uid, ids, context=None):
        res = super(FileFile, self).name_get(cr, uid, ids, context=context)
        new_res = []
        for name in res:
            if self.browse(cr, uid, name[0], context=context).is_auto:
                new_name = (name[0], ': '.join((_('Automatic Rule'), name[1])))
                new_res.append(new_name)
            else:
                new_res.append(name)
        return new_res

    def _display_name(self, cr, uid, ids, name, args, context=None):
        return dict(self.name_get(cr, uid, ids, context=context))

    def _is_res_exist(self, cr, uid, ids, name, args, context=None):
        res = {}
        for file in self.browse(cr, uid, ids, context=context):
            model_obj = self.pool.get(file.model_id.model)
            res[file.id] = bool(model_obj.search(cr, uid, [('id', '=', file.res_id)], context=context))
        return res

    def _display_res_name(self, cr, uid, ids, name, args, context=None):
        res = {}
        for file in self.browse(cr, uid, ids, context=context):
            model_obj = self.pool.get(file.model_id.model)
            if file.is_res_exist:
                res[file.id] = model_obj.name_get(cr, uid, [file.res_id], context=context)[0][1]
            else:
                res[file.id] = _('ERROR')
        return res

    def _invisible_res_id(self, cr, uid, ids, name, args, context=None):
        is_inv = uid != SUPERUSER_ID
        return dict.fromkeys(ids, is_inv)

    _columns = {
        'name': fields.char("Name"),
        'display_name': fields.function(_display_name, type='char', string='Name', store=False, readonly=True),
        'is_res_exist': fields.function(_is_res_exist, type='boolean', string='Resource exists', readonly=True),
        'display_res_name': fields.function(_display_res_name, type='char', string='Resource Name', store=False,
                                            readonly=True),
        'file_name': fields.char("File name", readonly=True),
        'file_path': fields.char("File", required=True, readonly=True),
        'type_id': fields.many2one('file.type', string="Type", ondelete='restrict', required=True, readonly=True),
        'model_id': fields.many2one('ir.model', string="Resource Type", ondelete='restrict', required=True,
                                    readonly=True),
        'res_id': fields.integer("Resource ID", required=True, readonly=True),
        'invisible_res_id': fields.function(_invisible_res_id, type='boolean', string='Is res_id invisible',
                                            store=False, readonly=True),
        'is_auto': fields.boolean("Automatically attach links", readonly=True),
    }

    _defaults = {
        'is_auto': False,
    }

    _order = 'type_id, model_id, res_id, is_auto, name'

    _sql_constraints = [('type_id_model_id_res_id_file_path_uniq',
                         'unique(type_id, model_id, res_id, file_path)',
                         'You have already attached a file with the same path to this resource!')]

    def get_type_file_rel_path(self, cr, uid, is_auto, type_id, file_path, context=None):
        file_type = self.pool.get('file.type').browse(cr, uid, type_id, context=context)
        if file_path and is_auto:
            assert isinstance(file_path, basestring)
            # because we have to match the whole filename, not only a part of it, we anchor the pattern with ^ and $
            if file_path[0] != '^':
                file_path = '^' + file_path
            if file_path[-1] != '$':
                file_path += '$'
            # wrap it in a tuple to tell create_path() not to apply norm_path() to this file_path
            file_path = (file_path, False)
        return file_type.folder, create_path(file_type.folder, file_path)
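
    # Sketch of the output of get_type_file_rel_path() for an automatic rule (the values
    # are illustrative assumptions): with a file.type whose folder is 'Invoices' and
    # file_path='INV_.*\.pdf', is_auto=True, the pattern is anchored to '^INV_.*\.pdf$'
    # and the method returns ('Invoices', '/Invoices/^INV_.*\.pdf$'). For a plain file
    # (is_auto=False) the path is simply normalized and joined under the type folder.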

    def get_type_file_abs_path(self, cr, uid, is_auto, type_id, file_path, context=None):
        storage_path = self.pool.get('ir.config_parameter').get_storage_path(cr, uid)
        type_folder, file_related_path = self.get_type_file_rel_path(cr, uid, is_auto, type_id, file_path,
                                                                     context=context)
        return create_path(storage_path, type_folder), create_path(storage_path, file_related_path)

    def attach_file_automatically(self, cr, uid, model_id, res_id, type_id, file_path, file_name, context=None):
        # looking for duplicates
        ids = self.search(cr, uid, [('model_id', '=', model_id), ('res_id', '=', res_id), ('type_id', '=', type_id),
                                    ('file_path', '=', file_path), ('is_auto', '=', False)], context=context)
        if not ids:
            if context is None:
                context = {}
            context.update(action_type='attach_existed_manual')
            file_id = self.create(cr, uid, {
                'name': file_name,
                'file_name': file_name,
                'file_path': file_path,
                'type_id': type_id,
                'model_id': model_id,
                'res_id': res_id,
                'is_auto': False,
            }, context=context)
            return file_id
        return False

    def run_auto_attached_file_scheduler(self, cr, uid, context=None):
        ids = self.search(cr, uid, [('is_auto', '!=', False)], context=context)
        for file in self.browse(cr, uid, ids, context=context):
            type_path, dummy = self.get_type_file_abs_path(cr, uid, file.is_auto, file.type_id.id, file.file_path,
                                                           context=context)
            type_path_length = len(type_path)
            # we need len(file.file_path) > 2 because a pattern of length 2 would be just '^$'
            if file.file_path and len(file.file_path) > 2 and os.path.exists(type_path) and os.path.isdir(type_path):
                for top, dirs, files in os.walk(type_path):
                    # here we get the subdirectory of type_path
                    # example: if type_path = '/type/path' and top = '/type/path/subdir' we get 'subdir'
                    subdir = top[type_path_length + 1:]
                    for filename in files:
                        if subdir:
                            file_path_os = os.path.join(subdir, filename)
                        else:
                            file_path_os = filename
                        pattern = file.file_path
                        if re.match(pattern, file_path_os):
                            self.attach_file_automatically(cr, uid, file.model_id.id, file.res_id, file.type_id.id,
                                                           file_path_os, filename, context=context)
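
    # Walk-through of the matching above (the paths are illustrative assumptions): with
    # type_path='/srv/filestorage/Invoices' and a file found at
    # '/srv/filestorage/Invoices/2015/INV_001.pdf', os.walk() yields
    # top='/srv/filestorage/Invoices/2015', so subdir='2015' and
    # file_path_os='2015/INV_001.pdf'; a rule whose file_path pattern is '2015/INV_.*\.pdf'
    # matches it via re.match() and attach_file_automatically() creates the link.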

    def copy(self, cr, uid, id, default=None, context=None):
        raise osv.except_osv(_("Warning"), _("Files cannot be duplicated"))

    def get_files_count(self, cr, uid, model, res_id, context=None):
        model_id = self.pool.get('ir.model').search(cr, uid, [('model', '=', model)], context=context)[0]
        return self.search_count(cr, uid, [('model_id', '=', model_id), ('res_id', '=', res_id)], context=context)

    def list_types(self, cr, uid, model, res_id, context=None):
        model_id = self.pool.get('ir.model').search(cr, uid, [('model', '=', model)], context=context)[0]
        types = []
        file_ids = self.search(cr, uid, [('model_id', '=', model_id), ('res_id', '=', res_id)], context=context)
        for file_record in self.browse(cr, uid, file_ids, context=context):
            file_type = (file_record.type_id.id, file_record.type_id.name)
            if file_type not in types:
                types.append(file_type)
        return types, model_id

    def store_file(self, cr, uid, file_id, file_data, context=None):
        file = self.browse(cr, uid, file_id, context=context)
        type_path, file_path = self.get_type_file_abs_path(cr, uid, file.is_auto, file.type_id.id, file.file_name,
                                                           context=context)
        if not os.path.exists(type_path):
            try:
                os.makedirs(type_path)
            except OSError:
                storage_path = self.pool.get('ir.config_parameter').get_storage_path(cr, uid)
                raise osv.except_osv(_('Warning'),
                                     _('Directory %s cannot be created! Please create it manually and try again, or '
                                       'ask your system administrator to set up proper access rights for directory %s') %
                                     (type_path, storage_path))
        try:
            with open(file_path, mode="w") as new_file:
                new_file.write(file_data)
        except (IOError, OSError):
            raise osv.except_osv(_('Warning'), _('Your file cannot be saved. '
                                                 'Please ask your system administrator to set up proper access rights '
                                                 'for directory %s') % type_path)
        return True

    def get_resource(self, cr, uid, ids, context=None):
        record = self.browse(cr, uid, ids[0], context=context)
        return {
            'type': 'ir.actions.act_window',
            'name': record.model_id.name,
            'res_model': record.model_id.model,
            'res_id': record.res_id,
            'view_type': 'form',
            'view_mode': 'form',
            'target': 'current',
            'context': context,
        }

    def download_file(self, cr, uid, ids, context=None):
        new_id = self.pool.get('file.download').create(cr, uid, {'file_id': ids[0]}, context=context)
        return {
            'type': 'ir.actions.act_window',
            'name': 'Download file',
            'res_model': 'file.download',
            'res_id': new_id,
            'view_type': 'form',
            'view_mode': 'form',
            'target': 'new',
            'context': context,
        }

    def remove_files_with_unexisted_resource(self, cr, uid, context=None):
        ids = self.search(cr, uid, [], context=context)
        for file_record in self.browse(cr, uid, ids, context=context):
            if not file_record.is_res_exist:
                _logger.info('removing file record %s %s: the linked resource no longer exists',
                             file_record.file_path, file_record.display_res_name)
                self.unlink(cr, uid, file_record.id, context=context)
        return True

    def check_integrity(self, cr, uid, context=None):
        self.remove_files_with_unexisted_resource(cr, uid, context=context)
        log_dir = self.pool.get('ir.config_parameter').get_log_dir(cr, uid)
        ids = self.search(cr, uid, [('is_auto', '=', False)], context=context)
        curr_date = fields.datetime.now()
        report = '''<html><head><meta charset="utf-8"/><title>Files integrity report %s UTC</title>
<style>
table {border-collapse: collapse}
th, td {padding: 5px}
</style>
</head><body><h1>Files integrity report %s UTC</h1>''' % (curr_date, curr_date)
        is_any = False
        if ids:
            sub_report = '''<h3>These files don't exist:</h3><table border="1">
<tr>
<th>File</th>
<th>Document Name</th>
<th>Resource Type</th>
<th>Resource Name</th>
</tr>'''
            for file_record in self.browse(cr, uid, ids, context=context):
                dummy, file_path = self.get_type_file_abs_path(cr, uid, False, file_record.type_id.id,
                                                               file_record.file_path, context=context)
                if not os.path.exists(file_path) or not os.path.isfile(file_path):
                    is_any = True
                    sub_report += '''<tr><td>%s</td><td>%s</td>
<td>%s</td><td>%s</td></tr>''' % (utf8(file_path),
                                  utf8(file_record.name),
                                  utf8(file_record.model_id.name),
                                  utf8(file_record.display_res_name))
            sub_report += '</table>'
        if is_any:
            report += sub_report
        else:
            report += 'No errors were found. Have a nice day!'
        report += '</body></html>'
        report_file = open(os.path.join(log_dir, 'files_integrity_report.html'), mode="w")
        report_file.write(report)
        report_file.close()
        return True
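

# Deployment notes (a sketch based on the code above, not an official setup guide): the
# module expects two ir.config_parameter records, 'files.filestorage' (the root directory
# where files are stored) and 'files.logdir' (where files_integrity_report.html is written).
# run_auto_attached_file_scheduler() and check_integrity() are meant to be called
# periodically, e.g. from ir.cron records pointing at the file.file model; the exact cron
# definitions belong to the module's XML data files, which are not part of this gist.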