iT邦幫忙

2025 iThome 鐵人賽

DAY 27
0
Build on AWS

亞馬遜熱帶雨林生存日記系列 第 27

Day 27: 如何使用 AWS bedrock agent 對 SQS 做 operation (上)

  • 分享至 

  • xImage
  •  

在 Serverless 架構中,AWS SQS(Simple Queue Service)搭配 Dead Letter Queue 是常見處理訊息和請求的設計,可以設定一定的 retry 次數,讓失敗的訊息可以自動修復,也可在嘗試幾次之後,把訊息丟到 Dead Letter Queue ,讓開發人員人工介入查看問題或等待系統穩定再重試。

而開發人員要查看 Queue 的狀況,或者要執行 Dead Letter Queue 裡面的信息,必須透過 AWS console ,或者透過一些 Devops 工具( ex: github action 或 Jenkins ),並搭配 SQS 的 API ,才能對 SQS 做 operation ,往往需要學習操作流程,才能正確執行指定的操作。

如果使用 bedrock agent ,也許可以讓開發人員或 SRE 更直覺的使用自然語言去做這些 operation ,而且不只可以針對 SQS ,也有機會把不同的 AWS service 的 operation 融合進來,變成一個可以簡單上手的工具。


設定 Instructions

bedrock agent 的流程和設定跟前面幾篇一樣,主要差別都是在 Instructions 、 Action group 的設定和 Lambda 的程式碼。

Instructions 可以參考下面的寫法,這個 Agent 有兩個功能,一個是查看 queue 的狀況,一個是重新執行 Dead Letter Queue 裡面的訊息,所以會有一個 action 的參數讓 Lambda 知道要做哪一件事。

這是一個存取sqs的agent,主要功能是查看sqs當前的狀況,看有沒有message囤積,或者透過這個agent去把dlq的message從新丟回原本的queue執行,如果是要看sqs的狀況,action是1,要重新執行source_queue_arn的dlq的message,action是2

設定 Action group

要查詢和 retry queue 的狀況需要知道 queue 的 arn,所以設了 source_queue_arn 這個參數。另外,因為這個 agent 有兩個需求,所以還設了 action 這個參數,如同 Instructions 所寫,AI 會根據使用者的描述,自動把 action 填 1 或 2 。

設定 Lambda

Lambda 的部分,程式碼可以參考下面的範例。

import logging
import boto3
import json
from typing import Dict, Any
from http import HTTPStatus
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)
sqs = boto3.client('sqs')

def parse_arn(arn: str) -> Dict[str, str]:
    parts = arn.split(':')
    if len(parts) != 6 or parts[2] != "sqs":
        raise ValueError("Invalid SQS ARN")
    return {"region": parts[3], "account_id": parts[4], "queue_name": parts[5]}

def queue_url_from_arn(arn: str) -> str:
    p = parse_arn(arn)
    return f"https://sqs.{p['region']}.amazonaws.com/{p['account_id']}/{p['queue_name']}"

def get_queue_info(queue_arn: str) -> Dict[str, Any]:
    url = queue_url_from_arn(queue_arn)
    attrs = sqs.get_queue_attributes(QueueUrl=url, AttributeNames=['All'])['Attributes']
    info = {
        "arn": queue_arn,
        "queue_url": url,
        "attributes": attrs
    }
    rp = attrs.get("RedrivePolicy")
    if rp:
        try:
            dead_letter_arn = json.loads(rp).get("deadLetterTargetArn")
            if dead_letter_arn:
                info["dlq"] = get_queue_info(dead_letter_arn)
        except Exception as e:
            info["dlq_error"] = str(e)
    return info

def start_redrive_from_main(main_arn: str, max_msgs: int = None) -> Dict[str, Any]:
    # 從主佇列 RedrivePolicy 取得 dlq_arn
    main_info = get_queue_info(main_arn)
    rp = main_info.get("attributes", {}).get("RedrivePolicy")
    dlq_arn = None
    if rp:
        try:
            dlq_arn = json.loads(rp).get("deadLetterTargetArn")
        except Exception:
            pass
    if not dlq_arn:
        return {"error": "No DLQ configured on main queue", "main_arn": main_arn}
    params = {"SourceArn": dlq_arn, "DestinationArn": main_arn}
    if max_msgs:
        params["MaxNumberOfMessagesPerSecond"] = max_msgs
    try:
        resp = sqs.start_message_move_task(**params)
        return {
            "task_handle": resp["TaskHandle"],
            "source_dlq_arn": dlq_arn,
            "destination_main_arn": main_arn
        }
    except ClientError as e:
        return {"error": str(e), "params": params}

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    try:
        action_group = event['actionGroup']
        function = event['function']
        message_version = event.get('messageVersion', 1)
        params = {p['name']: p['value'] for p in event.get("parameters", [])}
        action = str(params.get("action", "1")).strip()
        response_body = {}

        if action == "1":
            queue_arn = params.get("source_queue_arn")
            if not queue_arn:
                raise ValueError("Missing source_queue_arn")
            result = get_queue_info(queue_arn)
            response_body = {
                'TEXT': {
                    'body': json.dumps({
                        'message': f'Successfully retrieved SQS information for function {function}',
                        'queue_info': result
                    }, indent=2, ensure_ascii=False)
                }
            }
        elif action == "2":
            main_arn = params.get("source_queue_arn")
            max_msgs = int(params["max_messages"]) if "max_messages" in params and params["max_messages"] else 10
            if not main_arn:
                raise ValueError("Missing source_queue_arn")
            result = start_redrive_from_main(main_arn, max_msgs)
            response_body = {
                'TEXT': {
                    'body': json.dumps({
                        'message': f'DLQ redrive operation initiated for function {function}',
                        'redrive_result': result
                    }, indent=2, ensure_ascii=False)
                }
            }
        else:
            response_body = {
                'TEXT': {
                    'body': json.dumps({'error': f'Unknown action {action}'})
                }
            }

        action_response = {
            'actionGroup': action_group,
            'function': function,
            'functionResponse': {
                'responseBody': response_body
            }
        }
        return {
            'response': action_response,
            'messageVersion': message_version
        }
    except Exception as e:
        logger.error(str(e))
        return {
            'statusCode': HTTPStatus.BAD_REQUEST,
            'body': f"Error: {str(e)}"
        }

Lambda 的 role ,除了寫 Log 需要有 logs 的權限之外,因為需要讀取 queue 資訊,所以需要 SQS 的 GetQueueAttributes 這個權限,重新執行 message 則需要 StartMessageMoveTaskListMessageMoveTasksreceivemessagedeletemessagesendmessage 這些權限,基本上就是一些對 message 操作的權限。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CreateLogGroup",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:us-east-1:262969866776:*"
        },
        {
            "Sid": "CreateLogStreamAndPutLogEvents",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:us-east-1:262969866776:log-group:/aws/lambda/action_group_for_sqs-bresg:*"
            ]
        },
        {
            "Sid": "CheckSQSStatusAndRetry",
            "Effect": "Allow",
            "Action": [
                "sqs:GetQueueAttributes",
                "sqs:StartMessageMoveTask",
                "sqs:ListMessageMoveTasks",
                "sqs:receivemessage",
                "sqs:deletemessage",
                "sqs:sendmessage"
            ],
            "Resource": "*"
        }
    ]
}

Reference: StartMessageMoveTask

設定好之後,就可以測試了,明天再繼續來測試流程。


上一篇
Day 26: 如何使用 AWS bedrock agent 分析 AWS 帳單 (下)
下一篇
Day 28: 如何使用 AWS bedrock agent 對 SQS 做 operation (下)
系列文
亞馬遜熱帶雨林生存日記28
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言