在 Serverless 架構中,AWS SQS(Simple Queue Service)搭配 Dead Letter Queue 是常見處理訊息和請求的設計,可以設定一定的 retry 次數,讓失敗的訊息可以自動修復,也可在嘗試幾次之後,把訊息丟到 Dead Letter Queue ,讓開發人員人工介入查看問題或等待系統穩定再重試。
而開發人員要查看 Queue 的狀況,或者要執行 Dead Letter Queue 裡面的信息,必須透過 AWS console ,或者透過一些 Devops 工具( ex: github action 或 Jenkins ),並搭配 SQS 的 API ,才能對 SQS 做 operation ,往往需要學習操作流程,才能正確執行指定的操作。
如果使用 bedrock agent ,也許可以讓開發人員或 SRE 更直覺的使用自然語言去做這些 operation ,而且不只可以針對 SQS ,也有機會把不同的 AWS service 的 operation 融合進來,變成一個可以簡單上手的工具。
bedrock agent 的流程和設定跟前面幾篇一樣,主要差別都是在 Instructions 、 Action group 的設定和 Lambda 的程式碼。
Instructions 可以參考下面的寫法,這個 Agent 有兩個功能,一個是查看 queue 的狀況,一個是重新執行 Dead Letter Queue 裡面的訊息,所以會有一個 action 的參數讓 Lambda 知道要做哪一件事。
這是一個存取sqs的agent,主要功能是查看sqs當前的狀況,看有沒有message囤積,或者透過這個agent去把dlq的message從新丟回原本的queue執行,如果是要看sqs的狀況,action是1,要重新執行source_queue_arn的dlq的message,action是2
要查詢和 retry queue 的狀況需要知道 queue 的 arn,所以設了 source_queue_arn
這個參數。另外,因為這個 agent 有兩個需求,所以還設了 action
這個參數,如同 Instructions 所寫,AI 會根據使用者的描述,自動把 action
填 1 或 2 。
Lambda 的部分,程式碼可以參考下面的範例。
import logging
import boto3
import json
from typing import Dict, Any
from http import HTTPStatus
from botocore.exceptions import ClientError
logger = logging.getLogger()
logger.setLevel(logging.INFO)
sqs = boto3.client('sqs')
def parse_arn(arn: str) -> Dict[str, str]:
parts = arn.split(':')
if len(parts) != 6 or parts[2] != "sqs":
raise ValueError("Invalid SQS ARN")
return {"region": parts[3], "account_id": parts[4], "queue_name": parts[5]}
def queue_url_from_arn(arn: str) -> str:
p = parse_arn(arn)
return f"https://sqs.{p['region']}.amazonaws.com/{p['account_id']}/{p['queue_name']}"
def get_queue_info(queue_arn: str) -> Dict[str, Any]:
url = queue_url_from_arn(queue_arn)
attrs = sqs.get_queue_attributes(QueueUrl=url, AttributeNames=['All'])['Attributes']
info = {
"arn": queue_arn,
"queue_url": url,
"attributes": attrs
}
rp = attrs.get("RedrivePolicy")
if rp:
try:
dead_letter_arn = json.loads(rp).get("deadLetterTargetArn")
if dead_letter_arn:
info["dlq"] = get_queue_info(dead_letter_arn)
except Exception as e:
info["dlq_error"] = str(e)
return info
def start_redrive_from_main(main_arn: str, max_msgs: int = None) -> Dict[str, Any]:
# 從主佇列 RedrivePolicy 取得 dlq_arn
main_info = get_queue_info(main_arn)
rp = main_info.get("attributes", {}).get("RedrivePolicy")
dlq_arn = None
if rp:
try:
dlq_arn = json.loads(rp).get("deadLetterTargetArn")
except Exception:
pass
if not dlq_arn:
return {"error": "No DLQ configured on main queue", "main_arn": main_arn}
params = {"SourceArn": dlq_arn, "DestinationArn": main_arn}
if max_msgs:
params["MaxNumberOfMessagesPerSecond"] = max_msgs
try:
resp = sqs.start_message_move_task(**params)
return {
"task_handle": resp["TaskHandle"],
"source_dlq_arn": dlq_arn,
"destination_main_arn": main_arn
}
except ClientError as e:
return {"error": str(e), "params": params}
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
try:
action_group = event['actionGroup']
function = event['function']
message_version = event.get('messageVersion', 1)
params = {p['name']: p['value'] for p in event.get("parameters", [])}
action = str(params.get("action", "1")).strip()
response_body = {}
if action == "1":
queue_arn = params.get("source_queue_arn")
if not queue_arn:
raise ValueError("Missing source_queue_arn")
result = get_queue_info(queue_arn)
response_body = {
'TEXT': {
'body': json.dumps({
'message': f'Successfully retrieved SQS information for function {function}',
'queue_info': result
}, indent=2, ensure_ascii=False)
}
}
elif action == "2":
main_arn = params.get("source_queue_arn")
max_msgs = int(params["max_messages"]) if "max_messages" in params and params["max_messages"] else 10
if not main_arn:
raise ValueError("Missing source_queue_arn")
result = start_redrive_from_main(main_arn, max_msgs)
response_body = {
'TEXT': {
'body': json.dumps({
'message': f'DLQ redrive operation initiated for function {function}',
'redrive_result': result
}, indent=2, ensure_ascii=False)
}
}
else:
response_body = {
'TEXT': {
'body': json.dumps({'error': f'Unknown action {action}'})
}
}
action_response = {
'actionGroup': action_group,
'function': function,
'functionResponse': {
'responseBody': response_body
}
}
return {
'response': action_response,
'messageVersion': message_version
}
except Exception as e:
logger.error(str(e))
return {
'statusCode': HTTPStatus.BAD_REQUEST,
'body': f"Error: {str(e)}"
}
Lambda 的 role ,除了寫 Log 需要有 logs
的權限之外,因為需要讀取 queue 資訊,所以需要 SQS 的 GetQueueAttributes
這個權限,重新執行 message 則需要 StartMessageMoveTask
、 ListMessageMoveTasks
、 receivemessage
、 deletemessage
和 sendmessage
這些權限,基本上就是一些對 message 操作的權限。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "CreateLogGroup",
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": "arn:aws:logs:us-east-1:262969866776:*"
},
{
"Sid": "CreateLogStreamAndPutLogEvents",
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:us-east-1:262969866776:log-group:/aws/lambda/action_group_for_sqs-bresg:*"
]
},
{
"Sid": "CheckSQSStatusAndRetry",
"Effect": "Allow",
"Action": [
"sqs:GetQueueAttributes",
"sqs:StartMessageMoveTask",
"sqs:ListMessageMoveTasks",
"sqs:receivemessage",
"sqs:deletemessage",
"sqs:sendmessage"
],
"Resource": "*"
}
]
}
Reference: StartMessageMoveTask
設定好之後,就可以測試了,明天再繼續來測試流程。