編程學習網 > 編程語言 > Python > Python操作Kafka教程,數據連接你我他
2023
07-20

Python操作Kafka教程,數據連接你我他

Kafka是最初由Linkedin公司開發,是一個分布式、分區的、多副本的、多訂閱者,基于zookeeper協調的分布式日志系統(也可以當做MQ系統),常見可以用于web/nginx日志、訪問日志,消息服務等等。今天給大家帶來用Python操作Kafka的知識介紹。


Kafka的使用場景
日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
消息系統:解耦和生產者和消費者、緩存消息等。
用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
流式處理:比如spark streaming和storm。
Kafka的基本術語概念
Topic:一組消息數據的標記符;
Producer:生產者,用于生產數據,可將生產后的消息送入指定的Topic;
Consumer:消費者,獲取數據,可消費指定的Topic;
Group:消費者組,同一個group可以有多個消費者,一條消息在一個group中,只會被一個消費者獲??;
Partition:分區,為了保證kafka的吞吐量,一個Topic可以設置多個分區。同一分區只能被一個消費者訂閱。
Python操作Kafka
安裝kafka-python

pip install kafka-python
生產者和消費者的簡易Demo

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json


def producer_demo():
    # 假設生產的消息為鍵值對(不是一定要鍵值對),且序列化方式為json
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'], 
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    # 發送三條消息
    for i in range(0, 3):
        future = producer.send(
            'kafka_demo',
            key='count_num',  # 同一個key值,會被送至同一個分區
            value=str(i),
            partition=1)  # 向分區1發送消息
        print("send {}".format(str(i)))
        try:
            future.get(timeout=10) # 監控是否發送成功           
        except kafka_errors:  # 發送失敗拋出kafka_errors
            traceback.format_exc()


def consumer_demo():
    consumer = KafkaConsumer(
        'kafka_demo', 
        bootstrap_servers=':9092',
        group_id='test'
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
            )
        )
重要參數詳解
group_id
高并發量,則需要有多個消費者協作,消費進度,則由group_id統一。例如消費者A與消費者B,在初始化時使用同一個group_id。在進行消費時,一條消息被消費者A消費后,在kafka中會被標記,這條消息不會再被B消費(前提是A消費后正確commit)。當一個新的group_id產生時,默認offset=latest。

key_deserializer, value_deserializer
自動解析序列化

auto_offset_reset
消費者啟動的時刻,消息隊列中或許已經有堆積的未消費消息,有時候需求是從上一次未消費的位置開始讀(則該參數設置為earliest),有時候的需求為從當前時刻開始讀之后產生的,之前產生的數據不再消費(則該參數設置為latest)。

enable_auto_commit, auto_commit_interval_ms
是否自動commit,當前消費者消費完該數據后,需要commit,才可以將消費完的信息傳回消息隊列的控制中心。enable_auto_commit設置為True后,消費者將自動commit,并且兩次commit的時間間隔為auto_commit_interval_ms。

def consumer_demo():
    consumer = KafkaConsumer(
        'kafka_demo', 
        bootstrap_servers=':9092',
        group_id='test',
        enable_auto_commit=False
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
            )
        )
        consumer.commit()
總結

今天給大家帶來的是Python操作Kafka,Kafka作為最重要的消息中間件之一,使用頻率很高,通過Python去操作Kafka除了解決業務問題,也可以方便編寫一些測試小工具來對業務進行調試。

以上就是Python操作Kafka教程,數據連接你我他的詳細內容,想要了解更多Python教程歡迎持續關注編程學習網。

掃碼二維碼 獲取免費視頻學習資料

Python編程學習

查 看2022高級編程視頻教程免費獲取