简介

  • Kafka 是一种高吞吐量的分布式发布订阅消息系统

    kafka角色必知

  • producer:生产者。
  • consumer:消费者。
  • topic: 消息以topic为类别记录,Kafka将消息种子(Feed)分类, 每一类的消息称之为一个主题(Topic)
  • broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic), 并从Broker拉数据,从而消费这些已发布的消息。

    kafka安装和简单启动

  • 官方下载地址
  • 你的本地环境必须安装有Java 8+。
  • Apache Kafka2.8版本之后可以不需要使用ZooKeeper。
  • 加压即可无需编译安装。

    wget https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.12-3.1.0.tgz
    tar -xzf kafka_2.12-3.1.0.tgz
    cd kafka_2.12-3.1.0
    #Apache Kafka2.8版本之前需要使用ZooKeeper,启动zookeeper
    bin/zookeeper-server-start.sh config/zookeeper.properties
    #打开另一个命令终端启动kafka服务,启动完成Kafka已经可以使用了
    bin/kafka-server-start.sh config/server.properties &

    创建一个主题(topic)

    #启动kafka客户端,创建一个只有一个分区和一个备份名称为"test"的Topic,partitions分区参数、replication-factor参数
     #Apache Kafka2.8版本之前命令
     bin/kafka-topics.sh --create --zookeeper localhost:9092 --replication-factor 1 --partitions 1 --topic test
     #Apache Kafka2.8版本之后命令
     bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    
    #查看已创建的topic信息
    #Apache Kafka2.8版本之前命令
    bin/kafka-topics.sh --describe --zookeeper localhost:9092 --topic test
    #Apache Kafka2.8版本之后命令
    bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test 
    #显示所有话题
     #Apache Kafka2.8版本之前命令
     bin/kafka-topics.sh --list --zookeeper localhost:2181 test
     #Apache Kafka2.8版本之后命令
     bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --topic test
    #除了手工创建topic外,也可以配置你的broker,当发布一个不存在的topic时自动创建topic
     #设置自动创建topic时设置默认的分区和副本数(在server.properties中配置)
     # 自动创建主题
     auto.create.topics.enable=true
     # 默认主题的分区数
     num.partitions=8
     # 默认分区副本,default.replication.factor 默认分区副本数不得超过kafka节点数,一个节点放2份没意义,每个节点都需要配置,然后重启即可。
     default.replication.factor=3
  • 更多配置参考

    发送消息

    #启动一个生产者,每一行是一条消息,在控制台输入几条消息到服务器
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  #[等待输入自己的内容]
    #输入message1,换行输入message2,换行输入message3

    消费消息

    #启动一个消费者(等待消息)注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning   #[等待消息]
    #显示message1换行message2换行message3
Last modification:February 28, 2022
如果觉得我的文章对你有用,请随意赞赏