中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何將數據按指定格式存入zookeeper

發布時間:2021-12-22 14:10:47 來源:億速云 閱讀:131 作者:iii 欄目:開發技術

這篇文章主要講解了“如何將數據按指定格式存入zookeeper”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何將數據按指定格式存入zookeeper”吧!

環境:

  scala版本:2.11.8

  zookeeper版本:3.4.5-cdh6.7.0

package com.ruozedata.zk
import java.util.concurrent.TimeUnit
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.retry.ExponentialBackoffRetry
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
  * Created by ganwei on 2018/08/21
  * 要求:
  * 1 通過storeOffsets方法把數據存入zookeeper中。
  *  存儲格式:
  *          /consumers/G322/offsets/ruoze_offset_topic/partition/0
  *          /consumers/G322/offsets/ruoze_offset_topic/partition/1
  *          /consumers/G322/offsets/ruoze_offset_topic/partition/2
  * 2 通過obtainOffsets方法把存入的數據讀取出來
  * 輸出格式:
  *           topic:ruoze_offset_topic	partition:0	offset:7
  *           topic:ruoze_offset_topic	partition:1	offset:3
  *           topic:ruoze_offset_topic	partition:2	offset:5
  */
object ZkConnectApp{
  val LOG = LoggerFactory.getLogger(ZkConnectApp.getClass)
  val client = {
    val client = CuratorFrameworkFactory
      .builder
      .connectString("172.16.100.31:2181")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .namespace("consumers")
      .build()
    client.start()
    client
  }
  def lock(path: String)(body: => Unit) {
    val lock = new InterProcessMutex(client, path)
    lock.acquire()
    try {
      body
    } finally {
      lock.release()
    }
  }
  def tryDo(path: String)(body: => Unit): Boolean = {
    val lock = new InterProcessMutex(client, path)
    if (!lock.acquire(10, TimeUnit.SECONDS)) {
      LOG.info(s"不能獲得鎖 {$path},已經有任務在運行,本次任務退出")
      return false
    }
    try {
      LOG.info("獲準運行")
      body
      true
    } finally {
      lock.release()
      LOG.info(s"釋放鎖 {$path}")
    }
  }
  //zookeeper創建路徑
  def ensurePathExists(path: String): Unit = {
    if (client.checkExists().forPath(path) == null) {
      client.create().creatingParentsIfNeeded().forPath(path)
    }
  }
  /**
    * OffsetRange類定義(偏移量對象)
    * 用于存儲偏移量
    */
  case class OffsetRange(
                          val topic:String,     // 主題
                          val partition:Int,    // 分區
                          val fromOffset:Long,  // 起始偏移量
                          val utilOffset:Long   // 終止偏移量
                        )
  /**
    * zookeeper存儲offset的方法
    * 寫入格式:
    * /consumers/G322/offsets/ruoze_offset_topic/partition/0
    * /consumers/G322/offsets/ruoze_offset_topic/partition/1
    * /consumers/G322/offsets/ruoze_offset_topic/partition/2
    * @param OffsetsRanges
    * @param groupName
    */
  def storeOffsets(OffsetsRanges:Array[OffsetRange],groupName:String)={
    val offsetRootPath = s"/"+groupName
    if (client.checkExists().forPath(offsetRootPath) == null) {
      client.create().creatingParentsIfNeeded().forPath(offsetRootPath)
    }
    for(els <- OffsetsRanges ){
      val data = String.valueOf(els.utilOffset).getBytes
      val path = s"$offsetRootPath/offsets/${els.topic}/partition/${els.partition}"
      // 創建路徑
      ensurePathExists(path)
      // 寫入數據
      client.setData().forPath(path, data)
    }
  }
  /**
    * TopicAndPartition類定義(偏移量key對象)
    *  用于提取偏移量
    */
  case class TopicAndPartition(
                                topic:String,  // 主題
                                partition:Int  // 分區
                              )
  /**
    * zookeeper提取offset的方法
    * @param topic
    * @param groupName
    * @return
    */
  def obtainOffsets(topic:String,groupName:String):Map[TopicAndPartition,Long]={
    // 定義一個空的HashMap
    val maps = mutable.HashMap[TopicAndPartition,Long]()
    // offset的路徑
    val offsetRootPath = s"/"+groupName+"/offsets/"+topic+"/partition"
    // 判斷路徑是否存在
    val stat = client.checkExists().forPath(s"$offsetRootPath")
    if (stat == null ){
      println(stat)  // 路徑不存在 就將路徑打印在控制臺,檢查路徑
    }else{
      // 獲取 offsetRootPath路徑下一級的所有子目錄
      // 我們這里是獲取的所有分區
      val children = client.getChildren.forPath(s"$offsetRootPath")
     // 遍歷所有的分區
      for ( lines <- children ){
        // 獲取分區的數據
        val data = new String(client.getData().forPath(s"$offsetRootPath/"+lines)).toLong
        // 將 topic  partition  和數據賦值給 maps
        maps(TopicAndPartition(topic,lines.toInt)) = data
      }
    }
    // 按partition排序后 返回map對象
    maps.toList.sortBy(_._1.partition).toMap
  }
  def main(args: Array[String]) {
      //定義初始化數據
      val off1 = OffsetRange("ruoze_offset_topic",0,0,7)
      val off2 = OffsetRange("ruoze_offset_topic",1,0,3)
      val off3 = OffsetRange("ruoze_offset_topic",2,0,5)
      val arr = Array(off1,off2,off3)
      //獲取到namespace
//      println(client.getNamespace)
      // 創建路徑
//      val offsetRootPath = "/G322"
//      if (client.checkExists().forPath(offsetRootPath) == null) {
//        client.create().creatingParentsIfNeeded().forPath(offsetRootPath)
//      }
      //存儲值
      storeOffsets(arr,"G322")
      //獲取值
      /**
        * 輸出格式:
        * topic:ruoze_offset_topic	partition:0	offset:7
        * topic:ruoze_offset_topic	partition:1	offset:3
        * topic:ruoze_offset_topic	partition:2	offset:5
        */
      val result = obtainOffsets("ruoze_offset_topic","G322")
      for (map <- result){
        println("topic:"+map._1.topic+"\t" +"partition:"+map._1.partition+"\t"+"offset:"+map._2)
      }
  }
}

感謝各位的閱讀,以上就是“如何將數據按指定格式存入zookeeper”的內容了,經過本文的學習后,相信大家對如何將數據按指定格式存入zookeeper這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

息烽县| 新和县| 蓬安县| 灵台县| 东乌| 衢州市| 通辽市| 洪雅县| 东台市| 渝中区| 含山县| 吉木萨尔县| 石台县| 文成县| 肇庆市| 双峰县| 宜章县| 邹平县| 濮阳县| 无锡市| 葫芦岛市| 东明县| 郧西县| 无棣县| 锦州市| 杭锦旗| 水富县| 都昌县| 东丰县| 仪陇县| 永吉县| 道孚县| 鹤山市| 城步| 安溪县| 八宿县| 鄱阳县| 遵义县| 高雄市| 仪征市| 青田县|