summaryrefslogtreecommitdiff
path: root/addons/web/controllers
diff options
context:
space:
mode:
authorstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
committerstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
commit3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch)
treea44932296ef4a9b71d5f010906253d8c53727726 /addons/web/controllers
parent0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff)
initial commit 2
Diffstat (limited to 'addons/web/controllers')
-rw-r--r--addons/web/controllers/__init__.py4
-rw-r--r--addons/web/controllers/main.py2155
-rw-r--r--addons/web/controllers/pivot.py110
3 files changed, 2269 insertions, 0 deletions
diff --git a/addons/web/controllers/__init__.py b/addons/web/controllers/__init__.py
new file mode 100644
index 00000000..48cc3726
--- /dev/null
+++ b/addons/web/controllers/__init__.py
@@ -0,0 +1,4 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from . import main, pivot
diff --git a/addons/web/controllers/main.py b/addons/web/controllers/main.py
new file mode 100644
index 00000000..ab9168c8
--- /dev/null
+++ b/addons/web/controllers/main.py
@@ -0,0 +1,2155 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+import babel.messages.pofile
+import base64
+import copy
+import datetime
+import functools
+import glob
+import hashlib
+import io
+import itertools
+import jinja2
+import json
+import logging
+import operator
+import os
+import re
+import sys
+import tempfile
+
+import werkzeug
+import werkzeug.exceptions
+import werkzeug.utils
+import werkzeug.wrappers
+import werkzeug.wsgi
+from collections import OrderedDict, defaultdict, Counter
+from werkzeug.urls import url_encode, url_decode, iri_to_uri
+from lxml import etree
+import unicodedata
+
+
+import odoo
+import odoo.modules.registry
+from odoo.api import call_kw, Environment
+from odoo.modules import get_module_path, get_resource_path
+from odoo.tools import image_process, topological_sort, html_escape, pycompat, ustr, apply_inheritance_specs, lazy_property, float_repr
+from odoo.tools.mimetypes import guess_mimetype
+from odoo.tools.translate import _
+from odoo.tools.misc import str2bool, xlsxwriter, file_open
+from odoo.tools.safe_eval import safe_eval, time
+from odoo import http, tools
+from odoo.http import content_disposition, dispatch_rpc, request, serialize_exception as _serialize_exception, Response
+from odoo.exceptions import AccessError, UserError, AccessDenied
+from odoo.models import check_method_name
+from odoo.service import db, security
+
_logger = logging.getLogger(__name__)

if hasattr(sys, 'frozen'):
    # When running on compiled windows binary, we don't have access to package loader.
    path = os.path.realpath(os.path.join(os.path.dirname(__file__), '..', 'views'))
    loader = jinja2.FileSystemLoader(path)
else:
    loader = jinja2.PackageLoader('odoo.addons.web', "views")

# Jinja environment used to render the web client bootstrap pages;
# autoescape is on, and a |json filter is provided for inline data.
env = jinja2.Environment(loader=loader, autoescape=True)
env.filters["json"] = json.dumps

CONTENT_MAXAGE = http.STATIC_CACHE_LONG  # menus, translations, static qweb

# Valid database names: must start with an alphanumeric character.
DBNAME_PATTERN = '^[a-zA-Z0-9][a-zA-Z0-9_.-]+$'

# Matches the marker comments inserted by qweb static template inheritance
# in debug mode (see HomeStaticTemplateHelpers._remove_inheritance_comments).
COMMENT_PATTERN = r'Modified by [\s\w\-.]+ from [\s\w\-.]+'
+
+
def none_values_filtered(func):
    """Wrap *func* (which takes one iterable) so that ``None`` entries of
    the iterable are skipped before being passed on.
    """
    @functools.wraps(func)
    def wrap(iterable):
        return func(filter(lambda value: value is not None, iterable))
    return wrap
+
def allow_empty_iterable(func):
    """
    Some functions do not accept empty iterables (e.g. max, min with no default value)
    This returns the function `func` such that it returns None if the iterable
    is empty instead of raising a ValueError.
    """
    @functools.wraps(func)
    def wrap(iterable):
        sentinel = object()
        iterator = iter(iterable)
        first = next(iterator, sentinel)
        if first is sentinel:
            # Empty input: signal it with None instead of letting func raise.
            return None
        return func(itertools.chain((first,), iterator))
    return wrap
+
# Aggregation functions used when exporting grouped data. max/min are made
# tolerant both to empty groups (-> None) and to None values inside a group.
OPERATOR_MAPPING = {
    'max': none_values_filtered(allow_empty_iterable(max)),
    'min': none_values_filtered(allow_empty_iterable(min)),
    'sum': sum,
    'bool_and': all,
    'bool_or': any,
}
+
#----------------------------------------------------------
# Odoo Web helpers
#----------------------------------------------------------

# Aliases re-exported from odoo.http, kept for backward compatibility.
db_list = http.db_list

db_monodb = http.db_monodb
+
+def clean(name): return name.replace('\x3c', '')
def serialize_exception(f):
    """Decorator for http handlers: catch any exception raised by *f* and
    turn it into an HTTP 500 response whose JSON body follows the JSON-RPC
    error shape expected by the web client (``code``/``message``/``data``).

    The serialized exception (see ``odoo.http.serialize_exception``) goes in
    the ``data`` key so the client can rebuild a meaningful error dialog.
    """
    @functools.wraps(f)
    def wrap(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception as e:
            # Fixed typo in the log message ("occured" -> "occurred").
            _logger.exception("An exception occurred during an http request")
            se = _serialize_exception(e)
            error = {
                'code': 200,
                'message': "Odoo Server Error",
                'data': se,
            }
            return werkzeug.exceptions.InternalServerError(json.dumps(error))
    return wrap
+
def redirect_with_hash(*args, **kw):
    """
    .. deprecated:: 8.0

    Use the ``http.redirect_with_hash()`` function instead.

    Thin pass-through kept so old imports of this module keep working.
    """
    return http.redirect_with_hash(*args, **kw)
+
def abort_and_redirect(url):
    """Abort the current request handling and reply with a 302 redirect to
    *url*, after letting the app post-process the response (session cookie
    handling; `explicit_session=False` presumably keeps the session out of
    the URL — confirm against odoo.http.Root.get_response).
    """
    r = request.httprequest
    response = werkzeug.utils.redirect(url, 302)
    response = r.app.get_response(r, response, explicit_session=False)
    # werkzeug.exceptions.abort raises, so this never returns.
    werkzeug.exceptions.abort(response)
+
def ensure_db(redirect='/web/database/selector'):
    # This helper should be used in web client auth="none" routes
    # if those routes need a db to work with.
    # If the heuristics does not find any database, then the users will be
    # redirected to db selector or any url specified by `redirect` argument.
    # If the db is taken out of a query parameter, it will be checked against
    # `http.db_filter()` in order to ensure it's legit and thus avoid db
    # forgery that could lead to xss attacks.
    db = request.params.get('db') and request.params.get('db').strip()

    # Ensure db is legit
    if db and db not in http.db_filter([db]):
        db = None

    if db and not request.session.db:
        # User asked a specific database on a new session.
        # That means the nodb router has been used to find the route.
        # Depending on installed module in the database, the rendering of the page
        # may depend on data injected by the database route dispatcher.
        # Thus, we redirect the user to the same page but with the session cookie set.
        # This will force using the database route dispatcher...
        r = request.httprequest
        url_redirect = werkzeug.urls.url_parse(r.base_url)
        if r.query_string:
            # in P3, request.query_string is bytes, the rest is text, can't mix them
            query_string = iri_to_uri(r.query_string)
            url_redirect = url_redirect.replace(query=query_string)
        request.session.db = db
        # abort_and_redirect raises; nothing below runs in this branch.
        abort_and_redirect(url_redirect)

    # if db not provided, use the session one
    if not db and request.session.db and http.db_filter([request.session.db]):
        db = request.session.db

    # if no database provided and no database in session, use monodb
    if not db:
        db = db_monodb(request.httprequest)

    # if no db can be found til here, send to the database selector
    # the database selector will redirect to database manager if needed
    if not db:
        werkzeug.exceptions.abort(werkzeug.utils.redirect(redirect, 303))

    # always switch the session to the computed db
    if db != request.session.db:
        request.session.logout()
        abort_and_redirect(request.httprequest.url)

    request.session.db = db
+
def module_installed(environment):
    """Return the names of installed modules known to the HTTP layer,
    ordered so every module follows its dependencies.

    :param environment: odoo Environment used to query ir.module.module
    :rtype: list(str)
    """
    # Candidate modules: the current heuristic is the presence of a /static dir
    loadable = list(http.addons_manifest)

    # TODO The following code should move to ir.module.module.list_installed_modules()
    Modules = environment['ir.module.module']
    installed = Modules.search([('state', '=', 'installed'), ('name', 'in', loadable)])
    dependency_map = OrderedDict(
        (module.name, module.dependencies_id.mapped('name'))
        for module in installed
    )

    # Dependencies first, dependents afterwards.
    return topological_sort(dependency_map)
+
def module_installed_bypass_session(dbname):
    """Same as :func:`module_installed`, but opens its own superuser cursor
    on *dbname* instead of relying on the request session.

    Best effort: any failure (unknown db, broken registry, ...) is swallowed
    and an empty dict is returned.
    """
    try:
        registry = odoo.registry(dbname)
        with registry.cursor() as cr:
            env = Environment(cr, odoo.SUPERUSER_ID, {})
            return module_installed(environment=env)
    except Exception:
        # Deliberately silent: callers treat "no modules" as a valid answer.
        return {}
+
def module_boot(db=None):
    """Compute the ordered list of modules the web client must load:
    server-wide modules first (always including base and web), then the
    modules installed in the database.

    :param db: database name; when falsy, fall back on the monodb heuristic
    :rtype: list(str)
    """
    server_wide_modules = odoo.conf.server_wide_modules or []
    serverside = ['base', 'web']
    for module in server_wide_modules:
        if module in http.addons_manifest and module not in serverside:
            serverside.append(module)

    dbside = []
    monodb = db or db_monodb()
    if monodb:
        installed = module_installed_bypass_session(monodb)
        dbside = [module for module in installed if module not in serverside]

    return serverside + dbside
+
+
def fs2web(path):
    """Convert a filesystem path into a web path (forward-slash separated)."""
    return path.replace(os.path.sep, '/')
+
def manifest_glob(extension, addons=None, db=None, include_remotes=False):
    """Resolve the `extension` entries (e.g. 'css', 'js', 'qweb') of each
    addon manifest into concrete files.

    :param str extension: manifest key whose glob patterns are expanded
    :param addons: list of addon names; when None, computed via module_boot(db)
    :param db: database name forwarded to module_boot
    :param bool include_remotes: also keep http(s):// and protocol-relative
        URLs found in the manifest (returned with a None filesystem path)
    :returns: list of (filesystem_path_or_None, web_path_or_url, addon_name)
    """
    if addons is None:
        addons = module_boot(db=db)

    r = []
    for addon in addons:
        manifest = http.addons_manifest.get(addon, None)
        if not manifest:
            continue
        # ensure the base path does not end with /
        addons_path = os.path.join(manifest['addons_path'], '')[:-1]
        globlist = manifest.get(extension, [])
        for pattern in globlist:
            if pattern.startswith(('http://', 'https://', '//')):
                if include_remotes:
                    r.append((None, pattern, addon))
            else:
                for path in glob.glob(os.path.normpath(os.path.join(addons_path, addon, pattern))):
                    # strip the addons dir prefix to obtain the web path
                    r.append((path, fs2web(path[len(addons_path):]), addon))
    return r
+
+
def manifest_list(extension, mods=None, db=None, debug=None):
    """ list resources to load specifying either:
    mods: a comma separated string listing modules
    db: a database name (return all installed modules in that database)

    :returns: list of web paths / remote URLs for the requested resources
    """
    if debug is not None:
        _logger.warning("odoo.addons.web.main.manifest_list(): debug parameter is deprecated")
    # `mods` is optional (db-only use is documented): only split when given.
    # The original code called mods.split(',') unconditionally and crashed
    # with AttributeError when mods was None.
    if mods is not None:
        mods = mods.split(',')
    files = manifest_glob(extension, addons=mods, db=db, include_remotes=True)
    return [wp for _fp, wp, addon in files]
+
def get_last_modified(files):
    """ Returns the modification time of the most recently modified
    file provided

    :param files: iterable of file paths to check
    :return: most recent modification time amongst the fileset, or the
        Unix epoch (1970-01-01) when the iterable is empty
    :rtype: datetime.datetime
    """
    mtimes = [
        datetime.datetime.fromtimestamp(os.path.getmtime(fname))
        for fname in files
    ]
    return max(mtimes) if mtimes else datetime.datetime(1970, 1, 1)
+
def make_conditional(response, last_modified=None, etag=None, max_age=0):
    """ Makes the provided response conditional based upon the request,
    and mandates revalidation from clients

    Uses Werkzeug's own :meth:`ETagResponseMixin.make_conditional`, after
    setting ``last_modified`` and ``etag`` correctly on the response object

    :param response: Werkzeug response
    :type response: werkzeug.wrappers.Response
    :param datetime.datetime last_modified: last modification date of the response content
    :param str etag: some sort of checksum of the content (deep etag)
    :param int max_age: Cache-Control max-age in seconds (0 by default, which
        combined with must-revalidate forces clients to revalidate)
    :return: the response object provided
    :rtype: werkzeug.wrappers.Response
    """
    response.cache_control.must_revalidate = True
    response.cache_control.max_age = max_age
    if last_modified:
        response.last_modified = last_modified
    if etag:
        response.set_etag(etag)
    # make_conditional may turn the response into a 304 based on the request.
    return response.make_conditional(request.httprequest)
+
def _get_login_redirect_url(uid, redirect=None):
    """ Decide if user requires a specific post-login redirect, e.g. for 2FA, or if they are
    fully logged and can proceed to the requested URL

    :param int uid: id of the (possibly partially) authenticated user
    :param redirect: requested post-login URL, forwarded to the MFA page
        as a `redirect` query parameter when authentication is incomplete
    :rtype: str
    """
    if request.session.uid: # fully logged
        return redirect or '/web'

    # partial session (MFA)
    url = request.env(user=uid)['res.users'].browse(uid)._mfa_url()
    if not redirect:
        return url

    # carry the requested redirect through the MFA url's query string
    parsed = werkzeug.urls.url_parse(url)
    qs = parsed.decode_query()
    qs['redirect'] = redirect
    return parsed.replace(query=werkzeug.urls.url_encode(qs)).to_url()
+
def login_and_redirect(db, login, key, redirect_url='/web'):
    """Authenticate the current session against *db* and return a redirect
    response to *redirect_url* (or to the MFA page when 2FA is pending,
    see _get_login_redirect_url)."""
    uid = request.session.authenticate(db, login, key)
    redirect_url = _get_login_redirect_url(uid, redirect_url)
    return set_cookie_and_redirect(redirect_url)
+
def set_cookie_and_redirect(redirect_url):
    """Build a 303 See Other redirect response to *redirect_url*.

    Location-header autocorrection is disabled so the URL is emitted
    verbatim (it may be a relative path handled by the web client).
    """
    response = werkzeug.utils.redirect(redirect_url, 303)
    response.autocorrect_location_header = False
    return response
+
def clean_action(action, env):
    """Sanitize an action descriptor before sending it to the web client:
    default the action type, normalize window-action view modes, and strip
    every key that is neither a readable field of the action model nor a
    custom property (custom properties are kept but logged as discouraged).

    :param dict action: action descriptor (modified copy returned)
    :param env: odoo Environment used to introspect the action model
    :returns: dict containing only allowed/custom keys
    """
    action_type = action.setdefault('type', 'ir.actions.act_window_close')
    if action_type == 'ir.actions.act_window':
        action = fix_view_modes(action)

    # When returning an action, keep only relevant fields/properties
    readable_fields = env[action['type']]._get_readable_fields()
    action_type_fields = env[action['type']]._fields.keys()

    cleaned_action = {
        field: value
        for field, value in action.items()
        # keep allowed fields and custom properties fields
        if field in readable_fields or field not in action_type_fields
    }

    # Warn about custom properties fields, because use is discouraged
    action_name = action.get('name') or action
    custom_properties = action.keys() - readable_fields - action_type_fields
    if custom_properties:
        _logger.warning("Action %r contains custom properties %s. Passing them "
            "via the `params` or `context` properties is recommended instead",
            action_name, ', '.join(map(repr, custom_properties)))

    return cleaned_action
+
+# I think generate_views,fix_view_modes should go into js ActionManager
def generate_views(action):
    """Generate the ``views`` key of a hand-built action descriptor.

    Actions coming from the database (``ir.actions.act_window``) already
    carry a server-computed ``views`` sequence, but actions built on the fly
    (e.g. returned by buttons) may only provide ``view_mode`` and optionally
    ``view_id``. Since the web client relies on ``action['views']``, derive
    it here.

    Handled cases:

    * no view_id, multiple view modes
    * single view_id (plain id or m2o-style pair), single view mode

    :param dict action: action descriptor, modified in place
    :raises ValueError: when several view modes are combined with a view_id
    """
    view_id = action.get('view_id') or False
    if isinstance(view_id, (list, tuple)):
        # m2o read() style value: (id, display_name) -> keep the id
        view_id = view_id[0]

    # providing at least one view mode is a requirement, not an option
    view_modes = action['view_mode'].split(',')

    if len(view_modes) == 1:
        action['views'] = [(view_id, view_modes[0])]
        return

    if view_id:
        raise ValueError('Non-db action dictionaries should provide '
                         'either multiple view modes or a single view '
                         'mode and an optional view id.\n\n Got view '
                         'modes %r and view id %r for action %r' % (
                         view_modes, view_id, action))
    action['views'] = [(False, mode) for mode in view_modes]
+
def fix_view_modes(action):
    """ For historical reasons, Odoo has weird dealings in relation to
    view_mode and the view_type attribute (on window actions):

    * one of the view modes is ``tree``, which stands for both list views
      and tree views
    * the choice is made by checking ``view_type``, which is either
      ``form`` for a list view or ``tree`` for an actual tree view

    This method folds the view_type into view_mode by adding a new view
    mode ``list``, the result of the ``tree`` view_mode combined with the
    ``form`` view_type.

    :param dict action: an action descriptor
    :returns: the action, modified in place
    """
    if not action.get('views'):
        generate_views(action)

    # A non-form view_type means an actual tree view: nothing to fold.
    if action.pop('view_type', 'form') != 'form':
        return action

    def _fold(mode):
        return 'list' if mode == 'tree' else mode

    if 'view_mode' in action:
        action['view_mode'] = ','.join(_fold(mode) for mode in action['view_mode'].split(','))
    action['views'] = [[view_id, _fold(mode)] for view_id, mode in action['views']]

    return action
+
def _local_web_translations(trans_file):
    """Extract the web-client ("openerp-web" commented) messages from a
    .po translation file.

    :param str trans_file: path to a .po file
    :returns: list of {'id': ..., 'string': ...} dicts, or None when the
        file cannot be opened/parsed (callers must handle None)
    """
    messages = []
    try:
        with open(trans_file) as t_file:
            po = babel.messages.pofile.read_po(t_file)
    except Exception:
        # Unreadable or invalid file: signal with None rather than raising.
        return
    for x in po:
        if x.id and x.string and "openerp-web" in x.auto_comments:
            messages.append({'id': x.id, 'string': x.string})
    return messages
+
def xml2json_from_elementtree(el, preserve_whitespaces=False):
    """ xml2json-direct
    Simple and straightforward XML-to-JSON converter in Python
    New BSD Licensed
    http://code.google.com/p/xml2json-direct/

    Returns a dict with keys ``tag``, ``attrs``, ``children`` and, for
    namespaced elements, ``namespace``. Text and tail nodes appear as plain
    strings in ``children``; whitespace-only text is dropped unless
    *preserve_whitespaces* is set.
    """
    res = {"attrs": dict(el.items())}

    # Clark notation: "{namespace-uri}localname"
    if el.tag.startswith("{"):
        namespace, local_name = el.tag.rsplit("}", 1)
        res["tag"] = local_name
        res["namespace"] = namespace[1:]
    else:
        res["tag"] = el.tag

    def _keep(text):
        return text and (preserve_whitespaces or text.strip() != '')

    children = []
    if _keep(el.text):
        children.append(el.text)
    for child in el:
        children.append(xml2json_from_elementtree(child, preserve_whitespaces))
        if _keep(child.tail):
            children.append(child.tail)
    res["children"] = children
    return res
+
class HomeStaticTemplateHelpers(object):
    """
    Helper Class that wraps the reading of static qweb templates files
    and xpath inheritance applied to those templates
    /!\ Template inheritance order is defined by ir.module.module natural order
    which is "sequence, name"
    Then a topological sort is applied, which just puts dependencies
    of a module before that module
    """
    # qweb attributes driving template naming and static inheritance
    NAME_TEMPLATE_DIRECTIVE = 't-name'
    STATIC_INHERIT_DIRECTIVE = 't-inherit'
    STATIC_INHERIT_MODE_DIRECTIVE = 't-inherit-mode'
    PRIMARY_MODE = 'primary'
    EXTENSION_MODE = 'extension'
    DEFAULT_MODE = PRIMARY_MODE

    def __init__(self, addons, db, checksum_only=False, debug=False):
        '''
        :param str|list addons: plain list or comma separated list of addons
        :param str db: the current db we are working on
        :param bool checksum_only: only computes the checksum of all files for addons
        :param bool debug: the debug mode of the session
        '''
        super(HomeStaticTemplateHelpers, self).__init__()
        self.addons = addons.split(',') if isinstance(addons, str) else addons
        self.db = db
        self.debug = debug
        self.checksum_only = checksum_only
        # addon name -> OrderedDict(template name -> etree element)
        self.template_dict = OrderedDict()

    def _get_parent_template(self, addon, template):
        """Computes the real addon name and the template name
        of the parent template (the one that is inherited from)

        :param str addon: the addon the template is declared in
        :param etree template: the current template we are handling
        :returns: (str, str)
        """
        original_template_name = template.attrib[self.STATIC_INHERIT_DIRECTIVE]
        split_name_attempt = original_template_name.split('.', 1)
        parent_addon, parent_name = tuple(split_name_attempt) if len(split_name_attempt) == 2 else (addon, original_template_name)
        if parent_addon not in self.template_dict:
            # The dotted prefix is not a loaded addon: maybe the full dotted
            # name is actually a local template of the current addon.
            if original_template_name in self.template_dict[addon]:
                parent_addon = addon
                parent_name = original_template_name
            else:
                raise ValueError(_('Module %s not loaded or inexistent, or templates of addon being loaded (%s) are misordered') % (parent_addon, addon))

        if parent_name not in self.template_dict[parent_addon]:
            raise ValueError(_("No template found to inherit from. Module %s and template name %s") % (parent_addon, parent_name))

        return parent_addon, parent_name

    def _compute_xml_tree(self, addon, file_name, source):
        """Computes the xml tree that 'source' contains
        Applies inheritance specs in the process

        :param str addon: the current addon we are reading files for
        :param str file_name: the current name of the file we are reading
        :param bytes source: the content of the file
        :returns: etree
        """
        try:
            all_templates_tree = etree.parse(io.BytesIO(source), parser=etree.XMLParser(remove_comments=True)).getroot()
        except etree.ParseError as e:
            _logger.error("Could not parse file %s: %s" % (file_name, e.msg))
            raise e

        self.template_dict.setdefault(addon, OrderedDict())
        for template_tree in list(all_templates_tree):
            if self.NAME_TEMPLATE_DIRECTIVE in template_tree.attrib:
                template_name = template_tree.attrib[self.NAME_TEMPLATE_DIRECTIVE]
                dotted_names = template_name.split('.', 1)
                if len(dotted_names) > 1 and dotted_names[0] == addon:
                    # strip the redundant addon prefix from qualified names
                    template_name = dotted_names[1]
            else:
                # self.template_dict[addon] grows after processing each template
                template_name = 'anonymous_template_%s' % len(self.template_dict[addon])
            if self.STATIC_INHERIT_DIRECTIVE in template_tree.attrib:
                inherit_mode = template_tree.attrib.get(self.STATIC_INHERIT_MODE_DIRECTIVE, self.DEFAULT_MODE)
                if inherit_mode not in [self.PRIMARY_MODE, self.EXTENSION_MODE]:
                    raise ValueError(_("Invalid inherit mode. Module %s and template name %s") % (addon, template_name))

                parent_addon, parent_name = self._get_parent_template(addon, template_tree)

                # After several performance tests, we found out that deepcopy is the most efficient
                # solution in this case (compared with copy, xpath with '.' and stringifying).
                parent_tree = copy.deepcopy(self.template_dict[parent_addon][parent_name])

                xpaths = list(template_tree)
                if self.debug and inherit_mode == self.EXTENSION_MODE:
                    # In debug mode, tag each spec so the origin of a change
                    # stays visible in the result (see COMMENT_PATTERN).
                    for xpath in xpaths:
                        xpath.insert(0, etree.Comment(" Modified by %s from %s " % (template_name, addon)))
                elif inherit_mode == self.PRIMARY_MODE:
                    parent_tree.tag = template_tree.tag
                inherited_template = apply_inheritance_specs(parent_tree, xpaths)

                if inherit_mode == self.PRIMARY_MODE: # New template_tree: A' = B(A)
                    for attr_name, attr_val in template_tree.attrib.items():
                        if attr_name not in ('t-inherit', 't-inherit-mode'):
                            inherited_template.set(attr_name, attr_val)
                    if self.debug:
                        # the deepcopied parent may carry markers from other
                        # extensions; drop them from the new primary template
                        self._remove_inheritance_comments(inherited_template)
                    self.template_dict[addon][template_name] = inherited_template

                else: # Modifies original: A = B(A)
                    self.template_dict[parent_addon][parent_name] = inherited_template
            else:
                if template_name in self.template_dict[addon]:
                    raise ValueError(_("Template %s already exists in module %s") % (template_name, addon))
                self.template_dict[addon][template_name] = template_tree
        return all_templates_tree

    def _remove_inheritance_comments(self, inherited_template):
        '''Remove the comments added in the template already, they come from other templates extending
        the base of this inheritance

        :param inherited_template: etree element to clean in place
        '''
        for comment in inherited_template.xpath('//comment()'):
            if re.match(COMMENT_PATTERN, comment.text.strip()):
                comment.getparent().remove(comment)

    def _manifest_glob(self):
        '''Proxy for manifest_glob
        Useful to make 'self' testable'''
        return manifest_glob('qweb', self.addons, self.db)

    def _read_addon_file(self, file_path):
        """Reads the content of a file given by file_path
        Useful to make 'self' testable
        :param str file_path:
        :returns: bytes
        """
        with open(file_path, 'rb') as fp:
            contents = fp.read()
        return contents

    def _concat_xml(self, file_dict):
        """Concatenate xml files

        :param dict(list) file_dict:
            key: addon name
            value: list of files for an addon
        :returns: (concatenation_result, checksum)
        :rtype: (bytes, str)
        """
        checksum = hashlib.new('sha512') # sha512/256
        if not file_dict:
            # NOTE(review): this early return yields the full 128-char digest
            # while the normal path below truncates to 64 — confirm callers
            # never compare the two forms.
            return b'', checksum.hexdigest()

        root = None
        for addon, fnames in file_dict.items():
            for fname in fnames:
                contents = self._read_addon_file(fname)
                checksum.update(contents)
                if not self.checksum_only:
                    xml = self._compute_xml_tree(addon, fname, contents)

                    if root is None:
                        root = etree.Element(xml.tag)

        # flatten the per-addon template dicts under a single root element
        for addon in self.template_dict.values():
            for template in addon.values():
                root.append(template)

        return etree.tostring(root, encoding='utf-8') if root is not None else b'', checksum.hexdigest()[:64]

    def _get_qweb_templates(self):
        """One and only entry point that gets and evaluates static qweb templates

        :rtype: (str, str)
        """
        # group the globbed file paths by addon, preserving addon order
        files = OrderedDict([(addon, list()) for addon in self.addons])
        [files[f[2]].append(f[0]) for f in self._manifest_glob()]
        content, checksum = self._concat_xml(files)
        return content, checksum

    @classmethod
    def get_qweb_templates_checksum(cls, addons, db=None, debug=False):
        # Checksum-only pass: no xml parsing/inheritance is performed.
        return cls(addons, db, checksum_only=True, debug=debug)._get_qweb_templates()[1]

    @classmethod
    def get_qweb_templates(cls, addons, db=None, debug=False):
        return cls(addons, db, debug=debug)._get_qweb_templates()[0]
+
+
class GroupsTreeNode:
    """
    This class builds an ordered tree of groups from the result of a `read_group(lazy=False)`.
    The `read_group` returns a list of dictionaries and each dictionary is used to
    build a leaf. The entire tree is built by inserting all leaves.
    """

    def __init__(self, model, fields, groupby, groupby_type, root=None):
        self._model = model
        self._export_field_names = fields # exported field names (e.g. 'journal_id', 'account_id/name', ...)
        self._groupby = groupby
        self._groupby_type = groupby_type

        self.count = 0 # Total number of records in the subtree
        self.children = OrderedDict()
        self.data = [] # Only leaf nodes have data

        if root:
            self.insert_leaf(root)

    def _get_aggregate(self, field_name, data, group_operator):
        """Aggregate the column values *data* of *field_name* using
        *group_operator*; returns None when the operator is unsupported."""
        # When exporting one2many fields, multiple data lines might be exported for one record.
        # Blank cells of additional lines are filled with an empty string. This could lead to '' being
        # aggregated with an integer or float.
        data = (value for value in data if value != '')

        if group_operator == 'avg':
            return self._get_avg_aggregate(field_name, data)

        aggregate_func = OPERATOR_MAPPING.get(group_operator)
        if not aggregate_func:
            _logger.warning("Unsupported export of group_operator '%s' for field %s on model %s" % (group_operator, field_name, self._model._name))
            return

        if self.data:
            # Leaf node: aggregate the raw cell values directly.
            return aggregate_func(data)
        # Inner node: aggregate the already-aggregated values of children.
        return aggregate_func((child.aggregated_values.get(field_name) for child in self.children.values()))

    def _get_avg_aggregate(self, field_name, data):
        # Average = total sum over the subtree / record count of the subtree.
        aggregate_func = OPERATOR_MAPPING.get('sum')
        if self.data:
            return aggregate_func(data) / self.count
        # Weight each child's average by its record count before dividing.
        children_sums = (child.aggregated_values.get(field_name) * child.count for child in self.children.values())
        return aggregate_func(children_sums) / self.count

    def _get_aggregated_field_names(self):
        """ Return field names of exported field having a group operator """
        aggregated_field_names = []
        for field_name in self._export_field_names:
            if field_name == '.id':
                # '.id' is the export spelling of the database id column
                field_name = 'id'
            if '/' in field_name:
                # Currently no support of aggregated value for nested record fields
                # e.g. line_ids/analytic_line_ids/amount
                continue
            field = self._model._fields[field_name]
            if field.group_operator:
                aggregated_field_names.append(field_name)
        return aggregated_field_names

    # Lazy property to memoize aggregated values of children nodes to avoid useless recomputations
    @lazy_property
    def aggregated_values(self):
        # Mapping: field name -> aggregated value over this node's subtree.
        aggregated_values = {}

        # Transpose the data matrix to group all values of each field in one iterable
        field_values = zip(*self.data)
        for field_name in self._export_field_names:
            field_data = self.data and next(field_values) or []

            if field_name in self._get_aggregated_field_names():
                field = self._model._fields[field_name]
                aggregated_values[field_name] = self._get_aggregate(field_name, field_data, field.group_operator)

        return aggregated_values

    def child(self, key):
        """
        Return the child identified by `key`.
        If it doesn't exist, inserts a default node and returns it.
        :param key: child key identifier (groupby value as returned by read_group,
                    usually (id, display_name))
        :return: the child node
        """
        if key not in self.children:
            self.children[key] = GroupsTreeNode(self._model, self._export_field_names, self._groupby, self._groupby_type)
        return self.children[key]

    def insert_leaf(self, group):
        """
        Build a leaf from `group` and insert it in the tree.
        :param group: dict as returned by `read_group(lazy=False)`, with
            `__domain` and `__count` keys (popped here)
        """
        leaf_path = [group.get(groupby_field) for groupby_field in self._groupby]
        domain = group.pop('__domain')
        count = group.pop('__count')

        records = self._model.search(domain, offset=0, limit=False, order=False)

        # Follow the path from the top level group to the deepest
        # group which actually contains the records' data.
        node = self # root
        node.count += count
        for node_key in leaf_path:
            # Go down to the next node or create one if it does not exist yet.
            node = node.child(node_key)
            # Update count value and aggregated value.
            node.count += count

        node.data = records.export_data(self._export_field_names).get('datas',[])
+
+
class ExportXlsxWriter:
    """Writer for .xlsx exports backed by an in-memory xlsxwriter workbook.

    Use as a context manager: entering writes the header row, exiting closes
    the workbook and stores the resulting xlsx bytes in :attr:`value`.
    """

    def __init__(self, field_names, row_count=0):
        """
        :param list(str) field_names: column labels written as the header row
        :param int row_count: expected number of data rows, used to fail
            early when the sheet would exceed the xlsx row limit
        :raises UserError: when row_count exceeds the xlsx format limit
        """
        self.field_names = field_names
        self.output = io.BytesIO()
        self.workbook = xlsxwriter.Workbook(self.output, {'in_memory': True})
        self.base_style = self.workbook.add_format({'text_wrap': True})
        self.header_style = self.workbook.add_format({'bold': True})
        self.header_bold_style = self.workbook.add_format({'text_wrap': True, 'bold': True, 'bg_color': '#e9ecef'})
        self.date_style = self.workbook.add_format({'text_wrap': True, 'num_format': 'yyyy-mm-dd'})
        self.datetime_style = self.workbook.add_format({'text_wrap': True, 'num_format': 'yyyy-mm-dd hh:mm:ss'})
        self.worksheet = self.workbook.add_worksheet()
        self.value = False  # filled with the xlsx bytes by close()

        if row_count > self.worksheet.xls_rowmax:
            raise UserError(_('There are too many rows (%s rows, limit: %s) to export as Excel 2007-2013 (.xlsx) format. Consider splitting the export.') % (row_count, self.worksheet.xls_rowmax))

    def __enter__(self):
        self.write_header()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.close()

    def write_header(self):
        # Write the main header row and widen the used columns.
        for i, fieldname in enumerate(self.field_names):
            self.write(0, i, fieldname, self.header_style)
        # Guard against empty field_names: the original code referenced the
        # loop variable `i` after the loop and raised NameError in that case.
        if self.field_names:
            self.worksheet.set_column(0, len(self.field_names) - 1, 30) # around 220 pixels

    def close(self):
        """Close the workbook and memoize the generated xlsx payload."""
        self.workbook.close()
        with self.output:
            self.value = self.output.getvalue()

    def write(self, row, column, cell_value, style=None):
        """Raw write of *cell_value* at (row, column), with optional style."""
        self.worksheet.write(row, column, cell_value, style)

    def write_cell(self, row, column, cell_value):
        """Write *cell_value* after normalizing it for xlsx: decode base64
        bytes, truncate over-long strings, strip CRs, and pick the date or
        datetime cell style when applicable."""
        cell_style = self.base_style

        if isinstance(cell_value, bytes):
            try:
                # because xlsx uses raw export, we can get a bytes object
                # here. xlsxwriter does not support bytes values in Python 3 ->
                # assume this is base64 and decode to a string, if this
                # fails note that you can't export
                cell_value = pycompat.to_text(cell_value)
            except UnicodeDecodeError:
                # BUGFIX: index field_names with [column]; the original code
                # put the subscript on the translated message instead
                # (`_(...)[column]`), producing a nonsense error text.
                raise UserError(_("Binary fields can not be exported to Excel unless their content is base64-encoded. That does not seem to be the case for %s.", self.field_names[column]))

        if isinstance(cell_value, str):
            if len(cell_value) > self.worksheet.xls_strmax:
                cell_value = _("The content of this cell is too long for an XLSX file (more than %s characters). Please use the CSV format for this export.", self.worksheet.xls_strmax)
            else:
                cell_value = cell_value.replace("\r", " ")
        elif isinstance(cell_value, datetime.datetime):
            cell_style = self.datetime_style
        elif isinstance(cell_value, datetime.date):
            cell_style = self.date_style
        self.write(row, column, cell_value, cell_style)
+
class GroupExportXlsxWriter(ExportXlsxWriter):
    """Xlsx writer for grouped exports: writes a header row per group (with
    aggregated values) followed by the group's records, recursively."""

    def __init__(self, fields, row_count=0):
        # `fields` are export field descriptors (dicts with at least 'label',
        # 'name'; 'type'/'store' are read below); the header row shows the
        # stripped labels.
        super().__init__([f['label'].strip() for f in fields], row_count)
        self.fields = fields

    def write_group(self, row, column, group_name, group, group_depth=0):
        """Write *group* (a GroupsTreeNode) and its subtree at (row, column).

        :returns: (row, column) position where writing should continue
        """
        # group_name may be an (id, display_name) pair as returned by read_group
        group_name = group_name[1] if isinstance(group_name, tuple) and len(group_name) > 1 else group_name
        if group._groupby_type[group_depth] != 'boolean':
            # For booleans, False is a meaningful value; otherwise show a label
            group_name = group_name or _("Undefined")
        row, column = self._write_group_header(row, column, group_name, group, group_depth)

        # Recursively write sub-groups
        for child_group_name, child_group in group.children.items():
            row, column = self.write_group(row, column, child_group_name, child_group, group_depth + 1)

        # Then the leaf rows of this group, one record per row.
        for record in group.data:
            row, column = self._write_row(row, column, record)
        return row, column

    def _write_row(self, row, column, data):
        # One cell per exported value, then move to the start of the next row.
        for value in data:
            self.write_cell(row, column, value)
            column += 1
        return row + 1, 0

    def _write_group_header(self, row, column, label, group, group_depth=0):
        """Write the group title cell (indented by depth, with record count)
        followed by one aggregated-value cell per remaining column."""
        aggregates = group.aggregated_values

        label = '%s%s (%s)' % (' ' * group_depth, label, group.count)
        self.write(row, column, label, self.header_bold_style)
        if any(f.get('type') == 'monetary' for f in self.fields[1:]):
            # Use the largest precision among installed currencies for display.
            decimal_places = [res['decimal_places'] for res in group._model.env['res.currency'].search_read([], ['decimal_places'])]
            decimal_places = max(decimal_places) if decimal_places else 2
        for field in self.fields[1:]: # No aggregates allowed in the first column because of the group title
            column += 1
            aggregated_value = aggregates.get(field['name'])
            # Float fields may not be displayed properly because of float
            # representation issue with non stored fields or with values
            # that, even stored, cannot be rounded properly and it is not
            # acceptable to display useless digits (i.e. monetary)
            #
            # non stored field -> we force 2 digits
            # stored monetary -> we force max digits of installed currencies
            if isinstance(aggregated_value, float):
                if field.get('type') == 'monetary':
                    aggregated_value = float_repr(aggregated_value, decimal_places)
                elif not field.get('store'):
                    aggregated_value = float_repr(aggregated_value, 2)
            self.write(row, column, str(aggregated_value if aggregated_value is not None else ''), self.header_bold_style)
        return row + 1, 0
+
+
#----------------------------------------------------------
# Odoo Web Controllers
#----------------------------------------------------------
class Home(http.Controller):
    """Backend web client entry points: landing page, login, menus loading and
    superuser impersonation."""

    @http.route('/', type='http', auth="none")
    def index(self, s_action=None, db=None, **kw):
        # The root URL forwards to the web client, keeping query string and hash.
        return http.local_redirect('/web', query=request.params, keep_hash=True)

    # ideally, this route should be `auth="user"` but that doesn't work in non-monodb mode.
    @http.route('/web', type='http', auth="none")
    def web_client(self, s_action=None, **kw):
        """Serve the web client bootstrap page, redirecting anonymous users to login."""
        ensure_db()
        if not request.session.uid:
            return werkzeug.utils.redirect('/web/login', 303)
        if kw.get('redirect'):
            return werkzeug.utils.redirect(kw.get('redirect'), 303)

        # auth="none" does not bind a user: do it manually from the session so
        # request.env below runs with the logged-in user's access rights.
        request.uid = request.session.uid
        try:
            context = request.env['ir.http'].webclient_rendering_context()
            response = request.render('web.webclient_bootstrap', qcontext=context)
            # Forbid embedding the backend in a frame (clickjacking protection).
            response.headers['X-Frame-Options'] = 'DENY'
            return response
        except AccessError:
            return werkzeug.utils.redirect('/web/login?error=access')

    @http.route('/web/webclient/load_menus/<string:unique>', type='http', auth='user', methods=['GET'])
    def web_load_menus(self, unique):
        """
        Loads the menus for the webclient
        :param unique: this parameters is not used, but mandatory: it is used by the HTTP stack to make a unique request
        :return: the menus (including the images in Base64)
        """
        menus = request.env["ir.ui.menu"].load_menus(request.session.debug)
        body = json.dumps(menus, default=ustr)
        response = request.make_response(body, [
            # this method must specify a content-type application/json instead of using the default text/html set because
            # the type of the route is set to HTTP, but the rpc is made with a get and expects JSON
            ('Content-Type', 'application/json'),
            ('Cache-Control', 'public, max-age=' + str(CONTENT_MAXAGE)),
        ])
        return response

    def _login_redirect(self, uid, redirect=None):
        # Overridable hook deciding where a freshly logged-in user lands.
        return _get_login_redirect_url(uid, redirect)

    @http.route('/web/login', type='http', auth="none")
    def web_login(self, redirect=None, **kw):
        """Render the login form (GET) or authenticate the posted credentials (POST)."""
        ensure_db()
        request.params['login_success'] = False
        if request.httprequest.method == 'GET' and redirect and request.session.uid:
            # Already authenticated: honour the requested redirection directly.
            return http.redirect_with_hash(redirect)

        if not request.uid:
            request.uid = odoo.SUPERUSER_ID

        values = request.params.copy()
        try:
            values['databases'] = http.db_list()
        except odoo.exceptions.AccessDenied:
            values['databases'] = None

        if request.httprequest.method == 'POST':
            old_uid = request.uid
            try:
                uid = request.session.authenticate(request.session.db, request.params['login'], request.params['password'])
                request.params['login_success'] = True
                return http.redirect_with_hash(self._login_redirect(uid, redirect=redirect))
            except odoo.exceptions.AccessDenied as e:
                # authenticate() may have changed request.uid: restore it on failure.
                request.uid = old_uid
                if e.args == odoo.exceptions.AccessDenied().args:
                    values['error'] = _("Wrong login/password")
                else:
                    values['error'] = e.args[0]
        else:
            if 'error' in request.params and request.params.get('error') == 'access':
                values['error'] = _('Only employees can access this database. Please contact the administrator.')

        if 'login' not in values and request.session.get('auth_login'):
            # Pre-fill the login field from the session's last known login.
            values['login'] = request.session.get('auth_login')

        if not odoo.tools.config['list_db']:
            values['disable_database_manager'] = True

        response = request.render('web.login', values)
        response.headers['X-Frame-Options'] = 'DENY'
        return response

    @http.route('/web/become', type='http', auth='user', sitemap=False)
    def switch_to_admin(self):
        """Let a system user switch the current session to the superuser account."""
        uid = request.env.user.id
        if request.env.user._is_system():
            uid = request.session.uid = odoo.SUPERUSER_ID
            # invalidate session token cache as we've changed the uid
            request.env['res.users'].clear_caches()
            request.session.session_token = security.compute_session_token(request.session, request.env)

        return http.local_redirect(self._login_redirect(uid), keep_hash=True)
+
class WebClient(http.Controller):
    """Static helpers of the web client: asset manifests, moment.js locales,
    QWeb templates, translations and the QUnit/benchmark suites."""

    @http.route('/web/webclient/csslist', type='json', auth="none")
    def csslist(self, mods=None):
        # Manifest lookup for stylesheet assets.
        return manifest_list('css', mods=mods)

    @http.route('/web/webclient/jslist', type='json', auth="none")
    def jslist(self, mods=None):
        # Manifest lookup for script assets.
        return manifest_list('js', mods=mods)

    @http.route('/web/webclient/locale/<string:lang>', type='http', auth="none")
    def load_locale(self, lang):
        """Serve the moment.js locale matching ``lang``; empty script when none exists."""
        # Try the full code first (e.g. "pt-br"), then the bare language ("pt").
        candidate_codes = [lang.replace("_", '-').lower(), lang.split('_')[0]]
        for code in candidate_codes:
            try:
                locale_file = file_open('web/static/lib/moment/locale/%s.js' % code, 'rb')
            except IOError:
                _logger.debug("No moment locale for code %s", code)
                continue
            return http.Response(
                werkzeug.wsgi.wrap_file(request.httprequest.environ, locale_file),
                content_type='application/javascript; charset=utf-8',
                headers=[('Cache-Control', 'max-age=%s' % http.STATIC_CACHE)],
                direct_passthrough=True,
            )

        # No locale shipped for this language: serve an empty, cacheable script.
        return request.make_response("", headers=[
            ('Content-Type', 'application/javascript'),
            ('Cache-Control', 'max-age=%s' % http.STATIC_CACHE),
        ])

    @http.route('/web/webclient/qweb/<string:unique>', type='http', auth="none", cors="*")
    def qweb(self, unique, mods=None, db=None):
        """Return the aggregated QWeb templates of the requested modules as XML."""
        templates = HomeStaticTemplateHelpers.get_qweb_templates(mods, db, debug=request.session.debug)
        return request.make_response(templates, [
            ('Content-Type', 'text/xml'),
            ('Cache-Control', 'public, max-age=' + str(CONTENT_MAXAGE)),
        ])

    @http.route('/web/webclient/bootstrap_translations', type='json', auth="none")
    def bootstrap_translations(self, mods):
        """ Load local translations from *.po files, as a temporary solution
            until we have established a valid session. This is meant only
            for translating the login page and db management chrome, using
            the browser's language. """
        # For performance reasons we only load a single translation, so for
        # sub-languages (that should only be partially translated) we load the
        # main language PO instead - that should be enough for the login screen.
        context = dict(request.context)
        request.session._fix_lang(context)
        lang = context['lang'].split('_')[0]

        translations_per_module = {}
        for addon_name in mods:
            manifest = http.addons_manifest[addon_name]
            if not manifest.get('bootstrap'):
                continue
            po_path = os.path.join(manifest['addons_path'], addon_name, "i18n", lang + ".po")
            if os.path.exists(po_path):
                translations_per_module[addon_name] = {'messages': _local_web_translations(po_path)}

        return {"modules": translations_per_module,
                "lang_parameters": None}

    @http.route('/web/webclient/translations/<string:unique>', type='http', auth="public")
    def translations(self, unique, mods=None, lang=None):
        """
        Load the translations for the specified language and modules

        :param unique: this parameters is not used, but mandatory: it is used by the HTTP stack to make a unique request
        :param mods: the modules, a comma separated list
        :param lang: the language of the user
        :return:
        """
        request.disable_db = False

        module_list = mods.split(',') if mods else mods
        translations_per_module, lang_params = request.env["ir.translation"].get_translations_for_webclient(module_list, lang)

        payload = json.dumps({
            'lang': lang,
            'lang_parameters': lang_params,
            'modules': translations_per_module,
            'multi_lang': len(request.env['res.lang'].sudo().get_installed()) > 1,
        })
        # The route is type='http' (default text/html) but the client GETs it
        # expecting JSON, hence the explicit content type.
        return request.make_response(payload, [
            ('Content-Type', 'application/json'),
            ('Cache-Control', 'public, max-age=' + str(CONTENT_MAXAGE)),
        ])

    @http.route('/web/webclient/version_info', type='json', auth="none")
    def version_info(self):
        # Server version details, also exposed over XML-RPC common.
        return odoo.service.common.exp_version()

    @http.route('/web/tests', type='http', auth="user")
    def test_suite(self, mod=None, **kwargs):
        return request.render('web.qunit_suite')

    @http.route('/web/tests/mobile', type='http', auth="none")
    def test_mobile_suite(self, mod=None, **kwargs):
        return request.render('web.qunit_mobile_suite')

    @http.route('/web/benchmarks', type='http', auth="none")
    def benchmarks(self, mod=None, **kwargs):
        return request.render('web.benchmark_suite')
+
+
class Proxy(http.Controller):

    @http.route('/web/proxy/post/<path:path>', type='http', auth='user', methods=['GET'])
    def post(self, path):
        """Effectively execute a POST request that was hooked through user login"""
        with request.session.load_request_data() as data:
            if not data:
                raise werkzeug.exceptions.BadRequest()
            from werkzeug.test import Client
            from werkzeug.wrappers import BaseResponse
            # Replay the stashed payload against our own WSGI app, forwarding
            # the caller's session so the replay runs as the same user.
            test_client = Client(request.httprequest.app, BaseResponse)
            return test_client.post(
                '/' + path,
                base_url=request.httprequest.base_url,
                query_string=request.httprequest.query_string,
                headers={'X-Openerp-Session-Id': request.session.sid},
                data=data,
            )
+
class Database(http.Controller):
    """HTTP endpoints of the database manager: create, duplicate, drop, backup,
    restore, list and master-password management."""

    def _update_insecure_master_password(self, master_pwd):
        # Bootstrap helper shared by all mutating endpoints: while the master
        # password is still the default 'admin', the first password submitted
        # by the operator silently becomes the new master password.
        insecure = odoo.tools.config.verify_admin_password('admin')
        if insecure and master_pwd:
            dispatch_rpc('db', 'change_admin_password', ["admin", master_pwd])

    def _render_template(self, **d):
        """Render the database manager page with the given template values."""
        d.setdefault('manage', True)
        d['insecure'] = odoo.tools.config.verify_admin_password('admin')
        d['list_db'] = odoo.tools.config['list_db']
        d['langs'] = odoo.service.db.exp_list_lang()
        d['countries'] = odoo.service.db.exp_list_countries()
        d['pattern'] = DBNAME_PATTERN
        # databases list
        d['databases'] = []
        try:
            d['databases'] = http.db_list()
            d['incompatible_databases'] = odoo.service.db.list_db_incompatible(d['databases'])
        except odoo.exceptions.AccessDenied:
            # db listing forbidden: fall back to the single configured database.
            monodb = db_monodb()
            if monodb:
                d['databases'] = [monodb]
        return env.get_template("database_manager.html").render(d)

    @http.route('/web/database/selector', type='http', auth="none")
    def selector(self, **kw):
        request._cr = None
        return self._render_template(manage=False)

    @http.route('/web/database/manager', type='http', auth="none")
    def manager(self, **kw):
        request._cr = None
        return self._render_template()

    @http.route('/web/database/create', type='http', auth="none", methods=['POST'], csrf=False)
    def create(self, master_pwd, name, lang, password, **post):
        """Create a new database and log the newly created admin user into it."""
        self._update_insecure_master_password(master_pwd)
        try:
            if not re.match(DBNAME_PATTERN, name):
                raise Exception(_('Invalid database name. Only alphanumerical characters, underscore, hyphen and dot are allowed.'))
            # country code could be = "False" which is actually True in python
            country_code = post.get('country_code') or False
            dispatch_rpc('db', 'create_database', [master_pwd, name, bool(post.get('demo')), lang, password, post['login'], country_code, post['phone']])
            request.session.authenticate(name, post['login'], password)
            return http.local_redirect('/web/')
        except Exception as e:
            error = "Database creation error: %s" % (str(e) or repr(e))
            return self._render_template(error=error)

    @http.route('/web/database/duplicate', type='http', auth="none", methods=['POST'], csrf=False)
    def duplicate(self, master_pwd, name, new_name):
        """Duplicate database ``name`` as ``new_name``."""
        self._update_insecure_master_password(master_pwd)
        try:
            if not re.match(DBNAME_PATTERN, new_name):
                raise Exception(_('Invalid database name. Only alphanumerical characters, underscore, hyphen and dot are allowed.'))
            dispatch_rpc('db', 'duplicate_database', [master_pwd, name, new_name])
            request._cr = None  # duplicating a database leads to an unusable cursor
            return http.local_redirect('/web/database/manager')
        except Exception as e:
            error = "Database duplication error: %s" % (str(e) or repr(e))
            return self._render_template(error=error)

    @http.route('/web/database/drop', type='http', auth="none", methods=['POST'], csrf=False)
    def drop(self, master_pwd, name):
        """Delete database ``name``."""
        self._update_insecure_master_password(master_pwd)
        try:
            dispatch_rpc('db', 'drop', [master_pwd, name])
            request._cr = None  # dropping a database leads to an unusable cursor
            return http.local_redirect('/web/database/manager')
        except Exception as e:
            error = "Database deletion error: %s" % (str(e) or repr(e))
            return self._render_template(error=error)

    @http.route('/web/database/backup', type='http', auth="none", methods=['POST'], csrf=False)
    def backup(self, master_pwd, name, backup_format='zip'):
        """Stream a dump of database ``name`` as a timestamped file download."""
        self._update_insecure_master_password(master_pwd)
        try:
            odoo.service.db.check_super(master_pwd)
            ts = datetime.datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S")
            filename = "%s_%s.%s" % (name, ts, backup_format)
            headers = [
                ('Content-Type', 'application/octet-stream; charset=binary'),
                ('Content-Disposition', content_disposition(filename)),
            ]
            dump_stream = odoo.service.db.dump_db(name, None, backup_format)
            response = werkzeug.wrappers.Response(dump_stream, headers=headers, direct_passthrough=True)
            return response
        except Exception as e:
            _logger.exception('Database.backup')
            error = "Database backup error: %s" % (str(e) or repr(e))
            return self._render_template(error=error)

    @http.route('/web/database/restore', type='http', auth="none", methods=['POST'], csrf=False)
    def restore(self, master_pwd, backup_file, name, copy=False):
        """Restore the uploaded ``backup_file`` into a database called ``name``."""
        self._update_insecure_master_password(master_pwd)
        try:
            data_file = None
            db.check_super(master_pwd)
            # Spool the upload to disk: restore_db works on a file path.
            with tempfile.NamedTemporaryFile(delete=False) as data_file:
                backup_file.save(data_file)
            db.restore_db(name, data_file.name, str2bool(copy))
            return http.local_redirect('/web/database/manager')
        except Exception as e:
            error = "Database restore error: %s" % (str(e) or repr(e))
            return self._render_template(error=error)
        finally:
            if data_file:
                os.unlink(data_file.name)

    @http.route('/web/database/change_password', type='http', auth="none", methods=['POST'], csrf=False)
    def change_password(self, master_pwd, master_pwd_new):
        """Replace the master password after verifying the current one."""
        try:
            dispatch_rpc('db', 'change_admin_password', [master_pwd, master_pwd_new])
            return http.local_redirect('/web/database/manager')
        except Exception as e:
            error = "Master password update error: %s" % (str(e) or repr(e))
            return self._render_template(error=error)

    @http.route('/web/database/list', type='json', auth='none')
    def list(self):
        """
        Used by Mobile application for listing database
        :return: List of databases
        :rtype: list
        """
        return http.db_list()
+
class Session(http.Controller):
    """Session lifecycle endpoints: info, authentication, password change and logout."""

    @http.route('/web/session/get_session_info', type='json', auth="none")
    def get_session_info(self):
        """Return the session_info dict of the (already authenticated) session."""
        # auth="none": validate the session token ourselves before binding the user.
        request.session.check_security()
        request.uid = request.session.uid
        request.disable_db = False
        return request.env['ir.http'].session_info()

    @http.route('/web/session/authenticate', type='json', auth="none")
    def authenticate(self, db, login, password, base_location=None):
        """Log in with the given credentials and return the new session_info."""
        request.session.authenticate(db, login, password)
        return request.env['ir.http'].session_info()

    @http.route('/web/session/change_password', type='json', auth="user")
    def change_password(self, fields):
        """Change the current user's password.

        :param fields: list of ``{'name': ..., 'value': ...}`` dicts holding
            ``old_pwd``, ``new_password`` and ``confirm_pwd``.
        :return: ``{'new_password': ...}`` on success, else a dict with
            ``error`` and ``title`` keys.
        """
        old_password, new_password,confirm_password = operator.itemgetter('old_pwd', 'new_password','confirm_pwd')(
            {f['name']: f['value'] for f in fields})
        if not (old_password.strip() and new_password.strip() and confirm_password.strip()):
            return {'error':_('You cannot leave any password empty.'),'title': _('Change Password')}
        if new_password != confirm_password:
            return {'error': _('The new password and its confirmation must be identical.'),'title': _('Change Password')}

        msg = _("Error, password not changed !")
        try:
            if request.env['res.users'].change_password(old_password, new_password):
                return {'new_password':new_password}
        except AccessDenied as e:
            msg = e.args[0]
            if msg == AccessDenied().args[0]:
                # Generic AccessDenied message: replace it with a friendlier one.
                msg = _('The old password you provided is incorrect, your password was not changed.')
        except UserError as e:
            msg = e.args[0]
        return {'title': _('Change Password'), 'error': msg}

    @http.route('/web/session/get_lang_list', type='json', auth="none")
    def get_lang_list(self):
        """Return the installable languages, or an error dict on failure."""
        try:
            return dispatch_rpc('db', 'list_lang', []) or []
        except Exception as e:
            return {"error": e, "title": _("Languages")}

    @http.route('/web/session/modules', type='json', auth="user")
    def modules(self):
        # return all installed modules. Web client is smart enough to not load a module twice
        return module_installed(environment=request.env(user=odoo.SUPERUSER_ID))

    @http.route('/web/session/save_session_action', type='json', auth="user")
    def save_session_action(self, the_action):
        """
        This method store an action object in the session object and returns an integer
        identifying that action. The method get_session_action() can be used to get
        back the action.

        :param the_action: The action to save in the session.
        :type the_action: anything
        :return: A key identifying the saved action.
        :rtype: integer
        """
        return request.session.save_action(the_action)

    @http.route('/web/session/get_session_action', type='json', auth="user")
    def get_session_action(self, key):
        """
        Gets back a previously saved action. This method can return None if the action
        was saved since too much time (this case should be handled in a smart way).

        :param key: The key given by save_session_action()
        :type key: integer
        :return: The saved action or None.
        :rtype: anything
        """
        return request.session.get_action(key)

    @http.route('/web/session/check', type='json', auth="user")
    def check(self):
        """Raise if the session is no longer valid; return None otherwise."""
        request.session.check_security()
        return None

    @http.route('/web/session/account', type='json', auth="user")
    def account(self):
        """Build the OAuth URL used to reach the user's odoo.com account."""
        ICP = request.env['ir.config_parameter'].sudo()
        params = {
            'response_type': 'token',
            'client_id': ICP.get_param('database.uuid') or '',
            'state': json.dumps({'d': request.db, 'u': ICP.get_param('web.base.url')}),
            'scope': 'userinfo',
        }
        return 'https://accounts.odoo.com/oauth2/auth?' + url_encode(params)

    @http.route('/web/session/destroy', type='json', auth="user")
    def destroy(self):
        # Drop the server-side session entirely.
        request.session.logout()

    @http.route('/web/session/logout', type='http', auth="none")
    def logout(self, redirect='/web'):
        # keep_db so the login page comes back pre-set on the same database.
        request.session.logout(keep_db=True)
        return werkzeug.utils.redirect(redirect, 303)
+
+
class DataSet(http.Controller):
    """Generic JSON endpoints the web client uses to read and call models."""

    @http.route('/web/dataset/search_read', type='json', auth="user")
    def search_read(self, model, fields=False, offset=0, limit=False, domain=None, sort=None):
        return self.do_search_read(model, fields, offset, limit, domain, sort)

    def do_search_read(self, model, fields=False, offset=0, limit=False, domain=None, sort=None):
        """ Performs a search() followed by a read() (if needed) using the
        provided search criteria

        :param str model: the name of the model to search on
        :param fields: a list of the fields to return in the result records
        :type fields: [str]
        :param int offset: from which index should the results start being returned
        :param int limit: the maximum number of records to return
        :param list domain: the search domain for the query
        :param list sort: sorting directives
        :returns: A structure (dict) with two keys: ids (all the ids matching
                  the (domain, context) pair) and records (paginated records
                  matching fields selection set)
        :rtype: dict
        """
        Model = request.env[model]
        return Model.web_search_read(domain, fields, offset=offset, limit=limit, order=sort)

    @http.route('/web/dataset/load', type='json', auth="user")
    def load(self, model, id, fields):
        """Read the record with ``id``; returns ``{'value': {...}}`` (empty dict if missing)."""
        value = {}
        r = request.env[model].browse([id]).read()
        if r:
            value = r[0]
        return {'value': value}

    def call_common(self, model, method, args, domain_id=None, context_id=None):
        # Kept for backward compatibility; domain_id/context_id are ignored.
        return self._call_kw(model, method, args, {})

    def _call_kw(self, model, method, args, kwargs):
        # Reject private/forbidden method names before dispatching to the ORM.
        check_method_name(method)
        return call_kw(request.env[model], method, args, kwargs)

    @http.route('/web/dataset/call', type='json', auth="user")
    def call(self, model, method, args, domain_id=None, context_id=None):
        return self._call_kw(model, method, args, {})

    @http.route(['/web/dataset/call_kw', '/web/dataset/call_kw/<path:path>'], type='json', auth="user")
    def call_kw(self, model, method, args, kwargs, path=None):
        return self._call_kw(model, method, args, kwargs)

    @http.route('/web/dataset/call_button', type='json', auth="user")
    def call_button(self, model, method, args, kwargs):
        """Call a button method; normalize a returned action dict, else return False."""
        action = self._call_kw(model, method, args, kwargs)
        if isinstance(action, dict) and action.get('type') != '':
            return clean_action(action, env=request.env)
        return False

    @http.route('/web/dataset/resequence', type='json', auth="user")
    def resequence(self, model, ids, field='sequence', offset=0):
        """ Re-sequences a number of records in the model, by their ids

        The re-sequencing starts at the first model of ``ids``, the sequence
        number is incremented by one after each record and starts at ``offset``

        :param ids: identifiers of the records to resequence, in the new sequence order
        :type ids: list(id)
        :param str field: field used for sequence specification, defaults to
                          "sequence"
        :param int offset: sequence number for first record in ``ids``, allows
                           starting the resequencing from an arbitrary number,
                           defaults to ``0``
        """
        m = request.env[model]
        if not m.fields_get([field]):
            return False
        # enumerate(start=offset) replaces the former manual index arithmetic.
        for seq, record in enumerate(m.browse(ids), start=offset):
            record.write({field: seq})
        return True
+
class View(http.Controller):

    @http.route('/web/view/edit_custom', type='json', auth="user")
    def edit_custom(self, custom_id, arch):
        """
        Edit a custom view

        :param int custom_id: the id of the edited custom view
        :param str arch: the edited arch of the custom view
        :returns: dict with acknowledged operation (result set to True)
        """
        request.env['ir.ui.view.custom'].browse(custom_id).write({'arch': arch})
        return {'result': True}
+
+class Binary(http.Controller):
+
+ @staticmethod
+ def placeholder(image='placeholder.png'):
+ image_path = image.lstrip('/').split('/') if '/' in image else ['web', 'static', 'src', 'img', image]
+ with tools.file_open(get_resource_path(*image_path), 'rb') as fd:
+ return fd.read()
+
    @http.route(['/web/content',
                 '/web/content/<string:xmlid>',
                 '/web/content/<string:xmlid>/<string:filename>',
                 '/web/content/<int:id>',
                 '/web/content/<int:id>/<string:filename>',
                 '/web/content/<int:id>-<string:unique>',
                 '/web/content/<int:id>-<string:unique>/<string:filename>',
                 '/web/content/<int:id>-<string:unique>/<path:extra>/<string:filename>',
                 '/web/content/<string:model>/<int:id>/<string:field>',
                 '/web/content/<string:model>/<int:id>/<string:field>/<string:filename>'], type='http', auth="public")
    def content_common(self, xmlid=None, model='ir.attachment', id=None, field='datas',
                       filename=None, filename_field='name', unique=None, mimetype=None,
                       download=None, data=None, token=None, access_token=None, **kw):
        """Generic file download endpoint: stream the content of a binary field."""
        # Record lookup, access checks and caching headers are delegated to ir.http.
        status, headers, content = request.env['ir.http'].binary_content(
            xmlid=xmlid, model=model, id=id, field=field, unique=unique, filename=filename,
            filename_field=filename_field, download=download, mimetype=mimetype, access_token=access_token)

        if status != 200:
            # 301/304/404...: let ir.http build the matching response.
            return request.env['ir.http']._response_by_status(status, headers, content)
        else:
            content_base64 = base64.b64decode(content)
            headers.append(('Content-Length', len(content_base64)))
            response = request.make_response(content_base64, headers)
            if token:
                # Cookie used by the JS side to detect the end of the download.
                response.set_cookie('fileToken', token)
            return response
+
+ @http.route(['/web/partner_image',
+ '/web/partner_image/<int:rec_id>',
+ '/web/partner_image/<int:rec_id>/<string:field>',
+ '/web/partner_image/<int:rec_id>/<string:field>/<string:model>/'], type='http', auth="public")
+ def content_image_partner(self, rec_id, field='image_128', model='res.partner', **kwargs):
+ # other kwargs are ignored on purpose
+ return self._content_image(id=rec_id, model='res.partner', field=field,
+ placeholder='user_placeholder.jpg')
+
+ @http.route(['/web/image',
+ '/web/image/<string:xmlid>',
+ '/web/image/<string:xmlid>/<string:filename>',
+ '/web/image/<string:xmlid>/<int:width>x<int:height>',
+ '/web/image/<string:xmlid>/<int:width>x<int:height>/<string:filename>',
+ '/web/image/<string:model>/<int:id>/<string:field>',
+ '/web/image/<string:model>/<int:id>/<string:field>/<string:filename>',
+ '/web/image/<string:model>/<int:id>/<string:field>/<int:width>x<int:height>',
+ '/web/image/<string:model>/<int:id>/<string:field>/<int:width>x<int:height>/<string:filename>',
+ '/web/image/<int:id>',
+ '/web/image/<int:id>/<string:filename>',
+ '/web/image/<int:id>/<int:width>x<int:height>',
+ '/web/image/<int:id>/<int:width>x<int:height>/<string:filename>',
+ '/web/image/<int:id>-<string:unique>',
+ '/web/image/<int:id>-<string:unique>/<string:filename>',
+ '/web/image/<int:id>-<string:unique>/<int:width>x<int:height>',
+ '/web/image/<int:id>-<string:unique>/<int:width>x<int:height>/<string:filename>'], type='http', auth="public")
+ def content_image(self, xmlid=None, model='ir.attachment', id=None, field='datas',
+ filename_field='name', unique=None, filename=None, mimetype=None,
+ download=None, width=0, height=0, crop=False, access_token=None,
+ **kwargs):
+ # other kwargs are ignored on purpose
+ return self._content_image(xmlid=xmlid, model=model, id=id, field=field,
+ filename_field=filename_field, unique=unique, filename=filename, mimetype=mimetype,
+ download=download, width=width, height=height, crop=crop,
+ quality=int(kwargs.get('quality', 0)), access_token=access_token)
+
+ def _content_image(self, xmlid=None, model='ir.attachment', id=None, field='datas',
+ filename_field='name', unique=None, filename=None, mimetype=None,
+ download=None, width=0, height=0, crop=False, quality=0, access_token=None,
+ placeholder=None, **kwargs):
+ status, headers, image_base64 = request.env['ir.http'].binary_content(
+ xmlid=xmlid, model=model, id=id, field=field, unique=unique, filename=filename,
+ filename_field=filename_field, download=download, mimetype=mimetype,
+ default_mimetype='image/png', access_token=access_token)
+
+ return Binary._content_image_get_response(
+ status, headers, image_base64, model=model, id=id, field=field, download=download,
+ width=width, height=height, crop=crop, quality=quality,
+ placeholder=placeholder)
+
+ @staticmethod
+ def _content_image_get_response(
+ status, headers, image_base64, model='ir.attachment', id=None,
+ field='datas', download=None, width=0, height=0, crop=False,
+ quality=0, placeholder='placeholder.png'):
+ if status in [301, 304] or (status != 200 and download):
+ return request.env['ir.http']._response_by_status(status, headers, image_base64)
+ if not image_base64:
+ if placeholder is None and model in request.env:
+ # Try to browse the record in case a specific placeholder
+ # is supposed to be used. (eg: Unassigned users on a task)
+ record = request.env[model].browse(int(id)) if id else request.env[model]
+ placeholder_filename = record._get_placeholder_filename(field=field)
+ placeholder_content = Binary.placeholder(image=placeholder_filename)
+ else:
+ placeholder_content = Binary.placeholder()
+ # Since we set a placeholder for any missing image, the status must be 200. In case one
+ # wants to configure a specific 404 page (e.g. though nginx), a 404 status will cause
+ # troubles.
+ status = 200
+ image_base64 = base64.b64encode(placeholder_content)
+
+ if not (width or height):
+ width, height = odoo.tools.image_guess_size_from_field_name(field)
+
+ try:
+ image_base64 = image_process(image_base64, size=(int(width), int(height)), crop=crop, quality=int(quality))
+ except Exception:
+ return request.not_found()
+
+ content = base64.b64decode(image_base64)
+ headers = http.set_safe_image_headers(headers, content)
+ response = request.make_response(content, headers)
+ response.status_code = status
+ return response
+
+ # backward compatibility
+ @http.route(['/web/binary/image'], type='http', auth="public")
+ def content_image_backward_compatibility(self, model, id, field, resize=None, **kw):
+ width = None
+ height = None
+ if resize:
+ width, height = resize.split(",")
+ return self.content_image(model=model, id=id, field=field, width=width, height=height)
+
+
+ @http.route('/web/binary/upload', type='http', auth="user")
+ @serialize_exception
+ def upload(self, ufile, callback=None):
+ # TODO: might be useful to have a configuration flag for max-length file uploads
+ out = """<script language="javascript" type="text/javascript">
+ var win = window.top.window;
+ win.jQuery(win).trigger(%s, %s);
+ </script>"""
+ try:
+ data = ufile.read()
+ args = [len(data), ufile.filename,
+ ufile.content_type, pycompat.to_text(base64.b64encode(data))]
+ except Exception as e:
+ args = [False, str(e)]
+ return out % (json.dumps(clean(callback)), json.dumps(args)) if callback else json.dumps(args)
+
    @http.route('/web/binary/upload_attachment', type='http', auth="user")
    @serialize_exception
    def upload_attachment(self, model, id, ufile, callback=None):
        """Create one ir.attachment per uploaded file, linked to ``model``/``id``.

        Returns a JSON list of per-file results, or a script invoking
        ``callback`` with that list for iframe-based uploads.
        """
        # The `ufile` parameter only carries one file; fetch the full list.
        files = request.httprequest.files.getlist('ufile')
        Model = request.env['ir.attachment']
        out = """<script language="javascript" type="text/javascript">
            var win = window.top.window;
            win.jQuery(win).trigger(%s, %s);
        </script>"""
        args = []
        for ufile in files:

            filename = ufile.filename
            if request.httprequest.user_agent.browser == 'safari':
                # Safari sends NFD UTF-8 (where é is composed by 'e' and [accent])
                # we need to send it the same stuff, otherwise it'll fail
                filename = unicodedata.normalize('NFD', ufile.filename)

            try:
                attachment = Model.create({
                    'name': filename,
                    'datas': base64.encodebytes(ufile.read()),
                    'res_model': model,
                    'res_id': int(id)
                })
                attachment._post_add_create()
            except Exception:
                # One failing file must not abort the whole batch: report and continue.
                args.append({'error': _("Something horrible happened")})
                _logger.exception("Fail to upload attachment %s" % ufile.filename)
            else:
                args.append({
                    'filename': clean(filename),
                    'mimetype': ufile.content_type,
                    'id': attachment.id,
                    'size': attachment.file_size
                })
        return out % (json.dumps(clean(callback)), json.dumps(args)) if callback else json.dumps(args)
+
    @http.route([
        '/web/binary/company_logo',
        '/logo',
        '/logo.png',
    ], type='http', auth="none", cors="*")
    def company_logo(self, dbname=None, **kw):
        """Serve the logo of the current user's company (or of ``kw['company']``),
        falling back to the stock Odoo logo when no database or logo is available.
        """
        imgname = 'logo'
        imgext = '.png'
        placeholder = functools.partial(get_resource_path, 'web', 'static', 'src', 'img')
        uid = None
        if request.session.db:
            # A session is bound: serve the logo of that database/user.
            dbname = request.session.db
            uid = request.session.uid
        elif dbname is None:
            dbname = db_monodb()

        if not uid:
            uid = odoo.SUPERUSER_ID

        if not dbname:
            response = http.send_file(placeholder(imgname + imgext))
        else:
            try:
                # create an empty registry
                registry = odoo.modules.registry.Registry(dbname)
                with registry.cursor() as cr:
                    company = int(kw['company']) if kw and kw.get('company') else False
                    if company:
                        cr.execute("""SELECT logo_web, write_date
                                        FROM res_company
                                       WHERE id = %s
                                   """, (company,))
                    else:
                        cr.execute("""SELECT c.logo_web, c.write_date
                                        FROM res_users u
                                   LEFT JOIN res_company c
                                          ON c.id = u.company_id
                                       WHERE u.id = %s
                                   """, (uid,))
                    row = cr.fetchone()
                    if row and row[0]:
                        image_base64 = base64.b64decode(row[0])
                        image_data = io.BytesIO(image_base64)
                        # Extension follows the detected mimetype, not the stored name.
                        mimetype = guess_mimetype(image_base64, default='image/png')
                        imgext = '.' + mimetype.split('/')[1]
                        if imgext == '.svg+xml':
                            imgext = '.svg'
                        response = http.send_file(image_data, filename=imgname + imgext, mimetype=mimetype, mtime=row[1])
                    else:
                        response = http.send_file(placeholder('nologo.png'))
            except Exception:
                # Any failure (bad db name, missing table...) degrades to the stock logo.
                response = http.send_file(placeholder(imgname + imgext))

        return response
+
+ @http.route(['/web/sign/get_fonts','/web/sign/get_fonts/<string:fontname>'], type='json', auth='public')
+ def get_fonts(self, fontname=None):
+ """This route will return a list of base64 encoded fonts.
+
+ Those fonts will be proposed to the user when creating a signature
+ using mode 'auto'.
+
+ :return: base64 encoded fonts
+ :rtype: list
+ """
+
+
+ fonts = []
+ if fontname:
+ module_path = get_module_path('web')
+ fonts_folder_path = os.path.join(module_path, 'static/src/fonts/sign/')
+ module_resource_path = get_resource_path('web', 'static/src/fonts/sign/' + fontname)
+ if fonts_folder_path and module_resource_path:
+ fonts_folder_path = os.path.join(os.path.normpath(fonts_folder_path), '')
+ module_resource_path = os.path.normpath(module_resource_path)
+ if module_resource_path.startswith(fonts_folder_path):
+ with file_open(module_resource_path, 'rb') as font_file:
+ font = base64.b64encode(font_file.read())
+ fonts.append(font)
+ else:
+ current_dir = os.path.dirname(os.path.abspath(__file__))
+ fonts_directory = os.path.join(current_dir, '..', 'static', 'src', 'fonts', 'sign')
+ font_filenames = sorted([fn for fn in os.listdir(fonts_directory) if fn.endswith(('.ttf', '.otf', '.woff', '.woff2'))])
+
+ for filename in font_filenames:
+ font_file = open(os.path.join(fonts_directory, filename), 'rb')
+ font = base64.b64encode(font_file.read())
+ fonts.append(font)
+ return fonts
+
class Action(http.Controller):

    @http.route('/web/action/load', type='json', auth="user")
    def load(self, action_id, additional_context=None):
        """Return the cleaned definition of an action, or False if unknown.

        :param action_id: numeric id (or numeric string) of the action, or an
            XML id referring to an ``ir.actions.*`` record
        :param additional_context: optional dict merged into the request
            context before reading the action
        """
        Actions = request.env['ir.actions.actions']
        result = False
        try:
            action_id = int(action_id)
        except ValueError:
            # Not a numeric id: try to resolve it as an action XML id.
            try:
                action_record = request.env.ref(action_id)
                assert action_record._name.startswith('ir.actions.')
                action_id = action_record.id
            except Exception:
                # Unknown xmlid or not an action: force an empty read below.
                action_id = 0

        base_action = Actions.browse([action_id]).sudo().read(['type'])
        if base_action:
            action_type = base_action[0]['type']
            ctx = dict(request.context)
            if action_type == 'ir.actions.report':
                # Reports may embed large binaries; only fetch their sizes.
                ctx['bin_size'] = True
            if additional_context:
                ctx.update(additional_context)
            request.context = ctx
            records = request.env[action_type].sudo().browse([action_id]).read()
            if records:
                result = clean_action(records[0], env=request.env)
        return result

    @http.route('/web/action/run', type='json', auth="user")
    def run(self, action_id):
        """Execute the given ``ir.actions.server`` and return its (cleaned)
        resulting action, or False when the server action returns nothing."""
        server_action = request.env['ir.actions.server'].browse([action_id])
        outcome = server_action.run()
        if outcome:
            return clean_action(outcome, env=server_action.env)
        return False
+
class Export(http.Controller):
    """JSON endpoints backing the export dialog of the web client."""

    @http.route('/web/export/formats', type='json', auth="user")
    def formats(self):
        """ Returns all valid export formats

        :returns: for each export format, a pair of identifier and printable name
        :rtype: [(str, str)]
        """
        return [
            # 'error' lets the client explain why XLSX may be unavailable
            {'tag': 'xlsx', 'label': 'XLSX', 'error': None if xlsxwriter else "XlsxWriter 0.9.3 required"},
            {'tag': 'csv', 'label': 'CSV'},
        ]

    def fields_get(self, model):
        """Return ``fields_get()`` for *model*: a dict mapping field name to
        its description dict (string, type, relation, ...)."""
        Model = request.env[model]
        fields = Model.fields_get()
        return fields

    @http.route('/web/export/get_fields', type='json', auth="user")
    def get_fields(self, model, prefix='', parent_name= '',
                   import_compat=True, parent_field_type=None,
                   parent_field=None, exclude=None):
        """Return the exportable fields of *model* as a flat list of dicts
        feeding one level of the export-dialog field tree.

        :param str model: model whose fields are listed
        :param str prefix: technical path prefix ('' at the root)
        :param str parent_name: human-readable path prefix
        :param bool import_compat: if True, restrict to import-compatible fields
        :param parent_field_type: type of the relational field being expanded
        :param dict parent_field: description of the relational parent field
        :param exclude: field names to skip in import-compatible mode
        """
        fields = self.fields_get(model)
        if import_compat:
            if parent_field_type in ['many2one', 'many2many']:
                # When expanding a m2o/m2m, only id + record-name field apply.
                rec_name = request.env[model]._rec_name_fallback()
                fields = {'id': fields['id'], rec_name: fields[rec_name]}
        else:
            # Plain exports also expose the database id as '.id'.
            fields['.id'] = {**fields['id']}

        fields['id']['string'] = _('External ID')

        if parent_field:
            parent_field['string'] = _('External ID')
            fields['id'] = parent_field

        # Sort fields on their lowercased display label.
        fields_sequence = sorted(fields.items(),
            key=lambda field: odoo.tools.ustr(field[1].get('string', '').lower()))

        records = []
        for field_name, field in fields_sequence:
            if import_compat and not field_name == 'id':
                if exclude and field_name in exclude:
                    continue
                if field.get('readonly'):
                    # If none of the field's states unsets readonly, skip the field
                    if all(dict(attrs).get('readonly', True)
                           for attrs in field.get('states', {}).values()):
                        continue
            if not field.get('exportable', True):
                continue

            id = prefix + (prefix and '/'or '') + field_name
            val = id
            if field_name == 'name' and import_compat and parent_field_type in ['many2one', 'many2many']:
                # Add name field when expand m2o and m2m fields in import-compatible mode
                val = prefix
            name = parent_name + (parent_name and '/' or '') + field['string']
            record = {'id': id, 'string': name,
                      'value': val, 'children': False,
                      'field_type': field.get('type'),
                      'required': field.get('required'),
                      'relation_field': field.get('relation_field')}
            records.append(record)

            # Relations are only expandable up to two levels deep.
            if len(id.split('/')) < 3 and 'relation' in field:
                ref = field.pop('relation')
                record['value'] += '/id'
                record['params'] = {'model': ref, 'prefix': id, 'name': name, 'parent_field': field}
                record['children'] = True

        return records

    @http.route('/web/export/namelist', type='json', auth="user")
    def namelist(self, model, export_id):
        """Return ``[{'name': ..., 'label': ...}]`` for every field of the
        saved export list *export_id*, in the list's stored order."""
        # TODO: namelist really has no reason to be in Python (although itertools.groupby helps)
        export = request.env['ir.exports'].browse([export_id]).read()[0]
        export_fields_list = request.env['ir.exports.line'].browse(export['export_fields']).read()

        fields_data = self.fields_info(
            model, [f['name'] for f in export_fields_list])

        return [
            {'name': field['name'], 'label': fields_data[field['name']]}
            for field in export_fields_list
        ]

    def fields_info(self, model, export_fields):
        """Map each path in *export_fields* (e.g. ``partner_id/name``) to its
        human-readable label, batching fields_get calls per related model."""
        info = {}
        fields = self.fields_get(model)
        if ".id" in export_fields:
            fields['.id'] = fields.get('id', {'string': 'ID'})

        # To make fields retrieval more efficient, fetch all sub-fields of a
        # given field at the same time. Because the order in the export list is
        # arbitrary, this requires ordering all sub-fields of a given field
        # together so they can be fetched at the same time
        #
        # Works the following way:
        # * sort the list of fields to export, the default sorting order will
        #   put the field itself (if present, for xmlid) and all of its
        #   sub-fields right after it
        # * then, group on: the first field of the path (which is the same for
        #   a field and for its subfields and the length of splitting on the
        #   first '/', which basically means grouping the field on one side and
        #   all of the subfields on the other. This way, we have the field (for
        #   the xmlid) with length 1, and all of the subfields with the same
        #   base but a length "flag" of 2
        # * if we have a normal field (length 1), just add it to the info
        #   mapping (with its string) as-is
        # * otherwise, recursively call fields_info via graft_subfields.
        #   all graft_subfields does is take the result of fields_info (on the
        #   field's model) and prepend the current base (current field), which
        #   rebuilds the whole sub-tree for the field
        #
        # result: because we're not fetching the fields_get for half the
        # database models, fetching a namelist with a dozen fields (including
        # relational data) falls from ~6s to ~300ms (on the leads model).
        # export lists with no sub-fields (e.g. import_compatible lists with
        # no o2m) are even more efficient (from the same 6s to ~170ms, as
        # there's a single fields_get to execute)
        for (base, length), subfields in itertools.groupby(
                sorted(export_fields),
                lambda field: (field.split('/', 1)[0], len(field.split('/', 1)))):
            subfields = list(subfields)
            if length == 2:
                # subfields is a seq of $base/*rest, and not loaded yet
                info.update(self.graft_subfields(
                    fields[base]['relation'], base, fields[base]['string'],
                    subfields
                ))
            elif base in fields:
                info[base] = fields[base]['string']

        return info

    def graft_subfields(self, model, prefix, prefix_string, fields):
        """Yield ``(path, label)`` pairs for sub-fields of *prefix*, prefixing
        fields_info's results with the parent path and label."""
        export_fields = [field.split('/', 1)[1] for field in fields]
        return (
            (prefix + '/' + k, prefix_string + '/' + v)
            for k, v in self.fields_info(model, export_fields).items())
+
class ExportFormat(object):
    """Shared machinery for the export controllers; subclasses provide the
    serialization (CSV, XLSX, ...) while ``base`` handles the HTTP side."""

    @property
    def content_type(self):
        """MIME type of the generated document. Subclasses must override."""
        raise NotImplementedError()

    def filename(self, base):
        """Build a valid file name (with extension) for this format from the
        extension-less base name. Subclasses must override.
        """
        raise NotImplementedError()

    def from_data(self, fields, rows):
        """Serialize Odoo export data to this format's payload.

        :param list fields: column headers to export
        :param list rows: records to export
        :rtype: bytes
        """
        raise NotImplementedError()

    def from_group_data(self, fields, groups):
        """Serialize grouped export data. Subclasses must override."""
        raise NotImplementedError()

    def base(self, data, token):
        """Common handler: decode the request, run the export (grouped or
        flat) and wrap the serialized payload in an HTTP response."""
        params = json.loads(data)
        model = params['model']
        fields = params['fields']
        ids = params['ids']
        domain = params['domain']
        import_compat = params['import_compat']

        Model = request.env[model].with_context(**params.get('context', {}))
        if not Model._is_an_ordinary_table():
            # Views and other non-table models have no usable database id.
            fields = [field for field in fields if field['name'] != 'id']

        field_names = [f['name'] for f in fields]
        if import_compat:
            columns_headers = field_names
        else:
            columns_headers = [column['label'].strip() for column in fields]

        groupby = params.get('groupby')
        if groupby and not import_compat:
            groupby_type = [Model._fields[spec.split(':')[0]].type for spec in groupby]
            if ids:
                domain = [('id', 'in', ids)]
            read_fields = ['id' if name == '.id' else name for name in field_names]
            groups_data = Model.read_group(domain, read_fields, groupby, lazy=False)

            # read_group(lazy=False) returns a dict only for final groups (with actual data),
            # not for intermediary groups. The full group tree must be re-constructed.
            tree = GroupsTreeNode(Model, field_names, groupby, groupby_type)
            for leaf in groups_data:
                tree.insert_leaf(leaf)

            response_data = self.from_group_data(fields, tree)
        else:
            Model = Model.with_context(import_compat=import_compat)
            if ids:
                records = Model.browse(ids)
            else:
                records = Model.search(domain, offset=0, limit=False, order=False)
            export_rows = records.export_data(field_names).get('datas', [])
            response_data = self.from_data(columns_headers, export_rows)

        return request.make_response(
            response_data,
            headers=[
                ('Content-Disposition', content_disposition(self.filename(model))),
                ('Content-Type', self.content_type),
            ],
            cookies={'fileToken': token},
        )
+
class CSVExport(ExportFormat, http.Controller):
    """CSV flavour of the export machinery."""

    @http.route('/web/export/csv', type='http', auth="user")
    @serialize_exception
    def index(self, data, token):
        """HTTP entry point; all the work happens in ExportFormat.base."""
        return self.base(data, token)

    @property
    def content_type(self):
        """CSV is served as UTF-8 text."""
        return 'text/csv;charset=utf8'

    def filename(self, base):
        """Append the .csv extension to the base name."""
        return base + '.csv'

    def from_group_data(self, fields, groups):
        """Grouped data has no flat-file representation in CSV."""
        raise UserError(_("Exporting grouped data to csv is not supported."))

    def from_data(self, fields, rows):
        """Write the header row then every record, with all cells quoted."""
        buffer = io.BytesIO()
        writer = pycompat.csv_writer(buffer, quoting=1)

        writer.writerow(fields)

        for record in rows:
            cells = []
            for value in record:
                # Spreadsheet apps tend to detect formulas on leading =, + and -
                # so neutralize them with a leading quote.
                if isinstance(value, str) and value.startswith(('=', '-', '+')):
                    value = "'" + value
                cells.append(pycompat.to_text(value))
            writer.writerow(cells)

        return buffer.getvalue()
+
class ExcelExport(ExportFormat, http.Controller):
    """XLSX flavour of the export machinery."""

    @http.route('/web/export/xlsx', type='http', auth="user")
    @serialize_exception
    def index(self, data, token):
        """HTTP entry point; all the work happens in ExportFormat.base."""
        return self.base(data, token)

    @property
    def content_type(self):
        """Standard OOXML spreadsheet MIME type."""
        return 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'

    def filename(self, base):
        """Append the .xlsx extension to the base name."""
        return base + '.xlsx'

    def from_group_data(self, fields, groups):
        """Render the (already reconstructed) group tree, one top-level
        group at a time; the writer tracks row/column positions."""
        with GroupExportXlsxWriter(fields, groups.count) as xlsx_writer:
            row, col = 1, 0
            for group_name, group in groups.children.items():
                row, col = xlsx_writer.write_group(row, col, group_name, group)

        return xlsx_writer.value

    def from_data(self, fields, rows):
        """Render flat export rows below the header produced by the writer."""
        with ExportXlsxWriter(fields, len(rows)) as xlsx_writer:
            for line_no, record in enumerate(rows):
                for col_no, value in enumerate(record):
                    # x2many values arrive as sequences; flatten them to text.
                    if isinstance(value, (list, tuple)):
                        value = pycompat.to_text(value)
                    xlsx_writer.write_cell(line_no + 1, col_no, value)

        return xlsx_writer.value
+
+
class ReportController(http.Controller):
    """HTTP endpoints rendering and downloading QWeb reports."""

    #------------------------------------------------------
    # Report controllers
    #------------------------------------------------------
    @http.route([
        '/report/<converter>/<reportname>',
        '/report/<converter>/<reportname>/<docids>',
    ], type='http', auth='user', website=True)
    def report_routes(self, reportname, docids=None, converter=None, **data):
        """Render the report *reportname* for the given records.

        :param str reportname: technical name of the ir.actions.report
        :param docids: comma-separated record ids, or None for reports fed
            only by the extra *data* (e.g. wizard-based reports)
        :param str converter: output format: 'html', 'pdf' or 'text'
        :param data: extra parameters; 'options' and 'context' values are
            JSON-encoded strings
        """
        report = request.env['ir.actions.report']._get_report_from_name(reportname)
        context = dict(request.env.context)

        if docids:
            docids = [int(i) for i in docids.split(',')]
        if data.get('options'):
            data.update(json.loads(data.pop('options')))
        if data.get('context'):
            # Ignore 'lang' here, because the context in data is the one from the webclient *but* if
            # the user explicitly wants to change the lang, this mechanism overwrites it.
            data['context'] = json.loads(data['context'])
            if data['context'].get('lang') and not data.get('force_context_lang'):
                del data['context']['lang']
            context.update(data['context'])
        if converter == 'html':
            html = report.with_context(context)._render_qweb_html(docids, data=data)[0]
            return request.make_response(html)
        elif converter == 'pdf':
            pdf = report.with_context(context)._render_qweb_pdf(docids, data=data)[0]
            pdfhttpheaders = [('Content-Type', 'application/pdf'), ('Content-Length', len(pdf))]
            return request.make_response(pdf, headers=pdfhttpheaders)
        elif converter == 'text':
            text = report.with_context(context)._render_qweb_text(docids, data=data)[0]
            texthttpheaders = [('Content-Type', 'text/plain'), ('Content-Length', len(text))]
            return request.make_response(text, headers=texthttpheaders)
        else:
            raise werkzeug.exceptions.HTTPException(description='Converter %s not implemented.' % converter)

    #------------------------------------------------------
    # Misc. route utils
    #------------------------------------------------------
    @http.route(['/report/barcode', '/report/barcode/<type>/<path:value>'], type='http', auth="public")
    def report_barcode(self, type, value, width=600, height=100, humanreadable=0, quiet=1, mask=None):
        """Controller able to render barcode images thanks to reportlab.
        Samples:
            <img t-att-src="'/report/barcode/QR/%s' % o.name"/>
            <img t-att-src="'/report/barcode/?type=%s&amp;value=%s&amp;width=%s&amp;height=%s' %
                ('QR', o.name, 200, 200)"/>

        :param type: Accepted types: 'Codabar', 'Code11', 'Code128', 'EAN13', 'EAN8', 'Extended39',
        'Extended93', 'FIM', 'I2of5', 'MSI', 'POSTNET', 'QR', 'Standard39', 'Standard93',
        'UPCA', 'USPS_4State'
        :param humanreadable: Accepted values: 0 (default) or 1. 1 will insert the readable value
        at the bottom of the output image
        :param quiet: Accepted values: 0 (default) or 1. 1 will display white
        margins on left and right.
        :param mask: The mask code to be used when rendering this QR-code.
        Masks allow adding elements on top of the generated image,
        such as the Swiss cross in the center of QR-bill codes.
        """
        try:
            barcode = request.env['ir.actions.report'].barcode(type, value, width=width,
                height=height, humanreadable=humanreadable, quiet=quiet, mask=mask)
        except (ValueError, AttributeError):
            # Unknown barcode type or value unsuitable for it.
            raise werkzeug.exceptions.HTTPException(description='Cannot convert into barcode.')

        return request.make_response(barcode, headers=[('Content-Type', 'image/png')])

    @http.route(['/report/download'], type='http', auth="user")
    def report_download(self, data, token, context=None):
        """This function is used by 'action_manager_report.js' in order to trigger the download of
        a pdf/controller report.

        :param data: a javascript array JSON.stringified containing report internal url ([0]) and
        type [1]
        :returns: Response with a filetoken cookie and an attachment header
        """
        requestcontent = json.loads(data)
        url, type = requestcontent[0], requestcontent[1]
        try:
            if type in ['qweb-pdf', 'qweb-text']:
                converter = 'pdf' if type == 'qweb-pdf' else 'text'
                extension = 'pdf' if type == 'qweb-pdf' else 'txt'

                # The report name (and optional record ids) are embedded in the url.
                pattern = '/report/pdf/' if type == 'qweb-pdf' else '/report/text/'
                reportname = url.split(pattern)[1].split('?')[0]

                docids = None
                if '/' in reportname:
                    reportname, docids = reportname.split('/')

                if docids:
                    # Generic report:
                    response = self.report_routes(reportname, docids=docids, converter=converter, context=context)
                else:
                    # Particular report:
                    data = dict(url_decode(url.split('?')[1]).items())  # decoding the args represented in JSON
                    if 'context' in data:
                        # Merge the url-provided context over the request one.
                        context, data_context = json.loads(context or '{}'), json.loads(data.pop('context'))
                        context = json.dumps({**context, **data_context})
                    response = self.report_routes(reportname, converter=converter, context=context, **data)

                report = request.env['ir.actions.report']._get_report_from_name(reportname)
                filename = "%s.%s" % (report.name, extension)

                if docids:
                    ids = [int(x) for x in docids.split(",")]
                    obj = request.env[report.model].browse(ids)
                    # A custom file name only applies to single-record prints.
                    if report.print_report_name and not len(obj) > 1:
                        report_name = safe_eval(report.print_report_name, {'object': obj, 'time': time})
                        filename = "%s.%s" % (report_name, extension)
                response.headers.add('Content-Disposition', content_disposition(filename))
                response.set_cookie('fileToken', token)
                return response
            else:
                return
        except Exception as e:
            # Errors are serialized the same way as JSON-RPC faults so the
            # client-side action manager can display them.
            se = _serialize_exception(e)
            error = {
                'code': 200,
                'message': "Odoo Server Error",
                'data': se
            }
            return request.make_response(html_escape(json.dumps(error)))

    @http.route(['/report/check_wkhtmltopdf'], type='json', auth="user")
    def check_wkhtmltopdf(self):
        """Return the availability state of the wkhtmltopdf binary used for
        PDF rendering (as reported by ir.actions.report)."""
        return request.env['ir.actions.report'].get_wkhtmltopdf_state()
diff --git a/addons/web/controllers/pivot.py b/addons/web/controllers/pivot.py
new file mode 100644
index 00000000..addb8f2b
--- /dev/null
+++ b/addons/web/controllers/pivot.py
@@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from collections import deque
+import io
+import json
+
+from odoo import http
+from odoo.http import request
+from odoo.tools import ustr
+from odoo.tools.misc import xlsxwriter
+
+
class TableExporter(http.Controller):
    """Endpoints exporting the client-side pivot view to an XLSX file."""

    @http.route('/web/pivot/check_xlsxwriter', type='json', auth='none')
    def check_xlsxwriter(self):
        """Tell the client whether the optional xlsxwriter library is
        installed (i.e. whether XLSX export can be offered)."""
        return xlsxwriter is not None

    @http.route('/web/pivot/export_xlsx', type='http', auth="user")
    def export_xlsx(self, data, token):
        """Render a pivot table, serialized as JSON in *data*, to an XLSX
        attachment response.

        :param str data: JSON document with keys 'title', 'col_group_headers',
            'measure_headers', 'origin_headers', 'rows', 'measure_count' and
            'origin_count' (shape produced by the pivot view's JS export code)
        :param token: echoed back in the 'fileToken' cookie so the client can
            detect the end of the download
        """
        jdata = json.loads(data)
        output = io.BytesIO()
        workbook = xlsxwriter.Workbook(output, {'in_memory': True})
        worksheet = workbook.add_worksheet(jdata['title'])

        # Cell styles: grey background for headers, bold for totals.
        header_bold = workbook.add_format({'bold': True, 'pattern': 1, 'bg_color': '#AAAAAA'})
        header_plain = workbook.add_format({'pattern': 1, 'bg_color': '#AAAAAA'})
        bold = workbook.add_format({'bold': True})

        measure_count = jdata['measure_count']
        origin_count = jdata['origin_count']

        # Step 1: writing col group headers
        col_group_headers = jdata['col_group_headers']

        # x,y: current coordinates
        # carry: queue containing cell information when a cell has a >= 2 height
        #        and the drawing code needs to add empty (padding) cells below
        x, y, carry = 1, 0, deque()
        for i, header_row in enumerate(col_group_headers):
            # Column 0 of every header row is the empty corner cell.
            worksheet.write(i, 0, '', header_plain)
            for header in header_row:
                # Flush carried multi-row cells ending at the current column.
                while (carry and carry[0]['x'] == x):
                    cell = carry.popleft()
                    for j in range(measure_count * (2 * origin_count - 1)):
                        worksheet.write(y, x+j, '', header_plain)
                    if cell['height'] > 1:
                        # Still spans further rows: re-queue with one less row.
                        carry.append({'x': x, 'height': cell['height'] - 1})
                    x = x + measure_count * (2 * origin_count - 1)
                # A header spans 'width' columns; only the first one holds the title.
                for j in range(header['width']):
                    worksheet.write(y, x + j, header['title'] if j == 0 else '', header_plain)
                if header['height'] > 1:
                    # Tall header: remember to pad the rows below it.
                    carry.append({'x': x, 'height': header['height'] - 1})
                x = x + header['width']
            # Flush carried cells ending at the right edge of the row.
            while (carry and carry[0]['x'] == x):
                cell = carry.popleft()
                for j in range(measure_count * (2 * origin_count - 1)):
                    worksheet.write(y, x+j, '', header_plain)
                if cell['height'] > 1:
                    carry.append({'x': x, 'height': cell['height'] - 1})
                x = x + measure_count * (2 * origin_count - 1)
            x, y = 1, y + 1

        # Step 2: writing measure headers
        measure_headers = jdata['measure_headers']

        if measure_headers:
            worksheet.write(y, 0, '', header_plain)
            for measure in measure_headers:
                style = header_bold if measure['is_bold'] else header_plain
                worksheet.write(y, x, measure['title'], style)
                # Pad the comparison columns attached to this measure.
                for i in range(1, 2 * origin_count - 1):
                    worksheet.write(y, x+i, '', header_plain)
                x = x + (2 * origin_count - 1)
            x, y = 1, y + 1
            # set minimum width of cells to 16 which is around 88px
            worksheet.set_column(0, len(measure_headers), 16)

        # Step 3: writing origin headers
        origin_headers = jdata['origin_headers']

        if origin_headers:
            worksheet.write(y, 0, '', header_plain)
            for origin in origin_headers:
                style = header_bold if origin['is_bold'] else header_plain
                worksheet.write(y, x, origin['title'], style)
                x = x + 1
            y = y + 1

        # Step 4: writing data
        x = 0
        for row in jdata['rows']:
            # Row title in column 0, indented by nesting depth.
            worksheet.write(y, x, row['indent'] * ' ' + ustr(row['title']), header_plain)
            for cell in row['values']:
                x = x + 1
                if cell.get('is_bold', False):
                    worksheet.write(y, x, cell['value'], bold)
                else:
                    worksheet.write(y, x, cell['value'])
            x, y = 0, y + 1

        workbook.close()
        xlsx_data = output.getvalue()
        response = request.make_response(xlsx_data,
            headers=[('Content-Type', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'),
                     ('Content-Disposition', 'attachment; filename=table.xlsx')],
            cookies={'fileToken': token})

        return response