卖逼视频免费看片|狼人就干网中文字慕|成人av影院导航|人妻少妇精品无码专区二区妖婧|亚洲丝袜视频玖玖|一区二区免费中文|日本高清无码一区|国产91无码小说|国产黄片子视频91sese日韩|免费高清无码成人网站入口

查看kafka消費(fèi)組的消息數(shù) 如何使用消息隊(duì)列解決分布式事務(wù)?

如何使用消息隊(duì)列解決分布式事務(wù)?消息事務(wù)是指一系列的生產(chǎn)、消費(fèi)操作可以要么都完成,要么都失敗,類似數(shù)據(jù)庫的事務(wù)。在此用Kafka 為例做進(jìn)一步解說吧!一、基本概念為了支持事務(wù),Kafka 0.11.0

如何使用消息隊(duì)列解決分布式事務(wù)?

消息事務(wù)是指一系列的生產(chǎn)、消費(fèi)操作可以要么都完成,要么都失敗,類似數(shù)據(jù)庫的事務(wù)。在此用Kafka 為例做進(jìn)一步解說吧!

一、基本概念

為了支持事務(wù),Kafka 0.11.0版本引入以下概念:

1.事務(wù)協(xié)調(diào)者:類似于消費(fèi)組負(fù)載均衡的協(xié)調(diào)者,每一個(gè)實(shí)現(xiàn)事務(wù)的生產(chǎn)端都被分配到一個(gè)事務(wù)協(xié)調(diào)者(Transaction Coordinator)。

2.引入一個(gè)內(nèi)部Kafka Topic作為事務(wù)Log:類似于消費(fèi)管理Offset的Topic,事務(wù)Topic本身也是持久化的,日志信息記錄事務(wù)狀態(tài)信息,由事務(wù)協(xié)調(diào)者寫入。

3.引入控制消息(Control Messages):這些消息是客戶端產(chǎn)生的并寫入到主題的特殊消息,但對于使用者來說不可見。它們是用來讓broker告知消費(fèi)者之前拉取的消息是否被原子性提交。

4.引入TransactionId:不同生產(chǎn)實(shí)例使用同一個(gè)TransactionId表示是同一個(gè)事務(wù),可以跨Session的數(shù)據(jù)冪等發(fā)送。當(dāng)具有相同Transaction ID的新的Producer實(shí)例被創(chuàng)建且工作時(shí),舊的且擁有相同Transaction ID的Producer將不再工作,避免事務(wù)僵死。

ID:每個(gè)新的Producer在初始化的時(shí)候會被分配一個(gè)唯一的PID,這個(gè)PID對用戶是不可見的。主要是為提供冪等性時(shí)引入的。

Numbler。(對于每個(gè)PID,該P(yáng)roducer發(fā)送數(shù)據(jù)的每個(gè)ltTopic, Partitiongt都對應(yīng)一個(gè)從0開始單調(diào)遞增的Sequence Number。

7.每個(gè)生產(chǎn)者增加一個(gè)epoch:用于標(biāo)識同一個(gè)事務(wù)Id在一次事務(wù)中的epoch,每次初始化事務(wù)時(shí)會遞增,從而讓服務(wù)端可以知道生產(chǎn)者請求是否舊的請求。

8.冪等性:保證發(fā)送單個(gè)分區(qū)的消息只會發(fā)送一次,不會出現(xiàn)重復(fù)消息。增加一個(gè)冪等性的開關(guān),可以獨(dú)立與事務(wù)使用,即可以只開啟冪等但不開啟事務(wù)

二、事務(wù)流程

1、查找事務(wù)協(xié)調(diào)者

生產(chǎn)者會首先發(fā)起一個(gè)查找事務(wù)協(xié)調(diào)者的請求(FindCoordinatorRequest)。協(xié)調(diào)者會負(fù)責(zé)分配一個(gè)PID給生產(chǎn)者。類似于消費(fèi)組的協(xié)調(diào)者。

2、獲取produce ID

在知道事務(wù)協(xié)調(diào)者后,生產(chǎn)者需要往協(xié)調(diào)者發(fā)送初始化pid請求(initPidRequest)。這個(gè)請求分兩種情況:

●不帶transactionID

這種情況下直接生成一個(gè)新的produce ID即可,返回給客戶端

●帶transactionID

這種情況下,kafka根據(jù)transactionalId獲取對應(yīng)的PID,這個(gè)對應(yīng)關(guān)系是保存在事務(wù)日志中(上圖2a)。這樣可以確保相同的TransactionId返回相同的PID,用于恢復(fù)或者終止之前未完成的事務(wù)。

3、啟動事務(wù)

生產(chǎn)者通過調(diào)用beginTransaction接口啟動事務(wù),此時(shí)只是內(nèi)部的狀態(tài)記錄為事務(wù)開始,但是事務(wù)協(xié)調(diào)者認(rèn)為事務(wù)開始只有當(dāng)生產(chǎn)者開始發(fā)送第一條消息才開始。

4、消費(fèi)和生產(chǎn)配合過程

這一步是消費(fèi)和生成互相配合完成事務(wù)的過程,其中涉及多個(gè)請求:

●增加分區(qū)到事務(wù)請求

當(dāng)生產(chǎn)者有新分區(qū)要寫入數(shù)據(jù),則會發(fā)送AddPartitionToTxnRequest到事務(wù)協(xié)調(diào)者。協(xié)調(diào)者會處理請求,主要做的事情是更新事務(wù)元數(shù)據(jù)信息,并把信息寫入到事務(wù)日志中(事務(wù)Topic)。

●生產(chǎn)請求

生產(chǎn)者通過調(diào)用send接口發(fā)送數(shù)據(jù)到分區(qū),這些請求新增pid,epoch和sequence number字段。

●增加消費(fèi)offset到事務(wù)

生產(chǎn)者通過新增的snedOffsets ToTransaction接口,會發(fā)送某個(gè)分區(qū)的Offset信息到事務(wù)協(xié)調(diào)者。協(xié)調(diào)者會把分區(qū)信息增加到事務(wù)中。

●事務(wù)提交offset請求

當(dāng)生產(chǎn)者調(diào)用事務(wù)提交offset接口后,會發(fā)送一個(gè)TxnOffsetCommitRequest請求到消費(fèi)組協(xié)調(diào)者,消費(fèi)組協(xié)調(diào)者會把offset存儲在__consumer-offsets Topic中。協(xié)調(diào)者會根據(jù)請求的PID和epoch驗(yàn)證生產(chǎn)者是否允許發(fā)起這個(gè)請求。 消費(fèi)offset只有當(dāng)事務(wù)提交后才對外可見。

5、提交或回滾事務(wù)

用戶通過調(diào)用commitTransaction或abortTranssaction方法提交或回滾事務(wù)。

●EndTxnRequest

當(dāng)生產(chǎn)者完成事務(wù)后,客戶端需要顯式調(diào)用結(jié)束事務(wù)或者回滾事務(wù)。前者會使得消息對消費(fèi)者可見,后者會對生產(chǎn)數(shù)據(jù)標(biāo)記為Abort狀態(tài),使得消息對消費(fèi)者不可見。無論是提交或者回滾,都是發(fā)送一個(gè)EndTnxRequest請求到事務(wù)協(xié)調(diào)者,寫入PREPARE_COMMIT或者PREPARE_ABORT信息到事務(wù)記錄日志中(5.1a)。

●WriteTxnMarkerRequest

這個(gè)請求是事務(wù)協(xié)調(diào)者向事務(wù)中每個(gè)TopicPartition的Leader發(fā)送的。每個(gè)Broker收到請求后會寫入COMMIT(PID)或者ABORT(PID)控制信息到數(shù)據(jù)日志中(5.2a)。

這個(gè)信息用于告知消費(fèi)者當(dāng)前消息是哪個(gè)事務(wù),消息是否應(yīng)該接受或者丟棄。而對于未提交消息,消費(fèi)者會緩存該事務(wù)的消息直到提交或者回滾。

這里要注意,如果事務(wù)也涉及到__consumer_offsets,即該事務(wù)中有消費(fèi)數(shù)據(jù)的操作且將該消費(fèi)的Offset存于__consumer_offsets中,Transaction Coordinator也需要向該內(nèi)部Topic的各Partition的Leader發(fā)送WriteTxnMarkerRequest從而寫入COMMIT(PID)或COMMIT(PID)控制信息(5.2a 左邊)。

●寫入最終提交或回滾信息

當(dāng)提交和回滾信息寫入數(shù)據(jù)日子后,事務(wù)協(xié)調(diào)者會往事務(wù)日志中寫入最終的提交或者終止信息以表示事務(wù)已經(jīng)完成(圖5.3),此時(shí)大部分于事務(wù)有關(guān)系的消息都可以被刪除(通過標(biāo)記后面在日志壓縮時(shí)會被移除),我們只需要保留事務(wù)ID以及其時(shí)間戳即可。

apache kafka是什么開源的系統(tǒng)?

Apache Kafka是一個(gè)開源消息系統(tǒng),由Scala寫成。是由Apache軟件基金會開發(fā)的一個(gè)開源消息系統(tǒng)項(xiàng)目。

Kafka最初是由LinkedIn開發(fā),并于2011年初開源。2012年10月從Apache Incubator畢業(yè)。該項(xiàng)目的目標(biāo)是為處理實(shí)時(shí)數(shù)據(jù)提供一個(gè)統(tǒng)一、高通量、低等待的平臺。

Kafka是一個(gè)分布式消息隊(duì)列:生產(chǎn)者、消費(fèi)者的功能。它提供了類似于JMS的特性,但是在設(shè)計(jì)實(shí)現(xiàn)上完全不同,此外它并不是JMS規(guī)范的實(shí)現(xiàn)。

Kafka對消息保存時(shí)根據(jù)Topic進(jìn)行歸類,發(fā)送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個(gè)kafka實(shí)例組成,每個(gè)實(shí)例(server)成為broker。

無論是kafka集群,還是producer和consumer都依賴于zookeeper集群保存一些meta信息,來保證系統(tǒng)可用性