在現代雲端架構中,成本監控和分析對企業降低成本非常重要。AWS 提供了豐富的計費資訊,但如何有效地自動化抓取這些資料並進行分析和比較,一直是許多開發人員和管理人員面臨的挑戰。
除了監控之外,企業往往有自己的報表格式,要把 AWS 提供的報表轉換成符合企業需求的格式,常常會耗費許多時間。因此本文將分享如何使用 AWS Bedrock Agent 搭配 Lambda 自動化撈取 Billing Report,並轉換成企業可以使用的報表。
可以先照著「如何使用 AWS bedrock agent 查詢 DynamoDB 資料」這篇文章設定好 Bedrock agent,並調整 Instructions for the Agent 和指定對應的 Lambda 即可。
Instructions 只要調整像下面這樣,明確說明這個 agent 是為了產出 report ,即可觸發 agent 背後的 Lambda 去撈取資料。
這是一個幫忙開發人員分析 billing 的 agent ,可以幫助開發人員分析 service 的 cost ,產出每月報表,並進行 cost down
因為撈取 billing report 需要指定要取得資料的區間,所以這部分多設定了 Parameters,指定 month 和 year,讓 Lambda 可以根據使用者要求的時間區間撈資料。
Lambda 的部分,程式碼可以參考下面的範例。
import json
import boto3
from datetime import datetime, timedelta
import calendar
import logging
from typing import Dict, Any, List
# Configure logging: take the root logger and let INFO-and-above records through.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class BillingAnalyzer:
    """Fetch AWS Cost Explorer data and shape it into monthly billing reports.

    The report methods (``get_monthly_service_costs``,
    ``get_daily_costs_for_month``, ``get_cost_by_region`` and
    ``generate_monthly_summary``) catch every ``Exception`` and return a dict
    whose ``status`` is ``'success'`` or ``'error'`` (the latter with an
    ``error`` message) instead of raising.
    """

    def __init__(self):
        """Create the Cost Explorer client, logging and re-raising on failure."""
        try:
            # Initialise the AWS Cost Explorer client.
            self.ce_client = boto3.client('ce', region_name='us-east-1')
        except Exception as e:
            logger.exception("Failed to initialize AWS clients: %s", e)
            raise

    def parse_bedrock_parameters(self, parameters: List[Dict]) -> Dict[str, str]:
        """Flatten the parameter list sent by Bedrock into ``{name: value}``.

        Args:
            parameters: list of dicts, each read through its ``name`` and
                ``value`` keys.

        Returns:
            A dict of the entries whose name and value are both truthy, or an
            empty dict when the input cannot be iterated/read as described.
        """
        params = {}
        try:
            for param in parameters:
                name = param.get('name')
                value = param.get('value')
                if name and value:
                    params[name] = value
            logger.info(f"Parsed parameters: {params}")
            return params
        except Exception as e:
            # Best effort: a malformed parameter list yields "no parameters".
            logger.error(f"Error parsing parameters: {str(e)}")
            return {}

    def get_date_range_from_params(self, year: str, month: str) -> Dict[str, str]:
        """Compute the Cost Explorer time period covering one calendar month.

        Args:
            year: the year, as anything ``int()`` accepts.
            month: the month number (1-12), as anything ``int()`` accepts.

        Returns:
            Dict with ``start`` (first day of the month), ``end`` (first day of
            the following month, i.e. an exclusive end date), the ``year`` and
            ``month`` values exactly as passed in, and ``month_name``.

        Raises:
            ValueError: if the values cannot be turned into a valid date. The
                original exception is chained as ``__cause__``.
        """
        try:
            year_int = int(year)
            month_int = int(month)
            # First day of the requested month.
            start_date = datetime(year_int, month_int, 1)
            # The Cost Explorer API takes an exclusive end date, so step past
            # the last day of the month onto the first day of the next one.
            days_in_month = calendar.monthrange(year_int, month_int)[1]
            end_date = start_date + timedelta(days=days_in_month)
        except (TypeError, ValueError, OverflowError) as e:
            logger.error(f"Invalid date parameters: year={year}, month={month}, error={str(e)}")
            raise ValueError(f"Invalid date parameters: year={year}, month={month}") from e
        return {
            'start': start_date.strftime('%Y-%m-%d'),
            'end': end_date.strftime('%Y-%m-%d'),
            'year': year,
            'month': month,
            'month_name': start_date.strftime('%B')
        }

    def _fetch_results(self, date_range: Dict[str, str], granularity: str,
                       metrics: List[str], group_by: str = '') -> List[Dict[str, Any]]:
        """Call ``get_cost_and_usage`` and collect ``ResultsByTime`` from every page.

        The request is repeated with the returned ``NextPageToken`` until the
        response no longer carries one, so large grouped reports are not
        silently cut off after the first page.
        """
        request = {
            'TimePeriod': {
                'Start': date_range['start'],
                'End': date_range['end']
            },
            'Granularity': granularity,
            'Metrics': metrics
        }
        if group_by:
            request['GroupBy'] = [{'Type': 'DIMENSION', 'Key': group_by}]
        results = []
        while True:
            response = self.ce_client.get_cost_and_usage(**request)
            results.extend(response.get('ResultsByTime') or [])
            next_token = response.get('NextPageToken')
            if not next_token:
                return results
            request['NextPageToken'] = next_token

    def get_monthly_service_costs(self, year: str, month: str) -> Dict[str, Any]:
        """Return the per-service cost breakdown for the given year and month.

        Only groups with a cost greater than zero are listed and summed (so
        zero or negative amounts do not appear in ``total_cost``). Services are
        sorted by rounded cost, highest first. The top-level ``currency`` is
        the unit reported for the listed services, falling back to ``'USD'``
        when nothing is listed.
        """
        try:
            date_range = self.get_date_range_from_params(year, month)
            results = self._fetch_results(
                date_range, 'MONTHLY', ['BlendedCost', 'UsageQuantity'], 'SERVICE'
            )
            # Extract the cost of every service from the raw results.
            service_costs = []
            total_cost = 0
            for result in results:
                for group in result.get('Groups', []):
                    metrics = group['Metrics']
                    cost = float(metrics['BlendedCost']['Amount'])
                    if not cost > 0:  # only keep services that cost something
                        continue
                    usage_quantity = (
                        float(metrics['UsageQuantity']['Amount'])
                        if 'UsageQuantity' in metrics else 0
                    )
                    service_costs.append({
                        'service': group['Keys'][0],
                        'cost': round(cost, 2),
                        'usage_quantity': round(usage_quantity, 2),
                        'currency': metrics['BlendedCost']['Unit']
                    })
                    total_cost += cost
            # Most expensive service first.
            service_costs.sort(key=lambda x: x['cost'], reverse=True)
            return {
                'status': 'success',
                'period': {
                    'year': date_range['year'],
                    'month': date_range['month'],
                    'month_name': date_range['month_name'],
                    'start_date': date_range['start'],
                    'end_date': date_range['end']
                },
                'total_cost': round(total_cost, 2),
                'services': service_costs,
                'total_services': len(service_costs),
                'currency': service_costs[0]['currency'] if service_costs else 'USD'
            }
        except Exception as e:
            logger.error(f"Error getting monthly service costs: {str(e)}")
            return {
                'status': 'error',
                'error': str(e)
            }

    def get_daily_costs_for_month(self, year: str, month: str) -> Dict[str, Any]:
        """Return the day-by-day cost trend for the given year and month.

        Each returned entry carries the start date of its result period and
        the rounded blended cost; the monthly total and the average per
        returned entry are included as well.
        """
        try:
            date_range = self.get_date_range_from_params(year, month)
            results = self._fetch_results(date_range, 'DAILY', ['BlendedCost'])
            daily_costs = []
            total_monthly_cost = 0
            for result in results:
                cost = float(result['Total']['BlendedCost']['Amount'])
                daily_costs.append({
                    'date': result['TimePeriod']['Start'],
                    'cost': round(cost, 2)
                })
                total_monthly_cost += cost
            # Average daily cost; 0 when no data came back.
            avg_daily_cost = round(total_monthly_cost / len(daily_costs), 2) if daily_costs else 0
            return {
                'status': 'success',
                'period': {
                    'year': date_range['year'],
                    'month': date_range['month'],
                    'month_name': date_range['month_name']
                },
                'daily_costs': daily_costs,
                'total_monthly_cost': round(total_monthly_cost, 2),
                'average_daily_cost': avg_daily_cost,
                'total_days': len(daily_costs)
            }
        except Exception as e:
            logger.error(f"Error getting daily costs: {str(e)}")
            return {
                'status': 'error',
                'error': str(e)
            }

    def get_cost_by_region(self, year: str, month: str) -> Dict[str, Any]:
        """Return the per-region cost breakdown for the given year and month.

        Groups with an empty region key are reported as ``'Global'``. Only
        regions with a cost greater than zero are listed and summed; they are
        sorted by rounded cost, highest first.
        """
        try:
            date_range = self.get_date_range_from_params(year, month)
            results = self._fetch_results(date_range, 'MONTHLY', ['BlendedCost'], 'REGION')
            region_costs = []
            total_cost = 0
            for result in results:
                for group in result.get('Groups', []):
                    cost = float(group['Metrics']['BlendedCost']['Amount'])
                    if not cost > 0:
                        continue
                    region_costs.append({
                        'region': group['Keys'][0] or 'Global',
                        'cost': round(cost, 2)
                    })
                    total_cost += cost
            # Most expensive region first.
            region_costs.sort(key=lambda x: x['cost'], reverse=True)
            return {
                'status': 'success',
                'period': {
                    'year': date_range['year'],
                    'month': date_range['month'],
                    'month_name': date_range['month_name']
                },
                'total_cost': round(total_cost, 2),
                'regions': region_costs,
                'total_regions': len(region_costs)
            }
        except Exception as e:
            logger.error(f"Error getting region costs: {str(e)}")
            return {
                'status': 'error',
                'error': str(e)
            }

    def generate_monthly_summary(self, year: str, month: str) -> Dict[str, Any]:
        """Build the combined cost summary for the given year and month.

        Runs the service, daily and region reports and merges the ones that
        succeeded: total cost and service count, the top five services, the
        top five regions, the average daily cost and the daily trend.

        Returns:
            The summary dict with ``status: 'success'`` when at least one
            sub-report succeeded (failed sections are left empty and logged),
            or ``{'status': 'error', 'error': ...}`` when every sub-report
            failed or an unexpected exception occurred.
        """
        try:
            service_data = self.get_monthly_service_costs(year, month)
            daily_data = self.get_daily_costs_for_month(year, month)
            region_data = self.get_cost_by_region(year, month)

            sections = {
                'services': service_data,
                'daily': daily_data,
                'regions': region_data
            }
            failures = {
                name: data.get('error', 'Unknown error')
                for name, data in sections.items()
                if data.get('status') != 'success'
            }
            if len(failures) == len(sections):
                # Nothing usable came back: report the failure instead of an
                # empty summary that claims success.
                return {
                    'status': 'error',
                    'error': '; '.join(f"{name}: {message}" for name, message in failures.items())
                }
            for name, message in failures.items():
                logger.warning("Monthly summary is missing the %s section: %s", name, message)

            # Take the month name from whichever sub-report succeeded.
            month_name = next(
                (
                    data.get('period', {}).get('month_name', 'Unknown')
                    for data in sections.values()
                    if data.get('status') == 'success'
                ),
                'Unknown'
            )
            summary = {
                'status': 'success',
                'report_period': {
                    'year': year,
                    'month': month,
                    'month_name': month_name
                },
                'cost_summary': {},
                'top_services': [],
                'top_regions': [],
                'daily_trend': []
            }
            if 'services' not in failures:
                summary['cost_summary']['total_cost'] = service_data.get('total_cost', 0)
                summary['cost_summary']['total_services'] = service_data.get('total_services', 0)
                summary['top_services'] = service_data.get('services', [])[:5]  # top 5 services
            if 'regions' not in failures:
                summary['top_regions'] = region_data.get('regions', [])[:5]  # top 5 regions
            if 'daily' not in failures:
                summary['cost_summary']['average_daily_cost'] = daily_data.get('average_daily_cost', 0)
                summary['daily_trend'] = daily_data.get('daily_costs', [])
            return summary
        except Exception as e:
            logger.error(f"Error generating monthly summary: {str(e)}")
            return {
                'status': 'error',
                'error': str(e)
            }
def _build_agent_response(event, payload):
    """Wrap *payload* in the response envelope returned to the Bedrock Agent.

    The action group, function name and session attributes are echoed back
    from *event*; missing keys fall back to empty values so that building a
    response can never raise ``KeyError`` (this matters most on the error
    path, where a second exception would hide the original one).
    """
    return {
        'messageVersion': '1.0',
        'response': {
            'actionGroup': event.get('actionGroup', ''),
            'function': event.get('function', ''),
            'functionResponse': {
                'responseBody': {
                    'TEXT': {
                        'body': json.dumps(payload, ensure_ascii=False, default=str)
                    }
                }
            }
        },
        'sessionAttributes': event.get('sessionAttributes', {}),
        'promptSessionAttributes': event.get('promptSessionAttributes', {})
    }


def lambda_handler(event, context):
    """Lambda entry point: handle an invocation coming from a Bedrock Agent.

    Reads ``year`` and ``month`` from ``event['parameters']``, validates them,
    picks a report type from ``event['apiPath']`` (``services``, ``daily``,
    ``regions``; anything else means the full summary) and returns the report
    as a JSON string inside the agent response envelope.

    Args:
        event: the invocation event dict.
        context: the Lambda context object (unused).

    Returns:
        The agent response dict. Failures are not raised: they are returned
        in the same envelope with a body of ``error`` / ``message`` /
        ``timestamp``.
    """
    logger.info(f"Received event: {json.dumps(event)}")
    # NOTE(review): report routing relies on apiPath only; if the action group
    # is defined with function details the event presumably carries no apiPath
    # and every request gets the summary report — confirm against the agent setup.
    api_path = event.get("apiPath", "")
    try:
        # Initialise the analyzer.
        analyzer = BillingAnalyzer()

        # Parse the parameters sent by Bedrock.
        parameters = event.get('parameters', [])
        if not parameters:
            raise ValueError('Missing required parameters: month and year')
        params = analyzer.parse_bedrock_parameters(parameters)
        year = params.get('year')
        month = params.get('month')
        if not year or not month:
            raise ValueError(f'Missing required parameters. Expected year and month, got: {params}')

        # Validate the parameters.
        try:
            year_int = int(year)
            month_int = int(month)
            if not (1 <= month_int <= 12):
                raise ValueError("Month must be between 1 and 12")
            if not (2000 <= year_int <= 2030):
                raise ValueError("Year must be between 2000 and 2030")
        except ValueError as ve:
            raise ValueError(f"Invalid parameter values: {str(ve)}") from ve

        # Choose the report type from the API path; summary is the default.
        report_type = 'summary'
        lowered_path = api_path.lower() if api_path else ''
        for candidate in ('services', 'daily', 'regions'):
            if candidate in lowered_path:
                report_type = candidate
                break

        # Assemble the result payload.
        result_data = {
            'timestamp': datetime.now().isoformat(),
            'request_parameters': {
                'year': year,
                'month': month,
                'report_type': report_type
            }
        }

        # Run the report that matches the chosen type.
        report_builders = {
            'services': ('service costs', analyzer.get_monthly_service_costs),
            'daily': ('daily costs', analyzer.get_daily_costs_for_month),
            'regions': ('region costs', analyzer.get_cost_by_region),
            'summary': ('billing summary', analyzer.generate_monthly_summary)
        }
        label, build_report = report_builders[report_type]
        logger.info(f"Getting {label} for {year}-{month}")
        result_data['analysis'] = build_report(year, month)

        action_response = _build_agent_response(event, result_data)
        logger.info(action_response)
        return action_response
    except Exception as e:
        logger.exception("Lambda execution error: %s", e)
        # The error response echoes the original event values as well.
        error_data = {
            'error': 'Processing failed',
            'message': str(e),
            'timestamp': datetime.now().isoformat()
        }
        return _build_agent_response(event, error_data)
這邊要注意的是,Lambda 的 role 要有存取 billing report 的權限,可以在 role 加上 AWSBillingReadOnlyAccess 這個 policy。
基本上只要設定好 Action group 和寫好 Lambda 的 code,以及加上權限,就可以完成這個 agent 了!明天接著來測試這個 Agent 吧!