Kafka REST Proxy
一、业务场景需求
某公司A,需要把游戏日志传给另外一家公司B。架构大致是,游戏服务器把日志传到kafka消息队列,kafka在到es进行数据分析。由于服务器是公司B自己提供,公司A就需要提供一个外网接口让对方把数据写到云ukafka。 由于ukafka只有内网IP,zookeeper中只能注册一个地址,通过nginx代理或者ssh端口转发都不能做到外网正常使用,除非节点本身有外网IP。这个时候就需要用到Kafka REST Proxy。
二、Kafka REST Proxy 介绍
Kafka REST代理为Kafka集群提供RESTful接口。它可以轻松生成和使用消息,查看集群状态,以及在不使用本机Kafka协议或客户端的情况下执行管理操作。用例示例包括从任何语言构建的任何前端应用程序向Kafka报告数据,将消息提取到尚不支持Kafka的流处理框架,以及脚本管理操作。
kafka rest proxy 是rest api接口,通过这个代理把数据转发到kafka的。
主要有两部分组成- Schema Registry提供元数据的存储和解析。 - Producer的序列化和Consumer的反序列化都会去Schema Registry读取对应的Schema
通过Kafka REST Proxy API接口做代理,把外网传输数据转发到内网kafka中。
相关资料1 https://github.com/confluentinc/kafka-rest
相关资料2 https://docs.confluent.io/current/kafka-rest/docs/index.html
三、安装
1、安装jdk1.8 以上
下载JDK
1
2 cd /usr/local/src/kafka
wget -c http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.rpm?AuthParam=1493105955_1f866324e85307d5f9b495e276577b05
安装JDK
1
2
3
4
5
6
7
8
9
10
11
12
13
14 # 检查jdk 是否安装
rpm -qa | grep jdk
# 安装jdk
cd /usr/local/src/kafka
rpm -ivh jdk-8u131-linux-x64.rpm
# 设置环境
cat >> /etc/profile<<"EOF"
export JAVA_HOME=/usr/java/jdk1.8.0_131
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
EOF
# 加载环境变量
. /etc/profile
2、添加相关源
vim /etc/yum.repos.d/confluent.repo
1
2
3
4
5
6
7
8
9
10
11
12
13 [Confluent.dist]
name=Confluent repository (dist)
baseurl=http://packages.confluent.io/rpm/3.1/6
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=http://packages.confluent.io/rpm/3.1
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1
3、安装
1 | yum install confluent-kafka-rest confluent-schema-registry -y |
4、修改confluent-schema-registry 配置
vim /etc/schema-registry/schema-registry.properties
1
2
3
4 listeners=http://0.0.0.0:8080 ## 提供给kafka-rest 连接的端口
kafkastore.connection.url=10.10.41.3x:2181,10.10.187.3x:2181,10.10.87.5x:2181 ## 实际kafka的zookeeper
kafkastore.topic=_schemas
debug=false
启动
1 schema-registry-start /etc/schema-registry/schema-registry.properties
5、修改confluent-kafka-rest配置
vim /etc/kafka-rest/kafka-rest.properties
1
2
3
4
5 #id=kafka-rest-test-server
#port=9092 ## 自定义端口 默认 8082
schema.registry.url=http://localhost:8080 ## 上面schema 端口
zookeeper.connect=10.10.41.3x:2181,10.10.187.3x:2181,10.10.87.5x:2181
## 实际kafka的zookeeper
启动
1 kafka-rest-start /etc/kafka-rest/kafka-rest.properties
测试
发送消息
1
2
3 curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"value":{"foo":"bar2"}}]}' "http://x.x.x.x:8082/topics/opstest"
# 正常返回结果
{"offsets":[{"partition":0,"offset":5,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
接收消息
1
2
3
4 ./kafka-console-consumer.sh --zookeeper 10.10.x.x:2181 --from-beginning --topic opstest
# 结果
{"foo":"bar"}
{"foo":"bar2"}