import logging import pytz from datetime import datetime from odoo import models, fields, api, registry, _ from odoo.exceptions import ValidationError, UserError from odoo.osv import expression from odoo.addons.base.models.res_partner import _tz_get from odoo.addons.to_base.helper.multi_threading import Threading from ..pyzk.zk import ZK from ..pyzk.zk.user import User from ..pyzk.zk.exception import ZKErrorResponse, ZKNetworkError, ZKConnectionUnauthorized _logger = logging.getLogger(__name__) class AttendanceDevice(models.Model): _name = 'attendance.device' _description = 'Attendance Machine' _inherit = ['mail.thread', 'mail.activity.mixin', 'to.base'] @api.model def _default_get_attendance_device_state_lines(self): attendance_device_state_line_data = [] for state in self.env['attendance.state'].search([]): attendance_device_state_line_data.append( fields.Command.create({ 'attendance_state_id': state.id, 'code': state.code, 'type': state.type, 'activity_id': state.activity_id.id }) ) return attendance_device_state_line_data state = fields.Selection([ ('draft', 'Draft'), ('confirmed', 'Confirmed'), ('cancelled', 'Cancelled') ], string='Status', default='draft', index=True, copy=False, required=True, tracking=True, help="Only confirmed machines will get their data synchronized automatically.") name = fields.Char(string='Name', required=True, help="The name of the attendance machine", tracking=True, translate=True, copy=True, default='/', ) firmware_version = fields.Char(string='Firmware Version', readonly=True, help="The firmware version of the machine which will be filled automatically when you hit the 'Get Machine Info' button.") serialnumber = fields.Char(string='Serial Number', readonly=True, help="The serial number of the machine which will be filled automatically when you hit the 'Get Machine Info' button.") oem_vendor = fields.Char(string='OEM Vendor', readonly=True, help="The OEM Vendor of the machine which will be filled automatically when you hit the 'Get Machine Info' button.") platform = fields.Char(string='Platform', readonly=True, help="The Platform of the machine which will be filled automatically when you hit the 'Get Machine Info' button.") fingerprint_algorithm = fields.Char(string='Fingerprint Algorithm', readonly=True, help="The Fingerprint Algorithm (aka ZKFPVersion) of the machine which will be filled automatically when you hit the 'Get Machine Info' button.") device_name = fields.Char(string='Machine Name', readonly=True, help="The model of the machine which will be filled automatically when you hit the 'Get Machine Info' button.") work_code = fields.Char(string='Work Code', readonly=True, help="The Work Code of the machine which will be filled automatically when you hit the 'Get Machine Info' button.") ip = fields.Char(string='IP / Domain Name', required=True, tracking=True, copy=False, help="The accessible IP or Domain Name of the machine to get the machine's attendance data", default='0.0.0.0') port = fields.Integer(string='Port', required=True, default=4370, tracking=True, ) timeout = fields.Integer(string='Timeout', default=20, required=True, help="Maximum time in seconds to wait for response from the machine", tracking=True, ) description = fields.Text(string='Description') user_id = fields.Many2one('res.users', string='Technician', tracking=True, default=lambda self: self.env.user) device_user_ids = fields.One2many('attendance.device.user', 'device_id', string='Machine Users', help="List of Users stored in the attendance machine") device_users_count = fields.Integer(string='Users Count', compute='_compute_device_users_count', store=True, tracking=True) mapped_employee_ids = fields.Many2many('hr.employee', 'mapped_device_employee_rel', string='Mapped Employees', compute='_compute_employees', store=True, help="List of employees that have been mapped with this machine's users") mapped_employees_count = fields.Integer(string='Mapped Employee Count', compute='_compute_mapped_employees_count', store=True, tracking=True) umapped_device_user_ids = fields.One2many('attendance.device.user', 'device_id', string='Unmapped Machine Users', domain=[('employee_id', '=', False)], help="List of Machine Users that have not been mapped with an employee") unmapped_employee_ids = fields.Many2many('hr.employee', 'device_employee_rel', 'device_id', 'employee_id', compute='_compute_employees', store=True, string='Unmapped Employees', help="The employees that have not been mapped with any user of this machine") attendance_device_state_line_ids = fields.One2many('attendance.device.state.line', 'device_id', string='State Codes', copy=False, default=_default_get_attendance_device_state_lines, ) location_id = fields.Many2one('attendance.device.location', string='Location', tracking=True, help="The location where the machine is located", required=True, ) ignore_unknown_code = fields.Boolean(string='Ignore Unknown Code', default=False, tracking=True, help="Sometimes you don't want to load attendance data with status " "codes those not declared in the table below. In such the case, check this field.", ) company_id = fields.Many2one('res.company', string='Company', default=lambda self: self.env.company, ) auto_clear_attendance = fields.Boolean(string='Auto Clear Attendance Data', default=False, tracking=True, help="Check this to clear all machine attendance data after download into System") auto_clear_attendance_schedule = fields.Selection([ ('on_download_complete', 'On Download Completion'), ('time_scheduled', 'Time Scheduled')], string='Auto Clear Schedule', required=True, default='on_download_complete', tracking=True, help="On Download Completion: Delete attendance data as soon as download finished\n" "Time Scheduled: Delete attendance data on the time specified below") auto_clear_attendance_hour = fields.Float(string='Auto Clear At', tracking=True, required=True, default=0.0, help="The time (in the attendance machine's timezone) to clear attendance data after download.") auto_clear_attendance_dow = fields.Selection([ ('-1', 'Everyday'), ('0', 'Monday'), ('1', 'Tuesday'), ('2', 'Wednesday'), ('3', 'Thursday'), ('4', 'Friday'), ('5', 'Saturday'), ('6', 'Sunday'), ], string='Auto Clear On', default='6', required=True, tracking=True) auto_clear_attendance_error_notif = fields.Boolean(string='Auto Clear Attendance Notif.', default=True, tracking=True, help="Notify upon no safe found to clear attendance data") tz = fields.Selection(_tz_get, string='Time zone', compute='_compute_tz', store=True, precompute=True, help="The machine's timezone, used to output proper date and time values inside attendance reports.") active = fields.Boolean(string='Active', default=True, tracking=True, readonly=False) unique_uid = fields.Boolean(string='Unique UID', default=True, required=True, tracking=True, help="Some Bad Machines allow UID duplication. In this case, uncheck this field. But it is recommended to change your machine.") last_attendance_download = fields.Datetime(string='Last Sync.', readonly=True, help="The last time that the attendance data was downloaded from the machine into the System.") map_before_dl = fields.Boolean(string='Map Employee Before Download', default=True, help="Always try to map users and employees (if any new found) before downloading attendance data.") create_employee_during_mapping = fields.Boolean(string='Generate Employees During Mapping', default=False, help="If checked, during mapping between Machine's Users and company's employees, unmapped machine" " users will try to create a new employee then map accordingly.") download_error_notification = fields.Boolean(string='Download Error Notification', default=True, help="Enable this to get notified when data download error occurs.") debug_message = fields.Boolean(string='Debug Message', default=False, help="If checked, debugging messages will be posted in OpenChatter for debugging purpose.") user_attendance_ids = fields.One2many('user.attendance', 'device_id', string='Attendance Data', readonly=True) total_att_records = fields.Integer(string='Attendance Records', compute='_compute_total_attendance_records') finger_template_ids = fields.One2many('finger.template', 'device_id', string='Finger Template', readonly=True) total_finger_template_records = fields.Integer(string='Finger Templates', compute='_compute_total_finger_template_records') protocol = fields.Selection([('udp', 'UDP'), ('tcp', 'TCP')], string='Protocol', required=True, default='tcp', tracking=True, help="Some old devices do not support TCP. In such a case, please try switching to UDP.") omit_ping = fields.Boolean(string='Omit Ping', default=True, help="Omit pinging the IP address when connecting to the machine.", ) password = fields.Char(string='Password', help="The password to authenticate the machine, if required.") unaccent_user_name = fields.Boolean(string='Unaccent User Name', default=True, tracking=True, help="Some Machines support Unicode names such as the ZKTeco K50, some others do not." " In addition to this, the name field on devices is usually limited at about 24 Latin characters" " or less Unicode characters. Unaccent is sometimes a workaround for long Unicode names") # 65472 (0xFFc0) is the max size of TCP in the original pyzk (use in the method base.read_with_buffer as MAX_CHUNK) max_size_TCP = fields.Selection([('65472', '65472 bytes'), ('32768', '32768 bytes'), ('16384', '16384 bytes'), ('8192', '8192 bytes'), ('4096', '4096 bytes'), ('2048', '2048 bytes'), ('1024', '1024 bytes'), ], string='TCP Max-Size', default='65472', required=True, help="The default value (65472) works well for almost attendance machines. However, in some rare cases" " the error '[Errno 32] Broken pipe' may occur while getting data from devices. In such case, you may try on decreasing this value" " to see if it would help.\n" "Note: the smaller this value is, the slower data getting will be.") # 16384 is the max size of UDP in the original pyzk (use in the method base.read_with_buffer) max_size_UDP = fields.Selection([('65472', '65472 bytes'), ('32768', '32768 bytes'), ('16384', '16384 bytes'), ('8192', '8192 bytes'), ('4096', '4096 bytes'), ('2048', '2048 bytes'), ('1024', '1024 bytes'), ], string='UDP Max-Size', default='16384', required=True, help="The default value (16384) works well for almost attendance machines. However, in some rare cases," " the error 'timed out' may occur while getting data from devices. In such situation, you may try on decreasing this value to see if it would help\n." "Note: the smaller this value is, the slower data getting will be.") zk_cache = {} _sql_constraints = [ ('ip_and_port_unique', 'UNIQUE(ip, port, location_id)', "You cannot have more than one machine with the same ip and port of the same location!"), ] @property def zk(self): """ This method return a ZK object. If an object corresponding to the connection param was created and available in self.zk_cache, it will be return. To avoid it, call it with .with_context(no_zk_cache=True) """ self.ensure_one() force_udp = self.protocol == 'udp' password = self.password or 0 cached_key = (self.protocol, self.omit_ping, self.timeout, password, self.max_size_TCP, self.max_size_UDP, self.ip, self.port) if cached_key not in self.zk_cache.keys() or self.env.context.get('no_zk_cache', False): self.zk_cache[cached_key] = ZK(self.ip, self.port, self.timeout, password=password, force_udp=force_udp, ommit_ping=self.omit_ping, max_size_TCP=int(self.max_size_TCP), max_size_UDP=int(self.max_size_UDP)) return self.zk_cache[cached_key] @api.depends('location_id.tz') def _compute_tz(self): default_tz = self.env.context.get('tz') or self.env.user.tz for r in self: if r.location_id and r.location_id.tz: r.tz = r.location_id.tz else: r.tz = default_tz def name_get(self): """ name_get that supports displaying location name and model as prefix """ result = [] for r in self: name = r.name if r.oem_vendor: if r.device_name: name = "[%s %s] %s" % (r.oem_vendor, r.device_name, name) else: name = "[%s] %s" % (r.oem_vendor, name) if r.location_id: name = "[%s] %s" % (r.location_id.name, name) result.append((r.id, name)) return result @api.model def name_search(self, name, args=None, operator='ilike', limit=100): """ name search that supports searching by tag code """ args = args or [] domain = [] if name: domain = ['|', ('location_id.name', '=ilike', name + '%'), ('name', operator, name)] if operator in expression.NEGATIVE_TERM_OPERATORS: domain = ['&'] + domain state = self.search(domain + args, limit=limit) return state.name_get() @api.depends('device_user_ids', 'device_user_ids.active') def _compute_device_users_count(self): total_att_data = self.env['attendance.device.user'].read_group([('device_id', 'in', self.ids)], ['device_id'], ['device_id']) mapped_data = dict([(dict_data['device_id'][0], dict_data['device_id_count']) for dict_data in total_att_data]) for r in self: r.device_users_count = mapped_data.get(r.id, 0) def _compute_total_finger_template_records(self): total_att_data = self.env['finger.template'].read_group([('device_id', 'in', self.ids)], ['device_id'], ['device_id']) mapped_data = dict([(dict_data['device_id'][0], dict_data['device_id_count']) for dict_data in total_att_data]) for r in self: r.total_finger_template_records = mapped_data.get(r.id, 0) def _compute_total_attendance_records(self): total_att_data = self.env['user.attendance'].read_group([('device_id', 'in', self.ids)], ['device_id'], ['device_id']) mapped_data = dict([(dict_data['device_id'][0], dict_data['device_id_count']) for dict_data in total_att_data]) for r in self: r.total_att_records = mapped_data.get(r.id, 0) @api.depends('device_user_ids', 'device_user_ids.active', 'device_user_ids.employee_id', 'device_user_ids.employee_id.active') def _compute_employees(self): HrEmployee = self.env['hr.employee'] for r in self: r.update({ 'unmapped_employee_ids': [fields.Command.set(HrEmployee.search([('company_id', '=', r.company_id.id), ('id', 'not in', r.device_user_ids.mapped('employee_id').ids)]).ids)], 'mapped_employee_ids': [fields.Command.set(r.device_user_ids.mapped('employee_id').filtered(lambda employee: employee.active is True).ids)], }) @api.depends('mapped_employee_ids') def _compute_mapped_employees_count(self): for r in self: r.mapped_employees_count = len(r.mapped_employee_ids) @api.onchange('unique_uid') def onchange_unique_uid(self): if not self.unique_uid: message = _("This is for experiment to check if the machine contains bad data with non-unique user's uid." " Turn this option off will allow mapping machine user's user_id with user's user_id in System.\n" "NOTE:\n" "- non-latin user_id are not supportted.\n" "- Do not turn this option off in production.") return { 'warning': { 'title': "Warning!", 'message': message, }, } @api.model_create_multi def create(self, vals_list): for vals in vals_list: if 'ip' in vals: vals['ip'] = vals['ip'].strip() or False return super(AttendanceDevice, self).create(vals_list) def write(self, vals): if 'ip' in vals: vals['ip'] = vals['ip'].strip() or False return super(AttendanceDevice, self).write(vals) def connect(self): def post_message(email_template, error_msg=''): try: with registry(self._cr.dbname).cursor() as cr: with cr.savepoint(): self.with_env(self.env(cr=cr)).with_context(error_msg=error_msg).post_message(email_template) # pylint: disable=invalid-commit cr.commit() except Exception as e: _logger.error( "Could not post message using the template %s. Here is debugging info: %s", email_template.display_name, str(e) ) self.ensure_one() error_msg = False try: return self.zk.connect() except ZKNetworkError as e: error_msg = _("Could not connect to the machine %s.\nDebugging info: %s") % (self.display_name, e) except ZKConnectionUnauthorized: error_msg = _("Connection Unauthorized! The machine %s may require password.") % self.display_name except ZKErrorResponse as e: error_msg = _("Could not get connected to the machine %s. This is usually due to either the network error or" " wrong protocol selection or password authentication is required.\n" "Debugging info:\n%s") % (self.display_name, e) except Exception as e: error_msg = _("Could not get connected to the machine '%s'. Please check your network" " configuration and machine password and/or hard restart your machine.\nDebugging info: %s") % (self.display_name, e) if error_msg: email_template = self.env.ref('to_attendance_device.email_template_attendance_device') post_message(email_template, error_msg) raise ValidationError(error_msg) def disconnect(self): try: return self.zk.disconnect() except Exception as e: _logger.error(e) raise ValidationError(_("Could not get the machine %s disconnected. Here is the debugging information:\n%s") % (self.display_name, e)) def disableDevice(self): """ disable (lock) machine, ensure no activity when process run """ try: return self.zk.disable_device() except Exception as e: _logger.error(e) raise ValidationError(_("Could not get the machine %s disabled. Here is the debugging information:\n%s") % (self.display_name, e)) def _restart(self): self.ensure_one() self.restartDevice() def enableDevice(self): """ re-enable the connected machine """ try: return self.zk.enable_device() except Exception as e: _logger.error(e) raise ValidationError(_("Could not get the machine %s enabled. Here is the debugging information:\n%s") % (self.display_name, e)) def getFirmwareVersion(self): ''' return the firmware version ''' try: self.connect() self.enableDevice() return self.zk.get_firmware_version() except Exception as e: _logger.error(e) raise ValidationError(_("Could not get the firmware version of the machine %s. Here is the debugging information:\n%s") % (self.display_name, e)) finally: self.disconnect() def getSerialNumber(self): ''' return the serial number ''' try: self.connect() self.enableDevice() return self.zk.get_serialnumber() except Exception as e: _logger.error(e) raise ValidationError(_("Could not get the serial number of the machine %s. Here is the debugging information:\n%s") % (self.display_name, e)) finally: self.disconnect() def getOEMVendor(self): ''' return the serial number ''' try: self.connect() self.enableDevice() return self.zk.get_oem_vendor() except Exception as e: _logger.error(e) raise ValidationError(_("Could not get the OEM Vendor of the machine %s. Here is the debugging information:\n%s") % (self.display_name, e)) finally: self.disconnect() def getFingerprintAlgorithm(self): ''' return the Fingerprint Algorithm ''' try: self.connect() self.enableDevice() return self.zk.get_fp_version() except Exception as e: _logger.error(e) raise ValidationError(_("Could not get the Fingerprint Algorithm of the machine %s. Here is the debugging information:\n%s") % (self.display_name, e)) finally: self.disconnect() def getPlatform(self): ''' return the serial number ''' try: self.connect() self.enableDevice() return self.zk.get_platform() except Exception as e: _logger.error(e) raise ValidationError(_("Could not get the platform of the machine %s. Here is the debugging information:\n%s") % (self.display_name, e)) finally: self.disconnect() def getDeviceName(self): ''' return the serial number ''' try: self.connect() self.enableDevice() return self.zk.get_device_name() except Exception as e: _logger.error(e) raise ValidationError(_("Could not get the Name of the machine %s. Here is the debugging information:\n%s") % (self.display_name, e)) finally: self.disconnect() def getWorkCode(self): ''' return the serial number ''' try: self.connect() self.enableDevice() return self.zk.get_workcode() except Exception as e: _logger.error(e) raise ValidationError(_("Could not get the Work Code of the machine %s. Here is the debugging information:\n%s") % (self.display_name, e)) finally: self.disconnect() def restartDevice(self): ''' restart the machine ''' try: self.connect() self.enableDevice() return self.zk.restart() except Exception as e: _logger.error(e) raise ValidationError(_("Could not restart the machine %s. Here is the debugging information:\n%s") % (self.display_name, e)) def setUser(self, uid=None, name='', privilege=0, password='', group_id='', user_id='', card=0): try: self.connect() self.enableDevice() self.disableDevice() return self.zk.set_user(uid, name, privilege, password, group_id, user_id, card) except Exception as e: _logger.info(e) raise ValidationError(_("Could not set user into the machine %s. Here is the user information:\n" "uid: %s\n" "name: %s\n" "privilege: %s\n" "password: %s\n" "group_id: %s\n" "user_id: %s\n" "Here is the debugging information:\n%s\n") % (self.display_name, uid, name, privilege, password, group_id, user_id, e)) finally: self.enableDevice() self.disconnect() def delUser(self, uid, user_id): ''' delete specific user by uid ''' try: self.connect() self.enableDevice() self.disableDevice() return self.zk.delete_user(uid, user_id) except ZKErrorResponse as e: raise ZKErrorResponse(_("Could not delete the user with uid '%s', user_id '%s' from the device %s\n%s") % (uid, user_id, self.display_name, e)) finally: self.enableDevice() self.disconnect() def getUser(self): ''' return a Python List of machine users in User(uid, name, privilege, password, group_id, user_id) ''' try: self.connect() self.enableDevice() self.disableDevice() return self.zk.get_users() except Exception as e: _logger.error(str(e)) raise ValidationError(_("Could not get users from the machine %s\n" "If you had connected to your machine, perhaps your machine had problem. " "Some bad machines allowed duplicated uid may cause such problem. In such case, " "if you still want to load users from those bad machines, please uncheck Data " "Acknowledge field.\n" "Here is the debugging error message:\n%s") % (self.display_name, str(e))) finally: self.enableDevice() self.disconnect() def upload_finger_templates(self, uid, name, privilege, password, group_id, user_id, fingers): user = User(uid, name, privilege, password, group_id, user_id, card=0) try: self.connect() self.enableDevice() self.disableDevice() return self.zk.save_user_template(user, fingers) except Exception as e: _logger.error(e) raise ValidationError(_("Could not set finger template into the machine %s. Here are the information:\n" "user_id: %s\n" "Debugging information:\n%s") % (self.display_name, user_id, e)) finally: self.enableDevice() self.disconnect() def delFingerTemplate(self, uid, fid, user_id): ''' delete finger template by uid and fid ''' try: self.connect() self.enableDevice() self.disableDevice() return self.zk.delete_user_template(uid, fid, user_id) except Exception as e: _logger.error(e) raise ValidationError(_("Could not delete finger template with fid '%s' of uid '%s' from the machine %s") % (fid, uid, self.display_name,)) finally: self.enableDevice() self.disconnect() def getFingerTemplate(self): ''' return a Python List of fingers template in Finger(uid, fid, valid, template) ''' try: self.connect() self.enableDevice() self.disableDevice() return self.zk.get_templates() except Exception as e: _logger.error(str(e)) raise ValidationError(_("Could not get finger templates from the machine %s\n" "If you had connected to your machine, perhaps your machine had problem. " "Some bad machines allowed duplicated uid may cause such problem. In such case, " "if you still want to load users from those bad machines, please uncheck Data " "Acknowledge field.\n" "Here is the debugging error message:\n%s") % (self.display_name, str(e))) finally: self.enableDevice() self.disconnect() def get_next_uid(self): ''' return max uid of users on attendance machine ''' try: self.connect() self.enableDevice() self.disableDevice() return self.zk.get_next_uid() except Exception as e: _logger.error(str(e)) raise ValidationError(_("Could not get max uid from the machine %s\n" "If you had connected to your machine, perhaps your machine had problem.\n" "Here is the debugging error message:\n%s") % (self.display_name, str(e))) finally: self.enableDevice() self.disconnect() def getMachineTime(self): """ Get naive machine date and time in its local timezone """ try: self.connect() self.enableDevice() local_dt = self.zk.get_time() utc = self.env['to.base'].convert_local_to_utc(local_dt, force_local_tz_name=self.tz, naive=False) return utc.astimezone(pytz.timezone(self.tz)) except Exception as e: _logger.error(str(e)) raise ValidationError(_("Could not get time from the machine %s\n" "Here is the debugging error message:\n%s") % (self.display_name, str(e))) finally: self.disconnect() def clearData(self): ''' clear all data (include: user, attendance report, finger database ) ''' try: self.connect() self.enableDevice() return self.zk.clear_data() except Exception: raise ValidationError(_("Could not clear all data from the machine %s") % (self.display_name,)) finally: self.enableDevice() self.disconnect() def getAttendance(self): post_err_msg = False try: self.connect() self.enableDevice() self.disableDevice() return self.zk.get_attendance() except Exception as e: _logger.error(str(e)) post_err_msg = True raise ValidationError(_("Could not get attendance data from the machine %s") % (self.display_name,)) finally: if post_err_msg and self.download_error_notification: email_template_id = self.env.ref('to_attendance_device.email_template_error_get_attendance') self.post_message(email_template_id) self.enableDevice() self.disconnect() def clearAttendance(self): ''' clear all attendance records from the machine ''' try: self.connect() self.enableDevice() self.disableDevice() return self.zk.clear_attendance() except Exception as e: raise ValidationError(_("Could not clear attendance data from the machine %s. Here is the debugging information:\n%s") % (self.display_name, e)) finally: self.enableDevice() self.disconnect() def _download_users_by_uid(self): """ This method download and update all machine users into model attendance.device.user using uid as key """ DeviceUser = self.env['attendance.device.user'] for r in self: error_msg = "" # device_users = User(uid, name, privilege, password, group_id, user_id) device_users = r.getUser() uids = [] for device_user in device_users: uids.append(device_user.uid) existing_user_ids = [] device_user_ids = DeviceUser.with_context(active_test=False).search([('device_id', '=', r.id)]) for user in device_user_ids.filtered(lambda user: user.uid in uids): existing_user_ids.append(user.uid) users_not_in_device = device_user_ids.filtered(lambda user: user.uid not in existing_user_ids) users_not_in_device.write({'not_in_device': True}) for device_user in device_users: uid = device_user.uid vals = { 'uid': uid, 'name': device_user.name, 'privilege': device_user.privilege, 'password': device_user.password, 'user_id': device_user.user_id, 'device_id': r.id, } if device_user.group_id.isdigit(): vals['group_id'] = device_user.group_id if uid not in existing_user_ids: try: DeviceUser.create(vals) except Exception as e: _logger.info(e) _logger.info(vals) error_msg += str(e) error_msg += _("\nData that caused the error: %s") % str(vals) else: existing = DeviceUser.with_context(active_test=False).search([('uid', '=', uid), ('device_id', '=', r.id)], limit=1) if existing: update_data = {} if existing.name != vals['name']: update_data['name'] = vals['name'] if existing.privilege != vals['privilege']: update_data['privilege'] = vals['privilege'] if existing.password != vals['password']: update_data['password'] = vals['password'] if 'group_id' in vals and existing.group_id != vals['group_id']: update_data['group_id'] = vals['group_id'] if existing.user_id != vals['user_id']: update_data['user_id'] = vals['user_id'] if existing.device_id.id != vals['device_id']: update_data['device_id'] = vals['device_id'] if bool(update_data): try: existing.write(update_data) except Exception as e: _logger.info(e) _logger.info(vals) error_msg += str(e) + "
" error_msg += _("\nData that caused the error: %s") % str(update_data) if error_msg and r.debug_message: r.message_post(body=error_msg) def _download_users_by_user_id(self): """ This method download and update all machine users into model attendance.device.user using user_id as key NOTE: This method is experimental as it failed on comparing user_id in unicode type from machines (unicode: string) with user_id in unicode string from System (u'string') """ DeviceUser = self.env['attendance.device.user'] for r in self: # device_users = User(uid, name, privilege, password, group_id, user_id) device_users = r.getUser() user_ids = [] for device_user in device_users: user_ids.append(str(device_user.user_id)) existing_user_ids = [] device_user_ids = DeviceUser.with_context(active_test=False).search([('device_id', '=', r.id)]) for user in device_user_ids.filtered(lambda user: user.user_id in user_ids): existing_user_ids.append(str(user.user_id)) for device_user in device_users: user_id = str(device_user.user_id) vals = { 'uid': device_user.uid, 'name': device_user.name, 'privilege': device_user.privilege, 'password': device_user.password, 'user_id': device_user.user_id, 'device_id': r.id, } if device_user.group_id.isdigit(): vals['group_id'] = device_user.group_id if user_id not in existing_user_ids: DeviceUser.create(vals) else: existing = DeviceUser.with_context(active_test=False).search([ ('user_id', '=', user_id), ('device_id', '=', r.id)], limit=1) if existing: existing.write(vals) def action_show_time(self): """ Show the time on the machine """ self.ensure_one() action = self._prepare_action_confirm() action['context'].update({ 'method': 'N/A', 'title': _('Machine Time'), 'content': _("The machine time is %s") % self.getMachineTime() }) return action def _user_download(self): """ This method download and update all machine users into model attendance.device.user """ for r in self: if r.unique_uid: r._download_users_by_uid() else: r._download_users_by_user_id() def _user_upload(self): """ This method will 1. Download users from machine 2. Map the users with emloyee 3. Upload users from model attendance.device.user into the machine """ ignored_employees_dict = {} for r in self: # Then we download and map all employees with users r._employee_map() # Then we create users from unmapped employee ignored_employees = [] for employee in r.unmapped_employee_ids: if not employee.barcode: ignored_employees.append(employee) continue employee.upload_to_attendance_device(r) # we download and map all employees with users again r._employee_map() if len(ignored_employees) > 0: ignored_employees_dict[r] = ignored_employees if bool(ignored_employees_dict): message = _("The following employees, who have no Badge ID defined, have not been uploaded to the corresponding machine:\n") for device in ignored_employees_dict.keys(): for employee in ignored_employees_dict[device]: message += device.name + ': ' + employee.name + '\n' return { 'warning': { 'title': "Some Employees could not be uploaded!", 'message': message, }, } def _employee_map(self): self._user_download() for r in self: for user in r.device_user_ids.filtered(lambda user: not user.employee_id): employee = user.smart_find_employee() if employee: user.write({ 'employee_id': employee.id, }) # upload users that are available in Odoo but not available in device for user in r.device_user_ids.filtered(lambda user: user.not_in_device): user.setUser() # upload users that are available in Odoo but not available in device for user in r.device_user_ids.filtered(lambda user: user.not_in_device): user.setUser() user.write({'not_in_device': False}) if r.create_employee_during_mapping: users = r.device_user_ids.filtered(lambda user: not user.employee_id) if users: users.generate_employees() def action_fetch_attendance_data(self): self._threaded_fetch_attendance_data() return {'type': 'ir.actions.client', 'tag': 'reload'} @Threading(db_connection_percentage=15, auto_commit=True, max_batch_size=1) def _threaded_fetch_attendance_data(self): return self._fetch_attendance_data() @api.model def _cron_download_device_attendance(self): devices = self.env['attendance.device'].search([('state', '=', 'confirmed')]) devices.with_context(ignore_error=True)._threaded_fetch_attendance_data() def _fetch_attendance_data(self): DeviceUserAttendance = self.env['user.attendance'] AttendanceUser = self.env['attendance.device.user'] map_before_dl = self.filtered(lambda r: r.map_before_dl) if map_before_dl: map_before_dl._finger_template_download() email_template = self.env.ref('to_attendance_device.email_template_unknown_attendance_status_code') for r in self: error_msg = "" attendance_states = {} for state_line in r.attendance_device_state_line_ids: attendance_states[state_line.attendance_state_id.code] = state_line.attendance_state_id.id attendance_data = r.getAttendance() # log unknown codes for the users to fix later existing_attendance_codes = list(attendance_states.keys()) unknown_attendance_codes = set([ attendance.punch for attendance in attendance_data if attendance.punch not in existing_attendance_codes ]) if unknown_attendance_codes: with registry(self._cr.dbname).cursor() as cr: env = r.env(cr=cr) for unknown_attendance_code in unknown_attendance_codes: try: with env.cr.savepoint(): context = { 'code': unknown_attendance_code, 'machine_name': r.display_name, } r.with_env(env).with_context(context).post_message(email_template) # pylint: disable=invalid-commit env.cr.commit() except Exception as e: env.cr.rollback() _logger.error( "Could not post message using the template %s. Here is debugging info: %s", email_template.display_name, str(e) ) # start storing data into the `user.attendance` for attendance in attendance_data: attendance_user = AttendanceUser.with_context(active_test=False).search([ ('user_id', '=', attendance.user_id), ('device_id', '=', r.id) ], limit=1) if attendance_user: utc_timestamp = r.convert_local_to_utc(attendance.timestamp, r.tz, naive=True) duplicate_attend = DeviceUserAttendance.search([ ('device_id', '=', r.id), ('user_id', '=', attendance_user.id), ('timestamp', '=', utc_timestamp) ], limit=1) if duplicate_attend: continue try: with r.env.cr.savepoint(): r.env.cr.execute("""SELECT id FROM user_attendance LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED""") DeviceUserAttendance.create({ 'device_id': r.id, 'user_id': attendance_user.id, 'timestamp': utc_timestamp, 'status': attendance.punch, 'attendance_state_id': attendance_states[attendance.punch] }) except Exception as e: error_msg += str(e) + "
" error_msg += _("Error create DeviceUserAttendance record: device_id %s; user_id %s; timestamp %s; attendance_state_id %s.
") % ( r.id, attendance_user.id, attendance.timestamp, attendance_states.get( attendance.punch, _("[Unknown attendance state, here is what we got for attendance.punch: %s") % attendance.punch ) ) _logger.error(error_msg) r.last_attendance_download = fields.Datetime.now() if error_msg and r.debug_message: r.message_post(body=error_msg) if not r.auto_clear_attendance: continue if r.auto_clear_attendance_schedule == 'on_download_complete': r._attendance_clear() elif r.auto_clear_attendance_schedule == 'time_scheduled': # datetime in the timezone of the device dt_now = self.convert_utc_to_local(fields.Datetime.now(), r.tz, naive=True) float_dt_now = self.time_to_float_hour(dt_now) if int(r.auto_clear_attendance_dow) == -1 or dt_now.weekday() == int(r.auto_clear_attendance_dow): delta = r.auto_clear_attendance_hour - float_dt_now if abs(delta) <= 0.5 or abs(delta) >= 23.5: r._attendance_clear() def _finger_template_download(self): FingerTemplate = self.env['finger.template'] self._employee_map() all_device_users = self.env['attendance.device.user'].search([('device_id', 'in', self.ids)]) for r in self: device_users = all_device_users.filtered(lambda dev: dev.device_id.id == r.id) # if there is still no device users, just ignore downloading finger templates if not device_users: continue template_data = r.getFingerTemplate() template_datas = [] for template in template_data: template_datas.append(str(template.uid) + '_' + str(template.fid)) existing_finger_template_ids = [] finger_template_ids = FingerTemplate.search([('device_id', '=', r.id)]) for template in finger_template_ids.filtered(lambda tmp: (str(tmp.uid) + '_' + str(tmp.fid)) in template_datas): existing_finger_template_ids.append(str(template.uid) + '_' + str(template.fid)) for template in template_data: uid = template.uid fid = template.fid valid = template.valid tmp = template.template device_user_id = self.env['attendance.device.user'].search([('uid', '=', uid), ('device_id', '=', r.id)], limit=1) device_user_id = device_users.filtered(lambda u: u.uid == uid) if not device_user_id: continue else: device_user_id = device_user_id[0] vals = { 'device_user_id': device_user_id.id, 'fid': fid, 'valid': valid, 'template': tmp, } if device_user_id.employee_id: vals['employee_id'] = device_user_id.employee_id.id if (str(template.uid) + '_' + str(template.fid)) not in existing_finger_template_ids: FingerTemplate.create(vals) else: existing = FingerTemplate.search([ ('uid', '=', uid), ('fid', '=', fid), ('device_id', '=', r.id), ], limit=1) if existing: existing.write(vals) return def is_attendance_clear_safe(self): """ If the data from machines has not been downloaded into Odoo, this method will return false """ UserAttendance = self.env['user.attendance'] User = self.env['attendance.device.user'] check_statuses = self.attendance_device_state_line_ids.mapped('code') attendances = self.getAttendance() # Attendance(user_id, timestamp, status) for att in attendances: if att.punch not in check_statuses: continue user = User.with_context(active_test=False).search([('user_id', '=', att.user_id), ('device_id', '=', self.id)], limit=1) utc_dt = self.convert_local_to_utc(att.timestamp, self.tz, naive=True) match = UserAttendance.search([('device_id', '=', self.id), ('user_id', '=', user.id), ('status', '=', att.punch), ('timestamp', '=', utc_dt)], limit=1) if not match: return False, att return True, False def _attendance_clear(self): """ Method to clear all attendance data from the machine """ email_template = self.env.ref('to_attendance_device.email_template_not_safe_to_clear_attendance') for r in self: error_msg = "" attendance_clear_safe, att = r.is_attendance_clear_safe() if attendance_clear_safe: r.clearAttendance() else: error_msg += _("It was not safe to clear attendance data from the machine %s.
") % (r.name,) error_msg += _("The following attendance data has not been stored in System yet:
") error_msg += _("user_id: %s
timestamp: %s
status: %s
") % (att.user_id, att.timestamp, att.punch) _logger.warning("It was not safe to clear attendance data from the machine %s" % r.name) if r.auto_clear_attendance_error_notif: try: with registry(self._cr.dbname).cursor() as cr: with cr.savepoint(): r.with_env(self.env(cr=cr)).post_message(email_template) # pylint: disable=invalid-commit cr.commit() except Exception as e: _logger.error( "Could not post message using the template %s. Here is debugging info: %s", email_template.display_name, str(e) ) if error_msg and r.debug_message: r.message_post(body=error_msg) if error_msg: raise ValidationError(error_msg.replace('
', '\n')) def action_check_connection(self): self.ensure_one() if self.connect(): self.disconnect() action = self._prepare_action_confirm() action['context'].update({ 'method': 'N/A', 'title': _('Machine Connection'), 'content': _("Connect to the machine %s successfully!") % (self.display_name,) }) return action def action_device_information(self): dbname = self._cr.dbname for r in self: try: with registry(dbname).cursor() as cr: with cr.savepoint(): r = r.with_env(r.env(cr=cr)) r.connect() r.firmware_version = r.zk.get_firmware_version() r.serialnumber = r.zk.get_serialnumber() r.platform = r.zk.get_platform() r.fingerprint_algorithm = r.zk.get_fp_version() r.device_name = r.zk.get_device_name() r.work_code = r.zk.get_workcode() r.oem_vendor = r.zk.get_oem_vendor() # pylint: disable=invalid-commit cr.commit() except Exception as e: _logger.error(e) raise UserError(e) @api.model def post_message(self, email_template): if self.user_id: self.message_subscribe([self.user_id.partner_id.id]) if email_template: # email_template can be a recordset or an ID - convert to recordset if needed if isinstance(email_template, int): email_template = self.env['mail.template'].browse(email_template) if email_template.exists(): self.message_post_with_source(email_template, email_layout_xmlid="mail.mail_notification_light") def action_view_users(self): result = self.env['ir.actions.act_window']._for_xml_id('to_attendance_device.device_user_list_action') # reset context result['context'] = {} # choose the view_mode accordingly if self.device_users_count != 1: result['domain'] = "[('id', 'in', %s)]" % self.with_context(active_test=False).device_user_ids.ids elif self.device_users_count == 1: res = self.env.ref('to_attendance_device.attendance_device_user_form_view', False) result['views'] = [(res and res.id or False, 'form')] result['res_id'] = self.device_user_ids.id return result def action_view_attendance_data(self): self.ensure_one() result = self.env['ir.actions.act_window']._for_xml_id('to_attendance_device.action_user_attendance_data') # reset context result['context'] = {} # choose the view_mode accordingly total_att_records = self.total_att_records if total_att_records != 1: result['domain'] = "[('device_id', 'in', " + str(self.ids) + ")]" elif total_att_records == 1: res = self.env.ref('to_attendance_device.view_attendance_data_form', False) result['views'] = [(res and res.id or False, 'form')] result['res_id'] = self.user_attendance_ids.id return result def action_view_mapped_employees(self): result = self.env['ir.actions.act_window']._for_xml_id('hr.open_view_employee_list_my') # reset context result['context'] = {} # choose the view_mode accordingly if self.mapped_employees_count != 1: result['domain'] = "[('id', 'in', " + str(self.with_context(active_test=False).mapped_employee_ids.ids) + ")]" elif self.mapped_employees_count == 1: res = self.env.ref('to_attendance_device.view_employee_form', False) result['views'] = [(res and res.id or False, 'form')] result['res_id'] = self.mapped_employee_ids.id return result def action_view_finger_template(self): self.ensure_one() result = self.env['ir.actions.act_window']._for_xml_id('to_attendance_device.action_finger_template') # reset context result['context'] = {} # choose the view_mode accordingly total_finger_template_records = self.total_finger_template_records if total_finger_template_records != 1: result['domain'] = "[('device_id', 'in', " + str(self.ids) + ")]" elif total_finger_template_records == 1: res = self.env.ref('to_attendance_device.view_finger_template_form', False) result['views'] = [(res and res.id or False, 'form')] result['res_id'] = self.finger_template_ids.id return result @api.ondelete(at_uninstall=False) def _unlink_if_draft_state(self): for r in self: if r.state != 'draft': raise UserError(_("You cannot delete the machine '%s' while its state is not Draft.") % (r.display_name,)) @api.ondelete(at_uninstall=False) def _unlink_except_not_force_delete(self): force_delete = self.env.context.get('force_delete', False) for r in self: if r.device_user_ids and not force_delete: raise UserError(_("You may not be able to delete the machine '%s' while its data is stored in System." " Please remove all the related data of this machine before removing it from System." " You may also consider to deactivate this machine so that you don't have to delete" " it.") % (r.display_name,)) def _prepare_action_confirm(self): return { 'type': 'ir.actions.act_window', 'name': _('Information'), 'res_model': 'device.confirm.wizard', 'view_mode': 'form', 'target': 'new', 'context': { 'active_id': self.id, 'safe_confirm': False, } } def action_user_upload(self): action = self._prepare_action_confirm() action['context'].update({ 'method': '_user_upload', 'title': _('Upload Users To Machine'), 'safe_confirm': True, 'content': _("System will map the existing users with System's employees." " The unmapped employees will be uploaded to this machine as new users" " and then download those new users into System and map them again" " with those unmapped employees. Do you want to proceed?") }) return action def action_user_download(self): action = self._prepare_action_confirm() action['context'].update({ 'method': '_user_download', 'title': _('Download Users From Machine'), 'content': _("System will connect and download all the users from your machine" " (without mapping those with the existing Employees in System)." " Do you want to proceed?") }) return action def action_employee_map(self): action = self._prepare_action_confirm() action['context'].update({ 'method': '_employee_map', 'title': _('Map Employees With Users'), 'content': _("System will connect and download all the users from your machine" " and try to map those with the System's employees." " Do you want to proceed?") }) return action def action_finger_template_download(self): action = self._prepare_action_confirm() action['context'].update({ 'method': '_finger_template_download', 'title': _('Download Fingerprints From Machine'), 'content': _("System will connect and download all the fingers template from your machine." " Do you want to proceed?") }) return action def action_clear_attendance_data(self): action = self._prepare_action_confirm() action['context'].update({ 'method': '_attendance_clear', 'title': _('Clear Attendance Data'), 'safe_confirm': True, 'content': _("System will connect and clear all the attendance data" " in this machine. Are you sure about this?") }) return action def action_restart(self): action = self._prepare_action_confirm() action['context'].update({ 'method': '_restart', 'title': _('Restart Machine'), 'safe_confirm': True, 'content': _("Are you sure to restart the machine: %s?") % (self.display_name,) }) return action class AttendanceDeviceStateLine(models.Model): _name = 'attendance.device.state.line' _description = 'Attendance Machine State' attendance_state_id = fields.Many2one('attendance.state', string='State Code', required=True, index=True,) device_id = fields.Many2one('attendance.device', string='Machine', required=True, ondelete='cascade', index=True, copy=False) code = fields.Integer(string='Code Number', related='attendance_state_id.code', store=True, readonly=True) type = fields.Selection(related='attendance_state_id.type', store=True) activity_id = fields.Many2one('attendance.activity', related='attendance_state_id.activity_id', help="Attendance activity, e.g. Normal Working, Overtime, etc", readonly=True, store=True, index=True) _sql_constraints = [ ('attendance_state_id_device_id_unique', 'UNIQUE(attendance_state_id, device_id)', "The Code must be unique per Machine"), ]