diff options
| author | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
|---|---|---|
| committer | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
| commit | 3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch) | |
| tree | a44932296ef4a9b71d5f010906253d8c53727726 /addons/bus/models | |
| parent | 0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff) | |
initial commit 2
Diffstat (limited to 'addons/bus/models')
| -rw-r--r-- | addons/bus/models/__init__.py | 5 | ||||
| -rw-r--r-- | addons/bus/models/bus.py | 199 | ||||
| -rw-r--r-- | addons/bus/models/bus_presence.py | 71 | ||||
| -rw-r--r-- | addons/bus/models/res_partner.py | 29 | ||||
| -rw-r--r-- | addons/bus/models/res_users.py | 28 |
5 files changed, 332 insertions, 0 deletions
diff --git a/addons/bus/models/__init__.py b/addons/bus/models/__init__.py new file mode 100644 index 00000000..5f99c824 --- /dev/null +++ b/addons/bus/models/__init__.py @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- +from . import bus +from . import bus_presence +from . import res_users +from . import res_partner diff --git a/addons/bus/models/bus.py b/addons/bus/models/bus.py new file mode 100644 index 00000000..b88a8336 --- /dev/null +++ b/addons/bus/models/bus.py @@ -0,0 +1,199 @@ +# -*- coding: utf-8 -*- +import datetime +import json +import logging +import random +import select +import threading +import time + +import odoo +from odoo import api, fields, models, SUPERUSER_ID +from odoo.tools.misc import DEFAULT_SERVER_DATETIME_FORMAT +from odoo.tools import date_utils + +_logger = logging.getLogger(__name__) + +# longpolling timeout connection +TIMEOUT = 50 + +#---------------------------------------------------------- +# Bus +#---------------------------------------------------------- +def json_dump(v): + return json.dumps(v, separators=(',', ':'), default=date_utils.json_default) + +def hashable(key): + if isinstance(key, list): + key = tuple(key) + return key + + +class ImBus(models.Model): + + _name = 'bus.bus' + _description = 'Communication Bus' + + channel = fields.Char('Channel') + message = fields.Char('Message') + + @api.autovacuum + def _gc_messages(self): + timeout_ago = datetime.datetime.utcnow()-datetime.timedelta(seconds=TIMEOUT*2) + domain = [('create_date', '<', timeout_ago.strftime(DEFAULT_SERVER_DATETIME_FORMAT))] + return self.sudo().search(domain).unlink() + + @api.model + def sendmany(self, notifications): + channels = set() + for channel, message in notifications: + channels.add(channel) + values = { + "channel": json_dump(channel), + "message": json_dump(message) + } + self.sudo().create(values) + if channels: + # We have to wait until the notifications are commited in database. + # When calling `NOTIFY imbus`, some concurrent threads will be + # awakened and will fetch the notification in the bus table. If the + # transaction is not commited yet, there will be nothing to fetch, + # and the longpolling will return no notification. + @self.env.cr.postcommit.add + def notify(): + with odoo.sql_db.db_connect('postgres').cursor() as cr: + cr.execute("notify imbus, %s", (json_dump(list(channels)),)) + + @api.model + def sendone(self, channel, message): + self.sendmany([[channel, message]]) + + @api.model + def poll(self, channels, last=0, options=None): + if options is None: + options = {} + # first poll return the notification in the 'buffer' + if last == 0: + timeout_ago = datetime.datetime.utcnow()-datetime.timedelta(seconds=TIMEOUT) + domain = [('create_date', '>', timeout_ago.strftime(DEFAULT_SERVER_DATETIME_FORMAT))] + else: # else returns the unread notifications + domain = [('id', '>', last)] + channels = [json_dump(c) for c in channels] + domain.append(('channel', 'in', channels)) + notifications = self.sudo().search_read(domain) + # list of notification to return + result = [] + for notif in notifications: + result.append({ + 'id': notif['id'], + 'channel': json.loads(notif['channel']), + 'message': json.loads(notif['message']), + }) + return result + + +#---------------------------------------------------------- +# Dispatcher +#---------------------------------------------------------- +class ImDispatch(object): + def __init__(self): + self.channels = {} + self.started = False + + def poll(self, dbname, channels, last, options=None, timeout=TIMEOUT): + if options is None: + options = {} + # Dont hang ctrl-c for a poll request, we need to bypass private + # attribute access because we dont know before starting the thread that + # it will handle a longpolling request + if not odoo.evented: + current = threading.current_thread() + current._daemonic = True + # rename the thread to avoid tests waiting for a longpolling + current.setName("openerp.longpolling.request.%s" % current.ident) + + registry = odoo.registry(dbname) + + # immediatly returns if past notifications exist + with registry.cursor() as cr: + env = api.Environment(cr, SUPERUSER_ID, {}) + notifications = env['bus.bus'].poll(channels, last, options) + + # immediatly returns in peek mode + if options.get('peek'): + return dict(notifications=notifications, channels=channels) + + # or wait for future ones + if not notifications: + if not self.started: + # Lazy start of events listener + self.start() + + event = self.Event() + for channel in channels: + self.channels.setdefault(hashable(channel), set()).add(event) + try: + event.wait(timeout=timeout) + with registry.cursor() as cr: + env = api.Environment(cr, SUPERUSER_ID, {}) + notifications = env['bus.bus'].poll(channels, last, options) + except Exception: + # timeout + pass + finally: + # gc pointers to event + for channel in channels: + channel_events = self.channels.get(hashable(channel)) + if channel_events and event in channel_events: + channel_events.remove(event) + return notifications + + def loop(self): + """ Dispatch postgres notifications to the relevant polling threads/greenlets """ + _logger.info("Bus.loop listen imbus on db postgres") + with odoo.sql_db.db_connect('postgres').cursor() as cr: + conn = cr._cnx + cr.execute("listen imbus") + cr.commit(); + while True: + if select.select([conn], [], [], TIMEOUT) == ([], [], []): + pass + else: + conn.poll() + channels = [] + while conn.notifies: + channels.extend(json.loads(conn.notifies.pop().payload)) + # dispatch to local threads/greenlets + events = set() + for channel in channels: + events.update(self.channels.pop(hashable(channel), set())) + for event in events: + event.set() + + def run(self): + while True: + try: + self.loop() + except Exception as e: + _logger.exception("Bus.loop error, sleep and retry") + time.sleep(TIMEOUT) + + def start(self): + if odoo.evented: + # gevent mode + import gevent + self.Event = gevent.event.Event + gevent.spawn(self.run) + else: + # threaded mode + self.Event = threading.Event + t = threading.Thread(name="%s.Bus" % __name__, target=self.run) + t.daemon = True + t.start() + self.started = True + return self + +dispatch = None +if not odoo.multi_process or odoo.evented: + # We only use the event dispatcher in threaded and gevent mode + dispatch = ImDispatch() diff --git a/addons/bus/models/bus_presence.py b/addons/bus/models/bus_presence.py new file mode 100644 index 00000000..2f6545bc --- /dev/null +++ b/addons/bus/models/bus_presence.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +import datetime +import time + +from psycopg2 import OperationalError + +from odoo import api, fields, models +from odoo import tools +from odoo.addons.bus.models.bus import TIMEOUT +from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY +from odoo.tools.misc import DEFAULT_SERVER_DATETIME_FORMAT + +DISCONNECTION_TIMER = TIMEOUT + 5 +AWAY_TIMER = 1800 # 30 minutes + + +class BusPresence(models.Model): + """ User Presence + Its status is 'online', 'away' or 'offline'. This model should be a one2one, but is not + attached to res_users to avoid database concurrence errors. Since the 'update' method is executed + at each poll, if the user have multiple opened tabs, concurrence errors can happend, but are 'muted-logged'. + """ + + _name = 'bus.presence' + _description = 'User Presence' + _log_access = False + + _sql_constraints = [('bus_user_presence_unique', 'unique(user_id)', 'A user can only have one IM status.')] + + user_id = fields.Many2one('res.users', 'Users', required=True, index=True, ondelete='cascade') + last_poll = fields.Datetime('Last Poll', default=lambda self: fields.Datetime.now()) + last_presence = fields.Datetime('Last Presence', default=lambda self: fields.Datetime.now()) + status = fields.Selection([('online', 'Online'), ('away', 'Away'), ('offline', 'Offline')], 'IM Status', default='offline') + + @api.model + def update(self, inactivity_period): + """ Updates the last_poll and last_presence of the current user + :param inactivity_period: duration in milliseconds + """ + # This method is called in method _poll() and cursor is closed right + # after; see bus/controllers/main.py. + try: + self._update(inactivity_period) + # commit on success + self.env.cr.commit() + except OperationalError as e: + if e.pgcode in PG_CONCURRENCY_ERRORS_TO_RETRY: + # ignore concurrency error + return self.env.cr.rollback() + raise + + @api.model + def _update(self, inactivity_period): + presence = self.search([('user_id', '=', self._uid)], limit=1) + # compute last_presence timestamp + last_presence = datetime.datetime.now() - datetime.timedelta(milliseconds=inactivity_period) + values = { + 'last_poll': time.strftime(DEFAULT_SERVER_DATETIME_FORMAT), + } + # update the presence or a create a new one + if not presence: # create a new presence for the user + values['user_id'] = self._uid + values['last_presence'] = last_presence + self.create(values) + else: # update the last_presence if necessary, and write values + if presence.last_presence < last_presence: + values['last_presence'] = last_presence + # Hide transaction serialization errors, which can be ignored, the presence update is not essential + with tools.mute_logger('odoo.sql_db'): + presence.write(values) + presence.flush() diff --git a/addons/bus/models/res_partner.py b/addons/bus/models/res_partner.py new file mode 100644 index 00000000..71f2f8fb --- /dev/null +++ b/addons/bus/models/res_partner.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- + +from odoo import api, fields, models +from odoo.addons.bus.models.bus_presence import AWAY_TIMER +from odoo.addons.bus.models.bus_presence import DISCONNECTION_TIMER + + +class ResPartner(models.Model): + _inherit = 'res.partner' + + im_status = fields.Char('IM Status', compute='_compute_im_status') + + def _compute_im_status(self): + self.env.cr.execute(""" + SELECT + U.partner_id as id, + CASE WHEN max(B.last_poll) IS NULL THEN 'offline' + WHEN age(now() AT TIME ZONE 'UTC', max(B.last_poll)) > interval %s THEN 'offline' + WHEN age(now() AT TIME ZONE 'UTC', max(B.last_presence)) > interval %s THEN 'away' + ELSE 'online' + END as status + FROM bus_presence B + RIGHT JOIN res_users U ON B.user_id = U.id + WHERE U.partner_id IN %s AND U.active = 't' + GROUP BY U.partner_id + """, ("%s seconds" % DISCONNECTION_TIMER, "%s seconds" % AWAY_TIMER, tuple(self.ids))) + res = dict(((status['id'], status['status']) for status in self.env.cr.dictfetchall())) + for partner in self: + partner.im_status = res.get(partner.id, 'im_partner') # if not found, it is a partner, useful to avoid to refresh status in js diff --git a/addons/bus/models/res_users.py b/addons/bus/models/res_users.py new file mode 100644 index 00000000..8e40c1b1 --- /dev/null +++ b/addons/bus/models/res_users.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- + +from odoo import api, fields, models +from odoo.addons.bus.models.bus_presence import AWAY_TIMER +from odoo.addons.bus.models.bus_presence import DISCONNECTION_TIMER + + +class ResUsers(models.Model): + + _inherit = "res.users" + + im_status = fields.Char('IM Status', compute='_compute_im_status') + + def _compute_im_status(self): + """ Compute the im_status of the users """ + self.env.cr.execute(""" + SELECT + user_id as id, + CASE WHEN age(now() AT TIME ZONE 'UTC', last_poll) > interval %s THEN 'offline' + WHEN age(now() AT TIME ZONE 'UTC', last_presence) > interval %s THEN 'away' + ELSE 'online' + END as status + FROM bus_presence + WHERE user_id IN %s + """, ("%s seconds" % DISCONNECTION_TIMER, "%s seconds" % AWAY_TIMER, tuple(self.ids))) + res = dict(((status['id'], status['status']) for status in self.env.cr.dictfetchall())) + for user in self: + user.im_status = res.get(user.id, 'offline') |
