卖逼视频免费看片|狼人就干网中文字慕|成人av影院导航|人妻少妇精品无码专区二区妖婧|亚洲丝袜视频玖玖|一区二区免费中文|日本高清无码一区|国产91无码小说|国产黄片子视频91sese日韩|免费高清无码成人网站入口

kafka刪除積壓數(shù)據(jù) 從kafka讀取數(shù)據(jù)后,數(shù)據(jù)會自動刪除嗎?

從kafka讀取數(shù)據(jù)后,數(shù)據(jù)會自動刪除嗎?基于receiver的實現(xiàn)將使用kakfa的高級消費API。與所有其他接收器一樣,接收到的數(shù)據(jù)將保存到執(zhí)行器,然后sparkstreaming將啟動作業(yè)來處理

從kafka讀取數(shù)據(jù)后,數(shù)據(jù)會自動刪除嗎?

基于receiver的實現(xiàn)將使用kakfa的高級消費API。與所有其他接收器一樣,接收到的數(shù)據(jù)將保存到執(zhí)行器,然后sparkstreaming將啟動作業(yè)來處理數(shù)據(jù)。

在默認配置中,如果出現(xiàn)故障,此方法將丟失數(shù)據(jù)。為了確保零數(shù)據(jù)丟失,我們需要啟用wal(writeaheadlogs)。它將接收到的數(shù)據(jù)同步保存到分布式文件系統(tǒng),如HDFS。因此,在發(fā)生錯誤時可以恢復(fù)數(shù)據(jù)。

使用兩個步驟:1。添加依賴項:Spark streaming Kafka 2.10-1.3.0

2導(dǎo)入器g.apache.spark. 卡夫卡.ux

卡夫卡有兩種刪除數(shù)據(jù)的方法

根據(jù)時間,刪除一段時間后過期的消息

根據(jù)消息大小,消息數(shù)超過一定大小后刪除最早的數(shù)據(jù)

卡夫卡刪除數(shù)據(jù)的最小單位:segment

卡夫卡刪除數(shù)據(jù)的主要邏輯:卡夫卡源代碼

def cleanuplogs(){debug(”beging log cleanup。。。)var總計=0 val開始時間=時間.毫秒For(log

Kafka在一段時間內(nèi)(配置文件設(shè)置)調(diào)用cleanuplogs一次,刪除所有需要刪除的日志數(shù)據(jù)。

Cleanupexpiredsegments負責(zé)清除超時數(shù)據(jù)

private def Cleanupexpiredsegments(log:log):int={val startms=時間.毫秒log.deleteOldSegments文件(開始時間->上次修改時間>log.config.retentions保留)}

cleanupsegmenttomaintainsize負責(zé)清理大于大小的數(shù)據(jù)私有def cleanupsegmentstomaintainsize(log:log):int={if(log.config.retentionSize文件=0){差異-=段.尺寸真}否則{假}log.deleteOldSegments文件(應(yīng)該刪除)}