今天我們要探討安全性與合規性:如何為 Amazon Bedrock 和 Amazon SageMaker
在企業內部實施安全控制措施,確保符合各項 AI 規範需求。
以下為我們必須考量的事項:
如果有考過 AWS 認證,或是對 AWS 本身有所了解的人,對『共同責任模型』應該相當熟悉。
它的意思是:雲端上的安全責任,由使用該服務的客戶和 AWS 共同承擔。
AWS 負責「雲端本身」的安全,像是基礎設施、硬體、網路等;
客戶則負責「雲端之中」的安全,像是資料加密、存取控制、應用程式的組態等等。
遵循『最小權限原則』可以避免身分或權限被濫用。
import boto3
import json
def create_bedrock_role():
    """Create the ``BedrockSecureRole`` IAM role with a minimal inline policy.

    The role can be assumed by the Bedrock service principal
    (``bedrock.amazonaws.com``). Its inline policy
    ``BedrockMinimalPermissions`` allows invoking Bedrock foundation models
    (plain and streaming) and reading objects tagged
    ``Classification=Approved`` from ``my-secure-bucket``.

    Returns:
        The ARN of the created role, or ``None`` when an IAM call fails
        (the error is printed instead of raised).
    """
    iam = boto3.client('iam')
    role_name = 'BedrockSecureRole'

    # Trust policy: only the Bedrock service principal may assume the role.
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "bedrock.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }

    # Permissions policy - allow only the operations that are required.
    permissions_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "bedrock:InvokeModel",
                    "bedrock:InvokeModelWithResponseStream"
                ],
                # Foundation-model ARNs carry no account ID and use the
                # "foundation-model" resource type. The previous value
                # ("arn:aws:bedrock:*:*:model/*") matched no Bedrock
                # resource, so the statement granted nothing.
                "Resource": "arn:aws:bedrock:*::foundation-model/*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject"
                ],
                "Resource": "arn:aws:s3:::my-secure-bucket/*",
                # Only objects explicitly tagged as approved may be read.
                "Condition": {
                    "StringEquals": {
                        "s3:ExistingObjectTag/Classification": "Approved"
                    }
                }
            }
        ]
    }

    # The broad catch keeps the "return None on failure" contract intact.
    # NOTE: if create_role succeeds but put_role_policy fails, the role is
    # left in place without its inline policy and None is returned.
    try:
        # Create the role.
        response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy),
            Description='Secure role for Bedrock access',
            MaxSessionDuration=3600
        )
        # Attach the inline permissions policy.
        iam.put_role_policy(
            RoleName=role_name,
            PolicyName='BedrockMinimalPermissions',
            PolicyDocument=json.dumps(permissions_policy)
        )
        role_arn = response['Role']['Arn']
        print(f"Role created: {role_arn}")
        return role_arn
    except Exception as e:
        print(f"Error creating role: {str(e)}")
        return None
def create_sagemaker_execution_role():
    """Create the ``SageMakerSecureExecutionRole`` IAM role.

    The role trusts ``sagemaker.amazonaws.com`` and receives the inline
    policy ``SageMakerMinimalPermissions``, which covers three things:
    object read/write on the training-data and model buckets, writing
    CloudWatch Logs under ``/aws/sagemaker/``, and pulling images from ECR.

    Returns:
        The ARN of the created role, or ``None`` if an error occurs (the
        error is printed).
    """
    iam_client = boto3.client('iam')
    role_name = 'SageMakerSecureExecutionRole'

    # Who may assume the role: the SageMaker service only.
    assume_role_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "sagemaker.amazonaws.com"},
                "Action": "sts:AssumeRole"
            }
        ]
    }

    # Finer-grained permissions, one statement per concern.
    s3_statement = {
        "Effect": "Allow",
        "Action": ["s3:GetObject", "s3:PutObject"],
        "Resource": [
            "arn:aws:s3:::sagemaker-training-data/*",
            "arn:aws:s3:::sagemaker-models/*"
        ]
    }
    logs_statement = {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogGroup",
            "logs:CreateLogStream",
            "logs:PutLogEvents"
        ],
        "Resource": "arn:aws:logs:*:*:log-group:/aws/sagemaker/*"
    }
    ecr_statement = {
        "Effect": "Allow",
        "Action": [
            "ecr:GetAuthorizationToken",
            "ecr:BatchCheckLayerAvailability",
            "ecr:GetDownloadUrlForLayer",
            "ecr:BatchGetImage"
        ],
        "Resource": "*"
    }
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [s3_statement, logs_statement, ecr_statement]
    }

    try:
        created = iam_client.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(assume_role_document),
            Description='Secure execution role for SageMaker'
        )
        iam_client.put_role_policy(
            RoleName=role_name,
            PolicyName='SageMakerMinimalPermissions',
            PolicyDocument=json.dumps(inline_policy_document)
        )
        return created['Role']['Arn']
    except Exception as err:
        print(f"Error: {str(err)}")
        return None
使用 AWS KMS(Key Management Service)來管理加密金鑰,確保資料在 S3 中的安全。
可以參考 AWS KMS 官方文件。
以下為使用 Python 設定的例子:
import boto3
from botocore.exceptions import ClientError
class SecureDataManager:
    """Create KMS-encrypted S3 buckets and upload objects with SSE-KMS.

    Args:
        kms_key_id: ID (or ARN) of the KMS key used for bucket default
            encryption and for each uploaded object.
    """

    def __init__(self, kms_key_id):
        self.s3 = boto3.client('s3')
        self.kms = boto3.client('kms')
        self.kms_key_id = kms_key_id

    def create_encrypted_bucket(self, bucket_name):
        """Create an S3 bucket with encryption, versioning and no public access.

        The bucket is created in the region the S3 client is configured for.
        After creation the method enables SSE-KMS default encryption with
        ``self.kms_key_id`` (bucket keys on), turns on versioning, and
        applies all four public-access-block settings.

        Args:
            bucket_name: Name of the bucket to create.

        Returns:
            ``True`` on success, ``False`` if any S3 call raises
            ``ClientError`` (the error is printed).
        """
        try:
            # The LocationConstraint must match the client's region, and
            # us-east-1 requires that no constraint be sent at all. A
            # hard-coded 'ap-northeast-1' fails for every other region.
            create_args = {'Bucket': bucket_name}
            region = self.s3.meta.region_name
            if region not in (None, 'us-east-1', 'aws-global'):
                create_args['CreateBucketConfiguration'] = {
                    'LocationConstraint': region
                }
            self.s3.create_bucket(**create_args)

            # Enable default encryption.
            self.s3.put_bucket_encryption(
                Bucket=bucket_name,
                ServerSideEncryptionConfiguration={
                    'Rules': [
                        {
                            'ApplyServerSideEncryptionByDefault': {
                                'SSEAlgorithm': 'aws:kms',
                                'KMSMasterKeyID': self.kms_key_id
                            },
                            'BucketKeyEnabled': True
                        }
                    ]
                }
            )

            # Enable versioning.
            self.s3.put_bucket_versioning(
                Bucket=bucket_name,
                VersioningConfiguration={'Status': 'Enabled'}
            )

            # Block every form of public access.
            self.s3.put_public_access_block(
                Bucket=bucket_name,
                PublicAccessBlockConfiguration={
                    'BlockPublicAcls': True,
                    'IgnorePublicAcls': True,
                    'BlockPublicPolicy': True,
                    'RestrictPublicBuckets': True
                }
            )
            print(f"Secure bucket created: {bucket_name}")
            return True
        except ClientError as e:
            print(f"Error creating bucket: {str(e)}")
            return False

    def upload_encrypted_file(self, bucket_name, file_path, s3_key):
        """Upload a local file to S3 encrypted with the configured KMS key.

        Args:
            bucket_name: Destination bucket.
            file_path: Path of the local file to upload.
            s3_key: Object key to write.

        Returns:
            ``True`` on success; ``False`` if the file cannot be opened or
            read, or if S3 rejects the request (the error is printed).
        """
        try:
            with open(file_path, 'rb') as file:
                self.s3.put_object(
                    Bucket=bucket_name,
                    Key=s3_key,
                    Body=file,
                    ServerSideEncryption='aws:kms',
                    SSEKMSKeyId=self.kms_key_id,
                    Metadata={
                        'classification': 'sensitive',
                        'encrypted': 'true'
                    }
                )
            print(f"File uploaded with encryption: {s3_key}")
            return True
        except (ClientError, OSError) as e:
            # OSError covers a missing or unreadable local file, so the
            # method keeps its boolean contract instead of raising.
            print(f"Error uploading file: {str(e)}")
            return False
- 我們會用 VPC Endpoint 避免資料經過公開網路
- 盡可能讓每個 API 都使用 HTTPS 進行傳輸
以下為 Python 使用 VPC Endpoint 的操作:
def create_vpc_endpoints(vpc_id='vpc-xxxxxxxxx', subnet_ids=None,
                         security_group_ids=None, region=None):
    """Create Interface VPC endpoints for the SageMaker and Bedrock runtimes.

    Private DNS is enabled on both endpoints.

    Args:
        vpc_id: VPC in which to create the endpoints. The default is the
            article's placeholder and must be replaced with a real ID.
        subnet_ids: Subnets for the endpoint network interfaces. Defaults
            to the placeholder ``['subnet-xxxxxxxxx']``.
        security_group_ids: Security groups attached to the endpoints.
            Defaults to the placeholder ``['sg-xxxxxxxxx']``.
        region: Region used to build the service names. Defaults to the
            region of the EC2 client, since endpoint service names are
            region-specific.

    Returns:
        ``True`` if both endpoints were created, ``False`` if EC2 raised
        ``ClientError`` (the error is printed).
    """
    ec2 = boto3.client('ec2')
    subnets = ['subnet-xxxxxxxxx'] if subnet_ids is None else list(subnet_ids)
    groups = (['sg-xxxxxxxxx'] if security_group_ids is None
              else list(security_group_ids))
    service_region = region or ec2.meta.region_name

    try:
        # One Interface endpoint per runtime service.
        for service in ('sagemaker.runtime', 'bedrock-runtime'):
            response = ec2.create_vpc_endpoint(
                VpcId=vpc_id,
                ServiceName=f'com.amazonaws.{service_region}.{service}',
                VpcEndpointType='Interface',
                SubnetIds=subnets,
                SecurityGroupIds=groups,
                PrivateDnsEnabled=True
            )
            print(f"VPC Endpoint created: {response['VpcEndpoint']['VpcEndpointId']}")
        return True
    except ClientError as e:
        print(f"Error creating VPC endpoint: {str(e)}")
        return False
class SecureNetworkSetup:
    """Provision a VPC, one private subnet and an HTTPS-only security group."""

    def __init__(self):
        self.ec2 = boto3.client('ec2')

    @staticmethod
    def derive_subnet_cidr(vpc_cidr):
        """Return the CIDR of the private subnet to carve out of *vpc_cidr*.

        For a VPC range wider than /24 this is the second /24 inside it
        (``10.0.1.0/24`` for the default ``10.0.0.0/16``). A VPC range of
        /24 or narrower is used whole.

        Raises:
            ValueError: If *vpc_cidr* is not a valid network.
        """
        import ipaddress

        network = ipaddress.ip_network(vpc_cidr, strict=False)
        if network.prefixlen >= 24:
            return str(network)
        candidates = network.subnets(new_prefix=24)
        next(candidates)  # skip the first /24
        return str(next(candidates))

    def create_isolated_vpc(self, vpc_cidr='10.0.0.0/16'):
        """Create an isolated VPC environment.

        Creates the VPC, one private subnet in ``ap-northeast-1a``, and a
        security group whose only ingress rule is TCP 443 from inside the
        VPC range.

        Args:
            vpc_cidr: IPv4 CIDR block for the VPC.

        Returns:
            ``(vpc_id, security_group_id)`` on success, ``(None, None)`` if
            *vpc_cidr* is invalid or an EC2 call fails (the error is
            printed).
        """
        try:
            # Derive the subnet from the requested VPC range before any
            # resource is created. A fixed '10.0.1.0/24' only works when
            # the VPC happens to contain it.
            subnet_cidr = self.derive_subnet_cidr(vpc_cidr)

            # Create the VPC.
            vpc_response = self.ec2.create_vpc(
                CidrBlock=vpc_cidr,
                TagSpecifications=[
                    {
                        'ResourceType': 'vpc',
                        'Tags': [
                            {'Key': 'Name', 'Value': 'AI-Workload-VPC'},
                            {'Key': 'Purpose', 'Value': 'Secure-AI-Training'}
                        ]
                    }
                ]
            )
            vpc_id = vpc_response['Vpc']['VpcId']

            # Create the private subnet.
            private_subnet = self.ec2.create_subnet(
                VpcId=vpc_id,
                CidrBlock=subnet_cidr,
                AvailabilityZone='ap-northeast-1a',
                TagSpecifications=[
                    {
                        'ResourceType': 'subnet',
                        'Tags': [
                            {'Key': 'Name', 'Value': 'AI-Private-Subnet'},
                            {'Key': 'Type', 'Value': 'Private'}
                        ]
                    }
                ]
            )
            print(f"Private subnet created: {private_subnet['Subnet']['SubnetId']}")

            # Create the security group.
            security_group = self.ec2.create_security_group(
                GroupName='AI-Workload-SG',
                Description='Security group for AI workloads',
                VpcId=vpc_id
            )
            sg_id = security_group['GroupId']

            # Allow only internal HTTPS traffic.
            self.ec2.authorize_security_group_ingress(
                GroupId=sg_id,
                IpPermissions=[
                    {
                        'IpProtocol': 'tcp',
                        'FromPort': 443,
                        'ToPort': 443,
                        'IpRanges': [{'CidrIp': vpc_cidr}]
                    }
                ]
            )
            print(f"Isolated VPC created: {vpc_id}")
            return vpc_id, sg_id
        except (ClientError, ValueError) as e:
            print(f"Error: {str(e)}")
            return None, None
class AuditingSetup:
    """Configure CloudTrail auditing and a CloudWatch alarm on denied calls."""

    def __init__(self):
        self.cloudtrail = boto3.client('cloudtrail')
        self.logs = boto3.client('logs')
        self.s3 = boto3.client('s3')

    def enable_comprehensive_logging(self, trail_name, bucket_name):
        """Enable comprehensive audit logging.

        Creates a multi-region trail with log-file validation, starts
        logging, and adds an event selector that records management events
        plus S3 object and Lambda function data events.

        Args:
            trail_name: Name of the trail to create.
            bucket_name: S3 bucket that receives the log files.
                NOTE(review): nothing here sets a bucket policy for
                CloudTrail; presumably the bucket must already allow
                CloudTrail to write to it. Confirm.

        Returns:
            ``True`` on success, ``False`` if a CloudTrail call raises
            ``ClientError`` (the error is printed).
        """
        try:
            # Create the trail.
            self.cloudtrail.create_trail(
                Name=trail_name,
                S3BucketName=bucket_name,
                IncludeGlobalServiceEvents=True,
                IsMultiRegionTrail=True,
                EnableLogFileValidation=True,
                IsOrganizationTrail=False
            )
            # Start recording.
            self.cloudtrail.start_logging(Name=trail_name)
            # Configure event selectors so data events are recorded too.
            self.cloudtrail.put_event_selectors(
                TrailName=trail_name,
                EventSelectors=[
                    {
                        'ReadWriteType': 'All',
                        'IncludeManagementEvents': True,
                        'DataResources': [
                            {
                                # Basic event selectors do not accept
                                # wildcards. The bare ARN prefix means
                                # "all objects in all buckets".
                                'Type': 'AWS::S3::Object',
                                'Values': ['arn:aws:s3']
                            },
                            {
                                # Likewise, the bare prefix means "all
                                # Lambda functions in the account".
                                'Type': 'AWS::Lambda::Function',
                                'Values': ['arn:aws:lambda']
                            }
                        ]
                    }
                ]
            )
            print(f"CloudTrail enabled: {trail_name}")
            return True
        except ClientError as e:
            print(f"Error enabling logging: {str(e)}")
            return False

    def create_log_metric_alarm(self, log_group_name):
        """Create a log metric filter and an alarm for unauthorized API calls.

        The filter counts events whose ``errorCode`` ends in
        ``UnauthorizedOperation`` or starts with ``AccessDenied``. The
        alarm fires when the five-minute sum reaches 1.

        Args:
            log_group_name: CloudWatch Logs group to attach the filter to.
                NOTE(review): ``enable_comprehensive_logging`` creates the
                trail without a CloudWatch Logs destination, so this group
                is presumably created and fed elsewhere. Confirm.

        Returns:
            ``True`` on success, ``False`` if a call raises ``ClientError``
            (the error is printed).
        """
        cloudwatch = boto3.client('cloudwatch')
        try:
            # Metric filter - track unauthorized API calls.
            self.logs.put_metric_filter(
                logGroupName=log_group_name,
                filterName='UnauthorizedAPICalls',
                filterPattern='{ ($.errorCode = "*UnauthorizedOperation") || '
                              '($.errorCode = "AccessDenied*") }',
                metricTransformations=[
                    {
                        'metricName': 'UnauthorizedAPICalls',
                        'metricNamespace': 'CloudTrail/SecurityMetrics',
                        'metricValue': '1'
                    }
                ]
            )
            # Alarm on the metric. No AlarmActions are configured, so the
            # alarm changes state but notifies nobody.
            cloudwatch.put_metric_alarm(
                AlarmName='UnauthorizedAPICallsAlarm',
                AlarmDescription='Alert on unauthorized API calls',
                MetricName='UnauthorizedAPICalls',
                Namespace='CloudTrail/SecurityMetrics',
                Statistic='Sum',
                Period=300,
                EvaluationPeriods=1,
                Threshold=1,
                ComparisonOperator='GreaterThanOrEqualToThreshold',
                TreatMissingData='notBreaching'
            )
            print("Log metric alarm created successfully")
            return True
        except ClientError as e:
            print(f"Error: {str(e)}")
            return False
可以參考 Amazon Bedrock Guardrails 官方文件
class BedrockSecurityGuardrails:
    """Create a Bedrock guardrail and invoke models through it."""

    def __init__(self):
        self.bedrock = boto3.client('bedrock')

    def create_content_guardrail(self):
        """Create the ``EnterpriseContentGuardrail`` content-safety guardrail.

        The guardrail combines denied topics, content filters, a word
        policy (custom words plus the managed profanity list), and a
        sensitive-information policy that blocks selected PII types and
        internal document IDs matching ``DOC-<6 digits>``.

        Returns:
            The new guardrail's ID, or ``None`` if creation fails (the
            error is printed).
        """
        try:
            response = self.bedrock.create_guardrail(
                name='EnterpriseContentGuardrail',
                description='Content filtering for enterprise use',
                topicPolicyConfig={
                    'topicsConfig': [
                        {
                            'name': 'SensitivePersonalInfo',
                            'definition': 'Block requests containing personal '
                                          'identification information',
                            'examples': [
                                'What is John Doe\'s social security number?',
                                'Give me credit card numbers'
                            ],
                            'type': 'DENY'
                        },
                        {
                            'name': 'FinancialAdvice',
                            'definition': 'Block financial advice requests',
                            'examples': [
                                'Should I invest in this stock?',
                                'What should I do with my retirement fund?'
                            ],
                            'type': 'DENY'
                        }
                    ]
                },
                contentPolicyConfig={
                    'filtersConfig': [
                        {
                            'type': 'SEXUAL',
                            'inputStrength': 'HIGH',
                            'outputStrength': 'HIGH'
                        },
                        {
                            'type': 'VIOLENCE',
                            'inputStrength': 'HIGH',
                            'outputStrength': 'HIGH'
                        },
                        {
                            'type': 'HATE',
                            'inputStrength': 'HIGH',
                            'outputStrength': 'HIGH'
                        },
                        {
                            'type': 'INSULTS',
                            'inputStrength': 'MEDIUM',
                            'outputStrength': 'MEDIUM'
                        },
                        {
                            'type': 'MISCONDUCT',
                            'inputStrength': 'MEDIUM',
                            'outputStrength': 'MEDIUM'
                        },
                        {
                            # This filter is configured on the input side
                            # only; output strength is NONE.
                            'type': 'PROMPT_ATTACK',
                            'inputStrength': 'HIGH',
                            'outputStrength': 'NONE'
                        }
                    ]
                },
                wordPolicyConfig={
                    'wordsConfig': [
                        {'text': 'CONFIDENTIAL'},
                        {'text': 'INTERNAL_ONLY'}
                    ],
                    'managedWordListsConfig': [
                        {'type': 'PROFANITY'}
                    ]
                },
                sensitiveInformationPolicyConfig={
                    'piiEntitiesConfig': [
                        {'type': 'EMAIL', 'action': 'BLOCK'},
                        {'type': 'PHONE', 'action': 'BLOCK'},
                        {'type': 'CREDIT_DEBIT_CARD_NUMBER', 'action': 'BLOCK'},
                        # 'SSN' is not a valid Guardrails PII entity type;
                        # the service name for it is
                        # US_SOCIAL_SECURITY_NUMBER.
                        {'type': 'US_SOCIAL_SECURITY_NUMBER', 'action': 'BLOCK'}
                    ],
                    'regexesConfig': [
                        {
                            'name': 'InternalDocumentID',
                            'description': 'Internal document identifiers',
                            'pattern': r'DOC-\d{6}',
                            'action': 'BLOCK'
                        }
                    ]
                },
                blockedInputMessaging='Your request contains blocked content. '
                                      'Please rephrase without sensitive information.',
                blockedOutputsMessaging='The response contains blocked content.'
            )
            guardrail_id = response['guardrailId']
            print(f"Guardrail created: {guardrail_id}")
            return guardrail_id
        except Exception as e:
            print(f"Error creating guardrail: {str(e)}")
            return None

    def invoke_model_with_guardrail(self, model_id, guardrail_id, prompt,
                                    guardrail_version='DRAFT'):
        """Invoke a model with a guardrail applied.

        The request body uses the Anthropic messages format with a single
        user message and ``max_tokens`` of 1000.

        Args:
            model_id: Bedrock model identifier.
            guardrail_id: Identifier of the guardrail to apply.
            prompt: User prompt text.
            guardrail_version: Guardrail version to apply. Defaults to
                ``'DRAFT'``, the working draft.

        Returns:
            The decoded JSON response body, or ``None`` if the call fails
            (the error is printed).
        """
        bedrock_runtime = boto3.client('bedrock-runtime')
        request_body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1000,
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ]
        }
        try:
            response = bedrock_runtime.invoke_model(
                modelId=model_id,
                body=json.dumps(request_body),
                guardrailIdentifier=guardrail_id,
                guardrailVersion=guardrail_version
            )
            return json.loads(response['body'].read())
        except Exception as e:
            print(f"Error: {str(e)}")
            return None
在訓練和部署時啟用網路隔離,防止模型存取網際網路:
import sagemaker
from sagemaker.pytorch import PyTorch
class SecureSageMakerTraining:
    """Launch SageMaker PyTorch training jobs with network isolation.

    Args:
        role_arn: Execution role ARN passed to the estimator.
        vpc_config: Mapping with the keys ``'subnets'`` and
            ``'security_groups'`` (lists of IDs) used for the VPC settings.
        kms_key_arn: KMS key used to encrypt both the output artifacts and
            the training volume. Defaults to the article's placeholder ARN,
            which must be replaced with a real key.
    """

    # Placeholder values carried over from the article.
    DEFAULT_KMS_KEY = 'arn:aws:kms:region:account:key/key-id'
    DEFAULT_TRAINING_INPUT = 's3://secure-bucket/training-data/'

    def __init__(self, role_arn, vpc_config, kms_key_arn=None):
        self.role = role_arn
        self.vpc_config = vpc_config
        self.kms_key_arn = kms_key_arn or self.DEFAULT_KMS_KEY
        self.session = sagemaker.Session()

    def train_with_network_isolation(self, script_path, instance_type,
                                     training_input=None):
        """Start a training job with network isolation enabled.

        Args:
            script_path: Entry-point training script.
            instance_type: Training instance type.
            training_input: S3 URI for the ``'training'`` channel. Defaults
                to the placeholder ``s3://secure-bucket/training-data/``.

        Returns:
            The ``PyTorch`` estimator. ``fit`` is called with
            ``wait=False``, so it is returned right after the job is
            submitted.
        """
        estimator = PyTorch(
            entry_point=script_path,
            role=self.role,
            instance_type=instance_type,
            instance_count=1,
            framework_version='2.0',
            py_version='py310',
            # Use the session built in __init__, which was previously
            # created but never handed to the estimator.
            sagemaker_session=self.session,
            # Enable network isolation.
            enable_network_isolation=True,
            # VPC configuration.
            subnets=self.vpc_config['subnets'],
            security_group_ids=self.vpc_config['security_groups'],
            # Encrypt traffic between training containers.
            encrypt_inter_container_traffic=True,
            # Encrypt the output artifacts.
            output_kms_key=self.kms_key_arn,
            # Encrypt the attached training volume.
            volume_kms_key=self.kms_key_arn,
            # Tags.
            tags=[
                {'Key': 'Project', 'Value': 'SecureAI'},
                {'Key': 'Environment', 'Value': 'Production'},
                {'Key': 'Compliance', 'Value': 'Required'}
            ]
        )
        # Start training without blocking.
        estimator.fit(
            inputs={
                'training': training_input or self.DEFAULT_TRAINING_INPUT
            },
            wait=False
        )
        return estimator
資料保護檢查清單
存取控制檢查清單
監控與稽核檢查清單
整合以上所有組件,建立一個完整的安全架構
class EnterpriseSecureAIStack:
    """Deploy the KMS key, network, buckets, roles, auditing and guardrails.

    Args:
        stack_name: Prefix used for the names of the created resources.
    """

    def __init__(self, stack_name):
        self.stack_name = stack_name
        # Not read by any method of this class.
        self.region = 'ap-northeast-1'

    @staticmethod
    def _report(succeeded, success_message, failure_message):
        """Print a check-mark or cross line for one step; return the outcome."""
        if succeeded:
            print(f"✓ {success_message}")
        else:
            print(f"✗ {failure_message}")
        return bool(succeeded)

    def deploy_secure_infrastructure(self):
        """Deploy the complete secure infrastructure.

        Runs seven steps in order: KMS key, VPC, encrypted buckets, IAM
        roles, CloudTrail logging, Bedrock guardrail, monitoring alarm.
        The helper steps signal failure through their return values, so
        each result is checked and reported instead of printing success
        unconditionally. A failed step does not stop the remaining steps.

        Returns:
            A dict with the keys ``kms_key_id``, ``vpc_id``,
            ``security_group_id``, ``bedrock_role_arn``,
            ``sagemaker_role_arn`` and ``guardrail_id``. Entries for failed
            steps are ``None``.

        Raises:
            Whatever ``kms.create_key`` raises. That call is not guarded.
        """
        print(f"Deploying {self.stack_name}...")
        outcomes = []

        # 1. Create the KMS key.
        kms = boto3.client('kms')
        key_response = kms.create_key(
            Description=f'Master key for {self.stack_name}',
            KeyUsage='ENCRYPT_DECRYPT',
            Origin='AWS_KMS',
            MultiRegion=False,
            Tags=[
                {'TagKey': 'Stack', 'TagValue': self.stack_name}
            ]
        )
        kms_key_id = key_response['KeyMetadata']['KeyId']
        print(f"✓ KMS key created: {kms_key_id}")

        # 2. Create the secure network.
        network_setup = SecureNetworkSetup()
        vpc_id, sg_id = network_setup.create_isolated_vpc()
        outcomes.append(self._report(
            vpc_id is not None,
            f"VPC created: {vpc_id}",
            "VPC creation failed"
        ))

        # 3. Create the encrypted S3 buckets.
        data_manager = SecureDataManager(kms_key_id)
        bucket_results = [
            data_manager.create_encrypted_bucket(f'{self.stack_name}-{suffix}')
            for suffix in ('data', 'models', 'logs')
        ]
        outcomes.append(self._report(
            all(bucket_results),
            "Encrypted S3 buckets created",
            "One or more S3 buckets could not be created"
        ))

        # 4. Create the IAM roles.
        bedrock_role = create_bedrock_role()
        sagemaker_role = create_sagemaker_execution_role()
        outcomes.append(self._report(
            bedrock_role is not None and sagemaker_role is not None,
            "IAM roles created",
            "One or more IAM roles could not be created"
        ))

        # 5. Enable audit logging.
        audit_setup = AuditingSetup()
        logging_enabled = audit_setup.enable_comprehensive_logging(
            trail_name=f'{self.stack_name}-trail',
            bucket_name=f'{self.stack_name}-logs'
        )
        outcomes.append(self._report(
            logging_enabled,
            "Audit logging enabled",
            "Audit logging could not be enabled"
        ))

        # 6. Create the Bedrock guardrail.
        guardrails = BedrockSecurityGuardrails()
        guardrail_id = guardrails.create_content_guardrail()
        outcomes.append(self._report(
            guardrail_id is not None,
            f"Guardrails created: {guardrail_id}",
            "Guardrail creation failed"
        ))

        # 7. Create the monitoring alarm.
        # NOTE(review): no step above creates this log group. Confirm it
        # exists before relying on the alarm.
        alarm_created = audit_setup.create_log_metric_alarm(
            f'/aws/cloudtrail/{self.stack_name}'
        )
        outcomes.append(self._report(
            alarm_created,
            "Monitoring alarms created",
            "Monitoring alarms could not be created"
        ))

        if all(outcomes):
            print(f"\n{self.stack_name} deployment completed successfully!")
        else:
            print(f"\n{self.stack_name} deployment finished with "
                  f"{outcomes.count(False)} failed step(s); see the errors above.")

        return {
            'kms_key_id': kms_key_id,
            'vpc_id': vpc_id,
            'security_group_id': sg_id,
            'bedrock_role_arn': bedrock_role,
            'sagemaker_role_arn': sagemaker_role,
            'guardrail_id': guardrail_id
        }
# Usage example: deploy the stack and list what was created.
if __name__ == '__main__':
    stack = EnterpriseSecureAIStack('production-ai-workload')
    infrastructure = stack.deploy_secure_infrastructure()

    print("\nInfrastructure details:")
    for key in infrastructure:
        value = infrastructure[key]
        print(f" {key}: {value}")
class ComplianceMonitor:
    """Enable AWS Config rules and Security Hub, and report compliance."""

    def __init__(self):
        self.config = boto3.client('config')
        self.securityhub = boto3.client('securityhub')

    def enable_aws_config_rules(self):
        """Enable AWS-managed Config rules for compliance monitoring.

        Registers four managed rules: S3 bucket encryption, IAM password
        policy, CloudTrail enabled, and security groups open only to
        authorized ports. Each rule is attempted independently, so a
        failure is printed and the remaining rules are still tried.
        """
        managed_rules = {
            's3-bucket-encryption-enabled':
                'S3_BUCKET_SERVER_SIDE_ENCRYPTION_ENABLED',
            'iam-password-policy': 'IAM_PASSWORD_POLICY',
            'cloudtrail-enabled': 'CLOUD_TRAIL_ENABLED',
            'vpc-sg-open-only-to-authorized-ports':
                'VPC_SG_OPEN_ONLY_TO_AUTHORIZED_PORTS',
        }
        for rule_name, identifier in managed_rules.items():
            rule = {
                'ConfigRuleName': rule_name,
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': identifier
                }
            }
            try:
                self.config.put_config_rule(ConfigRule=rule)
                print(f"✓ Config rule enabled: {rule['ConfigRuleName']}")
            except Exception as e:
                print(f"✗ Error enabling {rule['ConfigRuleName']}: {str(e)}")

    def check_compliance_status(self):
        """Print and summarize the compliance state of every Config rule.

        All result pages are read by following ``NextToken``.

        Returns:
            A dict with the counts ``compliant``, ``non_compliant`` and
            ``not_applicable``. Every status other than COMPLIANT or
            NON_COMPLIANT is counted as ``not_applicable``. Returns
            ``None`` if the lookup fails (the error is printed).
        """
        try:
            # Collect every page. A single call returns only the first
            # page when the account has many rules.
            rules = []
            request = {}
            while True:
                page = self.config.describe_compliance_by_config_rule(**request)
                rules.extend(page.get('ComplianceByConfigRules', []))
                token = page.get('NextToken')
                if not token:
                    break
                request = {'NextToken': token}

            compliance_summary = {
                'compliant': 0,
                'non_compliant': 0,
                'not_applicable': 0
            }
            print("\n=== Compliance Status ===")
            for rule in rules:
                rule_name = rule['ConfigRuleName']
                compliance = rule.get('Compliance', {})
                status = compliance.get('ComplianceType', 'UNKNOWN')
                if status == 'COMPLIANT':
                    compliance_summary['compliant'] += 1
                    print(f"✓ {rule_name}: COMPLIANT")
                elif status == 'NON_COMPLIANT':
                    compliance_summary['non_compliant'] += 1
                    print(f"✗ {rule_name}: NON_COMPLIANT")
                else:
                    compliance_summary['not_applicable'] += 1
                    print(f"- {rule_name}: {status}")
            print("\nSummary:")
            print(f" Compliant: {compliance_summary['compliant']}")
            print(f" Non-Compliant: {compliance_summary['non_compliant']}")
            print(f" Not Applicable: {compliance_summary['not_applicable']}")
            return compliance_summary
        except Exception as e:
            print(f"Error checking compliance: {str(e)}")
            return None

    def enable_security_hub(self):
        """Enable AWS Security Hub for centralized security monitoring.

        Enables Security Hub with the default standards, then explicitly
        subscribes to AWS Foundational Security Best Practices v1.0.0 and
        CIS AWS Foundations Benchmark v1.2.0.

        Returns:
            ``True`` on success, ``False`` if a call fails (the error is
            printed).
        """
        try:
            try:
                self.securityhub.enable_security_hub(
                    EnableDefaultStandards=True
                )
            except self.securityhub.exceptions.ResourceConflictException:
                # Security Hub is already enabled; carry on and make sure
                # the standards are subscribed.
                pass

            # The FSBP ARN is region-specific, so build it from the
            # client's region. CIS v1.2.0 uses a region-less "ruleset" ARN
            # rather than the "standards/" form.
            region = self.securityhub.meta.region_name
            standards = [
                f'arn:aws:securityhub:{region}::standards/aws-foundational-security-best-practices/v/1.0.0',
                'arn:aws:securityhub:::ruleset/cis-aws-foundations-benchmark/v/1.2.0'
            ]
            for standard in standards:
                self.securityhub.batch_enable_standards(
                    StandardsSubscriptionRequests=[
                        {'StandardsArn': standard}
                    ]
                )
            print("✓ Security Hub enabled with standards")
            return True
        except Exception as e:
            print(f"Error enabling Security Hub: {str(e)}")
            return False
import re
import hashlib
class DataMasking:
    """Detect, mask and pseudonymize PII in free text using regexes.

    ``pii_patterns`` maps a PII type name to the regex that recognizes it:
    email, US-style 10-digit phone, US SSN, 16-digit card number, and
    Taiwan national ID.
    """

    def __init__(self):
        self.pii_patterns = {
            # The TLD class is [A-Za-z]. The earlier '[A-Z|a-z]' also
            # accepted a literal '|' as part of the top-level domain.
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
            'taiwan_id': r'\b[A-Z][12]\d{8}\b'
        }

    def detect_pii(self, text):
        """Detect personally identifiable information in *text*.

        Returns:
            A dict mapping each PII type that occurs to the list of matched
            strings, in order of appearance. Types with no match are
            omitted.
        """
        detected = {}
        for pii_type, pattern in self.pii_patterns.items():
            matches = re.findall(pattern, text)
            if matches:
                detected[pii_type] = matches
        return detected

    def mask_pii(self, text, mask_char='*'):
        """Return *text* with every recognized PII value partially masked.

        Masked forms with the default ``mask_char``:
            - email: first two characters of the local part, ``***``, then
              the domain (``bo***@example.com``)
            - phone: ``***-***-`` plus the last four characters
            - SSN: ``***-**-`` plus the last four digits
            - card number: twelve mask characters plus the last four
              characters
            - Taiwan ID: first letter, eight mask characters, last digit

        Args:
            text: Text to mask.
            mask_char: Character used for masked positions in every PII
                type.
        """
        def mask_email(match):
            local, _, domain = match.group(0).partition('@')
            return f"{local[:2]}{mask_char * 3}@{domain}"

        def mask_phone(match):
            hidden = mask_char * 3
            return f"{hidden}-{hidden}-{match.group(0)[-4:]}"

        def mask_ssn(match):
            return f"{mask_char * 3}-{mask_char * 2}-{match.group(0)[-4:]}"

        def mask_card(match):
            return mask_char * 12 + match.group(0)[-4:]

        def mask_taiwan_id(match):
            value = match.group(0)
            return value[0] + mask_char * 8 + value[-1]

        # Order matters: email goes first so digits in a local part are
        # not re-matched as a phone number. SSN is now masked as well.
        maskers = (
            ('email', mask_email),
            ('phone', mask_phone),
            ('ssn', mask_ssn),
            ('credit_card', mask_card),
            ('taiwan_id', mask_taiwan_id),
        )
        masked_text = text
        for pii_type, masker in maskers:
            masked_text = re.sub(self.pii_patterns[pii_type], masker, masked_text)
        return masked_text

    def anonymize_with_hash(self, text, salt=''):
        """Replace each detected PII value with a ``[TYPE:hash]`` token.

        The hash is the first 16 hex characters of SHA-256 over
        ``salt + value``, so equal values map to equal tokens.

        Args:
            text: Text to anonymize.
            salt: Optional secret prefix mixed into every hash. With the
                default empty salt, low-entropy values such as phone
                numbers can be recovered by brute force. Pass a secret
                salt when tokens leave a trusted boundary.
        """
        anonymized_text = text
        for pii_type, matches in self.detect_pii(text).items():
            # dict.fromkeys drops duplicates but keeps first-seen order.
            # One replace() call already rewrites every occurrence.
            for match in dict.fromkeys(matches):
                hashed = hashlib.sha256((salt + match).encode()).hexdigest()[:16]
                anonymized_text = anonymized_text.replace(
                    match,
                    f"[{pii_type.upper()}:{hashed}]"
                )
        return anonymized_text

    def safe_logging(self, message):
        """Print *message* with PII masked and return the masked text."""
        masked_message = self.mask_pii(message)
        print(f"[SAFE LOG] {masked_message}")
        return masked_message
# Usage example: run DataMasking over a sample customer record.
masking = DataMasking()

# Sample text
test_text = """
客戶資訊:
姓名:王小明
Email: wang.xiaoming@example.com
電話:0912-345-678
身分證字號:A123456789
"""

# Show the input unchanged.
print("原始文本:")
print(test_text)

# Report every PII type found, with its matches.
print("\n偵測到的 PII:")
detected = masking.detect_pii(test_text)
for pii_type in detected:
    matches = detected[pii_type]
    print(f" {pii_type}: {matches}")

# Partially masked version.
masked = masking.mask_pii(test_text)
print("\n遮罩後的文本:")
print(masked)

# Hash-token version.
anonymized = masking.anonymize_with_hash(test_text)
print("\n匿名化後的文本:")
print(anonymized)
目前先探討到此