# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import base64
from unittest.mock import patch
from odoo.addons.mail.tests.common import mail_new_test_user
from odoo.addons.test_mail.tests.common import TestMailCommon
from odoo.addons.test_mail.models.test_mail_models import MailTestSimple
from odoo.exceptions import AccessError
from odoo.tools import mute_logger, formataddr
from odoo.tests import tagged
class TestMessageValues(TestMailCommon):
@classmethod
def setUpClass(cls):
    """Set up the mail gateway, an aliased container record and the message model.

    The message model stored on ``cls.Message`` is bound to the employee
    user, so messages created through it are created as that user.
    """
    super(TestMessageValues, cls).setUpClass()
    cls._init_mail_gateway()

    # Container record owning the 'pigs' alias, shared by the tests of this class.
    container_values = {
        'name': 'Pigs',
        'alias_name': 'pigs',
        'alias_contact': 'followers',
    }
    ContainerModel = cls.env['mail.test.container'].with_context(cls._test_context)
    cls.alias_record = ContainerModel.create(container_values)

    cls.Message = cls.env['mail.message'].with_user(cls.user_employee)
@mute_logger('odoo.models.unlink')
def test_mail_message_format(self):
    """message_format returns the current name of the linked record as record_name."""
    simple_record = self.env['mail.test.simple'].create({'name': 'Test1'})
    message_values = {
        'model': 'mail.test.simple',
        'res_id': simple_record.id,
    }
    message = self.env['mail.message'].create([message_values])

    formatted = message.message_format()
    self.assertEqual(formatted[0].get('record_name'), 'Test1')

    # Renaming the record must be reflected by the next message_format call.
    simple_record.write({"name": "Test2"})
    formatted = message.message_format()
    self.assertEqual(formatted[0].get('record_name'), 'Test2')
@mute_logger('odoo.models.unlink')
def test_mail_message_format_access(self):
    """
    message_format must still return record_name to a user who does not
    have access to the record the message is posted on.
    """
    other_company = self.env['res.company'].create({'name': 'Second Test Company'})
    record_values = {
        'name': 'Test1',
        'company_id': other_company.id,
    }
    restricted_record = self.env['mail.test.multi.company'].create(record_values)

    employee_partner = self.user_employee.partner_id
    message = restricted_record.message_post(body='', partner_ids=[employee_partner.id])

    # record_name is already in the ORM cache from the creation above.
    # Flush and invalidate it, otherwise the cached value would leak into
    # message_format and the check below would prove nothing.
    message.flush()
    message.invalidate_cache()

    formatted = message.with_user(self.user_employee).message_format()
    self.assertEqual(formatted[0].get('record_name'), 'Test1')
@mute_logger('odoo.models.unlink')
def test_mail_message_values_no_document_values(self):
    """Explicit reply_to and email_from values are kept on a message without document."""
    explicit_values = {
        'reply_to': 'test.reply@example.com',
        'email_from': 'test.from@example.com',
    }
    msg = self.Message.create(explicit_values)

    # Part of the message_id located before the '@'.
    local_part = msg.message_id.split('@')[0]
    self.assertIn('-private', local_part, 'mail_message: message_id for a void message should be a "private" one')
    self.assertEqual(msg.reply_to, 'test.reply@example.com')
    self.assertEqual(msg.email_from, 'test.from@example.com')
@mute_logger('odoo.models.unlink')
def test_mail_message_values_no_document(self):
    """Check default reply_to / email_from of a message created without document.

    Three configurations are exercised in sequence: catchall domain and
    alias both set, catchall domain removed, then catchall alias removed
    (with the domain restored).
    """
    ConfigParameter = self.env['ir.config_parameter']
    private_id_error = 'mail_message: message_id for a void message should be a "private" one'
    employee_address = formataddr((self.user_employee.name, self.user_employee.email))

    def assert_reply_to_is_author():
        """Create a void message and check both addresses are the employee's."""
        fallback_msg = self.Message.create({})
        self.assertIn('-private', fallback_msg.message_id.split('@')[0], private_id_error)
        self.assertEqual(fallback_msg.reply_to, employee_address)
        self.assertEqual(fallback_msg.email_from, employee_address)

    # catchall domain and alias set -> reply_to is company name + catchall address
    msg = self.Message.create({})
    self.assertIn('-private', msg.message_id.split('@')[0], private_id_error)
    catchall_address = formataddr((
        self.env.user.company_id.name,
        '%s@%s' % (self.alias_catchall, self.alias_domain),
    ))
    self.assertEqual(msg.reply_to, catchall_address)
    self.assertEqual(msg.email_from, employee_address)

    # no alias domain -> author
    ConfigParameter.search([('key', '=', 'mail.catchall.domain')]).unlink()
    assert_reply_to_is_author()

    # no alias catchall, no alias -> author
    ConfigParameter.set_param('mail.catchall.domain', self.alias_domain)
    ConfigParameter.search([('key', '=', 'mail.catchall.alias')]).unlink()
    assert_reply_to_is_author()
@mute_logger('odoo.models.unlink')
def test_mail_message_values_document_alias(self):
    """Check reply_to / email_from of a message linked to a record owning an alias.

    Three configurations are exercised in sequence: catchall domain and
    alias both set, catchall domain removed, then catchall alias removed
    (with the domain restored).
    """
    ConfigParameter = self.env['ir.config_parameter']
    container = self.alias_record
    expected_id_part = '-openerp-%d-mail.test' % container.id
    employee_address = formataddr((self.user_employee.name, self.user_employee.email))

    def create_container_message():
        """Create a message, as the employee, linked to the aliased container."""
        return self.Message.create({
            'model': 'mail.test.container',
            'res_id': container.id
        })

    # domain and record alias available -> reply_to is built from the record alias
    msg = create_container_message()
    self.assertIn(expected_id_part, msg.message_id.split('@')[0])
    alias_address = formataddr((
        '%s %s' % (self.env.user.company_id.name, container.name),
        '%s@%s' % (container.alias_name, self.alias_domain),
    ))
    self.assertEqual(msg.reply_to, alias_address)
    self.assertEqual(msg.email_from, employee_address)

    # no alias domain -> author
    ConfigParameter.search([('key', '=', 'mail.catchall.domain')]).unlink()
    msg = create_container_message()
    self.assertIn(expected_id_part, msg.message_id.split('@')[0])
    self.assertEqual(msg.reply_to, employee_address)
    self.assertEqual(msg.email_from, employee_address)

    # no catchall -> don't care, alias
    ConfigParameter.set_param('mail.catchall.domain', self.alias_domain)
    ConfigParameter.search([('key', '=', 'mail.catchall.alias')]).unlink()
    msg = create_container_message()
    self.assertIn(expected_id_part, msg.message_id.split('@')[0])
    alias_address = formataddr((
        '%s %s' % (self.env.company.name, container.name),
        '%s@%s' % (container.alias_name, self.alias_domain),
    ))
    self.assertEqual(msg.reply_to, alias_address)
    self.assertEqual(msg.email_from, employee_address)
@mute_logger('odoo.models.unlink')
def test_mail_message_values_document_no_alias(self):
    """A message on a record without alias replies to the catchall address."""
    record_values = {'name': 'Test', 'email_from': 'ignasse@example.com'}
    test_record = self.env['mail.test.simple'].create(record_values)

    msg = self.Message.create({
        'model': 'mail.test.simple',
        'res_id': test_record.id
    })

    # message_id (part before the '@') references the document
    local_part = msg.message_id.split('@')[0]
    self.assertIn('-openerp-%d-mail.test.simple' % test_record.id, local_part)

    # reply_to: "<company name> <record name>" with the catchall address
    expected_reply_to = formataddr((
        '%s %s' % (self.env.user.company_id.name, test_record.name),
        '%s@%s' % (self.alias_catchall, self.alias_domain),
    ))
    self.assertEqual(msg.reply_to, expected_reply_to)

    expected_email_from = formataddr((self.user_employee.name, self.user_employee.email))
    self.assertEqual(msg.email_from, expected_email_from)
@mute_logger('odoo.models.unlink')
def test_mail_message_values_document_manual_alias(self):
    """A manually created alias whose parent thread is the record drives reply_to."""
    record_values = {'name': 'Test', 'email_from': 'ignasse@example.com'}
    test_record = self.env['mail.test.simple'].create(record_values)

    # Alias created by hand, with the test record as its parent thread.
    simple_model_id = self.env['ir.model']._get('mail.test.simple').id
    alias = self.env['mail.alias'].create({
        'alias_name': 'MegaLias',
        'alias_user_id': False,
        'alias_model_id': simple_model_id,
        'alias_parent_model_id': simple_model_id,
        'alias_parent_thread_id': test_record.id,
    })

    msg = self.Message.create({
        'model': 'mail.test.simple',
        'res_id': test_record.id
    })

    local_part = msg.message_id.split('@')[0]
    self.assertIn('-openerp-%d-mail.test.simple' % test_record.id, local_part)

    # reply_to: "<company name> <record name>" with the manual alias address
    expected_reply_to = formataddr((
        '%s %s' % (self.env.user.company_id.name, test_record.name),
        '%s@%s' % (alias.alias_name, self.alias_domain),
    ))
    self.assertEqual(msg.reply_to, expected_reply_to)

    expected_email_from = formataddr((self.user_employee.name, self.user_employee.email))
    self.assertEqual(msg.email_from, expected_email_from)
def test_mail_message_values_no_auto_thread(self):
    """With no_auto_thread set, the message_id holds 'reply_to' and no document reference."""
    message_values = {
        'model': 'mail.test.container',
        'res_id': self.alias_record.id,
        'no_auto_thread': True,
    }
    msg = self.Message.create(message_values)

    # Part of the message_id located before the '@'.
    local_part = msg.message_id.split('@')[0]
    self.assertIn('reply_to', local_part)

    # Neither the model name nor the record id may appear in it.
    for unexpected in ('mail.test.container', '-%d-' % self.alias_record.id):
        self.assertNotIn(unexpected, local_part)
def test_mail_message_base64_image(self):
msg = self.env['mail.message'].with_user(self.user_employee).create({
'body': 'taratata
',
})
self.assertEqual(len(msg.attachment_ids), 1)
body = '
taratata
This is First Message
', subject='Subject', message_type='comment', subtype_xmlid='mail.mt_note') # portal user have no rights to read the message with self.assertRaises(AccessError): message.with_user(self.user_portal).read(['subject, body']) with patch.object(MailTestSimple, 'check_access_rights', return_value=True): with self.assertRaises(AccessError): message.with_user(self.user_portal).read(['subject, body']) # parent message is accessible to references notification mail values # for _notify method and portal user have no rights to send the message for this model new_msg = test_record.with_user(self.user_portal).message_post( body='This is Second Message
', subject='Subject', parent_id=message.id, message_type='comment', subtype_xmlid='mail.mt_comment', mail_auto_delete=False) new_mail = self.env['mail.mail'].sudo().search([ ('mail_message_id', '=', new_msg.id), ('references', '=', message.message_id), ]) self.assertTrue(new_mail) self.assertEqual(new_msg.parent_id, message) # -------------------------------------------------- # WRITE # -------------------------------------------------- def test_mail_message_access_write_moderation(self): """ Only moderators can modify pending messages """ self.group_public.write({ 'email_send': True, 'moderation': True, 'channel_partner_ids': [(4, self.partner_employee.id)], 'moderator_ids': [(4, self.user_employee.id)], }) self.message.write({'model': 'mail.channel', 'res_id': self.group_public.id, 'moderation_status': 'pending_moderation'}) self.message.with_user(self.user_employee).write({'moderation_status': 'accepted'}) def test_mail_message_access_write_crash_moderation(self): self.message.write({'model': 'mail.channel', 'res_id': self.group_public.id, 'moderation_status': 'pending_moderation'}) with self.assertRaises(AccessError): self.message.with_user(self.user_employee).write({'moderation_status': 'accepted'}) @mute_logger('openerp.addons.mail.models.mail_mail') def test_mark_all_as_read(self): self.user_employee.notification_type = 'inbox' emp_partner = self.user_employee.partner_id.with_user(self.user_employee) group_private = self.env['mail.channel'].with_context({ 'mail_create_nolog': True, 'mail_create_nosubscribe': True, 'mail_channel_noautofollow': True, }).create({ 'name': 'Private', 'description': 'Private James R.', 'public': 'private', 'alias_name': 'private', 'alias_contact': 'followers'} ).with_context({'mail_create_nosubscribe': False}) # mark all as read clear needactions msg1 = group_private.message_post(body='Test', message_type='comment', subtype_xmlid='mail.mt_comment', partner_ids=[emp_partner.id]) self._reset_bus() 
emp_partner.env['mail.message'].mark_all_as_read(domain=[]) self.assertBusNotifications([(self.cr.dbname, 'res.partner', emp_partner.id)], [{ 'type': 'mark_as_read', 'message_ids': [msg1.id], 'needaction_inbox_counter': 0 }]) na_count = emp_partner.get_needaction_count() self.assertEqual(na_count, 0, "mark all as read should conclude all needactions") # mark all as read also clear inaccessible needactions msg2 = group_private.message_post(body='Zest', message_type='comment', subtype_xmlid='mail.mt_comment', partner_ids=[emp_partner.id]) needaction_accessible = len(emp_partner.env['mail.message'].search([['needaction', '=', True]])) self.assertEqual(needaction_accessible, 1, "a new message to a partner is readable to that partner") msg2.sudo().partner_ids = self.env['res.partner'] emp_partner.env['mail.message'].search([['needaction', '=', True]]) needaction_length = len(emp_partner.env['mail.message'].search([['needaction', '=', True]])) self.assertEqual(needaction_length, 1, "message should still be readable when notified") na_count = emp_partner.get_needaction_count() self.assertEqual(na_count, 1, "message not accessible is currently still counted") self._reset_bus() emp_partner.env['mail.message'].mark_all_as_read(domain=[]) self.assertBusNotifications([(self.cr.dbname, 'res.partner', emp_partner.id)], [{ 'type': 'mark_as_read', 'message_ids': [msg2.id], 'needaction_inbox_counter': 0 }]) na_count = emp_partner.get_needaction_count() self.assertEqual(na_count, 0, "mark all read should conclude all needactions even inacessible ones") @mute_logger('openerp.addons.mail.models.mail_mail') def test_mark_all_as_read_share(self): self.user_portal.notification_type = 'inbox' portal_partner = self.user_portal.partner_id.with_user(self.user_portal) # mark all as read clear needactions self.group_pigs.message_post(body='Test', message_type='comment', subtype_xmlid='mail.mt_comment', partner_ids=[portal_partner.id]) 
portal_partner.env['mail.message'].mark_all_as_read(domain=[]) na_count = portal_partner.get_needaction_count() self.assertEqual(na_count, 0, "mark all as read should conclude all needactions") # mark all as read also clear inaccessible needactions new_msg = self.group_pigs.message_post(body='Zest', message_type='comment', subtype_xmlid='mail.mt_comment', partner_ids=[portal_partner.id]) needaction_accessible = len(portal_partner.env['mail.message'].search([['needaction', '=', True]])) self.assertEqual(needaction_accessible, 1, "a new message to a partner is readable to that partner") new_msg.sudo().partner_ids = self.env['res.partner'] needaction_length = len(portal_partner.env['mail.message'].search([['needaction', '=', True]])) self.assertEqual(needaction_length, 1, "message should still be readable when notified") na_count = portal_partner.get_needaction_count() self.assertEqual(na_count, 1, "message not accessible is currently still counted") portal_partner.env['mail.message'].mark_all_as_read(domain=[]) na_count = portal_partner.get_needaction_count() self.assertEqual(na_count, 0, "mark all read should conclude all needactions even inacessible ones") @tagged('moderation') class TestMessageModeration(TestMailCommon): @classmethod def setUpClass(cls): super(TestMessageModeration, cls).setUpClass() cls.channel_1 = cls.env['mail.channel'].create({ 'name': 'Moderation_1', 'email_send': True, 'moderation': True }) cls.user_employee.write({'moderation_channel_ids': [(6, 0, [cls.channel_1.id])]}) cls.user_portal = cls._create_portal_user() # A pending moderation message needs to have field channel_ids empty. Moderators # need to be able to notify a pending moderation message (in a channel they moderate). 
cls.msg_c1_admin1 = cls._add_messages(cls.channel_1, 'Body11', author=cls.partner_admin, moderation_status='pending_moderation') cls.msg_c1_admin2 = cls._add_messages(cls.channel_1, 'Body12', author=cls.partner_admin, moderation_status='pending_moderation') cls.msg_c1_portal = cls._add_messages(cls.channel_1, 'Body21', author=cls.partner_portal, moderation_status='pending_moderation') @mute_logger('odoo.models.unlink') def test_moderate_accept(self): self._reset_bus() self.assertFalse(self.msg_c1_admin1.channel_ids | self.msg_c1_admin2.channel_ids | self.msg_c1_portal.channel_ids) self.msg_c1_admin1.with_user(self.user_employee)._moderate('accept') self.assertEqual(self.msg_c1_admin1.channel_ids, self.channel_1) self.assertEqual(self.msg_c1_admin1.moderation_status, 'accepted') self.assertEqual(self.msg_c1_admin2.moderation_status, 'pending_moderation') self.assertBusNotifications([(self.cr.dbname, 'mail.channel', self.channel_1.id)]) @mute_logger('odoo.models.unlink') def test_moderate_allow(self): self._reset_bus() self.msg_c1_admin1.with_user(self.user_employee)._moderate('allow') self.assertEqual(self.msg_c1_admin1.channel_ids, self.channel_1) self.assertEqual(self.msg_c1_admin2.channel_ids, self.channel_1) self.assertEqual(self.msg_c1_admin1.moderation_status, 'accepted') self.assertEqual(self.msg_c1_admin2.moderation_status, 'accepted') self.assertBusNotifications([ (self.cr.dbname, 'mail.channel', self.channel_1.id), (self.cr.dbname, 'mail.channel', self.channel_1.id)]) @mute_logger('odoo.models.unlink') def test_moderate_reject(self): with self.mock_mail_gateway(): (self.msg_c1_admin1 | self.msg_c1_portal).with_user(self.user_employee)._moderate_send_reject_email('Title', 'Message to author') self.assertEqual(len(self._new_mails), 2) for mail in self._new_mails: self.assertEqual(mail.author_id, self.partner_employee) self.assertEqual(mail.subject, 'Title') self.assertEqual(mail.state, 'outgoing') self.assertEqual( set(self._new_mails.mapped('email_to')), 
set([self.msg_c1_admin1.email_from, self.msg_c1_portal.email_from]) ) self.assertEqual( set(self._new_mails.mapped('body_html')), set(['