kafka刪除積壓數(shù)據(jù) 從kafka讀取數(shù)據(jù)后,數(shù)據(jù)會自動刪除嗎?
從kafka讀取數(shù)據(jù)后,數(shù)據(jù)會自動刪除嗎?基于receiver的實現(xiàn)將使用kakfa的高級消費API。與所有其他接收器一樣,接收到的數(shù)據(jù)將保存到執(zhí)行器,然后sparkstreaming將啟動作業(yè)來處理
從kafka讀取數(shù)據(jù)后,數(shù)據(jù)會自動刪除嗎?
基于receiver的實現(xiàn)將使用kakfa的高級消費API。與所有其他接收器一樣,接收到的數(shù)據(jù)將保存到執(zhí)行器,然后sparkstreaming將啟動作業(yè)來處理數(shù)據(jù)。
在默認配置中,如果出現(xiàn)故障,此方法將丟失數(shù)據(jù)。為了確保零數(shù)據(jù)丟失,我們需要啟用wal(writeaheadlogs)。它將接收到的數(shù)據(jù)同步保存到分布式文件系統(tǒng),如HDFS。因此,在發(fā)生錯誤時可以恢復(fù)數(shù)據(jù)。
使用兩個步驟:1。添加依賴項:Spark streaming Kafka 2.10-1.3.0
2導(dǎo)入器g.apache.spark. 卡夫卡.ux
卡夫卡有兩種刪除數(shù)據(jù)的方法
根據(jù)時間,刪除一段時間后過期的消息
根據(jù)消息大小,消息數(shù)超過一定大小后刪除最早的數(shù)據(jù)
卡夫卡刪除數(shù)據(jù)的最小單位:segment
卡夫卡刪除數(shù)據(jù)的主要邏輯:卡夫卡源代碼
def cleanuplogs(){debug(”beging log cleanup。。。)var總計=0 val開始時間=時間.毫秒For(log
Kafka在一段時間內(nèi)(配置文件設(shè)置)調(diào)用cleanuplogs一次,刪除所有需要刪除的日志數(shù)據(jù)。
Cleanupexpiredsegments負責(zé)清除超時數(shù)據(jù)
private def Cleanupexpiredsegments(log:log):int={val startms=時間.毫秒log.deleteOldSegments文件(開始時間->上次修改時間>log.config.retentions保留)}
cleanupsegmenttomaintainsize負責(zé)清理大于大小的數(shù)據(jù)私有def cleanupsegmentstomaintainsize(log:log):int={if(log.config.retentionSize文件=0){差異-=段.尺寸真}否則{假}log.deleteOldSegments文件(應(yīng)該刪除)}