kafka生產(chǎn)數(shù)據(jù)命令 如何刪除kafka積壓數(shù)據(jù)?
如何刪除kafka積壓數(shù)據(jù)?卡夫卡可以通過兩種方式刪除數(shù)據(jù)根據(jù)時間,刪除一段時間后過期的消息根據(jù)消息大小,消息數(shù)超過一定大小后刪除最早的數(shù)據(jù)卡夫卡刪除數(shù)據(jù)的最小單位:segment卡夫卡刪除數(shù)據(jù)的主要
如何刪除kafka積壓數(shù)據(jù)?
卡夫卡可以通過兩種方式刪除數(shù)據(jù)
根據(jù)時間,刪除一段時間后過期的消息
根據(jù)消息大小,消息數(shù)超過一定大小后刪除最早的數(shù)據(jù)
卡夫卡刪除數(shù)據(jù)的最小單位:segment
卡夫卡刪除數(shù)據(jù)的主要邏輯:卡夫卡源代碼
def cleanuplogs(){debug(”beging log cleanup。。。)var總計=0 val開始時間=時間.毫秒For(log
Kafka在一段時間內(nèi)(配置文件設(shè)置)調(diào)用cleanuplogs一次,刪除所有需要刪除的日志數(shù)據(jù)。
Cleanupexpiredsegments負(fù)責(zé)清除超時數(shù)據(jù)
private def Cleanupexpiredsegments(log:log):int={val startms=時間.毫秒log.deleteOldSegments文件(開始時間->上次修改時間>log.config.retentions保留)}
cleanupsegmenttomaintainsize負(fù)責(zé)清理大于大小的數(shù)據(jù)私有def cleanupsegmentstomaintainsize(log:log):int={if(log.config.retentionSize文件=0){差異-=段.尺寸真}否則{假}log.deleteOldSegments文件(shouldDelete)}
我沒事,來這里玩,開始在各種網(wǎng)絡(luò)上尋找技術(shù)信息,之后以“標(biāo)題”為主。從尋找信息到交朋友。因?yàn)槲矣X得事情落后于時代,有人認(rèn)為,是因?yàn)樽约核讲桓摺V皇窃谛睦锵?,無法實(shí)現(xiàn)現(xiàn)實(shí)
JAVA面試如何保證消息不被重復(fù)消費(fèi)?如何保證消息消費(fèi)的冪等性?
基于接收器的實(shí)現(xiàn)將使用kakfa的高級消費(fèi)API。與所有其他接收器一樣,接收到的數(shù)據(jù)將保存到執(zhí)行器,然后sparkstreaming將啟動作業(yè)來處理數(shù)據(jù)。
在默認(rèn)配置中,如果出現(xiàn)故障,此方法將丟失數(shù)據(jù)。為了確保零數(shù)據(jù)丟失,我們需要啟用wal(writeaheadlogs)。它將接收到的數(shù)據(jù)同步保存到分布式文件系統(tǒng),如HDFS。因此,在發(fā)生錯誤時可以恢復(fù)數(shù)據(jù)。
使用兩個步驟:1。添加依賴項(xiàng):Spark streaming Kafka 2.10-1.3.0
2導(dǎo)入器g.apache.spark. 卡夫卡._