odex25_standard/odex25_base/simplify_access_management/models/models.py

403 lines
21 KiB
Python

from odoo import api, fields, models, tools, _
from odoo.exceptions import UserError,AccessError
from bs4 import BeautifulSoup
from lxml import etree
from odoo.osv import expression
from odoo.tools.safe_eval import safe_eval
import ast
from odoo.http import request
from odoo.addons.advanced_web_domain_widget.models.domain_prepare import prepare_domain_v2
class BaseModel(models.AbstractModel):
_inherit = 'base'
def _get_hidden_fields_for_model(self, model_name=None):
"""
Get list of fields to remove from search/group-by filters
based on 'remove_from_custom_filter' flag
"""
if not model_name:
model_name = self._name
hidden_field_names = []
try:
field_exists = self.env['ir.model.fields'].sudo().search_count([
('model', '=', 'hide.field'),
('name', '=', 'remove_from_custom_filter'),
]) > 0
if not field_exists:
return []
# Get hide field records for current model and user
hide_field_recs = self.env['hide.field'].sudo().search([
('access_management_id.company_ids', 'in', self.env.company.id),
('access_management_id.user_ids', 'in', self.env.user.id),
('access_management_id.active', '=', True),
('model_id.model', '=', model_name),
('remove_from_custom_filter', '=', True),
])
for hide_field_rec in hide_field_recs:
for field in hide_field_rec.field_id:
hidden_field_names.append(field.name)
except Exception:
pass
return hidden_field_names
@api.model
def fields_get(self, allfields=None, attributes=None):
"""
Override fields_get to exclude hidden fields from search/filter dropdowns
"""
res = super().fields_get(allfields, attributes)
hidden_fields = self._get_hidden_fields_for_model()
for field_name in hidden_fields:
if field_name in res:
res[field_name]['searchable'] = False
res[field_name]['sortable'] = False
return res
@api.model
def load_views(self, views, options=None):
actions_and_prints = []
for access in self.env['remove.action'].sudo().search([('access_management_id.company_ids', 'in', self.env.company.id),
('access_management_id', 'in',
self.env.user.access_management_ids.ids),
('model_id.model', '=', self._name)]):
actions_and_prints = actions_and_prints + access.mapped('report_action_ids.action_id').ids
actions_and_prints = actions_and_prints + access.mapped('server_action_ids.action_id').ids
for view_data in access.view_data_ids:
for view_data_list in views:
if view_data.techname == view_data_list[1]:
views.pop(views.index(view_data_list))
res = super(BaseModel, self).load_views(views, options=options)
if 'fields_views' in res.keys():
for view in ['list', 'form']:
if view in res['fields_views'].keys():
if 'toolbar' in res['fields_views'][view].keys():
if 'print' in res['fields_views'][view]['toolbar'].keys():
prints = res['fields_views'][view]['toolbar']['print'][:]
for pri in prints:
if pri['id'] in actions_and_prints:
res['fields_views'][view]['toolbar']['print'].remove(pri)
if 'print' in res['fields_views'][view]['toolbar'].keys():
action = res['fields_views'][view]['toolbar']['action'][:]
for act in action:
if act['id'] in actions_and_prints:
res['fields_views'][view]['toolbar']['action'].remove(act)
return res
@api.model
def fields_view_get(self, view_id=None, view_type='form', toolbar=False, submenu=False):
res = super().fields_view_get(view_id, view_type, toolbar, submenu)
access_management_obj = self.env['access.management']
cids = request.httprequest.cookies.get('cids') and request.httprequest.cookies.get('cids').split(',')[0] or request.env.company.id
readonly_access_id = access_management_obj.sudo().search([('company_ids','in',int(cids)),('active','=',True),('user_ids','in',self.env.user.id),('readonly','=',True)])
access_recs = self.env['access.domain.ah'].sudo().search([('access_management_id.company_ids','in',self.env.company.id),('access_management_id.user_ids','in',self.env.user.id),('access_management_id.active','=',True),('model_id.model','=',res['model'])])
access_model_recs = self.env['remove.action'].sudo().search([('access_management_id.company_ids','in',self.env.company.id),('access_management_id.user_ids','in',self.env.user.id),('access_management_id.active','=',True),('model_id.model','=',res['model'])])
if view_type == 'form':
access_management_id = access_management_obj.sudo().search([('company_ids', 'in', self.env.company.id),
('active', '=', True),
('user_ids', 'in', self.env.user.id),
('hide_chatter', '=', True)],
limit=1).id
if access_management_id:
doc = etree.XML(res['arch'])
for div in doc.xpath("//div[@class='oe_chatter']"):
div.getparent().remove(div)
res['arch'] = etree.tostring(doc, encoding='unicode')
else:
if self.env['hide.chatter'].sudo().search([('access_management_id.company_ids', 'in', self.env.company.id),
('access_management_id.active', '=', True),
('access_management_id.user_ids', 'in', self.env.user.id),
('model_id.model', '=', self._name),
('hide_chatter', '=', True)],
limit=1):
doc = etree.XML(res['arch'])
for div in doc.xpath("//div[@class='oe_chatter']"):
div.getparent().remove(div)
res['arch'] = etree.tostring(doc, encoding='unicode')
restrict_import = access_management_obj.sudo().search([('company_ids', 'in', self.env.company.id),
('active', '=', True),
('user_ids', 'in', self.env.user.id),
('hide_import', '=', True)], limit=1).id
if access_model_recs.filtered(lambda x: x.restrict_import) or restrict_import:
if view_type in ['kanban', 'tree']:
doc = etree.XML(res['arch'])
doc.attrib.update({'import': 'false'})
res['arch'] = etree.tostring(doc, encoding='unicode')
if readonly_access_id:
if view_type == 'form':
doc = etree.XML(res['arch'])
doc.attrib.update({'create':'false', 'delete':'false','edit':'false'})
res['arch'] = etree.tostring(doc, encoding='unicode')
if view_type == 'tree':
doc = etree.XML(res['arch'])
doc.attrib.update({'create':'false', 'delete':'false','edit':'false'})
res['arch'] = etree.tostring(doc, encoding='unicode')
if view_type == 'kanban':
doc = etree.XML(res['arch'])
doc.attrib.update({'create':'false', 'delete':'false','edit':'false'})
res['arch'] = etree.tostring(doc, encoding='unicode').replace('"','"')
else:
if access_model_recs:
delete = 'true'
edit = 'true'
create = 'true'
for access_model in access_model_recs:
if access_model.restrict_create:
create = 'false'
if access_model.restrict_edit:
edit = 'false'
if access_model.restrict_delete:
delete = 'false'
if view_type == 'form':
doc = etree.XML(res['arch'])
doc.attrib.update({'create':create, 'delete':delete,'edit':edit})
res['arch'] = etree.tostring(doc, encoding='unicode')
if view_type == 'tree':
doc = etree.XML(res['arch'])
doc.attrib.update({'create':create, 'delete':delete,'edit':edit})
res['arch'] = etree.tostring(doc, encoding='unicode')
if view_type == 'kanban':
doc = etree.XML(res['arch'])
doc.attrib.update({'create':create, 'delete':delete,'edit':edit})
res['arch'] = etree.tostring(doc, encoding='unicode')
if access_recs:
delete = 'false'
edit = 'false'
create = 'false'
for access_rec in access_recs:
if access_rec.create_right:
create = 'true'
if access_rec.write_right:
edit = 'true'
if access_rec.delete_right:
delete = 'true'
if view_type == 'form':
doc = etree.XML(res['arch'])
doc.attrib.update({'create':create, 'delete':delete,'edit':edit})
res['arch'] = etree.tostring(doc, encoding='unicode')
if view_type == 'tree':
doc = etree.XML(res['arch'])
doc.attrib.update({'create':create, 'delete':delete,'edit':edit})
res['arch'] = etree.tostring(doc, encoding='unicode')
if view_type == 'kanban':
doc = etree.XML(res['arch'])
doc.attrib.update({'create':create, 'delete':delete,'edit':edit})
res['arch'] = etree.tostring(doc, encoding='unicode').replace('"','"')
return res
def _get_access_management_domain_record(self, model=False):
records = None
try:
if model:
self._cr.execute("SELECT id FROM ir_model WHERE model='" + model +"'")
model_numeric_id = self._cr.fetchone()[0]
if model_numeric_id and isinstance(model_numeric_id,int) and self.env.user:
self._cr.execute("""
SELECT dm.id
FROM access_domain_ah as dm
WHERE dm.model_id=%s AND dm.access_management_id
IN (SELECT am.id
FROM access_management as am
WHERE am.active='t' AND am.id
IN (SELECT amusr.access_management_id
FROM access_management_users_rel_ah as amusr
WHERE amusr.user_id=%s))
""",[model_numeric_id, self.env.user.id])
records = self.env['access.domain.ah'].sudo().browse(row[0] for row in self._cr.fetchall())
except:
pass
return records
def _check_access_management_right(self,mode=False,records=False):
access_flag = False
access_rule = None
length = len(records.sudo()) if records.sudo() else 0
partner_ids = self.env['res.users'].sudo().search([]).mapped("partner_id.id")
partner_domain = ['|', ('id', 'in', partner_ids)]
for record in records.sudo():
if mode == 'create' and record.create_right:
access_flag = True
break
elif mode in ['write','unlink']:
access = False
if mode == 'unlink':
access = record.delete_right
elif mode == 'write':
access = record.write_right
domain_list = []
if self.sudo()._name == "res.partner":
domain_list += partner_domain
# eval_context = rec._eval_context()
dom = safe_eval(record.domain) if record.domain else []
if dom:
dom = expression.normalize_domain(dom)
model_name = self._name
if isinstance(dom,list):
for dom_tuple in dom:
if isinstance(dom_tuple, tuple):
left_value = dom_tuple[0]
operator_value = dom_tuple[1]
right_value = dom_tuple[2]
left_value_split_list = left_value.split('.')
model_string = model_name
left_user = False
left_company = False
for field in left_value_split_list:
left_user = False
left_company = False
model_obj = self.env[model_string]
field_type = model_obj.fields_get()[field]['type']
if field_type in ['many2one', 'many2many', 'one2many']:
field_relation = model_obj.fields_get()[field]['relation']
model_string = field_relation
if model_string == 'res.users':
left_user = True
if model_string == 'res.company':
left_company = True
if left_user:
if operator_value in ['in', 'not in']:
if isinstance(right_value, list) and 0 in right_value:
zero_index = right_value.index(0)
right_value[zero_index] = self.env.user.id
if left_company:
if operator_value in ['in', 'not in']:
if isinstance(right_value, list) and 0 in right_value:
zero_index = right_value.index(0)
right_value[zero_index] = self.env.company.id
already_add = False
if operator_value == 'date_filter':
domain_list += prepare_domain_v2(dom_tuple)
else:
domain_list.append(dom_tuple)
else:
domain_list.append(dom_tuple)
# if length > 1:
# domain_list.insert(0, '|')
# length -= 1
# field_type = self.fields_get()[field]['type']
# operator_value = tuple[1]
# right_value = tuple[2]
# if operator_value in ['in','not in']:
# if isinstance(right_value,list) and 0 in right_value:
# zero_index = right_value.index(0)
# right_value[zero_index] = self.env.user.id
# if operator_value == 'date_filter':
# domain_list.append(prepare_domain_v2([tuple]))
search_domain = domain_list
# search_domain = expression.AND(prepare_domain_v2(domain_list))
if 'active' in self._fields:
search_domain = ['|',('active','=',False),('active','=',True)] + search_domain
record_ids = self.search(search_domain)
if self in record_ids and access:
access_flag = access
break
access_rule = record.access_management_id.name
return {'access_flag' : access_flag, 'access_rule' : access_rule}
def _display_access_management_error(self,mode=None,rule=None):
if mode and rule:
msg_heads = {
'unlink': _("Due to access management rule,\nYou are not allowed to delete record '%(record)s' from (%(document_model)s) model.", record=self.name, document_model=self._name),
'write': _("Due to access management rule,\nYou are not allowed to edit record '%(record)s' from (%(document_model)s) model.", record=self.name, document_model=self._name),
'create': _("Due to access management rule,\nYou are not allowed to create records from (%(document_model)s) model.", document_model=self._name),
}
operation_error = msg_heads[mode]
resolution_info = _("Check Applied Rule on Access Management:\n %(access_name)s",access_name=rule)
msg = """{operation_error}
{resolution_info}""".format(operation_error=operation_error,resolution_info=resolution_info)
raise AccessError(msg)
def unlink(self):
value = self.env['ir.config_parameter'].sudo().search([('key','=','uninstall_simplify_access_management')],limit=1).value
if not value:
for rec in self:
if rec._name:
access_domain_ah_ids = rec._get_access_management_domain_record(model=rec._name)
if access_domain_ah_ids:
access_domain_ah_ids = access_domain_ah_ids.filtered(lambda line: self.env.company in line.access_management_id.company_ids)
if access_domain_ah_ids:
flag = rec._check_access_management_right(mode='unlink',records=access_domain_ah_ids)
unlink_flag = flag['access_flag']
access_rule = flag['access_rule']
if not unlink_flag:
rec._display_access_management_error(mode='unlink',rule=access_rule)
return super().unlink()
def write(self, vals):
value = self.env['ir.config_parameter'].sudo().search([('key','=','uninstall_simplify_access_management')],limit=1).value
if not value:
for rec in self:
if rec._name:
access_domain_ah_ids = rec._get_access_management_domain_record(model=rec._name)
if access_domain_ah_ids:
access_domain_ah_ids = access_domain_ah_ids.filtered(lambda line: self.env.company in line.access_management_id.company_ids)
if access_domain_ah_ids:
flag = rec._check_access_management_right(mode='write',records=access_domain_ah_ids)
write_flag = flag['access_flag']
access_rule = flag['access_rule']
if not write_flag:
rec._display_access_management_error(mode='write',rule=access_rule)
return super().write(vals)
@api.model_create_multi
@api.returns('self', lambda value: value.id)
def create(self, vals_list):
value = self.env['ir.config_parameter'].sudo().search([('key','=','uninstall_simplify_access_management')],limit=1).value
if not value:
if self._name:
access_domain_ah_ids = self._get_access_management_domain_record(model=self._name)
if access_domain_ah_ids:
access_domain_ah_ids = access_domain_ah_ids.filtered(lambda line: self.env.company in line.access_management_id.company_ids)
if access_domain_ah_ids:
flag = self._check_access_management_right(mode='create',records=access_domain_ah_ids)
create_flag = flag['access_flag']
access_rule = flag['access_rule']
if not create_flag:
self._display_access_management_error(mode='create',rule=access_rule)
return super().create(vals_list)