iT邦幫忙

2021 iThome 鐵人賽

DAY 27
0
永豐金融APIs

試著讀懂與串接永豐金融APIs系列 第 27

Day 0x1B - odoo addons 永豐金流開發(Part 2 - sinopac sdk... maybe)

  • 分享至 

  • xImage
  •  

*** 模組資料夾 payment_sinopac 以 "/" 來代表此資料夾 ***

0x1 永豐金流的sdk...吧

說實在要改的還太多,但這裡還是先放出來,
也沒什麼時間修正了,鐵人賽結束了還是會持續的修正 sdk

OK,那先建立 sdk 的 controller
在模組目錄先建立 controller 資料夾
接著建立 __init__.py
/controller/__init__.py

# -*- coding: utf-8 -*-
from . import sinopac_sdk

然後建立 sinopac_sdk.py
這個基本上是參考 laravel 當時寫的 sdk,
翻過來後也有測試是正常的
/controller/sinopac_sdk.py

import requests as req
import hashlib
import logging
from json import loads, dumps
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad

_logger = logging.getLogger(__name__)
href_origin = 'https://sandbox.sinopac.com/QPay.WebAPI'


def str2int(x):
    return int(x, 16)


class SinopacSDK:
    shop_no = False
    nonce = False
    key_a1 = False
    key_a2 = False
    key_b1 = False
    key_b2 = False

    def __init__(self, **params):
        """
        @params str shop_no: 商店代碼
        @params str key_a1: 官方提供 hash key a1
        @params str key_a2: 官方提供 hash key a2
        @params str key_b1: 官方提供 hash key b1
        @params str key_b2: 官方提供 hash key b2
        """
        names = ['shop_no', 'key_a1', 'key_a2', 'key_b1', 'key_b2']
        for n in names:
            if not params.get(n):
                raise Exception(f'Not given {n}')
            else:
                self.__setattr__(n, params.get(n))

    def get_nonce(self) -> str:
        url = f'{href_origin}/api/Nonce'
        header = {
            'Content-type': 'application/json'
        }
        data = dumps({
            'ShopNo': self.shop_no
        })
        response = req.post(url=url, headers=header, data=data)

        if response.status_code >= 500:
            raise Exception('Server Error')
        elif response.status_code >= 400:
            raise Exception('Self Error')
        elif response.status_code != 200:
            raise Exception(f'Error Status: {response.status_code}')

        content = loads(response.content)
        if not content or not content.get('Nonce'):
            raise Exception('Get Nonce Failure')

        self.nonce = content.get('Nonce')
        return self.nonce

    def calculate_hash_id(self) -> str:
        key_length = len(self.key_a1)
        a = hex(str2int(self.key_a1) ^ str2int(self.key_a2))[2:]
        b = hex(str2int(self.key_b1) ^ str2int(self.key_b2))[2:]
        return f'{a:0>{key_length}}{b:0>{key_length}}'.upper()

    def sha256(self, string: str) -> str:
        return hashlib.sha256(string.encode('utf-8')).hexdigest().upper()

    def calculate_sign(self, data: dict, nonce: str, hash_id: str) -> str:
        uppercase_keys = sorted([k.upper() for k in data])
        new_dict = {}
        for key, value in data.items():
            if type(value) in [list, dict]:
                continue

            if value == '' or value is False:
                continue

            new_dict[uppercase_keys.index(key.upper())] = f'{key}={value}'

        message = '&'.join([new_dict[i] for i in sorted(new_dict)]) + nonce + hash_id
        return self.sha256(message)

    def calculate_iv(self, nonce: str) -> str:
        if not nonce:
            nonce = self.nonce or self.get_nonce()

        return self.sha256(nonce)[-16:]

    def aes_cipher(self, hash_id: str, iv: str):
        byte_key = bytes(hash_id.encode('utf8'))
        byte_iv = bytes(iv.encode('utf8'))
        return AES.new(key=byte_key, mode=AES.MODE_CBC, iv=byte_iv)

    def encrypt_message(self, data: dict, hash_id: str, iv: str):
        cipher = self.aes_cipher(hash_id, iv)

        json_data = dumps(data).encode('utf8')
        encrypt_message = cipher.encrypt(pad(json_data, AES.block_size))
        return encrypt_message.hex().upper()

    def decrypt_message(self, encrypt_message: str, hash_id: str, iv: str):
        cipher = self.aes_cipher(hash_id, iv)
        encrypt_message = bytes.fromhex(encrypt_message)
        message = unpad(cipher.decrypt(encrypt_message), AES.block_size)
        return loads(message)

    def call_api(self, url: str, data: dict):
        header = {
            'Content-type': 'application/json'
        }

        response = req.post(url=url, headers=header, data=dumps(data))
        data = loads(response.content)

        if 'Nonce' not in data:
            _logger.warning('Reply message haven\'t Nonce. JSON: %s', response.content.decode('utf8'))
            return {}

        hash_id = self.calculate_hash_id()
        dec_data = self.decrypt_message(
            encrypt_message=data['Message'],
            hash_id=hash_id,
            iv=self.calculate_iv(data['Nonce'])
        )

        if dec_data.get('Status', '') != 'S':
            _logger.warning('訂單建立失敗,原因: %s', dec_data.get('Description', ''))
            return {}

        sign = self.calculate_sign(
            data=dec_data,
            hash_id=hash_id,
            nonce=data['Nonce']
        )

        if sign != data['Sign']:
            _logger.error('驗證錯誤,內文簽章不同. JSON: %s', response.content)
            return {}

        return dec_data

0x2 今日結語

基本上依照之前踩過的坑都考慮進去了
但在 sign 那段卡了一陣子
莫名其妙 python 跟 php sha256 算出來的不一樣
去上個廁所回來再按一次就正常了

真的很奇妙...
明天把主要Controller 新增 route 的部分丟上來
目前還卡在底層的流程釐清跟實現,希望可以順利完成


上一篇
Day 0x1A odoo addons 永豐金流開發(Part 2 - model, view, security)
下一篇
Day 0x 1C - odoo addons 永豐金流開發(Part 3 - controller)
系列文
試著讀懂與串接永豐金融APIs30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言