Kafka集群搭建

Kafka集群搭建手记

简介 Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。 入门请参照:https://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/index.html 在此不再赘述。

zookeeper集群搭建

这部分不是本文的重点,但是kafka需要用到kafka集群,所以先搭建kafka集群。 从kafka官方文档看到,kafka似乎在未来的版本希望抛弃zookeep集群,自己维护集群的一致性,拭目以待吧。 我们搭建集群使用的是三台亦庄的S1,因为zookeeper不怎么占资源也不怎么占空间(我们的业务目前比较简单),所以三台S1上都搭建了zookeeper集群。 搭建zookeeper集群没什么难度,参考文档:http://www.cnblogs.com/huangxincheng/p/5654170.html 下面列一下我的配置并解析:

集群规划

一共用三台亦庄的S1,搭建一个Kafka集群。 这三个机器是10.1.112.57, 10.1.112.58, 10.1.112.59 每台服务器的硬盘划分都是一样的,每个独立的物理磁盘挂在一个单独的分区里面,这样很方便用于Kafka多个partition的数据读写与冗余。 /data1比较小,为了不成为集群的瓶颈,所以/data1用于存放kafka以及Zookeeper 每台机器的磁盘分布如下:

kafka集群配置 下面是kafka的简单配置,三台服务器都一样,如有不一致的在下文有说明。 kafka安装在目录/usr/local/kafka/下,下面的说明以10.1.112.57为例。

最重要的配置文件server.properties,需要配置的信息如下:

# broker.id,同一个集群里面,每一个服务器都需要唯一的一个ID,非负整数
broker.id=57

# listeners,监听客户端请求的IP和端口,默认都是9092
listeners=PLAINTEXT://10.1.112.57:9092

# 下面两个,最多的网络线程数,做多的磁盘IO线程数,根据业务量决定
# 我考虑到业务量并不大,而且一台S1有24个逻辑CPU,网络和磁盘IO线程都是IO密集型,所以我觉得线程数可以设置成CPU数目的2-3倍
# 现在都设成30个线程(最多),反正线程多了也是睡眠,对系统性能影响不大,应该可以满足相当长时间的业务增长了。
num.network.threads=30
num.io.threads=30

# socket收发的缓冲区大小
# 适当设置大一点,S1有32G内存,只用在一个没多少数据的zookeeper和kafka,不需要太节省,设置大一点
socket.receive.buffer.bytes=809600
socket.request.max.bytes=104857600

# 数据到底保存在哪里?
# 这里需要给出一个list,数据放在这些分区(目录)中
# 多个topic,多个拷贝的情况下,kafka会自动做LB
log.dirs=/data3/kafka,/data4/kafka,/data5/kafka,/data6/kafka,/data7/kafka,/data8/kafka,/data9/kafka,/data10/kafka,/data11/kafka,/data12/kafka

# 【很重要】默认的partition数目
# 如果新建topic,没有配置的话默认的partition数。
# 在之前的版本,我配置成2,于是踩坑了:
# 我在没有新建topic并明确指定partition数目的情况下,给kafka生产了一路数据,于是硬生生塞满了一个磁盘分区,其他分区丝毫未动。
# 在这么多磁盘分区的情况下,默认partition可以多一点,LB的需要。
# 不过新建topic的时候,指定分区数目是一个好习惯。
# kafka生产者在不需要新建topic的情况下可以kafka写数据,这个未新建的topic,zookeeper是不知道的。
# 此坑到此一游
num.partitions=10

# 多少条消息会flush一次缓存到磁盘,多久flush一次到磁盘
# 这两个参数关系到性能与安全性,也就是,太频繁刷新数据磁盘会影响性能,长时间不刷数据到磁盘会担心kafka挂掉丢数据,云云,这些教科书上讲得多了
log.flush.interval.messages=10000
log.flush.interval.ms=1000

#【重要】数据保存时间
# kafka很好用,可以自动帮你删除历史数据,这里大概保存4天
log.retention.hours=100

# 【重要】文件分段的大小
# kafka的消息保存在磁盘,那么问题来了:每一个文件需要多大呢?
# 文件太小,导致文件数太多而inode太大,文件太大,影响并发度(可能)
# 设为1GB
log.segment.bytes=1073741824

# 上面说了历史数据保存的时间是4天,那么多长时间删除一次历史数据呢?
# 这里设成10分钟左右
# 保存数据都是以小时计算的,所以10分钟检测一次就够了
# 因为以lvs的数据速度而言,10分钟可以写满一个1GB的文件,1GB是文件分段(log.segment.bytes)
log.retention.check.interval.ms=600000

# zookeeper服务器地址,用逗号分开,最末的是目录,放在根目录下一个叫kafka的子目录下。
# 在搭建好集群的时候,下面会列出这个目录下有什么东西
zookeeper.connect=10.1.112.57:2181,10.1.112.58:2181,10.1.112.59:2181/kafka

# zk连接超时
# zookeeper.connection.timeout.ms=6000

kafka集群的启动 从上面的配置看到,kafka集群不需要像hadoop集群那样,配置ssh通讯,而且一个kafka服务器(官方文档称之为broker,下面统一使用这个称呼)并不知道其他的kafka服务器的存在,因此你需要逐个broker去启动kafka。各个broker根据自己的配置,会自动去配置文件上的zk服务器报到,这就是一个有zk服务器粘合起来的kafka集群。 我写了一个启动脚本,放在/usr/local/kafka/bin下面。启动脚本每个broker都一样:

KAFKA_HOME="/usr/local/kafka"

path="."
cd ${KAFKA_HOME}/libs
for f in *jar
do
path="${path}:`pwd`/$f"
done
export CLASSPATH=$path

cd ${KAFKA_HOME}/bin  && ( sh kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > ${KAFKA_HOME}/logs/start.log 2>&1 & )
**kafka集群的关闭**
如同kafka集群里面每一个broker都需要单独启动一样,kafka集群里面每一个broker都需要单独关闭。
官方给出的关闭脚本是单独运行 bin/kafka-server-stop.sh
但是我运行的结果是无法关闭。打开脚本一看,才发现是最简单的办法,发一个TERM信号到kafka的java进程,官方脚本给出的grep有点问题。
发信号之后,一直tail着kafka日志,看到正常关闭。

新建topic

./bin/kafka-topics.sh --zookeeper 10.1.112.57:2181,10.1.112.58:2181,10.1.112.59:2181/kafka --create --topic LvsKafka --replication-factor 2 --partitions 24 

指定zookeeper服务器,topic名称是LvsKafka(注意topic名称不能有英文句号(.)和下划线(_),否则会通不过,理由是名称会冲突,下文对此略有解析) replication-factor指出重复因子是2,也就是每条数据有两个拷贝,可靠性考虑。 partitions 指出需要多少个partition,数据量大的多一点,无论生产和消费,这是负载均衡和高并发的需要。

查看新的topic

z=10.1.112.57:2181,10.1.112.58:2181,10.1.112.59:2181/kafka
/usr/local/kafka/bin/kafka-topics.sh --zookeeper $z --describe

可以看到刚才新建的24个partition,比如partition 5, 他的leader是broker 59,也就是10.1.112.59这台机器。 建立topic时我们指出需要2个拷贝,从上面的输出的Replicas字段看到,这两个拷贝放在59,58两个机器,也就是10.1.112.59和10.1.112.58. Isr估计是【Is Service?】的意思,表示当前partition的所有拷贝所在的机器中,哪些是还活着(可以提供服务)的。现在是59和58都还存活。

这个命令另外还会看到一些类似于下面的内容: consumer_offsets到底是什么呢?其实就是客户端的消费进度,客户端会定时上报到kafka集群,而kafka集群会把每个客户端的消费进度放入一个自己内部的topic中,这个topic就是__consumer_offsets。我查看过consumer_offsets的内容,其实就是每个客户端的消费进度作为一条消息,放入__consumer_offsets这个topic中。 这里给了我们两个提示: 1、kafka自己管理客户端的消费进度,而不是依靠zk,这就是kafka官方文档说的kafka未来会抛弃zk的底气之一; 2、留意到这个kafka自己的topic是带下划线的,也就是,kafka担心我们自己建的topic如果带下划线的话会跟这些内部自用的topic冲突;

看看kafka在zk里面保存了什么

转到/usr/local/zookeeper/bin,执行sh zkCli.sh

看看kafka往磁盘里面写了什么

随便打开一个目录,如/data3/kafka/LvsKafka-20,这里保存了LvsKafka这个topic的第20个partition

不难看出,log文件是数据存放的文件,每个1GB大小(如上所述) index文件应该是索引文件。 不是一般性,打开文件00000000000021148927.log,文件名表示这个文件是从offset=21148927开始的。

因为我们的消息没有加密,所以还是比较容易看出来一些字段的,其他的不清楚。这部分找不到资料。 00000000000021148927.index文件看不出来是什么,估计是每个offset在文件中的偏移量对应,因为客户端有可能从任何一个offset开始消费。