import json
import logging
import re
from datetime import datetime, timedelta

import requests

from odoo import fields, models, api
from odoo.http import request
from urllib.parse import urlparse, parse_qs

_logger = logging.getLogger(__name__)


class UserActivityLog(models.Model):
    """Page-visit activity log.

    Stores one record per tracked page view (title, URL, user, IP) and
    enriches it asynchronously via cron helpers: UTM-source resolution,
    ip-api.com geolocation lookup, and weekly/all-time product search-rank
    counters derived from visited ``/shop/product/`` URLs.
    """
    _name = 'user.activity.log'
    _rec_name = 'page_title'

    page_title = fields.Char(string="Judul Halaman")
    url = fields.Char(string="URL")
    res_user_id = fields.Many2one("res.users", string="User")
    email = fields.Char(string="Email")
    update_product = fields.Boolean(string="Update Product")
    product_id = fields.Many2one('product.template', string='Product')
    utm_source_id = fields.Many2one('web.utm.source', string='UTM Source')
    ip_address = fields.Char('IP Address')
    ip_address_lookup = fields.Text('IP Address Lookup')
    ip_location_city = fields.Text('IP Location City')
    ip_location_country = fields.Text('IP Location Country')
    ip_location_country_code = fields.Text('IP Location Country Code')
    ip_location_map = fields.Html(
        'Embedded Map', compute='_compute_ip_location_map', sanitize=False)

    @api.model
    def create(self, vals):
        """Create the log entry, then resolve its UTM source from the URL."""
        result = super(UserActivityLog, self).create(vals)
        result.fill_utm_source()
        return result

    def fill_utm_source(self):
        """Extract ``utm_source`` from each record's URL query string and
        link (lazily creating) the matching ``web.utm.source`` record.

        Records without a URL get their UTM source cleared; records whose
        URL simply lacks the parameter are left untouched.
        """
        for rec in self:
            if not rec.url:
                rec.utm_source_id = False
                continue
            parsed_url = urlparse(rec.url)
            params = parse_qs(parsed_url.query)
            utm_source_value = params.get('utm_source')
            if not utm_source_value:
                continue
            utm_source = self.env['web.utm.source'].find_or_create_key(
                utm_source_value[0])
            rec.utm_source_id = utm_source.id or False

    def _parse_json(self, json_string, key):
        """Return ``key`` from a JSON-object string, or ``''`` when the
        string is empty/falsy or the key is absent."""
        result = ''
        if json_string:
            json_object = json.loads(json_string)
            if key in json_object:
                result = json_object[key]
        return result

    @api.depends('ip_address_lookup')
    def _compute_ip_location_map(self):
        """Build an embedded Google Maps iframe from the lat/lon stored in
        the ip-api.com lookup payload; empty when coordinates are missing."""
        for rec in self:  # FIX: compute methods must handle multi-record self
            rec.ip_location_map = ""
            lat = rec._parse_json(rec.ip_address_lookup, 'lat')
            lon = rec._parse_json(rec.ip_address_lookup, 'lon')
            if lat and lon:
                # FIX: the embed URL used to be computed but never assigned,
                # so the map field was always empty.  The field is
                # sanitize=False Html, so a raw iframe is acceptable here.
                url = ('https://maps.google.com/maps?q=%s,%s'
                       '&hl=id&z=15&output=embed' % (lat, lon))
                rec.ip_location_map = (
                    '<iframe width="100%" height="400" frameborder="0" '
                    'src="' + url + '"></iframe>')

    def _parse_ip_location(self):
        """Cron helper: denormalize city/country/countryCode out of the raw
        lookup JSON for up to 200 not-yet-parsed records per run."""
        domain = [
            ('ip_address_lookup', '!=', False),
            ('ip_location_city', '=', False),
            ('ip_location_country', '=', False),
            ('ip_location_country_code', '=', False),
        ]
        logs = self.search(domain, limit=200, order='create_date asc')
        for log in logs:
            log.ip_location_city = self._parse_json(
                log.ip_address_lookup, 'city')
            log.ip_location_country = self._parse_json(
                log.ip_address_lookup, 'country')
            log.ip_location_country_code = self._parse_json(
                log.ip_address_lookup, 'countryCode')

    def _load_ip_address_lookup(self):
        """Cron helper: geolocate up to 45 unresolved IPs per run via
        ip-api.com (presumably 45 to match the free tier's per-minute rate
        limit -- confirm), store the JSON payload, then denormalize it."""
        domain = [
            ('ip_address', '!=', False),
            ('ip_address_lookup', '=', False),
        ]
        logs = self.search(domain, limit=45, order='create_date asc')
        for log in logs:
            try:
                # FIX: timeout added so a hung HTTP call cannot stall the
                # cron worker indefinitely.
                ipinfo = requests.get(
                    'http://ip-api.com/json/%s' % log.ip_address,
                    timeout=10).json()
                # FIX: pop() instead of del -- a payload without 'status'
                # should not discard the whole lookup.
                ipinfo.pop('status', None)
                log.ip_address_lookup = json.dumps(
                    ipinfo, indent=4, sort_keys=True)
            except Exception:
                # FIX: was a bare except that silently hid failures (and
                # swallowed SystemExit/KeyboardInterrupt).
                _logger.exception("IP lookup failed for %s", log.ip_address)
                log.ip_address_lookup = ''
        self._parse_ip_location()

    def record_activity(self):
        """Log the current HTTP request (title = base URL, URL, client IP),
        skipping localhost traffic.

        Returns True on success, False on any failure -- best effort; this
        must never break the page being served.
        """
        try:
            httprequest = request.httprequest
            if httprequest.remote_addr != '127.0.0.1':
                self.env['user.activity.log'].sudo().create([{
                    'page_title': request.env['ir.config_parameter'].get_param(
                        'web.base.url'),
                    'url': httprequest.base_url,
                    'ip_address': httprequest.remote_addr,
                }])
            return True
        except Exception:
            # FIX: was a bare except returning False with no trace at all.
            _logger.exception("Failed to record user activity")
            return False

    def compile_product(self):
        """Backfill ``product_id`` on logs that have an email by parsing the
        numeric id out of ``/shop/product/<slug>-<id>`` URLs."""
        logs = self.env['user.activity.log'].search([
            ('email', '!=', False),
            ('product_id', '=', False),
            # NOTE(review): this pattern uses 'indoteknik.co%' while
            # update_rank_search_weekly uses 'indoteknik.com%' -- confirm
            # which domain(s) are intended.
            ('url', 'ilike', 'https://indoteknik.co%/shop/product/%'),
            ('url', 'not ilike', 'shopping'),
        ], limit=1000, order='create_date desc')
        for log in logs:
            _logger.info(log.url)
            # The product id is the segment after the last '-' in the slug.
            product_id = log.url[log.url.rfind('-') + 1:]
            # FIX: a single isdigit() guard subsumes the old '#'/alpha
            # checks and also rejects tails like '' or '12.3' that would
            # previously have reached the search with a non-numeric id.
            if not product_id.isdigit():
                continue
            product = self.env['product.template'].search([
                ('id', '=', int(product_id))
            ], limit=1)
            log.product_id = product

    def clean_activity_log(self):
        """Cron: delete anonymous (no-email) activity logs older than
        180 days."""
        current_time = datetime.now()
        delta_time = current_time - timedelta(days=180)
        delta_time = delta_time.strftime('%Y-%m-%d %H:%M:%S')
        self.env['user.activity.log'].search([
            ('create_date', '<', delta_time),
            # FIX: previously compared against the *string* 'False', which
            # matches no unset Char field, so this cron deleted nothing.
            # The intent is "email not set".
            ('email', '=', False),
        ]).unlink()

    def reset_rank_search_weekly(self):
        """Cron: zero the weekly search rank on all ranked products and
        flag them for Solr re-indexing."""
        templates = self.env['product.template'].search([
            ('type', '=', 'product'),
            ('active', '=', True),
            ('search_rank_weekly', '>', 0),
        ])
        # Batch write: one UPDATE instead of two writes per template.
        templates.write({
            'search_rank_weekly': 0,
            'solr_flag': 2,
        })

    def update_rank_search_weekly(self):
        """Cron: increment ``search_rank_weekly`` on products whose page was
        visited in the last 7 days (up to the 2000 most recent logs)."""
        current_time = datetime.now()
        delta_time = current_time - timedelta(days=7)
        delta_time = delta_time.strftime('%Y-%m-%d %H:%M:%S')
        activity_logs = self.env['user.activity.log'].search([
            ('url', 'ilike', 'https://indoteknik.com%/shop/product/%'),
            ('create_date', '>', delta_time),
            ('url', 'not ilike', 'shopping'),
        ], limit=2000, order='create_date DESC')
        for activity_log in activity_logs:
            _logger.info(activity_log.url)
            # The product id is the segment after the last '-' in the slug.
            product_id = activity_log.url[activity_log.url.rfind('-') + 1:]
            _logger.info(product_id)
            # FIX: isdigit() subsumes the old '#'/alpha checks and also
            # guards the int() below, which used to crash on tails such
            # as '' or '12.3'.
            if not product_id.isdigit():
                continue
            template = self.env['product.template'].search([
                ('id', '=', int(product_id))
            ], limit=1)
            if template:
                template.search_rank_weekly += 1
                template.solr_flag = 2

    def update_rank_search(self):
        """Cron: increment the all-time ``search_rank`` for each unprocessed
        product-page log, marking every processed log so it is only ever
        counted once."""
        activity_logs = self.env['user.activity.log'].search([
            ('url', 'ilike', '%/shop/product/%'),
            ('update_product', '!=', True),
        ], limit=1000, order='create_date DESC')
        for activity_log in activity_logs:
            _logger.info(activity_log.url)
            product_id = activity_log.url[activity_log.url.rfind('-') + 1:]
            _logger.info(product_id)
            if not product_id.isdigit():
                # FIX: flag unparseable logs as processed too; previously
                # they were never flagged, so they permanently filled this
                # limit-1000, newest-first batch and stalled the cron.
                activity_log.update_product = True
                continue
            template = self.env['product.template'].search([
                ('id', '=', int(product_id))
            ], limit=1)
            if template:
                template.search_rank += 1
            # FIX: flag even when the product no longer exists, for the
            # same stall reason as above.
            activity_log.update_product = True