歡迎來到第三週的學習!今天我們要探索自動化工作流程設計,學習如何讓 AI 助理自動執行複雜的多步驟任務,提升工作效率和準確性。
自動化工作流程是將重複性的多步驟任務標準化,讓系統能夠:
常見的自動化場景包括:郵件處理、報告生成、資料同步、內容審核等。
workflow_automation/
├── main.py # 主程式
├── core/
│ ├── __init__.py
│ ├── workflow_engine.py # 工作流程引擎
│ └── task_scheduler.py # 任務排程器
├── workflows/
│ ├── __init__.py
│ ├── email_automation.py # 郵件自動化
│ ├── report_generator.py # 報告生成
│ └── data_processor.py # 資料處理
├── triggers/
│ ├── __init__.py
│ ├── time_trigger.py # 時間觸發器
│ └── event_trigger.py # 事件觸發器
└── utils/
├── __init__.py
└── workflow_builder.py # 工作流程建構器
from typing import Dict, List, Any, Callable, Optional
from datetime import datetime
import json
from dataclasses import dataclass
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class WorkflowTask:
"""工作流程任務"""
id: str
name: str
function: Callable
inputs: Dict[str, Any]
outputs: Dict[str, Any] = None
status: TaskStatus = TaskStatus.PENDING
error_message: str = ""
execution_time: float = 0.0
depends_on: List[str] = None
def __post_init__(self):
if self.depends_on is None:
self.depends_on = []
if self.outputs is None:
self.outputs = {}
class WorkflowEngine:
"""簡單的工作流程引擎"""
def __init__(self):
self.workflows = {}
self.execution_history = []
def create_workflow(self, workflow_id: str, tasks: List[WorkflowTask]) -> None:
"""創建工作流程"""
self.workflows[workflow_id] = {
'tasks': {task.id: task for task in tasks},
'created_at': datetime.now(),
'execution_count': 0
}
def execute_workflow(self, workflow_id: str, initial_inputs: Dict[str, Any] = None) -> Dict[str, Any]:
"""執行工作流程"""
if workflow_id not in self.workflows:
raise ValueError(f"工作流程 {workflow_id} 不存在")
workflow = self.workflows[workflow_id]
tasks = workflow['tasks'].copy()
execution_context = initial_inputs or {}
execution_log = {
'workflow_id': workflow_id,
'start_time': datetime.now(),
'tasks_executed': [],
'final_outputs': {},
'success': False
}
try:
# 執行任務
executed_tasks = self._execute_tasks_in_order(tasks, execution_context, execution_log)
execution_log['success'] = True
execution_log['final_outputs'] = execution_context
workflow['execution_count'] += 1
return {
'success': True,
'outputs': execution_context,
'executed_tasks': executed_tasks,
'execution_log': execution_log
}
except Exception as e:
execution_log['error'] = str(e)
execution_log['end_time'] = datetime.now()
return {
'success': False,
'error': str(e),
'execution_log': execution_log
}
finally:
execution_log['end_time'] = datetime.now()
self.execution_history.append(execution_log)
def _execute_tasks_in_order(self, tasks: Dict[str, WorkflowTask],
context: Dict[str, Any],
execution_log: Dict) -> List[str]:
"""按依賴順序執行任務"""
executed = []
remaining = list(tasks.keys())
while remaining:
ready_tasks = []
# 找出可以執行的任務(依賴已滿足)
for task_id in remaining:
task = tasks[task_id]
if all(dep in executed for dep in task.depends_on):
ready_tasks.append(task_id)
if not ready_tasks:
raise Exception("工作流程中存在循環依賴或無法執行的任務")
# 執行準備好的任務
for task_id in ready_tasks:
task = tasks[task_id]
self._execute_single_task(task, context)
executed.append(task_id)
remaining.remove(task_id)
execution_log['tasks_executed'].append({
'task_id': task_id,
'task_name': task.name,
'status': task.status.value,
'execution_time': task.execution_time,
'error_message': task.error_message
})
return executed
def _execute_single_task(self, task: WorkflowTask, context: Dict[str, Any]) -> None:
"""執行單個任務"""
start_time = datetime.now()
task.status = TaskStatus.RUNNING
try:
print(f"⚡ 執行任務: {task.name}")
# 準備任務輸入
task_inputs = {}
for key, value in task.inputs.items():
if isinstance(value, str) and value.startswith('$'):
# 從上下文中獲取值
context_key = value[1:]
task_inputs[key] = context.get(context_key, value)
else:
task_inputs[key] = value
# 執行任務函數
result = task.function(**task_inputs)
# 處理輸出
if isinstance(result, dict):
task.outputs = result
context.update(result)
else:
task.outputs = {'result': result}
context['result'] = result
task.status = TaskStatus.COMPLETED
print(f"✅ 任務完成: {task.name}")
except Exception as e:
task.status = TaskStatus.FAILED
task.error_message = str(e)
print(f"❌ 任務失敗: {task.name} - {e}")
raise e
finally:
end_time = datetime.now()
task.execution_time = (end_time - start_time).total_seconds()
def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
"""獲取工作流程狀態"""
if workflow_id not in self.workflows:
return {'exists': False}
workflow = self.workflows[workflow_id]
return {
'exists': True,
'created_at': workflow['created_at'],
'execution_count': workflow['execution_count'],
'task_count': len(workflow['tasks'])
}
from core.workflow_engine import WorkflowEngine, WorkflowTask
import google.generativeai as genai
import os
from typing import Dict, Any, List
genai.configure(api_key=os.getenv('GEMINI_API_KEY'))
model = genai.GenerativeModel('gemini-pro')
class EmailAutomation:
"""郵件自動化處理"""
def __init__(self):
self.engine = WorkflowEngine()
self._setup_email_workflows()
def _setup_email_workflows(self):
"""設置郵件處理工作流程"""
# 客戶諮詢回覆工作流程
inquiry_tasks = [
WorkflowTask(
id="classify_inquiry",
name="分類客戶諮詢",
function=self.classify_email,
inputs={"email_content": "$email_content"}
),
WorkflowTask(
id="generate_response",
name="生成回覆內容",
function=self.generate_email_response,
inputs={"email_content": "$email_content", "category": "$category"},
depends_on=["classify_inquiry"]
),
WorkflowTask(
id="format_email",
name="格式化郵件",
function=self.format_professional_email,
inputs={"response_content": "$response_content"},
depends_on=["generate_response"]
)
]
self.engine.create_workflow("customer_inquiry", inquiry_tasks)
# 報告生成工作流程
report_tasks = [
WorkflowTask(
id="collect_data",
name="收集數據",
function=self.collect_report_data,
inputs={"report_type": "$report_type", "date_range": "$date_range"}
),
WorkflowTask(
id="analyze_data",
name="分析數據",
function=self.analyze_collected_data,
inputs={"raw_data": "$raw_data"},
depends_on=["collect_data"]
),
WorkflowTask(
id="generate_report",
name="生成報告",
function=self.generate_analysis_report,
inputs={"analysis_results": "$analysis_results", "report_type": "$report_type"},
depends_on=["analyze_data"]
)
]
self.engine.create_workflow("generate_report", report_tasks)
def classify_email(self, email_content: str) -> Dict[str, Any]:
"""分類郵件內容"""
prompt = f"""
請分析以下郵件內容並分類:
郵件內容:{email_content}
請判斷這是哪種類型的郵件:
- inquiry: 一般諮詢
- complaint: 投訴
- technical: 技術問題
- order: 訂單相關
只回答類別名稱。
"""
try:
response = model.generate_content(prompt)
category = response.text.strip().lower()
return {
"category": category,
"confidence": 0.8
}
except Exception as e:
print(f"郵件分類失敗: {e}")
return {
"category": "inquiry",
"confidence": 0.5
}
def generate_email_response(self, email_content: str, category: str) -> Dict[str, Any]:
"""生成郵件回覆"""
category_prompts = {
"inquiry": "請以專業、友善的語氣回覆客戶諮詢",
"complaint": "請以同理心和解決問題的態度回覆客戶投訴",
"technical": "請提供技術支援和解決方案",
"order": "請協助客戶處理訂單相關問題"
}
prompt_style = category_prompts.get(category, category_prompts["inquiry"])
prompt = f"""
{prompt_style},針對以下郵件內容:
{email_content}
請生成一封專業的回覆郵件內容,包括:
1. 適當的開頭問候
2. 針對問題的具體回應
3. 後續行動建議
4. 專業的結尾
"""
try:
response = model.generate_content(prompt)
return {
"response_content": response.text,
"category": category
}
except Exception as e:
return {
"response_content": f"感謝您的來信,我們已收到您的{category},會儘快為您處理。",
"category": category
}
def format_professional_email(self, response_content: str) -> Dict[str, Any]:
"""格式化專業郵件"""
formatted_email = f"""
主旨:Re: 您的諮詢 - 我們的回覆
{response_content}
如有任何其他問題,歡迎隨時聯繫我們。
祝好,
客服團隊
"""
return {
"formatted_email": formatted_email,
"ready_to_send": True
}
def collect_report_data(self, report_type: str, date_range: str) -> Dict[str, Any]:
"""收集報告數據(模擬)"""
# 模擬數據收集
sample_data = {
"daily": {
"total_emails": 150,
"resolved": 142,
"pending": 8,
"categories": {"inquiry": 60, "complaint": 25, "technical": 40, "order": 25}
},
"weekly": {
"total_emails": 1050,
"resolved": 994,
"pending": 56,
"categories": {"inquiry": 420, "complaint": 175, "technical": 280, "order": 175}
}
}
return {
"raw_data": sample_data.get(report_type, sample_data["daily"]),
"collection_time": "2024-01-15 10:00:00"
}
def analyze_collected_data(self, raw_data: Dict) -> Dict[str, Any]:
"""分析收集的數據"""
total = raw_data["total_emails"]
resolved = raw_data["resolved"]
resolution_rate = (resolved / total) * 100 if total > 0 else 0
# 找出最多的問題類型
categories = raw_data["categories"]
top_category = max(categories, key=categories.get)
return {
"analysis_results": {
"resolution_rate": resolution_rate,
"top_category": top_category,
"total_processed": total,
"efficiency_score": "高" if resolution_rate > 90 else "中等" if resolution_rate > 80 else "需改善"
}
}
def generate_analysis_report(self, analysis_results: Dict, report_type: str) -> Dict[str, Any]:
"""生成分析報告"""
results = analysis_results
report = f"""
📊 {report_type.upper()} 客服績效報告
📈 關鍵指標:
• 總處理量:{results['total_processed']} 件
• 解決率:{results['resolution_rate']:.1f}%
• 主要問題類型:{results['top_category']}
• 效率評級:{results['efficiency_score']}
💡 建議:
"""
if results['resolution_rate'] < 90:
report += "• 提升處理效率,減少待處理案件\n"
if results['top_category'] == 'complaint':
report += "• 關注投訴問題,改善服務品質\n"
report += "• 持續優化自動化流程"
return {
"final_report": report,
"report_generated_at": "2024-01-15 10:30:00"
}
def run_email_workflow(self, workflow_type: str, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""執行郵件工作流程"""
print(f"🚀 啟動 {workflow_type} 工作流程...")
result = self.engine.execute_workflow(workflow_type, inputs)
if result['success']:
print("✅ 工作流程執行完成!")
else:
print(f"❌ 工作流程執行失敗:{result['error']}")
return result
from workflows.email_automation import EmailAutomation
def main():
"""自動化工作流程示例"""
print("🔄 自動化工作流程系統")
print("⚡ 支援郵件處理、報告生成等自動化任務")
print("=" * 50)
automation = EmailAutomation()
while True:
print("\n選擇自動化工作流程:")
print("1. 客戶諮詢郵件處理")
print("2. 生成績效報告")
print("3. 查看工作流程狀態")
print("4. 退出")
choice = input("\n請選擇 (1-4):").strip()
try:
if choice == '1':
print("\n📧 客戶諮詢郵件自動處理")
email_content = input("請輸入郵件內容:").strip()
if email_content:
inputs = {"email_content": email_content}
result = automation.run_email_workflow("customer_inquiry", inputs)
if result['success']:
final_email = result['outputs'].get('formatted_email', '')
print(f"\n✉️ 生成的回覆郵件:\n{final_email}")
else:
print("❌ 請輸入郵件內容")
elif choice == '2':
print("\n📊 自動生成績效報告")
report_type = input("報告類型 (daily/weekly):").strip() or "daily"
date_range = input("日期範圍:").strip() or "2024-01-15"
inputs = {
"report_type": report_type,
"date_range": date_range
}
result = automation.run_email_workflow("generate_report", inputs)
if result['success']:
report = result['outputs'].get('final_report', '')
print(f"\n📋 生成的報告:\n{report}")
elif choice == '3':
print("\n📈 工作流程狀態")
workflows = ["customer_inquiry", "generate_report"]
for wf_id in workflows:
status = automation.engine.get_workflow_status(wf_id)
if status['exists']:
print(f"• {wf_id}: 已執行 {status['execution_count']} 次")
else:
print(f"• {wf_id}: 不存在")
elif choice == '4':
print("👋 再見!")
break
else:
print("❌ 無效的選擇")
except KeyboardInterrupt:
print("\n👋 再見!")
break
except Exception as e:
print(f"❌ 發生錯誤:{e}")
if __name__ == "__main__":
main()
✅ 任務依賴管理:自動處理任務間的執行順序
✅ 錯誤處理:單個任務失敗不影響整體流程追蹤
✅ 執行記錄:完整的執行歷史和結果記錄
✅ 靈活配置:可輕鬆添加新的工作流程
✅ 實用場景:郵件處理、報告生成等常見自動化需求
選擇自動化工作流程:
1. 客戶諮詢郵件處理
請輸入郵件內容:我想了解你們的產品價格
🚀 啟動 customer_inquiry 工作流程...
⚡ 執行任務: 分類客戶諮詢
✅ 任務完成: 分類客戶諮詢
⚡ 執行任務: 生成回覆內容
✅ 任務完成: 生成回覆內容
⚡ 執行任務: 格式化郵件
✅ 任務完成: 格式化郵件
✅ 工作流程執行完成!
✉️ 生成的回覆郵件:
主旨:Re: 您的諮詢 - 我們的回覆
感謝您對我們產品的關注!關於產品價格...
今天我們建立了基礎的自動化工作流程系統,讓 AI 助理能夠自動執行多步驟任務。明天我們將學習條件判斷與分支邏輯,讓工作流程變得更智能!