odex30_standard/odex30_base/data_chart/models/data_chart.py

690 lines
23 KiB
Python

# -*- coding: utf-8 -*-
##############################################################################
#
# Expert Co. Ltd.
# Copyright (C) 2018 (<http://www.exp-sa.com/>).
#
##############################################################################
import base64
from odoo import models, fields, api, _
import datetime
from odoo.http import request
from itertools import groupby
import time
import sqlite3
from odoo.tools import DEFAULT_SERVER_DATETIME_FORMAT
from odoo.exceptions import ValidationError
import json
sqllite_keys = ["ABORT",
"ACTION",
"ADD",
"AFTER",
"ALL",
"ALTER",
"ANALYZE",
"AND",
"AS",
"ASC",
"ATTACH",
"AUTOINCREMENT",
"BEFORE",
"BEGIN",
"BETWEEN",
"BY",
"CASCADE",
"CASE",
"CAST",
"CHECK",
"COLLATE",
"COLUMN",
"COMMIT",
"CONFLICT",
"CONSTRAINT",
"CREATE",
"CROSS",
"CURRENT",
"CURRENT_DATE",
"CURRENT_TIME",
"CURRENT_TIMESTAMP",
"DATABASE",
"DEFAULT",
"DEFERRABLE",
"DEFERRED",
"DELETE",
"DESC",
"DETACH",
"DISTINCT",
"DO",
"DROP",
"EACH",
"ELSE",
"END",
"ESCAPE",
"EXCEPT",
"EXCLUDE",
"EXCLUSIVE",
"EXISTS",
"EXPLAIN",
"FAIL",
"FILTER",
"FOLLOWING",
"FOR",
"FOREIGN",
"FROM",
"FULL",
"GLOB",
"GROUP",
"GROUPS",
"HAVING",
"IF",
"IGNORE",
"IMMEDIATE",
"IN",
"INDEX",
"INDEXED",
"INITIALLY",
"INNER",
"INSERT",
"INSTEAD",
"INTERSECT",
"INTO",
"IS",
"ISNULL",
"JOIN",
"KEY",
"LEFT",
"LIKE",
"LIMIT",
"MATCH",
"NATURAL",
"NO",
"NOT",
"NOTHING",
"NOTNULL",
"NULL",
"OF",
"OFFSET",
"ON",
"OR",
"ORDER",
"OTHERS",
"OUTER",
"OVER",
"PARTITION",
"PLAN",
"PRAGMA",
"PRECEDING",
"PRIMARY",
"QUERY",
"RAISE",
"RANGE",
"RECURSIVE",
"REFERENCES",
"REGEXP",
"REINDEX",
"RELEASE",
"RENAME",
"REPLACE",
"RESTRICT",
"RIGHT",
"ROLLBACK",
"ROW",
"ROWS",
"SAVEPOINT",
"SELECT",
"SET",
"TABLE",
"TEMP",
"TEMPORARY",
"THEN",
"TIES",
"TO",
"TRANSACTION",
"TRIGGER",
"UNBOUNDED",
"UNION",
"UNIQUE",
"UPDATE",
"USING",
"VACUUM",
"VALUES",
"VIEW",
"VIRTUAL",
"WHEN",
"WHERE",
"WINDOW",
"WITH",
"WITHOUT"]
class data_chart_report(models.Model):
_name = 'data.chart.report'
_description = 'Data Chart Report'
name = fields.Char(string='Name', copy=False)
title = fields.Text(string='Title', copy=False)
type = fields.Selection(
selection=[('model', 'Model'),
('query', 'Query')],
string='Type', copy=True)
header = fields.Html(string='Header')
footer = fields.Html(string='Footer')
seq = fields.Char(string='Sequence')
menu_id = fields.Many2one(comodel_name='ir.ui.menu', string='Menu', copy=False)
parent_menu_id = fields.Many2one(comodel_name='ir.ui.menu', string='Parent Menu')
action_id = fields.Many2one(comodel_name='ir.actions.client', string='Action', copy=False)
data_items_ids = fields.One2many(comodel_name='data_chart_data_item',
inverse_name='report_id', string='Models', copy=True)
report_names = fields.One2many(comodel_name='report_cols_names',
inverse_name='report_id', string='Cols Names', copy=True)
query = fields.Text(string='Query')
model_id = fields.Many2one(comodel_name='ir.model', string='Model')
groups_ids = fields.Many2many(comodel_name='res.groups', string='Groups', copy=True)
def create_menu(self):
self.ensure_one()
if type == 'query':
self.get_data()
if self.menu_id:
self.menu_id.unlink()
if self.action_id:
self.action_id.unlink()
if self.type == 'query':
action_id = self.env['ir.actions.client'].create({
'name': self.name,
'tag': 'data_view',
'context': {
'model': 'data.chart.report',
'active_id': self.id,
'title': self.title,
'create': False,
},
'target': 'current',
})
else:
action_id = self.env['ir.actions.client'].create({
'name': self.name,
'tag': 'data_view',
'context': {
'model': self.model_id.model,
'active_id': self.id,
'title': self.title,
'create': False,
},
'target': 'current',
})
self.action_id = action_id.id
menu_id = self.env['ir.ui.menu'].create({
'name': self.name,
'parent_id': self.parent_menu_id and self.parent_menu_id.id or False,
'action': 'ir.actions.client,%d' % (action_id,),
'groups_id': [[6, 0, [x.id for x in self.groups_ids]]]
})
self.menu_id = menu_id.id
def show_data(self):
if self.type == 'query':
return {
'name': self.name,
'type': 'ir.actions.client',
'tag': 'data_chart',
'context': {
'model': 'data.chart.report',
'active_id': self.id,
'title': self.title,
'admin_view': True,
},
}
else:
return {
'name': self.name,
'tag': 'data_view',
'context': {
'model': self.model_id.model,
'active_id': self.id,
'title': self.title,
'admin_view': True,
},
'type': 'ir.actions.client',
'target': 'new',
}
def get_data(self):
self.ensure_one()
# Create database connection to an in-memory database
connectionObject = sqlite3.connect(":memory:")
def display_name(model, field, value):
try:
# context = {'lang': user.lang}
# self.env = self.env(context=context)
data_items = self.data_items_ids.filtered(lambda x: x.name == model)
if data_items:
data_item = data_items[0]
model_id = data_item.data_item
model = model_id.model
if self.env[model]._fields[field].type == 'selection':
value = dict(self.env[model]._fields[field]._description_selection(
self.env))[value]
except:
return value
return value
connectionObject.create_function("display_name", 3, display_name)
connectionObject.row_factory = dict_factory
# Obtain a cursor object
cursorObject = connectionObject.cursor()
# Create a table in the in-memory database
for data_item in self.data_items_ids:
model = table = False
# if data_item.data_item.startswith('model_'):
# model = data_item.data_item.replace('model_', '', 1)
#model_id = self.env['ir.model'].search([('model', '=', model)])
model_id = data_item.data_item
model = model_id.model
fields = self.env['ir.model.fields'].search(
[('model_id', 'in', model_id.ids)])
fields = fields.read(['name', 'field_description', 'ttype'])
fields_types = {x['name']: x['ttype'] for x in fields}
cols = ""
cols_names = ""
for field in fields_types:
if field.upper() in sqllite_keys:
continue
if fields_types[field] in ['char', 'text', 'selection']:
cols += field + ' text , '
cols_names += field + ' , '
elif fields_types[field] in ['integer']:
cols += field + ' int , '
cols_names += field + ' , '
elif fields_types[field] in ['float', 'monetary']:
cols += field + ' REAL , '
cols_names += field + ' , '
elif fields_types[field] in ['date']:
cols += field + ' DATE , '
cols_names += field + ' , '
elif fields_types[field] in ['datetime']:
cols += field + ' datetime , '
cols_names += field + ' , '
elif fields_types[field] in ['many2one']:
cols += field + ' int , '
cols_names += field + ' , '
elif fields_types[field] in ['boolean']:
cols += field + ' boolean , '
cols_names += field + ' , '
createTable = "CREATE TABLE " + data_item.name + " (" + cols[0:-2] + " )"
cursorObject.execute(createTable)
# insert the data
data = self.env[model].with_context({'data_chart_search': True}).search([])
for rec in data:
row = ""
readed = rec.with_context({'data_chart_search': False}).read()[0]
for item in cols_names[0:-3].split(' , '):
item_data = readed.get(item, False)
if item == 'name':
readed['name'] = rec.name_get()[0][1]
if item_data:
row += '"'+str(item_data)+'"' + " , "
if not item_data:
row += '" "' + " , "
elif fields_types[item] == 'many2one':
if item_data:
row += str(item_data[0]) + " , "
else:
row += 'null' + " , "
elif fields_types[item] in ['char', 'text', 'selection']:
if item_data:
item_data = str(item_data).replace('"', '\'')
row += '"'+str(item_data)+'"' + " , "
if not item_data:
row += '" "' + " , "
elif fields_types[item] in ['date', 'datetime']:
row += '"'+str(item_data)+'"' + " , "
elif fields_types[item] in ['boolean']:
if item_data:
row += str('1') + " , "
if not item_data:
row += str('0') + " , "
else:
if item_data:
row += str(item_data) + " , "
if not item_data:
row += 'null' + " , "
insertValues = """INSERT INTO """ + data_item.name + \
"""( """ + cols_names[0:-3] + """ )""" + """ values(""" + row[0:-3] + """ )"""
cursorObject.execute(insertValues)
queryTable = self.query
queryResults = cursorObject.execute(queryTable)
data = queryResults.fetchall()
report_names = self.report_names
#report_names = {x.d_name: x.t_name for x in report_names}
report_names = {x.t_name: x.d_name for x in report_names}
new_data = []
for rec in data:
new_rec = rec
for item in list(filter(lambda x: x in report_names, rec.keys())):
new_rec[report_names[item]] = rec[item]
del new_rec[item]
new_data.append(new_rec)
connectionObject.close()
return new_data
class data_chart_report_models(models.Model):
_name = 'data_chart_data_item'
_description = 'Data Chart Data Item'
def _get_models_tables(self):
ir_model_relation = self.env['ir.model.relation'].search([])
ir_model_relation = ir_model_relation.read(['name'])
ir_model_relation = [('rel'+'_'+x['name'], x['name']) for x in ir_model_relation]
ir_model = self.env['ir.model'].search([])
ir_model = ir_model.read(['model', 'name'])
ir_model = [('model'+'_'+x['model'], x['name']+'_'+x['model']) for x in ir_model]
return ir_model_relation + ir_model
data_item = fields.Many2one('ir.model', string='Data node')
name = fields.Char(string='Name')
report_id = fields.Many2one(comodel_name='data.chart.report', string='Report')
class report_cols_names(models.Model):
_name = 'report_cols_names'
_description = 'Data Chart Data Cols Names'
t_name = fields.Char(string='Technical Name')
d_name = fields.Char(string='Display Name')
report_id = fields.Many2one(comodel_name='data.chart.report', string='Report')
class data_chart_model(models.Model):
_name = 'data_chart_model'
model = fields.Char(string='Model')
user_id = fields.Many2one(comodel_name='res.users', string='User')
res_id = fields.Integer()
options = fields.Text(string='Options')
slice = fields.Text(string='slice')
conditions = fields.Text(string='conditions')
formats = fields.Text(string='formats')
cols_types = fields.Text(string='Column Types')
@api.model
def data_chart_details(self, model, res_id=False, with_default=False):
try:
return self._data_chart_details(model, res_id, with_default)
except:
raise ValidationError(
_('''ERROR IN REPORT CONFIGURATION ! '''))
@api.model
def _data_chart_details(self, model, res_id=False, with_default=False):
"""
get the list of dicts in sorted oreder to fit in data view
"""
uid = request.session.uid
exist = self.search([('user_id', '=', uid), ('model', '=', model), ('res_id', '=', res_id)])
if(with_default) or not exist:
exist = self.search(
[('user_id', '=', False),
('model', '=', model),
('res_id', '=', res_id)])
company_image = False
header = footer = False
report_names = {}
user = self.env['res.users'].search([('id', '=', uid)])
if user:
company_image = ""+str(user.company_id.logo, 'utf-8', 'ignore')+""
context = {'lang': user.lang}
self.env = self.env(context=context)
if res_id and model == 'data.chart.report':
data_object = self.env[model].search(
[('id', '=', res_id)])
data = data_object.get_data()
cols_types = self.get_data_types(data)
# for a dum reason
cols_types = json.dumps(cols_types)
header = data_object.header
footer = data_object.footer
if exist:
return {'data': data, 'options': exist.options,
'slice': exist.slice, 'conditions': exist.conditions, 'formats': exist.formats,
'cols_types': exist.cols_types and exist.cols_types or cols_types,
'header': header, 'footer': footer, 'company_image': company_image}
else:
return {'data': data, 'header': header, 'footer': footer, 'cols_types': cols_types, 'company_image': company_image}
if res_id and model != 'data.chart.report':
data_object = self.env['data.chart.report'].search(
[('id', '=', res_id)])
data = self.env[model].search([])
header = data_object.header
footer = data_object.footer
report_names = data_object.report_names
report_names = {x.t_name: x.d_name for x in report_names}
else:
data = self.env[model].search([])
model_id = self.env['ir.model'].search([('model', '=', model)])
fields = self.env['ir.model.fields'].search(
[('model_id', 'in', model_id.ids)])
fields = fields.read(['name', 'field_description', 'ttype'])
fields_names = {x['name']: x['field_description'] for x in fields}
fields_types = {x['name']: x['ttype'] for x in fields}
selectoin_fields = {}
for fff in self.env[model]._fields:
if self.env[model]._fields[fff].type == 'selection':
selectoin_fields[fff] = dict(
self.env[model]._fields[fff]._description_selection(self.env))
all_data = []
cols_types = {}
cols_in_report = []
for field in fields_types:
if fields_types[field] in ['char', 'text', 'selection']:
cols_types[field in report_names and report_names[field]
or fields_names[field]] = {'type': 'string'}
cols_in_report.append(field)
elif fields_types[field] in ['integer', 'float', 'monetary']:
cols_types[field in report_names and report_names[field]
or fields_names[field]] = {'type': 'number'}
cols_in_report.append(field)
elif fields_types[field] in ['date']:
cols_types[field in report_names and report_names[field]
or fields_names[field]] = {'type': 'date string'}
cols_in_report.append(field)
elif fields_types[field] in ['datetime']:
cols_types[field in report_names and report_names[field]
or fields_names[field]] = {'type': 'datetime'}
cols_in_report.append(field)
elif fields_types[field] in ['many2one']:
cols_types[field in report_names and report_names[field]
or fields_names[field]] = {'type': 'string'}
cols_in_report.append(field)
# all_data.append(cols_types)
# for a dum reason
cols_types = json.dumps(cols_types)
for rec in data:
row = {}
readed = rec.read()[0]
for item in cols_in_report:
if item == 'name':
readed['name'] = rec.name_get()[0][1]
if fields_types[item] == 'many2one':
readed[item] = readed[item] and readed[item][1] or ""
if fields_types[item] == 'selection':
readed[item] = selectoin_fields[item].get(readed[item], "")
row[item in report_names and report_names[item] or fields_names[item]] = readed[item]
all_data.append(row)
if exist:
return {'data': all_data, 'options': exist.options,
'slice': exist.slice, 'conditions': exist.conditions,
'cols_types': exist.cols_types and exist.cols_types or cols_types,
'formats': exist.formats,
'header': header, 'footer': footer,
'company_image': company_image}
else:
return {'data': all_data, 'company_image': company_image,
'header': header, 'footer': footer, 'cols_types': cols_types}
def get_key_type(self, key, data, index):
if not data[index][key]:
if index + 1 < len(data):
return self.get_key_type(key, data, index + 1)
else:
return {}
# check if the data type is not string
if type(data[index][key]) in [int, float]:
return {'type': 'number'}
# check date
try:
datetime.datetime.strptime(data[index][key], DEFAULT_SERVER_DATE_FORMAT)
return {'type': 'date string'}
except:
pass
# check datetime
try:
datetime.datetime.strptime(data[index][key], DEFAULT_SERVER_DATETIME_FORMAT)
return {'type': 'datetime'}
except:
pass
if data[index][key].isdigit():
return {'type': 'number'}
elif data[index][key] != 'null':
return {'type': 'string'}
else:
if index + 1 < len(data):
return self.get_key_type(key, data, index + 1)
else:
return {}
def get_data_types(self, data):
types_dict = {}
for key in data[0]:
types_dict[key] = self.get_key_type(key, data, 0)
return types_dict
@api.model
def save_options(self, model, res_id, options, slice, conditions, formats, cols_types):
uid = request.session.uid
exist = self.search([('user_id', '=', uid), ('model', '=', model), ('res_id', '=', res_id)])
if exist:
exist.options = options
exist.slice = slice
exist.conditions = conditions
exist.formats = formats
exist.cols_types = cols_types
else:
self.create({
'user_id': uid,
'model': model,
'res_id': res_id,
'options': options,
'slice': slice,
'conditions': conditions,
'formats': formats,
'cols_types': cols_types,
})
@api.model
def set_default(self, model, res_id, options, slice, conditions, formats, cols_types):
exist = self.search(
[('user_id', '=', False),
('model', '=', model),
('res_id', '=', res_id)])
if exist:
exist.options = options
exist.slice = slice
exist.conditions = conditions
exist.formats = formats
exist.cols_types = cols_types
else:
self.create({
'user_id': False,
'model': model,
'res_id': res_id,
'options': options,
'slice': slice,
'conditions': conditions,
'formats': formats,
'cols_types': cols_types,
})
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d