Redis主要是提供高性能快取、且有多數據結構支持和持久化。
假設我要顯示我收到的訊息數,可以用迴圈來讓redis計數。
from confluent_kafka import Consumer, KafkaError
import redis
# Kafka消費者配置
kafka_config = {
'bootstrap.servers': 'localhost:9092', # Kafka broker地址
'group.id': 'my-group',
'auto.offset.reset': 'earliest'
}
# Redis連接
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
# Kafka主題訂閱
kafka_topic = 'your_topic' # 替換成您的Kafka主題
# 創建Kafka
consumer = Consumer(kafka_config)
consumer.subscribe([kafka_topic])
# 處理Kafka消息並更新Redis計數
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print('Reached end of partition')
else:
print('Error: {}'.format(msg.error()))
else:
# 增加Redis中的計數
redis_client.incr('message_count')
print('Received message: {}'.format(msg.value()))