https://blog.csdn.net/qq_41262248/article/details/80790918

kafka简介(摘自百度百科)

一、简介:
详见:https://blog.csdn.net/Beyond_F4/article/details/80310507

二、安装
详见博客:https://blog.csdn.net/beyond_f4/article/details/80095689

三、按照官网的样例,先跑一个应用
1、生产者:
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=[‘172.21.10.136:9092’])  #此处ip可以是多个[‘0.0.0.1:9092′,’0.0.0.2:9092′,’0.0.0.3:9092’ ]

for i in range(3):
msg = “msg%d” % i
producer.send(‘test’, msg)
producer.close()

2、消费者(简单demo):
from kafka import KafkaConsumer

consumer = KafkaConsumer(‘test’,
bootstrap_servers=[‘172.21.10.136:9092’])

for message in consumer:
print (“%s:%d:%d: key=%s value=%s” % (message.topic, message.partition,
message.offset, message.key,
message.value))

启动后生产者、消费者可以正常消费。

3、消费者(消费群组)
from kafka import KafkaConsumer

consumer = KafkaConsumer(‘test’,
group_id=’my-group’,
bootstrap_servers=[‘172.21.10.136:9092’])

for message in consumer:
print (“%s:%d:%d: key=%s value=%s” % (message.topic, message.partition,
message.offset, message.key,
message.value))

启动多个消费者,只有其中可以可以消费到,满足要求,消费组可以横向扩展提高处理能力

4、消费者(读取目前最早可读的消息)
from kafka import KafkaConsumer

consumer = KafkaConsumer(‘test’,
auto_offset_reset=’earliest’,
bootstrap_servers=[‘172.21.10.136:9092’])

for message in consumer:
print (“%s:%d:%d: key=%s value=%s” % (message.topic, message.partition,
message.offset, message.key,
message.value))

auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest
源码定义:{‘smallest’: ‘earliest’, ‘largest’: ‘latest’}

5、消费者(手动设置偏移量)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(‘test’,
bootstrap_servers=[‘172.21.10.136:9092′])

print consumer.partitions_for_topic(“test”)  #获取test主题的分区信息
print consumer.topics()  #获取主题列表
print consumer.subscription()  #获取当前消费者订阅的主题
print consumer.assignment()  #获取当前消费者topic、分区信息
print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic=u’test’, partition=0), 5)  #重置偏移量,从第5个偏移量消费
for message in consumer:
print (“%s:%d:%d: key=%s value=%s” % (message.topic, message.partition,
message.offset, message.key,
message.value))

6、消费者(订阅多个主题)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=[‘172.21.10.136:9092’])
consumer.subscribe(topics=(‘test’,’test0′))  #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic=u’test’, partition=0)) #获取当前主题的最新偏移量
for message in consumer:
print (“%s:%d:%d: key=%s value=%s” % (message.topic, message.partition,
message.offset, message.key,
message.value))

7、消费者(手动拉取消息)
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=[‘172.21.10.136:9092’])
consumer.subscribe(topics=(‘test’,’test0′))
while True:
msg = consumer.poll(timeout_ms=5)   #从kafka获取消息
print msg
time.sleep(1)

8、消费者(消息挂起与恢复)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=[‘172.21.10.136:9092’])
consumer.subscribe(topics=(‘test’))
consumer.topics()
consumer.pause(TopicPartition(topic=u’test’, partition=0))
num = 0
while True:
print num
print consumer.paused()   #获取当前挂起的消费者
msg = consumer.poll(timeout_ms=5)
print msg
time.sleep(2)
num = num + 1
if num == 10:
print “resume…”
consumer.resume(TopicPartition(topic=u’test’, partition=0))
print “resume……”
pause执行后,consumer不能读取,直到调用resume后恢复。

#######################

转自:https://blog.csdn.net/Beyond_F4/article/details/80310340

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注