{info}{related_record_name}
').format( + info=info, url=url, related_record_name=related_record_name), + message_type='comment', + author_id=self.env.ref('base.partner_root').id, + subtype_xmlid='mail.mt_note', + ) + if hasattr(related_record, 'message_post'): + # Add notification in document about the new message and related channel + info = _("A new WhatsApp channel is created for this document") + base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url') + url = Markup('{base_url}/web#model=discuss.channel&id={channel_id}').format( + base_url=base_url, channel_id=channel.id) + related_record.message_post( + author_id=self.env.ref('base.partner_root').id, + body=Markup('{info}{channel_name}
').format( + info=info, url=url, channel_id=channel.id, channel_name=channel.display_name), + message_type='comment', + subtype_xmlid='mail.mt_note', + ) + if partners_to_notify == channel.whatsapp_partner_id and wa_account_id.notify_user_ids.partner_id: + partners_to_notify += wa_account_id.notify_user_ids.partner_id + channel.channel_member_ids = [(5,0,0)] + [(0,0, {'partner_id': partner.id}) for partner in partners_to_notify] + channel._broadcast(partners_to_notify.ids) + return channel + + def whatsapp_channel_join_and_pin(self): + """ Adds the current partner as a member of self channel and pins them if not already pinned. """ + self.ensure_one() + if self.channel_type != 'whatsapp': + raise ValidationError(_('This join method is not possible for regular channels.')) + + self.check_access_rights('write') + self.check_access_rule('write') + current_partner = self.env.user.partner_id + member = self.channel_member_ids.filtered(lambda m: m.partner_id == current_partner) + if member: + if not member.is_pinned: + member.write({'is_pinned': True}) + else: + new_member = self.env['discuss.channel.member'].with_context(tools.clean_context(self.env.context)).sudo().create([{ + 'partner_id': current_partner.id, + 'channel_id': self.id, + }]) + message_body = Markup(f'{info} {record_name}
').format( + info=info, + url=url, + record_name=record_name, + ), + ) + + @api.model + def _prepare_attachment_vals(self, attachment, wa_account_id): + """ Upload the attachment to WhatsApp and return prepared values to attach to the message. """ + whatsapp_media_type = next(( + media_type + for media_type, mimetypes + in self._SUPPORTED_ATTACHMENT_TYPE.items() + if attachment.mimetype in mimetypes), + False + ) + + if not whatsapp_media_type: + raise WhatsAppError(_("Attachment mimetype is not supported by WhatsApp: %s.", attachment.mimetype)) + wa_api = WhatsAppApi(wa_account_id) + whatsapp_media_uid = wa_api._upload_whatsapp_document(attachment) + + vals = { + 'type': whatsapp_media_type, + whatsapp_media_type: {'id': whatsapp_media_uid} + } + + if whatsapp_media_type == 'document': + vals[whatsapp_media_type]['filename'] = attachment.name + + return vals + + # ------------------------------------------------------------ + # CALLBACK + # ------------------------------------------------------------ + + def _process_statuses(self, value): + """ Process status of the message like 'send', 'delivered' and 'read'.""" + mapping = {'failed': 'error', 'cancelled': 'cancel'} + for statuses in value.get('statuses', []): + whatsapp_message_id = self.env['whatsapp.message'].sudo().search([('msg_uid', '=', statuses['id'])]) + if whatsapp_message_id: + whatsapp_message_id.state = mapping.get(statuses['status'], statuses['status']) + whatsapp_message_id._update_message_fetched_seen() + if statuses['status'] == 'failed': + error = statuses['errors'][0] if statuses.get('errors') else None + if error: + whatsapp_message_id._handle_error(whatsapp_error_code=error['code'], + error_message=f"{error['code']} : {error['title']}") + + def _update_message_fetched_seen(self): + """ Update message status for the whatsapp recipient. """ + self.ensure_one() + if self.mail_message_id.model != 'discuss.channel': + return + channel = self.env['discuss.channel'].browse(self.mail_message_id.res_id) + channel_member = channel.channel_member_ids.filtered(lambda cm: cm.partner_id == channel.whatsapp_partner_id)[0] + notification_type = None + if self.state == 'read': + channel_member.write({ + 'fetched_message_id': max(channel_member.fetched_message_id.id, self.mail_message_id.id), + 'seen_message_id': self.mail_message_id.id, + 'last_seen_dt': fields.Datetime.now(), + }) + notification_type = 'discuss.channel.member/seen' + elif self.state == 'delivered': + channel_member.write({'fetched_message_id': self.mail_message_id.id}) + notification_type = 'discuss.channel.member/fetched' + if notification_type: + self.env['bus.bus']._sendone(channel, notification_type, { + 'channel_id': channel.id, + 'id': channel_member.id, + 'last_message_id': self.mail_message_id.id, + 'partner_id': channel.whatsapp_partner_id.id, + }) diff --git a/odex25_base/whatsapp/models/whatsapp_template.py b/odex25_base/whatsapp/models/whatsapp_template.py new file mode 100644 index 000000000..5b7502859 --- /dev/null +++ b/odex25_base/whatsapp/models/whatsapp_template.py @@ -0,0 +1,833 @@ +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +import json +import re + +from markupsafe import Markup + +from odoo import api, models, fields, _ +from odoo.addons.http_routing.models.ir_http import slugify +from ..tools.lang_list import Languages +from ..tools.whatsapp_api import WhatsAppApi +from ..tools.whatsapp_exception import WhatsAppError +from odoo.exceptions import UserError, ValidationError, AccessError +from odoo.tools import plaintext2html +from odoo.tools.safe_eval import safe_eval + +LATITUDE_LONGITUDE_REGEX = r'^[-+]?([1-8]?\d(\.\d+)?|90(\.0+)?),\s*[-+]?(180(\.0+)?|((1[0-7]\d)|([1-9]?\d))(\.\d+)?)$' + +COMMON_WHATSAPP_PHONE_SAFE_FIELDS = { + 'mobile', + 'phone', + 'phone_sanitized', + 'partner_id.mobile', + 'partner_id.phone', + 'phone_sanitized.phone', + 'x_studio_mobile', + 'x_studio_phone', + 'x_studio_partner_id.mobile', + 'x_studio_partner_id.phone', + 'x_studio_partner_id.phone_sanitized', +} + +class WhatsAppTemplate(models.Model): + _name = 'whatsapp.template' + _inherit = ['mail.thread'] + _description = 'WhatsApp Template' + _order = 'sequence asc, id' + + @api.model + def _get_default_wa_account_id(self): + first_account = self.env['whatsapp.account'].search([ + ('allowed_company_ids', 'in', self.env.companies.ids)], limit=1) + return first_account.id if first_account else False + + @api.model + def _get_model_selection(self): + """ Available models are all models, as even transient models could have + templates associated (e.g. payment.link.wizard) """ + return [ + (model.model, model.name) + for model in self.env['ir.model'].sudo().search([]) + ] + + name = fields.Char(string="Name", tracking=True) + template_name = fields.Char(string="Template Name", compute='_compute_template_name', readonly=False, store=True) + sequence = fields.Integer(required=True, default=0) + active = fields.Boolean(default=True) + + wa_account_id = fields.Many2one( + comodel_name='whatsapp.account', string="Account", default=_get_default_wa_account_id, + ondelete="cascade") + wa_template_uid = fields.Char(string="WhatsApp Template ID", copy=False) + error_msg = fields.Char(string="Error Message") + + model_id = fields.Many2one( + string='Applies to', comodel_name='ir.model', + default=lambda self: self.env['ir.model']._get_id('res.partner'), + ondelete='cascade', required=True, store=True, + tracking=1) + model = fields.Char( + string='Related Document Model', + related='model_id.model', + precompute=True, store=True, readonly=True) + phone_field = fields.Char( + string='Phone Field', compute='_compute_phone_field', + precompute=True, readonly=False, required=True, store=True) + lang_code = fields.Selection(string="Language", selection=Languages, default='en', required=True) + template_type = fields.Selection([ + ('authentication', 'Authentication'), + ('marketing', 'Marketing'), + ('utility', 'Utility')], string="Category", default='marketing', tracking=True, + help="Authentication - One-time passwords that your customers use to authenticate a transaction or login.\n" + "Marketing - Promotions or information about your business, products or services. Or any message that isn't utility or authentication.\n" + "Utility - Messages about a specific transaction, account, order or customer request.") + + status = fields.Selection([ + ('draft', 'Draft'), + ('pending', 'Pending'), + ('in_appeal', 'In Appeal'), + ('approved', 'Approved'), + ('paused', 'Paused'), + ('disabled', 'Disabled'), + ('rejected', 'Rejected'), + ('pending_deletion', 'Pending Deletion'), + ('deleted', 'Deleted'), + ('limit_exceeded', 'Limit Exceeded')], string="Status", default='draft', copy=False, tracking=True) + quality = fields.Selection([ + ('none', 'None'), + ('red', 'Red'), + ('yellow', 'Yellow'), + ('green', 'Green')], string="Quality", default='none', copy=False, tracking=True) + allowed_user_ids = fields.Many2many( + comodel_name='res.users', string="Users", + domain=[('share', '=', False)]) + + body = fields.Text(string="Template body", tracking=True) + header_type = fields.Selection([ + ('none', 'None'), + ('text', 'Text'), + ('image', 'Image'), + ('video', 'Video'), + ('document', 'Document'), + ('location', 'Location')], string="Header Type", default='none') + header_text = fields.Char(string="Template Header Text", size=60) + header_attachment_ids = fields.Many2many('ir.attachment', string="Template Static Header", copy=False) + footer_text = fields.Char(string="Footer Message") + report_id = fields.Many2one(comodel_name='ir.actions.report', string="Report", domain="[('model_id', '=', model_id)]", tracking=True) + variable_ids = fields.One2many('whatsapp.template.variable', 'wa_template_id', copy=True, + string="Template Variables", store=True, compute='_compute_variable_ids', precompute=True, readonly=False) + button_ids = fields.One2many('whatsapp.template.button', 'wa_template_id', string="Buttons") + + messages_count = fields.Integer(string="Messages Count", compute='_compute_messages_count') + has_action = fields.Boolean(string="Has Action", compute='_compute_has_action') + + _sql_constraints = [ + ('unique_name_account_template', 'unique(template_name, lang_code, wa_account_id)', "Duplicate template is not allowed for one Meta account.") + ] + + @api.constrains('header_text') + def _check_header_text(self): + for tmpl in self.filtered(lambda l: l.header_type == 'text'): + header_variables = list(re.findall(r'{{[1-9][0-9]*}}', tmpl.header_text)) + if len(header_variables) > 1 or (header_variables and header_variables[0] != '{{1}}'): + raise ValidationError(_("Header text can only contain a single {{variable}}.")) + + @api.constrains('phone_field', 'model') + def _check_phone_field(self): + is_system = self.user_has_groups('base.group_system') + for tmpl in self.filtered('phone_field'): + model = self.env[tmpl.model] + if not is_system: + if not model.check_access_rights('read', raise_exception=False): + model_description = self.env['ir.model']._get(tmpl.model).display_name + raise AccessError( + _("You can not select field of %(model)s.", model=model_description) + ) + safe_fields = set(COMMON_WHATSAPP_PHONE_SAFE_FIELDS) + if hasattr(model, '_wa_get_safe_phone_fields'): + safe_fields |= set(model._wa_get_safe_phone_fields()) + if tmpl.phone_field not in safe_fields: + raise AccessError( + _("You are not allowed to use %(field)s in phone field, contact your administrator to configure it.", + field=tmpl.phone_field) + ) + try: + model._find_value_from_field_path(tmpl.phone_field) + except UserError as err: + raise ValidationError( + _("'%(field)s' does not seem to be a valid field path on %(model)s", + field=tmpl.phone_field, + model=tmpl.model) + ) from err + + @api.constrains('header_attachment_ids', 'header_type') + def _check_header_attachment_ids(self): + templates_with_attachments = self.filtered('header_attachment_ids') + for tmpl in templates_with_attachments: + if len(tmpl.header_attachment_ids) > 1: + raise ValidationError(_('You may only use one header attachment for each template')) + if tmpl.header_type not in ['image', 'video', 'document']: + raise ValidationError(_("Only templates using media header types may have header documents")) + if not any(tmpl.header_attachment_ids.mimetype in mimetypes for mimetypes in self.env['whatsapp.message']._SUPPORTED_ATTACHMENT_TYPE[tmpl.header_type]): + raise ValidationError(_("File type %(file_type)s not supported for header type %(header_type)s", + file_type=tmpl.header_attachment_ids.mimetype, header_type=tmpl.header_type)) + for tmpl in self - templates_with_attachments: + if tmpl.header_type == 'document' and not tmpl.report_id: + raise ValidationError(_("Header document or report is required")) + if tmpl.header_type in ['image', 'video']: + raise ValidationError(_("Header document is required")) + + @api.constrains('button_ids', 'variable_ids') + def _check_buttons(self): + for tmpl in self: + if len(tmpl.button_ids) > 10: + raise ValidationError(_('Maximum 10 buttons allowed.')) + if len(tmpl.button_ids.filtered(lambda button: button.button_type == 'url')) > 2: + raise ValidationError(_('Maximum 2 URL buttons allowed.')) + if len(tmpl.button_ids.filtered(lambda button: button.button_type == 'phone_number')) > 1: + raise ValidationError(_('Maximum 1 Call Number button allowed.')) + + @api.constrains('variable_ids') + def _check_body_variables(self): + for template in self: + variables = template.variable_ids.filtered(lambda variable: variable.line_type == 'body') + free_text_variables = variables.filtered(lambda variable: variable.field_type == 'free_text') + if len(free_text_variables) > 10: + raise ValidationError(_('Only 10 free text is allowed in body of template')) + + variable_indices = sorted(var._extract_variable_index() for var in variables) + if len(variable_indices) > 0 and (variable_indices[0] != 1 or variable_indices[-1] != len(variables)): + missing = next( + (index for index in range(1, len(variables)) if variable_indices[index - 1] + 1 != variable_indices[index]), + 0) + 1 + raise ValidationError(_('Body variables should start at 1 and not skip any number, missing %d', missing)) + + @api.constrains('header_type', 'variable_ids') + def _check_header_variables(self): + for template in self: + location_vars = template.variable_ids.filtered(lambda var: var.line_type == 'location') + text_vars = template.variable_ids.filtered(lambda var: var.line_type == 'header') + if template.header_type == 'location' and len(location_vars) != 4: + raise ValidationError(_('When using a "location" header, there should 4 location variables not %(count)d.', + count=len(location_vars))) + elif template.header_type != 'location' and location_vars: + raise ValidationError(_('Location variables should only exist when a "location" header is selected.')) + if len(text_vars) > 1: + raise ValidationError(_('There should be at most 1 variable in the header of the template.')) + if text_vars and text_vars._extract_variable_index() != 1: + raise ValidationError(_('Free text variable in the header should be {{1}}')) + + #===================================================== + # Compute Methods + #===================================================== + + @api.depends('model') + def _compute_phone_field(self): + to_reset = self.filtered(lambda template: not template.model) + if to_reset: + to_reset.phone_field = False + for template in self.filtered('model'): + if template.phone_field and template.phone_field in self.env[template.model]._fields: + continue + if 'mobile' in self.env[template.model]._fields: + template.phone_field = 'mobile' + elif 'phone' in self.env[template.model]._fields: + template.phone_field = 'phone' + + @api.depends('name') + def _compute_template_name(self): + for template in self: + if template.status == 'draft' and not template.wa_template_uid: + template.template_name = re.sub(r'\W+', '_', slugify(template.name or '')) + + @api.depends('model') + def _compute_model_id(self): + self.filtered(lambda tpl: not tpl.model).model_id = False + for template in self.filtered('model'): + template.model_id = self.env['ir.model']._get_id(template.model) + + @api.depends('header_type', 'header_text', 'body') + def _compute_variable_ids(self): + """compute template variable according to header text, body and buttons""" + for tmpl in self: + to_delete = [] + to_create = [] + header_variables = list(re.findall(r'{{[1-9][0-9]*}}', tmpl.header_text or '')) + body_variables = set(re.findall(r'{{[1-9][0-9]*}}', tmpl.body or '')) + + # if there is header text + existing_header_text_variable = tmpl.variable_ids.filtered(lambda line: line.line_type == 'header') + if header_variables and not existing_header_text_variable: + to_create.append({'name': header_variables[0], 'line_type': 'header', 'wa_template_id': tmpl.id}) + elif not header_variables and existing_header_text_variable: + to_delete.append(existing_header_text_variable.id) + + # if the header is a location + existing_header_location_variables = tmpl.variable_ids.filtered(lambda line: line.line_type == 'location') + if tmpl.header_type == 'location': + if not existing_header_location_variables: + to_create += [ + {'name': 'name', 'line_type': 'location', 'wa_template_id': tmpl.id}, + {'name': 'address', 'line_type': 'location', 'wa_template_id': tmpl.id}, + {'name': 'latitude', 'line_type': 'location', 'wa_template_id': tmpl.id}, + {'name': 'longitude', 'line_type': 'location', 'wa_template_id': tmpl.id} + ] + else: + to_delete += existing_header_location_variables.ids + + # body + existing_body_variables = tmpl.variable_ids.filtered(lambda line: line.line_type == 'body') + existing_body_variables = {var.name: var for var in existing_body_variables} + new_body_variable_names = [var_name for var_name in body_variables if var_name not in existing_body_variables] + deleted_body_variables = [var.id for name, var in existing_body_variables.items() if name not in body_variables] + + to_create += [{'name': var_name, 'line_type': 'body', 'wa_template_id': tmpl.id} for var_name in set(new_body_variable_names)] + to_delete += deleted_body_variables + + update_commands = [(2, to_delete_id) for to_delete_id in to_delete] + [(0, 0, vals) for vals in to_create] + if update_commands: + tmpl.variable_ids = update_commands + + @api.depends('model_id') + def _compute_has_action(self): + for tmpl in self: + action = self.env['ir.actions.act_window'].sudo().search([('res_model', '=', 'whatsapp.composer'), ('binding_model_id', '=', tmpl.model_id.id)]) + if action: + tmpl.has_action = True + else: + tmpl.has_action = False + + def _compute_messages_count(self): + grouped_messages = self.env['whatsapp.message'].read_group( + domain=[('wa_template_id', 'in', self.ids)], + fields=[], + groupby=['wa_template_id'], + + + + ) + messages_by_template = {} + for item in grouped_messages: + key = item['wa_template_id'][0] + value = item['wa_template_id_count'] + messages_by_template[key] = value + for tmpl in self: + tmpl.messages_count = messages_by_template.get(tmpl.id, 0) + + @api.onchange('header_attachment_ids') + def _onchange_header_attachment_ids(self): + for template in self: + template.header_attachment_ids.res_id = template.id + template.header_attachment_ids.res_model = template._name + + @api.onchange('wa_account_id') + def _onchange_wa_account_id(self): + """Avoid carrying remote sync data when changing account.""" + self.status = 'draft' + self.quality = 'none' + self.wa_template_uid = False + + #=================================================================== + # CRUD + #=================================================================== + + @api.model_create_multi + def create(self, vals_list): + for vals in vals_list: + # stable backward compatible change for model fields + if vals.get('model_id'): + vals['model'] = self.env['ir.model'].sudo().browse(vals[('model_id')]).model + records = super().create(vals_list) + # the model of the variable might have been changed with x2many commands + records.variable_ids._check_field_name() + # update the attachment res_id for new records + records._onchange_header_attachment_ids() + return records + + def write(self, vals): + if vals.get("model_id"): + vals["model"] = self.env['ir.model'].sudo().browse(vals["model_id"]).model + res = super().write(vals) + # Model change: explicitly check for field access. Other changes at variable + # level are checked by '_check_field_name' constraint. + if 'model_id' in vals: + self.variable_ids._check_field_name() + return res + + def copy(self, default=None): + self.ensure_one() + default = default or {} + if not default.get('name'): + default['name'] = _('%(original_name)s (copy)', original_name=self.name) + default['template_name'] = f'{self.template_name}_copy' + return super().copy(default) + + @api.depends('name', 'wa_account_id') + def _compute_display_name(self): + for template in self: + template.display_name = _('%(template_name)s [%(account_name)s]', + template_name=template.name, + account_name=template.wa_account_id.name + ) if template.wa_account_id.name else template.name + + #=================================================================== + # Register template to whatsapp + #=================================================================== + + def _get_template_head_component(self, file_handle): + """Return header component according to header type for template registration to whatsapp""" + if self.header_type == 'none': + return None + head_component = {'type': 'HEADER', 'format': self.header_type.upper()} + if self.header_type == 'text' and self.header_text: + head_component['text'] = self.header_text + header_params = self.variable_ids.filtered(lambda line: line.line_type == 'header') + if header_params: + head_component['example'] = {'header_text': header_params.mapped('demo_value')} + elif self.header_type in ['image', 'video', 'document']: + head_component['example'] = { + 'header_handle': [file_handle] + } + return head_component + + def _get_template_body_component(self): + """Return body component for template registration to whatsapp""" + if not self.body: + return None + body_component = {'type': 'BODY', 'text': self.body} + body_params = self.variable_ids.filtered(lambda line: line.line_type == 'body') + if body_params: + body_component['example'] = {'body_text': [body_params.mapped('demo_value')]} + return body_component + + def _get_template_button_component(self): + """Return button component for template registration to whatsapp""" + if not self.button_ids: + return None + buttons = [] + for button in self.button_ids: + button_data = { + 'type': button.button_type.upper(), + 'text': button.name + } + if button.button_type == 'url': + button_data['url'] = button.website_url + if button.url_type == 'dynamic': + button_data['url'] += '{{1}}' + button_data['example'] = button.variable_ids[0].demo_value + elif button.button_type == 'phone_number': + button_data['phone_number'] = button.call_number + buttons.append(button_data) + return {'type': 'BUTTONS', 'buttons': buttons} + + def _get_template_footer_component(self): + if not self.footer_text: + return None + return {'type': 'FOOTER', 'text': self.footer_text} + + def _get_sample_record(self): + return self.env[self.model].search([], limit=1) + + def button_submit_template(self): + """Register template to WhatsApp Business Account """ + self.ensure_one() + wa_api = WhatsAppApi(self.wa_account_id) + attachment = False + if self.header_type in ('image', 'video', 'document'): + if self.header_type == 'document' and self.report_id: + record = self._get_sample_record() + if not record: + raise ValidationError(_("There is no record for preparing demo pdf in model %(model)s", model=self.model_id.name)) + attachment = self._generate_attachment_from_report(record) + else: + attachment = self.header_attachment_ids + if not attachment: + raise ValidationError("Header Document is missing") + file_handle = False + if attachment: + try: + file_handle = wa_api._upload_demo_document(attachment) + except WhatsAppError as e: + raise UserError(str(e)) + + components = [self._get_template_body_component()] + components += [comp for comp in ( + self._get_template_head_component(file_handle), + self._get_template_button_component(), + self._get_template_footer_component()) if comp] + json_data = json.dumps({ + 'name': self.template_name, + 'language': self.lang_code, + 'category': self.template_type.upper(), + 'components': components, + }) + try: + if self.wa_template_uid: + wa_api._submit_template_update(json_data, self.wa_template_uid) + self.status = 'pending' + else: + response = wa_api._submit_template_new(json_data) + self.write({ + 'wa_template_uid': response['id'], + 'status': response['status'].lower() + }) + except WhatsAppError as we: + raise UserError(str(we)) + + #=================================================================== + # Sync template from whatsapp + #=================================================================== + + def button_sync_template(self): + """Sync template from WhatsApp Business Account """ + self.ensure_one() + wa_api = WhatsAppApi(self.wa_account_id) + try: + response = wa_api._get_template_data(wa_template_uid=self.wa_template_uid) + except WhatsAppError as e: + raise ValidationError(str(e)) + if response.get('id'): + self._update_template_from_response(response) + return { + 'type': 'ir.actions.client', + 'tag': 'reload', + } + + @api.model + def _create_template_from_response(self, remote_template_vals, wa_account): + template_vals = self._get_template_vals_from_response(remote_template_vals, wa_account) + template_vals['variable_ids'] = [(0, 0, var) for var in template_vals['variable_ids']] + for button in template_vals['button_ids']: + button['variable_ids'] = [(0, 0, var) for var in button['variable_ids']] + template_vals['button_ids'] = [(0, 0, button) for button in template_vals['button_ids']] + template_vals['header_attachment_ids'] = [(0, 0, attachment) for attachment in template_vals['header_attachment_ids']] + return template_vals + + def _update_template_from_response(self, remote_template_vals): + self.ensure_one() + update_fields = ('body', 'header_type', 'header_text', 'footer_text', 'lang_code', 'template_type', 'status') + template_vals = self._get_template_vals_from_response(remote_template_vals, self.wa_account_id) + update_vals = {field: template_vals[field] for field in update_fields} + + # variables should be preserved instead of overwritten to keep odoo-specific data like fields + variable_ids = [] + existing_template_variables = {(variable_id.name, variable_id.line_type): variable_id.id for variable_id in self.variable_ids} + for variable_vals in template_vals['variable_ids']: + if not existing_template_variables.pop((variable_vals['name'], variable_vals['line_type']), False): + variable_ids.append((0, 0, variable_vals)) + variable_ids.extend([(2, to_remove) for to_remove in existing_template_variables.values()]) + update_vals['variable_ids'] = variable_ids + + for button in template_vals['button_ids']: + button['variable_ids'] = [(0, 0, var) for var in button['variable_ids']] + update_vals['button_ids'] = [(5, 0, 0)] + [(0, 0, button) for button in template_vals['button_ids']] + + if not self.header_attachment_ids or self.header_type != template_vals['header_type']: + new_attachment_commands = [(0, 0, attachment) for attachment in template_vals['header_attachment_ids']] + update_vals['header_attachment_ids'] = [(5, 0, 0)] + new_attachment_commands + + self.write(update_vals) + + def _get_template_vals_from_response(self, remote_template_vals, wa_account): + """Get dictionary of field: values from whatsapp template response json. + + Relational fields will use arrays instead of commands. + """ + template_vals = { + 'body': False, + 'button_ids': [], + 'footer_text': False, + 'header_text': False, + 'header_attachment_ids': [], + 'header_type': 'none', + 'lang_code': remote_template_vals['language'], + 'name': remote_template_vals['name'].replace("_", " ").title(), + 'status': remote_template_vals['status'].lower(), + 'template_name': remote_template_vals['name'], + 'template_type': remote_template_vals['category'].lower(), + 'variable_ids': [], + 'wa_account_id': wa_account.id, + 'wa_template_uid': int(remote_template_vals['id']), + } + for component in remote_template_vals['components']: + component_type = component['type'] + if component_type == 'HEADER': + template_vals['header_type'] = component['format'].lower() + if component['format'] == 'TEXT': + template_vals['header_text'] = component['text'] + if 'example' in component: + for index, example_value in enumerate(component['example'].get('header_text', [])): + template_vals['variable_ids'].append({ + 'name': '{{%s}}' % (index + 1), + 'demo_value': example_value, + 'line_type': 'header', + }) + elif component['format'] == 'LOCATION': + for location_val in ['name', 'address', 'latitude', 'longitude']: + template_vals['variable_ids'].append({ + 'name': location_val, + 'line_type': 'location', + }) + elif component['format'] in ('IMAGE', 'VIDEO', 'DOCUMENT'): + # TODO RETH fetch remote example if set + extension, mimetype = { + 'IMAGE': ('jpg', 'image/jpeg'), + 'VIDEO': ('mp4', 'video/mp4'), + 'DOCUMENT': ('pdf', 'application/pdf') + }[component['format']] + template_vals['header_attachment_ids'] = [{ + 'name': f'Missing.{extension}', 'res_model': self._name, 'res_id': self.ids[0] if self else False, + 'datas': "AAAA", 'mimetype': mimetype}] + elif component_type == 'BODY': + template_vals['body'] = component['text'] + if 'example' in component: + for index, example_value in enumerate(component['example'].get('body_text', [[]])[0]): + template_vals['variable_ids'].append({ + 'name': '{{%s}}' % (index + 1), + 'demo_value': example_value, + 'line_type': 'body', + }) + elif component_type == 'FOOTER': + template_vals['footer_text'] = component['text'] + elif component_type == 'BUTTONS': + for index, button in enumerate(component['buttons']): + if button['type'] in ('URL', 'PHONE_NUMBER', 'QUICK_REPLY'): + button_vals = { + 'sequence': index, + 'name': button['text'], + 'button_type': button['type'].lower(), + 'call_number': button.get('phone_number'), + 'website_url': button.get('url').replace('{{1}}', '') if button.get('url') else None, + 'url_type': button.get('example', []) and 'dynamic' or 'static', + 'variable_ids': [] + } + for example_index, example_value in enumerate(button.get('example', [])): + button_vals['variable_ids'].append({ + 'name': '{{%s}}' % (example_index + 1), + 'demo_value': example_value, + 'line_type': 'button', + }) + template_vals['button_ids'].append(button_vals) + return template_vals + + #======================================================================== + # Send WhatsApp message using template + #======================================================================== + + def _get_header_component(self, free_text_json, template_variables_value, attachment): + """ Prepare header component for sending WhatsApp template message""" + header = [] + header_type = self.header_type + if header_type == 'text' and template_variables_value.get('header-{{1}}'): + value = (free_text_json or {}).get('header_text') or template_variables_value.get('header-{{1}}') or ' ' + header = { + 'type': 'header', + 'parameters': [{'type': 'text', 'text': value}] + } + elif header_type in ['image', 'video', 'document']: + header = { + 'type': 'header', + 'parameters': [self.env['whatsapp.message']._prepare_attachment_vals(attachment, wa_account_id=self.wa_account_id)] + } + elif header_type == 'location': + header = { + 'type': 'header', + 'parameters': [self._prepare_location_vals(template_variables_value)] + } + return header + + def _prepare_location_vals(self, template_variables_value): + """ Prepare location values for sending WhatsApp template message having header type location""" + self._check_location_latitude_longitude(template_variables_value.get('location-latitude'), template_variables_value.get('location-longitude')) + return { + 'type': 'location', + 'location': { + 'name': template_variables_value.get('location-name'), + 'address': template_variables_value.get('location-address'), + 'latitude': template_variables_value.get('location-latitude'), + 'longitude': template_variables_value.get('location-longitude'), + } + } + + def _get_body_component(self, free_text_json, template_variables_value): + """ Prepare body component for sending WhatsApp template message""" + if not self.variable_ids: + return None + parameters = [] + free_text_count = 1 + for body_val in self.variable_ids.filtered(lambda line: line.line_type == 'body'): + free_text_value = body_val.field_type == 'free_text' and free_text_json.get(f'free_text_{free_text_count}') or False + parameters.append({ + 'type': 'text', + 'text': free_text_value or template_variables_value.get(f'{body_val.line_type}-{body_val.name}') or ' ' + }) + if body_val.field_type == 'free_text': + free_text_count += 1 + return {'type': 'body', 'parameters': parameters} + + def _get_button_components(self, free_text_json, template_variables_value): + """ Prepare button component for sending WhatsApp template message""" + components = [] + if not self.variable_ids: + return components + dynamic_buttons = self.button_ids.filtered(lambda line: line.url_type == 'dynamic') + dynamic_index = {button: i for i, button in enumerate(self.button_ids)} + free_text_index = 1 + for button in dynamic_buttons: + button_var = button.variable_ids[0] + dynamic_url = button.website_url + if button_var.field_type == 'free_text': + value = free_text_json.get(f'button_dynamic_url_{free_text_index}') or ' ' + free_text_index += 1 + else: + value = template_variables_value.get(f'button-{button.name}') or ' ' + value = value.replace(dynamic_url, '').lstrip('/') # / is implicit + components.append({ + 'type': 'button', + 'sub_type': 'url', + 'index': dynamic_index.get(button), + 'parameters': [{'type': 'text', 'text': value}] + }) + return components + + def _get_send_template_vals(self, record, free_text_json, attachment=False): + """Prepare JSON dictionary for sending WhatsApp template message""" + self.ensure_one() + components = [] + template_variables_value = self.variable_ids._get_variables_value(record) + attachment = attachment or self.header_attachment_ids or self._generate_attachment_from_report(record) + header = self._get_header_component(free_text_json=free_text_json, attachment=attachment, template_variables_value=template_variables_value) + body = self._get_body_component(free_text_json=free_text_json, template_variables_value=template_variables_value) + buttons = self._get_button_components(free_text_json=free_text_json, template_variables_value=template_variables_value) + if header: + components.append(header) + if body: + components.append(body) + components.extend(buttons) + template_vals = { + 'name': self.template_name, + 'language': {'code': self.lang_code}, + } + if components: + template_vals['components'] = components + return template_vals, attachment + + def button_reset_to_draft(self): + for tmpl in self: + tmpl.write({'status': 'draft'}) + + def action_open_messages(self): + self.ensure_one() + return { + 'name': _("Message Statistics Of %(template_name)s", template_name=self.name), + 'view_mode': 'tree,form', + 'res_model': 'whatsapp.message', + 'domain': [('wa_template_id', '=', self.id)], + 'type': 'ir.actions.act_window', + } + + def button_create_action(self): + """ Create action for sending WhatsApp template message in model defined in template. It will be used in bulk sending""" + self.check_access_rule('write') + actions = self.env['ir.actions.act_window'].sudo().search([ + ('res_model', '=', 'whatsapp.composer'), + ('binding_model_id', 'in', self.model_id.ids) + ]) + self.env['ir.actions.act_window'].sudo().create([ + { + 'binding_model_id': model.id, + 'name': _('WhatsApp Message'), + 'res_model': 'whatsapp.composer', + 'target': 'new', + 'type': 'ir.actions.act_window', + 'view_mode': 'form', + } + for model in (self.model_id - actions.binding_model_id) + ]) + + def button_delete_action(self): + self.check_access_rule('write') + self.env['ir.actions.act_window'].sudo().search([ + ('res_model', '=', 'whatsapp.composer'), + ('binding_model_id', 'in', self.model_id.ids) + ]).unlink() + + def _generate_attachment_from_report(self, record=False): + """Create attachment from report if relevant""" + if record and self.header_type == 'document' and self.report_id: + report_content, report_format = self.report_id._render_qweb_pdf(self.report_id, record.id) + if self.report_id.print_report_name: + report_name = safe_eval(self.report_id.print_report_name, {'object': record}) + '.' + report_format + else: + report_name = self.display_name + '.' + report_format + return self.env['ir.attachment'].create({ + 'name': report_name, + 'raw': report_content, + 'mimetype': 'application/pdf', + }) + return self.env['ir.attachment'] + + def _check_location_latitude_longitude(self, latitude, longitude): + if not re.match(LATITUDE_LONGITUDE_REGEX, f"{latitude}, {longitude}"): + raise ValidationError( + _("Location Latitude and Longitude %(latitude)s / %(longitude)s is not in proper format.", + latitude=latitude, longitude=longitude) + ) + + @api.model + def _format_markup_to_html(self, body_html): + """ + Convert WhatsApp format text to HTML format text + *bold* -> bold + _italic_ -> italic + ~strikethrough~ ->monospace
+ """
+ formatted_body = str(plaintext2html(body_html)) # stringify for regex
+ formatted_body = re.sub(r'\*(.*?)\*', r'\1', formatted_body)
+ formatted_body = re.sub(r'_(.*?)_', r'\1', formatted_body)
+ formatted_body = re.sub(r'~(.*?)~', r'\1', formatted_body)
+ return Markup(formatted_body)
+
+ def _get_formatted_body(self, demo_fallback=False, variable_values=None):
+ """Get formatted body and header with specified values.
+
+ :param bool demo_fallback: if true, fallback on demo values instead of blanks
+ :param dict variable_values: values to use instead of demo values {'header-{{1}}': 'Hello'}
+ :return Markup:
+ """
+ self.ensure_one()
+ variable_values = variable_values or {}
+ header = ''
+ if self.header_type == 'text' and self.header_text:
+ header_variables = self.variable_ids.filtered(lambda line: line.line_type == 'header')
+ if header_variables:
+ fallback_value = header_variables[0].demo_value if demo_fallback else ' '
+ header = self.header_text.replace('{{1}}', variable_values.get('header-{{1}}', fallback_value))
+ body = self.body
+ for var in self.variable_ids.filtered(lambda var: var.line_type == 'body'):
+ fallback_value = var.demo_value if demo_fallback else ' '
+ body = body.replace(var.name, variable_values.get(f'{var.line_type}-{var.name}', fallback_value))
+ return self._format_markup_to_html(f'{header}\n{body}' if header else body)
+
+
+ # ------------------------------------------------------------
+ # TOOLS
+ # ------------------------------------------------------------
+
+ @api.model
+ def _can_use_whatsapp(self, model_name):
+ return self.env.user.has_group('whatsapp.group_whatsapp_admin') or \
+ bool(self._find_default_for_model(model_name))
+
+ @api.model
+ def _find_default_for_model(self, model_name):
+ return self.search([
+ ('model', '=', model_name),
+ ('status', '=', 'approved'),
+ '|',
+ ('allowed_user_ids', '=', False),
+ ('allowed_user_ids', 'in', self.env.user.ids)
+ ], limit=1)
diff --git a/odex25_base/whatsapp/models/whatsapp_template_button.py b/odex25_base/whatsapp/models/whatsapp_template_button.py
new file mode 100644
index 000000000..5c5022baf
--- /dev/null
+++ b/odex25_base/whatsapp/models/whatsapp_template_button.py
@@ -0,0 +1,81 @@
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from urllib.parse import urlparse
+
+from odoo import api, fields, models, _
+from odoo.exceptions import ValidationError
+from odoo.addons.phone_validation.tools import phone_validation
+
+
+class WhatsAppTemplateButton(models.Model):
+ _name = 'whatsapp.template.button'
+ _description = 'WhatsApp Template Button'
+ _order = 'sequence,id'
+
+ sequence = fields.Integer()
+ name = fields.Char(string="Button Text", size=25)
+ wa_template_id = fields.Many2one(comodel_name='whatsapp.template', required=True, ondelete='cascade')
+
+ button_type = fields.Selection([
+ ('url', 'Visit Website'),
+ ('phone_number', 'Call Number'),
+ ('quick_reply', 'Quick Reply')], string="Type", required=True, default='quick_reply')
+ url_type = fields.Selection([
+ ('static', 'Static'),
+ ('dynamic', 'Dynamic')], string="Url Type", default='static')
+ website_url = fields.Char(string="Website URL")
+ call_number = fields.Char(string="Call Number")
+ variable_ids = fields.One2many('whatsapp.template.variable', 'button_id',
+ compute='_compute_variable_ids', precompute=True, store=True)
+
+ _sql_constraints = [
+ (
+ 'unique_name_per_template',
+ 'UNIQUE(name, wa_template_id)',
+ "Button names must be unique in a given template"
+ )
+ ]
+
+ @api.depends('button_type', 'url_type', 'website_url')
+ def _compute_variable_ids(self):
+ dynamic_urls = self.filtered(lambda button: button.button_type == 'url' and button.url_type == 'dynamic')
+ to_clear = self - dynamic_urls
+ for button in dynamic_urls:
+ url_vars = {'{{1}}'} # for now the var is mandatory and automatically added at the end of the url
+ if not url_vars and button.variable_ids:
+ to_clear += button
+ continue
+ existing_vars = {var.name: var for var in button.variable_ids}
+ unlink_commands = [(3, var.id) for name, var in existing_vars.items() if name not in url_vars]
+ create_commands = [(0, 0, {
+ 'demo_value': button.website_url + '???', 'line_type': 'button',
+ 'name': name, 'wa_template_id': button.wa_template_id.id})
+ for name in url_vars - existing_vars.keys()]
+ if unlink_commands or create_commands:
+ button.write({'variable_ids': unlink_commands + create_commands})
+ if to_clear:
+ to_clear.write({'variable_ids': [(5, 0, 0)]})
+
+ def check_variable_ids(self):
+ for button in self:
+ if len(button.variable_ids) > 1:
+ raise ValidationError(_('Buttons may only contain one placeholder.'))
+ if button.variable_ids and button.url_type != 'dynamic':
+ raise ValidationError(_('Only dynamic urls may have a placeholder.'))
+ elif button.url_type == 'dynamic' and not button.variable_ids:
+ raise ValidationError(_('All dynamic urls must have a placeholder.'))
+ if button.variable_ids.name != "{{1}}":
+ raise ValidationError(_('The placeholder for a button can only be {{1}}.'))
+
+ @api.constrains('button_type', 'url_type', 'variable_ids', 'website_url')
+ def _validate_website_url(self):
+ for button in self.filtered(lambda button: button.button_type == 'url'):
+ parsed_url = urlparse(button.website_url)
+ if not (parsed_url.scheme in {'http', 'https'} and parsed_url.netloc):
+ raise ValidationError(_("Please enter a valid URL in the format 'https://www.example.com'."))
+
+ @api.constrains('call_number')
+ def _validate_call_number(self):
+ for button in self:
+ if button.button_type == 'phone_number':
+ phone_validation.phone_format(button.call_number, False, False)
diff --git a/odex25_base/whatsapp/models/whatsapp_template_variable.py b/odex25_base/whatsapp/models/whatsapp_template_variable.py
new file mode 100644
index 000000000..bd9a55816
--- /dev/null
+++ b/odex25_base/whatsapp/models/whatsapp_template_variable.py
@@ -0,0 +1,152 @@
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from functools import reduce
+from werkzeug.urls import url_join
+
+from odoo import api, models, fields, _
+from odoo.exceptions import UserError, ValidationError
+
+class WhatsAppTemplateVariable(models.Model):
+ _name = 'whatsapp.template.variable'
+ _description = 'WhatsApp Template Variable'
+ _order = 'line_type desc, name, id'
+
+ name = fields.Char(string="Placeholder", required=True)
+ button_id = fields.Many2one('whatsapp.template.button', ondelete='cascade')
+ wa_template_id = fields.Many2one(comodel_name='whatsapp.template', required=True, ondelete='cascade')
+ model = fields.Char(string="Model Name", related='wa_template_id.model')
+
+ line_type = fields.Selection([
+ ('button', 'Button'),
+ ('header', 'Header'),
+ ('location', 'Location'),
+ ('body', 'Body')], string="Variable location", required=True)
+ field_type = fields.Selection([
+ ('user_name', 'User Name'),
+ ('user_mobile', 'User Mobile'),
+ ('free_text', 'Free Text'),
+ ('portal_url', 'Portal Link'),
+ ('field', 'Field of Model')], string="Type", default='free_text', required=True)
+ field_name = fields.Char(string="Field")
+ demo_value = fields.Char(string="Sample Value", default="Sample Value", required=True)
+
+ _sql_constraints = [
+ (
+ 'name_type_template_unique',
+ 'UNIQUE(name, line_type, wa_template_id, button_id)',
+ 'Variable names must be unique for a given template'
+ ),
+ ]
+
+ @api.constrains('field_type', 'demo_value')
+ def _check_demo_values(self):
+ if self.filtered(lambda var: var.field_type == 'free_text' and not var.demo_value):
+ raise ValidationError(_('Free Text template variables must have a demo value.'))
+ if self.filtered(lambda var: var.field_type == 'field' and not var.field_name):
+ raise ValidationError(_("Field template variables must be associated with a field."))
+ for var in self.filtered('button_id'):
+ if not var.demo_value.startswith(var.button_id.website_url):
+ raise ValidationError(_('Demo value of a dynamic url must start with the non-dynamic part'
+ 'of the url such as "https://www.example.com/menu?id=20"'))
+
+ @api.constrains('field_name')
+ def _check_field_name(self):
+ is_system = self.user_has_groups('base.group_system')
+ for variable in self.filtered('field_name'):
+ model = self.env[variable.model]
+ if not is_system:
+ if not model.check_access_rights('read', raise_exception=False):
+ model_description = self.env['ir.model']._get(variable.model).display_name
+ raise ValidationError(
+ _("You can not select field of %(model)s.", model=model_description)
+ )
+ safe_fields = model._get_whatsapp_safe_fields() if hasattr(model, '_get_whatsapp_safe_fields') else []
+ if variable.field_name not in safe_fields:
+ raise ValidationError(
+ _("You are not allowed to use field %(field)s, contact your administrator.",
+ field=variable.field_name)
+ )
+ try:
+ model._find_value_from_field_path(variable.field_name)
+ except UserError as err:
+ raise ValidationError(
+ _("'%(field)s' does not seem to be a valid field path", field=variable.field_name)
+ ) from err
+
+ @api.constrains('name')
+ def _check_name(self):
+ for variable in self:
+ if variable.line_type == 'location' and variable.name not in {'name', 'address', 'latitude', 'longitude'}:
+ raise ValidationError(
+ _("Location variable should be 'name', 'address', 'latitude' or 'longitude'. Cannot parse '%(placeholder)s'",
+ placeholder=variable.name))
+ if variable.line_type != 'location' and not variable._extract_variable_index():
+ raise ValidationError(
+ _('"Template variable should be in format {{number}}. Cannot parse "%(placeholder)s"',
+ placeholder=variable.name))
+
+ @api.constrains('button_id', 'line_type')
+ def _check_button_id(self):
+ for variable in self:
+ if variable.line_type == 'button' and not variable.button_id:
+ raise ValidationError(_('Button variables must be linked to a button.'))
+
+ @api.depends('line_type', 'name')
+ def _compute_display_name(self):
+ for variable in self:
+ if variable.line_type in ('body', 'location'):
+ variable.display_name = f'{variable.line_type} - {variable.name}'
+ elif variable.line_type == 'button':
+ variable.display_name = f'{variable.line_type} "{variable.button_id.name}" - {variable.name}'
+ else:
+ variable.display_name = variable.line_type
+
+ @api.onchange('model')
+ def _onchange_model_id(self):
+ self.field_name = False
+
+ @api.onchange('field_type')
+ def _onchange_field_type(self):
+ if self.field_type != 'field':
+ self.field_name = False
+
+ def _get_variables_value(self, record):
+ value_by_name = {}
+ user = self.env.user
+ for variable in self:
+ if variable.field_type == 'user_name':
+ value = user.name
+ elif variable.field_type == 'user_mobile':
+ value = user.mobile
+ elif variable.field_type == 'field':
+ value = variable._find_value_from_field_chain(record)
+ elif variable.field_type == 'portal_url':
+ portal_url = record._whatsapp_get_portal_url()
+ value = url_join(variable.get_base_url(), (portal_url or ''))
+ else:
+ value = variable.demo_value
+
+ value_str = value and str(value) or ''
+ if variable.button_id:
+ value_by_name[f"button-{variable.button_id.name}"] = value_str
+ else:
+ value_by_name[f"{variable.line_type}-{variable.name}"] = value_str
+
+ return value_by_name
+
+ # ------------------------------------------------------------
+ # TOOLS
+ # ------------------------------------------------------------
+
+ def _find_value_from_field_chain(self, record):
+ """Get the value of field, returning display_name(s) if the field is a model."""
+ self.ensure_one()
+ return record.sudo(False)._find_value_from_field_path(self.field_name)
+
+ def _extract_variable_index(self):
+ """ Extract variable index, located between '{{}}' markers. """
+ self.ensure_one()
+ try:
+ return int(self.name.lstrip('{{').rstrip('}}'))
+ except ValueError:
+ return None
diff --git a/odex25_base/whatsapp/module.py b/odex25_base/whatsapp/module.py
new file mode 100644
index 000000000..49033a5c1
--- /dev/null
+++ b/odex25_base/whatsapp/module.py
@@ -0,0 +1,495 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+import ast
+import collections.abc
+import copy
+import functools
+import importlib
+import logging
+import os
+import pkg_resources
+import re
+import sys
+import warnings
+from os.path import join as opj, normpath
+
+import odoo
+import odoo.tools as tools
+import odoo.release as release
+from odoo.tools import pycompat
+from .misc import file_path
+# todo from odoo.modules.module
+
+MANIFEST_NAMES = ('__manifest__.py', '__openerp__.py')
+README = ['README.rst', 'README.md', 'README.txt']
+
+_DEFAULT_MANIFEST = {
+ #addons_path: f'/path/to/the/addons/path/of/{module}', # automatic
+ 'application': False,
+ 'bootstrap': False, # web
+ 'assets': {},
+ 'author': 'Odoo S.A.',
+ 'auto_install': False,
+ 'category': 'Uncategorized',
+ 'configurator_snippets': {}, # website themes
+ 'countries': [],
+ 'data': [],
+ 'demo': [],
+ 'demo_xml': [],
+ 'depends': [],
+ 'description': '',
+ 'external_dependencies': {},
+ #icon: f'/{module}/static/description/icon.png', # automatic
+ 'init_xml': [],
+ 'installable': True,
+ 'images': [], # website
+ 'images_preview_theme': {}, # website themes
+ #license, mandatory
+ 'live_test_url': '', # website themes
+ 'new_page_templates': {}, # website themes
+ #name, mandatory
+ 'post_init_hook': '',
+ 'post_load': '',
+ 'pre_init_hook': '',
+ 'sequence': 100,
+ 'summary': '',
+ 'test': [],
+ 'update_xml': [],
+ 'uninstall_hook': '',
+ 'version': '1.0',
+ 'web': False,
+ 'website': '',
+}
+
+_logger = logging.getLogger(__name__)
+
+
+class UpgradeHook(object):
+ """Makes the legacy `migrations` package being `odoo.upgrade`"""
+
+ def find_spec(self, fullname, path=None, target=None):
+ if re.match(r"^odoo\.addons\.base\.maintenance\.migrations\b", fullname):
+ # We can't trigger a DeprecationWarning in this case.
+ # In order to be cross-versions, the multi-versions upgrade scripts (0.0.0 scripts),
+ # the tests, and the common files (utility functions) still needs to import from the
+ # legacy name.
+ return importlib.util.spec_from_loader(fullname, self)
+
+ def load_module(self, name):
+ assert name not in sys.modules
+
+ canonical_upgrade = name.replace("odoo.addons.base.maintenance.migrations", "odoo.upgrade")
+
+ if canonical_upgrade in sys.modules:
+ mod = sys.modules[canonical_upgrade]
+ else:
+ mod = importlib.import_module(canonical_upgrade)
+
+ sys.modules[name] = mod
+
+ return sys.modules[name]
+
+
+def initialize_sys_path():
+ """
+ Setup the addons path ``odoo.addons.__path__`` with various defaults
+ and explicit directories.
+ """
+ # hook odoo.addons on data dir
+ dd = os.path.normcase(tools.config.addons_data_dir)
+ if os.access(dd, os.R_OK) and dd not in odoo.addons.__path__:
+ odoo.addons.__path__.append(dd)
+
+ # hook odoo.addons on addons paths
+ for ad in tools.config['addons_path'].split(','):
+ ad = os.path.normcase(os.path.abspath(tools.ustr(ad.strip())))
+ if ad not in odoo.addons.__path__:
+ odoo.addons.__path__.append(ad)
+
+ # hook odoo.addons on base module path
+ base_path = os.path.normcase(os.path.abspath(os.path.join(os.path.dirname(os.path.dirname(__file__)), 'addons')))
+ if base_path not in odoo.addons.__path__ and os.path.isdir(base_path):
+ odoo.addons.__path__.append(base_path)
+
+ # hook odoo.upgrade on upgrade-path
+ from odoo import upgrade
+ legacy_upgrade_path = os.path.join(base_path, 'base', 'maintenance', 'migrations')
+ for up in (tools.config['upgrade_path'] or legacy_upgrade_path).split(','):
+ up = os.path.normcase(os.path.abspath(tools.ustr(up.strip())))
+ if os.path.isdir(up) and up not in upgrade.__path__:
+ upgrade.__path__.append(up)
+
+ # create decrecated module alias from odoo.addons.base.maintenance.migrations to odoo.upgrade
+ spec = importlib.machinery.ModuleSpec("odoo.addons.base.maintenance", None, is_package=True)
+ maintenance_pkg = importlib.util.module_from_spec(spec)
+ maintenance_pkg.migrations = upgrade
+ sys.modules["odoo.addons.base.maintenance"] = maintenance_pkg
+ sys.modules["odoo.addons.base.maintenance.migrations"] = upgrade
+
+ # hook deprecated module alias from openerp to odoo and "crm"-like to odoo.addons
+ if not getattr(initialize_sys_path, 'called', False): # only initialize once
+ sys.meta_path.insert(0, UpgradeHook())
+ initialize_sys_path.called = True
+
+
+def get_module_path(module, downloaded=False, display_warning=True):
+ """Return the path of the given module.
+
+ Search the addons paths and return the first path where the given
+ module is found. If downloaded is True, return the default addons
+ path if nothing else is found.
+
+ """
+ if re.search(r"[\/\\]", module):
+ return False
+ for adp in odoo.addons.__path__:
+ files = [opj(adp, module, manifest) for manifest in MANIFEST_NAMES] +\
+ [opj(adp, module + '.zip')]
+ if any(os.path.exists(f) for f in files):
+ return opj(adp, module)
+
+ if downloaded:
+ return opj(tools.config.addons_data_dir, module)
+ if display_warning:
+ _logger.warning('module %s: module not found', module)
+ return False
+
+def get_module_filetree(module, dir='.'):
+ warnings.warn(
+ "Since 16.0: use os.walk or a recursive glob or something",
+ DeprecationWarning,
+ stacklevel=2
+ )
+ path = get_module_path(module)
+ if not path:
+ return False
+
+ dir = os.path.normpath(dir)
+ if dir == '.':
+ dir = ''
+ if dir.startswith('..') or (dir and dir[0] == '/'):
+ raise Exception('Cannot access file outside the module')
+
+ files = odoo.tools.osutil.listdir(path, True)
+
+ tree = {}
+ for f in files:
+ if not f.startswith(dir):
+ continue
+
+ if dir:
+ f = f[len(dir)+int(not dir.endswith('/')):]
+ lst = f.split(os.sep)
+ current = tree
+ while len(lst) != 1:
+ current = current.setdefault(lst.pop(0), {})
+ current[lst.pop(0)] = None
+
+ return tree
+
+def get_resource_path(module, *args):
+ """Return the full path of a resource of the given module.
+
+ :param module: module name
+ :param list(str) args: resource path components within module
+
+ :rtype: str
+ :return: absolute path to the resource
+ """
+ warnings.warn(
+ f"Since 17.0: use tools.misc.file_path instead of get_resource_path({module}, {args})",
+ DeprecationWarning,
+ )
+ resource_path = opj(module, *args)
+ try:
+ return file_path(resource_path)
+ except (FileNotFoundError, ValueError):
+ return False
+
+# backwards compatibility
+get_module_resource = get_resource_path
+check_resource_path = get_resource_path
+
+def get_resource_from_path(path):
+ """Tries to extract the module name and the resource's relative path
+ out of an absolute resource path.
+
+ If operation is successful, returns a tuple containing the module name, the relative path
+ to the resource using '/' as filesystem seperator[1] and the same relative path using
+ os.path.sep seperators.
+
+ [1] same convention as the resource path declaration in manifests
+
+ :param path: absolute resource path
+
+ :rtype: tuple
+ :return: tuple(module_name, relative_path, os_relative_path) if possible, else None
+ """
+ resource = False
+ sorted_paths = sorted(odoo.addons.__path__, key=len, reverse=True)
+ for adpath in sorted_paths:
+ # force trailing separator
+ adpath = os.path.join(adpath, "")
+ if os.path.commonprefix([adpath, path]) == adpath:
+ resource = path.replace(adpath, "", 1)
+ break
+
+ if resource:
+ relative = resource.split(os.path.sep)
+ if not relative[0]:
+ relative.pop(0)
+ module = relative.pop(0)
+ return (module, '/'.join(relative), os.path.sep.join(relative))
+ return None
+
+def get_module_icon(module):
+ fpath = f"{module}/static/description/icon.png"
+ try:
+ file_path(fpath)
+ return "/" + fpath
+ except FileNotFoundError:
+ return "/base/static/description/icon.png"
+
+def get_module_icon_path(module):
+ try:
+ return file_path(f"{module}/static/description/icon.png")
+ except FileNotFoundError:
+ return file_path("base/static/description/icon.png")
+
+def module_manifest(path):
+ """Returns path to module manifest if one can be found under `path`, else `None`."""
+ if not path:
+ return None
+ for manifest_name in MANIFEST_NAMES:
+ candidate = opj(path, manifest_name)
+ if os.path.isfile(candidate):
+ if manifest_name == '__openerp__.py':
+ warnings.warn(
+ "__openerp__.py manifests are deprecated since 17.0, "
+ f"rename {candidate!r} to __manifest__.py "
+ "(valid since 10.0)",
+ category=DeprecationWarning
+ )
+ return candidate
+
+def get_module_root(path):
+ """
+ Get closest module's root beginning from path
+
+ # Given:
+ # /foo/bar/module_dir/static/src/...
+
+ get_module_root('/foo/bar/module_dir/static/')
+ # returns '/foo/bar/module_dir'
+
+ get_module_root('/foo/bar/module_dir/')
+ # returns '/foo/bar/module_dir'
+
+ get_module_root('/foo/bar')
+ # returns None
+
+ @param path: Path from which the lookup should start
+
+ @return: Module root path or None if not found
+ """
+ while not module_manifest(path):
+ new_path = os.path.abspath(opj(path, os.pardir))
+ if path == new_path:
+ return None
+ path = new_path
+ return path
+
+def load_manifest(module, mod_path=None):
+ """ Load the module manifest from the file system. """
+
+ if not mod_path:
+ mod_path = get_module_path(module, downloaded=True)
+ manifest_file = module_manifest(mod_path)
+
+ if not manifest_file:
+ _logger.debug('module %s: no manifest file found %s', module, MANIFEST_NAMES)
+ return {}
+
+ manifest = copy.deepcopy(_DEFAULT_MANIFEST)
+
+ manifest['icon'] = get_module_icon(module)
+
+ with tools.file_open(manifest_file, mode='r') as f:
+ manifest.update(ast.literal_eval(f.read()))
+
+ if not manifest['description']:
+ readme_path = [opj(mod_path, x) for x in README
+ if os.path.isfile(opj(mod_path, x))]
+ if readme_path:
+ with tools.file_open(readme_path[0]) as fd:
+ manifest['description'] = fd.read()
+
+ if not manifest.get('license'):
+ manifest['license'] = 'LGPL-3'
+ _logger.warning("Missing `license` key in manifest for %r, defaulting to LGPL-3", module)
+
+ # auto_install is either `False` (by default) in which case the module
+ # is opt-in, either a list of dependencies in which case the module is
+ # automatically installed if all dependencies are (special case: [] to
+ # always install the module), either `True` to auto-install the module
+ # in case all dependencies declared in `depends` are installed.
+ if isinstance(manifest['auto_install'], collections.abc.Iterable):
+ manifest['auto_install'] = set(manifest['auto_install'])
+ non_dependencies = manifest['auto_install'].difference(manifest['depends'])
+ assert not non_dependencies,\
+ "auto_install triggers must be dependencies, found " \
+ "non-dependencies [%s] for module %s" % (
+ ', '.join(non_dependencies), module
+ )
+ elif manifest['auto_install']:
+ manifest['auto_install'] = set(manifest['depends'])
+
+ try:
+ manifest['version'] = adapt_version(manifest['version'])
+ except ValueError as e:
+ if manifest.get("installable", True):
+ raise ValueError(f"Module {module}: invalid manifest") from e
+ manifest['addons_path'] = normpath(opj(mod_path, os.pardir))
+
+ return manifest
+
+def get_manifest(module, mod_path=None):
+ """
+ Get the module manifest.
+
+ :param str module: The name of the module (sale, purchase, ...).
+ :param Optional[str] mod_path: The optional path to the module on
+ the file-system. If not set, it is determined by scanning the
+ addons-paths.
+ :returns: The module manifest as a dict or an empty dict
+ when the manifest was not found.
+ :rtype: dict
+ """
+ return copy.deepcopy(_get_manifest_cached(module, mod_path))
+
+@functools.lru_cache(maxsize=None)
+def _get_manifest_cached(module, mod_path=None):
+ return load_manifest(module, mod_path)
+
+def load_information_from_description_file(module, mod_path=None):
+ warnings.warn(
+ 'load_information_from_description_file() is a deprecated '
+ 'alias to get_manifest()', DeprecationWarning, stacklevel=2)
+ return get_manifest(module, mod_path)
+
+def load_openerp_module(module_name):
+ """ Load an OpenERP module, if not already loaded.
+
+ This loads the module and register all of its models, thanks to either
+ the MetaModel metaclass, or the explicit instantiation of the model.
+ This is also used to load server-wide module (i.e. it is also used
+ when there is no model to register).
+ """
+
+ qualname = f'odoo.addons.{module_name}'
+ if qualname in sys.modules:
+ return
+
+ try:
+ __import__(qualname)
+
+ # Call the module's post-load hook. This can done before any model or
+ # data has been initialized. This is ok as the post-load hook is for
+ # server-wide (instead of registry-specific) functionalities.
+ info = get_manifest(module_name)
+ if info['post_load']:
+ getattr(sys.modules[qualname], info['post_load'])()
+
+ except Exception:
+ _logger.critical("Couldn't load module %s", module_name)
+ raise
+
+def get_modules():
+ """Returns the list of module names
+ """
+ def listdir(dir):
+ def clean(name):
+ name = os.path.basename(name)
+ if name[-4:] == '.zip':
+ name = name[:-4]
+ return name
+
+ def is_really_module(name):
+ for mname in MANIFEST_NAMES:
+ if os.path.isfile(opj(dir, name, mname)):
+ return True
+ return [
+ clean(it)
+ for it in os.listdir(dir)
+ if is_really_module(it)
+ ]
+
+ plist = []
+ for ad in odoo.addons.__path__:
+ if not os.path.exists(ad):
+ _logger.warning("addons path does not exist: %s", ad)
+ continue
+ plist.extend(listdir(ad))
+ return sorted(set(plist))
+
+def get_modules_with_version():
+ modules = get_modules()
+ res = dict.fromkeys(modules, adapt_version('1.0'))
+ for module in modules:
+ try:
+ info = get_manifest(module)
+ res[module] = info['version']
+ except Exception:
+ continue
+ return res
+
+def adapt_version(version):
+ serie = release.major_version
+ if version == serie or not version.startswith(serie + '.'):
+ base_version = version
+ version = '%s.%s' % (serie, version)
+ else:
+ base_version = version[len(serie) + 1:]
+
+ if not re.match(r"^[0-9]+\.[0-9]+(?:\.[0-9]+)?$", base_version):
+ raise ValueError(f"Invalid version {base_version!r}. Modules should have a version in format `x.y`, `x.y.z`,"
+ f" `{serie}.x.y` or `{serie}.x.y.z`.")
+
+ return version
+
+
+current_test = None
+
+
+def check_python_external_dependency(pydep):
+ try:
+ pkg_resources.get_distribution(pydep)
+ except pkg_resources.DistributionNotFound as e:
+ try:
+ importlib.import_module(pydep)
+ _logger.info("python external dependency on '%s' does not appear to be a valid PyPI package. Using a PyPI package name is recommended.", pydep)
+ except ImportError:
+ # backward compatibility attempt failed
+ _logger.warning("DistributionNotFound: %s", e)
+ raise Exception('Python library not installed: %s' % (pydep,))
+ except pkg_resources.VersionConflict as e:
+ _logger.warning("VersionConflict: %s", e)
+ raise Exception('Python library version conflict: %s' % (pydep,))
+ except Exception as e:
+ _logger.warning("get_distribution(%s) failed: %s", pydep, e)
+ raise Exception('Error finding python library %s' % (pydep,))
+
+
+def check_manifest_dependencies(manifest):
+ depends = manifest.get('external_dependencies')
+ if not depends:
+ return
+ for pydep in depends.get('python', []):
+ check_python_external_dependency(pydep)
+
+ for binary in depends.get('bin', []):
+ try:
+ tools.find_in_path(binary)
+ except IOError:
+ raise Exception('Unable to find %r in path' % (binary,))
diff --git a/odex25_base/whatsapp/security/ir.model.access.csv b/odex25_base/whatsapp/security/ir.model.access.csv
new file mode 100644
index 000000000..cf86cdbeb
--- /dev/null
+++ b/odex25_base/whatsapp/security/ir.model.access.csv
@@ -0,0 +1,15 @@
+id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
+access_ir_model_wa_admin,access.ir.model.wa.admin,base.model_ir_model,group_whatsapp_admin,1,0,0,0
+access_whatsapp_account_user,access.whatsapp.account.user,model_whatsapp_account,base.group_user,1,0,0,0
+access_whatsapp_account_system_admin,access.whatsapp.account.system.admin,model_whatsapp_account,base.group_system,1,1,1,1
+access_whatsapp_account_administrator,access.whatsapp.account.admin,model_whatsapp_account,group_whatsapp_admin,1,1,1,0
+access_whatsapp_composer_user,access.whatsapp.composer,model_whatsapp_composer,base.group_user,1,1,1,1
+access_whatsapp_message_administrator,access.whatsapp.message,model_whatsapp_message,group_whatsapp_admin,1,1,1,1
+access_whatsapp_message_user,access.whatsapp.message,model_whatsapp_message,base.group_user,1,1,1,0
+access_whatsapp_preview_user,access.whatsapp.preview,model_whatsapp_preview,base.group_user,1,1,1,1
+access_whatsapp_template_administrator,access.whatsapp.template,model_whatsapp_template,group_whatsapp_admin,1,1,1,1
+access_whatsapp_template_user,access.whatsapp.template,model_whatsapp_template,base.group_user,1,0,0,0
+access_whatsapp_template_button_administrator,access.whatsapp.template.button,model_whatsapp_template_button,group_whatsapp_admin,1,1,1,1
+access_whatsapp_template_button_user,access.whatsapp.template.button,model_whatsapp_template_button,base.group_user,1,0,0,0
+access_whatsapp_template_variable_administrator,access.whatsapp.template.variable,model_whatsapp_template_variable,group_whatsapp_admin,1,1,1,1
+access_whatsapp_template_variable_user,access.whatsapp.template.variable,model_whatsapp_template_variable,base.group_user,1,0,0,0
diff --git a/odex25_base/whatsapp/security/ir_rules.xml b/odex25_base/whatsapp/security/ir_rules.xml
new file mode 100644
index 000000000..0702cc040
--- /dev/null
+++ b/odex25_base/whatsapp/security/ir_rules.xml
@@ -0,0 +1,89 @@
+
+Test
", + model: "discuss.channel", + res_id: channelId, + message_type: "whatsapp_message", + }); + const memberIds = pyEnv["discuss.channel.member"].search([["channel_id", "=", channelId]]); + pyEnv["discuss.channel.member"].write(memberIds, { + fetched_message_id: messageId, + seen_message_id: false, + }); + const { openDiscuss } = await start(); + await openDiscuss(channelId); + await contains(".o-mail-MessageSeenIndicator:not(.o-all-seen)"); + await contains(".o-mail-MessageSeenIndicator i"); + + const [channel] = pyEnv["discuss.channel"].searchRead([["id", "=", channelId]]); + // Simulate received channel seen notification + pyEnv["bus.bus"]._sendone(channel, "discuss.channel.member/seen", { + channel_id: channelId, + last_message_id: 100, + partner_id: partnerId2, + }); + await contains(".o-mail-MessageSeenIndicator i", { count: 2 }); +}); + +QUnit.test("No SeenIndicators if message has whatsapp error", async () => { + const pyEnv = await startServer(); + const partnerId2 = pyEnv["res.partner"].create({ name: "WhatsApp User" }); + const channelId = pyEnv["discuss.channel"].create({ + name: "WhatsApp 1", + channel_type: "whatsapp", + channel_member_ids: [ + Command.create({ partner_id: pyEnv.currentPartnerId }), + Command.create({ partner_id: partnerId2 }), + ], + }); + const messageId = pyEnv["mail.message"].create({ + author_id: pyEnv.currentPartnerId, + body: "Test
", + model: "discuss.channel", + res_id: channelId, + message_type: "whatsapp_message", + }); + pyEnv["whatsapp.message"].create({ + mail_message_id: messageId, + failure_reason: "Message Not Sent", + failure_type: "unknown", + state: "error", + }); + const memberIds = pyEnv["discuss.channel.member"].search([["channel_id", "=", channelId]]); + pyEnv["discuss.channel.member"].write(memberIds, { + fetched_message_id: messageId, + seen_message_id: false, + }); + const { openDiscuss } = await start(); + await openDiscuss(channelId); + await contains(".o-mail-Message .fa.fa-whatsapp.text-danger"); + await contains(".o-mail-MessageSeenIndicator", { count: 0 }); +}); + +QUnit.test("whatsapp template messages should have whatsapp icon in message header", async () => { + const pyEnv = await startServer(); + const channelId = pyEnv["discuss.channel"].create({ + name: "WhatsApp 1", + channel_type: "whatsapp", + }); + pyEnv["mail.message"].create({ + body: "WhatsApp Message", + model: "discuss.channel", + res_id: channelId, + message_type: "whatsapp_message", + }); + const { openDiscuss } = await start(); + await openDiscuss(channelId); + await contains(".o-mail-Message-header span.fa-whatsapp"); +}); + +QUnit.test("No Reply button if thread is expired", async () => { + const pyEnv = await startServer(); + const channelId = pyEnv["discuss.channel"].create({ + name: "WhatsApp 1", + channel_type: "whatsapp", + whatsapp_channel_valid_until: DateTime.utc().minus({ minutes: 1 }).toSQL(), + }); + pyEnv["mail.message"].create({ + body: "Test
", + model: "discuss.channel", + res_id: channelId, + message_type: "whatsapp_message", + }); + const { openDiscuss } = await start(); + await openDiscuss(channelId); + await contains(".o-mail-Composer"); + await contains(".o-mail-Message-actions button[title='Reply']", { count: 0 }); +}); diff --git a/odex25_base/whatsapp/static/tests/messaging_menu_patch_tests.js b/odex25_base/whatsapp/static/tests/messaging_menu_patch_tests.js new file mode 100644 index 000000000..ffc217ec1 --- /dev/null +++ b/odex25_base/whatsapp/static/tests/messaging_menu_patch_tests.js @@ -0,0 +1,35 @@ +/* @odoo-module */ + +import { startServer } from "@bus/../tests/helpers/mock_python_environment"; + +import { Command } from "@mail/../tests/helpers/command"; +import { start } from "@mail/../tests/helpers/test_utils"; + +import { click, contains } from "@web/../tests/utils"; + +QUnit.module("messaging menu (patch)"); + +QUnit.test("WhatsApp Channel notification items should have thread icon", async () => { + const pyEnv = await startServer(); + pyEnv["discuss.channel"].create({ + name: "WhatsApp 1", + channel_type: "whatsapp", + }); + await start(); + await click(".o_menu_systray i[aria-label='Messages']"); + await contains(".o-mail-NotificationItem .o-mail-ThreadIcon"); +}); + +QUnit.test("Notification items should have unread counter for unread messages", async () => { + const pyEnv = await startServer(); + pyEnv["discuss.channel"].create({ + name: "WhatsApp 1", + channel_type: "whatsapp", + channel_member_ids: [ + Command.create({ message_unread_counter: 1, partner_id: pyEnv.currentPartnerId }), + ], + }); + await start(); + await click(".o_menu_systray i[aria-label='Messages']"); + await contains(".o-mail-MessagingMenu-counter", { text: "1" }); +}); diff --git a/odex25_base/whatsapp/static/video/test.mp4 b/odex25_base/whatsapp/static/video/test.mp4 new file mode 100644 index 000000000..ab4a18745 Binary files /dev/null and b/odex25_base/whatsapp/static/video/test.mp4 differ diff --git a/odex25_base/whatsapp/tests/__init__.py b/odex25_base/whatsapp/tests/__init__.py new file mode 100644 index 000000000..895fe8ee5 --- /dev/null +++ b/odex25_base/whatsapp/tests/__init__.py @@ -0,0 +1,9 @@ +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from . import template_data +from . import test_discuss_channel +from . import test_phone_format +from . import test_security +from . import test_whatsapp_composer +from . import test_whatsapp_message +from . import test_whatsapp_template diff --git a/odex25_base/whatsapp/tests/common.py b/odex25_base/whatsapp/tests/common.py new file mode 100644 index 000000000..7317d66aa --- /dev/null +++ b/odex25_base/whatsapp/tests/common.py @@ -0,0 +1,596 @@ +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +import hashlib +import hmac +import json +import time + +from contextlib import contextmanager +from unittest.mock import patch + +from odoo.addons.base.models.res_partner import Partner +from odoo.addons.mail.tests.common import MailCommon, mail_new_test_user +from odoo.addons.whatsapp.tools.whatsapp_api import WhatsAppApi +from odoo.addons.whatsapp.models.whatsapp_message import WhatsAppMessage +from odoo.addons.whatsapp.tests.template_data import template_data +from odoo.addons.whatsapp.tools.whatsapp_exception import WhatsAppError +from odoo.tests import common, Form + + +class MockOutgoingWhatsApp(common.BaseCase): + """ Mock calls to WhatsApp API, provide tools and patch to know what happens + when contacting it. """ + + @contextmanager + def mockWhatsappGateway(self): + self._init_wa_mock() + wa_msg_origin = WhatsAppMessage.create + partner_create_origin = Partner.create + + # ------------------------------------------------------------ + # Whatsapp API + # ------------------------------------------------------------ + + def _get_all_template(): + return template_data + + def _get_template_data(wa_template_uid): + for tmpl in template_data["data"]: + if tmpl["id"] == wa_template_uid: + return tmpl + return {} + + def _send_whatsapp(number, *, send_vals, **kwargs): + if send_vals: + msg_uid = f'test_wa_{time.time():.9f}' + self._wa_msg_sent.append(msg_uid) + return msg_uid + raise WhatsAppError("Please make sure to define a template before proceeding.") + + def _submit_template_new(json_data): + if json_data: + return { + "id": f"{time.time():.15f}", + "status": "PENDING", + "category": "MARKETING", + } + raise WhatsAppError("Please make sure to define a template before proceeding.") + + def _upload_demo_document(attachment): + if attachment: + return "2:c2SpecFlow6karmaFsdWU=" + raise WhatsAppError("There is no attachment to upload.") + + def _upload_whatsapp_document(attachment): + if attachment: + return { + "messaging_product": "whatsapp", + "contacts": [{ + "input": self.whatsapp_account, + "wa_id": "1234567890", + }], + "messages": [{ + "id": "qwertyuiop0987654321", + }] + } + raise WhatsAppError("Please ensure you are using the correct file type and try again.") + + # ------------------------------------------------------------ + # Whatsapp Models + # ------------------------------------------------------------ + + def _res_partner_create(model, *args, **kwargs): + records = partner_create_origin(model, *args, **kwargs) + self._new_partners += records.sudo() + return records + + def _wa_message_create(model, *args, **kwargs): + res = wa_msg_origin(model, *args, **kwargs) + self._new_wa_msg += res.sudo() + return res + + try: + with patch.object(Partner, 'create', autospec=True, wraps=Partner, side_effect=_res_partner_create), \ + patch.object(WhatsAppApi, '_get_all_template', side_effect=_get_all_template), \ + patch.object(WhatsAppApi, '_get_template_data', side_effect=_get_template_data), \ + patch.object(WhatsAppApi, '_upload_demo_document', side_effect=_upload_demo_document), \ + patch.object(WhatsAppApi, '_upload_whatsapp_document', side_effect=_upload_whatsapp_document), \ + patch.object(WhatsAppApi, '_send_whatsapp', side_effect=_send_whatsapp), \ + patch.object(WhatsAppApi, '_submit_template_new', side_effect=_submit_template_new), \ + patch.object(WhatsAppMessage, 'create', autospec=True, wraps=WhatsAppMessage, side_effect=_wa_message_create): + yield + finally: + pass + + def _init_wa_mock(self): + self._new_partners = self.env['res.partner'].sudo() + self._new_wa_msg = self.env['whatsapp.message'].sudo() + self._wa_msg_sent = [] + + +class MockIncomingWhatsApp(common.HttpCase): + """ Mock and provide tools on incoming WhatsApp calls. """ + + # ------------------------------------------------------------ + # TOOLS FOR SIMULATING RECEPTION + # ------------------------------------------------------------ + + def _get_message_signature(self, account, message_data): + return hmac.new( + account.app_secret.encode(), + msg=message_data.encode(), + digestmod=hashlib.sha256, + ).hexdigest() + + def _receive_template_update(self, field, account, data): + """ Simulate reception of a template update from WhatsApp API. + + param field: field to update (e.g. "message_template_status_update") + param account: whatsapp.account + param data: data to send in the request (e.g. {"event": "APPROVED"}) + """ + data = json.dumps({ + "entry": [{ + "id": account.account_uid, + "changes": [ + { + "field": field, + "value": data, + } + ] + }] + }) + + return self._make_webhook_request( + account, + message_data=data, + headers={ + "Content-Type": "application/json", + "X-Hub-Signature-256": f"sha256={self._get_message_signature(account, data)}", + } + ) + + def _receive_whatsapp_message(self, account, body, sender_phone_number, additional_message_values=None): + message_data = json.dumps({ + "entry": [{ + "id": account.account_uid, + "changes": [{ + "field": "messages", + "value": { + "metadata": {"phone_number_id": account.phone_uid}, + "messages": [ + dict({ + "id": f"test_wa_{time.time():.9f}", + "from": sender_phone_number, + "type": "text", + "text": {"body": body} + }, **(additional_message_values or {})) + ], + } + }] + }] + }) + + return self._make_webhook_request( + account, + message_data=message_data, + headers={ + "Content-Type": "application/json", + "X-Hub-Signature-256": f"sha256={self._get_message_signature(account, message_data)}", + } + ) + + def _make_webhook_request(self, account, message_data=None, headers=None): + if not message_data: + message_data = json.dumps({'entry': [{'id': account.account_uid}]}).encode() + return self.url_open( + '/whatsapp/webhook/', data=message_data, headers={ + "Content-Type": "application/json", + **(headers or {}) + } + ).json() + + # ------------------------------------------------------------ + # TEST TOOLS AND ASSERTS + # ------------------------------------------------------------ + + def _find_discuss_channel(self, whatsapp_number): + # Remove me in master, moved in WhatsAppCase + return self.env["discuss.channel"].search([("whatsapp_number", "=", whatsapp_number)]) + + def assertWhatsAppChannel(self, sender_phone_number): + # Remove me in master, moved in WhatsAppCase + discuss_channel = self._find_discuss_channel(sender_phone_number) + self.assertEqual(len(discuss_channel), 1, f'Should find exactly one channel for number {sender_phone_number}') + self.assertEqual(len(discuss_channel.message_ids), 1) + return discuss_channel + + +class WhatsAppCase(MockOutgoingWhatsApp): + """ Common class with tools and asserts """ + + # ------------------------------------------------------------ + # TOOLS + # ------------------------------------------------------------ + + def _add_button_to_template(self, template, name, + button_type='quick_reply', sequence=1, + call_number=False, + url_type=False, + website_url=False): + template.write({ + 'button_ids': [(0, 0, { + 'button_type': button_type if button_type else 'quick_reply', + 'call_number': call_number if call_number else '', + 'name': name, + 'sequence': sequence, + 'url_type': url_type if url_type else 'static', + 'wa_template_id': template.id, + 'website_url': website_url if website_url else '', + })], + }) + + def _wa_composer_form(self, template, from_records, with_user=False, + add_context=None): + """ Create a whatsapp composer form, intended to run 'template' on + 'from_records'. + + :param with_user: a user to set on environment, allowing to check ACLs; + :param add_context: optional additional context values given to the + composer creation; + """ + context = dict( + { + 'active_model': from_records._name, + 'active_ids': from_records.ids, + 'default_wa_template_id': template.id, + }, **(add_context or {}) + ) + return Form(self.env['whatsapp.composer'].with_context(context).with_user(with_user or self.env.user)) + + def _instanciate_wa_composer_from_records(self, template, from_records, + with_user=False, + add_context=None): + """ Create a whatsapp composer to run 'template' on 'from_records'. + + :param with_user: a user to set on environment, allowing to check ACLs; + :param add_context: optional additional context values given to the + composer creation; + """ + context = dict( + {'active_model': from_records._name, 'active_ids': from_records.ids}, + **(add_context or {}) + ) + return self.env['whatsapp.composer'].with_context(context).with_user(with_user or self.env.user).create({ + 'wa_template_id': template.id, + }) + + # ------------------------------------------------------------ + # MESSAGE FIND AND ASSERTS + # ------------------------------------------------------------ + + def _find_wa_msg_wnumber(self, mobile_number): + """ Find a WA message, based on 'mobile_number' """ + for wa_msg in self._new_wa_msg: + if wa_msg.mobile_number == mobile_number: + return wa_msg + debug_info = '\n'.join( + f'From: {wa_msg.mobile_number} (ID {wa_msg.id})' + for wa_msg in self._new_wa_msg + ) + raise AssertionError( + f'whatsapp.message not found for number {mobile_number}\n{debug_info})' + ) + + def _find_wa_msg_wrecord(self, record): + """ Find a WA message, using linked record through its mail.message """ + for wa_msg in self._new_wa_msg: + if wa_msg.mail_message_id.model == record._name and wa_msg.mail_message_id.res_id == record.id: + return wa_msg + debug_info = '\n'.join( + f'From: {wa_msg.mobile_number} (ID {wa_msg.id})' + for wa_msg in self._new_wa_msg + ) + raise AssertionError( + f'whatsapp.message not found for record {record.display_name} ({record._name}/{record.id}\n{debug_info})' + ) + + def _assertWAMessage(self, wa_message, status='sent', + fields_values=None, attachment_values=None, + mail_message_values=None): + """ Assert content of WhatsApp message. + + :paramPlease evaluate {expected_value}.
', + } + ) + + # should crash + for field_path in [ + # does not exist on distant model + 'country_id.wrong', + # does not exist + 'wrong', + ]: + with self.subTest(field_path=field_path): + with self.assertRaises(exceptions.ValidationError): + variable.write({ + 'field_name': field_path + }) + + +@tagged('wa_composer') +class WhatsAppComposerPreview(WhatsAppComposerCase): + + @users('user_wa_admin') + def test_composer_preview(self): + """ Test preview feature from composer """ + body_var = 'Nishant' + header_var = 'Jigar' + template = self.env['whatsapp.template'].create({ + 'body': 'Feel *free* to *contact* {{1}}; he is ~great~ ~super~ super great !', + 'footer_text': 'Thank *you*', + 'header_text': 'Header ```Code Content``` {{1}}', + 'header_type': 'text', + 'variable_ids': [ + (5, 0, 0), + (0, 0, { + 'name': "{{1}}", + 'line_type': 'body', + 'field_type': "free_text", + 'demo_value': body_var, + }), + (0, 0, { + 'name': "{{1}}", + 'line_type': 'header', + 'field_type': "free_text", + 'demo_value': header_var, + }), + ], + 'wa_account_id': self.whatsapp_account.id, + }) + composer = self._instanciate_wa_composer_from_records(template, from_records=self.customers[0]) + + for expected_str in [ + f'HeaderCode Content {header_var}',
+ f'Feel free to contact {body_var}; he is Your Template has been rejected.
Reason : <b>Super Reason</b>
+
+ There is no WhatsApp Business Account configured. +
++ If you have credentials for cloud api then setup here and start using WhatsApp. + You can register a WhatsApp Business Account through + WhatsApp cloud api + +
++ No WhatsApp messages found. +
++ Create or sync WhatsApp Templates. +
++ You can retrieve templates from Facebook by clicking Sync Templates on the WhatsApp Business Account. + Or create templates here and send them for approval. + Please refer the Template Guidelines +
+