odex25_standard/odex25_dms/dms/models/document.py

1055 lines
44 KiB
Python

# -*- coding: utf-8 -*-
import base64
import hashlib
import json
import logging
import os
from odoo import models, fields, api, _, tools
from odoo.exceptions import UserError
from odoo.osv import expression
from odoo.tools import image_process
from odoo.exceptions import ValidationError
from odoo.exceptions import UserError
from ast import literal_eval
from dateutil.relativedelta import relativedelta
from collections import OrderedDict, defaultdict
import re
class Document(models.Model):
_name = 'documents.document'
_description = 'Document'
_inherit = [
'mail.thread.cc',
"portal.mixin",
"dms.security.mixin",
"dms.mixins.thumbnail",
"mail.thread",
"mail.activity.mixin",
"abstract.dms.mixin",
]
_order = 'id desc'
# Attachment
attachment_id = fields.Many2one('ir.attachment', ondelete='cascade', auto_join=True, copy=False)
attach_id = fields.Many2one(
comodel_name='dms.file.attach',
# related='directory_id.project_id',
string='',
required=False)
attachment_name = fields.Char('Attachment Name', related='attachment_id.name', readonly=False)
attachment_type = fields.Selection(string='Attachment Type', related='attachment_id.type', readonly=False)
datas = fields.Binary(related='attachment_id.datas', related_sudo=True, readonly=False)
file_size = fields.Integer(related='attachment_id.file_size', store=True)
checksum = fields.Char(related='attachment_id.checksum')
mimetype = fields.Char(related='attachment_id.mimetype', default='application/octet-stream')
res_model = fields.Char('Resource Model', compute="_compute_res_record", inverse="_inverse_res_model", store=True)
res_id = fields.Integer('Resource ID', compute="_compute_res_record", inverse="_inverse_res_id", store=True)
res_name = fields.Char('Resource Name', related='attachment_id.res_name')
index_content = fields.Text(related='attachment_id.index_content')
description = fields.Text('Attachment Description', related='attachment_id.description', readonly=False)
# Versioning
previous_attachment_ids = fields.Many2many('ir.attachment', string="History")
# Document
name = fields.Char('Name', copy=True, store=True, compute='_compute_name', inverse='_inverse_name')
file_code = fields.Char(
string="File Code",
required=False)
version = fields.Char(
string="File version",
required=False)
active = fields.Boolean(default=True, string="Active")
thumbnail = fields.Binary(readonly=1, store=True, attachment=True, compute='_compute_thumbnail')
url = fields.Char('URL', index=True, size=1024, tracking=True)
res_model_name = fields.Char(compute='_compute_res_model_name', index=True)
type = fields.Selection([('url', 'URL'), ('binary', 'File'), ('empty', 'Request')],
string='Type', required=True, store=True, default='empty', change_default=True,
compute='_compute_type')
project_id = fields.Many2one(
'project.project',
compute='compute_project_id',
string='',
store=True,
required=False)
def compute_project_id(self):
for rec in self:
rec.project_id = rec.folder_id.project_id.id
def action_save_onboarding_file_step(self):
self.env.user.company_id.set_onboarding_step_done(
"documents_onboarding_file_state"
)
@api.model
def _get_binary_max_size(self):
return int(
self.env["ir.config_parameter"]
.sudo()
.get_param("dms.binary_max_size", default=25)
)
# content_text = fields.Text(compute='_get_text_content', search='_search_content_text')
content_text = fields.Text()
key_words = fields.Char(
string='',
required=False)
reference = fields.Reference(
selection=[('muk_dms.data', _('Data'))],
string="Data Reference",
readonly=True)
def _get_text_content(self):
try:
for rec in self:
if rec.reference and rec.reference._name == 'muk_dms.data_system':
rec.content_text = rec.reference.content_text
except:
pass
def _search_content_text(self, operator, value):
dms_values = self.env['documents.document'].search([])
value_found = []
for rec in dms_values:
try:
if rec.content_text:
if value.lower() in str(rec.content_text).lower():
value_found.append(rec.id)
except:
pass
# else:
# pass
# files = self.sudo().search([]).filtered(lambda aal: aal.reference.id in value_found)
return [('id', 'in', value_found)]
def _search_image_text(self, operator, value):
dms_values = self.env['documents.document'].search([])
value_found = []
for rec in dms_values:
try:
if rec.image_text:
if value.lower() in str(rec.image_text).lower():
value_found.append(rec.id)
except:
pass
return [('id', 'in', value_found)]
def _check_extension(self, extension):
if (
extension in self._get_forbidden_extensions()
):
raise UserError(_("Only admins can upload SVG files."))
# raise ValidationError(_("The file has a forbidden file extension."))
@api.model
def _get_forbidden_extensions(self):
get_param = self.env["ir.config_parameter"].sudo().get_param
extensions = get_param("dms.forbidden_extensions", default="")
return [extension.strip() for extension in extensions.split(",")]
@api.model
def _get_binary_max_size(self):
return int(
self.env["ir.config_parameter"]
.sudo()
.get_param("dms.binary_max_size", default=25)
)
@api.constrains("file_size")
def _check_size(self):
for record in self:
if record.file_size > self._get_binary_max_size() * 1024 * 1024:
raise ValidationError(_("The maximum upload size is %s MB).") % self._get_binary_max_size())
favorited_ids = fields.Many2many('res.users', string="Favorite of")
is_favorited = fields.Boolean(compute='_compute_is_favorited')
tag_ids = fields.Many2many('documents.tag', 'document_tag_rel', string="Tags")
partner_id = fields.Many2one('res.partner', string="Contact", tracking=True)
owner_id = fields.Many2one('res.users', default=lambda self: self.env.user.id, string="Owner",
tracking=True)
available_rule_ids = fields.Many2many('documents.workflow.rule', compute='_compute_available_rules',
string='Available Rules')
lock_uid = fields.Many2one('res.users', string="Locked by")
is_locked = fields.Boolean(compute="_compute_is_locked", string="Locked")
create_share_id = fields.Many2one('documents.share', help='Share used to create this document')
request_activity_id = fields.Many2one('mail.activity')
# Folder
folder_id = fields.Many2one('dms.directory',
string="Workspace",
ondelete="restrict",
tracking=True,
required=True,
index=True)
size = fields.Integer(string="Size", readonly=True)
perm_download = fields.Boolean(
compute="_compute_permissions_download",
string="Download Access",
)
permission_create = fields.Boolean(
compute="_compute_permissions",
string="Create Access",
)
permission_write = fields.Boolean(
compute="_compute_permissions",
string="Write Access",
)
permission_unlink = fields.Boolean(
compute="_compute_permissions",
string="Delete Access",
)
def _compute_permissions(self):
for rec in self:
folder = rec.folder_id
folder._compute_permissions()
# Get the user ID
user_id = rec.env.uid
# Determine if the current user has write permission in any group
# has_permission_read = any(
# user_id in group.users.ids and group.perm_read
# for group in folder.group_ids
# )
has_permission_create = any(
user_id in group.users.ids and group.perm_create
for group in folder.complete_group_ids
)
has_permission_write = any(
user_id in group.users.ids and group.perm_write
for group in folder.complete_group_ids
)
has_permission_unlink = any(
user_id in group.users.ids and group.perm_unlink
for group in folder.complete_group_ids
)
# Set permissions based on the folder's permissions
rec.permission_read = True
rec.permission_create = has_permission_create
rec.permission_write = has_permission_write
rec.permission_unlink = has_permission_unlink
# Debugging print statement
def _compute_permissions_download(self):
for rec in self:
security = rec.folder_id.complete_group_ids
users = []
for i in security:
if i.perm_download:
for user in i.users:
users.append(user.id)
user_id = rec.env.uid
if user_id in users:
for sec in self:
sec.perm_download = True
if user_id not in users:
for sec in rec:
sec.perm_download = False
checksum = fields.Char(string="Checksum/SHA1", readonly=True, index=True)
content_binary = fields.Binary(
string="Content Binary", attachment=False, prefetch=False, invisible=True
)
# Override acording to defined in AbstractDmsMixin
storage_id = fields.Many2one(
'dms.storage',
# related="folder_id.storage_id",
readonly=True,
store=True,
prefetch=False,
)
@api.model
def _get_checksum(self, binary):
return hashlib.sha1(binary or b"").hexdigest()
@api.model
def _get_content_inital_vals(self):
return {"content_binary": False, "datas": False}
def _update_content_vals(self, vals, binary):
new_vals = vals.copy()
new_vals.update(
{
"checksum": self._get_checksum(binary),
"size": binary and len(binary) or 0,
}
)
if self.storage_id.save_type in ["file", "attachment"]:
new_vals["datas"] = self.content
else:
new_vals["content_binary"] = self.content and binary
return new_vals
@api.depends("content_binary", "datas", "attachment_id")
def _compute_content(self):
bin_size = self.env.context.get("bin_size", False)
for record in self:
if record.datas:
context = {"human_size": True} if bin_size else {"base64": True}
record.content = record.with_context(context).datas
elif record.content_binary:
record.content = (
record.content_binary
if bin_size
else base64.b64encode(record.content_binary)
)
elif record.attachment_id:
context = {"human_size": True} if bin_size else {"base64": True}
record.datas = record.with_context(context).attachment_id.datas
# related='folder_id.company_id',
company_id = fields.Many2one('res.company', string='Company', readonly=True)
category_id = fields.Many2one('dms.category')
# related='folder_id.group_ids'
group_ids = fields.Many2many('res.groups', string="Access Groups", readonly=True,
help="This attachment will only be available for the selected user groups",
)
content = fields.Binary(
# compute="_compute_content",
# inverse="_inverse_content",
string="Content",
attachment=False,
prefetch=False,
required=True,
store=False,
)
_sql_constraints = [
('attachment_unique', 'unique (attachment_id)', "This attachment is already a document"),
('file_code_uniq', 'unique (file_code)', "Code already exists!")
]
@api.depends('category_id')
def _category(self):
for rec in self:
rec.category_id = rec.folder_id.category_id
@api.depends('attachment_id.name')
def _compute_name(self):
for record in self:
if record.attachment_name:
record.name = record.attachment_name
def _inverse_name(self):
for record in self:
if record.attachment_id:
record.attachment_name = record.name
@api.depends('attachment_id', 'attachment_id.res_model', 'attachment_id.res_id')
def _compute_res_record(self):
for record in self:
attachment = record.attachment_id
if attachment:
record.res_model = attachment.res_model
record.res_id = attachment.res_id
def _inverse_res_id(self):
for record in self:
attachment = record.attachment_id.with_context(no_document=True)
if attachment:
attachment.res_id = record.res_id
def _inverse_res_model(self):
for record in self:
attachment = record.attachment_id.with_context(no_document=True)
if attachment:
attachment.res_model = record.res_model
@api.onchange('url')
def _onchange_url(self):
if self.url:
is_youtube = re.match(r"^(https?\:\/\/)?(www\.)?(youtube\.com|youtu\.?be)\/.+$", self.url)
if not self.name and not is_youtube:
self.name = self.url.rsplit('/')[-1]
@api.depends('checksum')
def _compute_thumbnail(self):
for record in self:
try:
record.thumbnail = image_process(record.datas, size=(80, 80), crop='center')
except UserError:
record.thumbnail = False
@api.depends('attachment_type', 'url')
def _compute_type(self):
for record in self:
record.type = 'empty'
if record.attachment_id:
record.type = 'binary'
elif record.url:
record.type = 'url'
def _get_models(self, domain):
"""
Return the names of the models to which the attachments are attached.
:param domain: the domain of the read_group on documents.
:return: a list of model data, the latter being a dict with the keys
'id' (technical name),
'name' (display name) and
'__count' (how many attachments with that domain).
"""
not_a_file = []
not_attached = []
models = []
groups = self.read_group(domain, ['res_model'], ['res_model'], lazy=True)
for group in groups:
res_model = group['res_model']
if not res_model:
not_a_file.append({
'id': res_model,
'display_name': _('Not a file'),
'__count': group['res_model_count'],
})
elif res_model == 'documents.document':
not_attached.append({
'id': res_model,
'display_name': _('Not attached'),
'__count': group['res_model_count'],
})
else:
models.append({
'id': res_model,
'display_name': self.env['ir.model']._get(res_model).display_name,
'__count': group['res_model_count'],
})
return sorted(models, key=lambda m: m['display_name']) + not_attached + not_a_file
@api.depends('favorited_ids')
@api.depends_context('uid')
def _compute_is_favorited(self):
favorited = self.filtered(lambda d: self.env.user in d.favorited_ids)
favorited.is_favorited = True
(self - favorited).is_favorited = False
@api.depends('res_model')
def _compute_res_model_name(self):
for record in self:
if record.res_model:
model = self.env['ir.model'].name_search(record.res_model, limit=1)
if model:
record.res_model_name = model[0][1]
else:
record.res_model_name = False
else:
record.res_model_name = False
@api.depends('folder_id')
def _compute_available_rules(self):
"""
loads the rules that can be applied to the attachment.
"""
self.available_rule_ids = False
folder_ids = self.mapped('folder_id.id')
rule_domain = [('domain_folder_id', 'parent_of', folder_ids)] if folder_ids else []
# searching rules with sudo as rules are inherited from parent folders and should be available even
# when they come from a restricted folder.
rules = self.env['documents.workflow.rule'].sudo().search(rule_domain)
for rule in rules:
domain = []
if rule.condition_type == 'domain':
domain = literal_eval(rule.domain) if rule.domain else []
else:
if rule.criteria_partner_id:
domain = expression.AND([[['partner_id', '=', rule.criteria_partner_id.id]], domain])
if rule.criteria_owner_id:
domain = expression.AND([[['owner_id', '=', rule.criteria_owner_id.id]], domain])
if rule.create_model:
domain = expression.AND([[['type', '=', 'binary']], domain])
if rule.required_tag_ids:
domain = expression.AND([[['tag_ids', 'in', rule.required_tag_ids.ids]], domain])
if rule.excluded_tag_ids:
domain = expression.AND([[['tag_ids', 'not in', rule.excluded_tag_ids.ids]], domain])
folder_domain = [['folder_id', 'child_of', rule.domain_folder_id.id]]
subset = expression.AND([[['id', 'in', self.ids]], domain, folder_domain])
document_ids = self.env['documents.document'].search(subset)
for document in document_ids:
document.available_rule_ids = [(4, rule.id, False)]
@api.model
def message_new(self, msg_dict, custom_values=None):
"""
creates a new attachment from any email sent to the alias.
The values defined in the share link upload settings are included
in the custom values (via the alias defaults, synchronized on update)
"""
subject = msg_dict.get('subject', '')
if custom_values is None:
custom_values = {}
defaults = {
'name': "Mail: %s" % subject,
'active': False,
}
defaults.update(custom_values)
return super(Document, self).message_new(msg_dict, defaults)
@api.returns('mail.message', lambda value: value.id)
def message_post(self, *, message_type='notification', **kwargs):
if message_type == 'email' and self.create_share_id:
self = self.with_context(no_document=True)
return super(Document, self).message_post(message_type=message_type, **kwargs)
@api.model
def _message_post_after_hook(self, message, msg_vals):
"""
If the res model was an attachment and a mail, adds all the custom values of the share link
settings to the attachments of the mail.
"""
m2m_commands = msg_vals['attachment_ids']
share = self.create_share_id
if share:
attachments = self.env['ir.attachment'].browse([x[1] for x in m2m_commands])
for attachment in attachments:
document = self.env['documents.document'].create({
'name': attachment.name,
'attachment_id': attachment.id,
'folder_id': share.folder_id.id,
'owner_id': share.owner_id.id if share.owner_id else share.create_uid.id,
'partner_id': share.partner_id.id if share.partner_id else False,
'tag_ids': [(6, 0, share.tag_ids.ids if share.tag_ids else [])],
})
attachment.write({
'res_model': 'documents.document',
'res_id': document.id,
})
document.message_post(body=msg_vals.get('body', ''), subject=self.name)
if share.activity_option:
document.documents_set_activity(settings_record=share)
return super(Document, self)._message_post_after_hook(message, msg_vals)
def documents_set_activity(self, settings_record=None):
"""
Generate an activity based on the fields of settings_record.
:param settings_record: the record that contains the activity fields.
settings_record.activity_type_id (required)
settings_record.activity_summary
settings_record.activity_note
settings_record.activity_date_deadline_range
settings_record.activity_date_deadline_range_type
settings_record.activity_user_id
"""
if settings_record and settings_record.activity_type_id:
for record in self:
activity_vals = {
'activity_type_id': settings_record.activity_type_id.id,
'summary': settings_record.activity_summary or '',
'note': settings_record.activity_note or '',
}
if settings_record.activity_date_deadline_range > 0:
activity_vals['date_deadline'] = fields.Date.context_today(settings_record) + relativedelta(
**{
settings_record.activity_date_deadline_range_type: settings_record.activity_date_deadline_range})
if settings_record._fields.get(
'has_owner_activity') and settings_record.has_owner_activity and record.owner_id:
user = record.owner_id
elif settings_record._fields.get('activity_user_id') and settings_record.activity_user_id:
user = settings_record.activity_user_id
elif settings_record._fields.get('user_id') and settings_record.user_id:
user = settings_record.user_id
else:
user = self.env.user
if user:
activity_vals['user_id'] = user.id
record.activity_schedule(**activity_vals)
def toggle_favorited(self):
self.ensure_one()
self.sudo().write({'favorited_ids': [(3 if self.env.user in self[0].favorited_ids else 4, self.env.user.id)]})
def access_content(self):
self.ensure_one()
action = {
'type': "ir.actions.act_url",
'target': "new",
}
if self.url:
action['url'] = self.url
elif self.type == 'binary':
action['url'] = '/documents/content/%s' % self.id
return action
def create_share(self):
self.ensure_one()
vals = {
'type': 'ids',
'document_ids': [(6, 0, self.ids)],
'folder_id': self.folder_id.id,
}
return self.env['documents.share'].create_share(vals)
def open_resource(self):
self.ensure_one()
if self.res_model and self.res_id:
view_id = self.env[self.res_model].get_formview_id(self.res_id)
return {
'res_id': self.res_id,
'res_model': self.res_model,
'type': 'ir.actions.act_window',
'views': [[view_id, 'form']],
}
def toggle_lock(self):
"""
sets a lock user, the lock user is the user who locks a file for themselves, preventing data replacement
and archive (therefore deletion) for any user but himself.
Members of the group documents.group_document_manager and the superuser can unlock the file regardless.
"""
self.ensure_one()
if self.lock_uid:
if self.env.user == self.lock_uid or self.env.is_admin() or self.user_has_groups(
'documents.group_document_manager'):
self.lock_uid = False
else:
self.lock_uid = self.env.uid
def _compute_is_locked(self):
for record in self:
record.is_locked = record.lock_uid and not (
self.env.user == record.lock_uid or
self.env.is_admin() or
self.user_has_groups('documents.group_document_manager'))
@api.model
def create(self, vals):
keys = [key for key in vals if
self._fields[key].related and self._fields[key].related[0] == 'attachment_id']
attachment_dict = {key: vals.pop(key) for key in keys if key in vals}
attachment = self.env['ir.attachment'].browse(vals.get('attachment_id'))
if 'name' in vals:
extension = os.path.splitext(vals['name'])[1]
# self._check_extension(extension)
if attachment and attachment_dict:
attachment.write(attachment_dict)
elif attachment_dict:
attachment_dict.setdefault('name', vals.get('name', 'unnamed'))
attachment = self.env['ir.attachment'].create(attachment_dict)
vals['attachment_id'] = attachment.id
new_record = super(Document, self).create(vals)
# this condition takes precedence during forward-port.
if (attachment and not attachment.res_id and (
not attachment.res_model or attachment.res_model == 'documents.document')):
attachment.with_context(no_document=True).write(
{'res_model': 'documents.document', 'res_id': new_record.id})
self._check_size()
return new_record
def write(self, vals):
attachment_id = vals.get('attachment_id')
if attachment_id:
self.ensure_one()
for record in self:
if record.type == 'empty' and ('datas' in vals or 'url' in vals):
body = _("Document Request: %s Uploaded by: %s") % (record.name, self.env.user.name)
record.message_post(body=body)
if record.attachment_id:
# versioning
if attachment_id:
if attachment_id in record.previous_attachment_ids.ids:
record.previous_attachment_ids = [(3, attachment_id, False)]
record.previous_attachment_ids = [(4, record.attachment_id.id, False)]
if 'datas' in vals:
old_attachment = record.attachment_id.copy()
record.previous_attachment_ids = [(4, old_attachment.id, False)]
elif vals.get('datas') and not vals.get('attachment_id'):
res_model = vals.get('res_model', record.res_model or 'documents.document')
res_id = vals.get('res_id') if vals.get(
'res_model') else record.res_id if record.res_model else record.id
if res_model and res_model != 'documents.document' and not self.env[res_model].browse(res_id).exists():
record.res_model = res_model = 'documents.document'
record.res_id = res_id = record.id
attachment = self.env['ir.attachment'].with_context(no_document=True).create({
'name': vals.get('name', record.name),
'res_model': res_model,
'res_id': res_id
})
record.attachment_id = attachment.id
record._process_activities(attachment.id)
# pops the datas and/or the mimetype key(s) to explicitly write them in batch on the ir.attachment
# so the mimetype is properly set. The reason was because the related keys are not written in batch
# and because mimetype is readonly on `ir.attachment` (which prevents writing through the related).
attachment_dict = {key: vals.pop(key) for key in ['datas', 'mimetype'] if key in vals}
write_result = super(Document, self).write(vals)
if attachment_dict:
self.mapped('attachment_id').write(attachment_dict)
return write_result
def unlink(self):
if not self.env.user.has_group("dms.group_dms_delete_file") and not self.env.user.id == 1:
raise ValidationError(
_("You are not allowed to delete a file!")
)
return super(Document, self).unlink()
def _process_activities(self, attachment_id):
self.ensure_one()
if attachment_id and self.request_activity_id:
feedback = _("Document Request: %s Uploaded by: %s") % (self.name, self.env.user.name)
self.request_activity_id.action_feedback(feedback=feedback, attachment_ids=[attachment_id])
@api.model
def _pdf_split(self, new_files=None, open_files=None, vals=None):
vals = vals or {}
new_attachments = self.env['ir.attachment']._pdf_split(new_files=new_files, open_files=open_files)
return self.create([
dict(vals, attachment_id=attachment.id) for attachment in new_attachments
])
@api.model
def _search_panel_domain(self, field, operator, folder_id, comodel_domain=False):
if not comodel_domain:
comodel_domain = []
files_ids = self.search([("folder_id", operator, folder_id)]).ids
return expression.AND([comodel_domain, [(field, "in", files_ids)]])
@api.model
def _search_panel_directory(self, **kwargs):
search_domain = (kwargs.get("search_domain", []),)
category_domain = kwargs.get("category_domain", [])
if category_domain and len(category_domain):
return "=", category_domain[0][2]
if search_domain and len(search_domain):
for domain in search_domain[0]:
if domain[0] == "folder_id":
return domain[1], domain[2]
return None, None
@api.model
def search_panel_select_range(self, field_name, **kwargs):
operator, directory_id = self._search_panel_directory(**kwargs)
if field_name == 'folder_id':
enable_counters = kwargs.get('enable_counters', False)
fields = ['display_name', 'name', 'description', 'parent_id']
available_folders = self.env['dms.directory'].search([])
folder_domain = expression.OR(
[[('parent_id', 'parent_of', available_folders.ids)], [('id', 'in', available_folders.ids)]])
# also fetches the ancestors of the available folders to display the complete folder tree for all available folders.
DocumentFolder = self.env['dms.directory'].sudo().with_context(hierarchical_naming=False)
records = DocumentFolder.search_read(folder_domain, fields)
domain_image = {}
if enable_counters:
model_domain = expression.AND([
kwargs.get('search_domain', []),
kwargs.get('category_domain', []),
kwargs.get('filter_domain', []),
[(field_name, '!=', False)]
])
domain_image = self._search_panel_domain_image(field_name, model_domain, enable_counters)
comodel_domain = kwargs.pop("comodel_domain", [])
domain_image = self._search_panel_domain(
"file_ids", operator, directory_id, comodel_domain
)
values_range = OrderedDict()
for record in records:
record['display_name'] = record['name']
record_id = record['id']
if enable_counters:
image_element = domain_image.get(record_id)
record['__count'] = image_element['__count'] if image_element else 0
value = record['parent_id']
record['parent_id'] = value and value[0]
values_range[record_id] = record
if enable_counters:
self._search_panel_global_counters(values_range, 'parent_id')
return {
'parent_field': 'parent_id',
'values': list(values_range.values()),
}
return super(Document, self).search_panel_select_range(field_name)
def _get_processed_tags(self, domain, folder_id):
"""
sets a group color to the tags based on the order of the facets (group_id)
recomputed each time the search_panel fetches the tags as the colors depend on the order and
amount of tag categories. If the amount of categories exceeds the amount of colors, the color
loops back to the first one.
"""
tags = self.env['documents.tag']._get_tags(domain, folder_id)
facets = list(OrderedDict.fromkeys([tag['group_id'] for tag in tags]))
facet_colors = self.env['documents.facet'].FACET_ORDER_COLORS
for tag in tags:
color_index = facets.index(tag['group_id']) % len(facet_colors)
tag['group_hex_color'] = facet_colors[color_index]
return tags
@api.model
def search_panel_select_multi_range(self, field_name, **kwargs):
search_domain = kwargs.get('search_domain', [])
category_domain = kwargs.get('category_domain', [])
filter_domain = kwargs.get('filter_domain', [])
if field_name == 'tag_ids':
folder_id = category_domain[0][2] if len(category_domain) else False
if folder_id:
domain = expression.AND([
search_domain, category_domain, filter_domain,
[(field_name, '!=', False)],
])
return {'values': self._get_processed_tags(domain, folder_id)}
else:
return {'values': []}
elif field_name == 'res_model':
domain = expression.AND([search_domain, category_domain])
model_values = self._get_models(domain)
if filter_domain:
# fetch new counters
domain = expression.AND([search_domain, category_domain, filter_domain])
model_count = {
model['id']: model['__count']
for model in self._get_models(domain)
}
# update result with new counters
for model in model_values:
model['__count'] = model_count.get(model['id'], 0)
return {'values': model_values}
return super(Document, self).search_panel_select_multi_range(field_name, **kwargs)
class IrAttachment(models.Model):
_inherit = ['ir.attachment']
def _get_dms_directories_0(self, folder_id):
domain = [
("id", "=", folder_id.id),
]
if self.env.context.get("attaching_to_record"):
domain += [("storage_id.include_message_attachments", "=", True)]
return self.env["dms.directory"].search(domain)
def _get_dms_directories(self, res_model, res_id, folder_id):
domain = [
("parent_id", "=", folder_id.id),
("is_root_directory", "=", False),
("res_model", "!=", folder_id.model_id.model),
("res_id", "=", res_id),
# ("storage_id.save_type", "=", "attachment"),
]
if self.env.context.get("attaching_to_record"):
domain += [("storage_id.include_message_attachments", "=", True)]
directory = self.env["dms.directory"].search(domain)
return directory
def _dms_directories_create(self):
items = self.sudo()._get_dms_directories(self.res_model, False)
for item in items:
model_item = self.env[self.res_model].browse(self.res_id)
ir_model_item = self.env["ir.model"].search(
[("model", "=", self.res_model)]
)
test = self.env["dms.directory"].sudo().with_context(check_name=False).create(
{
"name": model_item.display_name,
"model_id": ir_model_item.id,
"res_model": self.res_model,
"res_id": self.res_id,
"parent_id": item.id,
"storage_id": item.storage_id.id,
}
)
return test
def _dms_directories_create_2(self, folder_id):
# items = self.sudo()._get_dms_directories_0(self.res_model, False,)
# for item in items:
model_item = self.env[self.res_model].browse(self.res_id)
ir_model_item = self.env["ir.model"].search(
[("model", "=", self.res_model)]
)
test = self.env["dms.directory"].sudo().with_context(check_name=False).create(
{
"name": model_item.display_name,
"model_id": ir_model_item.id,
"res_model": self.res_model,
"res_id": self.res_id,
"parent_id": folder_id.id,
"storage_id": folder_id.storage_id.id,
}
)
return test
def _dms_operations(self):
for attachment in self:
pass
# if attachment:
#
# # TODO#
# # 1 main folder
# # if attachment.folder_id.model_id.model == attachment.res_model:
# # pass
# # # 2 sub folder
# # if attachment.folder_id.model_id.model == attachment.res_model:
# # pass
# if not attachment.res_model or not attachment.res_id:
# continue
#
# if attachment.folder_id.model_id.model != attachment.res_model:
# if attachment.res_model != 'documents.document':
# directories = attachment._get_dms_directories(
# attachment.res_model, attachment.res_id, attachment.folder_id
# )
# if not directories:
# attachment._dms_directories_create_2(attachment.folder_id)
# # Get dms_directories again (with items previously created)
# directories = attachment._get_dms_directories(
# attachment.res_model, attachment.res_id, attachment.folder_id
# )
# # Auto-create_files (if not exists)
# for directory in directories:
# dms_file_model = self.env["documents.document"].sudo()
# dms_file = dms_file_model.search(
# [
# ("attachment_id", "=", attachment.id),
# ("folder_id", "=", directory.id),
# ]
# )
# if not dms_file:
# dms_file_model.create(
# {
# "name": attachment.name,
# "folder_id": directory.id,
# "attachment_id": attachment.id,
# "res_model": attachment.res_model,
# "res_id": attachment.res_id,
# }
# )
# if attachment.folder_id.model_id.model == attachment.res_model:
# directories = self.env["dms.directory"].search(
# [('id', '=', attachment.folder_id.id)])
# # Auto-create_files (if not exists)
# for directory in directories:
# dms_file_model = self.env["documents.document"].sudo()
# dms_file = dms_file_model.search(
# [
# ("attachment_id", "=", attachment.id),
# ("folder_id", "=", directory.id),
# ]
# )
# if not dms_file:
# dms_file_model.create(
# {
# "name": attachment.name,
# "folder_id": directory.id,
# "attachment_id": attachment.id,
# "res_model": attachment.res_model,
# "res_id": attachment.res_id,
# }
# )
###################################################################################################################
def _get_dms_directories_base(self, res_model, res_id):
domain = [
("res_model", "=", res_model),
("res_id", "=", res_id),
("storage_id.save_type", "=", "attachment"),
]
if self.env.context.get("attaching_to_record"):
domain += [("storage_id.include_message_attachments", "=", True)]
return self.env["dms.directory"].search(domain)
def _dms_directories_create_base(self):
items = self.sudo()._get_dms_directories_base(self.res_model, False)
for item in items:
model_item = self.env[self.res_model].browse(self.res_id)
ir_model_item = self.env["ir.model"].search(
[("model", "=", self.res_model)]
)
test = self.env["dms.directory"].sudo().with_context(check_name=False).create(
{
"name": model_item.display_name,
"model_id": ir_model_item.id,
"res_model": self.res_model,
"res_id": self.res_id,
"parent_id": item.id,
"storage_id": item.storage_id.id,
}
)
return test
def _dms_operations_base(self):
for attachment in self:
if not attachment.res_model or not attachment.res_id:
continue
directories = attachment._get_dms_directories_base(
attachment.res_model, attachment.res_id
)
if not directories:
attachment._dms_directories_create_base()
# Get dms_directories again (with items previously created)
directories = attachment._dms_directories_create_base(
attachment.res_model, attachment.res_id
)
# Auto-create_files (if not exists)
for directory in directories:
dms_file_model = self.env["documents.document"].sudo()
dms_file = dms_file_model.search(
[
("attachment_id", "=", attachment.id),
("folder_id", "=", directory.id),
]
)
if not dms_file:
dms_file_model.create(
{
"name": attachment.name,
"folder_id": directory.id,
"attachment_id": attachment.id,
"res_model": attachment.res_model,
"res_id": attachment.res_id,
}
)
@api.model_create_multi
def create(self, vals_list):
records = super().create(vals_list)
try:
model = self.env[vals_list[0]['res_model']]
if model._fields.get('folder_id'):
records._dms_operations()
else:
records._dms_operations_base()
except:
pass
return records
def write(self, vals):
res = super().write(vals)
if not self.env.context.get("dms_file") and self.env.context.get(
"attaching_to_record"
):
self._dms_operations()
return res