diff options
| author | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
|---|---|---|
| committer | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
| commit | 3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch) | |
| tree | a44932296ef4a9b71d5f010906253d8c53727726 /addons/survey/models | |
| parent | 0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff) | |
initial commit 2
Diffstat (limited to 'addons/survey/models')
| -rw-r--r-- | addons/survey/models/__init__.py | 9 | ||||
| -rw-r--r-- | addons/survey/models/badge.py | 16 | ||||
| -rw-r--r-- | addons/survey/models/challenge.py | 12 | ||||
| -rw-r--r-- | addons/survey/models/res_partner.py | 32 | ||||
| -rw-r--r-- | addons/survey/models/survey_question.py | 560 | ||||
| -rw-r--r-- | addons/survey/models/survey_survey.py | 1066 | ||||
| -rw-r--r-- | addons/survey/models/survey_user.py | 628 |
7 files changed, 2323 insertions, 0 deletions
diff --git a/addons/survey/models/__init__.py b/addons/survey/models/__init__.py new file mode 100644 index 00000000..82414c70 --- /dev/null +++ b/addons/survey/models/__init__.py @@ -0,0 +1,9 @@ +# -*- encoding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from . import survey_survey +from . import survey_question +from . import survey_user +from . import badge +from . import challenge +from . import res_partner diff --git a/addons/survey/models/badge.py b/addons/survey/models/badge.py new file mode 100644 index 00000000..7ed12731 --- /dev/null +++ b/addons/survey/models/badge.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from odoo import api, fields, models + + +class GamificationBadge(models.Model): + _inherit = 'gamification.badge' + + survey_ids = fields.One2many('survey.survey', 'certification_badge_id', 'Survey Ids') + survey_id = fields.Many2one('survey.survey', 'Survey', compute='_compute_survey_id', store=True) + + @api.depends('survey_ids.certification_badge_id') + def _compute_survey_id(self): + for badge in self: + badge.survey_id = badge.survey_ids[0] if badge.survey_ids else None diff --git a/addons/survey/models/challenge.py b/addons/survey/models/challenge.py new file mode 100644 index 00000000..23b54c16 --- /dev/null +++ b/addons/survey/models/challenge.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from odoo import models, fields + + +class Challenge(models.Model): + _inherit = 'gamification.challenge' + + challenge_category = fields.Selection(selection_add=[ + ('certification', 'Certifications') + ], ondelete={'certification': 'set default'}) diff --git a/addons/survey/models/res_partner.py b/addons/survey/models/res_partner.py new file mode 100644 index 00000000..1c7a5e1e --- /dev/null +++ b/addons/survey/models/res_partner.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from odoo import api, fields, models + + +class ResPartner(models.Model): + _inherit = 'res.partner' + + certifications_count = fields.Integer('Certifications Count', compute='_compute_certifications_count') + certifications_company_count = fields.Integer('Company Certifications Count', compute='_compute_certifications_company_count') + + @api.depends('is_company') + def _compute_certifications_count(self): + read_group_res = self.env['survey.user_input'].sudo().read_group( + [('partner_id', 'in', self.ids), ('scoring_success', '=', True)], + ['partner_id'], 'partner_id' + ) + data = dict((res['partner_id'][0], res['partner_id_count']) for res in read_group_res) + for partner in self: + partner.certifications_count = data.get(partner.id, 0) + + @api.depends('is_company', 'child_ids.certifications_count') + def _compute_certifications_company_count(self): + self.certifications_company_count = sum(child.certifications_count for child in self.child_ids) + + def action_view_certifications(self): + action = self.env["ir.actions.actions"]._for_xml_id("survey.res_partner_action_certifications") + action['view_mode'] = 'tree' + action['domain'] = ['|', ('partner_id', 'in', self.ids), ('partner_id', 'in', self.child_ids.ids)] + + return action diff --git a/addons/survey/models/survey_question.py b/addons/survey/models/survey_question.py new file mode 100644 index 00000000..e1d9d2d9 --- /dev/null +++ b/addons/survey/models/survey_question.py @@ -0,0 +1,560 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +import collections +import json +import itertools +import operator + +from odoo import api, fields, models, tools, _ +from odoo.exceptions import ValidationError + + +class SurveyQuestion(models.Model): + """ Questions that will be asked in a survey. + + Each question can have one of more suggested answers (eg. in case of + multi-answer checkboxes, radio buttons...). + + Technical note: + + survey.question is also the model used for the survey's pages (with the "is_page" field set to True). + + A page corresponds to a "section" in the interface, and the fact that it separates the survey in + actual pages in the interface depends on the "questions_layout" parameter on the survey.survey model. + Pages are also used when randomizing questions. The randomization can happen within a "page". + + Using the same model for questions and pages allows to put all the pages and questions together in a o2m field + (see survey.survey.question_and_page_ids) on the view side and easily reorganize your survey by dragging the + items around. + + It also removes on level of encoding by directly having 'Add a page' and 'Add a question' + links on the tree view of questions, enabling a faster encoding. + + However, this has the downside of making the code reading a little bit more complicated. + Efforts were made at the model level to create computed fields so that the use of these models + still seems somewhat logical. That means: + - A survey still has "page_ids" (question_and_page_ids filtered on is_page = True) + - These "page_ids" still have question_ids (questions located between this page and the next) + - These "question_ids" still have a "page_id" + + That makes the use and display of these information at view and controller levels easier to understand. + """ + _name = 'survey.question' + _description = 'Survey Question' + _rec_name = 'title' + _order = 'sequence,id' + + @api.model + def default_get(self, fields): + defaults = super(SurveyQuestion, self).default_get(fields) + if (not fields or 'question_type' in fields): + defaults['question_type'] = False if defaults.get('is_page') == True else 'text_box' + return defaults + + # question generic data + title = fields.Char('Title', required=True, translate=True) + description = fields.Html( + 'Description', translate=True, sanitize=False, # TDE TODO: sanitize but find a way to keep youtube iframe media stuff + help="Use this field to add additional explanations about your question or to illustrate it with pictures or a video") + survey_id = fields.Many2one('survey.survey', string='Survey', ondelete='cascade') + scoring_type = fields.Selection(related='survey_id.scoring_type', string='Scoring Type', readonly=True) + sequence = fields.Integer('Sequence', default=10) + # page specific + is_page = fields.Boolean('Is a page?') + question_ids = fields.One2many('survey.question', string='Questions', compute="_compute_question_ids") + questions_selection = fields.Selection( + related='survey_id.questions_selection', readonly=True, + help="If randomized is selected, add the number of random questions next to the section.") + random_questions_count = fields.Integer( + 'Random questions count', default=1, + help="Used on randomized sections to take X random questions from all the questions of that section.") + # question specific + page_id = fields.Many2one('survey.question', string='Page', compute="_compute_page_id", store=True) + question_type = fields.Selection([ + ('text_box', 'Multiple Lines Text Box'), + ('char_box', 'Single Line Text Box'), + ('numerical_box', 'Numerical Value'), + ('date', 'Date'), + ('datetime', 'Datetime'), + ('simple_choice', 'Multiple choice: only one answer'), + ('multiple_choice', 'Multiple choice: multiple answers allowed'), + ('matrix', 'Matrix')], string='Question Type', + compute='_compute_question_type', readonly=False, store=True) + is_scored_question = fields.Boolean( + 'Scored', compute='_compute_is_scored_question', + readonly=False, store=True, copy=True, + help="Include this question as part of quiz scoring. Requires an answer and answer score to be taken into account.") + # -- scoreable/answerable simple answer_types: numerical_box / date / datetime + answer_numerical_box = fields.Float('Correct numerical answer', help="Correct number answer for this question.") + answer_date = fields.Date('Correct date answer', help="Correct date answer for this question.") + answer_datetime = fields.Datetime('Correct datetime answer', help="Correct date and time answer for this question.") + answer_score = fields.Float('Score', help="Score value for a correct answer to this question.") + # -- char_box + save_as_email = fields.Boolean( + "Save as user email", compute='_compute_save_as_email', readonly=False, store=True, copy=True, + help="If checked, this option will save the user's answer as its email address.") + save_as_nickname = fields.Boolean( + "Save as user nickname", compute='_compute_save_as_nickname', readonly=False, store=True, copy=True, + help="If checked, this option will save the user's answer as its nickname.") + # -- simple choice / multiple choice / matrix + suggested_answer_ids = fields.One2many( + 'survey.question.answer', 'question_id', string='Types of answers', copy=True, + help='Labels used for proposed choices: simple choice, multiple choice and columns of matrix') + allow_value_image = fields.Boolean('Images on answers', help='Display images in addition to answer label. Valid only for simple / multiple choice questions.') + # -- matrix + matrix_subtype = fields.Selection([ + ('simple', 'One choice per row'), + ('multiple', 'Multiple choices per row')], string='Matrix Type', default='simple') + matrix_row_ids = fields.One2many( + 'survey.question.answer', 'matrix_question_id', string='Matrix Rows', copy=True, + help='Labels used for proposed choices: rows of matrix') + # -- display & timing options + column_nb = fields.Selection([ + ('12', '1'), ('6', '2'), ('4', '3'), ('3', '4'), ('2', '6')], + string='Number of columns', default='12', + help='These options refer to col-xx-[12|6|4|3|2] classes in Bootstrap for dropdown-based simple and multiple choice questions.') + is_time_limited = fields.Boolean("The question is limited in time", + help="Currently only supported for live sessions.") + time_limit = fields.Integer("Time limit (seconds)") + # -- comments (simple choice, multiple choice, matrix (without count as an answer)) + comments_allowed = fields.Boolean('Show Comments Field') + comments_message = fields.Char('Comment Message', translate=True, default=lambda self: _("If other, please specify:")) + comment_count_as_answer = fields.Boolean('Comment Field is an Answer Choice') + # question validation + validation_required = fields.Boolean('Validate entry') + validation_email = fields.Boolean('Input must be an email') + validation_length_min = fields.Integer('Minimum Text Length', default=0) + validation_length_max = fields.Integer('Maximum Text Length', default=0) + validation_min_float_value = fields.Float('Minimum value', default=0.0) + validation_max_float_value = fields.Float('Maximum value', default=0.0) + validation_min_date = fields.Date('Minimum Date') + validation_max_date = fields.Date('Maximum Date') + validation_min_datetime = fields.Datetime('Minimum Datetime') + validation_max_datetime = fields.Datetime('Maximum Datetime') + validation_error_msg = fields.Char('Validation Error message', translate=True, default=lambda self: _("The answer you entered is not valid.")) + constr_mandatory = fields.Boolean('Mandatory Answer') + constr_error_msg = fields.Char('Error message', translate=True, default=lambda self: _("This question requires an answer.")) + # answers + user_input_line_ids = fields.One2many( + 'survey.user_input.line', 'question_id', string='Answers', + domain=[('skipped', '=', False)], groups='survey.group_survey_user') + + # Conditional display + is_conditional = fields.Boolean( + string='Conditional Display', copy=False, help="""If checked, this question will be displayed only + if the specified conditional answer have been selected in a previous question""") + triggering_question_id = fields.Many2one( + 'survey.question', string="Triggering Question", copy=False, compute="_compute_triggering_question_id", + store=True, readonly=False, help="Question containing the triggering answer to display the current question.", + domain="""[('survey_id', '=', survey_id), + '&', ('question_type', 'in', ['simple_choice', 'multiple_choice']), + '|', + ('sequence', '<', sequence), + '&', ('sequence', '=', sequence), ('id', '<', id)]""") + triggering_answer_id = fields.Many2one( + 'survey.question.answer', string="Triggering Answer", copy=False, compute="_compute_triggering_answer_id", + store=True, readonly=False, help="Answer that will trigger the display of the current question.", + domain="[('question_id', '=', triggering_question_id)]") + + _sql_constraints = [ + ('positive_len_min', 'CHECK (validation_length_min >= 0)', 'A length must be positive!'), + ('positive_len_max', 'CHECK (validation_length_max >= 0)', 'A length must be positive!'), + ('validation_length', 'CHECK (validation_length_min <= validation_length_max)', 'Max length cannot be smaller than min length!'), + ('validation_float', 'CHECK (validation_min_float_value <= validation_max_float_value)', 'Max value cannot be smaller than min value!'), + ('validation_date', 'CHECK (validation_min_date <= validation_max_date)', 'Max date cannot be smaller than min date!'), + ('validation_datetime', 'CHECK (validation_min_datetime <= validation_max_datetime)', 'Max datetime cannot be smaller than min datetime!'), + ('positive_answer_score', 'CHECK (answer_score >= 0)', 'An answer score for a non-multiple choice question cannot be negative!'), + ('scored_datetime_have_answers', "CHECK (is_scored_question != True OR question_type != 'datetime' OR answer_datetime is not null)", + 'All "Is a scored question = True" and "Question Type: Datetime" questions need an answer'), + ('scored_date_have_answers', "CHECK (is_scored_question != True OR question_type != 'date' OR answer_date is not null)", + 'All "Is a scored question = True" and "Question Type: Date" questions need an answer') + ] + + @api.depends('is_page') + def _compute_question_type(self): + for question in self: + if not question.question_type or question.is_page: + question.question_type = False + + @api.depends('survey_id.question_and_page_ids.is_page', 'survey_id.question_and_page_ids.sequence') + def _compute_question_ids(self): + """Will take all questions of the survey for which the index is higher than the index of this page + and lower than the index of the next page.""" + for question in self: + if question.is_page: + next_page_index = False + for page in question.survey_id.page_ids: + if page._index() > question._index(): + next_page_index = page._index() + break + + question.question_ids = question.survey_id.question_ids.filtered( + lambda q: q._index() > question._index() and (not next_page_index or q._index() < next_page_index) + ) + else: + question.question_ids = self.env['survey.question'] + + @api.depends('survey_id.question_and_page_ids.is_page', 'survey_id.question_and_page_ids.sequence') + def _compute_page_id(self): + """Will find the page to which this question belongs to by looking inside the corresponding survey""" + for question in self: + if question.is_page: + question.page_id = None + else: + page = None + for q in question.survey_id.question_and_page_ids.sorted(): + if q == question: + break + if q.is_page: + page = q + question.page_id = page + + @api.depends('question_type', 'validation_email') + def _compute_save_as_email(self): + for question in self: + if question.question_type != 'char_box' or not question.validation_email: + question.save_as_email = False + + @api.depends('question_type') + def _compute_save_as_nickname(self): + for question in self: + if question.question_type != 'char_box': + question.save_as_nickname = False + + @api.depends('is_conditional') + def _compute_triggering_question_id(self): + """ Used as an 'onchange' : Reset the triggering question if user uncheck 'Conditional Display' + Avoid CacheMiss : set the value to False if the value is not set yet.""" + for question in self: + if not question.is_conditional or question.triggering_question_id is None: + question.triggering_question_id = False + + @api.depends('triggering_question_id') + def _compute_triggering_answer_id(self): + """ Used as an 'onchange' : Reset the triggering answer if user unset or change the triggering question + or uncheck 'Conditional Display'. + Avoid CacheMiss : set the value to False if the value is not set yet.""" + for question in self: + if not question.triggering_question_id \ + or question.triggering_question_id != question.triggering_answer_id.question_id\ + or question.triggering_answer_id is None: + question.triggering_answer_id = False + + @api.depends('question_type', 'scoring_type', 'answer_date', 'answer_datetime', 'answer_numerical_box') + def _compute_is_scored_question(self): + """ Computes whether a question "is scored" or not. Handles following cases: + - inconsistent Boolean=None edge case that breaks tests => False + - survey is not scored => False + - 'date'/'datetime'/'numerical_box' question types w/correct answer => True + (implied without user having to activate, except for numerical whose correct value is 0.0) + - 'simple_choice / multiple_choice': set to True even if logic is a bit different (coming from answers) + - question_type isn't scoreable (note: choice questions scoring logic handled separately) => False + """ + for question in self: + if question.is_scored_question is None or question.scoring_type == 'no_scoring': + question.is_scored_question = False + elif question.question_type == 'date': + question.is_scored_question = bool(question.answer_date) + elif question.question_type == 'datetime': + question.is_scored_question = bool(question.answer_datetime) + elif question.question_type == 'numerical_box' and question.answer_numerical_box: + question.is_scored_question = True + elif question.question_type in ['simple_choice', 'multiple_choice']: + question.is_scored_question = True + else: + question.is_scored_question = False + + # ------------------------------------------------------------ + # VALIDATION + # ------------------------------------------------------------ + + def validate_question(self, answer, comment=None): + """ Validate question, depending on question type and parameters + for simple choice, text, date and number, answer is simply the answer of the question. + For other multiple choices questions, answer is a list of answers (the selected choices + or a list of selected answers per question -for matrix type-): + - Simple answer : answer = 'example' or 2 or question_answer_id or 2019/10/10 + - Multiple choice : answer = [question_answer_id1, question_answer_id2, question_answer_id3] + - Matrix: answer = { 'rowId1' : [colId1, colId2,...], 'rowId2' : [colId1, colId3, ...] } + + return dict {question.id (int): error (str)} -> empty dict if no validation error. + """ + self.ensure_one() + if isinstance(answer, str): + answer = answer.strip() + # Empty answer to mandatory question + if self.constr_mandatory and not answer and self.question_type not in ['simple_choice', 'multiple_choice']: + return {self.id: self.constr_error_msg} + + # because in choices question types, comment can count as answer + if answer or self.question_type in ['simple_choice', 'multiple_choice']: + if self.question_type == 'char_box': + return self._validate_char_box(answer) + elif self.question_type == 'numerical_box': + return self._validate_numerical_box(answer) + elif self.question_type in ['date', 'datetime']: + return self._validate_date(answer) + elif self.question_type in ['simple_choice', 'multiple_choice']: + return self._validate_choice(answer, comment) + elif self.question_type == 'matrix': + return self._validate_matrix(answer) + return {} + + def _validate_char_box(self, answer): + # Email format validation + # all the strings of the form "<something>@<anything>.<extension>" will be accepted + if self.validation_email: + if not tools.email_normalize(answer): + return {self.id: _('This answer must be an email address')} + + # Answer validation (if properly defined) + # Length of the answer must be in a range + if self.validation_required: + if not (self.validation_length_min <= len(answer) <= self.validation_length_max): + return {self.id: self.validation_error_msg} + return {} + + def _validate_numerical_box(self, answer): + try: + floatanswer = float(answer) + except ValueError: + return {self.id: _('This is not a number')} + + if self.validation_required: + # Answer is not in the right range + with tools.ignore(Exception): + if not (self.validation_min_float_value <= floatanswer <= self.validation_max_float_value): + return {self.id: self.validation_error_msg} + return {} + + def _validate_date(self, answer): + isDatetime = self.question_type == 'datetime' + # Checks if user input is a date + try: + dateanswer = fields.Datetime.from_string(answer) if isDatetime else fields.Date.from_string(answer) + except ValueError: + return {self.id: _('This is not a date')} + if self.validation_required: + # Check if answer is in the right range + if isDatetime: + min_date = fields.Datetime.from_string(self.validation_min_datetime) + max_date = fields.Datetime.from_string(self.validation_max_datetime) + dateanswer = fields.Datetime.from_string(answer) + else: + min_date = fields.Date.from_string(self.validation_min_date) + max_date = fields.Date.from_string(self.validation_max_date) + dateanswer = fields.Date.from_string(answer) + + if (min_date and max_date and not (min_date <= dateanswer <= max_date))\ + or (min_date and not min_date <= dateanswer)\ + or (max_date and not dateanswer <= max_date): + return {self.id: self.validation_error_msg} + return {} + + def _validate_choice(self, answer, comment): + # Empty comment + if self.constr_mandatory \ + and not answer \ + and not (self.comments_allowed and self.comment_count_as_answer and comment): + return {self.id: self.constr_error_msg} + return {} + + def _validate_matrix(self, answers): + # Validate that each line has been answered + if self.constr_mandatory and len(self.matrix_row_ids) != len(answers): + return {self.id: self.constr_error_msg} + return {} + + def _index(self): + """We would normally just use the 'sequence' field of questions BUT, if the pages and questions are + created without ever moving records around, the sequence field can be set to 0 for all the questions. + + However, the order of the recordset is always correct so we can rely on the index method.""" + self.ensure_one() + return list(self.survey_id.question_and_page_ids).index(self) + + # ------------------------------------------------------------ + # STATISTICS / REPORTING + # ------------------------------------------------------------ + + def _prepare_statistics(self, user_input_lines): + """ Compute statistical data for questions by counting number of vote per choice on basis of filter """ + all_questions_data = [] + for question in self: + question_data = {'question': question, 'is_page': question.is_page} + + if question.is_page: + all_questions_data.append(question_data) + continue + + # fetch answer lines, separate comments from real answers + all_lines = user_input_lines.filtered(lambda line: line.question_id == question) + if question.question_type in ['simple_choice', 'multiple_choice', 'matrix']: + answer_lines = all_lines.filtered( + lambda line: line.answer_type == 'suggestion' or ( + line.answer_type == 'char_box' and question.comment_count_as_answer) + ) + comment_line_ids = all_lines.filtered(lambda line: line.answer_type == 'char_box') + else: + answer_lines = all_lines + comment_line_ids = self.env['survey.user_input.line'] + skipped_lines = answer_lines.filtered(lambda line: line.skipped) + done_lines = answer_lines - skipped_lines + question_data.update( + answer_line_ids=answer_lines, + answer_line_done_ids=done_lines, + answer_input_done_ids=done_lines.mapped('user_input_id'), + answer_input_skipped_ids=skipped_lines.mapped('user_input_id'), + comment_line_ids=comment_line_ids) + question_data.update(question._get_stats_summary_data(answer_lines)) + + # prepare table and graph data + table_data, graph_data = question._get_stats_data(answer_lines) + question_data['table_data'] = table_data + question_data['graph_data'] = json.dumps(graph_data) + + all_questions_data.append(question_data) + return all_questions_data + + def _get_stats_data(self, user_input_lines): + if self.question_type == 'simple_choice': + return self._get_stats_data_answers(user_input_lines) + elif self.question_type == 'multiple_choice': + table_data, graph_data = self._get_stats_data_answers(user_input_lines) + return table_data, [{'key': self.title, 'values': graph_data}] + elif self.question_type == 'matrix': + return self._get_stats_graph_data_matrix(user_input_lines) + return [line for line in user_input_lines], [] + + def _get_stats_data_answers(self, user_input_lines): + """ Statistics for question.answer based questions (simple choice, multiple + choice.). A corner case with a void record survey.question.answer is added + to count comments that should be considered as valid answers. This small hack + allow to have everything available in the same standard structure. """ + suggested_answers = [answer for answer in self.mapped('suggested_answer_ids')] + if self.comment_count_as_answer: + suggested_answers += [self.env['survey.question.answer']] + + count_data = dict.fromkeys(suggested_answers, 0) + for line in user_input_lines: + if line.suggested_answer_id or (line.value_char_box and self.comment_count_as_answer): + count_data[line.suggested_answer_id] += 1 + + table_data = [{ + 'value': _('Other (see comments)') if not sug_answer else sug_answer.value, + 'suggested_answer': sug_answer, + 'count': count_data[sug_answer] + } + for sug_answer in suggested_answers] + graph_data = [{ + 'text': _('Other (see comments)') if not sug_answer else sug_answer.value, + 'count': count_data[sug_answer] + } + for sug_answer in suggested_answers] + + return table_data, graph_data + + def _get_stats_graph_data_matrix(self, user_input_lines): + suggested_answers = self.mapped('suggested_answer_ids') + matrix_rows = self.mapped('matrix_row_ids') + + count_data = dict.fromkeys(itertools.product(matrix_rows, suggested_answers), 0) + for line in user_input_lines: + if line.matrix_row_id and line.suggested_answer_id: + count_data[(line.matrix_row_id, line.suggested_answer_id)] += 1 + + table_data = [{ + 'row': row, + 'columns': [{ + 'suggested_answer': sug_answer, + 'count': count_data[(row, sug_answer)] + } for sug_answer in suggested_answers], + } for row in matrix_rows] + graph_data = [{ + 'key': sug_answer.value, + 'values': [{ + 'text': row.value, + 'count': count_data[(row, sug_answer)] + } + for row in matrix_rows + ] + } for sug_answer in suggested_answers] + + return table_data, graph_data + + def _get_stats_summary_data(self, user_input_lines): + stats = {} + if self.question_type in ['simple_choice', 'multiple_choice']: + stats.update(self._get_stats_summary_data_choice(user_input_lines)) + elif self.question_type == 'numerical_box': + stats.update(self._get_stats_summary_data_numerical(user_input_lines)) + + if self.question_type in ['numerical_box', 'date', 'datetime']: + stats.update(self._get_stats_summary_data_scored(user_input_lines)) + return stats + + def _get_stats_summary_data_choice(self, user_input_lines): + right_inputs, partial_inputs = self.env['survey.user_input'], self.env['survey.user_input'] + right_answers = self.suggested_answer_ids.filtered(lambda label: label.is_correct) + if self.question_type == 'multiple_choice': + for user_input, lines in tools.groupby(user_input_lines, operator.itemgetter('user_input_id')): + user_input_answers = self.env['survey.user_input.line'].concat(*lines).filtered(lambda l: l.answer_is_correct).mapped('suggested_answer_id') + if user_input_answers and user_input_answers < right_answers: + partial_inputs += user_input + elif user_input_answers: + right_inputs += user_input + else: + right_inputs = user_input_lines.filtered(lambda line: line.answer_is_correct).mapped('user_input_id') + return { + 'right_answers': right_answers, + 'right_inputs_count': len(right_inputs), + 'partial_inputs_count': len(partial_inputs), + } + + def _get_stats_summary_data_numerical(self, user_input_lines): + all_values = user_input_lines.filtered(lambda line: not line.skipped).mapped('value_numerical_box') + lines_sum = sum(all_values) + return { + 'numerical_max': max(all_values, default=0), + 'numerical_min': min(all_values, default=0), + 'numerical_average': round(lines_sum / (len(all_values) or 1), 2), + } + + def _get_stats_summary_data_scored(self, user_input_lines): + return { + 'common_lines': collections.Counter( + user_input_lines.filtered(lambda line: not line.skipped).mapped('value_%s' % self.question_type) + ).most_common(5) if self.question_type != 'datetime' else [], + 'right_inputs_count': len(user_input_lines.filtered(lambda line: line.answer_is_correct).mapped('user_input_id')) + } + + +class SurveyQuestionAnswer(models.Model): + """ A preconfigured answer for a question. This model stores values used + for + + * simple choice, multiple choice: proposed values for the selection / + radio; + * matrix: row and column values; + + """ + _name = 'survey.question.answer' + _rec_name = 'value' + _order = 'sequence, id' + _description = 'Survey Label' + + question_id = fields.Many2one('survey.question', string='Question', ondelete='cascade') + matrix_question_id = fields.Many2one('survey.question', string='Question (as matrix row)', ondelete='cascade') + sequence = fields.Integer('Label Sequence order', default=10) + value = fields.Char('Suggested value', translate=True, required=True) + value_image = fields.Image('Image', max_width=256, max_height=256) + is_correct = fields.Boolean('Is a correct answer') + answer_score = fields.Float('Score for this choice', help="A positive score indicates a correct choice; a negative or null score indicates a wrong answer") + + @api.constrains('question_id', 'matrix_question_id') + def _check_question_not_empty(self): + """Ensure that field question_id XOR field matrix_question_id is not null""" + for label in self: + if not bool(label.question_id) != bool(label.matrix_question_id): + raise ValidationError(_("A label must be attached to only one question.")) diff --git a/addons/survey/models/survey_survey.py b/addons/survey/models/survey_survey.py new file mode 100644 index 00000000..4188a27f --- /dev/null +++ b/addons/survey/models/survey_survey.py @@ -0,0 +1,1066 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +import json +import random +import uuid +import werkzeug + +from odoo import api, exceptions, fields, models, _ +from odoo.exceptions import AccessError, UserError +from odoo.osv import expression +from odoo.tools import is_html_empty + + +class Survey(models.Model): + """ Settings for a multi-page/multi-question survey. Each survey can have one or more attached pages + and each page can display one or more questions. """ + _name = 'survey.survey' + _description = 'Survey' + _rec_name = 'title' + _inherit = ['mail.thread', 'mail.activity.mixin'] + + def _get_default_access_token(self): + return str(uuid.uuid4()) + + def _get_default_session_code(self): + """ Attempt to generate a session code for our survey. + The method will first try to generate 20 codes with 4 digits each and check if any are colliding. + If we have at least one non-colliding code, we use it. + If all 20 generated codes are colliding, we try with 20 codes of 5 digits, + then 6, ... up to 10 digits. """ + + for digits_count in range(4, 10): + range_lower_bound = 1 * (10 ** (digits_count - 1)) + range_upper_bound = (range_lower_bound * 10) - 1 + code_candidates = set([str(random.randint(range_lower_bound, range_upper_bound)) for i in range(20)]) + colliding_codes = self.sudo().search_read( + [('session_code', 'in', list(code_candidates))], + ['session_code'] + ) + code_candidates -= set([colliding_code['session_code'] for colliding_code in colliding_codes]) + if code_candidates: + return list(code_candidates)[0] + + return False # could not generate a code + + # description + title = fields.Char('Survey Title', required=True, translate=True) + color = fields.Integer('Color Index', default=0) + description = fields.Html( + "Description", translate=True, sanitize=False, # TDE FIXME: find a way to authorize videos + help="The description will be displayed on the home page of the survey. You can use this to give the purpose and guidelines to your candidates before they start it.") + description_done = fields.Html( + "End Message", translate=True, + help="This message will be displayed when survey is completed") + background_image = fields.Binary("Background Image") + active = fields.Boolean("Active", default=True) + state = fields.Selection(selection=[ + ('draft', 'Draft'), ('open', 'In Progress'), ('closed', 'Closed') + ], string="Survey Stage", default='draft', required=True, + group_expand='_read_group_states') + # questions + question_and_page_ids = fields.One2many('survey.question', 'survey_id', string='Sections and Questions', copy=True) + page_ids = fields.One2many('survey.question', string='Pages', compute="_compute_page_and_question_ids") + question_ids = fields.One2many('survey.question', string='Questions', compute="_compute_page_and_question_ids") + questions_layout = fields.Selection([ + ('one_page', 'One page with all the questions'), + ('page_per_section', 'One page per section'), + ('page_per_question', 'One page per question')], + string="Layout", required=True, default='one_page') + questions_selection = fields.Selection([ + ('all', 'All questions'), + ('random', 'Randomized per section')], + string="Selection", required=True, default='all', + help="If randomized is selected, you can configure the number of random questions by section. This mode is ignored in live session.") + progression_mode = fields.Selection([ + ('percent', 'Percentage'), + ('number', 'Number')], string='Progression Mode', default='percent', + help="If Number is selected, it will display the number of questions answered on the total number of question to answer.") + # attendees + user_input_ids = fields.One2many('survey.user_input', 'survey_id', string='User responses', readonly=True, groups='survey.group_survey_user') + # security / access + access_mode = fields.Selection([ + ('public', 'Anyone with the link'), + ('token', 'Invited people only')], string='Access Mode', + default='public', required=True) + access_token = fields.Char('Access Token', default=lambda self: self._get_default_access_token(), copy=False) + users_login_required = fields.Boolean('Login Required', help="If checked, users have to login before answering even with a valid token.") + users_can_go_back = fields.Boolean('Users can go back', help="If checked, users can go back to previous pages.") + users_can_signup = fields.Boolean('Users can signup', compute='_compute_users_can_signup') + # statistics + answer_count = fields.Integer("Registered", compute="_compute_survey_statistic") + answer_done_count = fields.Integer("Attempts", compute="_compute_survey_statistic") + answer_score_avg = fields.Float("Avg Score %", compute="_compute_survey_statistic") + success_count = fields.Integer("Success", compute="_compute_survey_statistic") + success_ratio = fields.Integer("Success Ratio", compute="_compute_survey_statistic") + # scoring + scoring_type = fields.Selection([ + ('no_scoring', 'No scoring'), + ('scoring_with_answers', 'Scoring with answers at the end'), + ('scoring_without_answers', 'Scoring without answers at the end')], + string="Scoring", required=True, default='no_scoring') + scoring_success_min = fields.Float('Success %', default=80.0) + # attendees context: attempts and time limitation + is_attempts_limited = fields.Boolean('Limited number of attempts', help="Check this option if you want to limit the number of attempts per user", + compute="_compute_is_attempts_limited", store=True, readonly=False) + attempts_limit = fields.Integer('Number of attempts', default=1) + is_time_limited = fields.Boolean('The survey is limited in time') + time_limit = fields.Float("Time limit (minutes)", default=10) + # certification + certification = fields.Boolean('Is a Certification', compute='_compute_certification', + readonly=False, store=True) + certification_mail_template_id = fields.Many2one( + 'mail.template', 'Email Template', + domain="[('model', '=', 'survey.user_input')]", + help="Automated email sent to the user when he succeeds the certification, containing his certification document.") + certification_report_layout = fields.Selection([ + ('modern_purple', 'Modern Purple'), + ('modern_blue', 'Modern Blue'), + ('modern_gold', 'Modern Gold'), + ('classic_purple', 'Classic Purple'), + ('classic_blue', 'Classic Blue'), + ('classic_gold', 'Classic Gold')], + string='Certification template', default='modern_purple') + # Certification badge + # certification_badge_id_dummy is used to have two different behaviours in the form view : + # - If the certification badge is not set, show certification_badge_id and only display create option in the m2o + # - If the certification badge is set, show certification_badge_id_dummy in 'no create' mode. + # So it can be edited but not removed or replaced. + certification_give_badge = fields.Boolean('Give Badge', compute='_compute_certification_give_badge', + readonly=False, store=True) + certification_badge_id = fields.Many2one('gamification.badge', 'Certification Badge') + certification_badge_id_dummy = fields.Many2one(related='certification_badge_id', string='Certification Badge ') + # live sessions + session_state = fields.Selection([ + ('ready', 'Ready'), + ('in_progress', 'In Progress'), + ], string="Session State", copy=False) + session_code = fields.Char('Session Code', default=lambda self: self._get_default_session_code(), copy=False, + help="This code will be used by your attendees to reach your session. Feel free to customize it however you like!") + session_link = fields.Char('Session Link', compute='_compute_session_link') + # live sessions - current question fields + session_question_id = fields.Many2one('survey.question', string="Current Question", copy=False, + help="The current question of the survey session.") + session_start_time = fields.Datetime("Current Session Start Time", copy=False) + session_question_start_time = fields.Datetime("Current Question Start Time", copy=False, + help="The time at which the current question has started, used to handle the timer for attendees.") + session_answer_count = fields.Integer("Answers Count", compute='_compute_session_answer_count') + session_question_answer_count = fields.Integer("Question Answers Count", compute='_compute_session_question_answer_count') + # live sessions - settings + session_show_leaderboard = fields.Boolean("Show Session Leaderboard", compute='_compute_session_show_leaderboard', + help="Whether or not we want to show the attendees leaderboard for this survey.") + session_speed_rating = fields.Boolean("Reward quick answers", help="Attendees get more points if they answer quickly") + # conditional questions management + has_conditional_questions = fields.Boolean("Contains conditional questions", compute="_compute_has_conditional_questions") + + _sql_constraints = [ + ('access_token_unique', 'unique(access_token)', 'Access token should be unique'), + ('session_code_unique', 'unique(session_code)', 'Session code should be unique'), + ('certification_check', "CHECK( scoring_type!='no_scoring' OR certification=False )", + 'You can only create certifications for surveys that have a scoring mechanism.'), + ('scoring_success_min_check', "CHECK( scoring_success_min IS NULL OR (scoring_success_min>=0 AND scoring_success_min<=100) )", + 'The percentage of success has to be defined between 0 and 100.'), + ('time_limit_check', "CHECK( (is_time_limited=False) OR (time_limit is not null AND time_limit > 0) )", + 'The time limit needs to be a positive number if the survey is time limited.'), + ('attempts_limit_check', "CHECK( (is_attempts_limited=False) OR (attempts_limit is not null AND attempts_limit > 0) )", + 'The attempts limit needs to be a positive number if the survey has a limited number of attempts.'), + ('badge_uniq', 'unique (certification_badge_id)', "The badge for each survey should be unique!"), + ('give_badge_check', "CHECK(certification_give_badge=False OR (certification_give_badge=True AND certification_badge_id is not null))", + 'Certification badge must be configured if Give Badge is set.'), + ] + + def _compute_users_can_signup(self): + signup_allowed = self.env['res.users'].sudo()._get_signup_invitation_scope() == 'b2c' + for survey in self: + survey.users_can_signup = signup_allowed + + @api.depends('user_input_ids.state', 'user_input_ids.test_entry', 'user_input_ids.scoring_percentage', 'user_input_ids.scoring_success') + def _compute_survey_statistic(self): + default_vals = { + 'answer_count': 0, 'answer_done_count': 0, 'success_count': 0, + 'answer_score_avg': 0.0, 'success_ratio': 0.0 + } + stat = dict((cid, dict(default_vals, answer_score_avg_total=0.0)) for cid in self.ids) + UserInput = self.env['survey.user_input'] + base_domain = ['&', ('survey_id', 'in', self.ids), ('test_entry', '!=', True)] + + read_group_res = UserInput.read_group(base_domain, ['survey_id', 'state'], ['survey_id', 'state', 'scoring_percentage', 'scoring_success'], lazy=False) + for item in read_group_res: + stat[item['survey_id'][0]]['answer_count'] += item['__count'] + stat[item['survey_id'][0]]['answer_score_avg_total'] += item['scoring_percentage'] + if item['state'] == 'done': + stat[item['survey_id'][0]]['answer_done_count'] += item['__count'] + if item['scoring_success']: + stat[item['survey_id'][0]]['success_count'] += item['__count'] + + for survey_id, values in stat.items(): + avg_total = stat[survey_id].pop('answer_score_avg_total') + stat[survey_id]['answer_score_avg'] = avg_total / (stat[survey_id]['answer_done_count'] or 1) + stat[survey_id]['success_ratio'] = (stat[survey_id]['success_count'] / (stat[survey_id]['answer_done_count'] or 1.0))*100 + + for survey in self: + survey.update(stat.get(survey._origin.id, default_vals)) + + @api.depends('question_and_page_ids') + def _compute_page_and_question_ids(self): + for survey in self: + survey.page_ids = survey.question_and_page_ids.filtered(lambda question: question.is_page) + survey.question_ids = survey.question_and_page_ids - survey.page_ids + + @api.depends('question_and_page_ids.is_conditional', 'users_login_required', 'access_mode') + def _compute_is_attempts_limited(self): + for survey in self: + if not survey.is_attempts_limited or \ + (survey.access_mode == 'public' and not survey.users_login_required) or \ + any(question.is_conditional for question in survey.question_and_page_ids): + survey.is_attempts_limited = False + + @api.depends('session_start_time', 'user_input_ids') + def _compute_session_answer_count(self): + """ We have to loop since our result is dependent of the survey.session_start_time. + This field is currently used to display the count about a single survey, in the + context of sessions, so it should not matter too much. """ + + for survey in self: + answer_count = 0 + input_count = self.env['survey.user_input'].read_group( + [('survey_id', '=', survey.id), + ('is_session_answer', '=', True), + ('state', '!=', 'done'), + ('create_date', '>=', survey.session_start_time)], + ['create_uid:count'], + ['survey_id'], + ) + if input_count: + answer_count = input_count[0].get('create_uid', 0) + + survey.session_answer_count = answer_count + + @api.depends('session_question_id', 'session_start_time', 'user_input_ids.user_input_line_ids') + def _compute_session_question_answer_count(self): + """ We have to loop since our result is dependent of the survey.session_question_id and + the survey.session_start_time. + This field is currently used to display the count about a single survey, in the + context of sessions, so it should not matter too much. """ + for survey in self: + answer_count = 0 + input_line_count = self.env['survey.user_input.line'].read_group( + [('question_id', '=', survey.session_question_id.id), + ('survey_id', '=', survey.id), + ('create_date', '>=', survey.session_start_time)], + ['user_input_id:count_distinct'], + ['question_id'], + ) + if input_line_count: + answer_count = input_line_count[0].get('user_input_id', 0) + + survey.session_question_answer_count = answer_count + + @api.depends('session_code') + def _compute_session_link(self): + for survey in self: + if survey.session_code: + survey.session_link = werkzeug.urls.url_join( + survey.get_base_url(), + '/s/%s' % survey.session_code) + else: + survey.session_link = werkzeug.urls.url_join( + survey.get_base_url(), + survey.get_start_url()) + + @api.depends('scoring_type', 'question_and_page_ids.save_as_nickname') + def _compute_session_show_leaderboard(self): + for survey in self: + survey.session_show_leaderboard = survey.scoring_type != 'no_scoring' and \ + any(question.save_as_nickname for question in survey.question_and_page_ids) + + @api.depends('question_and_page_ids.is_conditional') + def _compute_has_conditional_questions(self): + for survey in self: + survey.has_conditional_questions = any(question.is_conditional for question in survey.question_and_page_ids) + + @api.depends('scoring_type') + def _compute_certification(self): + for survey in self: + if not survey.certification or survey.scoring_type == 'no_scoring': + survey.certification = False + + @api.depends('users_login_required', 'certification') + def _compute_certification_give_badge(self): + for survey in self: + if not survey.certification_give_badge or \ + not survey.users_login_required or \ + not survey.certification: + survey.certification_give_badge = False + + def _read_group_states(self, values, domain, order): + selection = self.env['survey.survey'].fields_get(allfields=['state'])['state']['selection'] + return [s[0] for s in selection] + + # ------------------------------------------------------------ + # CRUD + # ------------------------------------------------------------ + + @api.model + def create(self, vals): + survey = super(Survey, self).create(vals) + if vals.get('certification_give_badge'): + survey.sudo()._create_certification_badge_trigger() + return survey + + def write(self, vals): + result = super(Survey, self).write(vals) + if 'certification_give_badge' in vals: + return self.sudo()._handle_certification_badges(vals) + return result + + def copy_data(self, default=None): + title = _("%s (copy)") % (self.title) + default = dict(default or {}, title=title) + return super(Survey, self).copy_data(default) + + def toggle_active(self): + super(Survey, self).toggle_active() + activated = self.filtered(lambda survey: survey.active) + activated.mapped('certification_badge_id').action_unarchive() + (self - activated).mapped('certification_badge_id').action_archive() + + # ------------------------------------------------------------ + # ANSWER MANAGEMENT + # ------------------------------------------------------------ + + def _create_answer(self, user=False, partner=False, email=False, test_entry=False, check_attempts=True, **additional_vals): + """ Main entry point to get a token back or create a new one. This method + does check for current user access in order to explicitely validate + security. + + :param user: target user asking for a token; it might be void or a + public user in which case an email is welcomed; + :param email: email of the person asking the token is no user exists; + """ + self.check_access_rights('read') + self.check_access_rule('read') + + user_inputs = self.env['survey.user_input'] + for survey in self: + if partner and not user and partner.user_ids: + user = partner.user_ids[0] + + invite_token = additional_vals.pop('invite_token', False) + survey._check_answer_creation(user, partner, email, test_entry=test_entry, check_attempts=check_attempts, invite_token=invite_token) + answer_vals = { + 'survey_id': survey.id, + 'test_entry': test_entry, + 'is_session_answer': survey.session_state in ['ready', 'in_progress'] + } + if survey.session_state == 'in_progress': + # if the session is already in progress, the answer skips the 'new' state + answer_vals.update({ + 'state': 'in_progress', + 'start_datetime': fields.Datetime.now(), + }) + if user and not user._is_public(): + answer_vals['partner_id'] = user.partner_id.id + answer_vals['email'] = user.email + answer_vals['nickname'] = user.name + elif partner: + answer_vals['partner_id'] = partner.id + answer_vals['email'] = partner.email + answer_vals['nickname'] = partner.name + else: + answer_vals['email'] = email + answer_vals['nickname'] = email + + if invite_token: + answer_vals['invite_token'] = invite_token + elif survey.is_attempts_limited and survey.access_mode != 'public': + # attempts limited: create a new invite_token + # exception made for 'public' access_mode since the attempts pool is global because answers are + # created every time the user lands on '/start' + answer_vals['invite_token'] = self.env['survey.user_input']._generate_invite_token() + + answer_vals.update(additional_vals) + user_inputs += user_inputs.create(answer_vals) + + for question in self.mapped('question_ids').filtered( + lambda q: q.question_type == 'char_box' and (q.save_as_email or q.save_as_nickname)): + for user_input in user_inputs: + if question.save_as_email and user_input.email: + user_input.save_lines(question, user_input.email) + if question.save_as_nickname and user_input.nickname: + user_input.save_lines(question, user_input.nickname) + + return user_inputs + + def _check_answer_creation(self, user, partner, email, test_entry=False, check_attempts=True, invite_token=False): + """ Ensure conditions to create new tokens are met. """ + self.ensure_one() + if test_entry: + # the current user must have the access rights to survey + if not user.has_group('survey.group_survey_user'): + raise exceptions.UserError(_('Creating test token is not allowed for you.')) + else: + if not self.active: + raise exceptions.UserError(_('Creating token for archived surveys is not allowed.')) + elif self.state == 'closed': + raise exceptions.UserError(_('Creating token for closed surveys is not allowed.')) + if self.access_mode == 'authentication': + # signup possible -> should have at least a partner to create an account + if self.users_can_signup and not user and not partner: + raise exceptions.UserError(_('Creating token for external people is not allowed for surveys requesting authentication.')) + # no signup possible -> should be a not public user (employee or portal users) + if not self.users_can_signup and (not user or user._is_public()): + raise exceptions.UserError(_('Creating token for external people is not allowed for surveys requesting authentication.')) + if self.access_mode == 'internal' and (not user or not user.has_group('base.group_user')): + raise exceptions.UserError(_('Creating token for anybody else than employees is not allowed for internal surveys.')) + if check_attempts and not self._has_attempts_left(partner or (user and user.partner_id), email, invite_token): + raise exceptions.UserError(_('No attempts left.')) + + def _prepare_user_input_predefined_questions(self): + """ Will generate the questions for a randomized survey. + It uses the random_questions_count of every sections of the survey to + pick a random number of questions and returns the merged recordset """ + self.ensure_one() + + questions = self.env['survey.question'] + + # First append questions without page + for question in self.question_ids: + if not question.page_id: + questions |= question + + # Then, questions in sections + + for page in self.page_ids: + if self.questions_selection == 'all': + questions |= page.question_ids + else: + if page.random_questions_count > 0 and len(page.question_ids) > page.random_questions_count: + questions = questions.concat(*random.sample(page.question_ids, page.random_questions_count)) + else: + questions |= page.question_ids + + return questions + + def _can_go_back(self, answer, page_or_question): + """ Check if the user can go back to the previous question/page for the currently + viewed question/page. + Back button needs to be configured on survey and, depending on the layout: + - In 'page_per_section', we can go back if we're not on the first page + - In 'page_per_question', we can go back if: + - It is not a session answer (doesn't make sense to go back in session context) + - We are not on the first question + - The survey does not have pages OR this is not the first page of the survey + (pages are displayed in 'page_per_question' layout when they have a description, see PR#44271) + """ + self.ensure_one() + + if self.users_can_go_back and answer.state == 'in_progress': + if self.questions_layout == 'page_per_section' and page_or_question != self.page_ids[0]: + return True + elif self.questions_layout == 'page_per_question' and \ + not answer.is_session_answer and \ + page_or_question != answer.predefined_question_ids[0] \ + and (not self.page_ids or page_or_question != self.page_ids[0]): + return True + + return False + + def _has_attempts_left(self, partner, email, invite_token): + self.ensure_one() + + if (self.access_mode != 'public' or self.users_login_required) and self.is_attempts_limited: + return self._get_number_of_attempts_lefts(partner, email, invite_token) > 0 + + return True + + def _get_number_of_attempts_lefts(self, partner, email, invite_token): + """ Returns the number of attempts left. """ + self.ensure_one() + + domain = [ + ('survey_id', '=', self.id), + ('test_entry', '=', False), + ('state', '=', 'done') + ] + + if partner: + domain = expression.AND([domain, [('partner_id', '=', partner.id)]]) + else: + domain = expression.AND([domain, [('email', '=', email)]]) + + if invite_token: + domain = expression.AND([domain, [('invite_token', '=', invite_token)]]) + + return self.attempts_limit - self.env['survey.user_input'].search_count(domain) + + # ------------------------------------------------------------ + # QUESTIONS MANAGEMENT + # ------------------------------------------------------------ + + @api.model + def _get_pages_or_questions(self, user_input): + """ Returns the pages or questions (depending on the layout) that will be shown + to the user taking the survey. + In 'page_per_question' layout, we also want to show pages that have a description. """ + + result = self.env['survey.question'] + if self.questions_layout == 'page_per_section': + result = self.page_ids + elif self.questions_layout == 'page_per_question': + if self.questions_selection == 'random' and not self.session_state: + result = user_input.predefined_question_ids + else: + result = self.question_and_page_ids.filtered( + lambda question: not question.is_page or not is_html_empty(question.description)) + + return result + + def _get_next_page_or_question(self, user_input, page_or_question_id, go_back=False): + """ Generalized logic to retrieve the next question or page to show on the survey. + It's based on the page_or_question_id parameter, that is usually the currently displayed question/page. + + There is a special case when the survey is configured with conditional questions: + - for "page_per_question" layout, the next question to display depends on the selected answers and + the questions 'hierarchy'. + - for "page_per_section" layout, before returning the result, we check that it contains at least a question + (all section questions could be disabled based on previously selected answers) + + The whole logic is inverted if "go_back" is passed as True. + + As pages with description are considered as potential question to display, we show the page + if it contains at least one active question or a description. + + :param user_input: user's answers + :param page_or_question_id: current page or question id + :param go_back: reverse the logic and get the PREVIOUS question/page + :return: next or previous question/page + """ + + survey = user_input.survey_id + pages_or_questions = survey._get_pages_or_questions(user_input) + Question = self.env['survey.question'] + + # Get Next + if not go_back: + if not pages_or_questions: + return Question + # First page + if page_or_question_id == 0: + return pages_or_questions[0] + + current_page_index = pages_or_questions.ids.index(page_or_question_id) + + # Get previous and we are on first page OR Get Next and we are on last page + if (go_back and current_page_index == 0) or (not go_back and current_page_index == len(pages_or_questions) - 1): + return Question + + # Conditional Questions Management + triggering_answer_by_question, triggered_questions_by_answer, selected_answers = user_input._get_conditional_values() + inactive_questions = user_input._get_inactive_conditional_questions() + if survey.questions_layout == 'page_per_question': + question_candidates = pages_or_questions[0:current_page_index] if go_back \ + else pages_or_questions[current_page_index + 1:] + for question in question_candidates.sorted(reverse=go_back): + # pages with description are potential questions to display (are part of question_candidates) + if question.is_page: + contains_active_question = any(sub_question not in inactive_questions for sub_question in question.question_ids) + is_description_section = not question.question_ids and not is_html_empty(question.description) + if contains_active_question or is_description_section: + return question + else: + triggering_answer = triggering_answer_by_question.get(question) + if not triggering_answer or triggering_answer in selected_answers: + # question is visible because not conditioned or conditioned by a selected answer + return question + elif survey.questions_layout == 'page_per_section': + section_candidates = pages_or_questions[0:current_page_index] if go_back \ + else pages_or_questions[current_page_index + 1:] + for section in section_candidates.sorted(reverse=go_back): + contains_active_question = any(question not in inactive_questions for question in section.question_ids) + is_description_section = not section.question_ids and not is_html_empty(section.description) + if contains_active_question or is_description_section: + return section + return Question + + def _is_last_page_or_question(self, user_input, page_or_question): + """ This method checks if the given question or page is the last one. + This includes conditional questions configuration. If the given question is normally not the last one but + every following questions are inactive due to conditional questions configurations (and user choices), + the given question will be the last one, except if the given question is conditioning at least + one of the following questions. + For section, we check in each following section if there is an active question. + If yes, the given page is not the last one. + """ + pages_or_questions = self._get_pages_or_questions(user_input) + current_page_index = pages_or_questions.ids.index(page_or_question.id) + next_page_or_question_candidates = pages_or_questions[current_page_index + 1:] + if next_page_or_question_candidates: + inactive_questions = user_input._get_inactive_conditional_questions() + triggering_answer_by_question, triggered_questions_by_answer, selected_answers = user_input._get_conditional_values() + if self.questions_layout == 'page_per_question': + next_active_question = any(next_question not in inactive_questions for next_question in next_page_or_question_candidates) + is_triggering_question = any(triggering_answer in triggered_questions_by_answer.keys() for triggering_answer in page_or_question.suggested_answer_ids) + return not(next_active_question or is_triggering_question) + elif self.questions_layout == 'page_per_section': + is_triggering_section = False + for question in page_or_question.question_ids: + if any(triggering_answer in triggered_questions_by_answer.keys() for triggering_answer in + question.suggested_answer_ids): + is_triggering_section = True + break + next_active_question = False + for section in next_page_or_question_candidates: + next_active_question = any(next_question not in inactive_questions for next_question in section.question_ids) + if next_active_question: + break + return not(next_active_question or is_triggering_section) + + return True + + def _get_survey_questions(self, answer=None, page_id=None, question_id=None): + """ Returns a tuple containing: the survey question and the passed question_id / page_id + based on the question_layout and the fact that it's a session or not. + + Breakdown of use cases: + - We are currently running a session + We return the current session question and it's id + - The layout is page_per_section + We return the questions for that page and the passed page_id + - The layout is page_per_question + We return the question for the passed question_id and the question_id + - The layout is one_page + We return all the questions of the survey and None + + In addition, we cross the returned questions with the answer.predefined_question_ids, + that allows to handle the randomization of questions. """ + + questions, page_or_question_id = None, None + + if answer and answer.is_session_answer: + return self.session_question_id, self.session_question_id.id + if self.questions_layout == 'page_per_section': + if not page_id: + raise ValueError("Page id is needed for question layout 'page_per_section'") + page_id = int(page_id) + questions = self.env['survey.question'].sudo().search([('survey_id', '=', self.id), ('page_id', '=', page_id)]) + page_or_question_id = page_id + elif self.questions_layout == 'page_per_question': + if not question_id: + raise ValueError("Question id is needed for question layout 'page_per_question'") + question_id = int(question_id) + questions = self.env['survey.question'].sudo().browse(question_id) + page_or_question_id = question_id + else: + questions = self.question_ids + + # we need the intersection of the questions of this page AND the questions prepared for that user_input + # (because randomized surveys do not use all the questions of every page) + if answer: + questions = questions & answer.predefined_question_ids + return questions, page_or_question_id + + # ------------------------------------------------------------ + # CONDITIONAL QUESTIONS MANAGEMENT + # ------------------------------------------------------------ + + def _get_conditional_maps(self): + triggering_answer_by_question = {} + triggered_questions_by_answer = {} + for question in self.question_ids: + triggering_answer_by_question[question] = question.is_conditional and question.triggering_answer_id + + if question.is_conditional: + if question.triggering_answer_id in triggered_questions_by_answer: + triggered_questions_by_answer[question.triggering_answer_id] |= question + else: + triggered_questions_by_answer[question.triggering_answer_id] = question + return triggering_answer_by_question, triggered_questions_by_answer + + # ------------------------------------------------------------ + # SESSIONS MANAGEMENT + # ------------------------------------------------------------ + + def _session_open(self): + """ The session start is sudo'ed to allow survey user to manage sessions of surveys + they do not own. + + We flush after writing to make sure it's updated before bus takes over. """ + + if self.env.user.has_group('survey.group_survey_user'): + self.sudo().write({'session_state': 'in_progress'}) + self.sudo().flush(['session_state']) + + def _get_session_next_question(self): + self.ensure_one() + + if not self.question_ids or not self.env.user.has_group('survey.group_survey_user'): + return + + most_voted_answers = self._get_session_most_voted_answers() + return self._get_next_page_or_question( + most_voted_answers, + self.session_question_id.id if self.session_question_id else 0) + + def _get_session_most_voted_answers(self): + """ In sessions of survey that has conditional questions, as the survey is passed at the same time by + many users, we need to extract the most chosen answers, to determine the next questions to display. """ + + # get user_inputs from current session + current_user_inputs = self.user_input_ids.filtered(lambda input: input.create_date > self.session_start_time) + current_user_input_lines = current_user_inputs.mapped('user_input_line_ids').filtered(lambda answer: answer.suggested_answer_id) + + # count the number of vote per answer + votes_by_answer = dict.fromkeys(current_user_input_lines.mapped('suggested_answer_id'), 0) + for answer in current_user_input_lines: + votes_by_answer[answer.suggested_answer_id] += 1 + + # extract most voted answer for each question + most_voted_answer_by_questions = dict.fromkeys(current_user_input_lines.mapped('question_id')) + for question in most_voted_answer_by_questions.keys(): + for answer in votes_by_answer.keys(): + if answer.question_id != question: + continue + most_voted_answer = most_voted_answer_by_questions[question] + if not most_voted_answer or votes_by_answer[most_voted_answer] < votes_by_answer[answer]: + most_voted_answer_by_questions[question] = answer + + # return a fake 'audience' user_input + fake_user_input = self.env['survey.user_input'].new({ + 'survey_id': self.id, + 'predefined_question_ids': [(6, 0, self._prepare_user_input_predefined_questions().ids)] + }) + + fake_user_input_lines = self.env['survey.user_input.line'] + for question, answer in most_voted_answer_by_questions.items(): + fake_user_input_lines |= self.env['survey.user_input.line'].new({ + 'question_id': question.id, + 'suggested_answer_id': answer.id, + 'survey_id': self.id, + 'user_input_id': fake_user_input.id + }) + + return fake_user_input + + def _prepare_leaderboard_values(self): + """" The leaderboard is descending and takes the total of the attendee points minus the + current question score. + We need both the total and the current question points to be able to show the attendees + leaderboard and shift their position based on the score they have on the current question. + This prepares a structure containing all the necessary data for the animations done on + the frontend side. + The leaderboard is sorted based on attendees score *before* the current question. + The frontend will shift positions around accordingly. """ + + self.ensure_one() + + leaderboard = self.env['survey.user_input'].search_read([ + ('survey_id', '=', self.id), + ('create_date', '>=', self.session_start_time) + ], [ + 'id', + 'nickname', + 'scoring_total', + ], limit=15, order="scoring_total desc") + + if leaderboard and self.session_state == 'in_progress' and \ + any(answer.answer_score for answer in self.session_question_id.suggested_answer_ids): + question_scores = {} + input_lines = self.env['survey.user_input.line'].search_read( + [('user_input_id', 'in', [score['id'] for score in leaderboard]), + ('question_id', '=', self.session_question_id.id)], + ['user_input_id', 'answer_score']) + for input_line in input_lines: + question_scores[input_line['user_input_id'][0]] = \ + question_scores.get(input_line['user_input_id'][0], 0) + input_line['answer_score'] + + score_position = 0 + for leaderboard_item in leaderboard: + question_score = question_scores.get(leaderboard_item['id'], 0) + leaderboard_item.update({ + 'updated_score': leaderboard_item['scoring_total'], + 'scoring_total': leaderboard_item['scoring_total'] - question_score, + 'leaderboard_position': score_position, + 'max_question_score': sum( + score for score in self.session_question_id.suggested_answer_ids.mapped('answer_score') + if score > 0 + ) or 1, + 'question_score': question_score + }) + score_position += 1 + leaderboard = sorted( + leaderboard, + key=lambda score: score['scoring_total'], + reverse=True) + + return leaderboard + + + # ------------------------------------------------------------ + # ACTIONS + # ------------------------------------------------------------ + + def action_draft(self): + self.write({'state': 'draft'}) + + def action_open(self): + self.write({'state': 'open'}) + + def action_close(self): + self.write({'state': 'closed'}) + + def action_send_survey(self): + """ Open a window to compose an email, pre-filled with the survey message """ + # Ensure that this survey has at least one question. + if not self.question_ids: + raise UserError(_('You cannot send an invitation for a survey that has no questions.')) + + # Ensure that this survey has at least one section with question(s), if question layout is 'One page per section'. + if self.questions_layout == 'page_per_section': + if not self.page_ids: + raise UserError(_('You cannot send an invitation for a "One page per section" survey if the survey has no sections.')) + if not self.page_ids.mapped('question_ids'): + raise UserError(_('You cannot send an invitation for a "One page per section" survey if the survey only contains empty sections.')) + + if self.state == 'closed': + raise exceptions.UserError(_("You cannot send invitations for closed surveys.")) + + template = self.env.ref('survey.mail_template_user_input_invite', raise_if_not_found=False) + + local_context = dict( + self.env.context, + default_survey_id=self.id, + default_use_template=bool(template), + default_template_id=template and template.id or False, + notif_layout='mail.mail_notification_light', + ) + return { + 'type': 'ir.actions.act_window', + 'view_mode': 'form', + 'res_model': 'survey.invite', + 'target': 'new', + 'context': local_context, + } + + def action_start_survey(self, answer=None): + """ Open the website page with the survey form """ + self.ensure_one() + url = '%s?%s' % (self.get_start_url(), werkzeug.urls.url_encode({'answer_token': answer and answer.access_token or None})) + return { + 'type': 'ir.actions.act_url', + 'name': "Start Survey", + 'target': 'self', + 'url': url, + } + + def action_print_survey(self, answer=None): + """ Open the website page with the survey printable view """ + self.ensure_one() + url = '%s?%s' % (self.get_print_url(), werkzeug.urls.url_encode({'answer_token': answer and answer.access_token or None})) + return { + 'type': 'ir.actions.act_url', + 'name': "Print Survey", + 'target': 'self', + 'url': url + } + + def action_result_survey(self): + """ Open the website page with the survey results view """ + self.ensure_one() + return { + 'type': 'ir.actions.act_url', + 'name': "Results of the Survey", + 'target': 'self', + 'url': '/survey/results/%s' % self.id + } + + def action_test_survey(self): + ''' Open the website page with the survey form into test mode''' + self.ensure_one() + return { + 'type': 'ir.actions.act_url', + 'name': "Test Survey", + 'target': 'self', + 'url': '/survey/test/%s' % self.access_token, + } + + def action_survey_user_input_completed(self): + action = self.env['ir.actions.act_window']._for_xml_id('survey.action_survey_user_input') + ctx = dict(self.env.context) + ctx.update({'search_default_survey_id': self.ids[0], + 'search_default_completed': 1, + 'search_default_not_test': 1}) + action['context'] = ctx + return action + + def action_survey_user_input_certified(self): + action = self.env['ir.actions.act_window']._for_xml_id('survey.action_survey_user_input') + ctx = dict(self.env.context) + ctx.update({'search_default_survey_id': self.ids[0], + 'search_default_scoring_success': 1, + 'search_default_not_test': 1}) + action['context'] = ctx + return action + + def action_survey_user_input(self): + action = self.env['ir.actions.act_window']._for_xml_id('survey.action_survey_user_input') + ctx = dict(self.env.context) + ctx.update({'search_default_survey_id': self.ids[0], + 'search_default_not_test': 1}) + action['context'] = ctx + return action + + def action_survey_preview_certification_template(self): + self.ensure_one() + return { + 'type': 'ir.actions.act_url', + 'target': '_blank', + 'url': '/survey/%s/get_certification_preview' % (self.id) + } + + def action_start_session(self): + """ Sets the necessary fields for the session to take place and starts it. + The write is sudo'ed because a survey user can start a session even if it's + not his own survey. """ + + if not self.env.user.has_group('survey.group_survey_user'): + raise AccessError(_('Only survey users can manage sessions.')) + + self.ensure_one() + self.sudo().write({ + 'questions_layout': 'page_per_question', + 'session_start_time': fields.Datetime.now(), + 'session_question_id': None, + 'session_state': 'ready' + }) + return self.action_open_session_manager() + + def action_open_session_manager(self): + self.ensure_one() + + return { + 'type': 'ir.actions.act_url', + 'name': "Open Session Manager", + 'target': 'self', + 'url': '/survey/session/manage/%s' % self.access_token + } + + def action_end_session(self): + """ The write is sudo'ed because a survey user can end a session even if it's + not his own survey. """ + + if not self.env.user.has_group('survey.group_survey_user'): + raise AccessError(_('Only survey users can manage sessions.')) + + self.sudo().write({'session_state': False}) + self.user_input_ids.sudo().write({'state': 'done'}) + self.env['bus.bus'].sendone(self.access_token, {'type': 'end_session'}) + + def get_start_url(self): + return '/survey/start/%s' % self.access_token + + def get_start_short_url(self): + """ See controller method docstring for more details. """ + return '/s/%s' % self.access_token[:6] + + def get_print_url(self): + return '/survey/print/%s' % self.access_token + + # ------------------------------------------------------------ + # GRAPH / RESULTS + # ------------------------------------------------------------ + + def _prepare_statistics(self, user_input_lines=None): + if user_input_lines: + user_input_domain = [ + ('survey_id', 'in', self.ids), + ('id', 'in', user_input_lines.mapped('user_input_id').ids) + ] + else: + user_input_domain = [ + ('survey_id', 'in', self.ids), + ('state', '=', 'done'), + ('test_entry', '=', False) + ] + count_data = self.env['survey.user_input'].sudo().read_group(user_input_domain, ['scoring_success', 'id:count_distinct'], ['scoring_success']) + + scoring_success_count = 0 + scoring_failed_count = 0 + for count_data_item in count_data: + if count_data_item['scoring_success']: + scoring_success_count += count_data_item['scoring_success_count'] + else: + scoring_failed_count += count_data_item['scoring_success_count'] + + success_graph = json.dumps([{ + 'text': _('Passed'), + 'count': scoring_success_count, + 'color': '#2E7D32' + }, { + 'text': _('Missed'), + 'count': scoring_failed_count, + 'color': '#C62828' + }]) + + total = scoring_success_count + scoring_failed_count + return { + 'global_success_rate': round((scoring_success_count / total) * 100, 1) if total > 0 else 0, + 'global_success_graph': success_graph + } + + # ------------------------------------------------------------ + # GAMIFICATION / BADGES + # ------------------------------------------------------------ + + def _prepare_challenge_category(self): + return 'certification' + + def _create_certification_badge_trigger(self): + self.ensure_one() + goal = self.env['gamification.goal.definition'].create({ + 'name': self.title, + 'description': _("%s certification passed", self.title), + 'domain': "['&', ('survey_id', '=', %s), ('scoring_success', '=', True)]" % self.id, + 'computation_mode': 'count', + 'display_mode': 'boolean', + 'model_id': self.env.ref('survey.model_survey_user_input').id, + 'condition': 'higher', + 'batch_mode': True, + 'batch_distinctive_field': self.env.ref('survey.field_survey_user_input__partner_id').id, + 'batch_user_expression': 'user.partner_id.id' + }) + challenge = self.env['gamification.challenge'].create({ + 'name': _('%s challenge certification', self.title), + 'reward_id': self.certification_badge_id.id, + 'state': 'inprogress', + 'period': 'once', + 'challenge_category': self._prepare_challenge_category(), + 'reward_realtime': True, + 'report_message_frequency': 'never', + 'user_domain': [('karma', '>', 0)], + 'visibility_mode': 'personal' + }) + self.env['gamification.challenge.line'].create({ + 'definition_id': goal.id, + 'challenge_id': challenge.id, + 'target_goal': 1 + }) + + def _handle_certification_badges(self, vals): + if vals.get('certification_give_badge'): + # If badge already set on records, reactivate the ones that are not active. + surveys_with_badge = self.filtered(lambda survey: survey.certification_badge_id and not survey.certification_badge_id.active) + surveys_with_badge.mapped('certification_badge_id').action_unarchive() + # (re-)create challenge and goal + for survey in self: + survey._create_certification_badge_trigger() + else: + # if badge with owner : archive them, else delete everything (badge, challenge, goal) + badges = self.mapped('certification_badge_id') + challenges_to_delete = self.env['gamification.challenge'].search([('reward_id', 'in', badges.ids)]) + goals_to_delete = challenges_to_delete.mapped('line_ids').mapped('definition_id') + badges.action_archive() + # delete all challenges and goals because not needed anymore (challenge lines are deleted in cascade) + challenges_to_delete.unlink() + goals_to_delete.unlink() diff --git a/addons/survey/models/survey_user.py b/addons/survey/models/survey_user.py new file mode 100644 index 00000000..9e4b4659 --- /dev/null +++ b/addons/survey/models/survey_user.py @@ -0,0 +1,628 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +import logging +import uuid + +from dateutil.relativedelta import relativedelta + +from odoo import api, fields, models, _ +from odoo.exceptions import ValidationError +from odoo.tools import float_is_zero + +_logger = logging.getLogger(__name__) + + +class SurveyUserInput(models.Model): + """ Metadata for a set of one user's answers to a particular survey """ + _name = "survey.user_input" + _rec_name = 'survey_id' + _description = 'Survey User Input' + + # answer description + survey_id = fields.Many2one('survey.survey', string='Survey', required=True, readonly=True, ondelete='cascade') + scoring_type = fields.Selection(string="Scoring", related="survey_id.scoring_type") + start_datetime = fields.Datetime('Start date and time', readonly=True) + deadline = fields.Datetime('Deadline', help="Datetime until customer can open the survey and submit answers") + state = fields.Selection([ + ('new', 'Not started yet'), + ('in_progress', 'In Progress'), + ('done', 'Completed')], string='Status', default='new', readonly=True) + test_entry = fields.Boolean(readonly=True) + last_displayed_page_id = fields.Many2one('survey.question', string='Last displayed question/page') + # attempts management + is_attempts_limited = fields.Boolean("Limited number of attempts", related='survey_id.is_attempts_limited') + attempts_limit = fields.Integer("Number of attempts", related='survey_id.attempts_limit') + attempts_number = fields.Integer("Attempt n°", compute='_compute_attempts_number') + survey_time_limit_reached = fields.Boolean("Survey Time Limit Reached", compute='_compute_survey_time_limit_reached') + # identification / access + access_token = fields.Char('Identification token', default=lambda self: str(uuid.uuid4()), readonly=True, required=True, copy=False) + invite_token = fields.Char('Invite token', readonly=True, copy=False) # no unique constraint, as it identifies a pool of attempts + partner_id = fields.Many2one('res.partner', string='Partner', readonly=True) + email = fields.Char('Email', readonly=True) + nickname = fields.Char('Nickname', help="Attendee nickname, mainly used to identify him in the survey session leaderboard.") + # questions / answers + user_input_line_ids = fields.One2many('survey.user_input.line', 'user_input_id', string='Answers', copy=True) + predefined_question_ids = fields.Many2many('survey.question', string='Predefined Questions', readonly=True) + scoring_percentage = fields.Float("Score (%)", compute="_compute_scoring_values", store=True, compute_sudo=True) # stored for perf reasons + scoring_total = fields.Float("Total Score", compute="_compute_scoring_values", store=True, compute_sudo=True) # stored for perf reasons + scoring_success = fields.Boolean('Quizz Passed', compute='_compute_scoring_success', store=True, compute_sudo=True) # stored for perf reasons + # live sessions + is_session_answer = fields.Boolean('Is in a Session', help="Is that user input part of a survey session or not.") + question_time_limit_reached = fields.Boolean("Question Time Limit Reached", compute='_compute_question_time_limit_reached') + + _sql_constraints = [ + ('unique_token', 'UNIQUE (access_token)', 'An access token must be unique!'), + ] + + @api.depends('user_input_line_ids.answer_score', 'user_input_line_ids.question_id', 'predefined_question_ids.answer_score') + def _compute_scoring_values(self): + for user_input in self: + # sum(multi-choice question scores) + sum(simple answer_type scores) + total_possible_score = 0 + for question in user_input.predefined_question_ids: + if question.question_type in ['simple_choice', 'multiple_choice']: + total_possible_score += sum(score for score in question.mapped('suggested_answer_ids.answer_score') if score > 0) + elif question.is_scored_question: + total_possible_score += question.answer_score + + if total_possible_score == 0: + user_input.scoring_percentage = 0 + user_input.scoring_total = 0 + else: + score_total = sum(user_input.user_input_line_ids.mapped('answer_score')) + user_input.scoring_total = score_total + score_percentage = (score_total / total_possible_score) * 100 + user_input.scoring_percentage = round(score_percentage, 2) if score_percentage > 0 else 0 + + @api.depends('scoring_percentage', 'survey_id') + def _compute_scoring_success(self): + for user_input in self: + user_input.scoring_success = user_input.scoring_percentage >= user_input.survey_id.scoring_success_min + + @api.depends( + 'start_datetime', + 'survey_id.is_time_limited', + 'survey_id.time_limit') + def _compute_survey_time_limit_reached(self): + """ Checks that the user_input is not exceeding the survey's time limit. """ + for user_input in self: + if not user_input.is_session_answer and user_input.start_datetime: + start_time = user_input.start_datetime + time_limit = user_input.survey_id.time_limit + user_input.survey_time_limit_reached = user_input.survey_id.is_time_limited and \ + fields.Datetime.now() >= start_time + relativedelta(minutes=time_limit) + else: + user_input.survey_time_limit_reached = False + + @api.depends( + 'survey_id.session_question_id.time_limit', + 'survey_id.session_question_id.is_time_limited', + 'survey_id.session_question_start_time') + def _compute_question_time_limit_reached(self): + """ Checks that the user_input is not exceeding the question's time limit. + Only used in the context of survey sessions. """ + for user_input in self: + if user_input.is_session_answer and user_input.survey_id.session_question_start_time: + start_time = user_input.survey_id.session_question_start_time + time_limit = user_input.survey_id.session_question_id.time_limit + user_input.question_time_limit_reached = user_input.survey_id.session_question_id.is_time_limited and \ + fields.Datetime.now() >= start_time + relativedelta(seconds=time_limit) + else: + user_input.question_time_limit_reached = False + + @api.depends('state', 'test_entry', 'survey_id.is_attempts_limited', 'partner_id', 'email', 'invite_token') + def _compute_attempts_number(self): + attempts_to_compute = self.filtered( + lambda user_input: user_input.state == 'done' and not user_input.test_entry and user_input.survey_id.is_attempts_limited + ) + + for user_input in (self - attempts_to_compute): + user_input.attempts_number = 1 + + if attempts_to_compute: + self.env.cr.execute("""SELECT user_input.id, (COUNT(previous_user_input.id) + 1) AS attempts_number + FROM survey_user_input user_input + LEFT OUTER JOIN survey_user_input previous_user_input + ON user_input.survey_id = previous_user_input.survey_id + AND previous_user_input.state = 'done' + AND previous_user_input.test_entry IS NOT TRUE + AND previous_user_input.id < user_input.id + AND (user_input.invite_token IS NULL OR user_input.invite_token = previous_user_input.invite_token) + AND (user_input.partner_id = previous_user_input.partner_id OR user_input.email = previous_user_input.email) + WHERE user_input.id IN %s + GROUP BY user_input.id; + """, (tuple(attempts_to_compute.ids),)) + + attempts_count_results = self.env.cr.dictfetchall() + + for user_input in attempts_to_compute: + attempts_number = 1 + for attempts_count_result in attempts_count_results: + if attempts_count_result['id'] == user_input.id: + attempts_number = attempts_count_result['attempts_number'] + break + + user_input.attempts_number = attempts_number + + @api.model_create_multi + def create(self, vals_list): + for vals in vals_list: + if 'predefined_question_ids' not in vals: + suvey_id = vals.get('survey_id', self.env.context.get('default_survey_id')) + survey = self.env['survey.survey'].browse(suvey_id) + vals['predefined_question_ids'] = [(6, 0, survey._prepare_user_input_predefined_questions().ids)] + return super(SurveyUserInput, self).create(vals_list) + + # ------------------------------------------------------------ + # ACTIONS / BUSINESS + # ------------------------------------------------------------ + + def action_resend(self): + partners = self.env['res.partner'] + emails = [] + for user_answer in self: + if user_answer.partner_id: + partners |= user_answer.partner_id + elif user_answer.email: + emails.append(user_answer.email) + + return self.survey_id.with_context( + default_existing_mode='resend', + default_partner_ids=partners.ids, + default_emails=','.join(emails) + ).action_send_survey() + + def action_print_answers(self): + """ Open the website page with the survey form """ + self.ensure_one() + return { + 'type': 'ir.actions.act_url', + 'name': "View Answers", + 'target': 'self', + 'url': '/survey/print/%s?answer_token=%s' % (self.survey_id.access_token, self.access_token) + } + + @api.model + def _generate_invite_token(self): + return str(uuid.uuid4()) + + def _mark_in_progress(self): + """ marks the state as 'in_progress' and updates the start_datetime accordingly. """ + self.write({ + 'start_datetime': fields.Datetime.now(), + 'state': 'in_progress' + }) + + def _mark_done(self): + """ This method will: + 1. mark the state as 'done' + 2. send the certification email with attached document if + - The survey is a certification + - It has a certification_mail_template_id set + - The user succeeded the test + Will also run challenge Cron to give the certification badge if any.""" + self.write({'state': 'done'}) + Challenge = self.env['gamification.challenge'].sudo() + badge_ids = [] + for user_input in self: + if user_input.survey_id.certification and user_input.scoring_success: + if user_input.survey_id.certification_mail_template_id and not user_input.test_entry: + user_input.survey_id.certification_mail_template_id.send_mail(user_input.id, notif_layout="mail.mail_notification_light") + if user_input.survey_id.certification_give_badge: + badge_ids.append(user_input.survey_id.certification_badge_id.id) + + # Update predefined_question_id to remove inactive questions + user_input.predefined_question_ids -= user_input._get_inactive_conditional_questions() + + if badge_ids: + challenges = Challenge.search([('reward_id', 'in', badge_ids)]) + if challenges: + Challenge._cron_update(ids=challenges.ids, commit=False) + + def get_start_url(self): + self.ensure_one() + return '%s?answer_token=%s' % (self.survey_id.get_start_url(), self.access_token) + + def get_print_url(self): + self.ensure_one() + return '%s?answer_token=%s' % (self.survey_id.get_print_url(), self.access_token) + + # ------------------------------------------------------------ + # CREATE / UPDATE LINES FROM SURVEY FRONTEND INPUT + # ------------------------------------------------------------ + + def save_lines(self, question, answer, comment=None): + """ Save answers to questions, depending on question type + + If an answer already exists for question and user_input_id, it will be + overwritten (or deleted for 'choice' questions) (in order to maintain data consistency). + """ + old_answers = self.env['survey.user_input.line'].search([ + ('user_input_id', '=', self.id), + ('question_id', '=', question.id) + ]) + + if question.question_type in ['char_box', 'text_box', 'numerical_box', 'date', 'datetime']: + self._save_line_simple_answer(question, old_answers, answer) + if question.save_as_email and answer: + self.write({'email': answer}) + if question.save_as_nickname and answer: + self.write({'nickname': answer}) + + elif question.question_type in ['simple_choice', 'multiple_choice']: + self._save_line_choice(question, old_answers, answer, comment) + elif question.question_type == 'matrix': + self._save_line_matrix(question, old_answers, answer, comment) + else: + raise AttributeError(question.question_type + ": This type of question has no saving function") + + def _save_line_simple_answer(self, question, old_answers, answer): + vals = self._get_line_answer_values(question, answer, question.question_type) + if old_answers: + old_answers.write(vals) + return old_answers + else: + return self.env['survey.user_input.line'].create(vals) + + def _save_line_choice(self, question, old_answers, answers, comment): + if not (isinstance(answers, list)): + answers = [answers] + vals_list = [] + + if question.question_type == 'simple_choice': + if not question.comment_count_as_answer or not question.comments_allowed or not comment: + vals_list = [self._get_line_answer_values(question, answer, 'suggestion') for answer in answers] + elif question.question_type == 'multiple_choice': + vals_list = [self._get_line_answer_values(question, answer, 'suggestion') for answer in answers] + + if comment: + vals_list.append(self._get_line_comment_values(question, comment)) + + old_answers.sudo().unlink() + return self.env['survey.user_input.line'].create(vals_list) + + def _save_line_matrix(self, question, old_answers, answers, comment): + vals_list = [] + + if answers: + for row_key, row_answer in answers.items(): + for answer in row_answer: + vals = self._get_line_answer_values(question, answer, 'suggestion') + vals['matrix_row_id'] = int(row_key) + vals_list.append(vals.copy()) + + if comment: + vals_list.append(self._get_line_comment_values(question, comment)) + + old_answers.sudo().unlink() + return self.env['survey.user_input.line'].create(vals_list) + + def _get_line_answer_values(self, question, answer, answer_type): + vals = { + 'user_input_id': self.id, + 'question_id': question.id, + 'skipped': False, + 'answer_type': answer_type, + } + if not answer or (isinstance(answer, str) and not answer.strip()): + vals.update(answer_type=None, skipped=True) + return vals + + if answer_type == 'suggestion': + vals['suggested_answer_id'] = int(answer) + elif answer_type == 'numerical_box': + vals['value_numerical_box'] = float(answer) + else: + vals['value_%s' % answer_type] = answer + return vals + + def _get_line_comment_values(self, question, comment): + return { + 'user_input_id': self.id, + 'question_id': question.id, + 'skipped': False, + 'answer_type': 'char_box', + 'value_char_box': comment, + } + + # ------------------------------------------------------------ + # STATISTICS / RESULTS + # ------------------------------------------------------------ + + def _prepare_statistics(self): + res = dict((user_input, { + 'correct': 0, + 'incorrect': 0, + 'partial': 0, + 'skipped': 0, + }) for user_input in self) + + scored_questions = self.mapped('predefined_question_ids').filtered(lambda question: question.is_scored_question) + + for question in scored_questions: + if question.question_type in ['simple_choice', 'multiple_choice']: + question_correct_suggested_answers = question.suggested_answer_ids.filtered(lambda answer: answer.is_correct) + for user_input in self: + user_input_lines = user_input.user_input_line_ids.filtered(lambda line: line.question_id == question) + if question.question_type in ['simple_choice', 'multiple_choice']: + res[user_input][self._choice_question_answer_result(user_input_lines, question_correct_suggested_answers)] += 1 + else: + res[user_input][self._simple_question_answer_result(user_input_lines)] += 1 + + return [[ + {'text': _("Correct"), 'count': res[user_input]['correct']}, + {'text': _("Partially"), 'count': res[user_input]['partial']}, + {'text': _("Incorrect"), 'count': res[user_input]['incorrect']}, + {'text': _("Unanswered"), 'count': res[user_input]['skipped']} + ] for user_input in self] + + def _choice_question_answer_result(self, user_input_lines, question_correct_suggested_answers): + correct_user_input_lines = user_input_lines.filtered(lambda line: line.answer_is_correct and not line.skipped).mapped('suggested_answer_id') + incorrect_user_input_lines = user_input_lines.filtered(lambda line: not line.answer_is_correct and not line.skipped) + if question_correct_suggested_answers and correct_user_input_lines == question_correct_suggested_answers: + return 'correct' + elif correct_user_input_lines and correct_user_input_lines < question_correct_suggested_answers: + return 'partial' + elif not correct_user_input_lines and incorrect_user_input_lines: + return 'incorrect' + else: + return 'skipped' + + def _simple_question_answer_result(self, user_input_line): + if user_input_line.skipped: + return 'skipped' + elif user_input_line.answer_is_correct: + return 'correct' + else: + return 'incorrect' + + # ------------------------------------------------------------ + # Conditional Questions Management + # ------------------------------------------------------------ + + def _get_conditional_values(self): + """ For survey containing conditional questions, we need a triggered_questions_by_answer map that contains + {key: answer, value: the question that the answer triggers, if selected}, + The idea is to be able to verify, on every answer check, if this answer is triggering the display + of another question. + If answer is not in the conditional map: + - nothing happens. + If the answer is in the conditional map: + - If we are in ONE PAGE survey : (handled at CLIENT side) + -> display immediately the depending question + - If we are in PAGE PER SECTION : (handled at CLIENT side) + - If related question is on the same page : + -> display immediately the depending question + - If the related question is not on the same page : + -> keep the answers in memory and check at next page load if the depending question is in there and + display it, if so. + - If we are in PAGE PER QUESTION : (handled at SERVER side) + -> During submit, determine which is the next question to display getting the next question + that is the next in sequence and that is either not triggered by another question's answer, or that + is triggered by an already selected answer. + To do all this, we need to return: + - list of all selected answers: [answer_id1, answer_id2, ...] (for survey reloading, otherwise, this list is + updated at client side) + - triggered_questions_by_answer: dict -> for a given answer, list of questions triggered by this answer; + Used mainly for dynamic show/hide behaviour at client side + - triggering_answer_by_question: dict -> for a given question, the answer that triggers it + Used mainly to ease template rendering + """ + triggering_answer_by_question, triggered_questions_by_answer = {}, {} + # Ignore conditional configuration if randomised questions selection + if self.survey_id.questions_selection != 'random': + triggering_answer_by_question, triggered_questions_by_answer = self.survey_id._get_conditional_maps() + selected_answers = self._get_selected_suggested_answers() + + return triggering_answer_by_question, triggered_questions_by_answer, selected_answers + + def _get_selected_suggested_answers(self): + """ + For now, only simple and multiple choices question type are handled by the conditional questions feature. + Mapping all the suggested answers selected by the user will also include answers from matrix question type, + Those ones won't be used. + Maybe someday, conditional questions feature will be extended to work with matrix question. + :return: all the suggested answer selected by the user. + """ + return self.mapped('user_input_line_ids.suggested_answer_id') + + def _clear_inactive_conditional_answers(self): + """ + Clean eventual answers on conditional questions that should not have been displayed to user. + This method is used mainly for page per question survey, a similar method does the same treatment + at client side for the other survey layouts. + E.g.: if depending answer was uncheck after answering conditional question, we need to clear answers + of that conditional question, for two reasons: + - ensure correct scoring + - if the selected answer triggers another question later in the survey, if the answer is not cleared, + a question that should not be displayed to the user will be. + + TODO DBE: Maybe this can be the only cleaning method, even for section_per_page or one_page where + conditional questions are, for now, cleared in JS directly. But this can be annoying if user typed a long + answer, changed his mind unchecking depending answer and changed again his mind by rechecking the depending + answer -> For now, the long answer will be lost. If we use this as the master cleaning method, + long answer will be cleared only during submit. + """ + inactive_questions = self._get_inactive_conditional_questions() + + # delete user.input.line on question that should not be answered. + answers_to_delete = self.user_input_line_ids.filtered(lambda answer: answer.question_id in inactive_questions) + answers_to_delete.unlink() + + def _get_inactive_conditional_questions(self): + triggering_answer_by_question, triggered_questions_by_answer, selected_answers = self._get_conditional_values() + + # get questions that should not be answered + inactive_questions = self.env['survey.question'] + for answer in triggered_questions_by_answer.keys(): + if answer not in selected_answers: + for question in triggered_questions_by_answer[answer]: + inactive_questions |= question + return inactive_questions + + def _get_print_questions(self): + """ Get the questions to display : the ones that should have been answered = active questions + In case of session, active questions are based on most voted answers + :return: active survey.question browse records + """ + survey = self.survey_id + if self.is_session_answer: + most_voted_answers = survey._get_session_most_voted_answers() + inactive_questions = most_voted_answers._get_inactive_conditional_questions() + else: + inactive_questions = self._get_inactive_conditional_questions() + return survey.question_ids - inactive_questions + + +class SurveyUserInputLine(models.Model): + _name = 'survey.user_input.line' + _description = 'Survey User Input Line' + _rec_name = 'user_input_id' + _order = 'question_sequence, id' + + # survey data + user_input_id = fields.Many2one('survey.user_input', string='User Input', ondelete='cascade', required=True) + survey_id = fields.Many2one(related='user_input_id.survey_id', string='Survey', store=True, readonly=False) + question_id = fields.Many2one('survey.question', string='Question', ondelete='cascade', required=True) + page_id = fields.Many2one(related='question_id.page_id', string="Section", readonly=False) + question_sequence = fields.Integer('Sequence', related='question_id.sequence', store=True) + # answer + skipped = fields.Boolean('Skipped') + answer_type = fields.Selection([ + ('text_box', 'Free Text'), + ('char_box', 'Text'), + ('numerical_box', 'Number'), + ('date', 'Date'), + ('datetime', 'Datetime'), + ('suggestion', 'Suggestion')], string='Answer Type') + value_char_box = fields.Char('Text answer') + value_numerical_box = fields.Float('Numerical answer') + value_date = fields.Date('Date answer') + value_datetime = fields.Datetime('Datetime answer') + value_text_box = fields.Text('Free Text answer') + suggested_answer_id = fields.Many2one('survey.question.answer', string="Suggested answer") + matrix_row_id = fields.Many2one('survey.question.answer', string="Row answer") + # scoring + answer_score = fields.Float('Score') + answer_is_correct = fields.Boolean('Correct') + + @api.constrains('skipped', 'answer_type') + def _check_answer_type_skipped(self): + for line in self: + if (line.skipped == bool(line.answer_type)): + raise ValidationError(_('A question can either be skipped or answered, not both.')) + + # allow 0 for numerical box + if line.answer_type == 'numerical_box' and float_is_zero(line['value_numerical_box'], precision_digits=6): + continue + if line.answer_type == 'suggestion': + field_name = 'suggested_answer_id' + elif line.answer_type: + field_name = 'value_%s' % line.answer_type + else: # skipped + field_name = False + + if field_name and not line[field_name]: + raise ValidationError(_('The answer must be in the right type')) + + @api.model_create_multi + def create(self, vals_list): + for vals in vals_list: + score_vals = self._get_answer_score_values(vals) + if not vals.get('answer_score'): + vals.update(score_vals) + return super(SurveyUserInputLine, self).create(vals_list) + + def write(self, vals): + res = True + for line in self: + vals_copy = {**vals} + getter_params = { + 'user_input_id': line.user_input_id.id, + 'answer_type': line.answer_type, + 'question_id': line.question_id.id, + **vals_copy + } + score_vals = self._get_answer_score_values(getter_params, compute_speed_score=False) + if not vals_copy.get('answer_score'): + vals_copy.update(score_vals) + res = super(SurveyUserInputLine, line).write(vals_copy) and res + return res + + @api.model + def _get_answer_score_values(self, vals, compute_speed_score=True): + """ Get values for: answer_is_correct and associated answer_score. + + Requires vals to contain 'answer_type', 'question_id', and 'user_input_id'. + Depending on 'answer_type' additional value of 'suggested_answer_id' may also be + required. + + Calculates whether an answer_is_correct and its score based on 'answer_type' and + corresponding question. Handles choice (answer_type == 'suggestion') questions + separately from other question types. Each selected choice answer is handled as an + individual answer. + + If score depends on the speed of the answer, it is adjusted as follows: + - If the user answers in less than 2 seconds, they receive 100% of the possible points. + - If user answers after that, they receive 50% of the possible points + the remaining + 50% scaled by the time limit and time taken to answer [i.e. a minimum of 50% of the + possible points is given to all correct answers] + + Example of returned values: + * {'answer_is_correct': False, 'answer_score': 0} (default) + * {'answer_is_correct': True, 'answer_score': 2.0} + """ + user_input_id = vals.get('user_input_id') + answer_type = vals.get('answer_type') + question_id = vals.get('question_id') + if not question_id: + raise ValueError(_('Computing score requires a question in arguments.')) + question = self.env['survey.question'].browse(int(question_id)) + + # default and non-scored questions + answer_is_correct = False + answer_score = 0 + + # record selected suggested choice answer_score (can be: pos, neg, or 0) + if question.question_type in ['simple_choice', 'multiple_choice']: + if answer_type == 'suggestion': + suggested_answer_id = vals.get('suggested_answer_id') + if suggested_answer_id: + question_answer = self.env['survey.question.answer'].browse(int(suggested_answer_id)) + answer_score = question_answer.answer_score + answer_is_correct = question_answer.is_correct + # for all other scored question cases, record question answer_score (can be: pos or 0) + elif question.is_scored_question: + answer = vals.get('value_%s' % answer_type) + if answer_type == 'numerical_box': + answer = float(answer) + elif answer_type == 'date': + answer = fields.Date.from_string(answer) + elif answer_type == 'datetime': + answer = fields.Datetime.from_string(answer) + if answer and answer == question['answer_%s' % answer_type]: + answer_is_correct = True + answer_score = question.answer_score + + if compute_speed_score and answer_score > 0: + user_input = self.env['survey.user_input'].browse(user_input_id) + session_speed_rating = user_input.exists() and user_input.is_session_answer and user_input.survey_id.session_speed_rating + if session_speed_rating: + max_score_delay = 2 + time_limit = question.time_limit + now = fields.Datetime.now() + seconds_to_answer = (now - user_input.survey_id.session_question_start_time).total_seconds() + question_remaining_time = time_limit - seconds_to_answer + # if answered within the max_score_delay => leave score as is + if question_remaining_time < 0: # if no time left + answer_score /= 2 + elif seconds_to_answer > max_score_delay: + time_limit -= max_score_delay # we remove the max_score_delay to have all possible values + score_proportion = (time_limit - seconds_to_answer) / time_limit + answer_score = (answer_score / 2) * (1 + score_proportion) + + return { + 'answer_is_correct': answer_is_correct, + 'answer_score': answer_score + } |
