Kafka

消息队列

交互模型

image-20230808163355639

消息队列两种模式

点对点模式

image-20230808220946268

点对点模式特点:

  • 每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中)

  • 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;

  • 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;

发布订阅模式

image-20230808221052743

发布/订阅模式特点:

  • 每个消息可以有多个订阅者;

  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。

  • 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;

Kafka

简介

image-20230808221349788

Kafka是由Apache软件基金会开发的一个开源流平台,由Scala和Java编写

Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:

  1. 发布和订阅流数据流,类似于消息队列或者是企业消息传递系统

  2. 以容错的持久化方式存储数据流

  3. 处理数据流

image-20230808221634603

上图,我们可以看到:

  1. Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。

  2. Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。

  3. Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到

数据库中。

  1. Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。

应用场景

  • 消息队列
  • 流量削峰
  • 日志处理
  • 异步任务
  • 流处理

为什么选择Kafka

  • 多生产者、多消费者
  • 基于磁盘存储,磁盘持久化
  • 高伸缩性
  • 高性能

搭建Kafka环境

介绍

搭建Kafka必须需要JDK 、Zookeeper 和 Kafka

1、安装jdk

可以直接一键云安装jdk、也可以自己下载安装包进行安装、此处为了方便,采用云安装方式

1
yum install -y java-1.8.0-openjdk.x86_64

2、 安装Zookeeper

  • 下载Zookeeper安装包

    官网 https://zookeeper.apache.org/

    直接使用wegt命令进行下载

    1
    wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
  • 解压ZK

    1
    tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz 
  • 修改配置文件

    由于服务启动的时候默认回去读conf下的zoo.cfg配置文件,如果没有直接会报错!
    刚下载的zookeeper的conf目录下是没有zoo.cfg,但是给我们提供了zoo_sample.cfg(模板配置文件)

    记得修改指定Data文件位置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    # zookeeper时间配置中的基本单位 (毫秒)
    tickTime=2000

    # 允许follower初始化连接到leader最⼤时⻓,它表示tickTime时间倍数
    # 即:initLimit*tickTime
    initLimit=10

    # 允许follower与leader数据同步最⼤时⻓,它表示tickTime时间倍数
    syncLimit=5

    #zookeper 数据存储⽬录及⽇志保存⽬录(如果没有指明dataLogDir,则⽇志也保存在这个⽂件中)
    dataDir=/tmp/zookeeper

    #对客户端提供的端⼝号
    clientPort=2181

    #单个客户端与zookeeper最⼤并发连接数
    maxClientCnxns=60

    # 保存的数据快照数量,之外的将会被清除
    autopurge.snapRetainCount=3

    #⾃动触发清除任务时间间隔,⼩时为单位。默认为0,表示不⾃动清除。
    autopurge.purgeInterval=1
  • 配置环境变量

    在/etc/profile目录下面进行环境变量的配置, 直接追加到结尾也可以不进行配置,在bin目录下面执行命令即可

    1
    2
    3
    #zookeeper
    export ZOOKEEPER_HOME=/opt/module/zookeeper-3.7.1
    export PATH=$PATH:${ZOOKEEPER_HOME}/bin

    编辑完成之后记得执行以下命令,使其立即生效

    1
    source /etc/profile

    ZK命令

    1
    2
    3
    zkServer.sh start #启动
    zkServer.sh status #查看状态
    zkServer.sh stop #停止

    启动zkCli.sh客户端

    zkCli.sh可以理解成客户端,也可以理解成命令行工具,把命令交给他,让他和zk的服务端打交道。
    类似于mysql,我们安装完mysql想要执行命令,那么就必须要通过mysql -u账号 -p密码进入命令行工具里面,才能执行sql。

    1
    2
    zkCli.sh #启动客户端
    ls / #查询节点

    image-20230808223612688

3、安装Kafka

  • 上传Kafka安装包并且解压

    1
    tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/
  • 修改 server.properties

1
2
3
4
5
6
# 指定broker的id
broker.id=0
# 指定Kafka数据的位置
log.dirs=/export/server/kafka_2.12-2.4.1/data
# 配置zk的三个节点
zookeeper.connect=zookeeperIP:2181
  • 配置Kafka环境变量

    1
    2
    3
    #环境变量
    export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
    export PATH=:$PATH:${KAFKA_HOME}
  • 启动Kafka并且检查是否成功

    1
    2
    kafka-server-start.sh -daemon ../config/server.properties #启动
    ps -aux | grep server.properties #查看是否成功

    image-20230808224602971

Kafka基础概念

概念一:生产者与消费者

​ 对于 Kafka 来说客户端有两种基本类型:生产者(Producer)消费者(Consumer)。除此之外,还有用来做数据集成的 Kafka Connect API 和流式处理的 Kafka Streams 等高阶客户端,但这些高阶客户端底层仍然是生产者和消费者API,它们只不过是在上层做了封装。

​ 这很容易理解,生产者(也称为发布者)创建消息,而消费者(也称为订阅者)负责消费or读取消息。

image-20230808225802784

概念二:主题(Topic)与分区(Partition)

​ 在 Kafka 中,消息以主题(Topic)来分类,每一个主题都对应一个「消息队列」,这有点儿类似于数据库中的表。但是如果我们把所有同类的消息都塞入到一个“中心”队列中,势必缺少可伸缩性,无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽系统的性能或存储。

​ 我们使用一个生活中的例子来说明:现在 A 城市生产的某商品需要运输到 B 城市,走的是公路,那么单通道的高速公路不论是在「A 城市商品增多」还是「现在 C 城市也要往 B 城市运输东西」这样的情况下都会出现「吞吐量不足」的问题。所以我们现在引入分区(Partition)的概念,类似“允许多修几条道”的方式对我们的主题完成了水平扩展。

image-20230808225837382

概念三:Broker 和集群(Cluster)

​ 一个 Kafka 服务器也称为 Broker,它接受生产者发送的消息并存入磁盘;Broker 同时服务消费者拉取分区消息的请求,返回目前已经提交的消息。使用特定的机器硬件,一个 Broker 每秒可以处理成千上万的分区和百万量级的消息。

​ 若干个 Broker 组成一个集群(Cluster),其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责。下图是一个样例:

image-20230808230011124

​ Kafka 的一个关键性质是日志保留(retention),我们可以配置主题的消息保留策略,譬如只保留一段时间的日志或者只保留特定大小的日志。当超过这些限制时,老的消息会被删除。我们也可以针对某个主题单独设置消息过期策略,这样对于不同应用可以实现个性化。

概念四:多集群

随着业务发展,我们往往需要多集群,通常处于下面几个原因:

  • 基于数据的隔离;
  • 基于安全的隔离;
  • 多数据中心(容灾)

当构建多个数据中心时,往往需要实现消息互通。举个例子,假如用户修改了个人资料,那么后续的请求无论被哪个数据中心处理,这个更新需要反映出来。又或者,多个数据中心的数据需要汇总到一个总控中心来做数据分析。

上面说的分区复制冗余机制只适用于同一个 Kafka 集群内部,对于多个 Kafka 集群消息同步可以使用 Kafka 提供的 MirrorMaker 工具。本质上来说,MirrorMaker 只是一个 Kafka 消费者和生产者,并使用一个队列连接起来而已。它从一个集群中消费消息,然后往另一个集群生产消息。