summaryrefslogtreecommitdiff
path: root/addons/link_tracker/models/link_tracker.py
diff options
context:
space:
mode:
Diffstat (limited to 'addons/link_tracker/models/link_tracker.py')
-rw-r--r--addons/link_tracker/models/link_tracker.py258
1 files changed, 258 insertions, 0 deletions
diff --git a/addons/link_tracker/models/link_tracker.py b/addons/link_tracker/models/link_tracker.py
new file mode 100644
index 00000000..30cf26fa
--- /dev/null
+++ b/addons/link_tracker/models/link_tracker.py
@@ -0,0 +1,258 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+import random
+import requests
+import string
+
+from lxml import html
+from werkzeug import urls
+
+from odoo import tools, models, fields, api, _
+
+URL_MAX_SIZE = 10 * 1024 * 1024
+
+
+class LinkTracker(models.Model):
+ """ Link trackers allow users to wrap any URL into a short URL that can be
+ tracked by Odoo. Clicks are counter on each link. A tracker is linked to
+ UTMs allowing to analyze marketing actions.
+
+ This model is also used in mass_mailing where each link in html body is
+ automatically converted into a short link that is tracked and integrates
+ UTMs. """
+ _name = "link.tracker"
+ _rec_name = "short_url"
+ _description = "Link Tracker"
+ _order="count DESC"
+ _inherit = ["utm.mixin"]
+
+ # URL info
+ url = fields.Char(string='Target URL', required=True)
+ absolute_url = fields.Char("Absolute URL", compute="_compute_absolute_url")
+ short_url = fields.Char(string='Tracked URL', compute='_compute_short_url')
+ redirected_url = fields.Char(string='Redirected URL', compute='_compute_redirected_url')
+ short_url_host = fields.Char(string='Host of the short URL', compute='_compute_short_url_host')
+ title = fields.Char(string='Page Title', store=True)
+ label = fields.Char(string='Button label')
+ # Tracking
+ link_code_ids = fields.One2many('link.tracker.code', 'link_id', string='Codes')
+ code = fields.Char(string='Short URL code', compute='_compute_code')
+ link_click_ids = fields.One2many('link.tracker.click', 'link_id', string='Clicks')
+ count = fields.Integer(string='Number of Clicks', compute='_compute_count', store=True)
+
+ @api.depends("url")
+ def _compute_absolute_url(self):
+ web_base_url = urls.url_parse(self.env['ir.config_parameter'].sudo().get_param('web.base.url'))
+ for tracker in self:
+ url = urls.url_parse(tracker.url)
+ if url.scheme:
+ tracker.absolute_url = tracker.url
+ else:
+ tracker.absolute_url = web_base_url.join(url).to_url()
+
+ @api.depends('link_click_ids.link_id')
+ def _compute_count(self):
+ if self.ids:
+ clicks_data = self.env['link.tracker.click'].read_group(
+ [('link_id', 'in', self.ids)],
+ ['link_id'],
+ ['link_id']
+ )
+ mapped_data = {m['link_id'][0]: m['link_id_count'] for m in clicks_data}
+ else:
+ mapped_data = dict()
+ for tracker in self:
+ tracker.count = mapped_data.get(tracker.id, 0)
+
+ @api.depends('code')
+ def _compute_short_url(self):
+ for tracker in self:
+ base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
+ tracker.short_url = urls.url_join(base_url, '/r/%(code)s' % {'code': tracker.code})
+
+ def _compute_short_url_host(self):
+ for tracker in self:
+ tracker.short_url_host = self.env['ir.config_parameter'].sudo().get_param('web.base.url') + '/r/'
+
+ def _compute_code(self):
+ for tracker in self:
+ record = self.env['link.tracker.code'].search([('link_id', '=', tracker.id)], limit=1, order='id DESC')
+ tracker.code = record.code
+
+ @api.depends('url')
+ def _compute_redirected_url(self):
+ for tracker in self:
+ parsed = urls.url_parse(tracker.url)
+ utms = {}
+ for key, field_name, cook in self.env['utm.mixin'].tracking_fields():
+ field = self._fields[field_name]
+ attr = getattr(tracker, field_name)
+ if field.type == 'many2one':
+ attr = attr.name
+ if attr:
+ utms[key] = attr
+ utms.update(parsed.decode_query())
+ tracker.redirected_url = parsed.replace(query=urls.url_encode(utms)).to_url()
+
+ @api.model
+ @api.depends('url')
+ def _get_title_from_url(self, url):
+ try:
+ head = requests.head(url, timeout=5)
+ if (
+ int(head.headers.get('Content-Length', 0)) > URL_MAX_SIZE
+ or
+ 'text/html' not in head.headers.get('Content-Type', 'text/html')
+ ):
+ return url
+ # HTML parser can work with a part of page, so ask server to limit downloading to 50 KB
+ page = requests.get(url, timeout=5, headers={"range": "bytes=0-50000"})
+ p = html.fromstring(page.text.encode('utf-8'), parser=html.HTMLParser(encoding='utf-8'))
+ title = p.find('.//title').text
+ except:
+ title = url
+
+ return title
+
+ @api.model
+ def create(self, vals):
+ create_vals = vals.copy()
+
+ if 'url' not in create_vals:
+ raise ValueError('URL field required')
+ else:
+ create_vals['url'] = tools.validate_url(vals['url'])
+
+ search_domain = [
+ (fname, '=', value)
+ for fname, value in create_vals.items()
+ if fname in ['url', 'campaign_id', 'medium_id', 'source_id']
+ ]
+
+ result = self.search(search_domain, limit=1)
+
+ if result:
+ return result
+
+ if not create_vals.get('title'):
+ create_vals['title'] = self._get_title_from_url(create_vals['url'])
+
+ # Prevent the UTMs to be set by the values of UTM cookies
+ for (key, fname, cook) in self.env['utm.mixin'].tracking_fields():
+ if fname not in create_vals:
+ create_vals[fname] = False
+
+ link = super(LinkTracker, self).create(create_vals)
+
+ code = self.env['link.tracker.code'].get_random_code_string()
+ self.env['link.tracker.code'].sudo().create({'code': code, 'link_id': link.id})
+
+ return link
+
+ @api.model
+ def convert_links(self, html, vals, blacklist=None):
+ raise NotImplementedError('Moved on mail.render.mixin')
+
+ def _convert_links_text(self, body, vals, blacklist=None):
+ raise NotImplementedError('Moved on mail.render.mixin')
+
+ def action_view_statistics(self):
+ action = self.env['ir.actions.act_window']._for_xml_id('link_tracker.link_tracker_click_action_statistics')
+ action['domain'] = [('link_id', '=', self.id)]
+ action['context'] = dict(self._context, create=False)
+ return action
+
+ def action_visit_page(self):
+ return {
+ 'name': _("Visit Webpage"),
+ 'type': 'ir.actions.act_url',
+ 'url': self.url,
+ 'target': 'new',
+ }
+
+ @api.model
+ def recent_links(self, filter, limit):
+ if filter == 'newest':
+ return self.search_read([], order='create_date DESC, id DESC', limit=limit)
+ elif filter == 'most-clicked':
+ return self.search_read([('count', '!=', 0)], order='count DESC', limit=limit)
+ elif filter == 'recently-used':
+ return self.search_read([('count', '!=', 0)], order='write_date DESC, id DESC', limit=limit)
+ else:
+ return {'Error': "This filter doesn't exist."}
+
+ @api.model
+ def get_url_from_code(self, code):
+ code_rec = self.env['link.tracker.code'].sudo().search([('code', '=', code)])
+
+ if not code_rec:
+ return None
+
+ return code_rec.link_id.redirected_url
+
+ _sql_constraints = [
+ ('url_utms_uniq', 'unique (url, campaign_id, medium_id, source_id)', 'The URL and the UTM combination must be unique')
+ ]
+
+
+class LinkTrackerCode(models.Model):
+ _name = "link.tracker.code"
+ _description = "Link Tracker Code"
+ _rec_name = 'code'
+
+ code = fields.Char(string='Short URL Code', required=True, store=True)
+ link_id = fields.Many2one('link.tracker', 'Link', required=True, ondelete='cascade')
+
+ _sql_constraints = [
+ ('code', 'unique( code )', 'Code must be unique.')
+ ]
+
+ @api.model
+ def get_random_code_string(self):
+ size = 3
+ while True:
+ code_proposition = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(size))
+
+ if self.search([('code', '=', code_proposition)]):
+ size += 1
+ else:
+ return code_proposition
+
+
+class LinkTrackerClick(models.Model):
+ _name = "link.tracker.click"
+ _rec_name = "link_id"
+ _description = "Link Tracker Click"
+
+ campaign_id = fields.Many2one(
+ 'utm.campaign', 'UTM Campaign',
+ related="link_id.campaign_id", store=True)
+ link_id = fields.Many2one(
+ 'link.tracker', 'Link',
+ index=True, required=True, ondelete='cascade')
+ ip = fields.Char(string='Internet Protocol')
+ country_id = fields.Many2one('res.country', 'Country')
+
+ def _prepare_click_values_from_route(self, **route_values):
+ click_values = dict((fname, route_values[fname]) for fname in self._fields if fname in route_values)
+ if not click_values.get('country_id') and route_values.get('country_code'):
+ click_values['country_id'] = self.env['res.country'].search([('code', '=', route_values['country_code'])], limit=1).id
+ return click_values
+
+ @api.model
+ def add_click(self, code, **route_values):
+ """ Main API to add a click on a link. """
+ tracker_code = self.env['link.tracker.code'].search([('code', '=', code)])
+ if not tracker_code:
+ return None
+
+ ip = route_values.get('ip', False)
+ existing = self.search_count(['&', ('link_id', '=', tracker_code.link_id.id), ('ip', '=', ip)])
+ if existing:
+ return None
+
+ route_values['link_id'] = tracker_code.link_id.id
+ click_values = self._prepare_click_values_from_route(**route_values)
+
+ return self.create(click_values)