博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
学习kafka教程(一)
阅读量:6622 次
发布时间:2019-06-25

本文共 3323 字,大约阅读时间需要 11 分钟。

简介

kafka是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(变态快)等优点,并已在成千上万家公司运行。

kafka

目标

  • 了解kafka的基本原理
  • 掌握kafka的基本操作
  • kafka的深度探究在另一篇文章。

相关概念

producer:生产者,就是它来生产“叉烧包”的饭堂阿姨。

consumer:消费者,生产出来的“叉烧包”它来消费。
topic:你把它理解为标签,生产者每生产出来一个叉烧包就贴上一个标签(topic),消费者可不是谁生产的“叉烧包”都吃的,这样不同的生产者生产出来的“叉烧包”,消费者就可以选择性的“吃”了。
broker:就是蒸笼了。

所以整个过程可以如下形象的说明:

饭堂阿姨制作一个叉烧包,消费者就消费一个叉烧包。
1.假设消费者消费叉烧包的时候噎住了(系统宕机了),生产者还在生产叉烧包,那新生产的叉烧包就丢失了。
2.再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个叉烧包,消费者1秒钟只能吃50个叉烧包,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”叉烧包“又丢失了。
3.这个时候我们放个篮子在它们中间,生产出来的叉烧包都放到篮子里,消费者去篮子里拿叉烧包,这样叉烧包就不会丢失了,都在篮子里,而这个篮子就是”kafka“。
4.叉烧包其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的(就是tcp、http什么的),也称为报文,也叫“消息”。
5.消息队列满了,其实就是篮子满了,”叉烧包“ 放不下了,那赶紧多放几个篮子,其实就是kafka的扩容。
所以说
kafka == 篮子

安装

1.由于kafka需要zookeeper的。所以您可以参考【】

2.kafka安装
2.1下载地址:
2.2 配置:
(注:KAFKA_HOME为你配置的环境变量。hadoop01为你配置hosts)
编辑$KAFKA_HOME/config/下的server.properties文件
server.properties

broker.id=0#listeners=PLAINTEXT://:9092log.dirs=/root/app/tmp/kafkalognum.partitions=1zookeeper.connect=hadoop01:2181

2.3 多broker的kafka安装配置

config/server-1.properties:    broker.id=1    listeners=PLAINTEXT://:9093    log.dir=/tmp/kafka-logs-1
config/server-2.properties:   broker.id=2   listeners=PLAINTEXT://:9094   log.dir=/tmp/kafka-logs-2

常用操作命令

启动kafka

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

创建topic

bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic hello_topic

查看topic

./kafka-topics.sh --list --zookeeper hadoop01:2181

查看指定topic的详细信息

kafka-topics.sh --describe --zookeeper hadoop01:2181

生产消息

./kafka-console-producer.sh --broker-list hadoop01:9092 --topic hello_topic

消费消息

./kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic hello_topic --from-beginning

0.9.0版本的用下面的命令

./kafka-console-consumer.sh --zookeeper hadoop01:2181 --topic hello_topic --from-beginning

解析:--from-beginning:是从producer开始的位置开始拿数据的。

Springboot操作kafka

特别注意(巨坑):kafka有很多版本的。各版本对应使用的springboot或者jar是不一样。请参考spring官网的说明:

本文使用的是springboot1.5系列+0.10.0.x的

pom.xml

org.springframework.kafka
spring-kafka
1.0.5.RELEASE

生产者代码

主要是向kafka服务发送消息(生产消息)。

/** * 测试kafka生产者 */@RestController@RequestMapping("kafka")public class TestKafkaProducerController {    @Autowired    private KafkaTemplate
kafkaTemplate; @RequestMapping("send") public String send(String msg){ kafkaTemplate.send("hello_topic", msg); return "success"; }}

消费者代码

从主题(topic)中获取消息进行消费。

/** * kafka消费者测试 */@Componentpublic class TestConsumer {    @KafkaListener(topics = "hello_topic")    public void listen (ConsumerRecord
record) throws Exception { System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value()); }}

yml配置文件

主要是配置kafka的服务地址。

spring:  kafka:    bootstrap-servers: 120.79.xxx.x:9092    producer:      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      group-id: test      enable-auto-commit: true      auto-commit-interval: 1000      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

最后

本人水平有限,欢迎各位建议以及指正。顺便关注一下公众号呗,会经常更新文章的哦。

n平方

转载地址:http://iajpo.baihongyu.com/

你可能感兴趣的文章
测试妹子的呐喊:为什么总是收不到推送?
查看>>
linux NFS
查看>>
Jquery DataTable基本使用
查看>>
New UWP Community Toolkit
查看>>
JDBC连接数据库(二)
查看>>
leetcode 674. Longest Continuous Increasing Subsequence
查看>>
Extensions in UWP Community Toolkit - SurfaceDialTextbox
查看>>
Golang 语言的单元测试和性能测试(也叫 压力测试)
查看>>
springboot数据库连接池使用策略
查看>>
Java中CAS详解
查看>>
Java线程的学习_线程池
查看>>
Android 虚拟导航挡住应用底部解决方案(屏幕底部的三个按键)
查看>>
工厂函数
查看>>
Java Spring MVC 错误 及 常见问题 总结
查看>>
Linux系统实战项目——sudo日志审计
查看>>
native.js是什么且如何使用
查看>>
Android Application Task Activities的关系
查看>>
浅谈CSS盒子模型
查看>>
实现iFrame自适应高度,原来很简单!
查看>>
get app id
查看>>