# -*- coding: utf-8 -*- """ Quiz Models (Consolidated) =========================== All quiz-related models in one file following Odoo standard. Previously split across 5 files: quiz, question, answer, attempt, response. """ from odoo import models, fields, api, _ from odoo.exceptions import UserError # ============================================================================= # QUIZ (Main Container) # ============================================================================= class GeniusQuiz(models.Model): """Quiz/Assessment container for topic validation.""" _name = 'genius.quiz' _description = 'Quiz' _order = 'name' name = fields.Char(string='Quiz Title', required=True, translate=True) description = fields.Html(string='Description', translate=True) # Questions question_ids = fields.One2many('genius.quiz.question', 'quiz_id', string='Questions') question_count = fields.Integer(string='Question Count', compute='_compute_question_count', store=True) # Configuration time_limit_minutes = fields.Integer(string='Time Limit (Minutes)', default=0, help='0 = no limit') passing_score = fields.Float(string='Passing Score (%)', default=70.0) max_attempts = fields.Integer(string='Max Attempts', default=0, help='0 = unlimited') shuffle_questions = fields.Boolean(string='Shuffle Questions', default=False) show_correct_answers = fields.Boolean(string='Show Correct Answers', default=True) # Linked Topics topic_ids = fields.One2many('genius.topic', 'quiz_id', string='Linked Topics') # Stats attempt_ids = fields.One2many('genius.quiz.attempt', 'quiz_id', string='Attempts') attempt_count = fields.Integer(string='Attempt Count', compute='_compute_stats') avg_score = fields.Float(string='Average Score (%)', compute='_compute_stats') pass_rate = fields.Float(string='Pass Rate (%)', compute='_compute_stats') active = fields.Boolean(string='Active', default=True) company_id = fields.Many2one('res.company', string='Company', default=lambda self: self.env.company) @api.depends('question_ids') def _compute_question_count(self): for quiz in self: quiz.question_count = len(quiz.question_ids) @api.depends('attempt_ids', 'attempt_ids.score', 'attempt_ids.is_passed', 'attempt_ids.is_preview') def _compute_stats(self): for quiz in self: attempts = quiz.attempt_ids.filtered(lambda a: not a.is_preview) quiz.attempt_count = len(attempts) if attempts: quiz.avg_score = sum(a.score for a in attempts) / len(attempts) passed = len(attempts.filtered('is_passed')) quiz.pass_rate = (passed / len(attempts)) * 100 else: quiz.avg_score = 0.0 quiz.pass_rate = 0.0 def unlink(self): """Prevent deleting quizzes with real attempts or linked tours.""" for quiz in self: # Check for real attempts (exclude preview/test attempts) real_attempts = quiz.attempt_ids.filtered(lambda a: not a.is_preview) if real_attempts: raise UserError( _('Cannot delete quiz "%s" because it has %d user attempt(s). ' 'Archive it instead (uncheck Active).') % (quiz.name, len(real_attempts)) ) # Check for linked tours if quiz.topic_ids: tour_names = ', '.join(quiz.topic_ids.mapped('name')) raise UserError( _('Cannot delete quiz "%s" because it is linked to tour(s): %s. ' 'Unlink the tours first.') % (quiz.name, tour_names) ) return super(GeniusQuiz, self).unlink() # Advanced Config sample_size = fields.Integer(string='Questions to Ask', default=0, help='0 = Ask all questions') success_message = fields.Html(string='Success Message', translate=True, help='Message shown when passing') fail_message = fields.Html(string='Failure Message', translate=True, help='Message shown when failing') # Certificate Template Fields - Branding certificate_logo = fields.Binary( string='Primary Logo', attachment=True, help='Main organization logo (top left)' ) certificate_secondary_logo = fields.Binary( string='Secondary Logo', attachment=True, help='Secondary logo, e.g., system/department logo (top right)' ) certificate_title = fields.Char( string='Certificate Title', default='CERTIFICATE', translate=True, help='Main title (e.g., CERTIFICATE, شهادة)' ) certificate_issuer = fields.Char( string='Issuing Organization', default='Training Department', translate=True, help='Organization name issuing the certificate' ) # Certificate Template Fields - Body certificate_body_template = fields.Text( string='Achievement Description', default='has successfully completed the training course and demonstrated mastery of the required competencies.', translate=True, help='Achievement text. Variables: {user_name}, {topic_name}, {quiz_name}' ) is_published = fields.Boolean(string='Published', default=True) # Genius Widget Hook - used to attach the JS widget for test mode test_mode_opener = fields.Char(string='Test Mode Button', compute='_compute_test_mode_opener') def _compute_test_mode_opener(self): for record in self: record.test_mode_opener = "Test Mode" # Certificate Template Fields - Signature certificate_signature_image = fields.Binary( string='Signature Image', attachment=True, help='Digital signature image (PNG with transparent background recommended)' ) certificate_signature_name = fields.Char( string='Signatory Name', default='Training Director', translate=True, help='Name of authorized signatory' ) certificate_signature_title = fields.Char( string='Signatory Position', translate=True, help='Position/title of signatory' ) certificate_stamp = fields.Binary( string='Official Stamp/Seal', attachment=True, help='Official company stamp or seal image (PNG with transparent background recommended)' ) def _truncate_text(self, text, max_length=50): """Safely truncate text, handling None/False values""" if not text: return '' text = str(text).strip() if len(text) > max_length: return text[:max_length] + '...' return text def create_attempt(self, user_id, is_preview=False): """Create a new attempt with configured logic (shuffle, sample)""" self.ensure_one() # 1. Select Questions - EXPLICITLY sorted by sequence for reliability # One2many _order may not be reliably applied in all ORM operations questions = self.question_ids.sorted(key=lambda q: (q.sequence, q.id)) questions = list(questions) # Convert to list for shuffling/slicing if self.shuffle_questions: import random random.shuffle(questions) # Optional: Truncate to sample size if self.sample_size > 0: questions = questions[:self.sample_size] # 2. Create Attempt # Default Odoo create vals = { 'quiz_id': self.id, 'user_id': user_id, 'is_preview': is_preview, } attempt = self.env['genius.quiz.attempt'].create(vals) # 3. Create Responses with sequence to preserve order responses = [] for idx, q in enumerate(questions): responses.append({ 'attempt_id': attempt.id, 'question_id': q.id, 'sequence': idx + 1, # 1-based sequence }) if responses: self.env['genius.quiz.response'].create(responses) # CRITICAL: Refresh attempt to ensure one2many relations are populated in cache # This fixes "No questions" issue where response_ids might be empty for caller attempt.refresh() return attempt def action_preview_quiz(self): """ Preview quiz - opens quiz popup for admin to test. Returns a client action that triggers the quiz popup. """ self.ensure_one() if not self.question_ids: return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': 'Warning: No Questions', 'message': 'Add questions to the quiz before previewing.', 'type': 'warning', 'sticky': False, } } # Return client action to open quiz popup return { 'type': 'ir.actions.client', 'tag': 'genius_quiz_preview', 'target': 'new', 'params': { 'quiz_id': self.id, 'quiz_name': self.name, } } def action_start_quiz_preview(self): """ AJAX method called by quiz popup to get quiz data for preview. Creates attempt and returns quiz structure. Only instructors can preview quizzes. """ self.ensure_one() # Security: Only instructors can preview if not self.env.user.has_group('tour_genius.group_genius_instructor'): return {'error': 'Only instructors can preview quizzes.'} # Auto-Cleanup: Delete OLD preview attempts for this user/quiz # This prevents DB pollution while keeping the 'current' attempt for certificate generation old_previews = self.env['genius.quiz.attempt'].search([ ('quiz_id', '=', self.id), ('user_id', '=', self.env.user.id), ('is_preview', '=', True) ]) if old_previews: old_previews.unlink() # Create new attempt for preview (bypass max attempts for admin) attempt = self.create_attempt(self.env.user.id, is_preview=True) # Build questions data from attempt's responses questions = [] for response in attempt.response_ids: q = response.question_id question_data = { 'id': q.id, 'text': q.question_text.strip() if q.question_text else '', 'type': q.question_type, 'image': q.image.decode('utf-8') if q.image else None, 'points': q.points, 'answers': [] } # Add answers answers_list = list(q.answer_ids) # CRITICAL: For ordering questions, shuffle answers if q.question_type == 'ordering': import random random.shuffle(answers_list) for ans in answers_list: question_data['answers'].append({ 'id': ans.id, 'text': ans.answer_text, }) questions.append(question_data) return { 'attempt_id': attempt.id, 'quiz_name': self.name, 'questions': questions, 'time_limit_minutes': self.time_limit_minutes, 'passing_score': self.passing_score, 'description': self.description or '', 'success_message': self.success_message or '', 'fail_message': self.fail_message or '', 'show_correct_answers': self.show_correct_answers, 'is_preview': True, } def action_preview_certificate(self): """ Preview certificate design with sample data. Opens PDF in new tab for testing the certificate layout. """ self.ensure_one() import base64 from datetime import datetime # Sample data for preview topic_name = self.topic_ids[0].name if self.topic_ids else self.name user_name = self.env.user.name score = 95 # Sample score date_str = datetime.now().strftime('%B %d, %Y') # Format body text with sample variables body_text = self.certificate_body_template or 'has successfully completed the training course and demonstrated mastery of the required competencies.' for key, val in {'user_name': user_name, 'quiz_name': self.name, 'topic_name': topic_name}.items(): body_text = body_text.replace('{' + key + '}', val) # Helper function to safely get base64 from Binary field def get_image_base64(binary_data): if not binary_data: return None if isinstance(binary_data, bytes): return binary_data.decode('utf-8') if binary_data else None return str(binary_data) if binary_data else None # Build image HTML primary_logo_html = '' logo_b64 = get_image_base64(self.certificate_logo) if logo_b64: primary_logo_html = f'' secondary_logo_html = '' logo_b64 = get_image_base64(self.certificate_secondary_logo) if logo_b64: secondary_logo_html = f'' signature_img_html = '' sig_b64 = get_image_base64(self.certificate_signature_image) if sig_b64: signature_img_html = f'' stamp_img_html = '' stamp_b64 = get_image_base64(self.certificate_stamp) if stamp_b64: stamp_img_html = f'' # Generate the same HTML as actual certificate but with sample data # (Reuse the template from generate_certificate_pdf in GeniusQuizAttempt) html_content = self._get_certificate_html_template( user_name=user_name, topic_name=topic_name, body_text=body_text, date_str=date_str, score=score, primary_logo_html=primary_logo_html, secondary_logo_html=secondary_logo_html, signature_img_html=signature_img_html, stamp_img_html=stamp_img_html, ) # Generate PDF try: import subprocess import tempfile import os with tempfile.NamedTemporaryFile(mode='w', suffix='.html', delete=False, encoding='utf-8') as html_file: html_file.write(html_content) html_path = html_file.name pdf_path = html_path.replace('.html', '.pdf') cmd = [ 'wkhtmltopdf', '--page-size', 'A4', '--orientation', 'Landscape', '--margin-top', '0', '--margin-bottom', '0', '--margin-left', '0', '--margin-right', '0', '--disable-smart-shrinking', '--dpi', '96', '--print-media-type', '--enable-local-file-access', '--quiet', html_path, pdf_path ] subprocess.run(cmd, check=True, capture_output=True) with open(pdf_path, 'rb') as pdf_file: pdf_content = pdf_file.read() # Cleanup os.unlink(html_path) os.unlink(pdf_path) # Create attachment with public access for preview attachment = self.env['ir.attachment'].create({ 'name': f'Certificate_Preview_{self.name}.pdf', 'type': 'binary', 'datas': base64.b64encode(pdf_content), 'mimetype': 'application/pdf', 'public': True, }) # Return action to open in PDF.js viewer (guarantees inline view with controls) return { 'type': 'ir.actions.act_url', 'url': f'/web/static/lib/pdfjs/web/viewer.html?file=/web/content/{attachment.id}/{attachment.name}', 'target': 'new', } except Exception as e: import logging _logger = logging.getLogger(__name__) _logger.exception('Failed to generate certificate preview: %s', str(e)) from odoo.exceptions import UserError raise UserError(f'Failed to generate certificate preview: {str(e)}') def _get_certificate_html_template(self, user_name, topic_name, body_text, date_str, score, primary_logo_html, secondary_logo_html, signature_img_html, stamp_img_html): """Return a professional certificate HTML template - polished version.""" quiz = self return f'''
{primary_logo_html}
✦ ✦ ✦
{quiz.certificate_title or 'CERTIFICATE'}
OF ACHIEVEMENT
{secondary_logo_html}
This is to certify that
{user_name}
{body_text}
{topic_name}

{score}%Score
''' def action_duplicate_quiz(self): """Duplicate quiz with all questions""" self.ensure_one() new_quiz = self.copy({'name': f"{self.name} (Copy)"}) return { 'type': 'ir.actions.act_window', 'res_model': 'genius.quiz', 'res_id': new_quiz.id, 'view_mode': 'form', 'target': 'current', } def action_view_attempts(self): """View all attempts for this quiz""" self.ensure_one() return { 'type': 'ir.actions.act_window', 'name': f'Attempts: {self.name}', 'res_model': 'genius.quiz.attempt', 'view_mode': 'tree,form', 'domain': [('quiz_id', '=', self.id), ('is_preview', '=', False)], 'context': {'create': False}, } def action_reset_statistics(self): """Reset all quiz statistics by deleting all attempts""" self.ensure_one() # Security: Only instructors/admins can reset if not self.env.user.has_group('tour_genius.group_genius_instructor'): return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': 'Access Denied', 'message': 'Only instructors can reset statistics.', 'type': 'danger', 'sticky': False, } } attempts = self.env['genius.quiz.attempt'].search([('quiz_id', '=', self.id)]) attempt_count = len(attempts) if attempt_count: attempts.unlink() return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': 'Statistics Reset', 'message': f'{attempt_count} attempt(s) deleted.', 'type': 'success', 'sticky': False, } } def compute_question_difficulty(self): """ Compute difficulty for each question based on actual responses. Returns a dict of question_id: difficulty analysis. """ self.ensure_one() result = {} for question in self.question_ids: # Get all responses for this question responses = self.env['genius.quiz.response'].search([ ('question_id', '=', question.id), ('attempt_id.state', '=', 'submitted') ]) if not responses: result[question.id] = { 'question': self._truncate_text(question.question_text, 50), 'total_responses': 0, 'correct_count': 0, 'success_rate': 0, 'difficulty': 'unknown', } continue total = len(responses) correct = sum(1 for r in responses if r.is_correct) success_rate = (correct / total * 100) if total > 0 else 0 # Determine difficulty based on success rate if success_rate >= 80: difficulty = 'easy' elif success_rate >= 50: difficulty = 'medium' elif success_rate >= 20: difficulty = 'hard' else: difficulty = 'very_hard' result[question.id] = { 'question': self._truncate_text(question.question_text, 50), 'total_responses': total, 'correct_count': correct, 'success_rate': round(success_rate, 1), 'difficulty': difficulty, } return result def action_analyze_difficulty(self): """Show question difficulty analysis""" self.ensure_one() analysis = self.compute_question_difficulty() # Build message if not analysis: msg = "No questions to analyze." else: lines = [] for q_id, data in analysis.items(): icon = {'easy': '[Easy]', 'medium': '[Medium]', 'hard': '[Hard]', 'very_hard': '[Very Hard]', 'unknown': '[Unknown]'}.get(data['difficulty'], '[Unknown]') lines.append(f"{icon} {data['question']} - {data['success_rate']}% ({data['difficulty']})") msg = '\n'.join(lines) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': 'Question Difficulty Analysis', 'message': msg[:500] + '...' if len(msg) > 500 else msg, 'type': 'info', 'sticky': True, } } # ============================================================================= # QUESTION # ============================================================================= class GeniusQuizQuestion(models.Model): """Individual question within a quiz.""" _name = 'genius.quiz.question' _description = 'Quiz Question' _order = 'sequence, id' quiz_id = fields.Many2one('genius.quiz', string='Quiz', required=True, ondelete='cascade') sequence = fields.Integer(string='Order', default=10) name = fields.Char(string='Question', compute='_compute_name', store=True) question_text = fields.Text(string='Question Text', required=True, translate=True) question_type = fields.Selection([ ('single', 'Single Choice'), ('multiple', 'Multiple Choice'), ('short_answer', 'Short Answer'), ('fill_blank', 'Fill in the Blank'), ('ordering', 'Ordering/Sequence'), ], string='Question Type', default='single', required=True, help="Single Choice: One correct answer.\n" "Multiple Choice: Multiple correct answers.\n" "Short Answer: User types the answer.\n" "Fill in the Blank: Complete the sentence.\n" "Ordering: Arrange items in correct sequence.") @api.depends('question_text') def _compute_name(self): for record in self: if record.question_text: text = record.question_text.strip() record.name = (text[:97] + '...') if len(text) > 97 else text else: record.name = f"Question {record.id}" image = fields.Binary(string='Image', attachment=True) answer_ids = fields.One2many('genius.quiz.answer', 'question_id', string='Answers') # Short answer / Fill in the Blank correct_short_answer = fields.Char( string='Correct Answer', help="The expected answer. For Fill in the Blank, this is the word/phrase that fills the blank.") answer_alternatives = fields.Char( string='Alternative Answers', help="Comma-separated list of alternative correct answers (e.g., 'color, colour, Color').") case_sensitive = fields.Boolean( string='Case Sensitive', default=False, help="If checked, the answer must match exactly including uppercase/lowercase.") points = fields.Integer(string='Points', default=1) explanation = fields.Html(string='Explanation', translate=True) active = fields.Boolean(string='Active', default=True) # ------------------------------------------------------------------------- # Constraints # ------------------------------------------------------------------------- @api.constrains('question_type', 'answer_ids', 'correct_short_answer') def _check_valid_answers(self): """Ensure each question type has valid answer configuration. NOTE: Skips validation during module install/update because questions are created before their answers in XML data files. """ # Skip during module install/update (answers not yet linked) if self.env.context.get('install_mode') or self.env.context.get('module'): return for q in self: if q.question_type in ('single', 'multiple'): if not q.answer_ids: raise UserError(_( "Question '%s' must have at least one answer option." ) % self._truncate_question(q.question_text)) correct_count = sum(1 for a in q.answer_ids if a.is_correct) if q.question_type == 'single' and correct_count != 1: raise UserError(_( "Single Choice question '%s' must have exactly ONE correct answer. Found: %d" ) % (self._truncate_question(q.question_text), correct_count)) if q.question_type == 'multiple' and correct_count < 1: raise UserError(_( "Multiple Choice question '%s' must have at least one correct answer." ) % self._truncate_question(q.question_text)) elif q.question_type == 'ordering': if len(q.answer_ids) < 2: raise UserError(_( "Ordering question '%s' must have at least 2 items to order." ) % self._truncate_question(q.question_text)) elif q.question_type in ('short_answer', 'fill_blank'): if not q.correct_short_answer: raise UserError(_( "Question '%s' must have a correct answer defined." ) % self._truncate_question(q.question_text)) def _truncate_question(self, text, max_len=50): """Safely truncate question text for error messages.""" if not text: return 'Untitled' text = text.strip() return (text[:max_len] + '...') if len(text) > max_len else text # ------------------------------------------------------------------------- # Answer Helpers # ------------------------------------------------------------------------- def get_correct_answers(self): """Get correct answer(s) for this question based on type.""" self.ensure_one() if self.question_type in ('short_answer', 'fill_blank'): answers = [self.correct_short_answer] if self.correct_short_answer else [] # Add alternatives if self.answer_alternatives: alternatives = [a.strip() for a in self.answer_alternatives.split(',') if a.strip()] answers.extend(alternatives) return answers elif self.question_type == 'ordering': # Return answers in correct sequence order return self.answer_ids.sorted('sequence') else: # Single/Multiple choice return self.answer_ids.filtered('is_correct') def check_text_answer(self, user_answer): """Check if user's text answer is correct (for short_answer/fill_blank).""" self.ensure_one() if not user_answer: return False correct_answers = self.get_correct_answers() user_answer_normalized = user_answer.strip() for correct in correct_answers: if not correct: continue correct_normalized = correct.strip() if self.case_sensitive: if user_answer_normalized == correct_normalized: return True else: if user_answer_normalized.lower() == correct_normalized.lower(): return True return False def check_ordering_answer(self, user_sequence): """Check if user's ordering is correct. Args: user_sequence: List of answer IDs in user's order Returns: Tuple of (is_fully_correct, score_percentage) """ self.ensure_one() correct_sequence = self.answer_ids.sorted('sequence').ids if user_sequence == correct_sequence: return (True, 100.0) # Partial credit: count correctly positioned items correct_positions = sum( 1 for i, ans_id in enumerate(user_sequence) if i < len(correct_sequence) and ans_id == correct_sequence[i] ) score = (correct_positions / len(correct_sequence)) * 100 if correct_sequence else 0 return (False, score) # ============================================================================= # ANSWER # ============================================================================= class GeniusQuizAnswer(models.Model): """Answer option for a quiz question.""" _name = 'genius.quiz.answer' _description = 'Quiz Answer' _order = 'sequence, id' _rec_name = 'answer_text' question_id = fields.Many2one('genius.quiz.question', string='Question', required=True, ondelete='cascade') sequence = fields.Integer(string='Order', default=10) answer_text = fields.Char(string='Answer', required=True, translate=True) is_correct = fields.Boolean(string='Is Correct', default=False) feedback = fields.Text(string='Feedback', translate=True) # ============================================================================= # ATTEMPT # ============================================================================= class GeniusQuizAttempt(models.Model): """User's attempt at a quiz.""" _name = 'genius.quiz.attempt' _description = 'Quiz Attempt' _order = 'create_date desc' quiz_id = fields.Many2one('genius.quiz', string='Quiz', required=True, ondelete='restrict') user_id = fields.Many2one('res.users', string='User', required=True, default=lambda self: self.env.user, ondelete='cascade') state = fields.Selection([ ('in_progress', 'In Progress'), ('submitted', 'Submitted'), ], string='Status', default='in_progress') response_ids = fields.One2many('genius.quiz.response', 'attempt_id', string='Responses') # Timing started_at = fields.Datetime(string='Started At', default=fields.Datetime.now) submitted_at = fields.Datetime(string='Submitted At') time_taken_minutes = fields.Integer(string='Time Taken (Minutes)', compute='_compute_time_taken', store=True) # Scoring score = fields.Float(string='Score (%)', compute='_compute_score', store=True) points_earned = fields.Integer(string='Points Earned', compute='_compute_score', store=True) points_possible = fields.Integer(string='Points Possible', compute='_compute_score', store=True) is_passed = fields.Boolean(string='Passed', compute='_compute_score', store=True) # UI Control show_correct_answers = fields.Boolean(related='quiz_id.show_correct_answers', string='Show Answers') success_message = fields.Html(related='quiz_id.success_message', string='Success Message', readonly=True) fail_message = fields.Html(related='quiz_id.fail_message', string='Failure Message', readonly=True) attempt_number = fields.Integer(string='Attempt #', compute='_compute_attempt_number') is_preview = fields.Boolean(string='Test/Preview Mode', default=False, help="If true, this attempt is a test run and won't count towards statistics.") @api.depends('started_at', 'submitted_at') def _compute_time_taken(self): for attempt in self: if attempt.started_at and attempt.submitted_at: delta = attempt.submitted_at - attempt.started_at attempt.time_taken_minutes = int(delta.total_seconds() / 60) else: attempt.time_taken_minutes = 0 @api.depends('response_ids', 'response_ids.is_correct', 'response_ids.question_id.points') def _compute_score(self): for attempt in self: # Fix: Calculate total points based on QUESTIONS IN ATTEMPT (for sampling/random) # Old logic (bad): sum(attempt.quiz_id.question_ids.mapped('points')) total_points = sum(r.question_id.points for r in attempt.response_ids) earned_points = sum(r.question_id.points for r in attempt.response_ids.filtered('is_correct')) attempt.points_possible = total_points attempt.points_earned = earned_points if total_points > 0: attempt.score = (earned_points / total_points) * 100 attempt.is_passed = attempt.score >= attempt.quiz_id.passing_score else: attempt.score = 0.0 attempt.is_passed = False def _compute_attempt_number(self): for attempt in self: # Only count real attempts for real numbering domain = [ ('quiz_id', '=', attempt.quiz_id.id), ('user_id', '=', attempt.user_id.id), ('create_date', '<', attempt.create_date), ] # If this is a real attempt, ignore previews in count if not attempt.is_preview: domain.append(('is_preview', '=', False)) previous = self.search_count(domain) attempt.attempt_number = previous + 1 def action_submit(self): """Submit and calculate score""" for attempt in self: # 1. Score responses for response in attempt.response_ids: response._score_response() # 2. Mark as submitted attempt.write({ 'state': 'submitted', 'submitted_at': fields.Datetime.now(), }) # 3. GENIUS INTEGRATION: Auto-verify linked tours if passed # Re-read to get computed score/is_passed attempt.refresh() # CRITICAL: Do not verify tours if this is just a preview/test if attempt.is_passed and not attempt.is_preview: # Find tours using this quiz linked_topics = attempt.quiz_id.topic_ids if linked_topics: # Find progress records for these topics and current user progress_records = self.env['genius.progress'].search([ ('user_id', '=', attempt.user_id.id), ('topic_id', 'in', linked_topics.ids) ]) # Update progress to verified progress_records.write({ 'state': 'verified', 'date_verified': fields.Datetime.now(), 'quiz_score': attempt.score, 'quiz_attempt_id': attempt.id, }) else: # Update score on progress even if failed (propagate latest attempt) # CRITICAL: Skip for preview attempts - they should NOT affect user progress if not attempt.is_preview: linked_topics = attempt.quiz_id.topic_ids progress_records = self.env['genius.progress'].search([ ('user_id', '=', attempt.user_id.id), ('topic_id', 'in', linked_topics.ids) ]) progress_records.write({ 'quiz_score': attempt.score, 'quiz_attempt_id': attempt.id }) return True def action_cancel_attempt(self): """Called when user aborts the quiz popup. Deletes the attempt.""" for attempt in self: # Delete if in progress OR if it's a finished preview (cleanup) if attempt.state == 'in_progress' or (attempt.is_preview and attempt.state == 'submitted'): attempt.unlink() return True def action_submit_from_popup(self, responses_data): """ Submit quiz from the popup widget. responses_data: [{question_id, selected_answer_ids, text_answer}, ...] Returns results for display in popup. """ self.ensure_one() # Build lookup dict for O(1) access (instead of O(n) filtered search per response) response_by_question = {r.question_id.id: r for r in self.response_ids} # Save responses for resp_data in responses_data: question_id = resp_data.get('question_id') response = response_by_question.get(question_id) if response: vals = {} question = response.question_id # For ordering questions, save sequence in order_sequence field (M2M doesn't preserve order) if question.question_type == 'ordering' and resp_data.get('selected_answer_ids'): import json vals['order_sequence'] = json.dumps(resp_data['selected_answer_ids']) elif resp_data.get('selected_answer_ids'): vals['selected_answer_ids'] = [(6, 0, resp_data['selected_answer_ids'])] if resp_data.get('text_answer'): vals['text_answer'] = resp_data['text_answer'] if vals: response.write(vals) # Submit (scores and updates progress) self.action_submit() # Calculate time taken time_taken = 0 time_formatted = "00:00" if self.started_at and self.submitted_at: delta = self.submitted_at - self.started_at total_seconds = int(delta.total_seconds()) time_taken = int(total_seconds / 60) minutes, seconds = divmod(total_seconds, 60) time_formatted = "{:02d}:{:02d}".format(minutes, seconds) # Check if can retry can_retry = True if self.quiz_id.max_attempts > 0: # Bypass limit for previews/test mode if self.is_preview: can_retry = True else: attempt_count = self.env['genius.quiz.attempt'].search_count([ ('quiz_id', '=', self.quiz_id.id), ('user_id', '=', self.env.user.id), ('state', '=', 'submitted'), ('is_preview', '=', False), # CRITICAL: Exclude preview attempts from count ]) can_retry = attempt_count < self.quiz_id.max_attempts # Build correct answers data if show_correct_answers is enabled correct_answers = {} if self.quiz_id.show_correct_answers: for resp in self.response_ids: q = resp.question_id # Get correct answers with text correct_answer_objs = q.answer_ids.filtered('is_correct') correct_answer_texts = [a.answer_text for a in correct_answer_objs] # Get user's selected answers with text user_answer_texts = [a.answer_text for a in resp.selected_answer_ids] # Build response data based on question type answer_data = { 'is_correct': resp.is_correct, 'explanation': q.explanation or '', 'question_type': q.question_type, # For single/multiple choice 'correct_answers': correct_answer_texts, 'user_answers': user_answer_texts, 'correct_ids': correct_answer_objs.ids, 'user_ids': resp.selected_answer_ids.ids, } # Handle Short Answer / Fill in the Blank questions if q.question_type in ('short_answer', 'fill_blank'): # Get all correct answers including alternatives correct_list = q.get_correct_answers() answer_data['correct_answer'] = correct_list[0] if correct_list else '' answer_data['correct_alternatives'] = correct_list[1:] if len(correct_list) > 1 else [] answer_data['user_answer'] = resp.text_answer or '' # Handle Ordering questions elif q.question_type == 'ordering': # Correct order is the sequence order in question correct_order = q.answer_ids.sorted('sequence') answer_data['correct_order'] = [a.answer_text for a in correct_order] # User's order is stored in order_sequence as JSON user_order = [] if resp.order_sequence: import json try: user_sequence_ids = json.loads(resp.order_sequence) for ans_id in user_sequence_ids: ans = q.answer_ids.filtered(lambda a: a.id == ans_id) if ans: user_order.append(ans.answer_text) except (json.JSONDecodeError, ValueError): pass answer_data['user_order'] = user_order correct_answers[q.id] = answer_data # Calculate attempts remaining for UI display max_attempts = self.quiz_id.max_attempts attempts_remaining = -1 # -1 means unlimited if max_attempts > 0 and not self.is_preview: # Reuse attempt_count from can_retry calculation (already accounts for is_preview) real_attempts = self.env['genius.quiz.attempt'].search_count([ ('quiz_id', '=', self.quiz_id.id), ('user_id', '=', self.env.user.id), ('state', '=', 'submitted'), ('is_preview', '=', False), ]) attempts_remaining = max(0, max_attempts - real_attempts) return { 'attempt_id': self.id, # For PDF download 'score': self.score, 'is_passed': self.is_passed, 'points_earned': self.points_earned, 'points_possible': self.points_possible, 'time_taken': time_taken, 'time_formatted': time_formatted, 'user_name': self.env.user.name, 'date': fields.Date.today().strftime('%B %d, %Y'), 'can_retry': can_retry, # Attempts info for Genius UI 'attempts_remaining': attempts_remaining, 'max_attempts': max_attempts, # New fields for enhanced feedback 'success_message': self.quiz_id.success_message or '', 'fail_message': self.quiz_id.fail_message or '', 'show_correct_answers': self.quiz_id.show_correct_answers, 'correct_answers': correct_answers, } def generate_certificate_pdf(self): """ Generate a professional PDF certificate for passed quiz attempts. Uses the shared template from GeniusQuiz._get_certificate_html_template. Returns the attachment ID. """ self.ensure_one() if not self.is_passed: return False quiz = self.quiz_id import base64 # Build template variables topic_name = quiz.topic_ids[0].name if quiz.topic_ids else quiz.name # Format body text with variables body_text = quiz.certificate_body_template or 'has successfully completed the training course and demonstrated mastery of the required competencies.' for key, val in {'user_name': self.user_id.name, 'quiz_name': quiz.name, 'topic_name': topic_name}.items(): body_text = body_text.replace('{' + key + '}', val) # Format date date_str = self.submitted_at.strftime('%B %d, %Y') if self.submitted_at else fields.Date.today().strftime('%B %d, %Y') # Helper function to safely get base64 from Binary field def get_image_base64(binary_data): if not binary_data: return None if isinstance(binary_data, bytes): return binary_data.decode('utf-8') if binary_data else None return str(binary_data) if binary_data else None # Build image HTML primary_logo_html = '' logo_b64 = get_image_base64(quiz.certificate_logo) if logo_b64: primary_logo_html = f'' secondary_logo_html = '' logo_b64 = get_image_base64(quiz.certificate_secondary_logo) if logo_b64: secondary_logo_html = f'' signature_img_html = '' sig_b64 = get_image_base64(quiz.certificate_signature_image) if sig_b64: signature_img_html = f'' stamp_img_html = '' stamp_b64 = get_image_base64(quiz.certificate_stamp) if stamp_b64: stamp_img_html = f'' # Use the shared template from GeniusQuiz html_content = quiz._get_certificate_html_template( user_name=self.user_id.name, topic_name=topic_name, body_text=body_text, date_str=date_str, score=round(self.score), primary_logo_html=primary_logo_html, secondary_logo_html=secondary_logo_html, signature_img_html=signature_img_html, stamp_img_html=stamp_img_html, ) # Generate PDF using wkhtmltopdf try: import subprocess import tempfile import os with tempfile.NamedTemporaryFile(mode='w', suffix='.html', delete=False, encoding='utf-8') as html_file: html_file.write(html_content) html_path = html_file.name pdf_path = html_path.replace('.html', '.pdf') # Run wkhtmltopdf with proper options for full-page rendering cmd = [ 'wkhtmltopdf', '--orientation', 'Landscape', '--page-size', 'A4', '--margin-top', '0', '--margin-right', '0', '--margin-bottom', '0', '--margin-left', '0', '--disable-smart-shrinking', '--dpi', '96', '--enable-local-file-access', '--print-media-type', '--quiet', html_path, pdf_path ] subprocess.run(cmd, check=True, capture_output=True) # Read PDF content with open(pdf_path, 'rb') as pdf_file: pdf_content = pdf_file.read() # Cleanup temp files os.unlink(html_path) os.unlink(pdf_path) # Create attachment import base64 attachment = self.env['ir.attachment'].create({ 'name': f'Certificate_{quiz.name}_{self.user_id.name}_{self.id}.pdf', 'type': 'binary', 'datas': base64.b64encode(pdf_content), 'res_model': 'genius.quiz.attempt', 'res_id': self.id, 'mimetype': 'application/pdf', }) return attachment.id except Exception as e: import logging _logger = logging.getLogger(__name__) _logger.exception('Failed to generate certificate PDF: %s', str(e)) return False # ============================================================================= # RESPONSE # ============================================================================= class GeniusQuizResponse(models.Model): """User's response to a single question.""" _name = 'genius.quiz.response' _description = 'Quiz Response' _order = 'sequence, id' # Preserve question order as created attempt_id = fields.Many2one('genius.quiz.attempt', string='Attempt', required=True, ondelete='cascade') question_id = fields.Many2one('genius.quiz.question', string='Question', required=True, ondelete='cascade') sequence = fields.Integer(string='Sequence', default=10, help='Order of question in attempt') @api.model def _fix_missing_sequences(self): """Auto-fix legacy records without proper sequence values.""" # Find responses with default sequence (10) that might need fixing # Group by attempt and set sequence based on ID order within attempt self.env.cr.execute(""" WITH ranked AS ( SELECT id, ROW_NUMBER() OVER (PARTITION BY attempt_id ORDER BY id) as new_seq FROM genius_quiz_response WHERE sequence = 10 OR sequence IS NULL ) UPDATE genius_quiz_response r SET sequence = ranked.new_seq FROM ranked WHERE r.id = ranked.id AND (r.sequence = 10 OR r.sequence IS NULL) """) return True # Response selected_answer_ids = fields.Many2many('genius.quiz.answer', string='Selected Answers') text_answer = fields.Char(string='Text Answer') # CRITICAL: Ordering questions store sequence here because M2M doesn't preserve order order_sequence = fields.Char( string='Order Sequence', help='JSON list of answer IDs in user order. Used for ordering questions.') # Scoring is_correct = fields.Boolean(string='Correct', default=False) explanation = fields.Html(related='question_id.explanation', string='Explanation', readonly=True) def _score_response(self): """Score this response based on question type.""" for response in self: question = response.question_id if question.question_type in ('short_answer', 'fill_blank'): # Use the question's check_text_answer which handles alternatives response.is_correct = question.check_text_answer(response.text_answer) elif question.question_type == 'ordering': # For ordering: compare user's sequence with correct sequence # User's sequence is stored in order_sequence as JSON (M2M doesn't preserve order!) user_sequence = [] if response.order_sequence: import json try: user_sequence = json.loads(response.order_sequence) except (json.JSONDecodeError, ValueError): pass is_correct, score_pct = question.check_ordering_answer(user_sequence) response.is_correct = is_correct # Note: For partial credit in future, use score_pct else: # single, multiple correct_answers = question.answer_ids.filtered('is_correct') selected = response.selected_answer_ids response.is_correct = set(correct_answers.ids) == set(selected.ids)