255 lines
12 KiB
Python
255 lines
12 KiB
Python
import requests
|
|
import re
|
|
import logging
|
|
from odoo import _
|
|
from odoo.http import route, request, Controller
|
|
from odoo.addons.payment_hyperpay.data.payment_icon import payment_icon
|
|
from odoo.addons.payment.models.payment_acquirer import PaymentToken
|
|
from datetime import datetime
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
TEST_URL = "https://eu-test.oppwa.com"
|
|
LIVE_URL = "https://eu-prod.oppwa.com"
|
|
|
|
class HyperPayTokenization(Controller):
|
|
|
|
@route('/hyperpay/tokens/checkout', type='json', auth='public', website=True, methods=['POST'])
|
|
def token_checkout(self, **kwargs):
|
|
if not kwargs.get('acquirer_id'):
|
|
return {'state': False, 'message': 'Couldn\'t identify acquirer'}
|
|
try:
|
|
self._save_session_data(kwargs)
|
|
checkout_data = self._register_payment_card(kwargs)
|
|
return checkout_data
|
|
except Exception as er:
|
|
_logger.exception('Error in token_checkout: %s' % er)
|
|
return {
|
|
'state': False,
|
|
'message': str(er)
|
|
}
|
|
|
|
def _save_session_data(self, data):
|
|
# save data in session as a form of naive saving
|
|
return True
|
|
|
|
def _register_payment_card(self, data):
|
|
acquirer_id = int(data.get('acquirer_id'))
|
|
acquirer = request.env['payment.acquirer'].sudo().search([('id', '=', acquirer_id)])
|
|
payment_icons = acquirer.payment_icon_ids.mapped('name')
|
|
base_url = request.httprequest.host_url
|
|
brands = [payment_icon[i.upper()] for i in payment_icons if i.upper() in payment_icon.keys()]
|
|
data_brands = brands and " ".join(brands) or "VISA"
|
|
tx = self._create_validation_transaction(acquirer)
|
|
payload = self._get_hyperpay_token_payload(acquirer, data)
|
|
|
|
payload.update({
|
|
"merchantTransactionId": tx.reference,
|
|
})
|
|
|
|
if acquirer.state == 'test':
|
|
domain = TEST_URL
|
|
else:
|
|
domain = LIVE_URL
|
|
|
|
url = f"{domain}/v1/checkouts"
|
|
|
|
headers = {
|
|
"Authorization": f"Bearer {acquirer.hyperpay_authorization}"
|
|
}
|
|
_logger.info('Hyperpay Token Checkout Payload: %s' % payload)
|
|
response_data = requests.post(url=url, data=payload, headers=headers)
|
|
_logger.info('Hyperpay Token Checkout Response Text: %s' % response_data.text)
|
|
response = response_data.json()
|
|
_logger.info('Hyperpay Token Checkout Response JSON: %s' % response)
|
|
result = response.get('result', {})
|
|
result_code = result.get('code')
|
|
|
|
if result_code and not re.match(r"^(000\.000\.|000\.100\.1|000\.[36]|000\.400\.1[12]0|000\.400\.0[^3]|000\.400\.100|000\.200)", result_code):
|
|
return {'state': False, 'message': result.get('description', '')}
|
|
|
|
return_url = f'{base_url}hyperpay/tokens/result?transaction_id={tx.id}'
|
|
|
|
return {
|
|
'state': True,
|
|
'checkout_id': response.get('id'),
|
|
'service_domain': domain,
|
|
'base_url': base_url,
|
|
'data_brands': data_brands,
|
|
'return_url': return_url,
|
|
}
|
|
|
|
def _get_hyperpay_token_payload(self, acquirer, data):
|
|
reference_id = request.session.get('hyperpay_token_reference_id')
|
|
reference_model = request.session.get('hyperpay_token_reference_model')
|
|
partner_id = request.env.user.partner_id
|
|
currency = partner_id.currency_id
|
|
amount = self._get_validation_amount(currency)
|
|
return {
|
|
"entityId": acquirer.hyperpay_merchant_id,
|
|
'createRegistration': True,
|
|
"amount": '%.2f' % amount,
|
|
"currency": currency.name,
|
|
"paymentType": "DB",
|
|
'standingInstruction.mode': 'INITIAL',
|
|
'standingInstruction.source': 'CIT',
|
|
'standingInstruction.recurringType': 'STANDING_ORDER',
|
|
'customParameters[paymentFrequency]': 'OTHER',
|
|
'standingInstruction.type': 'UNSCHEDULED',
|
|
"billing.street1": partner_id.street or 'Riyadh',
|
|
"billing.street2": partner_id.street2 or '',
|
|
"billing.city": partner_id.city or 'Riyadh',
|
|
"billing.state": partner_id.state_id.name if partner_id.state_id else 'Riyadh',
|
|
"billing.postcode": partner_id.zip or '',
|
|
"billing.country": partner_id.country_id.code or 'SA',
|
|
"customer.givenName": partner_id.name,
|
|
"customer.surname": '',
|
|
"customer.email": partner_id.email or 'noemail@example.com',
|
|
"customer.mobile": partner_id.mobile or partner_id.phone or '',
|
|
"customer.phone": partner_id.phone or partner_id.mobile or '',
|
|
"customParameters[SHOPPER_acquirer_id]": data.get('acquirer_id', 0),
|
|
"customParameters[SHOPPER_hyperpay_token_reference_id]": reference_id,
|
|
"customParameters[SHOPPER_hyperpay_token_reference_model]": reference_model,
|
|
}
|
|
|
|
def _create_validation_transaction(self, acquirer_id):
|
|
partner_id = request.env.user.partner_id
|
|
currency = partner_id.currency_id
|
|
amount = self._get_validation_amount(currency)
|
|
|
|
reference = "VALIDATION-%s" % (datetime.now().strftime('%y%m%d_%H%M%S'))
|
|
tx = request.env['payment.transaction'].sudo().create({
|
|
'amount': amount,
|
|
'acquirer_id': acquirer_id.id,
|
|
'type': 'validation',
|
|
'currency_id': currency.id,
|
|
'reference': reference,
|
|
'partner_id': partner_id.id,
|
|
'partner_country_id': partner_id.country_id.id,
|
|
'state_message': _('This Transaction was automatically processed & refunded in order to validate a new credit card.'),
|
|
})
|
|
|
|
return tx
|
|
|
|
def _get_validation_amount(self, currency):
|
|
if PaymentToken.VALIDATION_AMOUNTS.get(currency.name):
|
|
return PaymentToken.VALIDATION_AMOUNTS.get(currency.name)
|
|
else:
|
|
# If we don't find the user's currency, then we set the currency to EUR and the amount to 1€50.
|
|
return 1.5
|
|
|
|
@route('/hyperpay/tokens/result', type='http', auth='public', website=True)
|
|
def token_return(self, **post):
|
|
try:
|
|
_logger.info('Hyperpay Token Return Post: %s' % post)
|
|
|
|
transaction_id = request.env['payment.transaction'].sudo().search([('id', '=', int(post.get('transaction_id', 0)))], limit=1)
|
|
if not transaction_id:
|
|
_logger.error('Transaction not found: %s' % post.get('transaction_id'))
|
|
return request.make_response('Transaction not found', headers=[('Content-Type', 'text/html')])
|
|
|
|
acquirer_id = transaction_id.acquirer_id
|
|
if not acquirer_id:
|
|
_logger.error('Acquirer not found for transaction: %s' % transaction_id.id)
|
|
return request.make_response('Payment provider not found', headers=[('Content-Type', 'text/html')])
|
|
|
|
resource_path = post.get('resourcePath')
|
|
if not resource_path:
|
|
_logger.error('resourcePath not found in callback data: %s' % post)
|
|
return request.make_response('Invalid callback data', headers=[('Content-Type', 'text/html')])
|
|
|
|
if acquirer_id.state == 'test':
|
|
domain = TEST_URL
|
|
else:
|
|
domain = LIVE_URL
|
|
|
|
url = f"{domain}{resource_path}?entityId={acquirer_id.hyperpay_merchant_id}"
|
|
headers = {
|
|
"Authorization": f"Bearer {acquirer_id.hyperpay_authorization}"
|
|
}
|
|
_logger.info('Hyperpay Token Status Request: %s' % url)
|
|
response_data = requests.get(url=url, headers=headers)
|
|
_logger.info('Hyperpay Token Status Response Text: %s' % response_data.text)
|
|
resp = response_data.json()
|
|
_logger.info('Hyperpay Token Status Response JSON: %s' % resp)
|
|
|
|
resp.update({'tx_id': transaction_id.id})
|
|
request.env['payment.transaction'].sudo().form_feedback(resp, "hyperpay")
|
|
|
|
transaction_id = request.env['payment.transaction'].sudo().search([('id', '=', transaction_id.id)], limit=1)
|
|
|
|
if transaction_id.state != 'done':
|
|
_logger.error('Validation transaction not done: %s (state: %s)' % (transaction_id.id, transaction_id.state))
|
|
return request.make_response('Payment validation failed. Please try again or contact support.', headers=[('Content-Type', 'text/html')])
|
|
|
|
res = self._post_process_token_return(transaction_id, resp)
|
|
if not res.get('state'):
|
|
return request.make_response(res.get('message'), headers=[('Content-Type', 'text/html')])
|
|
except Exception as er:
|
|
# request.env.cr.rollback()
|
|
_logger.exception('Error processing token return: %s' % er)
|
|
return request.make_response('An error occurred while processing your payment. Please contact support.\n Stack Trace: %s' % er, headers=[('Content-Type', 'text/html')])
|
|
|
|
return request.redirect('/my/recurring_donation')
|
|
|
|
def _post_process_token_return(self, transaction_id, data):
|
|
|
|
card = data.get('card', {})
|
|
if not card:
|
|
_logger.error('Card data not found in response')
|
|
return {'state': False, 'message': 'Card data not found in response'}
|
|
|
|
initial_tx_id = data.get('resultDetails', {}).get('CardholderInitiatedTransactionID') or data.get('CardholderInitiatedTransactionID', '')
|
|
registration_id = data.get('registrationId', '')
|
|
|
|
if not initial_tx_id:
|
|
_logger.error('Initial transaction ID not found - token will not work for recurring payments')
|
|
return {'state': False, 'message': 'Initial transaction ID not found - token will not work for recurring payments'}
|
|
|
|
if not registration_id:
|
|
_logger.error('Registration ID not found in response')
|
|
return {'state': False, 'message': 'Registration ID not found in response'}
|
|
|
|
acquirer_id = transaction_id.acquirer_id
|
|
existing_token = request.env['payment.token'].sudo().search([
|
|
('acquirer_ref', '=', registration_id),
|
|
('acquirer_id', '=', acquirer_id.id)
|
|
], limit=1)
|
|
|
|
if existing_token:
|
|
_logger.warning('Token already exists for registration %s, returning existing token' % registration_id)
|
|
reference_id = int(data.get('customParameters', {}).get('SHOPPER_hyperpay_token_reference_id', 0))
|
|
reference_model = data.get('customParameters', {}).get('SHOPPER_hyperpay_token_reference_model', '')
|
|
if reference_id and reference_model and reference_model in request.env:
|
|
record_id = request.env[reference_model].sudo().search([('id', '=', reference_id)])
|
|
if record_id and hasattr(record_id, '_post_process_card_tokenization'):
|
|
record_id._post_process_card_tokenization(existing_token)
|
|
return {'state': True, 'message': 'Token already exists for registration %s, returning existing token' % registration_id}
|
|
|
|
try:
|
|
transaction_id.hyperpay_s2s_do_refund()
|
|
except Exception as er:
|
|
_logger.error('Hyperpay Token Return Transaction refund failed: %s' % er)
|
|
|
|
|
|
card_vals = {
|
|
'name': f"{card.get('bin', '')}XXXXXXXXXXXX{card.get('last4Digits', '')}",
|
|
'partner_id': request.env.user.partner_id.id,
|
|
'acquirer_id': acquirer_id.id,
|
|
'acquirer_ref': registration_id,
|
|
'hyperpay_payment_brand': data.get('paymentBrand'),
|
|
'hyperpay_initial_transaction_id': initial_tx_id,
|
|
'verified': True,
|
|
}
|
|
|
|
token_id = request.env['payment.token'].sudo().create(card_vals)
|
|
_logger.info('Created payment token %s for partner %s' % (token_id.id, request.env.user.partner_id.id))
|
|
|
|
reference_id = int(data.get('customParameters', {}).get('SHOPPER_hyperpay_token_reference_id', 0))
|
|
reference_model = data.get('customParameters', {}).get('SHOPPER_hyperpay_token_reference_model', '')
|
|
if reference_id and reference_model and reference_model in request.env:
|
|
record_id = request.env[reference_model].sudo().search([('id', '=', reference_id)])
|
|
if record_id and hasattr(record_id, '_post_process_card_tokenization'):
|
|
record_id._post_process_card_tokenization(token_id)
|
|
|
|
return {'state': True, 'message': 'Token created successfully'} |