iT邦幫忙

2025 iThome 鐵人賽

DAY 23
0

前言

今天我們要探討安全性和合規性的部分,如何讓 AWS Bedrock 和 AWS Sagemaker
在企業內部實施安全控制措施,確保符合各項 AI 規範需求

為什麼安全性與合規性如此重要?

以下為我們必須要考量的事情

  • 資料隱私保護:確保敏感資料不會外洩或被不當使用,特別是個人識別資訊(PII)和商業機密。
  • 法規遵循:滿足 GDPR、HIPAA、SOC 2 等各種產業法規要求。
  • 存取控制:確保只有授權人員能夠存取 AI 模型和訓練資料。
  • 稽核追蹤:記錄所有操作,以便事後稽核和問題追查。
  • 模型安全:防止模型被惡意攻擊或產生有害輸出

AWS 安全規範

如果有考過 AWS 認證,或是對 AWS 本身有了解的人,基本上對『共同責任模型』是相對熟的,
基本上意思為,AWS 的安全責任,為使用該服務的(客戶)和 AWS 共同承擔

AWS 負責雲端本身的安全,像是『基礎設施,硬體,網路』等
用戶則負責雲端中的安全,像是『資料加密,存取控制,應用的配置』等等

IAM 身份以及身份存取

基本上遵循『最小權限原則』可以控制身份或是權限不會被濫用

我們以 Bedrock 為例 :

import boto3
import json

def create_bedrock_role():
    iam = boto3.client('iam')
    
    # 定義信任政策
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "bedrock.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
    
    # 定義權限政策 - 僅允許必要的操作
    permissions_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "bedrock:InvokeModel",
                    "bedrock:InvokeModelWithResponseStream"
                ],
                "Resource": "arn:aws:bedrock:*:*:model/*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject"
                ],
                "Resource": "arn:aws:s3:::my-secure-bucket/*",
                "Condition": {
                    "StringEquals": {
                        "s3:ExistingObjectTag/Classification": "Approved"
                    }
                }
            }
        ]
    }
    
    # 建立角色
    try:
        response = iam.create_role(
            RoleName='BedrockSecureRole',
            AssumeRolePolicyDocument=json.dumps(trust_policy),
            Description='Secure role for Bedrock access',
            MaxSessionDuration=3600
        )
        
        # 附加權限政策
        iam.put_role_policy(
            RoleName='BedrockSecureRole',
            PolicyName='BedrockMinimalPermissions',
            PolicyDocument=json.dumps(permissions_policy)
        )
        
        print(f"Role created: {response['Role']['Arn']}")
        return response['Role']['Arn']
        
    except Exception as e:
        print(f"Error creating role: {str(e)}")
        return None

我們以 SageMaker 為例

def create_sagemaker_execution_role():
    iam = boto3.client('iam')
    
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "sagemaker.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
    
    # 更細緻的權限控制
    permissions_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject"
                ],
                "Resource": [
                    "arn:aws:s3:::sagemaker-training-data/*",
                    "arn:aws:s3:::sagemaker-models/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ],
                "Resource": "arn:aws:logs:*:*:log-group:/aws/sagemaker/*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "ecr:GetAuthorizationToken",
                    "ecr:BatchCheckLayerAvailability",
                    "ecr:GetDownloadUrlForLayer",
                    "ecr:BatchGetImage"
                ],
                "Resource": "*"
            }
        ]
    }
    
    try:
        response = iam.create_role(
            RoleName='SageMakerSecureExecutionRole',
            AssumeRolePolicyDocument=json.dumps(trust_policy),
            Description='Secure execution role for SageMaker'
        )
        
        iam.put_role_policy(
            RoleName='SageMakerSecureExecutionRole',
            PolicyName='SageMakerMinimalPermissions',
            PolicyDocument=json.dumps(permissions_policy)
        )
        
        return response['Role']['Arn']
        
    except Exception as e:
        print(f"Error: {str(e)}")
        return None

資料加密策略

使用 AWS KMS(Key Management Service)來管理加密金鑰,確保資料在 S3 中的安全

可以參考 AWS KMS

以下為使用 python 設定的例子

import boto3
from botocore.exceptions import ClientError

class SecureDataManager:
    def __init__(self, kms_key_id):
        self.s3 = boto3.client('s3')
        self.kms = boto3.client('kms')
        self.kms_key_id = kms_key_id
    
    def create_encrypted_bucket(self, bucket_name):
        """建立啟用加密的 S3 bucket"""
        try:
            # 建立 bucket
            self.s3.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={
                    'LocationConstraint': 'ap-northeast-1'
                }
            )
            
            # 啟用預設加密
            self.s3.put_bucket_encryption(
                Bucket=bucket_name,
                ServerSideEncryptionConfiguration={
                    'Rules': [
                        {
                            'ApplyServerSideEncryptionByDefault': {
                                'SSEAlgorithm': 'aws:kms',
                                'KMSMasterKeyID': self.kms_key_id
                            },
                            'BucketKeyEnabled': True
                        }
                    ]
                }
            )
            
            # 啟用版本控制
            self.s3.put_bucket_versioning(
                Bucket=bucket_name,
                VersioningConfiguration={'Status': 'Enabled'}
            )
            
            # 封鎖公開存取
            self.s3.put_public_access_block(
                Bucket=bucket_name,
                PublicAccessBlockConfiguration={
                    'BlockPublicAcls': True,
                    'IgnorePublicAcls': True,
                    'BlockPublicPolicy': True,
                    'RestrictPublicBuckets': True
                }
            )
            
            print(f"Secure bucket created: {bucket_name}")
            return True
            
        except ClientError as e:
            print(f"Error creating bucket: {str(e)}")
            return False
    
    def upload_encrypted_file(self, bucket_name, file_path, s3_key):
        """上傳加密檔案到 S3"""
        try:
            with open(file_path, 'rb') as file:
                self.s3.put_object(
                    Bucket=bucket_name,
                    Key=s3_key,
                    Body=file,
                    ServerSideEncryption='aws:kms',
                    SSEKMSKeyId=self.kms_key_id,
                    Metadata={
                        'classification': 'sensitive',
                        'encrypted': 'true'
                    }
                )
            
            print(f"File uploaded with encryption: {s3_key}")
            return True
            
        except ClientError as e:
            print(f"Error uploading file: {str(e)}")
            return False

傳輸中資料加密

  1. 我們會用 vpc Endpoint 避免資料上到公網
  2. 盡可能每個 API使用 https 進行傳輸

以下為 python 使用 vpc Endpoint 的操作

def create_vpc_endpoints():
    """建立 VPC Endpoint 以確保私密連線"""
    ec2 = boto3.client('ec2')
    
    # 為 SageMaker 建立 Interface Endpoint
    try:
        response = ec2.create_vpc_endpoint(
            VpcId='vpc-xxxxxxxxx',
            ServiceName='com.amazonaws.ap-northeast-1.sagemaker.runtime',
            VpcEndpointType='Interface',
            SubnetIds=['subnet-xxxxxxxxx'],
            SecurityGroupIds=['sg-xxxxxxxxx'],
            PrivateDnsEnabled=True
        )
        
        print(f"VPC Endpoint created: {response['VpcEndpoint']['VpcEndpointId']}")
        
        # 為 Bedrock 建立 Interface Endpoint
        bedrock_response = ec2.create_vpc_endpoint(
            VpcId='vpc-xxxxxxxxx',
            ServiceName='com.amazonaws.ap-northeast-1.bedrock-runtime',
            VpcEndpointType='Interface',
            SubnetIds=['subnet-xxxxxxxxx'],
            SecurityGroupIds=['sg-xxxxxxxxx'],
            PrivateDnsEnabled=True
        )
        
        return True
        
    except ClientError as e:
        print(f"Error creating VPC endpoint: {str(e)}")
        return False

網路隔離與 VPC 配置

class SecureNetworkSetup:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
    
    def create_isolated_vpc(self, vpc_cidr='10.0.0.0/16'):
        """建立隔離的 VPC 環境"""
        try:
            # 建立 VPC
            vpc_response = self.ec2.create_vpc(
                CidrBlock=vpc_cidr,
                TagSpecifications=[
                    {
                        'ResourceType': 'vpc',
                        'Tags': [
                            {'Key': 'Name', 'Value': 'AI-Workload-VPC'},
                            {'Key': 'Purpose', 'Value': 'Secure-AI-Training'}
                        ]
                    }
                ]
            )
            
            vpc_id = vpc_response['Vpc']['VpcId']
            
            # 建立私有子網路
            private_subnet = self.ec2.create_subnet(
                VpcId=vpc_id,
                CidrBlock='10.0.1.0/24',
                AvailabilityZone='ap-northeast-1a',
                TagSpecifications=[
                    {
                        'ResourceType': 'subnet',
                        'Tags': [
                            {'Key': 'Name', 'Value': 'AI-Private-Subnet'},
                            {'Key': 'Type', 'Value': 'Private'}
                        ]
                    }
                ]
            )
            
            # 建立安全群組
            security_group = self.ec2.create_security_group(
                GroupName='AI-Workload-SG',
                Description='Security group for AI workloads',
                VpcId=vpc_id
            )
            
            sg_id = security_group['GroupId']
            
            # 僅允許內部流量
            self.ec2.authorize_security_group_ingress(
                GroupId=sg_id,
                IpPermissions=[
                    {
                        'IpProtocol': 'tcp',
                        'FromPort': 443,
                        'ToPort': 443,
                        'IpRanges': [{'CidrIp': vpc_cidr}]
                    }
                ]
            )
            
            print(f"Isolated VPC created: {vpc_id}")
            return vpc_id, sg_id
            
        except ClientError as e:
            print(f"Error: {str(e)}")
            return None, None

稽核與紀錄 - AWS CloudTrail 以及 CloudWatch Logs

class AuditingSetup:
    def __init__(self):
        self.cloudtrail = boto3.client('cloudtrail')
        self.logs = boto3.client('logs')
        self.s3 = boto3.client('s3')
    
    def enable_comprehensive_logging(self, trail_name, bucket_name):
        """啟用完整的稽核日誌"""
        try:
            # 建立 CloudTrail
            response = self.cloudtrail.create_trail(
                Name=trail_name,
                S3BucketName=bucket_name,
                IncludeGlobalServiceEvents=True,
                IsMultiRegionTrail=True,
                EnableLogFileValidation=True,
                IsOrganizationTrail=False
            )
            
            # 啟用 CloudTrail
            self.cloudtrail.start_logging(Name=trail_name)
            
            # 設定事件選擇器以記錄資料事件
            self.cloudtrail.put_event_selectors(
                TrailName=trail_name,
                EventSelectors=[
                    {
                        'ReadWriteType': 'All',
                        'IncludeManagementEvents': True,
                        'DataResources': [
                            {
                                'Type': 'AWS::S3::Object',
                                'Values': ['arn:aws:s3:::*/']
                            },
                            {
                                'Type': 'AWS::Lambda::Function',
                                'Values': ['arn:aws:lambda:*:*:function/*']
                            }
                        ]
                    }
                ]
            )
            
            print(f"CloudTrail enabled: {trail_name}")
            return True
            
        except ClientError as e:
            print(f"Error enabling logging: {str(e)}")
            return False
    
    def create_log_metric_alarm(self, log_group_name):
        """建立日誌指標和告警"""
        cloudwatch = boto3.client('cloudwatch')
        
        try:
            # 建立指標篩選器 - 監控未授權的 API 呼叫
            self.logs.put_metric_filter(
                logGroupName=log_group_name,
                filterName='UnauthorizedAPICalls',
                filterPattern='{ ($.errorCode = "*UnauthorizedOperation") || '
                             '($.errorCode = "AccessDenied*") }',
                metricTransformations=[
                    {
                        'metricName': 'UnauthorizedAPICalls',
                        'metricNamespace': 'CloudTrail/SecurityMetrics',
                        'metricValue': '1'
                    }
                ]
            )
            
            # 建立告警
            cloudwatch.put_metric_alarm(
                AlarmName='UnauthorizedAPICallsAlarm',
                AlarmDescription='Alert on unauthorized API calls',
                MetricName='UnauthorizedAPICalls',
                Namespace='CloudTrail/SecurityMetrics',
                Statistic='Sum',
                Period=300,
                EvaluationPeriods=1,
                Threshold=1,
                ComparisonOperator='GreaterThanOrEqualToThreshold',
                TreatMissingData='notBreaching'
            )
            
            print("Log metric alarm created successfully")
            return True
            
        except ClientError as e:
            print(f"Error: {str(e)}")
            return False

Bedrock 過濾幻覺以及額外的內容 (內容審查) - Bedrock Guardrail

可以參考 guardrails官方文件

class BedrockSecurityGuardrails:
    def __init__(self):
        self.bedrock = boto3.client('bedrock')
    
    def create_content_guardrail(self):
        """建立內容安全防護"""
        try:
            response = self.bedrock.create_guardrail(
                name='EnterpriseContentGuardrail',
                description='Content filtering for enterprise use',
                topicPolicyConfig={
                    'topicsConfig': [
                        {
                            'name': 'SensitivePersonalInfo',
                            'definition': 'Block requests containing personal '
                                        'identification information',
                            'examples': [
                                'What is John Doe\'s social security number?',
                                'Give me credit card numbers'
                            ],
                            'type': 'DENY'
                        },
                        {
                            'name': 'FinancialAdvice',
                            'definition': 'Block financial advice requests',
                            'examples': [
                                'Should I invest in this stock?',
                                'What should I do with my retirement fund?'
                            ],
                            'type': 'DENY'
                        }
                    ]
                },
                contentPolicyConfig={
                    'filtersConfig': [
                        {
                            'type': 'SEXUAL',
                            'inputStrength': 'HIGH',
                            'outputStrength': 'HIGH'
                        },
                        {
                            'type': 'VIOLENCE',
                            'inputStrength': 'HIGH',
                            'outputStrength': 'HIGH'
                        },
                        {
                            'type': 'HATE',
                            'inputStrength': 'HIGH',
                            'outputStrength': 'HIGH'
                        },
                        {
                            'type': 'INSULTS',
                            'inputStrength': 'MEDIUM',
                            'outputStrength': 'MEDIUM'
                        },
                        {
                            'type': 'MISCONDUCT',
                            'inputStrength': 'MEDIUM',
                            'outputStrength': 'MEDIUM'
                        },
                        {
                            'type': 'PROMPT_ATTACK',
                            'inputStrength': 'HIGH',
                            'outputStrength': 'NONE'
                        }
                    ]
                },
                wordPolicyConfig={
                    'wordsConfig': [
                        {'text': 'CONFIDENTIAL'},
                        {'text': 'INTERNAL_ONLY'}
                    ],
                    'managedWordListsConfig': [
                        {'type': 'PROFANITY'}
                    ]
                },
                sensitiveInformationPolicyConfig={
                    'piiEntitiesConfig': [
                        {'type': 'EMAIL', 'action': 'BLOCK'},
                        {'type': 'PHONE', 'action': 'BLOCK'},
                        {'type': 'CREDIT_DEBIT_CARD_NUMBER', 'action': 'BLOCK'},
                        {'type': 'SSN', 'action': 'BLOCK'}
                    ],
                    'regexesConfig': [
                        {
                            'name': 'InternalDocumentID',
                            'description': 'Internal document identifiers',
                            'pattern': r'DOC-\d{6}',
                            'action': 'BLOCK'
                        }
                    ]
                },
                blockedInputMessaging='Your request contains blocked content. '
                                    'Please rephrase without sensitive information.',
                blockedOutputsMessaging='The response contains blocked content.'
            )
            
            guardrail_id = response['guardrailId']
            print(f"Guardrail created: {guardrail_id}")
            return guardrail_id
            
        except Exception as e:
            print(f"Error creating guardrail: {str(e)}")
            return None
    
    def invoke_model_with_guardrail(self, model_id, guardrail_id, prompt):
        """使用 Guardrail 呼叫模型"""
        bedrock_runtime = boto3.client('bedrock-runtime')
        
        try:
            response = bedrock_runtime.invoke_model(
                modelId=model_id,
                body=json.dumps({
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": 1000,
                    "messages": [
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ]
                }),
                guardrailIdentifier=guardrail_id,
                guardrailVersion='DRAFT'
            )
            
            return json.loads(response['body'].read())
            
        except Exception as e:
            print(f"Error: {str(e)}")
            return None

SageMaker 網路隔離模式

在訓練和部署時啟用網路隔離,防止模型存取網際網路

import sagemaker
from sagemaker.pytorch import PyTorch

class SecureSageMakerTraining:
    def __init__(self, role_arn, vpc_config):
        self.role = role_arn
        self.vpc_config = vpc_config
        self.session = sagemaker.Session()
    
    def train_with_network_isolation(self, script_path, instance_type):
        """使用網路隔離進行訓練"""
        
        estimator = PyTorch(
            entry_point=script_path,
            role=self.role,
            instance_type=instance_type,
            instance_count=1,
            framework_version='2.0',
            py_version='py310',
            
            # 啟用網路隔離
            enable_network_isolation=True,
            
            # VPC 配置
            subnets=self.vpc_config['subnets'],
            security_group_ids=self.vpc_config['security_groups'],
            
            # 加密配置
            encrypt_inter_container_traffic=True,
            
            # 輸出加密
            output_kms_key='arn:aws:kms:region:account:key/key-id',
            
            # 額外的安全設定
            volume_kms_key='arn:aws:kms:region:account:key/key-id',
            
            # 標籤
            tags=[
                {'Key': 'Project', 'Value': 'SecureAI'},
                {'Key': 'Environment', 'Value': 'Production'},
                {'Key': 'Compliance', 'Value': 'Required'}
            ]
        )
        
        # 開始訓練
        estimator.fit(
            inputs={
                'training': 's3://secure-bucket/training-data/'
            },
            wait=False
        )
        
        return estimator

合規檢查清單

資料保護檢查清單

  • 靜態加密:所有儲存在 S3 的資料都使用 KMS 加密
  • 傳輸加密:所有 API 呼叫都透過 HTTPS 和 VPC Endpoint
  • 資料分類:敏感資料已正確標記和分類
  • 資料留存:符合資料保留政策和法規要求
  • 資料銷毀:不再需要的資料已安全刪除

存取控制檢查清單

  • 最小權限:IAM 角色遵循最小權限原則
  • MFA 啟用:關鍵操作需要多因素認證
  • 密碼政策:強密碼政策已啟用
  • 定期審查:定期審查和撤銷不必要的權限
  • 職責分離:關鍵角色已適當分離

監控與稽核檢查清單

  • CloudTrail 啟用:所有 API 呼叫都被記錄
  • Log保留:日誌按照政策要求保留
  • 告警設定:異常活動會觸發告警
  • 定期審查:定期審查日誌和存取模式
  • 事件回應:建立安全事件回應計畫

實作安全架構

整合以上所有組件,建立一個完整的安全架構

class EnterpriseSecureAIStack:
    def __init__(self, stack_name):
        self.stack_name = stack_name
        self.region = 'ap-northeast-1'
        
    def deploy_secure_infrastructure(self):
        """部署完整的安全基礎架構"""
        
        print(f"Deploying {self.stack_name}...")
        
        # 1. 建立 KMS 金鑰
        kms = boto3.client('kms')
        key_response = kms.create_key(
            Description=f'Master key for {self.stack_name}',
            KeyUsage='ENCRYPT_DECRYPT',
            Origin='AWS_KMS',
            MultiRegion=False,
            Tags=[
                {'TagKey': 'Stack', 'TagValue': self.stack_name}
            ]
        )
        kms_key_id = key_response['KeyMetadata']['KeyId']
        print(f"✓ KMS key created: {kms_key_id}")
        
        # 2. 建立安全網路
        network_setup = SecureNetworkSetup()
        vpc_id, sg_id = network_setup.create_isolated_vpc()
        print(f"✓ VPC created: {vpc_id}")
        
        # 3. 建立加密的 S3 buckets
        data_manager = SecureDataManager(kms_key_id)
        data_manager.create_encrypted_bucket(f'{self.stack_name}-data')
        data_manager.create_encrypted_bucket(f'{self.stack_name}-models')
        data_manager.create_encrypted_bucket(f'{self.stack_name}-logs')
        print("✓ Encrypted S3 buckets created")
        
        # 4. 建立 IAM 角色
        bedrock_role = create_bedrock_role()
        sagemaker_role = create_sagemaker_execution_role()
        print("✓ IAM roles created")
        
        # 5. 啟用稽核日誌
        audit_setup = AuditingSetup()
        audit_setup.enable_comprehensive_logging(
            trail_name=f'{self.stack_name}-trail',
            bucket_name=f'{self.stack_name}-logs'
        )
        print("✓ Audit logging enabled")
        
        # 6. 建立 Bedrock Guardrails
        guardrails = BedrockSecurityGuardrails()
        guardrail_id = guardrails.create_content_guardrail()
        print(f"✓ Guardrails created: {guardrail_id}")
        
        # 7. 建立監控告警
        audit_setup.create_log_metric_alarm(f'/aws/cloudtrail/{self.stack_name}')
        print("✓ Monitoring alarms created")
        
        print(f"\n{self.stack_name} deployment completed successfully!")
        
        return {
            'kms_key_id': kms_key_id,
            'vpc_id': vpc_id,
            'security_group_id': sg_id,
            'bedrock_role_arn': bedrock_role,
            'sagemaker_role_arn': sagemaker_role,
            'guardrail_id': guardrail_id
        }

# 使用範例
if __name__ == '__main__':
    stack = EnterpriseSecureAIStack('production-ai-workload')
    infrastructure = stack.deploy_secure_infrastructure()
    
    print("\nInfrastructure details:")
    for key, value in infrastructure.items():
        print(f"  {key}: {value}")

合規性監控

class ComplianceMonitor:
    def __init__(self):
        self.config = boto3.client('config')
        self.securityhub = boto3.client('securityhub')
    
    def enable_aws_config_rules(self):
        """啟用 AWS Config 規則以監控合規性"""
        
        rules = [
            {
                'ConfigRuleName': 's3-bucket-encryption-enabled',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'S3_BUCKET_SERVER_SIDE_ENCRYPTION_ENABLED'
                }
            },
            {
                'ConfigRuleName': 'iam-password-policy',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'IAM_PASSWORD_POLICY'
                }
            },
            {
                'ConfigRuleName': 'cloudtrail-enabled',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'CLOUD_TRAIL_ENABLED'
                }
            },
            {
                'ConfigRuleName': 'vpc-sg-open-only-to-authorized-ports',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'VPC_SG_OPEN_ONLY_TO_AUTHORIZED_PORTS'
                }
            }
        ]
        
        for rule in rules:
            try:
                self.config.put_config_rule(ConfigRule=rule)
                print(f"✓ Config rule enabled: {rule['ConfigRuleName']}")
            except Exception as e:
                print(f"✗ Error enabling {rule['ConfigRuleName']}: {str(e)}")
    
    def check_compliance_status(self):
        """檢查所有資源的合規性狀態"""
        try:
            response = self.config.describe_compliance_by_config_rule()
            
            compliance_summary = {
                'compliant': 0,
                'non_compliant': 0,
                'not_applicable': 0
            }
            
            print("\n=== Compliance Status ===")
            for rule in response['ComplianceByConfigRules']:
                rule_name = rule['ConfigRuleName']
                compliance = rule.get('Compliance', {})
                status = compliance.get('ComplianceType', 'UNKNOWN')
                
                if status == 'COMPLIANT':
                    compliance_summary['compliant'] += 1
                    print(f"✓ {rule_name}: COMPLIANT")
                elif status == 'NON_COMPLIANT':
                    compliance_summary['non_compliant'] += 1
                    print(f"✗ {rule_name}: NON_COMPLIANT")
                else:
                    compliance_summary['not_applicable'] += 1
                    print(f"- {rule_name}: {status}")
            
            print(f"\nSummary:")
            print(f"  Compliant: {compliance_summary['compliant']}")
            print(f"  Non-Compliant: {compliance_summary['non_compliant']}")
            print(f"  Not Applicable: {compliance_summary['not_applicable']}")
            
            return compliance_summary
            
        except Exception as e:
            print(f"Error checking compliance: {str(e)}")
            return None
    
    def enable_security_hub(self):
        """啟用 AWS Security Hub 進行集中安全監控"""
        try:
            self.securityhub.enable_security_hub(
                EnableDefaultStandards=True
            )
            
            # 啟用特定標準
            standards = [
                'arn:aws:securityhub:ap-northeast-1::standards/aws-foundational-security-best-practices/v/1.0.0',
                'arn:aws:securityhub:ap-northeast-1::standards/cis-aws-foundations-benchmark/v/1.2.0'
            ]
            
            for standard in standards:
                self.securityhub.batch_enable_standards(
                    StandardsSubscriptionRequests=[
                        {'StandardsArn': standard}
                    ]
                )
            
            print("✓ Security Hub enabled with standards")
            return True
            
        except Exception as e:
            print(f"Error enabling Security Hub: {str(e)}")
            return False

資料遮罩與去識別化

import re
import hashlib

class DataMasking:
    def __init__(self):
        self.pii_patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
            'taiwan_id': r'\b[A-Z][12]\d{8}\b'
        }
    
    def detect_pii(self, text):
        """偵測文本中的個人識別資訊"""
        detected = {}
        
        for pii_type, pattern in self.pii_patterns.items():
            matches = re.findall(pattern, text)
            if matches:
                detected[pii_type] = matches
        
        return detected
    
    def mask_pii(self, text, mask_char='*'):
        """遮罩文本中的 PII"""
        masked_text = text
        
        # Email 遮罩
        masked_text = re.sub(
            self.pii_patterns['email'],
            lambda m: m.group(0).split('@')[0][:2] + '***@' + m.group(0).split('@')[1],
            masked_text
        )
        
        # 電話號碼遮罩
        masked_text = re.sub(
            self.pii_patterns['phone'],
            lambda m: '***-***-' + m.group(0)[-4:],
            masked_text
        )
        
        # 信用卡號遮罩
        masked_text = re.sub(
            self.pii_patterns['credit_card'],
            lambda m: mask_char * 12 + m.group(0)[-4:],
            masked_text
        )
        
        # 台灣身分證號遮罩
        masked_text = re.sub(
            self.pii_patterns['taiwan_id'],
            lambda m: m.group(0)[0] + mask_char * 8 + m.group(0)[-1],
            masked_text
        )
        
        return masked_text
    
    def anonymize_with_hash(self, text):
        """使用雜湊進行匿名化"""
        detected_pii = self.detect_pii(text)
        anonymized_text = text
        
        for pii_type, matches in detected_pii.items():
            for match in matches:
                # 使用 SHA-256 雜湊
                hashed = hashlib.sha256(match.encode()).hexdigest()[:16]
                anonymized_text = anonymized_text.replace(
                    match, 
                    f"[{pii_type.upper()}:{hashed}]"
                )
        
        return anonymized_text
    
    def safe_logging(self, message):
        """安全的日誌記錄 - 自動遮罩 PII"""
        masked_message = self.mask_pii(message)
        print(f"[SAFE LOG] {masked_message}")
        return masked_message

# 使用範例
masking = DataMasking()

# 測試文本
test_text = """
客戶資訊:
姓名:王小明
Email: wang.xiaoming@example.com
電話:0912-345-678
身分證字號:A123456789
"""

print("原始文本:")
print(test_text)

print("\n偵測到的 PII:")
detected = masking.detect_pii(test_text)
for pii_type, matches in detected.items():
    print(f"  {pii_type}: {matches}")

print("\n遮罩後的文本:")
masked = masking.mask_pii(test_text)
print(masked)

print("\n匿名化後的文本:")
anonymized = masking.anonymize_with_hash(test_text)
print(anonymized)

目前先探討到此


上一篇
監控與日誌分析實作
下一篇
Auto Scaling與高可用性設計
系列文
從零開始的AWS AI之路:用Bedrock與SageMaker打造智慧應用的30天實戰24
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言