summaryrefslogtreecommitdiff
path: root/addons/bus/models
diff options
context:
space:
mode:
authorstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
committerstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
commit3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch)
treea44932296ef4a9b71d5f010906253d8c53727726 /addons/bus/models
parent0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff)
initial commit 2
Diffstat (limited to 'addons/bus/models')
-rw-r--r--addons/bus/models/__init__.py5
-rw-r--r--addons/bus/models/bus.py199
-rw-r--r--addons/bus/models/bus_presence.py71
-rw-r--r--addons/bus/models/res_partner.py29
-rw-r--r--addons/bus/models/res_users.py28
5 files changed, 332 insertions, 0 deletions
diff --git a/addons/bus/models/__init__.py b/addons/bus/models/__init__.py
new file mode 100644
index 00000000..5f99c824
--- /dev/null
+++ b/addons/bus/models/__init__.py
@@ -0,0 +1,5 @@
+# -*- coding: utf-8 -*-
+from . import bus
+from . import bus_presence
+from . import res_users
+from . import res_partner
diff --git a/addons/bus/models/bus.py b/addons/bus/models/bus.py
new file mode 100644
index 00000000..b88a8336
--- /dev/null
+++ b/addons/bus/models/bus.py
@@ -0,0 +1,199 @@
+# -*- coding: utf-8 -*-
+import datetime
+import json
+import logging
+import random
+import select
+import threading
+import time
+
+import odoo
+from odoo import api, fields, models, SUPERUSER_ID
+from odoo.tools.misc import DEFAULT_SERVER_DATETIME_FORMAT
+from odoo.tools import date_utils
+
+_logger = logging.getLogger(__name__)
+
+# longpolling timeout connection
+TIMEOUT = 50
+
+#----------------------------------------------------------
+# Bus
+#----------------------------------------------------------
+def json_dump(v):
+ return json.dumps(v, separators=(',', ':'), default=date_utils.json_default)
+
+def hashable(key):
+ if isinstance(key, list):
+ key = tuple(key)
+ return key
+
+
+class ImBus(models.Model):
+
+ _name = 'bus.bus'
+ _description = 'Communication Bus'
+
+ channel = fields.Char('Channel')
+ message = fields.Char('Message')
+
+ @api.autovacuum
+ def _gc_messages(self):
+ timeout_ago = datetime.datetime.utcnow()-datetime.timedelta(seconds=TIMEOUT*2)
+ domain = [('create_date', '<', timeout_ago.strftime(DEFAULT_SERVER_DATETIME_FORMAT))]
+ return self.sudo().search(domain).unlink()
+
+ @api.model
+ def sendmany(self, notifications):
+ channels = set()
+ for channel, message in notifications:
+ channels.add(channel)
+ values = {
+ "channel": json_dump(channel),
+ "message": json_dump(message)
+ }
+ self.sudo().create(values)
+ if channels:
+ # We have to wait until the notifications are commited in database.
+ # When calling `NOTIFY imbus`, some concurrent threads will be
+ # awakened and will fetch the notification in the bus table. If the
+ # transaction is not commited yet, there will be nothing to fetch,
+ # and the longpolling will return no notification.
+ @self.env.cr.postcommit.add
+ def notify():
+ with odoo.sql_db.db_connect('postgres').cursor() as cr:
+ cr.execute("notify imbus, %s", (json_dump(list(channels)),))
+
+ @api.model
+ def sendone(self, channel, message):
+ self.sendmany([[channel, message]])
+
+ @api.model
+ def poll(self, channels, last=0, options=None):
+ if options is None:
+ options = {}
+ # first poll return the notification in the 'buffer'
+ if last == 0:
+ timeout_ago = datetime.datetime.utcnow()-datetime.timedelta(seconds=TIMEOUT)
+ domain = [('create_date', '>', timeout_ago.strftime(DEFAULT_SERVER_DATETIME_FORMAT))]
+ else: # else returns the unread notifications
+ domain = [('id', '>', last)]
+ channels = [json_dump(c) for c in channels]
+ domain.append(('channel', 'in', channels))
+ notifications = self.sudo().search_read(domain)
+ # list of notification to return
+ result = []
+ for notif in notifications:
+ result.append({
+ 'id': notif['id'],
+ 'channel': json.loads(notif['channel']),
+ 'message': json.loads(notif['message']),
+ })
+ return result
+
+
+#----------------------------------------------------------
+# Dispatcher
+#----------------------------------------------------------
+class ImDispatch(object):
+ def __init__(self):
+ self.channels = {}
+ self.started = False
+
+ def poll(self, dbname, channels, last, options=None, timeout=TIMEOUT):
+ if options is None:
+ options = {}
+ # Dont hang ctrl-c for a poll request, we need to bypass private
+ # attribute access because we dont know before starting the thread that
+ # it will handle a longpolling request
+ if not odoo.evented:
+ current = threading.current_thread()
+ current._daemonic = True
+ # rename the thread to avoid tests waiting for a longpolling
+ current.setName("openerp.longpolling.request.%s" % current.ident)
+
+ registry = odoo.registry(dbname)
+
+ # immediatly returns if past notifications exist
+ with registry.cursor() as cr:
+ env = api.Environment(cr, SUPERUSER_ID, {})
+ notifications = env['bus.bus'].poll(channels, last, options)
+
+ # immediatly returns in peek mode
+ if options.get('peek'):
+ return dict(notifications=notifications, channels=channels)
+
+ # or wait for future ones
+ if not notifications:
+ if not self.started:
+ # Lazy start of events listener
+ self.start()
+
+ event = self.Event()
+ for channel in channels:
+ self.channels.setdefault(hashable(channel), set()).add(event)
+ try:
+ event.wait(timeout=timeout)
+ with registry.cursor() as cr:
+ env = api.Environment(cr, SUPERUSER_ID, {})
+ notifications = env['bus.bus'].poll(channels, last, options)
+ except Exception:
+ # timeout
+ pass
+ finally:
+ # gc pointers to event
+ for channel in channels:
+ channel_events = self.channels.get(hashable(channel))
+ if channel_events and event in channel_events:
+ channel_events.remove(event)
+ return notifications
+
+ def loop(self):
+ """ Dispatch postgres notifications to the relevant polling threads/greenlets """
+ _logger.info("Bus.loop listen imbus on db postgres")
+ with odoo.sql_db.db_connect('postgres').cursor() as cr:
+ conn = cr._cnx
+ cr.execute("listen imbus")
+ cr.commit();
+ while True:
+ if select.select([conn], [], [], TIMEOUT) == ([], [], []):
+ pass
+ else:
+ conn.poll()
+ channels = []
+ while conn.notifies:
+ channels.extend(json.loads(conn.notifies.pop().payload))
+ # dispatch to local threads/greenlets
+ events = set()
+ for channel in channels:
+ events.update(self.channels.pop(hashable(channel), set()))
+ for event in events:
+ event.set()
+
+ def run(self):
+ while True:
+ try:
+ self.loop()
+ except Exception as e:
+ _logger.exception("Bus.loop error, sleep and retry")
+ time.sleep(TIMEOUT)
+
+ def start(self):
+ if odoo.evented:
+ # gevent mode
+ import gevent
+ self.Event = gevent.event.Event
+ gevent.spawn(self.run)
+ else:
+ # threaded mode
+ self.Event = threading.Event
+ t = threading.Thread(name="%s.Bus" % __name__, target=self.run)
+ t.daemon = True
+ t.start()
+ self.started = True
+ return self
+
+dispatch = None
+if not odoo.multi_process or odoo.evented:
+ # We only use the event dispatcher in threaded and gevent mode
+ dispatch = ImDispatch()
diff --git a/addons/bus/models/bus_presence.py b/addons/bus/models/bus_presence.py
new file mode 100644
index 00000000..2f6545bc
--- /dev/null
+++ b/addons/bus/models/bus_presence.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+import datetime
+import time
+
+from psycopg2 import OperationalError
+
+from odoo import api, fields, models
+from odoo import tools
+from odoo.addons.bus.models.bus import TIMEOUT
+from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY
+from odoo.tools.misc import DEFAULT_SERVER_DATETIME_FORMAT
+
+DISCONNECTION_TIMER = TIMEOUT + 5
+AWAY_TIMER = 1800 # 30 minutes
+
+
+class BusPresence(models.Model):
+ """ User Presence
+ Its status is 'online', 'away' or 'offline'. This model should be a one2one, but is not
+ attached to res_users to avoid database concurrence errors. Since the 'update' method is executed
+ at each poll, if the user have multiple opened tabs, concurrence errors can happend, but are 'muted-logged'.
+ """
+
+ _name = 'bus.presence'
+ _description = 'User Presence'
+ _log_access = False
+
+ _sql_constraints = [('bus_user_presence_unique', 'unique(user_id)', 'A user can only have one IM status.')]
+
+ user_id = fields.Many2one('res.users', 'Users', required=True, index=True, ondelete='cascade')
+ last_poll = fields.Datetime('Last Poll', default=lambda self: fields.Datetime.now())
+ last_presence = fields.Datetime('Last Presence', default=lambda self: fields.Datetime.now())
+ status = fields.Selection([('online', 'Online'), ('away', 'Away'), ('offline', 'Offline')], 'IM Status', default='offline')
+
+ @api.model
+ def update(self, inactivity_period):
+ """ Updates the last_poll and last_presence of the current user
+ :param inactivity_period: duration in milliseconds
+ """
+ # This method is called in method _poll() and cursor is closed right
+ # after; see bus/controllers/main.py.
+ try:
+ self._update(inactivity_period)
+ # commit on success
+ self.env.cr.commit()
+ except OperationalError as e:
+ if e.pgcode in PG_CONCURRENCY_ERRORS_TO_RETRY:
+ # ignore concurrency error
+ return self.env.cr.rollback()
+ raise
+
+ @api.model
+ def _update(self, inactivity_period):
+ presence = self.search([('user_id', '=', self._uid)], limit=1)
+ # compute last_presence timestamp
+ last_presence = datetime.datetime.now() - datetime.timedelta(milliseconds=inactivity_period)
+ values = {
+ 'last_poll': time.strftime(DEFAULT_SERVER_DATETIME_FORMAT),
+ }
+ # update the presence or a create a new one
+ if not presence: # create a new presence for the user
+ values['user_id'] = self._uid
+ values['last_presence'] = last_presence
+ self.create(values)
+ else: # update the last_presence if necessary, and write values
+ if presence.last_presence < last_presence:
+ values['last_presence'] = last_presence
+ # Hide transaction serialization errors, which can be ignored, the presence update is not essential
+ with tools.mute_logger('odoo.sql_db'):
+ presence.write(values)
+ presence.flush()
diff --git a/addons/bus/models/res_partner.py b/addons/bus/models/res_partner.py
new file mode 100644
index 00000000..71f2f8fb
--- /dev/null
+++ b/addons/bus/models/res_partner.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+
+from odoo import api, fields, models
+from odoo.addons.bus.models.bus_presence import AWAY_TIMER
+from odoo.addons.bus.models.bus_presence import DISCONNECTION_TIMER
+
+
+class ResPartner(models.Model):
+ _inherit = 'res.partner'
+
+ im_status = fields.Char('IM Status', compute='_compute_im_status')
+
+ def _compute_im_status(self):
+ self.env.cr.execute("""
+ SELECT
+ U.partner_id as id,
+ CASE WHEN max(B.last_poll) IS NULL THEN 'offline'
+ WHEN age(now() AT TIME ZONE 'UTC', max(B.last_poll)) > interval %s THEN 'offline'
+ WHEN age(now() AT TIME ZONE 'UTC', max(B.last_presence)) > interval %s THEN 'away'
+ ELSE 'online'
+ END as status
+ FROM bus_presence B
+ RIGHT JOIN res_users U ON B.user_id = U.id
+ WHERE U.partner_id IN %s AND U.active = 't'
+ GROUP BY U.partner_id
+ """, ("%s seconds" % DISCONNECTION_TIMER, "%s seconds" % AWAY_TIMER, tuple(self.ids)))
+ res = dict(((status['id'], status['status']) for status in self.env.cr.dictfetchall()))
+ for partner in self:
+ partner.im_status = res.get(partner.id, 'im_partner') # if not found, it is a partner, useful to avoid to refresh status in js
diff --git a/addons/bus/models/res_users.py b/addons/bus/models/res_users.py
new file mode 100644
index 00000000..8e40c1b1
--- /dev/null
+++ b/addons/bus/models/res_users.py
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+
+from odoo import api, fields, models
+from odoo.addons.bus.models.bus_presence import AWAY_TIMER
+from odoo.addons.bus.models.bus_presence import DISCONNECTION_TIMER
+
+
+class ResUsers(models.Model):
+
+ _inherit = "res.users"
+
+ im_status = fields.Char('IM Status', compute='_compute_im_status')
+
+ def _compute_im_status(self):
+ """ Compute the im_status of the users """
+ self.env.cr.execute("""
+ SELECT
+ user_id as id,
+ CASE WHEN age(now() AT TIME ZONE 'UTC', last_poll) > interval %s THEN 'offline'
+ WHEN age(now() AT TIME ZONE 'UTC', last_presence) > interval %s THEN 'away'
+ ELSE 'online'
+ END as status
+ FROM bus_presence
+ WHERE user_id IN %s
+ """, ("%s seconds" % DISCONNECTION_TIMER, "%s seconds" % AWAY_TIMER, tuple(self.ids)))
+ res = dict(((status['id'], status['status']) for status in self.env.cr.dictfetchall()))
+ for user in self:
+ user.im_status = res.get(user.id, 'offline')