iT邦幫忙

2025 iThome 鐵人賽

DAY 15
0

Day 15: 自動化工作流程設計

歡迎來到第三週的學習!今天我們要探索自動化工作流程設計,學習如何讓 AI 助理自動執行複雜的多步驟任務,提升工作效率和準確性。

🔄 什麼是自動化工作流程?

自動化工作流程是將重複性的多步驟任務標準化,讓系統能夠:

  • 📋 自動執行:無需人工干預完成整個流程
  • 🎯 標準化處理:確保每次執行的一致性
  • 定時觸發:按計劃或條件自動啟動
  • 📊 結果追蹤:記錄執行過程和結果

常見的自動化場景包括:郵件處理、報告生成、資料同步、內容審核等。

🏗 專案結構

workflow_automation/
├── main.py                      # 主程式
├── core/
│   ├── __init__.py
│   ├── workflow_engine.py       # 工作流程引擎
│   └── task_scheduler.py        # 任務排程器
├── workflows/
│   ├── __init__.py
│   ├── email_automation.py      # 郵件自動化
│   ├── report_generator.py      # 報告生成
│   └── data_processor.py        # 資料處理
├── triggers/
│   ├── __init__.py
│   ├── time_trigger.py          # 時間觸發器
│   └── event_trigger.py         # 事件觸發器
└── utils/
    ├── __init__.py
    └── workflow_builder.py      # 工作流程建構器

🔧 核心實作

1. 工作流程引擎 (core/workflow_engine.py)

from typing import Dict, List, Any, Callable, Optional
from datetime import datetime
import json
from dataclasses import dataclass
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class WorkflowTask:
    """工作流程任務"""
    id: str
    name: str
    function: Callable
    inputs: Dict[str, Any]
    outputs: Dict[str, Any] = None
    status: TaskStatus = TaskStatus.PENDING
    error_message: str = ""
    execution_time: float = 0.0
    depends_on: List[str] = None

    def __post_init__(self):
        if self.depends_on is None:
            self.depends_on = []
        if self.outputs is None:
            self.outputs = {}

class WorkflowEngine:
    """簡單的工作流程引擎"""
    
    def __init__(self):
        self.workflows = {}
        self.execution_history = []
    
    def create_workflow(self, workflow_id: str, tasks: List[WorkflowTask]) -> None:
        """創建工作流程"""
        self.workflows[workflow_id] = {
            'tasks': {task.id: task for task in tasks},
            'created_at': datetime.now(),
            'execution_count': 0
        }
    
    def execute_workflow(self, workflow_id: str, initial_inputs: Dict[str, Any] = None) -> Dict[str, Any]:
        """執行工作流程"""
        if workflow_id not in self.workflows:
            raise ValueError(f"工作流程 {workflow_id} 不存在")
        
        workflow = self.workflows[workflow_id]
        tasks = workflow['tasks'].copy()
        execution_context = initial_inputs or {}
        
        execution_log = {
            'workflow_id': workflow_id,
            'start_time': datetime.now(),
            'tasks_executed': [],
            'final_outputs': {},
            'success': False
        }
        
        try:
            # 執行任務
            executed_tasks = self._execute_tasks_in_order(tasks, execution_context, execution_log)
            
            execution_log['success'] = True
            execution_log['final_outputs'] = execution_context
            workflow['execution_count'] += 1
            
            return {
                'success': True,
                'outputs': execution_context,
                'executed_tasks': executed_tasks,
                'execution_log': execution_log
            }
            
        except Exception as e:
            execution_log['error'] = str(e)
            execution_log['end_time'] = datetime.now()
            
            return {
                'success': False,
                'error': str(e),
                'execution_log': execution_log
            }
        finally:
            execution_log['end_time'] = datetime.now()
            self.execution_history.append(execution_log)
    
    def _execute_tasks_in_order(self, tasks: Dict[str, WorkflowTask], 
                               context: Dict[str, Any], 
                               execution_log: Dict) -> List[str]:
        """按依賴順序執行任務"""
        executed = []
        remaining = list(tasks.keys())
        
        while remaining:
            ready_tasks = []
            
            # 找出可以執行的任務(依賴已滿足)
            for task_id in remaining:
                task = tasks[task_id]
                if all(dep in executed for dep in task.depends_on):
                    ready_tasks.append(task_id)
            
            if not ready_tasks:
                raise Exception("工作流程中存在循環依賴或無法執行的任務")
            
            # 執行準備好的任務
            for task_id in ready_tasks:
                task = tasks[task_id]
                self._execute_single_task(task, context)
                executed.append(task_id)
                remaining.remove(task_id)
                
                execution_log['tasks_executed'].append({
                    'task_id': task_id,
                    'task_name': task.name,
                    'status': task.status.value,
                    'execution_time': task.execution_time,
                    'error_message': task.error_message
                })
        
        return executed
    
    def _execute_single_task(self, task: WorkflowTask, context: Dict[str, Any]) -> None:
        """執行單個任務"""
        start_time = datetime.now()
        task.status = TaskStatus.RUNNING
        
        try:
            print(f"⚡ 執行任務: {task.name}")
            
            # 準備任務輸入
            task_inputs = {}
            for key, value in task.inputs.items():
                if isinstance(value, str) and value.startswith('$'):
                    # 從上下文中獲取值
                    context_key = value[1:]
                    task_inputs[key] = context.get(context_key, value)
                else:
                    task_inputs[key] = value
            
            # 執行任務函數
            result = task.function(**task_inputs)
            
            # 處理輸出
            if isinstance(result, dict):
                task.outputs = result
                context.update(result)
            else:
                task.outputs = {'result': result}
                context['result'] = result
            
            task.status = TaskStatus.COMPLETED
            print(f"✅ 任務完成: {task.name}")
            
        except Exception as e:
            task.status = TaskStatus.FAILED
            task.error_message = str(e)
            print(f"❌ 任務失敗: {task.name} - {e}")
            raise e
        finally:
            end_time = datetime.now()
            task.execution_time = (end_time - start_time).total_seconds()
    
    def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
        """獲取工作流程狀態"""
        if workflow_id not in self.workflows:
            return {'exists': False}
        
        workflow = self.workflows[workflow_id]
        return {
            'exists': True,
            'created_at': workflow['created_at'],
            'execution_count': workflow['execution_count'],
            'task_count': len(workflow['tasks'])
        }

2. 郵件自動化工作流程 (workflows/email_automation.py)

from core.workflow_engine import WorkflowEngine, WorkflowTask
import google.generativeai as genai
import os
from typing import Dict, Any, List

genai.configure(api_key=os.getenv('GEMINI_API_KEY'))
model = genai.GenerativeModel('gemini-pro')

class EmailAutomation:
    """郵件自動化處理"""
    
    def __init__(self):
        self.engine = WorkflowEngine()
        self._setup_email_workflows()
    
    def _setup_email_workflows(self):
        """設置郵件處理工作流程"""
        
        # 客戶諮詢回覆工作流程
        inquiry_tasks = [
            WorkflowTask(
                id="classify_inquiry",
                name="分類客戶諮詢",
                function=self.classify_email,
                inputs={"email_content": "$email_content"}
            ),
            WorkflowTask(
                id="generate_response",
                name="生成回覆內容",
                function=self.generate_email_response,
                inputs={"email_content": "$email_content", "category": "$category"},
                depends_on=["classify_inquiry"]
            ),
            WorkflowTask(
                id="format_email",
                name="格式化郵件",
                function=self.format_professional_email,
                inputs={"response_content": "$response_content"},
                depends_on=["generate_response"]
            )
        ]
        
        self.engine.create_workflow("customer_inquiry", inquiry_tasks)
        
        # 報告生成工作流程
        report_tasks = [
            WorkflowTask(
                id="collect_data",
                name="收集數據",
                function=self.collect_report_data,
                inputs={"report_type": "$report_type", "date_range": "$date_range"}
            ),
            WorkflowTask(
                id="analyze_data",
                name="分析數據",
                function=self.analyze_collected_data,
                inputs={"raw_data": "$raw_data"},
                depends_on=["collect_data"]
            ),
            WorkflowTask(
                id="generate_report",
                name="生成報告",
                function=self.generate_analysis_report,
                inputs={"analysis_results": "$analysis_results", "report_type": "$report_type"},
                depends_on=["analyze_data"]
            )
        ]
        
        self.engine.create_workflow("generate_report", report_tasks)
    
    def classify_email(self, email_content: str) -> Dict[str, Any]:
        """分類郵件內容"""
        prompt = f"""
        請分析以下郵件內容並分類:
        
        郵件內容:{email_content}
        
        請判斷這是哪種類型的郵件:
        - inquiry: 一般諮詢
        - complaint: 投訴
        - technical: 技術問題
        - order: 訂單相關
        
        只回答類別名稱。
        """
        
        try:
            response = model.generate_content(prompt)
            category = response.text.strip().lower()
            
            return {
                "category": category,
                "confidence": 0.8
            }
        except Exception as e:
            print(f"郵件分類失敗: {e}")
            return {
                "category": "inquiry",
                "confidence": 0.5
            }
    
    def generate_email_response(self, email_content: str, category: str) -> Dict[str, Any]:
        """生成郵件回覆"""
        category_prompts = {
            "inquiry": "請以專業、友善的語氣回覆客戶諮詢",
            "complaint": "請以同理心和解決問題的態度回覆客戶投訴",
            "technical": "請提供技術支援和解決方案",
            "order": "請協助客戶處理訂單相關問題"
        }
        
        prompt_style = category_prompts.get(category, category_prompts["inquiry"])
        
        prompt = f"""
        {prompt_style},針對以下郵件內容:
        
        {email_content}
        
        請生成一封專業的回覆郵件內容,包括:
        1. 適當的開頭問候
        2. 針對問題的具體回應
        3. 後續行動建議
        4. 專業的結尾
        """
        
        try:
            response = model.generate_content(prompt)
            return {
                "response_content": response.text,
                "category": category
            }
        except Exception as e:
            return {
                "response_content": f"感謝您的來信,我們已收到您的{category},會儘快為您處理。",
                "category": category
            }
    
    def format_professional_email(self, response_content: str) -> Dict[str, Any]:
        """格式化專業郵件"""
        formatted_email = f"""
主旨:Re: 您的諮詢 - 我們的回覆

{response_content}

如有任何其他問題,歡迎隨時聯繫我們。

祝好,
客服團隊
"""
        
        return {
            "formatted_email": formatted_email,
            "ready_to_send": True
        }
    
    def collect_report_data(self, report_type: str, date_range: str) -> Dict[str, Any]:
        """收集報告數據(模擬)"""
        # 模擬數據收集
        sample_data = {
            "daily": {
                "total_emails": 150,
                "resolved": 142,
                "pending": 8,
                "categories": {"inquiry": 60, "complaint": 25, "technical": 40, "order": 25}
            },
            "weekly": {
                "total_emails": 1050,
                "resolved": 994,
                "pending": 56,
                "categories": {"inquiry": 420, "complaint": 175, "technical": 280, "order": 175}
            }
        }
        
        return {
            "raw_data": sample_data.get(report_type, sample_data["daily"]),
            "collection_time": "2024-01-15 10:00:00"
        }
    
    def analyze_collected_data(self, raw_data: Dict) -> Dict[str, Any]:
        """分析收集的數據"""
        total = raw_data["total_emails"]
        resolved = raw_data["resolved"]
        resolution_rate = (resolved / total) * 100 if total > 0 else 0
        
        # 找出最多的問題類型
        categories = raw_data["categories"]
        top_category = max(categories, key=categories.get)
        
        return {
            "analysis_results": {
                "resolution_rate": resolution_rate,
                "top_category": top_category,
                "total_processed": total,
                "efficiency_score": "高" if resolution_rate > 90 else "中等" if resolution_rate > 80 else "需改善"
            }
        }
    
    def generate_analysis_report(self, analysis_results: Dict, report_type: str) -> Dict[str, Any]:
        """生成分析報告"""
        results = analysis_results
        
        report = f"""
📊 {report_type.upper()} 客服績效報告

📈 關鍵指標:
• 總處理量:{results['total_processed']} 件
• 解決率:{results['resolution_rate']:.1f}%
• 主要問題類型:{results['top_category']}
• 效率評級:{results['efficiency_score']}

💡 建議:
"""
        
        if results['resolution_rate'] < 90:
            report += "• 提升處理效率,減少待處理案件\n"
        if results['top_category'] == 'complaint':
            report += "• 關注投訴問題,改善服務品質\n"
        
        report += "• 持續優化自動化流程"
        
        return {
            "final_report": report,
            "report_generated_at": "2024-01-15 10:30:00"
        }
    
    def run_email_workflow(self, workflow_type: str, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """執行郵件工作流程"""
        print(f"🚀 啟動 {workflow_type} 工作流程...")
        
        result = self.engine.execute_workflow(workflow_type, inputs)
        
        if result['success']:
            print("✅ 工作流程執行完成!")
        else:
            print(f"❌ 工作流程執行失敗:{result['error']}")
        
        return result

3. 主程式 (main.py)

from workflows.email_automation import EmailAutomation

def main():
    """自動化工作流程示例"""
    print("🔄 自動化工作流程系統")
    print("⚡ 支援郵件處理、報告生成等自動化任務")
    print("=" * 50)
    
    automation = EmailAutomation()
    
    while True:
        print("\n選擇自動化工作流程:")
        print("1. 客戶諮詢郵件處理")
        print("2. 生成績效報告")
        print("3. 查看工作流程狀態")
        print("4. 退出")
        
        choice = input("\n請選擇 (1-4):").strip()
        
        try:
            if choice == '1':
                print("\n📧 客戶諮詢郵件自動處理")
                email_content = input("請輸入郵件內容:").strip()
                
                if email_content:
                    inputs = {"email_content": email_content}
                    result = automation.run_email_workflow("customer_inquiry", inputs)
                    
                    if result['success']:
                        final_email = result['outputs'].get('formatted_email', '')
                        print(f"\n✉️ 生成的回覆郵件:\n{final_email}")
                else:
                    print("❌ 請輸入郵件內容")
            
            elif choice == '2':
                print("\n📊 自動生成績效報告")
                report_type = input("報告類型 (daily/weekly):").strip() or "daily"
                date_range = input("日期範圍:").strip() or "2024-01-15"
                
                inputs = {
                    "report_type": report_type,
                    "date_range": date_range
                }
                result = automation.run_email_workflow("generate_report", inputs)
                
                if result['success']:
                    report = result['outputs'].get('final_report', '')
                    print(f"\n📋 生成的報告:\n{report}")
            
            elif choice == '3':
                print("\n📈 工作流程狀態")
                workflows = ["customer_inquiry", "generate_report"]
                
                for wf_id in workflows:
                    status = automation.engine.get_workflow_status(wf_id)
                    if status['exists']:
                        print(f"• {wf_id}: 已執行 {status['execution_count']} 次")
                    else:
                        print(f"• {wf_id}: 不存在")
            
            elif choice == '4':
                print("👋 再見!")
                break
            
            else:
                print("❌ 無效的選擇")
                
        except KeyboardInterrupt:
            print("\n👋 再見!")
            break
        except Exception as e:
            print(f"❌ 發生錯誤:{e}")

if __name__ == "__main__":
    main()

🎯 工作流程特色

任務依賴管理:自動處理任務間的執行順序
錯誤處理:單個任務失敗不影響整體流程追蹤
執行記錄:完整的執行歷史和結果記錄
靈活配置:可輕鬆添加新的工作流程
實用場景:郵件處理、報告生成等常見自動化需求

🚀 使用示範

選擇自動化工作流程:
1. 客戶諮詢郵件處理

請輸入郵件內容:我想了解你們的產品價格

🚀 啟動 customer_inquiry 工作流程...
⚡ 執行任務: 分類客戶諮詢
✅ 任務完成: 分類客戶諮詢
⚡ 執行任務: 生成回覆內容
✅ 任務完成: 生成回覆內容
⚡ 執行任務: 格式化郵件
✅ 任務完成: 格式化郵件
✅ 工作流程執行完成!

✉️ 生成的回覆郵件:
主旨:Re: 您的諮詢 - 我們的回覆

感謝您對我們產品的關注!關於產品價格...

今天我們建立了基礎的自動化工作流程系統,讓 AI 助理能夠自動執行多步驟任務。明天我們將學習條件判斷與分支邏輯,讓工作流程變得更智能!


上一篇
Day 14: 週總結:打造智能客服系統
系列文
30 天從零到 AI 助理:Gemini CLI 與 LangGraph 輕鬆上手15
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言