Created
January 12, 2015 14:37
-
-
Save dskarataev/1a68c0b75a8625bba638 to your computer and use it in GitHub Desktop.
files management
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
import logging
import os
import re
from xml.sax import saxutils

from openerp import SUPERUSER_ID
from openerp.osv import osv, fields
from openerp.tools.translate import _
def utf8(value):
    """Return ``value`` as a UTF-8 encoded byte string.

    This module targets Python 2 (it relies on ``unicode``), where ``str`` is
    a byte string:

    * ``str`` values are returned unchanged.  Calling ``.encode('utf-8')`` on
      a byte string first decodes it implicitly as ASCII and fails with
      ``UnicodeDecodeError`` as soon as it contains a non-ASCII byte, so byte
      strings must not be re-encoded.
      NOTE(review): this assumes byte strings are already UTF-8 -- confirm.
    * ``unicode`` values are encoded to UTF-8.
    * anything else is converted with ``str()``.

    :param value: any object
    :return: byte string representation of ``value`` ("" for empty strings)
    """
    if isinstance(value, str):
        return value
    if isinstance(value, unicode):
        return value.encode('utf-8')
    return str(value)
def norm_filename(filename):
    """Strip the characters '/', '\\' and '*' from a file name.

    A falsy ``filename`` (None, False, empty string) yields ''.

    :param filename: file name as typed by the user, or a falsy value
    :return: the file name without slashes, backslashes and asterisks
    """
    cleaned = filename if filename else ''
    for forbidden in ('/', '\\', '*'):
        cleaned = cleaned.replace(forbidden, '')
    return cleaned
def norm_path(path):
    """Convert many different kinds of path to the form path/to/folder/file.ext.

    The result has no leading or trailing slash (callers add them when
    needed).  Backslashes are treated as separators, '*' is removed, every
    '../' and './' sequence is collapsed to '/', and empty components are
    dropped.

    Components that are exactly '.' or '..' are dropped as well: the
    '../' / './' replacement only matches when a slash follows, so without
    this a trailing '..' (e.g. 'folder/..' or just '..') would survive and
    let the resulting path climb out of its parent directory.

    :param path: path in any supported format, or a falsy value
    :return: normalised relative path, '' when nothing is left
    """
    if not path:
        return ''
    path = path.replace('\\', '/').replace('*', '')
    # repeat until stable: one pass can create new '../' or './' sequences
    # (for example '.../' becomes './' after the first replacement)
    old_path = None
    while old_path != path:
        old_path = path
        path = path.replace('../', '/').replace('./', '/')
    path_parts = [part for part in path.split('/')
                  if part and part not in ('.', '..')]
    if not path_parts:
        return ''
    return os.path.join(*path_parts)
def create_path(*args):
    """Concatenate several parts into one path that starts with '/'.

    Each argument is either a plain path string or a tuple ``(part, flag)``:

    * plain string: normalised with norm_path(); falsy values are skipped;
    * ``(part, False)``: ``part`` is appended untouched (used for values such
      as regular expressions that norm_path() would damage);
    * any other tuple (``(part, True)``, ``(part,)``): ``part`` is treated
      like a plain string.  Previously such a tuple was handed to norm_path()
      as a whole and failed on ``tuple.replace``.

    :param args: path parts as described above
    :return: the joined path
    """
    norm_part_list = []
    for part in args:
        if isinstance(part, tuple):
            if len(part) > 1 and part[1] is False:
                # (x, False): the caller asked us not to normalise this part
                norm_part_list.append(part[0])
                continue
            # unwrap the tuple and fall through to the normalising branch
            part = part[0] if part else None
        if part:
            norm_part_list.append(norm_path(part))
    return os.path.join('/', *norm_part_list)
class IrConfigParameter(osv.osv):
    # Adds to ir.config_parameter the lookups for the two directories used by
    # the files module: the file storage and the log directory.
    _inherit = "ir.config_parameter"

    def get_storage_path(self, cr, uid):
        """Return the directory configured in the 'files.filestorage' parameter.

        :raises osv.except_osv: when the parameter is not set or does not
            point to an existing directory
        """
        configured = self.get_param(cr, uid, 'files.filestorage', default=False)
        if not configured:
            raise osv.except_osv(_("Warning"),
                                 _("You must define 'files.filestorage' parameter in system configuration!"))
        # the user may have typed the path in any format, so it is normalised
        # and rebuilt as a full path before it is checked
        storage_path = create_path(configured)
        if os.path.isdir(storage_path):
            return storage_path
        raise osv.except_osv(_("Warning"),
                             _("Storage path that you filled in system configuration does not exist!"))

    def get_log_dir(self, cr, uid):
        """Return the directory configured in the 'files.logdir' parameter.

        :raises osv.except_osv: when the parameter is not set or does not
            point to an existing directory
        """
        configured = self.get_param(cr, uid, 'files.logdir', default=False)
        if not configured:
            raise osv.except_osv(_("Warning"),
                                 _("You must define 'files.logdir' parameter in system configuration!"))
        log_dir = create_path(configured)
        if os.path.isdir(log_dir):
            return log_dir
        raise osv.except_osv(_("Warning"),
                             _("log directory path that you filled in system configuration does not exist!"))
class FileType(osv.osv):
    # A file type: a unique name mapped to a unique folder.  The folder value
    # is normalised with norm_path() every time it is written.
    _name = "file.type"
    _description = "File Type"

    @staticmethod
    def norm_folder_if_exists(vals):
        """Normalise ``vals['folder']`` in place when the key is present.

        :param vals: values dictionary given to create()/write()
        :return: the same dictionary
        """
        if 'folder' not in vals:
            return vals
        vals['folder'] = norm_path(vals['folder'])
        return vals

    def create(self, cr, uid, vals, context=None):
        """Create the record after normalising its folder."""
        normalised = self.norm_folder_if_exists(vals)
        return super(FileType, self).create(cr, uid, normalised, context=context)

    def write(self, cr, uid, ids, vals, context=None):
        """Update the records after normalising the folder, if it is given."""
        normalised = self.norm_folder_if_exists(vals)
        return super(FileType, self).write(cr, uid, ids, normalised, context=context)

    def _check_not_root_folder(self, cr, uid, ids, context=None):
        """Constraint: every record must have a non-empty folder."""
        records = self.browse(cr, uid, ids, context=context)
        return all(file_type.folder for file_type in records)

    _columns = {
        'name': fields.char("Name", required=True),
        'folder': fields.char("Folder", required=True),
    }

    _constraints = [
        (_check_not_root_folder, "You can't use root folder as Type folder. Please use one of subfolders.",
         ['folder']),
    ]

    _sql_constraints = [
        ('name_uniq', 'unique(name)', 'File type name must be unique!'),
        ('folder_uniq', 'unique(folder)', 'Folder path must be unique!'),
    ]
class FileFieldSetting(osv.osv):
    # Per-model setting that selects which field of the model is returned by
    # get_model_name_field_name() as the default field.
    _name = "file.field.setting"
    _description = "Fields Settings"

    _columns = {
        'model_id': fields.many2one('ir.model', string="Model", required=True, ondelete='restrict'),
        'field_id': fields.many2one('ir.model.fields', string="Field", required=True, ondelete='restrict'),
    }

    def get_model_name_field_name(self, cr, uid, model_id, context=None):
        """Return ``(model_name, field_info)`` for the given ir.model id.

        ``field_info`` is ``(field_name, field_label)``, or False when no
        default field could be determined.

        :raises osv.except_osv: when ``model_id`` matches no ir.model record
        """
        # 1. a user-defined setting for this model takes precedence
        setting_ids = self.search(cr, uid, [('model_id', '=', model_id)], context=context)
        if setting_ids:
            assert len(setting_ids) == 1
            setting = self.browse(cr, uid, setting_ids[0], context=context)
            field = setting.field_id
            return setting.model_id.model, (field.name, field.field_description)

        # 2. otherwise the model must exist ...
        ir_model = self.pool.get('ir.model')
        model_ids = ir_model.search(cr, uid, [('id', '=', model_id)], context=context)
        if not model_ids:
            raise osv.except_osv(_('Wrong Model'), _("Wrong model_id is passed to context!"))
        assert len(model_ids) == 1
        model = ir_model.browse(cr, uid, model_ids[0], context=context)

        # ... and a char field named "code" is used when the model has one
        model_fields = self.pool.get(model.model).fields_get(cr, uid, [], context=context)
        if 'code' in model_fields and model_fields['code']['type'] == 'char':
            return model.model, ('code', model_fields['code']['string'])

        # 3. nothing found: False means no default field information
        return model.model, False

    @staticmethod
    def on_change_model_id(cr, uid, ids, context=None):
        """Clear the selected field whenever the model changes."""
        return {'value': {'field_id': False}}

    def _check_field_belongs_to_model(self, cr, uid, ids, context=None):
        """Constraint: the selected field must belong to the selected model."""
        settings = self.browse(cr, uid, ids, context=context)
        return not any(setting.field_id.model_id != setting.model_id for setting in settings)

    _constraints = [
        (_check_field_belongs_to_model, 'Field must belong to the selected model!', ['field_id', 'model_id']),
    ]

    _sql_constraints = [
        ('model_id_uniq', 'unique(model_id)', 'For every model you can specify only one field!'),
        # This should never trigger: _constraints already requires the field to
        # belong to the selected model and model_id is unique, so the same
        # field_id cannot appear twice.  It is kept as a safety net.
        ('field_id_uniq', 'unique(field_id)',
         'Every field can be used only once! If you see this error, please contact developers, something is wrong.'),
    ]
class FileFile(osv.osv):
    # Link between a resource (model_id + res_id) and either a file stored
    # under the configured storage directory or, when is_auto is set, a
    # regular-expression rule used by the scheduler to attach matching files.
    _name = "file.file"
    _description = "File"

    @staticmethod
    def _function_field_context(args, context):
        """Recover the context for getters declared with an extra positional.

        _is_res_exist and _display_res_name declare one parameter more than
        the other function-field getters of this class.
        NOTE(review): the ORM is documented to call getters positionally as
        fnct(model, cr, uid, ids, field_name, arg, context); with the extra
        parameter the context dictionary then arrives in ``args`` and
        ``context`` keeps its default None -- confirm against the ORM version
        in use.  The signatures are kept for backward compatibility and the
        context is picked up from ``args`` instead.
        """
        if context is None and isinstance(args, dict):
            return args
        return context

    def warning_file_copies(self, cr, uid, type_id, file_path, context=None):
        """Return a warning naming the resources already using this path.

        :param type_id: id of the file.type
        :param file_path: path (or rule pattern) to look for
        :return: translated warning message, or False when the path is unused
        """
        file_copy_ids = self.search(cr, uid, [('file_path', '=', file_path), ('type_id', '=', type_id)],
                                    context=context)
        # display_res_name falls back to a placeholder when the resource no
        # longer exists, where a direct name_get()[0] lookup would fail
        warning_list = ['%s %s' % (file_copy.model_id.name, file_copy.display_res_name)
                        for file_copy in self.browse(cr, uid, file_copy_ids, context=context)]
        if not warning_list:
            return False
        return _('File or Automatic Rule with same path\n'
                 'already exists and is attached to the %s') % ', '.join(warning_list)

    def create(self, cr, uid, vals, context=None):
        """Create a file record after checking the file system state.

        The context must carry 'action_type':

        * 'upload_new': the target path must not exist yet;
        * 'attach_existed_manual': the target path must be an existing file.

        :raises osv.except_osv: when 'action_type' is missing or the check
            for the given action type fails
        """
        # work on a copy so the caller's context dictionary is left untouched
        context = dict(context or {})
        action_type = context.pop('action_type', None)
        if not action_type:
            raise osv.except_osv(_('Warning'),
                                 _("Action type in context is expected. Without action type we can't\n"
                                   "save file. Please contact developers, something is wrong."))
        # is_auto has a default (False), so it may be absent from vals
        dummy, file_path = self.get_type_file_abs_path(cr, uid, vals.get('is_auto', False), vals['type_id'],
                                                       vals['file_path'], context=context)
        if action_type == 'upload_new' and os.path.exists(file_path):
            raise osv.except_osv(_('Warning'),
                                 _("File on this path already exists. Change file name and try again or\n"
                                   "attach already existed file instead of uploading new one."))
        elif action_type == 'attach_existed_manual' and not os.path.isfile(file_path):
            raise osv.except_osv(_('Warning'),
                                 _('File on this path does not exist. Please try again with different path.'))
        return super(FileFile, self).create(cr, uid, vals, context=context)

    def name_get(self, cr, uid, ids, context=None):
        """Prefix the display name of automatic rules with 'Automatic Rule: '."""
        res = super(FileFile, self).name_get(cr, uid, ids, context=context)
        # one browse for all records instead of one per record
        record_ids = [item[0] for item in res]
        auto_ids = set(record.id for record in self.browse(cr, uid, record_ids, context=context)
                       if record.is_auto)
        new_res = []
        for item in res:
            if item[0] in auto_ids:
                new_res.append((item[0], ': '.join((_('Automatic Rule'), item[1]))))
            else:
                new_res.append(item)
        return new_res

    def _display_name(self, cr, uid, ids, name, args, context=None):
        """Function-field getter: map each id to its name_get() label."""
        return dict(self.name_get(cr, uid, ids, context=context))

    def _is_res_exist(self, cr, uid, ids, model_id, res_id, args, context=None):
        """Function-field getter: does the linked resource still exist?

        A resource counts as missing when its model is not in the pool or no
        record with the stored id is found.

        :return: dict mapping each id to True/False
        """
        context = self._function_field_context(args, context)
        # NOTE(review): search() hides archived (active=False) records unless
        # active_test is disabled in the context; archived resources still
        # exist and must not be reported (and later unlinked) as missing.
        search_context = dict(context or {}, active_test=False)
        res = {}
        for file_record in self.browse(cr, uid, ids, context=context):
            model_obj = self.pool.get(file_record.model_id.model)
            found = False
            if model_obj is not None:
                found = bool(model_obj.search(cr, uid, [('id', '=', file_record.res_id)],
                                              context=search_context))
            res[file_record.id] = found
        return res

    def _display_res_name(self, cr, uid, ids, model_id, res_id, args, context=None):
        """Function-field getter: display name of the linked resource.

        :return: dict mapping each id to the resource name, or to the
            translated placeholder 'ERROR' when the resource does not exist
        """
        context = self._function_field_context(args, context)
        res = {}
        for file_record in self.browse(cr, uid, ids, context=context):
            if file_record.is_res_exist:
                model_obj = self.pool.get(file_record.model_id.model)
                res[file_record.id] = model_obj.name_get(cr, uid, [file_record.res_id],
                                                         context=context)[0][1]
            else:
                res[file_record.id] = _('ERROR')
        return res

    def _invisible_res_id(self, cr, uid, ids, res_id, args, context=None):
        """Function-field getter: res_id is hidden from everyone but the superuser."""
        return dict.fromkeys(ids, uid != SUPERUSER_ID)

    _columns = {
        'name': fields.char("Name"),
        'display_name': fields.function(_display_name, type='char', string='Name', store=False, readonly=True),
        'is_res_exist': fields.function(_is_res_exist, type='boolean', string='Is resource exist', readonly=True),
        'display_res_name': fields.function(_display_res_name, type='char', string='Resource Name', store=False,
                                            readonly=True),
        'file_name': fields.char("File name", readonly=True),
        'file_path': fields.char("File", required=True, readonly=True),
        'type_id': fields.many2one('file.type', string="Type", ondelete='restrict', required=True, readonly=True),
        'model_id': fields.many2one('ir.model', string="Resource Type", ondelete='restrict', required=True,
                                    readonly=True),
        'res_id': fields.integer("Resource ID", required=True, readonly=True),
        'invisible_res_id': fields.function(_invisible_res_id, type='boolean', string='Is res_id invisible',
                                            store=False, readonly=True),
        'is_auto': fields.boolean("Automatically attach links", readonly=True),
    }

    _defaults = {
        'is_auto': False,
    }

    _order = 'type_id, model_id, res_id, is_auto, name'

    _sql_constraints = [('type_id_model_id_res_id_file_path_uniq',
                         'unique(type_id, model_id, res_id, file_path)',
                         'You have already attached file with same path to this resource!')]

    def get_type_file_rel_path(self, cr, uid, is_auto, type_id, file_path, context=None):
        """Return ``(type_folder, path)`` relative to the storage directory.

        For automatic rules ``file_path`` is a regular expression: it is
        anchored with '^' and '$' and joined without normalisation.

        :return: tuple of the type folder and the '/'-prefixed path built
            from the type folder and ``file_path``
        """
        file_type = self.pool.get('file.type').browse(cr, uid, type_id, context=context)
        if file_path and is_auto:
            assert isinstance(file_path, basestring)
            # the rule must match the whole file name, not just a part of it,
            # so the pattern is anchored at both ends
            if file_path[0] != '^':
                file_path = '^' + file_path
            if file_path[-1] != '$':
                file_path += '$'
            # (x, False) tells create_path() not to apply norm_path() to x
            file_path = (file_path, False)
        return file_type.folder, create_path(file_type.folder, file_path)

    def get_type_file_abs_path(self, cr, uid, is_auto, type_id, file_path, context=None):
        """Return ``(type_dir, file_path)`` as paths below the storage directory.

        :raises osv.except_osv: when the storage directory is not configured
            or does not exist (raised by get_storage_path)
        """
        storage_path = self.pool.get('ir.config_parameter').get_storage_path(cr, uid)
        type_folder, file_related_path = self.get_type_file_rel_path(cr, uid, is_auto, type_id, file_path,
                                                                     context=context)
        return create_path(storage_path, type_folder), create_path(storage_path, file_related_path)

    def attach_file_automatically(self, cr, uid, model_id, res_id, type_id, file_path, file_name, context=None):
        """Attach an existing file to a resource unless it is attached already.

        :return: id of the new record, or False when the same non-automatic
            link already exists
        """
        # looking for duplicates
        ids = self.search(cr, uid, [('model_id', '=', model_id), ('res_id', '=', res_id), ('type_id', '=', type_id),
                                    ('file_path', '=', file_path), ('is_auto', '=', False)], context=context)
        if ids:
            return False
        # copy instead of update(): the caller's context must not be changed
        create_context = dict(context or {}, action_type='attach_existed_manual')
        return self.create(cr, uid, {
            'name': file_name,
            'file_name': file_name,
            'file_path': file_path,
            'type_id': type_id,
            'model_id': model_id,
            'res_id': res_id,
            'is_auto': False,
        }, context=create_context)

    def run_auto_attached_file_scheduler(self, cr, uid, context=None):
        """Apply every automatic rule to the files found in its type folder.

        Each rule's file_path is used as a regular expression matched (from
        the start) against the path of every file relative to the type
        folder; matching files are attached to the rule's resource.  A rule
        whose pattern does not compile is logged and skipped so that it does
        not stop the remaining rules.
        """
        logger = logging.getLogger(__name__)
        ids = self.search(cr, uid, [('is_auto', '!=', False)], context=context)
        for rule in self.browse(cr, uid, ids, context=context):
            type_path, dummy = self.get_type_file_abs_path(cr, uid, rule.is_auto, rule.type_id.id, rule.file_path,
                                                           context=context)
            # a pattern of length 2 is just '^$', which can match nothing useful
            if not rule.file_path or len(rule.file_path) <= 2 or not os.path.isdir(type_path):
                continue
            try:
                pattern = re.compile(rule.file_path)
            except re.error:
                logger.warning('Automatic rule %s has an invalid pattern %r, skipped',
                               rule.id, rule.file_path)
                continue
            type_path_length = len(type_path)
            for top, dirs, files in os.walk(type_path):
                # part of the walked directory below type_path, for example
                # type_path = '/type/path', top = '/type/path/subdir' -> 'subdir'
                subdir = top[type_path_length + 1:]
                for filename in files:
                    file_path_os = os.path.join(subdir, filename) if subdir else filename
                    if pattern.match(file_path_os):
                        self.attach_file_automatically(cr, uid, rule.model_id.id, rule.res_id, rule.type_id.id,
                                                       file_path_os, filename, context=context)

    @staticmethod
    def copy(cr, uid, id, default=None, context=None):
        """Duplicating file records is not allowed.

        :raises osv.except_osv: always
        """
        raise osv.except_osv(_("Warning"), _("Unable duplicate files"))

    def get_files_count(self, cr, uid, model, res_id, context=None):
        """Count the file records attached to one resource.

        :param model: technical model name (ir.model 'model' value)
        :return: number of records; 0 when the model is unknown
        """
        model_ids = self.pool.get('ir.model').search(cr, uid, [('model', '=', model)], context=context)
        if not model_ids:
            return 0
        return self.search_count(cr, uid, [('model_id', '=', model_ids[0]), ('res_id', '=', res_id)],
                                 context=context)

    def list_types(self, cr, uid, model, res_id, context=None):
        """List the distinct file types used by the files of one resource.

        :param model: technical model name (ir.model 'model' value)
        :return: ``(types, model_id)`` where ``types`` is a list of
            ``(type_id, type_name)`` in first-seen order; ``([], False)``
            when the model is unknown
        """
        model_ids = self.pool.get('ir.model').search(cr, uid, [('model', '=', model)], context=context)
        if not model_ids:
            return [], False
        model_id = model_ids[0]
        types = []
        file_ids = self.search(cr, uid, [('model_id', '=', model_id), ('res_id', '=', res_id)], context=context)
        for file_record in self.browse(cr, uid, file_ids, context=context):
            file_type = (file_record.type_id.id, file_record.type_id.name)
            if file_type not in types:
                types.append(file_type)
        return types, model_id

    def store_file(self, cr, uid, file_id, file_data, context=None):
        """Write ``file_data`` to the storage location of the given record.

        The target is built from the record's type folder and its file_name;
        the type folder is created when it does not exist.

        :param file_data: content written to the file as is
        :return: True
        :raises osv.except_osv: when the folder cannot be created or the
            file cannot be written
        """
        file_record = self.browse(cr, uid, file_id, context=context)
        type_path, file_path = self.get_type_file_abs_path(cr, uid, file_record.is_auto, file_record.type_id.id,
                                                           file_record.file_name, context=context)
        if not os.path.exists(type_path):
            try:
                os.makedirs(type_path)
            except EnvironmentError:
                storage_path = self.pool.get('ir.config_parameter').get_storage_path(cr, uid)
                raise osv.except_osv(_('Warning'),
                                     _('Directory %s cannot be created! Please do it manually and try again or '
                                       'ask your system administrator to set up true access rights for directory %s') %
                                     (type_path, storage_path))
        try:
            # binary mode keeps the content byte-for-byte on every platform;
            # the with-block closes the handle even when write() fails
            with open(file_path, "wb") as new_file:
                new_file.write(file_data)
        except EnvironmentError:
            raise osv.except_osv(_('Warning'), _('Your file cannot be saved. '
                                                 'Please ask your system administrator to set up true access rights '
                                                 'for directory %s') % type_path)
        return True

    def get_resource(self, cr, uid, ids, context=None):
        """Return a window action opening the resource of the first record."""
        record = self.browse(cr, uid, ids[0], context=context)
        return {
            'type': 'ir.actions.act_window',
            'name': record.model_id.name,
            'res_model': record.model_id.model,
            'res_id': record.res_id,
            'view_type': 'form',
            'view_mode': 'form',
            'target': 'current',
            'context': context,
        }

    def download_file(self, cr, uid, ids, context=None):
        """Create a file.download record for the first id and open it in a dialog."""
        new_id = self.pool.get('file.download').create(cr, uid, {'file_id': ids[0]}, context=context)
        return {
            'type': 'ir.actions.act_window',
            'name': 'Download file',
            'res_model': 'file.download',
            'res_id': new_id,
            'view_type': 'form',
            'view_mode': 'form',
            'target': 'new',
            'context': context,
        }

    def remove_files_with_unexisted_resource(self, cr, uid, context=None):
        """Delete every file record whose linked resource no longer exists.

        Each removed record is logged with its path and resource name.

        :return: True
        """
        logger = logging.getLogger(__name__)
        ids = self.search(cr, uid, [], context=context)
        orphan_ids = []
        for file_record in self.browse(cr, uid, ids, context=context):
            if not file_record.is_res_exist:
                logger.info('%s %s', file_record.file_path, file_record.display_res_name)
                orphan_ids.append(file_record.id)
        # unlink after the loop so no record is deleted while it is browsed
        if orphan_ids:
            self.unlink(cr, uid, orphan_ids, context=context)
        return True

    def check_integrity(self, cr, uid, context=None):
        """Write an HTML report of attached files that are missing on disk.

        Records pointing to vanished resources are removed first.  The report
        is written to 'files_integrity_report.html' in the configured log
        directory and lists every non-automatic record whose file does not
        exist.

        :return: True
        :raises osv.except_osv: when the log or storage directory is not
            configured or does not exist
        """
        self.remove_files_with_unexisted_resource(cr, uid, context=context)
        log_dir = self.pool.get('ir.config_parameter').get_log_dir(cr, uid)
        ids = self.search(cr, uid, [('is_auto', '=', False)], context=context)
        curr_date = fields.datetime.now()
        report_parts = [('<html><head><meta charset="utf-8"/><title>Files integrity report %s UTC</title>\n'
                         '<style>\n'
                         'table {border-collapse: collapse}\n'
                         'th, td {padding: 5px}\n'
                         '</style>\n'
                         '</head><body><h1>Files integrity report %s UTC</h1>') % (curr_date, curr_date)]
        missing_rows = []
        for file_record in self.browse(cr, uid, ids, context=context):
            dummy, file_path = self.get_type_file_abs_path(cr, uid, False, file_record.type_id.id,
                                                           file_record.file_path, context=context)
            if os.path.isfile(file_path):
                continue
            # names and paths come from users: escape them so characters such
            # as '<' or '&' cannot break (or inject markup into) the report
            cells = tuple(saxutils.escape(utf8(value)) for value in (
                file_path, file_record.name, file_record.model_id.name, file_record.display_res_name))
            missing_rows.append('<tr><td>%s</td><td>%s</td>\n'
                                '<td>%s</td><td>%s</td></tr>' % cells)
        if missing_rows:
            report_parts.append('<h3>These files don\'t exist:</h3><table border="1">\n'
                                '<tr>\n'
                                '<th>File</th>\n'
                                '<th>Document Name</th>\n'
                                '<th>Resource Type</th>\n'
                                '<th>Resource Name</th>\n'
                                '</tr>')
            report_parts.extend(missing_rows)
            report_parts.append('</table>')
        else:
            report_parts.append('No errors are found. Have a nice day!')
        report_parts.append('</body></html>')
        with open(os.path.join(log_dir, 'files_integrity_report.html'), "w") as report_file:
            report_file.write(''.join(report_parts))
        return True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment