中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

修改SequenceFileInputFormat hdfs blocksize

發布時間:2020-07-04 09:50:55 來源:網絡 閱讀:579 作者:xiaobin0303 欄目:開發技術

用spark讀取sequencefile時,非常消耗時間,默認情況下SequenceFileInputFormat切分文件是沿用FIleInputFormat,對于大文件會切成Hdfs block size大小,如果想切的更小,增加spark任務的并法度,可以自己修改:

class MySequenceFileInputFormat[K, V] extends FileInputFormat[K, V] {
  private val LOG: Log = LogFactory.getLog(classOf[MySequenceFileInputFormat[K, V]])

  val sequenceFileBlockSize = 30000000 //手動設置blocksize為30M
  val  SPLIT_SLOP:Double = 1.1;   // 10% slop
  val  NUM_INPUT_FILES:String ="mapreduce.input.fileinputformat.numinputfiles";

  @throws[IOException]
  def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[K, V] = new SequenceFileRecordReader

  override protected def getFormatMinSplitSize: Long = 2000L

  @throws[IOException]
  override protected def listStatus(job: JobContext): List[FileStatus] = {
    val files: List[FileStatus] = super.listStatus(job)
    val len: Int = files.size
    var j: Int = 0

    for (i<-0 to len-1){
      val f = files.get(i)
      if(f.isDirectory){
        val pth:Path = f.getPath
        val fs: FileSystem = pth.getFileSystem(job.getConfiguration)
        files.set(i,fs.getFileStatus(new Path(pth, "data")))
      }
      if((files.get(i)).getLen() != 0L) {
        files.set(j, files.get(i))
        j+=1
      }
    }

    files.subList(0, j)
  }

  @throws[IOException]
  override def getSplits(job: JobContext): List[InputSplit] = {
    val sw :Stopwatch= new Stopwatch().start();
    val minSize:Long = Math.max(getFormatMinSplitSize(), FileInputFormat.getMinSplitSize(job));
    val maxSize :Long= FileInputFormat.getMaxSplitSize(job);

    // generate splits
    val splits: ArrayList[InputSplit] = new ArrayList[InputSplit]
    val files: List[FileStatus] = listStatus(job)
    for ( i<- 0 to files.size()-1) {
      val file = files.get(i)
       val path:Path = file.getPath();
      val length:Long = file.getLen();
      if (length != 0) {
        var blkLocations: Array[BlockLocation] = null
        if (file.isInstanceOf[LocatedFileStatus] ) {
          blkLocations = ( file.asInstanceOf[LocatedFileStatus]).getBlockLocations()
        } else {
          val fs:FileSystem = path.getFileSystem(job.getConfiguration())
          blkLocations = fs.getFileBlockLocations(file, 0, length)
        }
        if (isSplitable(job, path)) {
//          val blockSize:Long = file.getBlockSize()
          val blockSize:Long = sequenceFileBlockSize
          val splitSize:Long = computeSplitSize(blockSize, minSize, maxSize)

          var bytesRemaining:Long = length;
          while (( bytesRemaining.toDouble)/splitSize > SPLIT_SLOP) {
            val blkIndex:Int = getBlockIndex(blkLocations, length-bytesRemaining)
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              blkLocations(blkIndex).getHosts(),
              blkLocations(blkIndex).getCachedHosts()))
            bytesRemaining -= splitSize
          }

          if (bytesRemaining != 0) {
            val blkIndex:Int = getBlockIndex(blkLocations, length-bytesRemaining)
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
              blkLocations(blkIndex).getHosts(),
              blkLocations(blkIndex).getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations(0).getHosts(),
          blkLocations(0).getCachedHosts()));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new Array[String](0)))
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size())
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.elapsedMillis())
    }
    return splits
  }


}


sequenceFileBlockSize  改成自己想要的大小


使用:

val dd = sc.newAPIHadoopFile[BytesWritable,BytesWritable, MySequenceFileInputFormat[BytesWritable,BytesWritable]](sourceDir).flatMap(x=>{
  function(new String(x._2.getBytes))
})


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

舒兰市| 昂仁县| 威海市| 富蕴县| 马鞍山市| 昌黎县| 徐州市| 凤阳县| 响水县| 本溪| 盐源县| 富锦市| 永寿县| 封开县| 金塔县| 齐齐哈尔市| 华亭县| 维西| 澜沧| 炉霍县| 泽普县| 齐齐哈尔市| 双峰县| 田东县| 县级市| 柏乡县| 新晃| 芦山县| 肇庆市| 福清市| 宁海县| 军事| 台北市| 鄯善县| 祁东县| 舒城县| 昌都县| 长岭县| 巩留县| 乌拉特前旗| 玉龙|