summaryrefslogtreecommitdiff
path: root/addons/mass_mailing/tests/common.py
blob: 1baf6c7d75d96244e6446312ce4652e0b8ab0973 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

import datetime
import random
import re
import werkzeug

from odoo.addons.link_tracker.tests.common import MockLinkTracker
from odoo.addons.mail.tests.common import MailCase, MailCommon, mail_new_test_user
from odoo import tools

class MassMailCase(MailCase, MockLinkTracker):
    """ Test helpers for mass mailing: asserts on mailing statistics and
    traces, tools simulating gateway events (bounce, click) and a shortcut
    creating bounce traces. """

    # ------------------------------------------------------------
    # ASSERTS
    # ------------------------------------------------------------

    def assertMailingStatistics(self, mailing, **kwargs):
        """ Helper to assert mailing statistics fields. As we have many of them
        it helps lessening test asserts.

        :param mailing: a ``mailing.mailing`` record whose statistics fields
          are checked;
        :param kwargs: expected value of each statistics field, by field name.
          Fields that are not given are expected to be 0, except ``expected``
          and ``delivered`` that default to the number of traces of the
          mailing. A falsy value (0, None) given for those two fields is
          replaced by that default as well;
        """
        if not kwargs.get('expected'):
            kwargs['expected'] = len(mailing.mailing_trace_ids)
        if not kwargs.get('delivered'):
            kwargs['delivered'] = len(mailing.mailing_trace_ids)
        for fname in ['scheduled', 'expected', 'sent', 'delivered',
                      'opened', 'replied', 'clicked',
                      'ignored', 'failed', 'bounced']:
            expected_value = kwargs.get(fname, 0)
            self.assertEqual(
                mailing[fname], expected_value,
                'Mailing %s statistics failed: got %s instead of %s' % (fname, mailing[fname], expected_value)
            )

    def assertMailTraces(self, recipients_info, mailing, records,
                         check_mail=True, sent_unlink=False, author=None,
                         mail_links_info=None):
        """ Check content of traces. Traces are fetched based on a given mailing
        and records. Their content is compared to recipients_info structure that
        holds expected information. Links content may be checked, notably to
        assert shortening or unsubscribe links. Mail.mail records may optionally
        be checked.

        :param recipients_info: list[{
            # TRACE
            'partner': res.partner record (may be empty),
            'email': email used when sending email (may be empty, computed based on partner),
            'state': outgoing / sent / ignored / bounced / exception / opened (sent by default),
            'record': linked record (used only when the matching entry of ``records`` is empty),
            # MAIL.MAIL
            'content': optional content that should be present in mail.mail body_html;
            'failure_type': optional failure reason;
            }, { ... }]

        :param mailing: a mailing.mailing record from which traces have been
          generated;
        :param records: records given to mailing that generated traces. It is
          used notably to find traces using their IDs. It is iterated together
          with ``recipients_info`` (and ``mail_links_info``): iteration stops
          at the shortest of them;
        :param check_mail: if True, also check mail.mail records that should be
          linked to traces;
        :param sent_unlink: if True, sent mail.mail are deleted and we check gateway
          output result instead of actual mail.mail records;
        :param mail_links_info: if given, should follow order of ``recipients_info``
          and give details about links. See ``assertLinkShortenedHtml`` helper for
          more details about content to give;
        :param author: author of sent mail.mail. Defaults to the partner of the
          current user;
        """
        # map trace state to email state
        state_mapping = {
            'outgoing': 'outgoing',  # documented trace state: mail.mail expected to be outgoing too
            'sent': 'sent',
            'opened': 'sent',  # opened implies something has been sent
            'replied': 'sent',  # replied implies something has been sent
            'ignored': 'cancel',
            'exception': 'exception',
            'canceled': 'cancel',
            'bounced': 'cancel',
        }

        traces = self.env['mailing.trace'].search([
            ('mass_mailing_id', 'in', mailing.ids),
            ('res_id', 'in', records.ids)
        ])

        # ensure trace coherency
        self.assertTrue(all(s.model == records._name for s in traces))
        self.assertEqual(set(s.res_id for s in traces), set(records.ids))

        # default author does not depend on the recipient: compute it once
        if check_mail and author is None:
            author = self.env.user.partner_id

        # check each traces
        if not mail_links_info:
            mail_links_info = [None] * len(recipients_info)
        for recipient_info, link_info, record in zip(recipients_info, mail_links_info, records):
            partner = recipient_info.get('partner', self.env['res.partner'])
            email = recipient_info.get('email')
            state = recipient_info.get('state', 'sent')
            record = record or recipient_info.get('record')
            content = recipient_info.get('content')
            if email is None and partner:
                email = partner.email_normalized

            # a trace matches on email and state, and on record when one is known
            recipient_trace = traces.filtered(
                lambda t: t.email == email and t.state == state and (t.res_id == record.id if record else True)
            )
            self.assertEqual(
                len(recipient_trace), 1,
                'MailTrace: email %s (recipient %s, state: %s, record: %s): found %s records (1 expected)' % (email, partner, state, record, len(recipient_trace))
            )
            self.assertTrue(bool(recipient_trace.mail_mail_id_int))

            if check_mail:
                fields_values = {'mailing_id': mailing}
                if 'failure_type' in recipient_info:
                    fields_values['failure_type'] = recipient_info['failure_type']

                # specific for partner: email_formatted is used
                if partner:
                    if state == 'sent' and sent_unlink:
                        self.assertSentEmail(author, [partner])
                    else:
                        self.assertMailMail(partner, state_mapping[state], author=author, content=content, fields_values=fields_values)
                # specific if email is False -> could have troubles finding it if several falsy traces
                elif not email and state in ('ignored', 'canceled', 'bounced'):
                    self.assertMailMailWId(recipient_trace.mail_mail_id_int, state_mapping[state], content=content, fields_values=fields_values)
                else:
                    self.assertMailMailWEmails([email], state_mapping[state], author=author, content=content, fields_values=fields_values)

            if link_info:
                trace_mail = self._find_mail_mail_wrecord(record)
                for (anchor_id, url, is_shortened, add_link_params) in link_info:
                    # utm parameters checked on every link, completed by link-specific ones
                    link_params = {'utm_medium': 'Email', 'utm_source': mailing.name}
                    if add_link_params:
                        link_params.update(**add_link_params)
                    self.assertLinkShortenedHtml(
                        trace_mail.body_html,
                        (anchor_id, url, is_shortened),
                        link_params=link_params,
                    )

    # ------------------------------------------------------------
    # TOOLS
    # ------------------------------------------------------------

    def gateway_mail_bounce(self, mailing, record, bounce_base_values=None):
        """ Generate a bounce at mailgateway level.

        Bounce values are built from defaults, updated with
        ``bounce_base_values``, then given to
        ``mail.thread._routing_handle_bounce``. ``bounced_email`` and
        ``bounced_msg_id`` always come from the trace and override any value
        given in ``bounce_base_values``.

        :param mailing: a ``mailing.mailing`` record on which we find a trace
          to bounce;
        :param record: record which should bounce;
        :param bounce_base_values: optional values given to routing;
        """
        trace = mailing.mailing_trace_ids.filtered(lambda t: t.model == record._name and t.res_id == record.id)

        parsed_bounce_values = {
            'email_from': 'some.email@external.example.com',  # TDE check: email_from -> trace email ?
            'to': 'bounce@test.example.com',  # TDE check: bounce alias ?
            'message_id': tools.generate_tracking_message_id('MailTest'),
            'bounced_partner': self.env['res.partner'].sudo(),
            'bounced_message': self.env['mail.message'].sudo()
        }
        if bounce_base_values:
            parsed_bounce_values.update(bounce_base_values)
        # trace-based values are set last so that they cannot be overridden
        parsed_bounce_values.update({
            'bounced_email': trace.email,
            'bounced_msg_id': [trace.message_id],
        })
        self.env['mail.thread']._routing_handle_bounce(False, parsed_bounce_values)

    def gateway_mail_click(self, mailing, record, click_label):
        """ Simulate a click on a sent email.

        The sent email is found using the email of the trace linked to
        ``record``. Its body is parsed to find the first shortened link whose
        label is ``click_label``; a ``link.tracker.click`` is then added for
        it.

        :param mailing: a ``mailing.mailing`` record on which we find the trace
          of the clicking recipient;
        :param record: record whose email is clicked;
        :param click_label: label of the link to click;

        :raise AssertionError: if no sent email is found, if the trace ID held
          by the link is not the one of the trace, or if no shortened link
          holds the given label;
        """
        trace = mailing.mailing_trace_ids.filtered(lambda t: t.model == record._name and t.res_id == record.id)
        email = self._find_sent_mail_wemail(trace.email)
        self.assertTrue(bool(email))
        for (_url_href, link_url, _dummy, label) in re.findall(tools.HTML_TAG_URL_REGEX, email['body']):
            if label == click_label and '/r/' in link_url:  # shortened link, like 'http://localhost:8069/r/LBG/m/53'
                parsed_url = werkzeug.urls.url_parse(link_url)
                # path '/r/<code>/m/<trace_id>' splits into ['', 'r', code, 'm', trace_id]
                path_items = parsed_url.path.split('/')
                code, trace_id = path_items[2], int(path_items[4])
                self.assertEqual(trace.id, trace_id)

                self.env['link.tracker.click'].sudo().add_click(
                    code,
                    ip='100.200.300.%3f' % random.random(),  # fake ip with a random suffix
                    country_code='BE',
                    mailing_trace_id=trace.id
                )
                break
        else:
            # loop ended without break: no matching link has been clicked
            raise AssertionError('url %s not found in mailing %s for record %s' % (click_label, mailing, record))

    @classmethod
    def _create_bounce_trace(cls, mailing, record, dt=None):
        """ Create a bounced ``mailing.trace`` for a record.

        The trace email is read on the first field found on the record among
        ``email_normalized``, ``email_from`` and ``email``. The trace
        message_id is randomly generated.

        :param mailing: a ``mailing.mailing`` record the trace is linked to;
        :param record: record for which the trace is created;
        :param dt: value of the ``bounced`` field of the trace. Defaults to
          one day before now;

        :return: the created ``mailing.trace`` record;
        """
        if 'email_normalized' in record:
            trace_email = record.email_normalized
        elif 'email_from' in record:
            trace_email = record.email_from
        else:
            trace_email = record.email
        if dt is None:
            dt = datetime.datetime.now() - datetime.timedelta(days=1)
        randomized = random.random()
        trace = cls.env['mailing.trace'].create({
            'mass_mailing_id': mailing.id,
            'model': record._name,
            'res_id': record.id,
            'bounced': dt,
            # TDE FIXME: improve this with a mail-enabled heuristics
            'email': trace_email,
            'message_id': '<%5f@gilbert.boitempomils>' % randomized,
        })
        return trace


class MassMailCommon(MailCommon, MassMailCase):
    """ Common class for mass mailing tests: creates a marketing user and a
    reply-to value, and gives a shortcut creating two sample mailing lists. """

    @classmethod
    def setUpClass(cls):
        """ Create the marketing test user, store the reply-to value used in
        tests, then call ``flush`` on the ``base`` model. """
        super(MassMailCommon, cls).setUpClass()

        cls.user_marketing = mail_new_test_user(
            cls.env,
            login='user_marketing',
            groups='base.group_user,base.group_partner_manager,mass_mailing.group_mass_mailing_user',
            name='Martial Marketing',
            signature='--\nMartial',
        )
        cls.email_reply_to = 'MyCompany SomehowAlias <test.alias@test.mycompany.com>'

        cls.env['base'].flush()

    @classmethod
    def _create_mailing_list(cls):
        """ Shortcut creating ``mailing_list_1`` (3 contacts) and
        ``mailing_list_2`` (4 contacts, two of them sharing the same email) on
        the class. Content is hardcoded. """
        MailingList = cls.env['mailing.list'].with_context(cls._test_context)

        # (name, email) of the contacts created with each list
        contacts_list_1 = [
            ('Déboulonneur', 'fleurus@example.com'),
            ('Gorramts', 'gorramts@example.com'),
            ('Ybrant', 'ybrant@example.com'),
        ]
        contacts_list_2 = [
            ('Gilberte', 'gilberte@example.com'),
            ('Gilberte En Mieux', 'gilberte@example.com'),
            ('Norbert', 'norbert@example.com'),
            ('Ybrant', 'ybrant@example.com'),
        ]

        cls.mailing_list_1 = MailingList.create({
            'name': 'List1',
            'contact_ids': [
                (0, 0, {'name': contact_name, 'email': contact_email})
                for contact_name, contact_email in contacts_list_1
            ],
        })
        cls.mailing_list_2 = MailingList.create({
            'name': 'List2',
            'contact_ids': [
                (0, 0, {'name': contact_name, 'email': contact_email})
                for contact_name, contact_email in contacts_list_2
            ],
        })