【Kafka】使用Java實現(xiàn)數(shù)據(jù)的生產(chǎn)和消費(fèi)
點(diǎn)擊關(guān)注,與你共同成長!
(資料圖片)
【Kafka】Java實現(xiàn)數(shù)據(jù)的生產(chǎn)和消費(fèi)Kafka介紹
Kafka 是由 LinkedIn公司開發(fā)的,它是一個分布式的,支持多分區(qū)、多副本,基于 Zookeeper 的分布式消息流平臺,它同時也是一款開源的基于發(fā)布訂閱模式的消息引擎系統(tǒng)。
Kafka術(shù)語 Broker:消息中間件處理節(jié)點(diǎn),一個Kafka節(jié)點(diǎn)就是一個Broker,一個或者多個Broker可以組成一個Kafka集群;Topic:每條發(fā)布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處);Partition:Partition是物理上的概念,每個Topic包含一個或多個Partition;Producer:負(fù)責(zé)發(fā)布消息到Kafka Broker;Consumer:消息消費(fèi)者,向Kafka Broker讀取消息的客戶端;Consumer Group:每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定Groupname,若不指定Groupname則屬于默認(rèn)的Group);Consumer Offset:消費(fèi)者在消費(fèi)消息的過程中,記錄消費(fèi)者在分區(qū)中消費(fèi)進(jìn)度的字段,就是消息位移,它是一個偏移量,隨著消費(fèi)者不斷消費(fèi)分區(qū)中的消息而遞增;Replica:Kafka 中消息的備份又叫做 副本(Replica),副本的數(shù)量是可以配置的,Kafka 定義了兩類副本,領(lǐng)導(dǎo)者副本(Leader Replica) 和 追隨者副本(Follower Replica),前者對外提供服務(wù),后者只是被動跟隨;Rebalance:當(dāng) Kafka 的某個主題的消費(fèi)者組中,有一個消費(fèi)者不可用后,其他消費(fèi)者會自動重新分配訂閱的主題分區(qū),這個過程叫做 Rebalance,是 Kafka 實現(xiàn)消費(fèi)者端高可用的重要手段。Kafka特性 高吞吐、低延遲:kakfa 最大的特點(diǎn)就是收發(fā)消息非??欤琸afka 每秒可以處理幾十萬條消息,它的最低延遲只有幾毫秒;高伸縮性:每個主題(topic) 包含多個分區(qū)(partition),主題中的分區(qū)可以分布在不同的主機(jī)(broker)中;持久性、可靠性:Kafka 能夠允許數(shù)據(jù)的持久化存儲,消息被持久化到磁盤,并支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失,Kafka 底層的數(shù)據(jù)存儲是基于 Zookeeper 存儲的,Zookeeper 的數(shù)據(jù)能夠持久存儲;容錯性:允許集群中的節(jié)點(diǎn)失敗,某個節(jié)點(diǎn)宕機(jī),Kafka 集群能夠正常工作;高并發(fā):支持?jǐn)?shù)千個客戶端同時讀寫。Kafka應(yīng)用場景 活動跟蹤:Kafka 可以用來跟蹤用戶行為,比如你經(jīng)常回去App購物,你打開App的那一刻,你的登陸信息,登陸次數(shù)都會作為消息傳輸?shù)?Kafka ,當(dāng)你瀏覽購物的時候,你的瀏覽信息,你的搜索指數(shù),你的購物愛好都會作為一個個消息傳遞給 Kafka ,這樣就可以生成報告,可以做智能推薦,購買喜好等;傳遞消息:Kafka 另外一個基本用途是傳遞消息,應(yīng)用程序向用戶發(fā)送通知就是通過傳遞消息來實現(xiàn)的,這些應(yīng)用組件可以生成消息,而不需要關(guān)心消息的格式,也不需要關(guān)心消息是如何發(fā)送的;度量指標(biāo):Kafka也經(jīng)常用來記錄運(yùn)營監(jiān)控數(shù)據(jù)。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報警和報告;日志記錄:Kafka 的基本概念來源于提交日志,比如可以把數(shù)據(jù)庫的更新發(fā)送到 Kafka 上,用來記錄數(shù)據(jù)庫的更新時間,通過Kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer,例如hadoop、Hbase、Solr等;流式處理:流式處理是有一個能夠提供多種應(yīng)用程序的領(lǐng)域;限流削峰:Kafka 多用于互聯(lián)網(wǎng)領(lǐng)域某一時刻請求特別多的情況下,可以把請求寫入Kafka 中,避免直接請求后端程序?qū)е路?wù)崩潰。以上介紹參考Kafka官方文檔。
Kafka核心APIKafka有4個核心API
應(yīng)用程序使用Producer API發(fā)布消息到1個或多個Topics中;應(yīng)用程序使用ConsumerAPI來訂閱1個或多個Topics,并處理產(chǎn)生的消息;應(yīng)用程序使用Streams API充當(dāng)一個流處理器,從1個或多個Topics消費(fèi)輸入流,并產(chǎn)生一個輸出流到1個或多個Topics,有效地將輸入流轉(zhuǎn)換到輸出流;Connector API允許構(gòu)建或運(yùn)行可重復(fù)使用的生產(chǎn)者或消費(fèi)者,將Topic鏈接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)。Kafka為何如此之快Kafka 實現(xiàn)了零拷貝原理來快速移動數(shù)據(jù),避免了內(nèi)核之間的切換。Kafka 可以將數(shù)據(jù)記錄分批發(fā)送,從生產(chǎn)者到文件系統(tǒng)(Kafka 主題日志)到消費(fèi)者,可以端到端的查看這些批次的數(shù)據(jù)。批處理能夠進(jìn)行更有效的數(shù)據(jù)壓縮并減少 I/O 延遲,Kafka 采取順序?qū)懭氪疟P的方式,避免了隨機(jī)磁盤尋址的浪費(fèi)。
總結(jié)一下其實就是四個要點(diǎn):
順序讀寫;零拷貝;消息壓縮;分批發(fā)送。案例項目創(chuàng)建:
Dependencies:
構(gòu)建工具為Maven,Maven的依賴如下:
Kafka Producerorg.apache.kafka kafka_2.12 1.0.0 provided org.apache.kafka kafka-clients 1.0.0 org.apache.kafka kafka-streams 1.0.0
packagecn.com.codingce.module;importjava.util.Properties;importjava.util.Random;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.common.serialization.StringSerializer;publicclassProducer{//定義主題publicstaticStringtopic="codingce_test";publicstaticvoidmain(String[]args)throwsInterruptedException{Propertiesp=newProperties();//bootstrap.servers:kafka的地址,多個地址用逗號分割p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.31.150:9092");//acks:消息的確認(rèn)機(jī)制,默認(rèn)值是0.acks=0:如果設(shè)置為0,生產(chǎn)者不會等待kafka的響應(yīng);acks=1:這個配置意味著kafka會把這條消息寫到本地日志文件中,但是不會等待集群中其他機(jī)器的成功響應(yīng)//acks=all:這個配置意味著leader會等待所有的follower同步完成.這個確保消息不會丟失,除非kafka集群中所有機(jī)器掛掉.這是最強(qiáng)的可用性保證.p.put("acks","all");//retries:配置為大于0的值的話,客戶端會在消息發(fā)送失敗時重新發(fā)送.p.put("retries",0);//batch.size:當(dāng)多條消息需要發(fā)送到同一個分區(qū)時,生產(chǎn)者會嘗試合并網(wǎng)絡(luò)請求.這會提高client和生產(chǎn)者的效率.p.put("batch.size",16384);//key.serializer:鍵序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer.p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);//value.deserializer:值序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer.p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);KafkaProducerkafkaProducer=newKafkaProducer<>(p);try{do{Stringmsg="后端碼匠,"+newRandom().nextInt(100);ProducerRecord record=newProducerRecord<>(topic,msg);kafkaProducer.send(record);System.out.println("======消息發(fā)送成功:"+msg+"======");Thread.sleep(1000L);}while(true);}finally{kafkaProducer.close();}}}
output
======消息發(fā)送成功:后端碼匠,97============消息發(fā)送成功:后端碼匠,35============消息發(fā)送成功:后端碼匠,81============消息發(fā)送成功:后端碼匠,46============消息發(fā)送成功:后端碼匠,62============消息發(fā)送成功:后端碼匠,53============消息發(fā)送成功:后端碼匠,42============消息發(fā)送成功:后端碼匠,56============消息發(fā)送成功:后端碼匠,99============消息發(fā)送成功:后端碼匠,46============消息發(fā)送成功:后端碼匠,49============消息發(fā)送成功:后端碼匠,35============消息發(fā)送成功:后端碼匠,17============消息發(fā)送成功:后端碼匠,78============消息發(fā)送成功:后端碼匠,66============消息發(fā)送成功:后端碼匠,4============消息發(fā)送成功:后端碼匠,9============消息發(fā)送成功:后端碼匠,69============消息發(fā)送成功:后端碼匠,52============消息發(fā)送成功:后端碼匠,2============消息發(fā)送成功:后端碼匠,8============消息發(fā)送成功:后端碼匠,86============消息發(fā)送成功:后端碼匠,12============消息發(fā)送成功:后端碼匠,67============消息發(fā)送成功:后端碼匠,91============消息發(fā)送成功:后端碼匠,8============消息發(fā)送成功:后端碼匠,56============消息發(fā)送成功:后端碼匠,89============消息發(fā)送成功:后端碼匠,37============消息發(fā)送成功:后端碼匠,39============消息發(fā)送成功:后端碼匠,71======Kafka Consumer
packagecn.com.codingce.module;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.util.Collections;importjava.util.Properties;publicclassConsumer{privatestaticfinalStringGROUPID="codingce_consumer_a";publicstaticvoidmain(String[]args){Propertiesp=newProperties();//bootstrap.servers:kafka的地址,多個地址用逗號分割p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.31.150:9092");//消費(fèi)者所屬的分組id,組名不同組名可以重復(fù)消費(fèi).例如你先使用了組名A消費(fèi)了Kafka的1000條數(shù)據(jù),但是你還想再次進(jìn)行消費(fèi)這1000條數(shù)據(jù),//并且不想重新去產(chǎn)生,那么這里你只需要更改組名就可以重復(fù)消費(fèi)了.p.put(ConsumerConfig.GROUP_ID_CONFIG,GROUPID);//是否自動提交,默認(rèn)為true.p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//從poll(拉)的回話處理時長p.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");//超時時間p.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");//一次最大拉取的條數(shù)p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1000);//消費(fèi)規(guī)則,默認(rèn)earliestp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");//key.serializer:鍵序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer.p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);//value.deserializer:值序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer.p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);KafkaConsumerkafkaConsumer=newKafkaConsumer<>(p);//訂閱消息kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));do{//訂閱之后,再從kafka中拉取數(shù)據(jù)ConsumerRecords records=kafkaConsumer.poll(100);for(ConsumerRecord record:records){System.out.printf("-----topic:%s,offset:%d,消息:%s-----\n",record.topic(),record.offset(),record.value());}}while(true);}}
output
-----topic:codingce_test,offset:289,消息:后端碼匠,97----------topic:codingce_test,offset:290,消息:后端碼匠,35----------topic:codingce_test,offset:291,消息:后端碼匠,81----------topic:codingce_test,offset:292,消息:后端碼匠,46----------topic:codingce_test,offset:293,消息:后端碼匠,62----------topic:codingce_test,offset:294,消息:后端碼匠,53----------topic:codingce_test,offset:295,消息:后端碼匠,42----------topic:codingce_test,offset:296,消息:后端碼匠,56----------topic:codingce_test,offset:297,消息:后端碼匠,99----------topic:codingce_test,offset:298,消息:后端碼匠,46----------topic:codingce_test,offset:299,消息:后端碼匠,49----------topic:codingce_test,offset:300,消息:后端碼匠,35----------topic:codingce_test,offset:301,消息:后端碼匠,17----------topic:codingce_test,offset:302,消息:后端碼匠,78----------topic:codingce_test,offset:303,消息:后端碼匠,66----------topic:codingce_test,offset:304,消息:后端碼匠,4----------topic:codingce_test,offset:305,消息:后端碼匠,9----------topic:codingce_test,offset:306,消息:后端碼匠,69----------topic:codingce_test,offset:307,消息:后端碼匠,52----------topic:codingce_test,offset:308,消息:后端碼匠,2----------topic:codingce_test,offset:309,消息:后端碼匠,8----------topic:codingce_test,offset:310,消息:后端碼匠,86----------topic:codingce_test,offset:311,消息:后端碼匠,12----------topic:codingce_test,offset:312,消息:后端碼匠,67----------topic:codingce_test,offset:313,消息:后端碼匠,91----------topic:codingce_test,offset:314,消息:后端碼匠,8----------topic:codingce_test,offset:315,消息:后端碼匠,56----------topic:codingce_test,offset:316,消息:后端碼匠,89----------topic:codingce_test,offset:317,消息:后端碼匠,37----------topic:codingce_test,offset:318,消息:后端碼匠,39----------topic:codingce_test,offset:319,消息:后端碼匠,71-----
本次采用Docker 搭建的單機(jī) Kafka、Zookeeper,Kafka介紹參考官方文檔:http://kafka.apache.org/intro
項目地址:https://gitee.com/codingce/codingce-leetcode
【Java】線程池梳理
【C++】const關(guān)鍵字
【Java】原子類
以上,便是今天的分享,希望大家喜歡,覺得內(nèi)容不錯的,歡迎「分享」「贊」或者點(diǎn)擊「在看」支持,謝謝各位。
關(guān)鍵詞: 應(yīng)用程序
相關(guān)閱讀
-
【Kafka】使用Java實現(xiàn)數(shù)據(jù)的生產(chǎn)和消費(fèi)
點(diǎn)擊關(guān)注,與你共同成長!【Kafka】Java實現(xiàn)數(shù)據(jù)的生產(chǎn)和消費(fèi)Kafka... -
傳統(tǒng)電視娛樂體驗大升級,Oorbit與LG電...
Block-10143D云技術(shù)平臺Oorbit宣布與LG電子達(dá)成戰(zhàn)略合作伙伴關(guān)系,... -
建筑管理模式升級之路數(shù)字化全過程工程...
▲點(diǎn)擊上方藍(lán)字,關(guān)注BIM大數(shù)據(jù)!建筑行業(yè)經(jīng)過了幾十年的發(fā)展和變革... -
產(chǎn)品經(jīng)理防身指南
產(chǎn)品經(jīng)理和團(tuán)隊成員打架的案例屢見不鮮。有平安產(chǎn)品經(jīng)理要求程序員... -
產(chǎn)品經(jīng)理學(xué)技術(shù)之?dāng)?shù)據(jù)結(jié)構(gòu)
今天給產(chǎn)品經(jīng)理普及一下數(shù)據(jù)結(jié)構(gòu)的相關(guān)知識,數(shù)據(jù)結(jié)構(gòu)是指相互之間... -
離職員工為啥不愿好好交接? 全球百事通
有個小伙伴微信上跟我訴苦,說自己入職新公司,老板讓即將離職的員...