久久精品五月,日韩不卡视频在线观看,国产精品videossex久久发布 ,久久av综合

站長資訊網
最全最豐富的資訊網站

Kafka單機環(huán)境配置及基本使用詳解

基本概念介紹

在Kafka中有一些基本的概念,

Topic

  • 簡介:Topic在Kafka中是一個抽象的概念,一個主題是已經發(fā)布的記錄的種類。主題在Kafka中是可以被多重訂閱的,這就意味著一個主題可能有0個、一個、或者許多個消費者去訂閱這個主題中的消息。

  • Partitions:在每一個topic在Kafka中可以有多個分區(qū),增加一個主題的分區(qū)可以提高Kafka的吞吐率,但是不是越多越好,因為如果分區(qū)數(shù)量越多的話生產者插入的效率也會降低。所以真正到生產環(huán)境時,需要權衡生產與消費的一個平衡關系,消費稍微大于生產者,不會產生消息的堆積,也能夠充分提高Kafka的效率。

  • Replication Factor:復制因子,是對于當前的Topic是否需要副本。如果設置成1的話,代表當前Topic在整個Kafka中只有一份。這里有個限制Topic的數(shù)量不能夠多于當前Kafka的Broker數(shù)量。

  • 存儲方式:在Kafka的配置中(Server.properties)有l(wèi)ogs.dir的配置,這個是Kafka存儲消息的位置。如果Topic復制因子是1分區(qū)是1的話,在對應的文件夾下會有一個名稱為topicname的文件夾;如果復制因子是2分區(qū)是2,假設存在兩個Broker,在每個Broker中將會存在兩個文件夾分別為topicname_0 topicname_1的文件夾

  • Leader與Follower:由于每個topic如果存在副本的話,是對于partition進行復制。這么多存在在不同的Broker上的副本,其中有一個partition是leader其他的是Followers,當一個broker宕機會在副本中選擇一個充當Leader。

Producer

生產者,顧明思議是生產消息,允許應用發(fā)布一個流的消息到一個或者多個主題中,

Consumer

  • 簡介:消費者是訂閱某個topic消息。
  • Group:每個消費者都有個groupid 來標定當前消費者屬于哪個group。Group的作用是,當同一個group的兩個消費者訂閱一個topic的時候,如果當前topic沒有分區(qū)那么其中一個消費者是獲得不了任何消息的;如果有分區(qū)的話,將會按照數(shù)量進行負載均衡,每個消費者獲得不同的分區(qū)的消息。
  • 同一個Group下的消費者不會同時訂閱一個主題下的同一個分區(qū),如果消費者數(shù)量杜宇分區(qū)數(shù)量,則多出的消費者是不會有任何消息獲得的。

    Broker

Broker 是一個Kafka的Server,一臺單物理機或者集群都可以擁有多個broker一個broker可以容納多個主題,這個與復制因子、主題的分區(qū)都有關系。

Kafka單機配置,一個Broker

環(huán)境:

  • win10物理機
  • Wmare CentOS7虛擬機
  • XShell 訪問虛擬機

配置zookeeper

  • 下載
# zookeeper  wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
  • 解壓后進入目錄
cd zookeeper-3.4.13/conf
  • 復制zookeeper的配置文件
cp zoo_sample.cfg zoo.cfg   
  • 返回上級進入bin目錄下,鍵入如下命令
./zkServer.sh start 
  • 查看是否成功開啟zookeeper服務
#注:這里提示一下開啟后提示的成功不一定是真的成功,所以需要查看一下  netstat -tunlp|egrep 2181  # 如果沒有結果查看統(tǒng)計目錄下的 zookeeper.out文件 查看log信息  # 使用jps命令查看 QuorumPeerMain是zookeeper的守護進程  11089 QuorumPeerMain  11114 Jps

配置Kafka

  • 下載安裝包
# Kafka  wget http://mirror.bit.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
  • 解壓后進入文件夾下bin目錄下
# 第一個是start.sh位置第二個是server.rpoperties的位置,所以確認好路徑的正確性  ./kafka-server-start.sh ./../config/server.properties &  # 我們可以在Kafka的目錄下直接執(zhí)行,而不進入到bin下,命令看著更舒服些  ./bin/kafka-server-start.sh ./config/server.properties &
  • 查看是否開啟成功:默認的Kafka端口是9092,zookeeper是2181
netstat -tunlp|egrep "(2181|9092)"  # 結果如下  [root@localhost ~]# netstat -tunlp|egrep "(2181|9092)"  tcp6      0     0 :::9092               :::*                  LISTEN      1877/java  tcp6      0     0 :::2181               :::*                  LISTEN      1820/java  # jps 查看  11089 QuorumPeerMain  11458 Kafka  11847 Jps
  • 至此Kafka配置成功

使用Kafka

創(chuàng)建topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test  # 返回結果  Created topic "test"

在虛擬機用sh腳本上作為生產者生產消息

  • 我們重新開一個Xshell窗口,CD到Kafka目錄/bin下,我們先介紹這一節(jié)會使用到的 kafka-console-producer.sh
# 鍵入如下命令  ./kafka-console-producer.sh --broker-list localhost:9092 --topic test  >today message  >  # 最近本的指定,broker-list與topic是必須的參數(shù)  # 成功命令行會進入一個>的情況,鍵入消息按回車鍵就是發(fā)送消息到Kafka了  # 發(fā)送一個【today message】
  • kafka-console-producer.sh參數(shù)說明,運行./kafka-console-producer.sh --help可查看

在虛擬機上用sh腳本作為消費者消費消息

  • 重新開另個一Xshell窗口CD到Kafka目錄/bin下,我們先介紹這一節(jié)會使用到的 kafka-console-consumer.sh
# 鍵入如下命令  ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning  # 最近本的指定,bootstrap-server與topic/whitelist是必須的參數(shù)  # 由于有 from-beginning 參數(shù) 會從頭load所有消息  # 消費后返回如下  today message  #在生產端鍵入消息后,消費端會同步消息出現(xiàn)
  • kafka-console-consumer.sh參數(shù)說明運行./kafka-console-consumer.sh --help可查看

使用Python作為生產者、消費者

  • 在物理機上寫一個Python生產者的腳本
from kafka.producer import KafkaProducer  import time  def send_data(data):      producer = KafkaProducer(bootstrap_servers='192.168.233.138:9092')      producer.send("test",b''+str(data)+'')      producer.flush()      print ("end")        if __name__=="__main__":      send_data("physics python message");
  • 查看Xshell上消費的命令行
[root@localhost ~]# /home/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.138:9092 --topic test --from-beginning  111  333    1  12  physics python message
  • 在物理機上寫一個消費者的腳本
from kafka import KafkaConsumer  import time  def get_data(data):      consumer = KafkaConsumer('test',bootstrap_servers='192.168.233.138:9092', group_id='my_favorite_group')      print ("end")      for msg in consumer:          print(msg)        if __name__=="__main__":      get_data();
  • 物理機消費者的結果
# 我這邊是先運行的消費者的腳本,所以實時接收到了物理機產生的消息  ConsumerRecord(topic=u'test', partition=0, offset=5, timestamp=1551762485911L, timestamp_type=0, key=None, value='physics python message', checksum=1520092583, serialized_key_size=-1, serialized_value_size=22)
  • 測試使用虛擬機sh端的生產者發(fā)送123 消息,查看物理機消費者結果
ConsumerRecord(topic=u'test', partition=0, offset=6, timestamp=1551762784609L, timestamp_type=0, key=None, value='123', checksum=1760815061, serialized_key_size=-1, serialized_value_size=3)
  • 幾點注意
# 物理機連接時可能出現(xiàn)【kafka.errors.NoBrokersAvailable: NoBrokersAvailable】這個錯誤按照如下順序依次更改  1. 查看虛擬機防火墻是否關閉      systemctl status firewalld      systemctl stop firewalld  2. 更改kafka服務端的server.properties:      增加 [ listeners=PLAINTEXT://192.168.233.138:9092 ]這一行  3. 修改物理機的hosts文件 C:WindowsSystem32driversetchosts      增加 【虛擬機ip 虛擬機主機名】 Eg:[192.168.233.138 localhost]

使用Springboot 作為生產者、消費者

注:我直接在我的一個寄存的Spring Boot Demo項目上更改

  • 在pom.xml中添加kafka依賴
 <dependency>   <groupId>org.springframework.kafka</groupId>   <artifactId>spring-kafka</artifactId>   </dependency>  <!-- 提示一件事情此處別指定version了,直接用最新的就可以,老的版本一些包找不到 -->
  • 寫一個kafka 生產者配置類
package com.example.kane.config;    import java.util.HashMap;  import java.util.Map;  import java.util.regex.Pattern;    import org.apache.kafka.clients.consumer.ConsumerConfig;  import org.apache.kafka.clients.producer.ProducerConfig;  import org.apache.kafka.common.serialization.StringDeserializer;  import org.apache.kafka.common.serialization.StringSerializer;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;  import org.springframework.kafka.annotation.EnableKafka;  import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;  import org.springframework.kafka.core.ConsumerFactory;  import org.springframework.kafka.core.DefaultKafkaConsumerFactory;  import org.springframework.kafka.core.DefaultKafkaProducerFactory;  import org.springframework.kafka.core.KafkaTemplate;  import org.springframework.kafka.core.ProducerFactory;    @Configuration  @EnableKafka  public class kafka_config {       public Map<String, Object> producerConfigs() {              Map<String, Object> props = new HashMap<>();              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.138:9092");              props.put(ProducerConfig.RETRIES_CONFIG, 0);              props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);              props.put(ProducerConfig.LINGER_MS_CONFIG, 1);              props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);              return props;          }                 public ProducerFactory<String, String> producerFactory() {              return new DefaultKafkaProducerFactory<>(producerConfigs());          }                 @Bean          public KafkaTemplate<String, String> kafkaTemplate() {              return new KafkaTemplate<String, String>(producerFactory());          }    }
  • 創(chuàng)建一個生產數(shù)據(jù)的Controller
package com.example.kane.Controller;    import org.slf4j.Logger;  import org.slf4j.LoggerFactory;  import org.springframework.beans.factory.annotation.Autowired;  import org.springframework.kafka.core.KafkaTemplate;  import org.springframework.web.bind.annotation.*;  import javax.servlet.http.HttpServletRequest;  import javax.servlet.http.HttpServletResponse;      @RestController  @RequestMapping("/kafka")  public class CollectController {       protected final Logger logger = LoggerFactory.getLogger(this.getClass());          @Autowired          private KafkaTemplate kafkaTemplate;            @RequestMapping(value = "/send", method = RequestMethod.GET)          public void sendKafka(HttpServletRequest request, HttpServletResponse response) {              try {                  String message = request.getParameter("message");                  logger.info("kafka的消息={}", message);                  kafkaTemplate.send("test", "key", message);                  logger.info("發(fā)送kafka成功.");              } catch (Exception e) {                  logger.error("發(fā)送kafka失敗", e);              }          }    }
  • 啟動項目后,在瀏覽器訪問http://localhost:8080/kafka/send?message=url_producer
# 查看結果  2019-03-05 13:57:16.438  INFO 10208 --- [nio-8080-exec-1] c.e.kane.Controller.CollectController    : 發(fā)送kafka成功.  2019-03-05 13:57:45.871  INFO 10208 --- [nio-8080-exec-5] c.e.kane.Controller.CollectController    : kafka的消息=url_producer  2019-03-05 13:57:45.872  INFO 10208 --- [nio-8080-exec-5] c.e.kane.Controller.CollectController    : 發(fā)送kafka成功.  # 查看虛擬機 Consumer結果    [root@localhost ~]# /home/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.138:9092 --topic test --from-beginning  physics python message  123  null  url_producer
  • 增加消費者的配置
package com.example.kane.config;    import org.apache.kafka.clients.consumer.ConsumerConfig;  import org.apache.kafka.common.serialization.StringDeserializer;  import org.springframework.beans.factory.annotation.Value;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;  import org.springframework.kafka.annotation.EnableKafka;  import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;  import org.springframework.kafka.config.KafkaListenerContainerFactory;  import org.springframework.kafka.core.ConsumerFactory;  import org.springframework.kafka.core.DefaultKafkaConsumerFactory;  import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;    import java.util.HashMap;  import java.util.Map;    import com.example.kane.service.kafka_listener;  @Configuration  @EnableKafka  public class kafka_consumer_config {      @Bean      public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {          ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();          factory.setConsumerFactory(consumerFactory());          return factory;      }        public ConsumerFactory<String, String> consumerFactory() {          return new DefaultKafkaConsumerFactory<>(consumerConfigs());      }          public Map<String, Object> consumerConfigs() {          Map<String, Object> propsMap = new HashMap<>();          propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.138:9092");          propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);          propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);          propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);          propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");          return propsMap;      }      @Bean      public kafka_listener listener() {          return new kafka_listener();      }  }
  • 增加listener類
package com.example.kane.service;  import org.apache.kafka.clients.consumer.ConsumerRecord;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory;  import org.springframework.kafka.annotation.KafkaListener;  public class kafka_listener {      protected final Logger logger = LoggerFactory.getLogger(this.getClass());          @KafkaListener(topics = {"test"})      public void listen(ConsumerRecord<?, ?> record) {          logger.info(record.toString());          logger.info("kafka的key: " + record.key());          logger.info("kafka的value: " + record.value().toString());      }  }
  • 同樣我們用訪問http://localhost:8080/kafka/send?message=url_producer1重新發(fā)一個消息
# 結果  2019-03-05 14:31:04.787  INFO 10208 --- [nio-8080-exec-1] c.e.kane.Controller.CollectController    : 發(fā)送kafka成功.  2019-03-05 14:31:04.848  INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener  : ConsumerRecord(topic = test, partition = 0, offset = 10, CreateTime = 1551767464787, serialized key size = 3, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = url_producer1)  2019-03-05 14:31:04.848  INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener  : kafka的key: key  2019-03-05 14:31:04.848  INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener  : kafka的value: url_producer1  # 查看虛擬機 消費者信息  physics python message  123  null  url_producer  url_producer1  url_producer1

一些需要注意的問題

  1. 現(xiàn)在kafka官方提供自帶zookeeper版本,不建議使用自帶的,還是建議自己安裝zookeeper
  2. 物理機沒法訪問的時候,看文中的注意事項,依次更改一定能訪問
贊(0)
分享到: 更多 (0)
?
網站地圖   滬ICP備18035694號-2    滬公網安備31011702889846號
久久精品五月,日韩不卡视频在线观看,国产精品videossex久久发布 ,久久av综合
丝袜诱惑制服诱惑色一区在线观看 | 亚洲视频国产| 136国产福利精品导航网址| 国产一区日韩| 里番精品3d一二三区| 奇米亚洲欧美| 成人看片网站| 免费av一区二区三区四区| 国产精品大片| 欧美黄色精品| 日本久久精品| 久久高清免费观看| 蜜臀国产一区二区三区在线播放| 久久国产精品久久久久久电车| 女同性一区二区三区人了人一| 午夜久久99| 免费观看久久久4p| 日本久久二区| 国产日韩欧美| 国产精品日韩精品中文字幕| 麻豆国产精品777777在线| 日本久久二区| 国产精品流白浆在线观看| 精品不卡一区| 成人看片网站| 午夜久久免费观看| 日韩专区在线视频| 亚洲免费成人av在线| 欧美一区成人| 超级白嫩亚洲国产第一| 久久九九精品| 丝袜美腿一区二区三区| 青草久久视频| 欧美日韩国产观看视频| 美女毛片一区二区三区四区| 美女网站久久| 日韩精品亚洲一区二区三区免费| 日本成人中文字幕| 美女精品久久| 午夜av一区| 91精品国产自产在线丝袜啪| 国产激情欧美| 九九色在线视频| 久久aⅴ国产紧身牛仔裤| 日本综合精品一区| 色综合五月天| 每日更新成人在线视频| 国产亚洲字幕| 久久久久国产精品一区三寸| 深夜福利一区| 精品亚洲自拍| 久久精品一区二区不卡| 亚洲资源在线| 国产精品久久久久蜜臀| 国产女优一区| 国产福利一区二区三区在线播放| 蜜桃精品在线| 日本伊人午夜精品| 成人精品高清在线视频| 日韩一级欧洲| 国产欧美成人| 欧洲激情综合| 国产日韩欧美一区二区三区在线观看 | 日韩av中文字幕一区二区 | 蜜桃91丨九色丨蝌蚪91桃色| 久久国产精品免费精品3p| 青青青免费在线视频| 久久亚洲视频| 免费看av不卡| 日韩av一区二区三区四区| 播放一区二区| 国产精品久久亚洲不卡| 最新亚洲激情| 精品国产欧美日韩| 四虎精品一区二区免费| 高清不卡亚洲| 日韩精品三区四区| 欧美国产91| 日本aⅴ免费视频一区二区三区| 成人在线免费观看网站| 日韩精品成人在线观看| 国产91精品对白在线播放| 国产麻豆一区二区三区| 亚洲激情av| 国产在线看片免费视频在线观看| 亚洲精品成人一区| 宅男在线一区| 成人美女视频| 91欧美日韩在线| 亚洲精品一二三区区别| 精品久久一区| 亚洲精品麻豆| 欧美日韩激情| 精品72久久久久中文字幕| 亚洲精品黄色| 欧美高清不卡| 日韩免费在线| 精品三区视频| 97久久亚洲| 日本欧洲一区二区| 91精品国产成人观看| 久久免费福利| 国产精选久久| 亚洲精品国产精品粉嫩| 在线亚洲自拍| 国模 一区 二区 三区| 国产一区二区三区网| 91精品日本| 蘑菇福利视频一区播放| 99视频精品全国免费| 国产一区二区三区四区五区| 欧美日韩精品一区二区三区在线观看| 亚洲在线一区| 欧美日韩国产免费观看视频| 中文字幕在线视频网站| 精品伊人久久| 国产经典一区| 国产精品成人自拍| 欧美精品中文字幕亚洲专区| 亚洲啊v在线免费视频| 国产免费av国片精品草莓男男| 91欧美国产| 韩日一区二区| 伊人影院久久| 欧美日韩国产观看视频| 青青草伊人久久| 亚洲91精品| 日本高清不卡一区二区三区视频| 国产视频亚洲| 免费成人av在线播放| 精品久久久久久久| 丝袜美腿成人在线| | 精品三级国产| 青青青免费在线视频| 国产精品jk白丝蜜臀av小说| 国产精品91一区二区三区| 久久视频国产| 欧美aa国产视频| 国产精品亲子伦av一区二区三区| 亚洲专区视频| 中文字幕av一区二区三区人| 精品视频在线一区二区在线| 亚洲在线久久| 麻豆精品在线视频| 精品国产乱码久久久久久樱花| 亚洲精品三级| 91国内精品| 国产夫妻在线| 欧美a一区二区| 日韩成人精品一区| 国产精品久av福利在线观看| 最新国产精品| 91精品国产91久久久久久黑人| 九色porny丨国产首页在线| 亚洲成人二区| 亚洲深夜福利| 日本一区二区三区中文字幕| 国产私拍福利精品视频二区| 国产精品激情电影| 国产精品黑丝在线播放| 亚洲午夜久久久久久尤物 | 日韩不卡一二三区| 精品国产亚洲一区二区在线观看| 群体交乱之放荡娇妻一区二区| 欧美13videosex性极品| 香蕉久久国产| 国产精品男女| 99久久激情| 久久国产精品免费精品3p| 日韩精品电影| 国产高清亚洲| 欧美精品91| 亚洲免费观看高清完整版在线观| 国产精品久久久亚洲一区| 婷婷国产精品| 亚洲精品无播放器在线播放| 久久久久国产精品一区三寸| 婷婷综合亚洲| 久久国产电影| 国产日韩在线观看视频| 黄色亚洲免费| 日韩一区二区三区高清在线观看| 日韩成人av影视| 玖玖玖国产精品| 欧美精品成人| 福利精品一区| 老鸭窝毛片一区二区三区| 一区二区三区四区在线观看国产日韩| 久久av偷拍| 丁香婷婷久久| 亚洲精一区二区三区| 国产成人精品一区二区免费看京 | 亚洲精品在线二区| 红杏一区二区三区| 日韩毛片在线| 亚洲丝袜啪啪| 久久亚洲资源中文字| 91精品精品| 欧美+日本+国产+在线a∨观看|