# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import hashlib
import hmac
import logging
import lxml
import random
import re
import threading
import werkzeug.urls
from ast import literal_eval
from datetime import datetime
from dateutil.relativedelta import relativedelta
from werkzeug.urls import url_join
from odoo import api, fields, models, tools, _
from odoo.exceptions import UserError
from odoo.osv import expression
_logger = logging.getLogger(__name__)
MASS_MAILING_BUSINESS_MODELS = [
'crm.lead',
'event.registration',
'hr.applicant',
'res.partner',
'event.track',
'sale.order',
'mailing.list',
'mailing.contact'
]
# Syntax of the data URL Scheme: https://tools.ietf.org/html/rfc2397#section-3
# Used to find inline images
image_re = re.compile(r"data:(image/[A-Za-z]+);base64,(.*)")
class MassMailing(models.Model):
""" MassMailing models a wave of emails for a mass mailign campaign.
A mass mailing is an occurence of sending emails. """
_name = 'mailing.mailing'
_description = 'Mass Mailing'
_inherit = ['mail.thread', 'mail.activity.mixin', 'mail.render.mixin']
_order = 'sent_date DESC'
_inherits = {'utm.source': 'source_id'}
_rec_name = "subject"
@api.model
def default_get(self, fields):
vals = super(MassMailing, self).default_get(fields)
if 'contact_list_ids' in fields and not vals.get('contact_list_ids') and vals.get('mailing_model_id'):
if vals.get('mailing_model_id') == self.env['ir.model']._get('mailing.list').id:
mailing_list = self.env['mailing.list'].search([], limit=2)
if len(mailing_list) == 1:
vals['contact_list_ids'] = [(6, 0, [mailing_list.id])]
return vals
@api.model
def _get_default_mail_server_id(self):
server_id = self.env['ir.config_parameter'].sudo().get_param('mass_mailing.mail_server_id')
try:
server_id = literal_eval(server_id) if server_id else False
return self.env['ir.mail_server'].search([('id', '=', server_id)]).id
except ValueError:
return False
active = fields.Boolean(default=True, tracking=True)
subject = fields.Char('Subject', help='Subject of your Mailing', required=True, translate=True)
preview = fields.Char(
'Preview', translate=True,
help='Catchy preview sentence that encourages recipients to open this email.\n'
'In most inboxes, this is displayed next to the subject.\n'
'Keep it empty if you prefer the first characters of your email content to appear instead.')
email_from = fields.Char(string='Send From', required=True,
default=lambda self: self.env.user.email_formatted)
sent_date = fields.Datetime(string='Sent Date', copy=False)
schedule_date = fields.Datetime(string='Scheduled for', tracking=True)
# don't translate 'body_arch', the translations are only on 'body_html'
body_arch = fields.Html(string='Body', translate=False)
body_html = fields.Html(string='Body converted to be sent by mail', sanitize_attributes=False)
attachment_ids = fields.Many2many('ir.attachment', 'mass_mailing_ir_attachments_rel',
'mass_mailing_id', 'attachment_id', string='Attachments')
keep_archives = fields.Boolean(string='Keep Archives')
campaign_id = fields.Many2one('utm.campaign', string='UTM Campaign', index=True)
source_id = fields.Many2one('utm.source', string='Source', required=True, ondelete='cascade',
help="This is the link source, e.g. Search Engine, another domain, or name of email list")
medium_id = fields.Many2one(
'utm.medium', string='Medium',
compute='_compute_medium_id', readonly=False, store=True,
help="UTM Medium: delivery method (email, sms, ...)")
state = fields.Selection([('draft', 'Draft'), ('in_queue', 'In Queue'), ('sending', 'Sending'), ('done', 'Sent')],
string='Status', required=True, tracking=True, copy=False, default='draft', group_expand='_group_expand_states')
color = fields.Integer(string='Color Index')
user_id = fields.Many2one('res.users', string='Responsible', tracking=True, default=lambda self: self.env.user)
# mailing options
mailing_type = fields.Selection([('mail', 'Email')], string="Mailing Type", default="mail", required=True)
reply_to_mode = fields.Selection([
('thread', 'Recipient Followers'), ('email', 'Specified Email Address')],
string='Reply-To Mode', compute='_compute_reply_to_mode',
readonly=False, store=True,
help='Thread: replies go to target document. Email: replies are routed to a given email.')
reply_to = fields.Char(
string='Reply To', compute='_compute_reply_to', readonly=False, store=True,
help='Preferred Reply-To Address')
# recipients
mailing_model_real = fields.Char(string='Recipients Real Model', compute='_compute_model')
mailing_model_id = fields.Many2one(
'ir.model', string='Recipients Model', ondelete='cascade', required=True,
domain=[('model', 'in', MASS_MAILING_BUSINESS_MODELS)],
default=lambda self: self.env.ref('mass_mailing.model_mailing_list').id)
mailing_model_name = fields.Char(
string='Recipients Model Name', related='mailing_model_id.model',
readonly=True, related_sudo=True)
mailing_domain = fields.Char(
string='Domain', compute='_compute_mailing_domain',
readonly=False, store=True)
mail_server_id = fields.Many2one('ir.mail_server', string='Mail Server',
default=_get_default_mail_server_id,
help="Use a specific mail server in priority. Otherwise Odoo relies on the first outgoing mail server available (based on their sequencing) as it does for normal mails.")
contact_list_ids = fields.Many2many('mailing.list', 'mail_mass_mailing_list_rel', string='Mailing Lists')
contact_ab_pc = fields.Integer(string='A/B Testing percentage',
help='Percentage of the contacts that will be mailed. Recipients will be taken randomly.', default=100)
unique_ab_testing = fields.Boolean(string='Allow A/B Testing', default=False,
help='If checked, recipients will be mailed only once for the whole campaign. '
'This lets you send different mailings to randomly selected recipients and test '
'the effectiveness of the mailings, without causing duplicate messages.')
kpi_mail_required = fields.Boolean('KPI mail required', copy=False)
# statistics data
mailing_trace_ids = fields.One2many('mailing.trace', 'mass_mailing_id', string='Emails Statistics')
total = fields.Integer(compute="_compute_total")
scheduled = fields.Integer(compute="_compute_statistics")
expected = fields.Integer(compute="_compute_statistics")
ignored = fields.Integer(compute="_compute_statistics")
sent = fields.Integer(compute="_compute_statistics")
delivered = fields.Integer(compute="_compute_statistics")
opened = fields.Integer(compute="_compute_statistics")
clicked = fields.Integer(compute="_compute_statistics")
replied = fields.Integer(compute="_compute_statistics")
bounced = fields.Integer(compute="_compute_statistics")
failed = fields.Integer(compute="_compute_statistics")
received_ratio = fields.Integer(compute="_compute_statistics", string='Received Ratio')
opened_ratio = fields.Integer(compute="_compute_statistics", string='Opened Ratio')
replied_ratio = fields.Integer(compute="_compute_statistics", string='Replied Ratio')
bounced_ratio = fields.Integer(compute="_compute_statistics", string='Bounced Ratio')
clicks_ratio = fields.Integer(compute="_compute_clicks_ratio", string="Number of Clicks")
next_departure = fields.Datetime(compute="_compute_next_departure", string='Scheduled date')
def _compute_total(self):
for mass_mailing in self:
total = self.env[mass_mailing.mailing_model_real].search_count(mass_mailing._parse_mailing_domain())
if mass_mailing.contact_ab_pc < 100:
total = int(total / 100.0 * mass_mailing.contact_ab_pc)
mass_mailing.total = total
def _compute_clicks_ratio(self):
self.env.cr.execute("""
SELECT COUNT(DISTINCT(stats.id)) AS nb_mails, COUNT(DISTINCT(clicks.mailing_trace_id)) AS nb_clicks, stats.mass_mailing_id AS id
FROM mailing_trace AS stats
LEFT OUTER JOIN link_tracker_click AS clicks ON clicks.mailing_trace_id = stats.id
WHERE stats.mass_mailing_id IN %s
GROUP BY stats.mass_mailing_id
""", [tuple(self.ids) or (None,)])
mass_mailing_data = self.env.cr.dictfetchall()
mapped_data = dict([(m['id'], 100 * m['nb_clicks'] / m['nb_mails']) for m in mass_mailing_data])
for mass_mailing in self:
mass_mailing.clicks_ratio = mapped_data.get(mass_mailing.id, 0)
def _compute_statistics(self):
""" Compute statistics of the mass mailing """
for key in (
'scheduled', 'expected', 'ignored', 'sent', 'delivered', 'opened',
'clicked', 'replied', 'bounced', 'failed', 'received_ratio',
'opened_ratio', 'replied_ratio', 'bounced_ratio',
):
self[key] = False
if not self.ids:
return
# ensure traces are sent to db
self.flush()
self.env.cr.execute("""
SELECT
m.id as mailing_id,
COUNT(s.id) AS expected,
COUNT(CASE WHEN s.sent is not null THEN 1 ELSE null END) AS sent,
COUNT(CASE WHEN s.scheduled is not null AND s.sent is null AND s.exception is null AND s.ignored is null AND s.bounced is null THEN 1 ELSE null END) AS scheduled,
COUNT(CASE WHEN s.scheduled is not null AND s.sent is null AND s.exception is null AND s.ignored is not null THEN 1 ELSE null END) AS ignored,
COUNT(CASE WHEN s.sent is not null AND s.exception is null AND s.bounced is null THEN 1 ELSE null END) AS delivered,
COUNT(CASE WHEN s.opened is not null THEN 1 ELSE null END) AS opened,
COUNT(CASE WHEN s.clicked is not null THEN 1 ELSE null END) AS clicked,
COUNT(CASE WHEN s.replied is not null THEN 1 ELSE null END) AS replied,
COUNT(CASE WHEN s.bounced is not null THEN 1 ELSE null END) AS bounced,
COUNT(CASE WHEN s.exception is not null THEN 1 ELSE null END) AS failed
FROM
mailing_trace s
RIGHT JOIN
mailing_mailing m
ON (m.id = s.mass_mailing_id)
WHERE
m.id IN %s
GROUP BY
m.id
""", (tuple(self.ids), ))
for row in self.env.cr.dictfetchall():
total = (row['expected'] - row['ignored']) or 1
row['received_ratio'] = 100.0 * row['delivered'] / total
row['opened_ratio'] = 100.0 * row['opened'] / total
row['replied_ratio'] = 100.0 * row['replied'] / total
row['bounced_ratio'] = 100.0 * row['bounced'] / total
self.browse(row.pop('mailing_id')).update(row)
def _compute_next_departure(self):
cron_next_call = self.env.ref('mass_mailing.ir_cron_mass_mailing_queue').sudo().nextcall
str2dt = fields.Datetime.from_string
cron_time = str2dt(cron_next_call)
for mass_mailing in self:
if mass_mailing.schedule_date:
schedule_date = str2dt(mass_mailing.schedule_date)
mass_mailing.next_departure = max(schedule_date, cron_time)
else:
mass_mailing.next_departure = cron_time
@api.depends('mailing_type')
def _compute_medium_id(self):
for mailing in self:
if mailing.mailing_type == 'mail' and not mailing.medium_id:
mailing.medium_id = self.env.ref('utm.utm_medium_email').id
@api.depends('mailing_model_id')
def _compute_model(self):
for record in self:
record.mailing_model_real = (record.mailing_model_name != 'mailing.list') and record.mailing_model_name or 'mailing.contact'
@api.depends('mailing_model_real')
def _compute_reply_to_mode(self):
for mailing in self:
if mailing.mailing_model_real in ['res.partner', 'mailing.contact']:
mailing.reply_to_mode = 'email'
else:
mailing.reply_to_mode = 'thread'
@api.depends('reply_to_mode')
def _compute_reply_to(self):
for mailing in self:
if mailing.reply_to_mode == 'email' and not mailing.reply_to:
mailing.reply_to = self.env.user.email_formatted
elif mailing.reply_to_mode == 'thread':
mailing.reply_to = False
@api.depends('mailing_model_name', 'contact_list_ids')
def _compute_mailing_domain(self):
for mailing in self:
if not mailing.mailing_model_name:
mailing.mailing_domain = ''
else:
mailing.mailing_domain = repr(mailing._get_default_mailing_domain())
# ------------------------------------------------------
# ORM
# ------------------------------------------------------
@api.model
def create(self, values):
if values.get('subject') and not values.get('name'):
values['name'] = "%s %s" % (values['subject'], datetime.strftime(fields.datetime.now(), tools.DEFAULT_SERVER_DATETIME_FORMAT))
if values.get('body_html'):
values['body_html'] = self._convert_inline_images_to_urls(values['body_html'])
return super(MassMailing, self).create(values)
def write(self, values):
if values.get('body_html'):
values['body_html'] = self._convert_inline_images_to_urls(values['body_html'])
return super(MassMailing, self).write(values)
@api.returns('self', lambda value: value.id)
def copy(self, default=None):
self.ensure_one()
default = dict(default or {},
name=_('%s (copy)', self.name),
contact_list_ids=self.contact_list_ids.ids)
return super(MassMailing, self).copy(default=default)
def _group_expand_states(self, states, domain, order):
return [key for key, val in type(self).state.selection]
# ------------------------------------------------------
# ACTIONS
# ------------------------------------------------------
def action_duplicate(self):
self.ensure_one()
mass_mailing_copy = self.copy()
if mass_mailing_copy:
context = dict(self.env.context)
context['form_view_initial_mode'] = 'edit'
return {
'type': 'ir.actions.act_window',
'view_mode': 'form',
'res_model': 'mailing.mailing',
'res_id': mass_mailing_copy.id,
'context': context,
}
return False
def action_test(self):
self.ensure_one()
ctx = dict(self.env.context, default_mass_mailing_id=self.id)
return {
'name': _('Test Mailing'),
'type': 'ir.actions.act_window',
'view_mode': 'form',
'res_model': 'mailing.mailing.test',
'target': 'new',
'context': ctx,
}
def action_schedule(self):
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id("mass_mailing.mailing_mailing_schedule_date_action")
action['context'] = dict(self.env.context, default_mass_mailing_id=self.id)
return action
def action_put_in_queue(self):
self.write({'state': 'in_queue'})
def action_cancel(self):
self.write({'state': 'draft', 'schedule_date': False, 'next_departure': False})
def action_retry_failed(self):
failed_mails = self.env['mail.mail'].sudo().search([
('mailing_id', 'in', self.ids),
('state', '=', 'exception')
])
failed_mails.mapped('mailing_trace_ids').unlink()
failed_mails.unlink()
self.write({'state': 'in_queue'})
def action_view_traces_scheduled(self):
return self._action_view_traces_filtered('scheduled')
def action_view_traces_ignored(self):
return self._action_view_traces_filtered('ignored')
def action_view_traces_failed(self):
return self._action_view_traces_filtered('failed')
def action_view_traces_sent(self):
return self._action_view_traces_filtered('sent')
def _action_view_traces_filtered(self, view_filter):
action = self.env["ir.actions.actions"]._for_xml_id("mass_mailing.mailing_trace_action")
action['name'] = _('%s Traces') % (self.name)
action['context'] = {'search_default_mass_mailing_id': self.id,}
filter_key = 'search_default_filter_%s' % (view_filter)
action['context'][filter_key] = True
return action
def action_view_clicked(self):
model_name = self.env['ir.model']._get('link.tracker').display_name
return {
'name': model_name,
'type': 'ir.actions.act_window',
'view_mode': 'tree',
'res_model': 'link.tracker',
'domain': [('mass_mailing_id.id', '=', self.id)],
'context': dict(self._context, create=False)
}
def action_view_opened(self):
return self._action_view_documents_filtered('opened')
def action_view_replied(self):
return self._action_view_documents_filtered('replied')
def action_view_bounced(self):
return self._action_view_documents_filtered('bounced')
def action_view_delivered(self):
return self._action_view_documents_filtered('delivered')
def _action_view_documents_filtered(self, view_filter):
if view_filter in ('opened', 'replied', 'bounced'):
opened_stats = self.mailing_trace_ids.filtered(lambda stat: stat[view_filter])
elif view_filter == ('delivered'):
opened_stats = self.mailing_trace_ids.filtered(lambda stat: stat.sent and not stat.bounced)
else:
opened_stats = self.env['mailing.trace']
res_ids = opened_stats.mapped('res_id')
model_name = self.env['ir.model']._get(self.mailing_model_real).display_name
return {
'name': model_name,
'type': 'ir.actions.act_window',
'view_mode': 'tree',
'res_model': self.mailing_model_real,
'domain': [('id', 'in', res_ids)],
'context': dict(self._context, create=False)
}
def update_opt_out(self, email, list_ids, value):
if len(list_ids) > 0:
model = self.env['mailing.contact'].with_context(active_test=False)
records = model.search([('email_normalized', '=', tools.email_normalize(email))])
opt_out_records = self.env['mailing.contact.subscription'].search([
('contact_id', 'in', records.ids),
('list_id', 'in', list_ids),
('opt_out', '!=', value)
])
opt_out_records.write({'opt_out': value})
message = _('The recipient unsubscribed from %s mailing list(s)') \
if value else _('The recipient subscribed to %s mailing list(s)')
for record in records:
# filter the list_id by record
record_lists = opt_out_records.filtered(lambda rec: rec.contact_id.id == record.id)
if len(record_lists) > 0:
record.sudo().message_post(body=message % ', '.join(str(list.name) for list in record_lists.mapped('list_id')))
# ------------------------------------------------------
# Email Sending
# ------------------------------------------------------
def _get_opt_out_list(self):
"""Returns a set of emails opted-out in target model"""
self.ensure_one()
opt_out = {}
target = self.env[self.mailing_model_real]
if self.mailing_model_real == "mailing.contact":
# if user is opt_out on One list but not on another
# or if two user with same email address, one opted in and the other one opted out, send the mail anyway
# TODO DBE Fixme : Optimise the following to get real opt_out and opt_in
target_list_contacts = self.env['mailing.contact.subscription'].search(
[('list_id', 'in', self.contact_list_ids.ids)])
opt_out_contacts = target_list_contacts.filtered(lambda rel: rel.opt_out).mapped('contact_id.email_normalized')
opt_in_contacts = target_list_contacts.filtered(lambda rel: not rel.opt_out).mapped('contact_id.email_normalized')
opt_out = set(c for c in opt_out_contacts if c not in opt_in_contacts)
_logger.info(
"Mass-mailing %s targets %s, blacklist: %s emails",
self, target._name, len(opt_out))
else:
_logger.info("Mass-mailing %s targets %s, no opt out list available", self, target._name)
return opt_out
def _get_link_tracker_values(self):
self.ensure_one()
vals = {'mass_mailing_id': self.id}
if self.campaign_id:
vals['campaign_id'] = self.campaign_id.id
if self.source_id:
vals['source_id'] = self.source_id.id
if self.medium_id:
vals['medium_id'] = self.medium_id.id
return vals
def _get_seen_list(self):
"""Returns a set of emails already targeted by current mailing/campaign (no duplicates)"""
self.ensure_one()
target = self.env[self.mailing_model_real]
# avoid loading a large number of records in memory
# + use a basic heuristic for extracting emails
query = """
SELECT lower(substring(t.%(mail_field)s, '([^ ,;<@]+@[^> ,;]+)'))
FROM mailing_trace s
JOIN %(target)s t ON (s.res_id = t.id)
WHERE substring(t.%(mail_field)s, '([^ ,;<@]+@[^> ,;]+)') IS NOT NULL
"""
# Apply same 'get email field' rule from mail_thread.message_get_default_recipients
if 'partner_id' in target._fields:
mail_field = 'email'
query = """
SELECT lower(substring(p.%(mail_field)s, '([^ ,;<@]+@[^> ,;]+)'))
FROM mailing_trace s
JOIN %(target)s t ON (s.res_id = t.id)
JOIN res_partner p ON (t.partner_id = p.id)
WHERE substring(p.%(mail_field)s, '([^ ,;<@]+@[^> ,;]+)') IS NOT NULL
"""
elif issubclass(type(target), self.pool['mail.thread.blacklist']):
mail_field = 'email_normalized'
elif 'email_from' in target._fields:
mail_field = 'email_from'
elif 'partner_email' in target._fields:
mail_field = 'partner_email'
elif 'email' in target._fields:
mail_field = 'email'
else:
raise UserError(_("Unsupported mass mailing model %s", self.mailing_model_id.name))
if self.unique_ab_testing:
query +="""
AND s.campaign_id = %%(mailing_campaign_id)s;
"""
else:
query +="""
AND s.mass_mailing_id = %%(mailing_id)s
AND s.model = %%(target_model)s;
"""
query = query % {'target': target._table, 'mail_field': mail_field}
params = {'mailing_id': self.id, 'mailing_campaign_id': self.campaign_id.id, 'target_model': self.mailing_model_real}
self._cr.execute(query, params)
seen_list = set(m[0] for m in self._cr.fetchall())
_logger.info(
"Mass-mailing %s has already reached %s %s emails", self, len(seen_list), target._name)
return seen_list
def _get_mass_mailing_context(self):
"""Returns extra context items with pre-filled blacklist and seen list for massmailing"""
return {
'mass_mailing_opt_out_list': self._get_opt_out_list(),
'mass_mailing_seen_list': self._get_seen_list(),
'post_convert_links': self._get_link_tracker_values(),
}
def _get_recipients(self):
mailing_domain = self._parse_mailing_domain()
res_ids = self.env[self.mailing_model_real].search(mailing_domain).ids
# randomly choose a fragment
if self.contact_ab_pc < 100:
contact_nbr = self.env[self.mailing_model_real].search_count(mailing_domain)
topick = int(contact_nbr / 100.0 * self.contact_ab_pc)
if self.campaign_id and self.unique_ab_testing:
already_mailed = self.campaign_id._get_mailing_recipients()[self.campaign_id.id]
else:
already_mailed = set([])
remaining = set(res_ids).difference(already_mailed)
if topick > len(remaining):
topick = len(remaining)
res_ids = random.sample(remaining, topick)
return res_ids
def _get_remaining_recipients(self):
res_ids = self._get_recipients()
already_mailed = self.env['mailing.trace'].search_read([
('model', '=', self.mailing_model_real),
('res_id', 'in', res_ids),
('mass_mailing_id', '=', self.id)], ['res_id'])
done_res_ids = {record['res_id'] for record in already_mailed}
return [rid for rid in res_ids if rid not in done_res_ids]
def _get_unsubscribe_url(self, email_to, res_id):
base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
url = werkzeug.urls.url_join(
base_url, 'mail/mailing/%(mailing_id)s/unsubscribe?%(params)s' % {
'mailing_id': self.id,
'params': werkzeug.urls.url_encode({
'res_id': res_id,
'email': email_to,
'token': self._unsubscribe_token(res_id, email_to),
}),
}
)
return url
def _get_view_url(self, email_to, res_id):
base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
url = werkzeug.urls.url_join(
base_url, 'mailing/%(mailing_id)s/view?%(params)s' % {
'mailing_id': self.id,
'params': werkzeug.urls.url_encode({
'res_id': res_id,
'email': email_to,
'token': self._unsubscribe_token(res_id, email_to),
}),
}
)
return url
def action_send_mail(self, res_ids=None):
author_id = self.env.user.partner_id.id
for mailing in self:
if not res_ids:
res_ids = mailing._get_remaining_recipients()
if not res_ids:
raise UserError(_('There are no recipients selected.'))
composer_values = {
'author_id': author_id,
'attachment_ids': [(4, attachment.id) for attachment in mailing.attachment_ids],
'body': mailing._prepend_preview(mailing.body_html, mailing.preview),
'subject': mailing.subject,
'model': mailing.mailing_model_real,
'email_from': mailing.email_from,
'record_name': False,
'composition_mode': 'mass_mail',
'mass_mailing_id': mailing.id,
'mailing_list_ids': [(4, l.id) for l in mailing.contact_list_ids],
'no_auto_thread': mailing.reply_to_mode != 'thread',
'template_id': None,
'mail_server_id': mailing.mail_server_id.id,
}
if mailing.reply_to_mode == 'email':
composer_values['reply_to'] = mailing.reply_to
composer = self.env['mail.compose.message'].with_context(active_ids=res_ids).create(composer_values)
extra_context = mailing._get_mass_mailing_context()
composer = composer.with_context(active_ids=res_ids, **extra_context)
# auto-commit except in testing mode
auto_commit = not getattr(threading.currentThread(), 'testing', False)
composer.send_mail(auto_commit=auto_commit)
mailing.write({
'state': 'done',
'sent_date': fields.Datetime.now(),
# send the KPI mail only if it's the first sending
'kpi_mail_required': not mailing.sent_date,
})
return True
def convert_links(self):
res = {}
for mass_mailing in self:
html = mass_mailing.body_html if mass_mailing.body_html else ''
vals = {'mass_mailing_id': mass_mailing.id}
if mass_mailing.campaign_id:
vals['campaign_id'] = mass_mailing.campaign_id.id
if mass_mailing.source_id:
vals['source_id'] = mass_mailing.source_id.id
if mass_mailing.medium_id:
vals['medium_id'] = mass_mailing.medium_id.id
res[mass_mailing.id] = mass_mailing._shorten_links(html, vals, blacklist=['/unsubscribe_from_list', '/view'])
return res
@api.model
def _process_mass_mailing_queue(self):
mass_mailings = self.search([('state', 'in', ('in_queue', 'sending')), '|', ('schedule_date', '<', fields.Datetime.now()), ('schedule_date', '=', False)])
for mass_mailing in mass_mailings:
user = mass_mailing.write_uid or self.env.user
mass_mailing = mass_mailing.with_context(**user.with_user(user).context_get())
if len(mass_mailing._get_remaining_recipients()) > 0:
mass_mailing.state = 'sending'
mass_mailing.action_send_mail()
else:
mass_mailing.write({
'state': 'done',
'sent_date': fields.Datetime.now(),
# send the KPI mail only if it's the first sending
'kpi_mail_required': not mass_mailing.sent_date,
})
mailings = self.env['mailing.mailing'].search([
('kpi_mail_required', '=', True),
('state', '=', 'done'),
('sent_date', '<=', fields.Datetime.now() - relativedelta(days=1)),
('sent_date', '>=', fields.Datetime.now() - relativedelta(days=5)),
])
if mailings:
mailings._action_send_statistics()
# ------------------------------------------------------
# STATISTICS
# ------------------------------------------------------
def _action_send_statistics(self):
"""Send an email to the responsible of each finished mailing with the statistics."""
self.kpi_mail_required = False
for mailing in self:
user = mailing.user_id
mailing = mailing.with_context(lang=user.lang or self._context.get('lang'))
link_trackers = self.env['link.tracker'].search(
[('mass_mailing_id', '=', mailing.id)]
).sorted('count', reverse=True)
link_trackers_body = self.env['ir.qweb']._render(
'mass_mailing.mass_mailing_kpi_link_trackers',
{'object': mailing, 'link_trackers': link_trackers},
)
rendered_body = self.env['ir.qweb']._render(
'digest.digest_mail_main',
{
'body': tools.html_sanitize(link_trackers_body),
'company': user.company_id,
'user': user,
'display_mobile_banner': True,
** mailing._prepare_statistics_email_values()
},
)
full_mail = self.env['mail.render.mixin']._render_encapsulate(
'digest.digest_mail_layout',
rendered_body,
)
mail_values = {
'subject': _('24H Stats of mailing "%s"') % mailing.subject,
'email_from': user.email_formatted,
'email_to': user.email_formatted,
'body_html': full_mail,
'auto_delete': True,
}
mail = self.env['mail.mail'].sudo().create(mail_values)
mail.send(raise_exception=False)
def _prepare_statistics_email_values(self):
"""Return some statistics that will be displayed in the mailing statistics email.
Each item in the returned list will be displayed as a table, with a title and
1, 2 or 3 columns.
"""
self.ensure_one()
random_tip = self.env['digest.tip'].search(
[('group_id.category_id', '=', self.env.ref('base.module_category_marketing_email_marketing').id)]
)
if random_tip:
random_tip = random.choice(random_tip).tip_description
formatted_date = tools.format_datetime(
self.env, self.sent_date, self.user_id.tz, 'MMM dd, YYYY', self.user_id.lang
) if self.sent_date else False
web_base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
return {
'title': _('24H Stats of mailing'),
'sub_title': '"%s"' % self.subject,
'top_button_label': _('More Info'),
'top_button_url': url_join(web_base_url, f'/web#id={self.id}&model=mailing.mailing&view_type=form'),
'kpi_data': [
{
'kpi_fullname': _('Engagement on %i Emails Sent') % self.sent,
'kpi_action': None,
'kpi_col1': {
'value': f'{self.received_ratio}%',
'col_subtitle': '%s (%i)' % (_('RECEIVED'), self.delivered),
},
'kpi_col2': {
'value': f'{self.opened_ratio}%',
'col_subtitle': '%s (%i)' % (_('OPENED'), self.opened),
},
'kpi_col3': {
'value': f'{self.replied_ratio}%',
'col_subtitle': '%s (%i)' % (_('REPLIED'), self.replied),
},
}, {
'kpi_fullname': _('Business Benefits on %i Emails Sent') % self.sent,
'kpi_action': None,
'kpi_col1': {},
'kpi_col2': {},
'kpi_col3': {},
},
],
'tips': [random_tip] if random_tip else False,
'formatted_date': formatted_date,
}
# ------------------------------------------------------
# TOOLS
# ------------------------------------------------------
def _get_default_mailing_domain(self):
mailing_domain = []
if self.mailing_model_name == 'mailing.list' and self.contact_list_ids:
mailing_domain = [('list_ids', 'in', self.contact_list_ids.ids)]
if self.mailing_type == 'mail' and 'is_blacklisted' in self.env[self.mailing_model_name]._fields:
mailing_domain = expression.AND([[('is_blacklisted', '=', False)], mailing_domain])
return mailing_domain
def _parse_mailing_domain(self):
self.ensure_one()
try:
mailing_domain = literal_eval(self.mailing_domain)
except Exception:
mailing_domain = [('id', 'in', [])]
return mailing_domain
def _unsubscribe_token(self, res_id, email):
"""Generate a secure hash for this mailing list and parameters.
This is appended to the unsubscription URL and then checked at
unsubscription time to ensure no malicious unsubscriptions are
performed.
:param int res_id:
ID of the resource that will be unsubscribed.
:param str email:
Email of the resource that will be unsubscribed.
"""
secret = self.env["ir.config_parameter"].sudo().get_param("database.secret")
token = (self.env.cr.dbname, self.id, int(res_id), tools.ustr(email))
return hmac.new(secret.encode('utf-8'), repr(token).encode('utf-8'), hashlib.sha512).hexdigest()
def _convert_inline_images_to_urls(self, body_html):
"""
Find inline base64 encoded images, make an attachement out of
them and replace the inline image with an url to the attachement.
"""
def _image_to_url(b64image: bytes):
"""Store an image in an attachement and returns an url"""
attachment = self.env['ir.attachment'].create({
'datas': b64image,
'name': "cropped_image_mailing_{}".format(self.id),
'type': 'binary',})
attachment.generate_access_token()
return '/web/image/%s?access_token=%s' % (
attachment.id, attachment.access_token)
modified = False
root = lxml.html.fromstring(body_html)
for node in root.iter('img'):
match = image_re.match(node.attrib.get('src', ''))
if match:
mime = match.group(1) # unsed
image = match.group(2).encode() # base64 image as bytes
node.attrib['src'] = _image_to_url(image)
modified = True
if modified:
return lxml.html.tostring(root)
return body_html