iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 9
2

實作

首先先建立一個consumer/資料夾,新增__init__.py以及notify_handler.py

notify_handler.py輸入以下程式碼

import json
import requests


def line_notify_handler(event, context):
    print(event)
    body = json.loads(event['Records'][0]['body'])
    print(body)
    headers = {
        'Authorization': f"Bearer {body['token']}",
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    r = requests.post('https://notify-api.line.me/api/notify',
                      headers=headers,
                      data={'message': body['message']})
    print(r)

# AWS record sample
# {'Records': [{'messageId': 'fddc42ba-a122-4581-965e-d0144ac8a5ad', 'receiptHandle': 'AQEBjO32gY5pXOfOrmDR0hD4k1av9KyjbHFpc+rIBPV2Brif7Lo+jqnGevSjfFwlICyGf+BhWwKaxFw8XdB3QTzRbw0vnLURjnQeDSBrJHa/S57SRs9TOLRBq38maycAVg69iZbetg9VhLMBCcLtOtPHTzKkmo+/Sosm51WA5CzXK7A0rteikx6nxS1CUIpq6MAujodupP0Hgr5RjK5nH/nmxA4Db0leWEmLokalZbtlx4W14tp7PZxPOrQOLDaGrH//p4h32tY8IN3MkCqi+gyNT7kCU4KwCGOIrybb07ZWyKBTKw+KOMNr/Ykj4z2N1qxIvTM55UY9d8V29YsH32OjrZTei5P7Nke/51E2tWkmkqoFAlqzxDjQPvpP+Pvvr8aazeeZ6opkr59UefAiiyM71Q==', 'body': 'hi', 'attributes': {'ApproximateReceiveCount': '9', 'SentTimestamp': '1566621263072', 'SenderId': '901588721449', 'ApproximateFirstReceiveTimestamp': '1566621263072'}, 'messageAttributes': {}, 'md5OfBody': '49f68a5c8493ec2c0bf489821c21fc3b', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:us-east-1:901588721449:LINE_notify_consumer', 'awsRegion': 'us-east-1'}]}

接著在requirements.txt加入boto3,他是一個使用 python 介接 AWS 的套件

boto3==1.9.189

加入SQS_URL以及SQS_ARN.env裡面

SQS_URL=sqs url
SQS_ARN=your sqs arn

add controller/notify_sqs_controller.py

from flask_restful import Resource, reqparse
import json
from lib.db import Database
import psycopg2.extras
import os
import boto3


def send_message(url, attr, body, delay=0):
    cli.send_message(
        QueueUrl=url,
        DelaySeconds=0,
        MessageAttributes=attr,
        MessageBody=body,
    )


class SendNotifyBySQSController(Resource):
    cli = boto3.client("sqs", region_name=os.environ("region"))

    def post(self):
        parser = reqparse.RequestParser()
        parser.add_argument(
            'message', required=True, help='message can not be blank!')
        args = parser.parse_args()
        msg = args['message']
        with Database() as db, db.connect() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    f"SELECT token FROM notify")
                fetch = cur.fetchall()
        for f in fetch:
            body = {
                'token': f"Bearer {f['token']}",
                'message': f"Hello everyone, {msg}"
            }
            cli.send_message(
                QueueUrl=os.environ("SQS_URL"),
                DelaySeconds=0,
                MessageAttributes={},
                MessageBody=json.dumps(body),
            )
        return {'result': 'ok'}, 200

程式寫完了就是要加一條路由/notify/sqs

from controller.notify_sqs_controller import SendNotifyBySQSController
api.add_resource(SendNotifyBySQSController, '/notify/sqs')

接著透過wsgi在本地起一個 server

sls wsgi serve

再搭配 postman 來做測試,測試內容如下

{
  "message": "test Content"
}

接著透過sls deploy部署上會遇到一個問題,會有 Access Denied,所以要在serverless.yml加入 IAM role 的設定

add iam in provider

iamRoleStatements:
  - Effect: Allow
    Action:
      - sqs:SendMessage
    Resource:
      - ${env:SQS_ARN}

測試


結論

使用 SQS 這類服務都會需要透過boto3來幫忙串接,最需要注意的就是 IAM role,因為在本地端的 key 通常權限都是最大的,但上到 AWS 上就會有權限的問題,所以要記得加入 IAM 哦!

Code is here

專案也會持續更新,更多詳情可以 follow 我的專案 aws-python-line-api


上一篇
Day 8 - SQS 簡單介紹 & 建立
下一篇
Day 10 - LINE 的服務都玩了,怎麼能忘了 Message api (Bot) 呢?(1) - 介紹
系列文
一步步帶你了解 AWS & LINE API 並使用 Serverless 介接的各種應用30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言