summaryrefslogtreecommitdiff
path: root/addons/survey/models
diff options
context:
space:
mode:
authorstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
committerstephanchrst <stephanchrst@gmail.com>2022-05-10 21:51:50 +0700
commit3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch)
treea44932296ef4a9b71d5f010906253d8c53727726 /addons/survey/models
parent0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff)
initial commit 2
Diffstat (limited to 'addons/survey/models')
-rw-r--r--addons/survey/models/__init__.py9
-rw-r--r--addons/survey/models/badge.py16
-rw-r--r--addons/survey/models/challenge.py12
-rw-r--r--addons/survey/models/res_partner.py32
-rw-r--r--addons/survey/models/survey_question.py560
-rw-r--r--addons/survey/models/survey_survey.py1066
-rw-r--r--addons/survey/models/survey_user.py628
7 files changed, 2323 insertions, 0 deletions
diff --git a/addons/survey/models/__init__.py b/addons/survey/models/__init__.py
new file mode 100644
index 00000000..82414c70
--- /dev/null
+++ b/addons/survey/models/__init__.py
@@ -0,0 +1,9 @@
+# -*- encoding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from . import survey_survey
+from . import survey_question
+from . import survey_user
+from . import badge
+from . import challenge
+from . import res_partner
diff --git a/addons/survey/models/badge.py b/addons/survey/models/badge.py
new file mode 100644
index 00000000..7ed12731
--- /dev/null
+++ b/addons/survey/models/badge.py
@@ -0,0 +1,16 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from odoo import api, fields, models
+
+
class GamificationBadge(models.Model):
    """Link gamification badges to the certification surveys that award them."""
    _inherit = 'gamification.badge'

    survey_ids = fields.One2many('survey.survey', 'certification_badge_id', 'Survey Ids')
    survey_id = fields.Many2one('survey.survey', 'Survey', compute='_compute_survey_id', store=True)

    @api.depends('survey_ids.certification_badge_id')
    def _compute_survey_id(self):
        """Expose the first linked survey as a convenience Many2one."""
        for badge in self:
            if badge.survey_ids:
                badge.survey_id = badge.survey_ids[0]
            else:
                badge.survey_id = None
diff --git a/addons/survey/models/challenge.py b/addons/survey/models/challenge.py
new file mode 100644
index 00000000..23b54c16
--- /dev/null
+++ b/addons/survey/models/challenge.py
@@ -0,0 +1,12 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from odoo import models, fields
+
+
class Challenge(models.Model):
    """Extend gamification challenges with a 'Certifications' category for surveys."""
    _inherit = 'gamification.challenge'

    # Add a 'certification' entry to the existing selection; if this module is
    # uninstalled, records using it fall back to the field's default value.
    challenge_category = fields.Selection(selection_add=[
        ('certification', 'Certifications')
    ], ondelete={'certification': 'set default'})
diff --git a/addons/survey/models/res_partner.py b/addons/survey/models/res_partner.py
new file mode 100644
index 00000000..1c7a5e1e
--- /dev/null
+++ b/addons/survey/models/res_partner.py
@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from odoo import api, fields, models
+
+
class ResPartner(models.Model):
    """Add certification counters (own and company-wide) to partners."""
    _inherit = 'res.partner'

    certifications_count = fields.Integer('Certifications Count', compute='_compute_certifications_count')
    certifications_company_count = fields.Integer('Company Certifications Count', compute='_compute_certifications_company_count')

    @api.depends('is_company')
    def _compute_certifications_count(self):
        """Count each partner's successful certifications in a single read_group."""
        read_group_res = self.env['survey.user_input'].sudo().read_group(
            [('partner_id', 'in', self.ids), ('scoring_success', '=', True)],
            ['partner_id'], ['partner_id']
        )
        data = {res['partner_id'][0]: res['partner_id_count'] for res in read_group_res}
        for partner in self:
            partner.certifications_count = data.get(partner.id, 0)

    @api.depends('is_company', 'child_ids.certifications_count')
    def _compute_certifications_company_count(self):
        """Sum the certifications of each partner's own children.

        Bug fix: the previous implementation assigned
        ``sum(... for child in self.child_ids)`` directly to the whole
        recordset, so when computed in batch every partner received the grand
        total over ALL partners' children. Each record now only sums its own
        children.
        """
        for partner in self:
            partner.certifications_company_count = sum(
                child.certifications_count for child in partner.child_ids
            )

    def action_view_certifications(self):
        """Return the window action listing certifications of this partner and its children."""
        action = self.env["ir.actions.actions"]._for_xml_id("survey.res_partner_action_certifications")
        action['view_mode'] = 'tree'
        action['domain'] = ['|', ('partner_id', 'in', self.ids), ('partner_id', 'in', self.child_ids.ids)]

        return action
diff --git a/addons/survey/models/survey_question.py b/addons/survey/models/survey_question.py
new file mode 100644
index 00000000..e1d9d2d9
--- /dev/null
+++ b/addons/survey/models/survey_question.py
@@ -0,0 +1,560 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+import collections
+import json
+import itertools
+import operator
+
+from odoo import api, fields, models, tools, _
+from odoo.exceptions import ValidationError
+
+
class SurveyQuestion(models.Model):
    """ Questions that will be asked in a survey.

    Each question can have one of more suggested answers (eg. in case of
    multi-answer checkboxes, radio buttons...).

    Technical note:

    survey.question is also the model used for the survey's pages (with the "is_page" field set to True).

    A page corresponds to a "section" in the interface, and the fact that it separates the survey in
    actual pages in the interface depends on the "questions_layout" parameter on the survey.survey model.
    Pages are also used when randomizing questions. The randomization can happen within a "page".

    Using the same model for questions and pages allows to put all the pages and questions together in a o2m field
    (see survey.survey.question_and_page_ids) on the view side and easily reorganize your survey by dragging the
    items around.

    It also removes on level of encoding by directly having 'Add a page' and 'Add a question'
    links on the tree view of questions, enabling a faster encoding.

    However, this has the downside of making the code reading a little bit more complicated.
    Efforts were made at the model level to create computed fields so that the use of these models
    still seems somewhat logical. That means:
    - A survey still has "page_ids" (question_and_page_ids filtered on is_page = True)
    - These "page_ids" still have question_ids (questions located between this page and the next)
    - These "question_ids" still have a "page_id"

    That makes the use and display of these information at view and controller levels easier to understand.
    """
    _name = 'survey.question'
    _description = 'Survey Question'
    _rec_name = 'title'
    _order = 'sequence,id'

    @api.model
    def default_get(self, fields):
        """Default the question type: False for pages, 'text_box' for plain questions."""
        defaults = super(SurveyQuestion, self).default_get(fields)
        if not fields or 'question_type' in fields:
            # Pages carry no question type (idiomatic truthiness check instead
            # of the previous `== True` comparison).
            defaults['question_type'] = False if defaults.get('is_page') else 'text_box'
        return defaults

    # question generic data
    title = fields.Char('Title', required=True, translate=True)
    description = fields.Html(
        'Description', translate=True, sanitize=False,  # TDE TODO: sanitize but find a way to keep youtube iframe media stuff
        help="Use this field to add additional explanations about your question or to illustrate it with pictures or a video")
    survey_id = fields.Many2one('survey.survey', string='Survey', ondelete='cascade')
    scoring_type = fields.Selection(related='survey_id.scoring_type', string='Scoring Type', readonly=True)
    sequence = fields.Integer('Sequence', default=10)
    # page specific
    is_page = fields.Boolean('Is a page?')
    question_ids = fields.One2many('survey.question', string='Questions', compute="_compute_question_ids")
    questions_selection = fields.Selection(
        related='survey_id.questions_selection', readonly=True,
        help="If randomized is selected, add the number of random questions next to the section.")
    random_questions_count = fields.Integer(
        'Random questions count', default=1,
        help="Used on randomized sections to take X random questions from all the questions of that section.")
    # question specific
    page_id = fields.Many2one('survey.question', string='Page', compute="_compute_page_id", store=True)
    question_type = fields.Selection([
        ('text_box', 'Multiple Lines Text Box'),
        ('char_box', 'Single Line Text Box'),
        ('numerical_box', 'Numerical Value'),
        ('date', 'Date'),
        ('datetime', 'Datetime'),
        ('simple_choice', 'Multiple choice: only one answer'),
        ('multiple_choice', 'Multiple choice: multiple answers allowed'),
        ('matrix', 'Matrix')], string='Question Type',
        compute='_compute_question_type', readonly=False, store=True)
    is_scored_question = fields.Boolean(
        'Scored', compute='_compute_is_scored_question',
        readonly=False, store=True, copy=True,
        help="Include this question as part of quiz scoring. Requires an answer and answer score to be taken into account.")
    # -- scoreable/answerable simple answer_types: numerical_box / date / datetime
    answer_numerical_box = fields.Float('Correct numerical answer', help="Correct number answer for this question.")
    answer_date = fields.Date('Correct date answer', help="Correct date answer for this question.")
    answer_datetime = fields.Datetime('Correct datetime answer', help="Correct date and time answer for this question.")
    answer_score = fields.Float('Score', help="Score value for a correct answer to this question.")
    # -- char_box
    save_as_email = fields.Boolean(
        "Save as user email", compute='_compute_save_as_email', readonly=False, store=True, copy=True,
        help="If checked, this option will save the user's answer as its email address.")
    save_as_nickname = fields.Boolean(
        "Save as user nickname", compute='_compute_save_as_nickname', readonly=False, store=True, copy=True,
        help="If checked, this option will save the user's answer as its nickname.")
    # -- simple choice / multiple choice / matrix
    suggested_answer_ids = fields.One2many(
        'survey.question.answer', 'question_id', string='Types of answers', copy=True,
        help='Labels used for proposed choices: simple choice, multiple choice and columns of matrix')
    allow_value_image = fields.Boolean('Images on answers', help='Display images in addition to answer label. Valid only for simple / multiple choice questions.')
    # -- matrix
    matrix_subtype = fields.Selection([
        ('simple', 'One choice per row'),
        ('multiple', 'Multiple choices per row')], string='Matrix Type', default='simple')
    matrix_row_ids = fields.One2many(
        'survey.question.answer', 'matrix_question_id', string='Matrix Rows', copy=True,
        help='Labels used for proposed choices: rows of matrix')
    # -- display & timing options
    column_nb = fields.Selection([
        ('12', '1'), ('6', '2'), ('4', '3'), ('3', '4'), ('2', '6')],
        string='Number of columns', default='12',
        help='These options refer to col-xx-[12|6|4|3|2] classes in Bootstrap for dropdown-based simple and multiple choice questions.')
    is_time_limited = fields.Boolean("The question is limited in time",
        help="Currently only supported for live sessions.")
    time_limit = fields.Integer("Time limit (seconds)")
    # -- comments (simple choice, multiple choice, matrix (without count as an answer))
    comments_allowed = fields.Boolean('Show Comments Field')
    comments_message = fields.Char('Comment Message', translate=True, default=lambda self: _("If other, please specify:"))
    comment_count_as_answer = fields.Boolean('Comment Field is an Answer Choice')
    # question validation
    validation_required = fields.Boolean('Validate entry')
    validation_email = fields.Boolean('Input must be an email')
    validation_length_min = fields.Integer('Minimum Text Length', default=0)
    validation_length_max = fields.Integer('Maximum Text Length', default=0)
    validation_min_float_value = fields.Float('Minimum value', default=0.0)
    validation_max_float_value = fields.Float('Maximum value', default=0.0)
    validation_min_date = fields.Date('Minimum Date')
    validation_max_date = fields.Date('Maximum Date')
    validation_min_datetime = fields.Datetime('Minimum Datetime')
    validation_max_datetime = fields.Datetime('Maximum Datetime')
    validation_error_msg = fields.Char('Validation Error message', translate=True, default=lambda self: _("The answer you entered is not valid."))
    constr_mandatory = fields.Boolean('Mandatory Answer')
    constr_error_msg = fields.Char('Error message', translate=True, default=lambda self: _("This question requires an answer."))
    # answers
    user_input_line_ids = fields.One2many(
        'survey.user_input.line', 'question_id', string='Answers',
        domain=[('skipped', '=', False)], groups='survey.group_survey_user')

    # Conditional display
    is_conditional = fields.Boolean(
        string='Conditional Display', copy=False, help="""If checked, this question will be displayed only
        if the specified conditional answer have been selected in a previous question""")
    triggering_question_id = fields.Many2one(
        'survey.question', string="Triggering Question", copy=False, compute="_compute_triggering_question_id",
        store=True, readonly=False, help="Question containing the triggering answer to display the current question.",
        domain="""[('survey_id', '=', survey_id),
                 '&', ('question_type', 'in', ['simple_choice', 'multiple_choice']),
                 '|',
                     ('sequence', '<', sequence),
                     '&', ('sequence', '=', sequence), ('id', '<', id)]""")
    triggering_answer_id = fields.Many2one(
        'survey.question.answer', string="Triggering Answer", copy=False, compute="_compute_triggering_answer_id",
        store=True, readonly=False, help="Answer that will trigger the display of the current question.",
        domain="[('question_id', '=', triggering_question_id)]")

    _sql_constraints = [
        ('positive_len_min', 'CHECK (validation_length_min >= 0)', 'A length must be positive!'),
        ('positive_len_max', 'CHECK (validation_length_max >= 0)', 'A length must be positive!'),
        ('validation_length', 'CHECK (validation_length_min <= validation_length_max)', 'Max length cannot be smaller than min length!'),
        ('validation_float', 'CHECK (validation_min_float_value <= validation_max_float_value)', 'Max value cannot be smaller than min value!'),
        ('validation_date', 'CHECK (validation_min_date <= validation_max_date)', 'Max date cannot be smaller than min date!'),
        ('validation_datetime', 'CHECK (validation_min_datetime <= validation_max_datetime)', 'Max datetime cannot be smaller than min datetime!'),
        ('positive_answer_score', 'CHECK (answer_score >= 0)', 'An answer score for a non-multiple choice question cannot be negative!'),
        ('scored_datetime_have_answers', "CHECK (is_scored_question != True OR question_type != 'datetime' OR answer_datetime is not null)",
            'All "Is a scored question = True" and "Question Type: Datetime" questions need an answer'),
        ('scored_date_have_answers', "CHECK (is_scored_question != True OR question_type != 'date' OR answer_date is not null)",
            'All "Is a scored question = True" and "Question Type: Date" questions need an answer')
    ]

    @api.depends('is_page')
    def _compute_question_type(self):
        """Reset the question type on pages (pages have no question type)."""
        for question in self:
            if not question.question_type or question.is_page:
                question.question_type = False

    @api.depends('survey_id.question_and_page_ids.is_page', 'survey_id.question_and_page_ids.sequence')
    def _compute_question_ids(self):
        """Will take all questions of the survey for which the index is higher than the index of this page
        and lower than the index of the next page."""
        for question in self:
            if question.is_page:
                next_page_index = False
                for page in question.survey_id.page_ids:
                    if page._index() > question._index():
                        next_page_index = page._index()
                        break

                question.question_ids = question.survey_id.question_ids.filtered(
                    lambda q: q._index() > question._index() and (not next_page_index or q._index() < next_page_index)
                )
            else:
                question.question_ids = self.env['survey.question']

    @api.depends('survey_id.question_and_page_ids.is_page', 'survey_id.question_and_page_ids.sequence')
    def _compute_page_id(self):
        """Will find the page to which this question belongs to by looking inside the corresponding survey"""
        for question in self:
            if question.is_page:
                question.page_id = None
            else:
                page = None
                for q in question.survey_id.question_and_page_ids.sorted():
                    if q == question:
                        break
                    if q.is_page:
                        page = q
                question.page_id = page

    @api.depends('question_type', 'validation_email')
    def _compute_save_as_email(self):
        """'Save as email' only makes sense for email-validated char_box questions."""
        for question in self:
            if question.question_type != 'char_box' or not question.validation_email:
                question.save_as_email = False

    @api.depends('question_type')
    def _compute_save_as_nickname(self):
        """'Save as nickname' only makes sense for char_box questions."""
        for question in self:
            if question.question_type != 'char_box':
                question.save_as_nickname = False

    @api.depends('is_conditional')
    def _compute_triggering_question_id(self):
        """ Used as an 'onchange' : Reset the triggering question if user uncheck 'Conditional Display'
            Avoid CacheMiss : set the value to False if the value is not set yet."""
        for question in self:
            if not question.is_conditional or question.triggering_question_id is None:
                question.triggering_question_id = False

    @api.depends('triggering_question_id')
    def _compute_triggering_answer_id(self):
        """ Used as an 'onchange' : Reset the triggering answer if user unset or change the triggering question
            or uncheck 'Conditional Display'.
            Avoid CacheMiss : set the value to False if the value is not set yet."""
        for question in self:
            if not question.triggering_question_id \
                    or question.triggering_question_id != question.triggering_answer_id.question_id\
                    or question.triggering_answer_id is None:
                question.triggering_answer_id = False

    @api.depends('question_type', 'scoring_type', 'answer_date', 'answer_datetime', 'answer_numerical_box')
    def _compute_is_scored_question(self):
        """ Computes whether a question "is scored" or not. Handles following cases:
          - inconsistent Boolean=None edge case that breaks tests => False
          - survey is not scored => False
          - 'date'/'datetime'/'numerical_box' question types w/correct answer => True
            (implied without user having to activate, except for numerical whose correct value is 0.0)
          - 'simple_choice / multiple_choice': set to True even if logic is a bit different (coming from answers)
          - question_type isn't scoreable (note: choice questions scoring logic handled separately) => False
        """
        for question in self:
            if question.is_scored_question is None or question.scoring_type == 'no_scoring':
                question.is_scored_question = False
            elif question.question_type == 'date':
                question.is_scored_question = bool(question.answer_date)
            elif question.question_type == 'datetime':
                question.is_scored_question = bool(question.answer_datetime)
            elif question.question_type == 'numerical_box' and question.answer_numerical_box:
                question.is_scored_question = True
            elif question.question_type in ['simple_choice', 'multiple_choice']:
                question.is_scored_question = True
            else:
                question.is_scored_question = False

    # ------------------------------------------------------------
    # VALIDATION
    # ------------------------------------------------------------

    def validate_question(self, answer, comment=None):
        """ Validate question, depending on question type and parameters
         for simple choice, text, date and number, answer is simply the answer of the question.
         For other multiple choices questions, answer is a list of answers (the selected choices
         or a list of selected answers per question -for matrix type-):
            - Simple answer : answer = 'example' or 2 or question_answer_id or 2019/10/10
            - Multiple choice : answer = [question_answer_id1, question_answer_id2, question_answer_id3]
            - Matrix: answer = { 'rowId1' : [colId1, colId2,...], 'rowId2' : [colId1, colId3, ...] }

         return dict {question.id (int): error (str)} -> empty dict if no validation error.
         """
        self.ensure_one()
        if isinstance(answer, str):
            answer = answer.strip()
        # Empty answer to mandatory question
        if self.constr_mandatory and not answer and self.question_type not in ['simple_choice', 'multiple_choice']:
            return {self.id: self.constr_error_msg}

        # because in choices question types, comment can count as answer
        if answer or self.question_type in ['simple_choice', 'multiple_choice']:
            if self.question_type == 'char_box':
                return self._validate_char_box(answer)
            elif self.question_type == 'numerical_box':
                return self._validate_numerical_box(answer)
            elif self.question_type in ['date', 'datetime']:
                return self._validate_date(answer)
            elif self.question_type in ['simple_choice', 'multiple_choice']:
                return self._validate_choice(answer, comment)
            elif self.question_type == 'matrix':
                return self._validate_matrix(answer)
        return {}

    def _validate_char_box(self, answer):
        """Validate a single-line text answer: optional email format and length range."""
        # Email format validation
        # all the strings of the form "<something>@<anything>.<extension>" will be accepted
        if self.validation_email:
            if not tools.email_normalize(answer):
                return {self.id: _('This answer must be an email address')}

        # Answer validation (if properly defined)
        # Length of the answer must be in a range
        if self.validation_required:
            if not (self.validation_length_min <= len(answer) <= self.validation_length_max):
                return {self.id: self.validation_error_msg}
        return {}

    def _validate_numerical_box(self, answer):
        """Validate a numerical answer: must parse as float and fit the configured range."""
        try:
            floatanswer = float(answer)
        except ValueError:
            return {self.id: _('This is not a number')}

        if self.validation_required:
            # Answer is not in the right range
            with tools.ignore(Exception):
                if not (self.validation_min_float_value <= floatanswer <= self.validation_max_float_value):
                    return {self.id: self.validation_error_msg}
        return {}

    def _validate_date(self, answer):
        """Validate a date/datetime answer: must parse and fit the configured range."""
        is_datetime = self.question_type == 'datetime'
        # Checks if user input is a date
        try:
            date_answer = fields.Datetime.from_string(answer) if is_datetime else fields.Date.from_string(answer)
        except ValueError:
            return {self.id: _('This is not a date')}
        if self.validation_required:
            # Check if answer is in the right range
            # (date_answer was already parsed above with the same converter;
            # the previous implementation re-parsed it redundantly here)
            if is_datetime:
                min_date = fields.Datetime.from_string(self.validation_min_datetime)
                max_date = fields.Datetime.from_string(self.validation_max_datetime)
            else:
                min_date = fields.Date.from_string(self.validation_min_date)
                max_date = fields.Date.from_string(self.validation_max_date)

            if (min_date and max_date and not (min_date <= date_answer <= max_date))\
                    or (min_date and not min_date <= date_answer)\
                    or (max_date and not date_answer <= max_date):
                return {self.id: self.validation_error_msg}
        return {}

    def _validate_choice(self, answer, comment):
        """Validate a choice answer; a comment may count as an answer if configured so."""
        # Empty comment
        if self.constr_mandatory \
                and not answer \
                and not (self.comments_allowed and self.comment_count_as_answer and comment):
            return {self.id: self.constr_error_msg}
        return {}

    def _validate_matrix(self, answers):
        """Validate a matrix answer: every row must be answered when mandatory."""
        # Validate that each line has been answered
        if self.constr_mandatory and len(self.matrix_row_ids) != len(answers):
            return {self.id: self.constr_error_msg}
        return {}

    def _index(self):
        """We would normally just use the 'sequence' field of questions BUT, if the pages and questions are
        created without ever moving records around, the sequence field can be set to 0 for all the questions.

        However, the order of the recordset is always correct so we can rely on the index method."""
        self.ensure_one()
        return list(self.survey_id.question_and_page_ids).index(self)

    # ------------------------------------------------------------
    # STATISTICS / REPORTING
    # ------------------------------------------------------------

    def _prepare_statistics(self, user_input_lines):
        """ Compute statistical data for questions by counting number of vote per choice on basis of filter """
        all_questions_data = []
        for question in self:
            question_data = {'question': question, 'is_page': question.is_page}

            if question.is_page:
                all_questions_data.append(question_data)
                continue

            # fetch answer lines, separate comments from real answers
            all_lines = user_input_lines.filtered(lambda line: line.question_id == question)
            if question.question_type in ['simple_choice', 'multiple_choice', 'matrix']:
                answer_lines = all_lines.filtered(
                    lambda line: line.answer_type == 'suggestion' or (
                        line.answer_type == 'char_box' and question.comment_count_as_answer)
                    )
                comment_line_ids = all_lines.filtered(lambda line: line.answer_type == 'char_box')
            else:
                answer_lines = all_lines
                comment_line_ids = self.env['survey.user_input.line']
            skipped_lines = answer_lines.filtered(lambda line: line.skipped)
            done_lines = answer_lines - skipped_lines
            question_data.update(
                answer_line_ids=answer_lines,
                answer_line_done_ids=done_lines,
                answer_input_done_ids=done_lines.mapped('user_input_id'),
                answer_input_skipped_ids=skipped_lines.mapped('user_input_id'),
                comment_line_ids=comment_line_ids)
            question_data.update(question._get_stats_summary_data(answer_lines))

            # prepare table and graph data
            table_data, graph_data = question._get_stats_data(answer_lines)
            question_data['table_data'] = table_data
            question_data['graph_data'] = json.dumps(graph_data)

            all_questions_data.append(question_data)
        return all_questions_data

    def _get_stats_data(self, user_input_lines):
        """Return (table_data, graph_data) for this question, per question type."""
        if self.question_type == 'simple_choice':
            return self._get_stats_data_answers(user_input_lines)
        elif self.question_type == 'multiple_choice':
            table_data, graph_data = self._get_stats_data_answers(user_input_lines)
            return table_data, [{'key': self.title, 'values': graph_data}]
        elif self.question_type == 'matrix':
            return self._get_stats_graph_data_matrix(user_input_lines)
        # plain copy instead of an identity comprehension
        return list(user_input_lines), []

    def _get_stats_data_answers(self, user_input_lines):
        """ Statistics for question.answer based questions (simple choice, multiple
        choice.). A corner case with a void record survey.question.answer is added
        to count comments that should be considered as valid answers. This small hack
        allow to have everything available in the same standard structure. """
        suggested_answers = list(self.mapped('suggested_answer_ids'))
        if self.comment_count_as_answer:
            suggested_answers += [self.env['survey.question.answer']]

        count_data = dict.fromkeys(suggested_answers, 0)
        for line in user_input_lines:
            if line.suggested_answer_id or (line.value_char_box and self.comment_count_as_answer):
                count_data[line.suggested_answer_id] += 1

        table_data = [{
            'value': _('Other (see comments)') if not sug_answer else sug_answer.value,
            'suggested_answer': sug_answer,
            'count': count_data[sug_answer]
            }
            for sug_answer in suggested_answers]
        graph_data = [{
            'text': _('Other (see comments)') if not sug_answer else sug_answer.value,
            'count': count_data[sug_answer]
            }
            for sug_answer in suggested_answers]

        return table_data, graph_data

    def _get_stats_graph_data_matrix(self, user_input_lines):
        """Count votes per (row, column) pair and shape them for table and graph display."""
        suggested_answers = self.mapped('suggested_answer_ids')
        matrix_rows = self.mapped('matrix_row_ids')

        count_data = dict.fromkeys(itertools.product(matrix_rows, suggested_answers), 0)
        for line in user_input_lines:
            if line.matrix_row_id and line.suggested_answer_id:
                count_data[(line.matrix_row_id, line.suggested_answer_id)] += 1

        table_data = [{
            'row': row,
            'columns': [{
                'suggested_answer': sug_answer,
                'count': count_data[(row, sug_answer)]
            } for sug_answer in suggested_answers],
        } for row in matrix_rows]
        graph_data = [{
            'key': sug_answer.value,
            'values': [{
                'text': row.value,
                'count': count_data[(row, sug_answer)]
                }
                for row in matrix_rows
            ]
        } for sug_answer in suggested_answers]

        return table_data, graph_data

    def _get_stats_summary_data(self, user_input_lines):
        """Aggregate the per-type summary statistics of this question."""
        stats = {}
        if self.question_type in ['simple_choice', 'multiple_choice']:
            stats.update(self._get_stats_summary_data_choice(user_input_lines))
        elif self.question_type == 'numerical_box':
            stats.update(self._get_stats_summary_data_numerical(user_input_lines))

        if self.question_type in ['numerical_box', 'date', 'datetime']:
            stats.update(self._get_stats_summary_data_scored(user_input_lines))
        return stats

    def _get_stats_summary_data_choice(self, user_input_lines):
        """Count fully-right and partially-right user inputs for choice questions."""
        right_inputs, partial_inputs = self.env['survey.user_input'], self.env['survey.user_input']
        right_answers = self.suggested_answer_ids.filtered(lambda label: label.is_correct)
        if self.question_type == 'multiple_choice':
            for user_input, lines in tools.groupby(user_input_lines, operator.itemgetter('user_input_id')):
                user_input_answers = self.env['survey.user_input.line'].concat(*lines).filtered(lambda l: l.answer_is_correct).mapped('suggested_answer_id')
                if user_input_answers and user_input_answers < right_answers:
                    partial_inputs += user_input
                elif user_input_answers:
                    right_inputs += user_input
        else:
            right_inputs = user_input_lines.filtered(lambda line: line.answer_is_correct).mapped('user_input_id')
        return {
            'right_answers': right_answers,
            'right_inputs_count': len(right_inputs),
            'partial_inputs_count': len(partial_inputs),
        }

    def _get_stats_summary_data_numerical(self, user_input_lines):
        """Compute min / max / average over the non-skipped numerical answers."""
        all_values = user_input_lines.filtered(lambda line: not line.skipped).mapped('value_numerical_box')
        lines_sum = sum(all_values)
        return {
            'numerical_max': max(all_values, default=0),
            'numerical_min': min(all_values, default=0),
            'numerical_average': round(lines_sum / (len(all_values) or 1), 2),
        }

    def _get_stats_summary_data_scored(self, user_input_lines):
        """Most common answers (except datetime) and count of right user inputs."""
        return {
            'common_lines': collections.Counter(
                user_input_lines.filtered(lambda line: not line.skipped).mapped('value_%s' % self.question_type)
            ).most_common(5) if self.question_type != 'datetime' else [],
            'right_inputs_count': len(user_input_lines.filtered(lambda line: line.answer_is_correct).mapped('user_input_id'))
        }
+
+
class SurveyQuestionAnswer(models.Model):
    """ A preconfigured answer for a question.

    Values stored on this model serve two purposes:

    * simple choice / multiple choice questions: the proposed values for the
      selection or radio widgets;
    * matrix questions: the row and column values.
    """
    _name = 'survey.question.answer'
    _rec_name = 'value'
    _order = 'sequence, id'
    _description = 'Survey Label'

    question_id = fields.Many2one('survey.question', string='Question', ondelete='cascade')
    matrix_question_id = fields.Many2one('survey.question', string='Question (as matrix row)', ondelete='cascade')
    sequence = fields.Integer('Label Sequence order', default=10)
    value = fields.Char('Suggested value', translate=True, required=True)
    value_image = fields.Image('Image', max_width=256, max_height=256)
    is_correct = fields.Boolean('Is a correct answer')
    answer_score = fields.Float('Score for this choice', help="A positive score indicates a correct choice; a negative or null score indicates a wrong answer")

    @api.constrains('question_id', 'matrix_question_id')
    def _check_question_not_empty(self):
        """Ensure that field question_id XOR field matrix_question_id is not null"""
        for label in self:
            has_question = bool(label.question_id)
            has_matrix_question = bool(label.matrix_question_id)
            # exactly one of the two links must be set (XOR)
            if has_question == has_matrix_question:
                raise ValidationError(_("A label must be attached to only one question."))
diff --git a/addons/survey/models/survey_survey.py b/addons/survey/models/survey_survey.py
new file mode 100644
index 00000000..4188a27f
--- /dev/null
+++ b/addons/survey/models/survey_survey.py
@@ -0,0 +1,1066 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+import json
+import random
+import uuid
+import werkzeug
+
+from odoo import api, exceptions, fields, models, _
+from odoo.exceptions import AccessError, UserError
+from odoo.osv import expression
+from odoo.tools import is_html_empty
+
+
+class Survey(models.Model):
+ """ Settings for a multi-page/multi-question survey. Each survey can have one or more attached pages
+ and each page can display one or more questions. """
+ _name = 'survey.survey'
+ _description = 'Survey'
+ _rec_name = 'title'
+ _inherit = ['mail.thread', 'mail.activity.mixin']
+
+ def _get_default_access_token(self):
+ return str(uuid.uuid4())
+
+ def _get_default_session_code(self):
+ """ Attempt to generate a session code for our survey.
+ The method will first try to generate 20 codes with 4 digits each and check if any are colliding.
+ If we have at least one non-colliding code, we use it.
+ If all 20 generated codes are colliding, we try with 20 codes of 5 digits,
+ then 6, ... up to 10 digits. """
+
+ for digits_count in range(4, 10):
+ range_lower_bound = 1 * (10 ** (digits_count - 1))
+ range_upper_bound = (range_lower_bound * 10) - 1
+ code_candidates = set([str(random.randint(range_lower_bound, range_upper_bound)) for i in range(20)])
+ colliding_codes = self.sudo().search_read(
+ [('session_code', 'in', list(code_candidates))],
+ ['session_code']
+ )
+ code_candidates -= set([colliding_code['session_code'] for colliding_code in colliding_codes])
+ if code_candidates:
+ return list(code_candidates)[0]
+
+ return False # could not generate a code
+
    # description
    title = fields.Char('Survey Title', required=True, translate=True)
    color = fields.Integer('Color Index', default=0)
    description = fields.Html(
        "Description", translate=True, sanitize=False,  # TDE FIXME: find a way to authorize videos
        help="The description will be displayed on the home page of the survey. You can use this to give the purpose and guidelines to your candidates before they start it.")
    description_done = fields.Html(
        "End Message", translate=True,
        help="This message will be displayed when survey is completed")
    background_image = fields.Binary("Background Image")
    active = fields.Boolean("Active", default=True)
    # stages are always all shown in grouped views, see _read_group_states
    state = fields.Selection(selection=[
        ('draft', 'Draft'), ('open', 'In Progress'), ('closed', 'Closed')
    ], string="Survey Stage", default='draft', required=True,
        group_expand='_read_group_states')
    # questions
    # question_and_page_ids holds both sections (is_page) and questions; the
    # two computed o2m below split it, see _compute_page_and_question_ids
    question_and_page_ids = fields.One2many('survey.question', 'survey_id', string='Sections and Questions', copy=True)
    page_ids = fields.One2many('survey.question', string='Pages', compute="_compute_page_and_question_ids")
    question_ids = fields.One2many('survey.question', string='Questions', compute="_compute_page_and_question_ids")
    questions_layout = fields.Selection([
        ('one_page', 'One page with all the questions'),
        ('page_per_section', 'One page per section'),
        ('page_per_question', 'One page per question')],
        string="Layout", required=True, default='one_page')
    questions_selection = fields.Selection([
        ('all', 'All questions'),
        ('random', 'Randomized per section')],
        string="Selection", required=True, default='all',
        help="If randomized is selected, you can configure the number of random questions by section. This mode is ignored in live session.")
    progression_mode = fields.Selection([
        ('percent', 'Percentage'),
        ('number', 'Number')], string='Progression Mode', default='percent',
        help="If Number is selected, it will display the number of questions answered on the total number of question to answer.")
    # attendees
    user_input_ids = fields.One2many('survey.user_input', 'survey_id', string='User responses', readonly=True, groups='survey.group_survey_user')
    # security / access
    access_mode = fields.Selection([
        ('public', 'Anyone with the link'),
        ('token', 'Invited people only')], string='Access Mode',
        default='public', required=True)
    access_token = fields.Char('Access Token', default=lambda self: self._get_default_access_token(), copy=False)
    users_login_required = fields.Boolean('Login Required', help="If checked, users have to login before answering even with a valid token.")
    users_can_go_back = fields.Boolean('Users can go back', help="If checked, users can go back to previous pages.")
    users_can_signup = fields.Boolean('Users can signup', compute='_compute_users_can_signup')
    # statistics (all computed from non-test user inputs, see _compute_survey_statistic)
    answer_count = fields.Integer("Registered", compute="_compute_survey_statistic")
    answer_done_count = fields.Integer("Attempts", compute="_compute_survey_statistic")
    answer_score_avg = fields.Float("Avg Score %", compute="_compute_survey_statistic")
    success_count = fields.Integer("Success", compute="_compute_survey_statistic")
    success_ratio = fields.Integer("Success Ratio", compute="_compute_survey_statistic")
    # scoring
    scoring_type = fields.Selection([
        ('no_scoring', 'No scoring'),
        ('scoring_with_answers', 'Scoring with answers at the end'),
        ('scoring_without_answers', 'Scoring without answers at the end')],
        string="Scoring", required=True, default='no_scoring')
    scoring_success_min = fields.Float('Success %', default=80.0)
    # attendees context: attempts and time limitation
    is_attempts_limited = fields.Boolean('Limited number of attempts', help="Check this option if you want to limit the number of attempts per user",
                                         compute="_compute_is_attempts_limited", store=True, readonly=False)
    attempts_limit = fields.Integer('Number of attempts', default=1)
    is_time_limited = fields.Boolean('The survey is limited in time')
    time_limit = fields.Float("Time limit (minutes)", default=10)
    # certification
    certification = fields.Boolean('Is a Certification', compute='_compute_certification',
                                   readonly=False, store=True)
    certification_mail_template_id = fields.Many2one(
        'mail.template', 'Email Template',
        domain="[('model', '=', 'survey.user_input')]",
        help="Automated email sent to the user when he succeeds the certification, containing his certification document.")
    certification_report_layout = fields.Selection([
        ('modern_purple', 'Modern Purple'),
        ('modern_blue', 'Modern Blue'),
        ('modern_gold', 'Modern Gold'),
        ('classic_purple', 'Classic Purple'),
        ('classic_blue', 'Classic Blue'),
        ('classic_gold', 'Classic Gold')],
        string='Certification template', default='modern_purple')
    # Certification badge
    # certification_badge_id_dummy is used to have two different behaviours in the form view :
    # - If the certification badge is not set, show certification_badge_id and only display create option in the m2o
    # - If the certification badge is set, show certification_badge_id_dummy in 'no create' mode.
    #   So it can be edited but not removed or replaced.
    certification_give_badge = fields.Boolean('Give Badge', compute='_compute_certification_give_badge',
                                              readonly=False, store=True)
    certification_badge_id = fields.Many2one('gamification.badge', 'Certification Badge')
    certification_badge_id_dummy = fields.Many2one(related='certification_badge_id', string='Certification Badge ')
    # live sessions
    session_state = fields.Selection([
        ('ready', 'Ready'),
        ('in_progress', 'In Progress'),
        ], string="Session State", copy=False)
    session_code = fields.Char('Session Code', default=lambda self: self._get_default_session_code(), copy=False,
                               help="This code will be used by your attendees to reach your session. Feel free to customize it however you like!")
    session_link = fields.Char('Session Link', compute='_compute_session_link')
    # live sessions - current question fields
    session_question_id = fields.Many2one('survey.question', string="Current Question", copy=False,
                                          help="The current question of the survey session.")
    session_start_time = fields.Datetime("Current Session Start Time", copy=False)
    session_question_start_time = fields.Datetime("Current Question Start Time", copy=False,
                                                  help="The time at which the current question has started, used to handle the timer for attendees.")
    session_answer_count = fields.Integer("Answers Count", compute='_compute_session_answer_count')
    session_question_answer_count = fields.Integer("Question Answers Count", compute='_compute_session_question_answer_count')
    # live sessions - settings
    session_show_leaderboard = fields.Boolean("Show Session Leaderboard", compute='_compute_session_show_leaderboard',
                                              help="Whether or not we want to show the attendees leaderboard for this survey.")
    session_speed_rating = fields.Boolean("Reward quick answers", help="Attendees get more points if they answer quickly")
    # conditional questions management
    has_conditional_questions = fields.Boolean("Contains conditional questions", compute="_compute_has_conditional_questions")

    # database-level integrity rules, enforced by PostgreSQL constraints
    _sql_constraints = [
        ('access_token_unique', 'unique(access_token)', 'Access token should be unique'),
        ('session_code_unique', 'unique(session_code)', 'Session code should be unique'),
        ('certification_check', "CHECK( scoring_type!='no_scoring' OR certification=False )",
            'You can only create certifications for surveys that have a scoring mechanism.'),
        ('scoring_success_min_check', "CHECK( scoring_success_min IS NULL OR (scoring_success_min>=0 AND scoring_success_min<=100) )",
            'The percentage of success has to be defined between 0 and 100.'),
        ('time_limit_check', "CHECK( (is_time_limited=False) OR (time_limit is not null AND time_limit > 0) )",
            'The time limit needs to be a positive number if the survey is time limited.'),
        ('attempts_limit_check', "CHECK( (is_attempts_limited=False) OR (attempts_limit is not null AND attempts_limit > 0) )",
            'The attempts limit needs to be a positive number if the survey has a limited number of attempts.'),
        ('badge_uniq', 'unique (certification_badge_id)', "The badge for each survey should be unique!"),
        ('give_badge_check', "CHECK(certification_give_badge=False OR (certification_give_badge=True AND certification_badge_id is not null))",
            'Certification badge must be configured if Give Badge is set.'),
    ]
+
+ def _compute_users_can_signup(self):
+ signup_allowed = self.env['res.users'].sudo()._get_signup_invitation_scope() == 'b2c'
+ for survey in self:
+ survey.users_can_signup = signup_allowed
+
    @api.depends('user_input_ids.state', 'user_input_ids.test_entry', 'user_input_ids.scoring_percentage', 'user_input_ids.scoring_success')
    def _compute_survey_statistic(self):
        """ Compute answer statistics (counts, average score, success ratio)
        for every survey in one read_group over non-test user inputs.

        NOTE(review): grouping includes 'scoring_percentage', so each distinct
        percentage yields one group and 'answer_score_avg_total' accumulates
        the percentage once per group (not weighted by '__count'); the average
        is then divided by the done count — confirm this weighting is intended.
        """
        default_vals = {
            'answer_count': 0, 'answer_done_count': 0, 'success_count': 0,
            'answer_score_avg': 0.0, 'success_ratio': 0.0
        }
        stat = dict((cid, dict(default_vals, answer_score_avg_total=0.0)) for cid in self.ids)
        UserInput = self.env['survey.user_input']
        base_domain = ['&', ('survey_id', 'in', self.ids), ('test_entry', '!=', True)]

        read_group_res = UserInput.read_group(base_domain, ['survey_id', 'state'], ['survey_id', 'state', 'scoring_percentage', 'scoring_success'], lazy=False)
        for item in read_group_res:
            stat[item['survey_id'][0]]['answer_count'] += item['__count']
            stat[item['survey_id'][0]]['answer_score_avg_total'] += item['scoring_percentage']
            if item['state'] == 'done':
                stat[item['survey_id'][0]]['answer_done_count'] += item['__count']
                if item['scoring_success']:
                    stat[item['survey_id'][0]]['success_count'] += item['__count']

        for survey_id, values in stat.items():
            # 'or 1' avoids a ZeroDivisionError when no answer is done yet
            avg_total = stat[survey_id].pop('answer_score_avg_total')
            stat[survey_id]['answer_score_avg'] = avg_total / (stat[survey_id]['answer_done_count'] or 1)
            stat[survey_id]['success_ratio'] = (stat[survey_id]['success_count'] / (stat[survey_id]['answer_done_count'] or 1.0))*100

        for survey in self:
            # _origin: onchange/new records have no id of their own
            survey.update(stat.get(survey._origin.id, default_vals))
+
+ @api.depends('question_and_page_ids')
+ def _compute_page_and_question_ids(self):
+ for survey in self:
+ survey.page_ids = survey.question_and_page_ids.filtered(lambda question: question.is_page)
+ survey.question_ids = survey.question_and_page_ids - survey.page_ids
+
+ @api.depends('question_and_page_ids.is_conditional', 'users_login_required', 'access_mode')
+ def _compute_is_attempts_limited(self):
+ for survey in self:
+ if not survey.is_attempts_limited or \
+ (survey.access_mode == 'public' and not survey.users_login_required) or \
+ any(question.is_conditional for question in survey.question_and_page_ids):
+ survey.is_attempts_limited = False
+
+ @api.depends('session_start_time', 'user_input_ids')
+ def _compute_session_answer_count(self):
+ """ We have to loop since our result is dependent of the survey.session_start_time.
+ This field is currently used to display the count about a single survey, in the
+ context of sessions, so it should not matter too much. """
+
+ for survey in self:
+ answer_count = 0
+ input_count = self.env['survey.user_input'].read_group(
+ [('survey_id', '=', survey.id),
+ ('is_session_answer', '=', True),
+ ('state', '!=', 'done'),
+ ('create_date', '>=', survey.session_start_time)],
+ ['create_uid:count'],
+ ['survey_id'],
+ )
+ if input_count:
+ answer_count = input_count[0].get('create_uid', 0)
+
+ survey.session_answer_count = answer_count
+
+ @api.depends('session_question_id', 'session_start_time', 'user_input_ids.user_input_line_ids')
+ def _compute_session_question_answer_count(self):
+ """ We have to loop since our result is dependent of the survey.session_question_id and
+ the survey.session_start_time.
+ This field is currently used to display the count about a single survey, in the
+ context of sessions, so it should not matter too much. """
+ for survey in self:
+ answer_count = 0
+ input_line_count = self.env['survey.user_input.line'].read_group(
+ [('question_id', '=', survey.session_question_id.id),
+ ('survey_id', '=', survey.id),
+ ('create_date', '>=', survey.session_start_time)],
+ ['user_input_id:count_distinct'],
+ ['question_id'],
+ )
+ if input_line_count:
+ answer_count = input_line_count[0].get('user_input_id', 0)
+
+ survey.session_question_answer_count = answer_count
+
+ @api.depends('session_code')
+ def _compute_session_link(self):
+ for survey in self:
+ if survey.session_code:
+ survey.session_link = werkzeug.urls.url_join(
+ survey.get_base_url(),
+ '/s/%s' % survey.session_code)
+ else:
+ survey.session_link = werkzeug.urls.url_join(
+ survey.get_base_url(),
+ survey.get_start_url())
+
+ @api.depends('scoring_type', 'question_and_page_ids.save_as_nickname')
+ def _compute_session_show_leaderboard(self):
+ for survey in self:
+ survey.session_show_leaderboard = survey.scoring_type != 'no_scoring' and \
+ any(question.save_as_nickname for question in survey.question_and_page_ids)
+
+ @api.depends('question_and_page_ids.is_conditional')
+ def _compute_has_conditional_questions(self):
+ for survey in self:
+ survey.has_conditional_questions = any(question.is_conditional for question in survey.question_and_page_ids)
+
+ @api.depends('scoring_type')
+ def _compute_certification(self):
+ for survey in self:
+ if not survey.certification or survey.scoring_type == 'no_scoring':
+ survey.certification = False
+
+ @api.depends('users_login_required', 'certification')
+ def _compute_certification_give_badge(self):
+ for survey in self:
+ if not survey.certification_give_badge or \
+ not survey.users_login_required or \
+ not survey.certification:
+ survey.certification_give_badge = False
+
+ def _read_group_states(self, values, domain, order):
+ selection = self.env['survey.survey'].fields_get(allfields=['state'])['state']['selection']
+ return [s[0] for s in selection]
+
+ # ------------------------------------------------------------
+ # CRUD
+ # ------------------------------------------------------------
+
    @api.model
    def create(self, vals):
        """ Create a survey; when the new survey gives a certification badge,
        set up the gamification trigger granting that badge.

        :param dict vals: field values of the new survey
        :return: the newly created survey record
        """
        survey = super(Survey, self).create(vals)
        if vals.get('certification_give_badge'):
            # sudo: survey users may lack write access on gamification models
            survey.sudo()._create_certification_badge_trigger()
        return survey
+
+ def write(self, vals):
+ result = super(Survey, self).write(vals)
+ if 'certification_give_badge' in vals:
+ return self.sudo()._handle_certification_badges(vals)
+ return result
+
+ def copy_data(self, default=None):
+ title = _("%s (copy)") % (self.title)
+ default = dict(default or {}, title=title)
+ return super(Survey, self).copy_data(default)
+
+ def toggle_active(self):
+ super(Survey, self).toggle_active()
+ activated = self.filtered(lambda survey: survey.active)
+ activated.mapped('certification_badge_id').action_unarchive()
+ (self - activated).mapped('certification_badge_id').action_archive()
+
+ # ------------------------------------------------------------
+ # ANSWER MANAGEMENT
+ # ------------------------------------------------------------
+
    def _create_answer(self, user=False, partner=False, email=False, test_entry=False, check_attempts=True, **additional_vals):
        """ Main entry point to get a token back or create a new one. This method
        does check for current user access in order to explicitly validate
        security.

        :param user: target user asking for a token; it might be void or a
          public user in which case an email is welcomed;
        :param partner: target partner; when no user is given, the partner's
          first linked user (if any) is used instead;
        :param email: email of the person asking the token if no user exists;
        :param bool test_entry: create a test answer (not counted in statistics);
        :param bool check_attempts: enforce the attempts limitation;
        :param additional_vals: extra values for the created user inputs; may
          carry 'invite_token' to reuse an existing invitation token;
        :return: the created survey.user_input records (one per survey in self)
        """
        self.check_access_rights('read')
        self.check_access_rule('read')

        user_inputs = self.env['survey.user_input']
        for survey in self:
            # fall back on the partner's first user; note this rebinds `user`
            # for the remaining surveys of the loop as well
            if partner and not user and partner.user_ids:
                user = partner.user_ids[0]

            invite_token = additional_vals.pop('invite_token', False)
            # raises UserError when creation conditions are not met
            survey._check_answer_creation(user, partner, email, test_entry=test_entry, check_attempts=check_attempts, invite_token=invite_token)
            answer_vals = {
                'survey_id': survey.id,
                'test_entry': test_entry,
                'is_session_answer': survey.session_state in ['ready', 'in_progress']
            }
            if survey.session_state == 'in_progress':
                # if the session is already in progress, the answer skips the 'new' state
                answer_vals.update({
                    'state': 'in_progress',
                    'start_datetime': fields.Datetime.now(),
                })
            # identification priority: real user > partner > raw email
            if user and not user._is_public():
                answer_vals['partner_id'] = user.partner_id.id
                answer_vals['email'] = user.email
                answer_vals['nickname'] = user.name
            elif partner:
                answer_vals['partner_id'] = partner.id
                answer_vals['email'] = partner.email
                answer_vals['nickname'] = partner.name
            else:
                answer_vals['email'] = email
                answer_vals['nickname'] = email

            if invite_token:
                answer_vals['invite_token'] = invite_token
            elif survey.is_attempts_limited and survey.access_mode != 'public':
                # attempts limited: create a new invite_token
                # exception made for 'public' access_mode since the attempts pool is global because answers are
                # created every time the user lands on '/start'
                answer_vals['invite_token'] = self.env['survey.user_input']._generate_invite_token()

            answer_vals.update(additional_vals)
            user_inputs += user_inputs.create(answer_vals)

        # pre-fill 'save_as_email' / 'save_as_nickname' char questions with the
        # identification data gathered above
        for question in self.mapped('question_ids').filtered(
                lambda q: q.question_type == 'char_box' and (q.save_as_email or q.save_as_nickname)):
            for user_input in user_inputs:
                if question.save_as_email and user_input.email:
                    user_input.save_lines(question, user_input.email)
                if question.save_as_nickname and user_input.nickname:
                    user_input.save_lines(question, user_input.nickname)

        return user_inputs
+
    def _check_answer_creation(self, user, partner, email, test_entry=False, check_attempts=True, invite_token=False):
        """ Ensure conditions to create new tokens are met.

        :raises exceptions.UserError: when the survey is archived/closed, the
          caller is not allowed for the access mode, or no attempts are left.

        NOTE(review): 'authentication' and 'internal' are not values of this
        model's access_mode selection (only 'public' and 'token' are declared
        above) — these branches look like leftovers or extension hooks for
        modules adding such modes; confirm before removing them.
        """
        self.ensure_one()
        if test_entry:
            # the current user must have the access rights to survey
            if not user.has_group('survey.group_survey_user'):
                raise exceptions.UserError(_('Creating test token is not allowed for you.'))
        else:
            if not self.active:
                raise exceptions.UserError(_('Creating token for archived surveys is not allowed.'))
            elif self.state == 'closed':
                raise exceptions.UserError(_('Creating token for closed surveys is not allowed.'))
            if self.access_mode == 'authentication':
                # signup possible -> should have at least a partner to create an account
                if self.users_can_signup and not user and not partner:
                    raise exceptions.UserError(_('Creating token for external people is not allowed for surveys requesting authentication.'))
                # no signup possible -> should be a not public user (employee or portal users)
                if not self.users_can_signup and (not user or user._is_public()):
                    raise exceptions.UserError(_('Creating token for external people is not allowed for surveys requesting authentication.'))
            if self.access_mode == 'internal' and (not user or not user.has_group('base.group_user')):
                raise exceptions.UserError(_('Creating token for anybody else than employees is not allowed for internal surveys.'))
            if check_attempts and not self._has_attempts_left(partner or (user and user.partner_id), email, invite_token):
                raise exceptions.UserError(_('No attempts left.'))
+
+ def _prepare_user_input_predefined_questions(self):
+ """ Will generate the questions for a randomized survey.
+ It uses the random_questions_count of every sections of the survey to
+ pick a random number of questions and returns the merged recordset """
+ self.ensure_one()
+
+ questions = self.env['survey.question']
+
+ # First append questions without page
+ for question in self.question_ids:
+ if not question.page_id:
+ questions |= question
+
+ # Then, questions in sections
+
+ for page in self.page_ids:
+ if self.questions_selection == 'all':
+ questions |= page.question_ids
+ else:
+ if page.random_questions_count > 0 and len(page.question_ids) > page.random_questions_count:
+ questions = questions.concat(*random.sample(page.question_ids, page.random_questions_count))
+ else:
+ questions |= page.question_ids
+
+ return questions
+
+ def _can_go_back(self, answer, page_or_question):
+ """ Check if the user can go back to the previous question/page for the currently
+ viewed question/page.
+ Back button needs to be configured on survey and, depending on the layout:
+ - In 'page_per_section', we can go back if we're not on the first page
+ - In 'page_per_question', we can go back if:
+ - It is not a session answer (doesn't make sense to go back in session context)
+ - We are not on the first question
+ - The survey does not have pages OR this is not the first page of the survey
+ (pages are displayed in 'page_per_question' layout when they have a description, see PR#44271)
+ """
+ self.ensure_one()
+
+ if self.users_can_go_back and answer.state == 'in_progress':
+ if self.questions_layout == 'page_per_section' and page_or_question != self.page_ids[0]:
+ return True
+ elif self.questions_layout == 'page_per_question' and \
+ not answer.is_session_answer and \
+ page_or_question != answer.predefined_question_ids[0] \
+ and (not self.page_ids or page_or_question != self.page_ids[0]):
+ return True
+
+ return False
+
+ def _has_attempts_left(self, partner, email, invite_token):
+ self.ensure_one()
+
+ if (self.access_mode != 'public' or self.users_login_required) and self.is_attempts_limited:
+ return self._get_number_of_attempts_lefts(partner, email, invite_token) > 0
+
+ return True
+
+ def _get_number_of_attempts_lefts(self, partner, email, invite_token):
+ """ Returns the number of attempts left. """
+ self.ensure_one()
+
+ domain = [
+ ('survey_id', '=', self.id),
+ ('test_entry', '=', False),
+ ('state', '=', 'done')
+ ]
+
+ if partner:
+ domain = expression.AND([domain, [('partner_id', '=', partner.id)]])
+ else:
+ domain = expression.AND([domain, [('email', '=', email)]])
+
+ if invite_token:
+ domain = expression.AND([domain, [('invite_token', '=', invite_token)]])
+
+ return self.attempts_limit - self.env['survey.user_input'].search_count(domain)
+
+ # ------------------------------------------------------------
+ # QUESTIONS MANAGEMENT
+ # ------------------------------------------------------------
+
+ @api.model
+ def _get_pages_or_questions(self, user_input):
+ """ Returns the pages or questions (depending on the layout) that will be shown
+ to the user taking the survey.
+ In 'page_per_question' layout, we also want to show pages that have a description. """
+
+ result = self.env['survey.question']
+ if self.questions_layout == 'page_per_section':
+ result = self.page_ids
+ elif self.questions_layout == 'page_per_question':
+ if self.questions_selection == 'random' and not self.session_state:
+ result = user_input.predefined_question_ids
+ else:
+ result = self.question_and_page_ids.filtered(
+ lambda question: not question.is_page or not is_html_empty(question.description))
+
+ return result
+
    def _get_next_page_or_question(self, user_input, page_or_question_id, go_back=False):
        """ Generalized logic to retrieve the next question or page to show on the survey.
        It's based on the page_or_question_id parameter, that is usually the currently displayed question/page.

        There is a special case when the survey is configured with conditional questions:
        - for "page_per_question" layout, the next question to display depends on the selected answers and
          the questions 'hierarchy'.
        - for "page_per_section" layout, before returning the result, we check that it contains at least a question
          (all section questions could be disabled based on previously selected answers)

        The whole logic is inverted if "go_back" is passed as True.

        As pages with description are considered as potential question to display, we show the page
        if it contains at least one active question or a description.

        :param user_input: user's answers
        :param page_or_question_id: current page or question id (0 means "before the first one")
        :param go_back: reverse the logic and get the PREVIOUS question/page
        :return: next or previous question/page, or an empty recordset when the
          survey flow is finished (or exhausted in the requested direction)
        """

        survey = user_input.survey_id
        pages_or_questions = survey._get_pages_or_questions(user_input)
        Question = self.env['survey.question']

        # Get Next
        if not go_back:
            if not pages_or_questions:
                return Question
            # First page
            if page_or_question_id == 0:
                return pages_or_questions[0]

        # NOTE(review): .index() raises ValueError when the id is not part of
        # the flow — callers appear to always pass a known id; confirm.
        current_page_index = pages_or_questions.ids.index(page_or_question_id)

        # Get previous and we are on first page OR Get Next and we are on last page
        if (go_back and current_page_index == 0) or (not go_back and current_page_index == len(pages_or_questions) - 1):
            return Question

        # Conditional Questions Management
        triggering_answer_by_question, triggered_questions_by_answer, selected_answers = user_input._get_conditional_values()
        inactive_questions = user_input._get_inactive_conditional_questions()
        if survey.questions_layout == 'page_per_question':
            question_candidates = pages_or_questions[0:current_page_index] if go_back \
                else pages_or_questions[current_page_index + 1:]
            # when going back, scan candidates from the closest one (reverse order)
            for question in question_candidates.sorted(reverse=go_back):
                # pages with description are potential questions to display (are part of question_candidates)
                if question.is_page:
                    contains_active_question = any(sub_question not in inactive_questions for sub_question in question.question_ids)
                    is_description_section = not question.question_ids and not is_html_empty(question.description)
                    if contains_active_question or is_description_section:
                        return question
                else:
                    triggering_answer = triggering_answer_by_question.get(question)
                    if not triggering_answer or triggering_answer in selected_answers:
                        # question is visible because not conditioned or conditioned by a selected answer
                        return question
        elif survey.questions_layout == 'page_per_section':
            section_candidates = pages_or_questions[0:current_page_index] if go_back \
                else pages_or_questions[current_page_index + 1:]
            for section in section_candidates.sorted(reverse=go_back):
                # a section is shown if one of its questions is still active,
                # or if it only exists to display its description
                contains_active_question = any(question not in inactive_questions for question in section.question_ids)
                is_description_section = not section.question_ids and not is_html_empty(section.description)
                if contains_active_question or is_description_section:
                    return section
        return Question
+
    def _is_last_page_or_question(self, user_input, page_or_question):
        """ This method checks if the given question or page is the last one.
        This includes conditional questions configuration. If the given question is normally not the last one but
        every following questions are inactive due to conditional questions configurations (and user choices),
        the given question will be the last one, except if the given question is conditioning at least
        one of the following questions.
        For section, we check in each following section if there is an active question.
        If yes, the given page is not the last one.

        :param user_input: the attendee's answers (used for conditional state)
        :param page_or_question: the page/question to test
        :return: bool
        """
        pages_or_questions = self._get_pages_or_questions(user_input)
        current_page_index = pages_or_questions.ids.index(page_or_question.id)
        next_page_or_question_candidates = pages_or_questions[current_page_index + 1:]
        if next_page_or_question_candidates:
            inactive_questions = user_input._get_inactive_conditional_questions()
            triggering_answer_by_question, triggered_questions_by_answer, selected_answers = user_input._get_conditional_values()
            if self.questions_layout == 'page_per_question':
                # not last if a later question is still active, or if one of this
                # question's answers may reveal a (currently inactive) question
                next_active_question = any(next_question not in inactive_questions for next_question in next_page_or_question_candidates)
                is_triggering_question = any(triggering_answer in triggered_questions_by_answer.keys() for triggering_answer in page_or_question.suggested_answer_ids)
                return not(next_active_question or is_triggering_question)
            elif self.questions_layout == 'page_per_section':
                # does any question of this section condition another question?
                is_triggering_section = False
                for question in page_or_question.question_ids:
                    if any(triggering_answer in triggered_questions_by_answer.keys() for triggering_answer in
                            question.suggested_answer_ids):
                        is_triggering_section = True
                        break
                # is there an active question in any following section?
                next_active_question = False
                for section in next_page_or_question_candidates:
                    next_active_question = any(next_question not in inactive_questions for next_question in section.question_ids)
                    if next_active_question:
                        break
                return not(next_active_question or is_triggering_section)

        return True
+
+ def _get_survey_questions(self, answer=None, page_id=None, question_id=None):
+ """ Returns a tuple containing: the survey question and the passed question_id / page_id
+ based on the question_layout and the fact that it's a session or not.
+
+ Breakdown of use cases:
+ - We are currently running a session
+ We return the current session question and it's id
+ - The layout is page_per_section
+ We return the questions for that page and the passed page_id
+ - The layout is page_per_question
+ We return the question for the passed question_id and the question_id
+ - The layout is one_page
+ We return all the questions of the survey and None
+
+ In addition, we cross the returned questions with the answer.predefined_question_ids,
+ that allows to handle the randomization of questions. """
+
+ questions, page_or_question_id = None, None
+
+ if answer and answer.is_session_answer:
+ return self.session_question_id, self.session_question_id.id
+ if self.questions_layout == 'page_per_section':
+ if not page_id:
+ raise ValueError("Page id is needed for question layout 'page_per_section'")
+ page_id = int(page_id)
+ questions = self.env['survey.question'].sudo().search([('survey_id', '=', self.id), ('page_id', '=', page_id)])
+ page_or_question_id = page_id
+ elif self.questions_layout == 'page_per_question':
+ if not question_id:
+ raise ValueError("Question id is needed for question layout 'page_per_question'")
+ question_id = int(question_id)
+ questions = self.env['survey.question'].sudo().browse(question_id)
+ page_or_question_id = question_id
+ else:
+ questions = self.question_ids
+
+ # we need the intersection of the questions of this page AND the questions prepared for that user_input
+ # (because randomized surveys do not use all the questions of every page)
+ if answer:
+ questions = questions & answer.predefined_question_ids
+ return questions, page_or_question_id
+
+ # ------------------------------------------------------------
+ # CONDITIONAL QUESTIONS MANAGEMENT
+ # ------------------------------------------------------------
+
+ def _get_conditional_maps(self):
+ triggering_answer_by_question = {}
+ triggered_questions_by_answer = {}
+ for question in self.question_ids:
+ triggering_answer_by_question[question] = question.is_conditional and question.triggering_answer_id
+
+ if question.is_conditional:
+ if question.triggering_answer_id in triggered_questions_by_answer:
+ triggered_questions_by_answer[question.triggering_answer_id] |= question
+ else:
+ triggered_questions_by_answer[question.triggering_answer_id] = question
+ return triggering_answer_by_question, triggered_questions_by_answer
+
+ # ------------------------------------------------------------
+ # SESSIONS MANAGEMENT
+ # ------------------------------------------------------------
+
+ def _session_open(self):
+ """ The session start is sudo'ed to allow survey user to manage sessions of surveys
+ they do not own.
+
+ We flush after writing to make sure it's updated before bus takes over. """
+
+ if self.env.user.has_group('survey.group_survey_user'):
+ self.sudo().write({'session_state': 'in_progress'})
+ self.sudo().flush(['session_state'])
+
+ def _get_session_next_question(self):
+ self.ensure_one()
+
+ if not self.question_ids or not self.env.user.has_group('survey.group_survey_user'):
+ return
+
+ most_voted_answers = self._get_session_most_voted_answers()
+ return self._get_next_page_or_question(
+ most_voted_answers,
+ self.session_question_id.id if self.session_question_id else 0)
+
    def _get_session_most_voted_answers(self):
        """ In sessions of survey that has conditional questions, as the survey is passed at the same time by
        many users, we need to extract the most chosen answers, to determine the next questions to display.

        :return: an in-memory (``new()``, not persisted) survey.user_input mimicking a single
            'audience' attendee, holding one answer line per question with its most voted
            suggested answer. Ties keep the first answer encountered. """

        # get user_inputs from current session
        current_user_inputs = self.user_input_ids.filtered(lambda input: input.create_date > self.session_start_time)
        current_user_input_lines = current_user_inputs.mapped('user_input_line_ids').filtered(lambda answer: answer.suggested_answer_id)

        # count the number of vote per answer
        votes_by_answer = dict.fromkeys(current_user_input_lines.mapped('suggested_answer_id'), 0)
        for answer in current_user_input_lines:
            votes_by_answer[answer.suggested_answer_id] += 1

        # extract most voted answer for each question
        # (strict '<' comparison: on a tie, the first encountered answer wins)
        most_voted_answer_by_questions = dict.fromkeys(current_user_input_lines.mapped('question_id'))
        for question in most_voted_answer_by_questions.keys():
            for answer in votes_by_answer.keys():
                if answer.question_id != question:
                    continue
                most_voted_answer = most_voted_answer_by_questions[question]
                if not most_voted_answer or votes_by_answer[most_voted_answer] < votes_by_answer[answer]:
                    most_voted_answer_by_questions[question] = answer

        # return a fake 'audience' user_input
        fake_user_input = self.env['survey.user_input'].new({
            'survey_id': self.id,
            'predefined_question_ids': [(6, 0, self._prepare_user_input_predefined_questions().ids)]
        })

        fake_user_input_lines = self.env['survey.user_input.line']
        for question, answer in most_voted_answer_by_questions.items():
            fake_user_input_lines |= self.env['survey.user_input.line'].new({
                'question_id': question.id,
                'suggested_answer_id': answer.id,
                'survey_id': self.id,
                'user_input_id': fake_user_input.id
            })

        return fake_user_input
+
    def _prepare_leaderboard_values(self):
        """ The leaderboard is descending and takes the total of the attendee points minus the
        current question score.
        We need both the total and the current question points to be able to show the attendees
        leaderboard and shift their position based on the score they have on the current question.
        This prepares a structure containing all the necessary data for the animations done on
        the frontend side.
        The leaderboard is sorted based on attendees score *before* the current question.
        The frontend will shift positions around accordingly. """

        self.ensure_one()

        # top 15 attendees of the session, ordered by total score
        leaderboard = self.env['survey.user_input'].search_read([
            ('survey_id', '=', self.id),
            ('create_date', '>=', self.session_start_time)
        ], [
            'id',
            'nickname',
            'scoring_total',
        ], limit=15, order="scoring_total desc")

        # only enrich entries when the current question is actually scored
        if leaderboard and self.session_state == 'in_progress' and \
                any(answer.answer_score for answer in self.session_question_id.suggested_answer_ids):
            question_scores = {}
            input_lines = self.env['survey.user_input.line'].search_read(
                [('user_input_id', 'in', [score['id'] for score in leaderboard]),
                 ('question_id', '=', self.session_question_id.id)],
                ['user_input_id', 'answer_score'])
            # sum each attendee's answer scores on the current question
            for input_line in input_lines:
                question_scores[input_line['user_input_id'][0]] = \
                    question_scores.get(input_line['user_input_id'][0], 0) + input_line['answer_score']

            score_position = 0
            for leaderboard_item in leaderboard:
                question_score = question_scores.get(leaderboard_item['id'], 0)
                leaderboard_item.update({
                    # total including the current question
                    'updated_score': leaderboard_item['scoring_total'],
                    # 'scoring_total' now means the score *before* the current question
                    'scoring_total': leaderboard_item['scoring_total'] - question_score,
                    'leaderboard_position': score_position,
                    # sum of positive answer scores; 'or 1' presumably guards a frontend
                    # division by zero — TODO confirm
                    'max_question_score': sum(
                        score for score in self.session_question_id.suggested_answer_ids.mapped('answer_score')
                        if score > 0
                    ) or 1,
                    'question_score': question_score
                })
                score_position += 1
            # re-sort on the pre-question score: the frontend animates the position shifts
            leaderboard = sorted(
                leaderboard,
                key=lambda score: score['scoring_total'],
                reverse=True)

        return leaderboard
+
+
+ # ------------------------------------------------------------
+ # ACTIONS
+ # ------------------------------------------------------------
+
    def action_draft(self):
        """ Reset the surveys to the 'draft' state. """
        self.write({'state': 'draft'})
+
    def action_open(self):
        """ Move the surveys to the 'open' state, making them available to attendees. """
        self.write({'state': 'open'})
+
    def action_close(self):
        """ Move the surveys to the 'closed' state; closed surveys refuse new invitations. """
        self.write({'state': 'closed'})
+
+ def action_send_survey(self):
+ """ Open a window to compose an email, pre-filled with the survey message """
+ # Ensure that this survey has at least one question.
+ if not self.question_ids:
+ raise UserError(_('You cannot send an invitation for a survey that has no questions.'))
+
+ # Ensure that this survey has at least one section with question(s), if question layout is 'One page per section'.
+ if self.questions_layout == 'page_per_section':
+ if not self.page_ids:
+ raise UserError(_('You cannot send an invitation for a "One page per section" survey if the survey has no sections.'))
+ if not self.page_ids.mapped('question_ids'):
+ raise UserError(_('You cannot send an invitation for a "One page per section" survey if the survey only contains empty sections.'))
+
+ if self.state == 'closed':
+ raise exceptions.UserError(_("You cannot send invitations for closed surveys."))
+
+ template = self.env.ref('survey.mail_template_user_input_invite', raise_if_not_found=False)
+
+ local_context = dict(
+ self.env.context,
+ default_survey_id=self.id,
+ default_use_template=bool(template),
+ default_template_id=template and template.id or False,
+ notif_layout='mail.mail_notification_light',
+ )
+ return {
+ 'type': 'ir.actions.act_window',
+ 'view_mode': 'form',
+ 'res_model': 'survey.invite',
+ 'target': 'new',
+ 'context': local_context,
+ }
+
+ def action_start_survey(self, answer=None):
+ """ Open the website page with the survey form """
+ self.ensure_one()
+ url = '%s?%s' % (self.get_start_url(), werkzeug.urls.url_encode({'answer_token': answer and answer.access_token or None}))
+ return {
+ 'type': 'ir.actions.act_url',
+ 'name': "Start Survey",
+ 'target': 'self',
+ 'url': url,
+ }
+
+ def action_print_survey(self, answer=None):
+ """ Open the website page with the survey printable view """
+ self.ensure_one()
+ url = '%s?%s' % (self.get_print_url(), werkzeug.urls.url_encode({'answer_token': answer and answer.access_token or None}))
+ return {
+ 'type': 'ir.actions.act_url',
+ 'name': "Print Survey",
+ 'target': 'self',
+ 'url': url
+ }
+
+ def action_result_survey(self):
+ """ Open the website page with the survey results view """
+ self.ensure_one()
+ return {
+ 'type': 'ir.actions.act_url',
+ 'name': "Results of the Survey",
+ 'target': 'self',
+ 'url': '/survey/results/%s' % self.id
+ }
+
+ def action_test_survey(self):
+ ''' Open the website page with the survey form into test mode'''
+ self.ensure_one()
+ return {
+ 'type': 'ir.actions.act_url',
+ 'name': "Test Survey",
+ 'target': 'self',
+ 'url': '/survey/test/%s' % self.access_token,
+ }
+
+ def action_survey_user_input_completed(self):
+ action = self.env['ir.actions.act_window']._for_xml_id('survey.action_survey_user_input')
+ ctx = dict(self.env.context)
+ ctx.update({'search_default_survey_id': self.ids[0],
+ 'search_default_completed': 1,
+ 'search_default_not_test': 1})
+ action['context'] = ctx
+ return action
+
+ def action_survey_user_input_certified(self):
+ action = self.env['ir.actions.act_window']._for_xml_id('survey.action_survey_user_input')
+ ctx = dict(self.env.context)
+ ctx.update({'search_default_survey_id': self.ids[0],
+ 'search_default_scoring_success': 1,
+ 'search_default_not_test': 1})
+ action['context'] = ctx
+ return action
+
+ def action_survey_user_input(self):
+ action = self.env['ir.actions.act_window']._for_xml_id('survey.action_survey_user_input')
+ ctx = dict(self.env.context)
+ ctx.update({'search_default_survey_id': self.ids[0],
+ 'search_default_not_test': 1})
+ action['context'] = ctx
+ return action
+
+ def action_survey_preview_certification_template(self):
+ self.ensure_one()
+ return {
+ 'type': 'ir.actions.act_url',
+ 'target': '_blank',
+ 'url': '/survey/%s/get_certification_preview' % (self.id)
+ }
+
+ def action_start_session(self):
+ """ Sets the necessary fields for the session to take place and starts it.
+ The write is sudo'ed because a survey user can start a session even if it's
+ not his own survey. """
+
+ if not self.env.user.has_group('survey.group_survey_user'):
+ raise AccessError(_('Only survey users can manage sessions.'))
+
+ self.ensure_one()
+ self.sudo().write({
+ 'questions_layout': 'page_per_question',
+ 'session_start_time': fields.Datetime.now(),
+ 'session_question_id': None,
+ 'session_state': 'ready'
+ })
+ return self.action_open_session_manager()
+
+ def action_open_session_manager(self):
+ self.ensure_one()
+
+ return {
+ 'type': 'ir.actions.act_url',
+ 'name': "Open Session Manager",
+ 'target': 'self',
+ 'url': '/survey/session/manage/%s' % self.access_token
+ }
+
+ def action_end_session(self):
+ """ The write is sudo'ed because a survey user can end a session even if it's
+ not his own survey. """
+
+ if not self.env.user.has_group('survey.group_survey_user'):
+ raise AccessError(_('Only survey users can manage sessions.'))
+
+ self.sudo().write({'session_state': False})
+ self.user_input_ids.sudo().write({'state': 'done'})
+ self.env['bus.bus'].sendone(self.access_token, {'type': 'end_session'})
+
    def get_start_url(self):
        """ Relative URL of the public page starting this survey. """
        return '/survey/start/%s' % self.access_token
+
    def get_start_short_url(self):
        """ See controller method docstring for more details.

        Short variant of the start URL, built on the first 6 characters of the access token. """
        return '/s/%s' % self.access_token[:6]
+
    def get_print_url(self):
        """ Relative URL of the printable view of this survey. """
        return '/survey/print/%s' % self.access_token
+
+ # ------------------------------------------------------------
+ # GRAPH / RESULTS
+ # ------------------------------------------------------------
+
    def _prepare_statistics(self, user_input_lines=None):
        """ Compute the global pass/fail statistics of the surveys in ``self``.

        :param user_input_lines: optional survey.user_input.line recordset; when given,
            only the related user inputs are counted, otherwise all completed,
            non-test entries are used
        :return: dict with 'global_success_rate' (percentage rounded to 1 decimal,
            0 when there is no entry) and 'global_success_graph' (JSON data of the
            Passed/Missed graph) """
        if user_input_lines:
            user_input_domain = [
                ('survey_id', 'in', self.ids),
                ('id', 'in', user_input_lines.mapped('user_input_id').ids)
            ]
        else:
            user_input_domain = [
                ('survey_id', 'in', self.ids),
                ('state', '=', 'done'),
                ('test_entry', '=', False)
            ]
        count_data = self.env['survey.user_input'].sudo().read_group(user_input_domain, ['scoring_success', 'id:count_distinct'], ['scoring_success'])

        # read_group returns one row per scoring_success group; the group size is
        # exposed under the '<groupby>_count' key
        scoring_success_count = 0
        scoring_failed_count = 0
        for count_data_item in count_data:
            if count_data_item['scoring_success']:
                scoring_success_count += count_data_item['scoring_success_count']
            else:
                scoring_failed_count += count_data_item['scoring_success_count']

        success_graph = json.dumps([{
            'text': _('Passed'),
            'count': scoring_success_count,
            'color': '#2E7D32'
        }, {
            'text': _('Missed'),
            'count': scoring_failed_count,
            'color': '#C62828'
        }])

        total = scoring_success_count + scoring_failed_count
        return {
            'global_success_rate': round((scoring_success_count / total) * 100, 1) if total > 0 else 0,
            'global_success_graph': success_graph
        }
+
+ # ------------------------------------------------------------
+ # GAMIFICATION / BADGES
+ # ------------------------------------------------------------
+
    def _prepare_challenge_category(self):
        """ Category used for the gamification challenges created for certifications;
        meant to be overridden by extending modules. """
        return 'certification'
+
    def _create_certification_badge_trigger(self):
        """ Create the gamification records that automatically grant the certification
        badge: a goal definition counting the successful attempts per partner, a
        realtime 'once' challenge rewarding this survey's certification badge, and a
        challenge line with a target of 1 (one success is enough). """
        self.ensure_one()
        goal = self.env['gamification.goal.definition'].create({
            'name': self.title,
            'description': _("%s certification passed", self.title),
            # count successful attempts of this survey only
            'domain': "['&', ('survey_id', '=', %s), ('scoring_success', '=', True)]" % self.id,
            'computation_mode': 'count',
            'display_mode': 'boolean',
            'model_id': self.env.ref('survey.model_survey_user_input').id,
            'condition': 'higher',
            # batch mode: evaluate all users at once, grouped by their partner
            'batch_mode': True,
            'batch_distinctive_field': self.env.ref('survey.field_survey_user_input__partner_id').id,
            'batch_user_expression': 'user.partner_id.id'
        })
        challenge = self.env['gamification.challenge'].create({
            'name': _('%s challenge certification', self.title),
            'reward_id': self.certification_badge_id.id,
            'state': 'inprogress',
            'period': 'once',
            'challenge_category': self._prepare_challenge_category(),
            'reward_realtime': True,
            'report_message_frequency': 'never',
            'user_domain': [('karma', '>', 0)],
            'visibility_mode': 'personal'
        })
        self.env['gamification.challenge.line'].create({
            'definition_id': goal.id,
            'challenge_id': challenge.id,
            'target_goal': 1
        })
+
+ def _handle_certification_badges(self, vals):
+ if vals.get('certification_give_badge'):
+ # If badge already set on records, reactivate the ones that are not active.
+ surveys_with_badge = self.filtered(lambda survey: survey.certification_badge_id and not survey.certification_badge_id.active)
+ surveys_with_badge.mapped('certification_badge_id').action_unarchive()
+ # (re-)create challenge and goal
+ for survey in self:
+ survey._create_certification_badge_trigger()
+ else:
+ # if badge with owner : archive them, else delete everything (badge, challenge, goal)
+ badges = self.mapped('certification_badge_id')
+ challenges_to_delete = self.env['gamification.challenge'].search([('reward_id', 'in', badges.ids)])
+ goals_to_delete = challenges_to_delete.mapped('line_ids').mapped('definition_id')
+ badges.action_archive()
+ # delete all challenges and goals because not needed anymore (challenge lines are deleted in cascade)
+ challenges_to_delete.unlink()
+ goals_to_delete.unlink()
diff --git a/addons/survey/models/survey_user.py b/addons/survey/models/survey_user.py
new file mode 100644
index 00000000..9e4b4659
--- /dev/null
+++ b/addons/survey/models/survey_user.py
@@ -0,0 +1,628 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+import logging
+import uuid
+
+from dateutil.relativedelta import relativedelta
+
+from odoo import api, fields, models, _
+from odoo.exceptions import ValidationError
+from odoo.tools import float_is_zero
+
+_logger = logging.getLogger(__name__)
+
+
class SurveyUserInput(models.Model):
    """ Metadata for a set of one user's answers to a particular survey.

    One record represents one attempt: who answered (partner / email / nickname),
    when, with which access token, and the resulting score. """
    _name = "survey.user_input"
    _rec_name = 'survey_id'
    _description = 'Survey User Input'

    # answer description
    survey_id = fields.Many2one('survey.survey', string='Survey', required=True, readonly=True, ondelete='cascade')
    scoring_type = fields.Selection(string="Scoring", related="survey_id.scoring_type")
    start_datetime = fields.Datetime('Start date and time', readonly=True)
    deadline = fields.Datetime('Deadline', help="Datetime until customer can open the survey and submit answers")
    state = fields.Selection([
        ('new', 'Not started yet'),
        ('in_progress', 'In Progress'),
        ('done', 'Completed')], string='Status', default='new', readonly=True)
    test_entry = fields.Boolean(readonly=True)
    last_displayed_page_id = fields.Many2one('survey.question', string='Last displayed question/page')
    # attempts management
    is_attempts_limited = fields.Boolean("Limited number of attempts", related='survey_id.is_attempts_limited')
    attempts_limit = fields.Integer("Number of attempts", related='survey_id.attempts_limit')
    attempts_number = fields.Integer("Attempt n°", compute='_compute_attempts_number')
    survey_time_limit_reached = fields.Boolean("Survey Time Limit Reached", compute='_compute_survey_time_limit_reached')
    # identification / access
    # access_token identifies this single attempt; invite_token identifies a pool of attempts
    access_token = fields.Char('Identification token', default=lambda self: str(uuid.uuid4()), readonly=True, required=True, copy=False)
    invite_token = fields.Char('Invite token', readonly=True, copy=False)  # no unique constraint, as it identifies a pool of attempts
    partner_id = fields.Many2one('res.partner', string='Partner', readonly=True)
    email = fields.Char('Email', readonly=True)
    nickname = fields.Char('Nickname', help="Attendee nickname, mainly used to identify him in the survey session leaderboard.")
    # questions / answers
    user_input_line_ids = fields.One2many('survey.user_input.line', 'user_input_id', string='Answers', copy=True)
    predefined_question_ids = fields.Many2many('survey.question', string='Predefined Questions', readonly=True)
    scoring_percentage = fields.Float("Score (%)", compute="_compute_scoring_values", store=True, compute_sudo=True)  # stored for perf reasons
    scoring_total = fields.Float("Total Score", compute="_compute_scoring_values", store=True, compute_sudo=True)  # stored for perf reasons
    scoring_success = fields.Boolean('Quizz Passed', compute='_compute_scoring_success', store=True, compute_sudo=True)  # stored for perf reasons
    # live sessions
    is_session_answer = fields.Boolean('Is in a Session', help="Is that user input part of a survey session or not.")
    question_time_limit_reached = fields.Boolean("Question Time Limit Reached", compute='_compute_question_time_limit_reached')

    _sql_constraints = [
        ('unique_token', 'UNIQUE (access_token)', 'An access token must be unique!'),
    ]
+
    @api.depends('user_input_line_ids.answer_score', 'user_input_line_ids.question_id', 'predefined_question_ids.answer_score')
    def _compute_scoring_values(self):
        """ Compute the attempt's total score and its percentage of the maximum
        reachable score.

        The maximum counts, for choice questions, the sum of the *positive*
        suggested-answer scores, and for other scored questions their answer_score.
        The percentage is rounded to 2 decimals and floored at 0. """
        for user_input in self:
            # sum(multi-choice question scores) + sum(simple answer_type scores)
            total_possible_score = 0
            for question in user_input.predefined_question_ids:
                if question.question_type in ['simple_choice', 'multiple_choice']:
                    total_possible_score += sum(score for score in question.mapped('suggested_answer_ids.answer_score') if score > 0)
                elif question.is_scored_question:
                    total_possible_score += question.answer_score

            if total_possible_score == 0:
                user_input.scoring_percentage = 0
                user_input.scoring_total = 0
            else:
                score_total = sum(user_input.user_input_line_ids.mapped('answer_score'))
                user_input.scoring_total = score_total
                score_percentage = (score_total / total_possible_score) * 100
                user_input.scoring_percentage = round(score_percentage, 2) if score_percentage > 0 else 0
+
+ @api.depends('scoring_percentage', 'survey_id')
+ def _compute_scoring_success(self):
+ for user_input in self:
+ user_input.scoring_success = user_input.scoring_percentage >= user_input.survey_id.scoring_success_min
+
+ @api.depends(
+ 'start_datetime',
+ 'survey_id.is_time_limited',
+ 'survey_id.time_limit')
+ def _compute_survey_time_limit_reached(self):
+ """ Checks that the user_input is not exceeding the survey's time limit. """
+ for user_input in self:
+ if not user_input.is_session_answer and user_input.start_datetime:
+ start_time = user_input.start_datetime
+ time_limit = user_input.survey_id.time_limit
+ user_input.survey_time_limit_reached = user_input.survey_id.is_time_limited and \
+ fields.Datetime.now() >= start_time + relativedelta(minutes=time_limit)
+ else:
+ user_input.survey_time_limit_reached = False
+
+ @api.depends(
+ 'survey_id.session_question_id.time_limit',
+ 'survey_id.session_question_id.is_time_limited',
+ 'survey_id.session_question_start_time')
+ def _compute_question_time_limit_reached(self):
+ """ Checks that the user_input is not exceeding the question's time limit.
+ Only used in the context of survey sessions. """
+ for user_input in self:
+ if user_input.is_session_answer and user_input.survey_id.session_question_start_time:
+ start_time = user_input.survey_id.session_question_start_time
+ time_limit = user_input.survey_id.session_question_id.time_limit
+ user_input.question_time_limit_reached = user_input.survey_id.session_question_id.is_time_limited and \
+ fields.Datetime.now() >= start_time + relativedelta(seconds=time_limit)
+ else:
+ user_input.question_time_limit_reached = False
+
    @api.depends('state', 'test_entry', 'survey_id.is_attempts_limited', 'partner_id', 'email', 'invite_token')
    def _compute_attempts_number(self):
        """ Compute the rank of each attempt (1 for the first attempt).

        Only completed, non-test answers of attempt-limited surveys get a real rank;
        everything else defaults to 1. Previous attempts are matched on the same
        survey, same invite token (when set) and same partner or email, using a
        single SQL query to avoid one search per record. """
        attempts_to_compute = self.filtered(
            lambda user_input: user_input.state == 'done' and not user_input.test_entry and user_input.survey_id.is_attempts_limited
        )

        for user_input in (self - attempts_to_compute):
            user_input.attempts_number = 1

        if attempts_to_compute:
            # rank = 1 + number of earlier (lower id) done, non-test attempts by the
            # same attendee on the same survey
            self.env.cr.execute("""SELECT user_input.id, (COUNT(previous_user_input.id) + 1) AS attempts_number
                FROM survey_user_input user_input
                LEFT OUTER JOIN survey_user_input previous_user_input
                ON user_input.survey_id = previous_user_input.survey_id
                AND previous_user_input.state = 'done'
                AND previous_user_input.test_entry IS NOT TRUE
                AND previous_user_input.id < user_input.id
                AND (user_input.invite_token IS NULL OR user_input.invite_token = previous_user_input.invite_token)
                AND (user_input.partner_id = previous_user_input.partner_id OR user_input.email = previous_user_input.email)
                WHERE user_input.id IN %s
                GROUP BY user_input.id;
            """, (tuple(attempts_to_compute.ids),))

            attempts_count_results = self.env.cr.dictfetchall()

            for user_input in attempts_to_compute:
                attempts_number = 1
                for attempts_count_result in attempts_count_results:
                    if attempts_count_result['id'] == user_input.id:
                        attempts_number = attempts_count_result['attempts_number']
                        break

                user_input.attempts_number = attempts_number
+
+ @api.model_create_multi
+ def create(self, vals_list):
+ for vals in vals_list:
+ if 'predefined_question_ids' not in vals:
+ suvey_id = vals.get('survey_id', self.env.context.get('default_survey_id'))
+ survey = self.env['survey.survey'].browse(suvey_id)
+ vals['predefined_question_ids'] = [(6, 0, survey._prepare_user_input_predefined_questions().ids)]
+ return super(SurveyUserInput, self).create(vals_list)
+
+ # ------------------------------------------------------------
+ # ACTIONS / BUSINESS
+ # ------------------------------------------------------------
+
+ def action_resend(self):
+ partners = self.env['res.partner']
+ emails = []
+ for user_answer in self:
+ if user_answer.partner_id:
+ partners |= user_answer.partner_id
+ elif user_answer.email:
+ emails.append(user_answer.email)
+
+ return self.survey_id.with_context(
+ default_existing_mode='resend',
+ default_partner_ids=partners.ids,
+ default_emails=','.join(emails)
+ ).action_send_survey()
+
+ def action_print_answers(self):
+ """ Open the website page with the survey form """
+ self.ensure_one()
+ return {
+ 'type': 'ir.actions.act_url',
+ 'name': "View Answers",
+ 'target': 'self',
+ 'url': '/survey/print/%s?answer_token=%s' % (self.survey_id.access_token, self.access_token)
+ }
+
    @api.model
    def _generate_invite_token(self):
        """ Generate a random UUID4 string used as invite token for a pool of attempts. """
        return str(uuid.uuid4())
+
+ def _mark_in_progress(self):
+ """ marks the state as 'in_progress' and updates the start_datetime accordingly. """
+ self.write({
+ 'start_datetime': fields.Datetime.now(),
+ 'state': 'in_progress'
+ })
+
    def _mark_done(self):
        """ This method will:
        1. mark the state as 'done'
        2. send the certification email with attached document if
            - The survey is a certification
            - It has a certification_mail_template_id set
            - The user succeeded the test
        Will also run challenge Cron to give the certification badge if any."""
        self.write({'state': 'done'})
        Challenge = self.env['gamification.challenge'].sudo()
        badge_ids = []
        for user_input in self:
            if user_input.survey_id.certification and user_input.scoring_success:
                # test entries get no certification mail
                if user_input.survey_id.certification_mail_template_id and not user_input.test_entry:
                    user_input.survey_id.certification_mail_template_id.send_mail(user_input.id, notif_layout="mail.mail_notification_light")
                if user_input.survey_id.certification_give_badge:
                    badge_ids.append(user_input.survey_id.certification_badge_id.id)

            # Update predefined_question_id to remove inactive questions
            user_input.predefined_question_ids -= user_input._get_inactive_conditional_questions()

        # force the challenges rewarding the collected badges to update now, so
        # badges are granted right after completion
        if badge_ids:
            challenges = Challenge.search([('reward_id', 'in', badge_ids)])
            if challenges:
                Challenge._cron_update(ids=challenges.ids, commit=False)
+
+ def get_start_url(self):
+ self.ensure_one()
+ return '%s?answer_token=%s' % (self.survey_id.get_start_url(), self.access_token)
+
+ def get_print_url(self):
+ self.ensure_one()
+ return '%s?answer_token=%s' % (self.survey_id.get_print_url(), self.access_token)
+
+ # ------------------------------------------------------------
+ # CREATE / UPDATE LINES FROM SURVEY FRONTEND INPUT
+ # ------------------------------------------------------------
+
+ def save_lines(self, question, answer, comment=None):
+ """ Save answers to questions, depending on question type
+
+ If an answer already exists for question and user_input_id, it will be
+ overwritten (or deleted for 'choice' questions) (in order to maintain data consistency).
+ """
+ old_answers = self.env['survey.user_input.line'].search([
+ ('user_input_id', '=', self.id),
+ ('question_id', '=', question.id)
+ ])
+
+ if question.question_type in ['char_box', 'text_box', 'numerical_box', 'date', 'datetime']:
+ self._save_line_simple_answer(question, old_answers, answer)
+ if question.save_as_email and answer:
+ self.write({'email': answer})
+ if question.save_as_nickname and answer:
+ self.write({'nickname': answer})
+
+ elif question.question_type in ['simple_choice', 'multiple_choice']:
+ self._save_line_choice(question, old_answers, answer, comment)
+ elif question.question_type == 'matrix':
+ self._save_line_matrix(question, old_answers, answer, comment)
+ else:
+ raise AttributeError(question.question_type + ": This type of question has no saving function")
+
+ def _save_line_simple_answer(self, question, old_answers, answer):
+ vals = self._get_line_answer_values(question, answer, question.question_type)
+ if old_answers:
+ old_answers.write(vals)
+ return old_answers
+ else:
+ return self.env['survey.user_input.line'].create(vals)
+
    def _save_line_choice(self, question, old_answers, answers, comment):
        """ Replace the previous choice lines with the newly selected answers,
        plus an optional comment line.

        For simple_choice questions where the comment counts as an answer
        (comment_count_as_answer + comments_allowed) and a comment is given,
        only the comment line is stored and the selected answers are dropped. """
        if not (isinstance(answers, list)):
            answers = [answers]
        vals_list = []

        if question.question_type == 'simple_choice':
            if not question.comment_count_as_answer or not question.comments_allowed or not comment:
                vals_list = [self._get_line_answer_values(question, answer, 'suggestion') for answer in answers]
        elif question.question_type == 'multiple_choice':
            vals_list = [self._get_line_answer_values(question, answer, 'suggestion') for answer in answers]

        if comment:
            vals_list.append(self._get_line_comment_values(question, comment))

        # sudo: answer lines can be unlinked by public attendees — presumably; TODO confirm ACLs
        old_answers.sudo().unlink()
        return self.env['survey.user_input.line'].create(vals_list)
+
+ def _save_line_matrix(self, question, old_answers, answers, comment):
+ vals_list = []
+
+ if answers:
+ for row_key, row_answer in answers.items():
+ for answer in row_answer:
+ vals = self._get_line_answer_values(question, answer, 'suggestion')
+ vals['matrix_row_id'] = int(row_key)
+ vals_list.append(vals.copy())
+
+ if comment:
+ vals_list.append(self._get_line_comment_values(question, comment))
+
+ old_answers.sudo().unlink()
+ return self.env['survey.user_input.line'].create(vals_list)
+
+ def _get_line_answer_values(self, question, answer, answer_type):
+ vals = {
+ 'user_input_id': self.id,
+ 'question_id': question.id,
+ 'skipped': False,
+ 'answer_type': answer_type,
+ }
+ if not answer or (isinstance(answer, str) and not answer.strip()):
+ vals.update(answer_type=None, skipped=True)
+ return vals
+
+ if answer_type == 'suggestion':
+ vals['suggested_answer_id'] = int(answer)
+ elif answer_type == 'numerical_box':
+ vals['value_numerical_box'] = float(answer)
+ else:
+ vals['value_%s' % answer_type] = answer
+ return vals
+
+ def _get_line_comment_values(self, question, comment):
+ return {
+ 'user_input_id': self.id,
+ 'question_id': question.id,
+ 'skipped': False,
+ 'answer_type': 'char_box',
+ 'value_char_box': comment,
+ }
+
+ # ------------------------------------------------------------
+ # STATISTICS / RESULTS
+ # ------------------------------------------------------------
+
    def _prepare_statistics(self):
        """ Classify, for each user input in self, its answers to the scored
        questions into correct / partial / incorrect / skipped buckets.

        :return: one list per user input (in self's order) of
            {'text': label, 'count': n} dicts for Correct / Partially /
            Incorrect / Unanswered """
        res = dict((user_input, {
            'correct': 0,
            'incorrect': 0,
            'partial': 0,
            'skipped': 0,
        }) for user_input in self)

        scored_questions = self.mapped('predefined_question_ids').filtered(lambda question: question.is_scored_question)

        for question in scored_questions:
            # correct suggested answers are only relevant for choice questions
            if question.question_type in ['simple_choice', 'multiple_choice']:
                question_correct_suggested_answers = question.suggested_answer_ids.filtered(lambda answer: answer.is_correct)
            for user_input in self:
                user_input_lines = user_input.user_input_line_ids.filtered(lambda line: line.question_id == question)
                if question.question_type in ['simple_choice', 'multiple_choice']:
                    res[user_input][self._choice_question_answer_result(user_input_lines, question_correct_suggested_answers)] += 1
                else:
                    res[user_input][self._simple_question_answer_result(user_input_lines)] += 1

        return [[
            {'text': _("Correct"), 'count': res[user_input]['correct']},
            {'text': _("Partially"), 'count': res[user_input]['partial']},
            {'text': _("Incorrect"), 'count': res[user_input]['incorrect']},
            {'text': _("Unanswered"), 'count': res[user_input]['skipped']}
        ] for user_input in self]
+
    def _choice_question_answer_result(self, user_input_lines, question_correct_suggested_answers):
        """ Classify a choice answer by comparing the correct answers the attendee
        selected against the full set of correct suggestions:

        - 'correct': all correct suggestions selected;
        - 'partial': a strict, non-empty subset of the correct suggestions selected;
        - 'incorrect': only incorrect answers selected;
        - 'skipped': nothing (non-skipped) selected at all. """
        correct_user_input_lines = user_input_lines.filtered(lambda line: line.answer_is_correct and not line.skipped).mapped('suggested_answer_id')
        incorrect_user_input_lines = user_input_lines.filtered(lambda line: not line.answer_is_correct and not line.skipped)
        if question_correct_suggested_answers and correct_user_input_lines == question_correct_suggested_answers:
            return 'correct'
        elif correct_user_input_lines and correct_user_input_lines < question_correct_suggested_answers:
            return 'partial'
        elif not correct_user_input_lines and incorrect_user_input_lines:
            return 'incorrect'
        else:
            return 'skipped'
+
+ def _simple_question_answer_result(self, user_input_line):
+ if user_input_line.skipped:
+ return 'skipped'
+ elif user_input_line.answer_is_correct:
+ return 'correct'
+ else:
+ return 'incorrect'
+
+ # ------------------------------------------------------------
+ # Conditional Questions Management
+ # ------------------------------------------------------------
+
    def _get_conditional_values(self):
        """ For survey containing conditional questions, we need a triggered_questions_by_answer map that contains
        {key: answer, value: the question that the answer triggers, if selected},
        The idea is to be able to verify, on every answer check, if this answer is triggering the display
        of another question.
        If answer is not in the conditional map:
            - nothing happens.
        If the answer is in the conditional map:
            - If we are in ONE PAGE survey : (handled at CLIENT side)
                -> display immediately the depending question
            - If we are in PAGE PER SECTION : (handled at CLIENT side)
                - If related question is on the same page :
                    -> display immediately the depending question
                - If the related question is not on the same page :
                    -> keep the answers in memory and check at next page load if the depending question is in there and
                       display it, if so.
            - If we are in PAGE PER QUESTION : (handled at SERVER side)
                -> During submit, determine which is the next question to display getting the next question
                   that is the next in sequence and that is either not triggered by another question's answer, or that
                   is triggered by an already selected answer.
        To do all this, we need to return:
            - list of all selected answers: [answer_id1, answer_id2, ...] (for survey reloading, otherwise, this list is
              updated at client side)
            - triggered_questions_by_answer: dict -> for a given answer, list of questions triggered by this answer;
                Used mainly for dynamic show/hide behaviour at client side
            - triggering_answer_by_question: dict -> for a given question, the answer that triggers it
                Used mainly to ease template rendering
        """
        triggering_answer_by_question, triggered_questions_by_answer = {}, {}
        # Ignore conditional configuration if randomised questions selection
        if self.survey_id.questions_selection != 'random':
            triggering_answer_by_question, triggered_questions_by_answer = self.survey_id._get_conditional_maps()
        # all suggested answers already selected by the attendee (includes matrix picks, ignored by callers)
        selected_answers = self._get_selected_suggested_answers()

        return triggering_answer_by_question, triggered_questions_by_answer, selected_answers
+
+ def _get_selected_suggested_answers(self):
+ """
+ For now, only simple and multiple choices question type are handled by the conditional questions feature.
+ Mapping all the suggested answers selected by the user will also include answers from matrix question type,
+ Those ones won't be used.
+ Maybe someday, conditional questions feature will be extended to work with matrix question.
+ :return: all the suggested answer selected by the user.
+ """
+ return self.mapped('user_input_line_ids.suggested_answer_id')
+
+ def _clear_inactive_conditional_answers(self):
+ """
+ Clean eventual answers on conditional questions that should not have been displayed to user.
+ This method is used mainly for page per question survey, a similar method does the same treatment
+ at client side for the other survey layouts.
+ E.g.: if depending answer was uncheck after answering conditional question, we need to clear answers
+ of that conditional question, for two reasons:
+ - ensure correct scoring
+ - if the selected answer triggers another question later in the survey, if the answer is not cleared,
+ a question that should not be displayed to the user will be.
+
+ TODO DBE: Maybe this can be the only cleaning method, even for section_per_page or one_page where
+ conditional questions are, for now, cleared in JS directly. But this can be annoying if user typed a long
+ answer, changed his mind unchecking depending answer and changed again his mind by rechecking the depending
+ answer -> For now, the long answer will be lost. If we use this as the master cleaning method,
+ long answer will be cleared only during submit.
+ """
+ inactive_questions = self._get_inactive_conditional_questions()
+
+ # delete user.input.line on question that should not be answered.
+ answers_to_delete = self.user_input_line_ids.filtered(lambda answer: answer.question_id in inactive_questions)
+ answers_to_delete.unlink()
+
+ def _get_inactive_conditional_questions(self):
+ triggering_answer_by_question, triggered_questions_by_answer, selected_answers = self._get_conditional_values()
+
+ # get questions that should not be answered
+ inactive_questions = self.env['survey.question']
+ for answer in triggered_questions_by_answer.keys():
+ if answer not in selected_answers:
+ for question in triggered_questions_by_answer[answer]:
+ inactive_questions |= question
+ return inactive_questions
+
+ def _get_print_questions(self):
+ """ Get the questions to display : the ones that should have been answered = active questions
+ In case of session, active questions are based on most voted answers
+ :return: active survey.question browse records
+ """
+ survey = self.survey_id
+ if self.is_session_answer:
+ most_voted_answers = survey._get_session_most_voted_answers()
+ inactive_questions = most_voted_answers._get_inactive_conditional_questions()
+ else:
+ inactive_questions = self._get_inactive_conditional_questions()
+ return survey.question_ids - inactive_questions
+
+
class SurveyUserInputLine(models.Model):
    """ One answer given by a participant to one survey question.

    A line stores at most one value, in the field matching ``answer_type``
    (or ``suggested_answer_id`` for choice answers), together with the
    scoring information computed from the related question.
    """
    _name = 'survey.user_input.line'
    _description = 'Survey User Input Line'
    _rec_name = 'user_input_id'
    _order = 'question_sequence, id'

    # survey data
    user_input_id = fields.Many2one('survey.user_input', string='User Input', ondelete='cascade', required=True)
    # related to the user input's survey; stored for grouping/searching
    survey_id = fields.Many2one(related='user_input_id.survey_id', string='Survey', store=True, readonly=False)
    question_id = fields.Many2one('survey.question', string='Question', ondelete='cascade', required=True)
    page_id = fields.Many2one(related='question_id.page_id', string="Section", readonly=False)
    # stored related sequence, used by _order
    question_sequence = fields.Integer('Sequence', related='question_id.sequence', store=True)
    # answer
    # skipped and answer_type are mutually exclusive (see _check_answer_type_skipped)
    skipped = fields.Boolean('Skipped')
    # selects which value_* field (or suggested_answer_id) holds the answer
    answer_type = fields.Selection([
        ('text_box', 'Free Text'),
        ('char_box', 'Text'),
        ('numerical_box', 'Number'),
        ('date', 'Date'),
        ('datetime', 'Datetime'),
        ('suggestion', 'Suggestion')], string='Answer Type')
    value_char_box = fields.Char('Text answer')
    value_numerical_box = fields.Float('Numerical answer')
    value_date = fields.Date('Date answer')
    value_datetime = fields.Datetime('Datetime answer')
    value_text_box = fields.Text('Free Text answer')
    suggested_answer_id = fields.Many2one('survey.question.answer', string="Suggested answer")
    # for matrix questions: the row this suggested answer belongs to
    matrix_row_id = fields.Many2one('survey.question.answer', string="Row answer")
    # scoring (see _get_answer_score_values)
    answer_score = fields.Float('Score')
    answer_is_correct = fields.Boolean('Correct')
+
+ @api.constrains('skipped', 'answer_type')
+ def _check_answer_type_skipped(self):
+ for line in self:
+ if (line.skipped == bool(line.answer_type)):
+ raise ValidationError(_('A question can either be skipped or answered, not both.'))
+
+ # allow 0 for numerical box
+ if line.answer_type == 'numerical_box' and float_is_zero(line['value_numerical_box'], precision_digits=6):
+ continue
+ if line.answer_type == 'suggestion':
+ field_name = 'suggested_answer_id'
+ elif line.answer_type:
+ field_name = 'value_%s' % line.answer_type
+ else: # skipped
+ field_name = False
+
+ if field_name and not line[field_name]:
+ raise ValidationError(_('The answer must be in the right type'))
+
+ @api.model_create_multi
+ def create(self, vals_list):
+ for vals in vals_list:
+ score_vals = self._get_answer_score_values(vals)
+ if not vals.get('answer_score'):
+ vals.update(score_vals)
+ return super(SurveyUserInputLine, self).create(vals_list)
+
+ def write(self, vals):
+ res = True
+ for line in self:
+ vals_copy = {**vals}
+ getter_params = {
+ 'user_input_id': line.user_input_id.id,
+ 'answer_type': line.answer_type,
+ 'question_id': line.question_id.id,
+ **vals_copy
+ }
+ score_vals = self._get_answer_score_values(getter_params, compute_speed_score=False)
+ if not vals_copy.get('answer_score'):
+ vals_copy.update(score_vals)
+ res = super(SurveyUserInputLine, line).write(vals_copy) and res
+ return res
+
+ @api.model
+ def _get_answer_score_values(self, vals, compute_speed_score=True):
+ """ Get values for: answer_is_correct and associated answer_score.
+
+ Requires vals to contain 'answer_type', 'question_id', and 'user_input_id'.
+ Depending on 'answer_type' additional value of 'suggested_answer_id' may also be
+ required.
+
+ Calculates whether an answer_is_correct and its score based on 'answer_type' and
+ corresponding question. Handles choice (answer_type == 'suggestion') questions
+ separately from other question types. Each selected choice answer is handled as an
+ individual answer.
+
+ If score depends on the speed of the answer, it is adjusted as follows:
+ - If the user answers in less than 2 seconds, they receive 100% of the possible points.
+ - If user answers after that, they receive 50% of the possible points + the remaining
+ 50% scaled by the time limit and time taken to answer [i.e. a minimum of 50% of the
+ possible points is given to all correct answers]
+
+ Example of returned values:
+ * {'answer_is_correct': False, 'answer_score': 0} (default)
+ * {'answer_is_correct': True, 'answer_score': 2.0}
+ """
+ user_input_id = vals.get('user_input_id')
+ answer_type = vals.get('answer_type')
+ question_id = vals.get('question_id')
+ if not question_id:
+ raise ValueError(_('Computing score requires a question in arguments.'))
+ question = self.env['survey.question'].browse(int(question_id))
+
+ # default and non-scored questions
+ answer_is_correct = False
+ answer_score = 0
+
+ # record selected suggested choice answer_score (can be: pos, neg, or 0)
+ if question.question_type in ['simple_choice', 'multiple_choice']:
+ if answer_type == 'suggestion':
+ suggested_answer_id = vals.get('suggested_answer_id')
+ if suggested_answer_id:
+ question_answer = self.env['survey.question.answer'].browse(int(suggested_answer_id))
+ answer_score = question_answer.answer_score
+ answer_is_correct = question_answer.is_correct
+ # for all other scored question cases, record question answer_score (can be: pos or 0)
+ elif question.is_scored_question:
+ answer = vals.get('value_%s' % answer_type)
+ if answer_type == 'numerical_box':
+ answer = float(answer)
+ elif answer_type == 'date':
+ answer = fields.Date.from_string(answer)
+ elif answer_type == 'datetime':
+ answer = fields.Datetime.from_string(answer)
+ if answer and answer == question['answer_%s' % answer_type]:
+ answer_is_correct = True
+ answer_score = question.answer_score
+
+ if compute_speed_score and answer_score > 0:
+ user_input = self.env['survey.user_input'].browse(user_input_id)
+ session_speed_rating = user_input.exists() and user_input.is_session_answer and user_input.survey_id.session_speed_rating
+ if session_speed_rating:
+ max_score_delay = 2
+ time_limit = question.time_limit
+ now = fields.Datetime.now()
+ seconds_to_answer = (now - user_input.survey_id.session_question_start_time).total_seconds()
+ question_remaining_time = time_limit - seconds_to_answer
+ # if answered within the max_score_delay => leave score as is
+ if question_remaining_time < 0: # if no time left
+ answer_score /= 2
+ elif seconds_to_answer > max_score_delay:
+ time_limit -= max_score_delay # we remove the max_score_delay to have all possible values
+ score_proportion = (time_limit - seconds_to_answer) / time_limit
+ answer_score = (answer_score / 2) * (1 + score_proportion)
+
+ return {
+ 'answer_is_correct': answer_is_correct,
+ 'answer_score': answer_score
+ }