ORC File Read/Write Utility Classes and How to Write ORC Files from Flink

Published: 2021-06-24 09:29:01  Source: Yisu Cloud  Author: chen  Category: Big Data

This article explains ORC file read/write utility classes and how to output ORC format files from Flink. The methods described are simple, quick to apply, and practical, so interested readers are encouraged to follow along and try them out.

1. ORC Files:

Compression

The compression ratio falls roughly between 1:7 and 1:10; with three replicas, that works out to saving close to 10x the storage space.

After compressing the data, pay attention to load-balancing issues across the cluster; you can try running a rebalance.
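If the rebalance mentioned here refers to redistributing blocks across DataNodes once the compressed data takes up much less space (an assumption; the original note does not say which rebalance it means), the usual tool is the HDFS Balancer:

# Hedged sketch, assuming HDFS Balancer is what is meant:
# bring DataNode utilization to within 10% of the cluster average.
hdfs balancer -threshold 10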

Export

Hive ORC files can be exported to MySQL with Sqoop by going through HCatalog; you only need to add a few extra configuration parameters.
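A minimal sketch of such an export, assuming a hypothetical MySQL database testdb, a target table dim_province, and the Hive ORC table dim.dim_province (the connection string, credentials, and table names are placeholders, not taken from the original article):

sqoop export \
  --connect jdbc:mysql://mysql-host:3306/testdb \
  --username etl_user \
  --password '******' \
  --table dim_province \
  --hcatalog-database dim \
  --hcatalog-table dim_province

The --hcatalog-database/--hcatalog-table options let Sqoop read the Hive table through HCatalog, so the ORC files do not have to be converted to text first.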

Viewing

View an ORC file as JSON:

hive --orcfiledump -j -p /user/hive/warehouse/dim.db/dim_province/000000_0

Dumping

Dump the contents of an ORC file in key-value form to a local file:

hive --orcfiledump -d /user/hive/warehouse/dim.db/dim_province/000000_0 > myfile.txt

When reading, ORC checks whether the values being searched for fall within the recorded min/max statistics for each column; data whose range cannot contain them is skipped, which is why reads are fast (a predicate pushdown sketch follows the reader example in section 2.1).

2. ORC Read/Write Utility Classes

Note: when reading or writing on Windows, make absolutely sure that CLASSPATH and PATH contain no Hadoop environment variables! If they do, remove them first and restart the IDE.

2.1 Reading:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

import java.io.IOException;

public class CoreReader {
  public static void main(Configuration conf, String[] args) throws IOException {
    // Get the information from the file footer
    Reader reader = OrcFile.createReader(new Path("my-file.orc"),
                                         OrcFile.readerOptions(conf));
    System.out.println("File schema: " + reader.getSchema());
    System.out.println("Row count: " + reader.getNumberOfRows());

    // Pick the schema we want to read using schema evolution
    TypeDescription readSchema =
        TypeDescription.fromString("struct<z:int,y:string,x:bigint>");
    // Read the row data
    VectorizedRowBatch batch = readSchema.createRowBatch();
    RecordReader rowIterator = reader.rows(reader.options()
                                             .schema(readSchema));
    LongColumnVector z = (LongColumnVector) batch.cols[0];
    BytesColumnVector y = (BytesColumnVector) batch.cols[1];
    LongColumnVector x = (LongColumnVector) batch.cols[2];
    while (rowIterator.nextBatch(batch)) {
      for(int row=0; row < batch.size; ++row) {
        int zRow = z.isRepeating ? 0: row;
        int xRow = x.isRepeating ? 0: row;
        System.out.println("z: " +
            (z.noNulls || !z.isNull[zRow] ? z.vector[zRow] : null));
        System.out.println("y: " + y.toString(row));
        System.out.println("x: " +
            (x.noNulls || !x.isNull[xRow] ? x.vector[xRow] : null));
      }
    }
    rowIterator.close();
  }

  public static void main(String[] args) throws IOException {
    main(new Configuration(), args);
  }
}
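The min/max skipping mentioned in section 1 comes from predicates pushed down into the reader. A minimal sketch, assuming the CoreReader example above and a purely illustrative filter x < 100 (the SearchArgument classes come from the Hive storage-api that orc-core depends on); it replaces the reader.rows(...) call:

import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;

// Only row groups whose statistics admit x < 100 are actually read;
// groups whose recorded minimum for x is already >= 100 are skipped.
SearchArgument sarg = SearchArgumentFactory.newBuilder()
    .startAnd()
      .lessThan("x", PredicateLeaf.Type.LONG, 100L)
    .end()
    .build();

RecordReader rowIterator = reader.rows(reader.options()
    .schema(readSchema)
    .searchArgument(sarg, new String[]{"x"}));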

2.2 Writing:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class CoreWriter {
  public static void main(Configuration conf, String[] args) throws IOException {
    TypeDescription schema =
        TypeDescription.fromString("struct<x:int,y:string>");
    Writer writer = OrcFile.createWriter(new Path("my-file.orc"),
                                         OrcFile.writerOptions(conf)
                                           .setSchema(schema));
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    BytesColumnVector y = (BytesColumnVector) batch.cols[1];
    for(int r=0; r < 10000; ++r) {
      int row = batch.size++;
      x.vector[row] = r;
      byte[] buffer = ("Last-" + (r * 3)).getBytes(StandardCharsets.UTF_8);
      y.setRef(row, buffer, 0, buffer.length);
      // If the batch is full, write it out and start over.
      if (batch.size == batch.getMaxSize()) {
        writer.addRowBatch(batch);
        batch.reset();
      }
    }
    if (batch.size != 0) {
      writer.addRowBatch(batch);
    }
    writer.close();
  }

  public static void main(String[] args) throws IOException {
    main(new Configuration(), args);
  }
}

2.3 Flink sink to ORC files, example (based on Flink 1.12.3):

import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcSplitReaderUtil;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import org.apache.hadoop.conf.Configuration;
import org.apache.orc.TypeDescription;

import java.util.Properties;

public class StreamingWriteFileOrc {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        env.setParallelism(1);
        DataStream<RowData> dataStream = env.addSource(
                new MySource());

        // properties for writing in ORC format
        final Properties writerProps = new Properties();
        writerProps.setProperty("orc.compress", "LZ4");

        // define the column types and field names
        LogicalType[] orcTypes = new LogicalType[]{
                new IntType(), new DoubleType(), new VarCharType(100)}; // explicit varchar length (the no-arg default is 1)
        String[] fields = new String[]{"a", "b", "c"};
        TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(RowType.of(
                orcTypes,
                fields));

        // construct the OrcBulkWriterFactory
        final OrcBulkWriterFactory<RowData> factory = new OrcBulkWriterFactory<>(
                new RowDataVectorizer(typeDescription.toString(), orcTypes),
                writerProps,
                new Configuration());

        StreamingFileSink<RowData> orcSink = StreamingFileSink
                .forBulkFormat(new Path("file:///tmp/aaaa"), factory)
                .build();

        dataStream.addSink(orcSink);

        env.execute();
    }

    public static class MySource implements SourceFunction<RowData>{
        @Override
        public void run(SourceContext<RowData> sourceContext) throws Exception{
            while (true){
                GenericRowData rowData = new GenericRowData(3);
                rowData.setField(0, (int) (Math.random() * 100));
                rowData.setField(1, Math.random() * 100);
                rowData.setField(2, org.apache.flink.table.data.StringData.fromString(String.valueOf(Math.random() * 100)));
                sourceContext.collect(rowData);
                Thread.sleep(1);
            }
        }

        @Override
        public void cancel(){

        }
    }

}
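One practical follow-up, not part of the original example: because this is a bulk format, the StreamingFileSink only finalizes part files on checkpoints (which is why enableCheckpointing(10000) is set above), and by default the part files carry no extension. A sketch of giving them an .orc suffix via OutputFileConfig (the prefix and suffix values are illustrative):

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

// Same sink as above, but with named part files such as dim-0-0.orc.
StreamingFileSink<RowData> orcSink = StreamingFileSink
        .forBulkFormat(new Path("file:///tmp/aaaa"), factory)
        .withOutputFileConfig(OutputFileConfig.builder()
                .withPartPrefix("dim")
                .withPartSuffix(".orc")
                .build())
        .build();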

2.4 POM Dependencies

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <encoding>UTF-8</encoding>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.tools.version>2.11</scala.tools.version>
        <scala.version>2.11</scala.version>
        <flink.cluster.version>1.12.3</flink.cluster.version>
        <logback.version>1.2.0</logback.version>
        <slf4j.version>1.7.21</slf4j.version>
        <hbase.version>1.3.1</hbase.version>
        <scope.value>compile</scope.value>
    </properties>

    <dependencies>
     
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.15</version>
        </dependency>

        <!-- Unit testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-yarn-common</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-yarn-api</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>hadoop-mapreduce-client-core</artifactId>
                    <groupId>org.apache.hadoop</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>hadoop-auth</artifactId>
                    <groupId>org.apache.hadoop</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>hadoop-common</artifactId>
                    <groupId>org.apache.hadoop</groupId>
                </exclusion>
            </exclusions>
        </dependency>


        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>


        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.cluster.version}</version>
            <scope>${scope.value}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>${flink.cluster.version}</version>
            <type>pom</type>
            <scope>${scope.value}</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.cluster.version}</version>
            <scope>${scope.value}</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.cluster.version}</version>
            <scope>${scope.value}</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_${scala.version}</artifactId>
            <version>1.11.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-orc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-orc_2.11</artifactId>
            <version>1.12.3</version>
            <scope>${scope.value}</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-ml_${scala.version}</artifactId>
            <version>1.8.1</version>
            <scope>${scope.value}</scope>
        </dependency>


        <!-- New Blink planner -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.cluster.version}</version>
            <scope>${scope.value}</scope>
        </dependency>

        <!-- Needed if you implement custom formats (e.g. to interact with Kafka) or user-defined functions -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.cluster.version}</version>
            <scope>${scope.value}</scope>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>1.12.3</version>
            <scope>${scope.value}</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.cluster.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>commons-lang3</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-cli</artifactId>
                    <groupId>commons-cli</groupId>
                </exclusion>
            </exclusions>
            <scope>${scope.value}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.cluster.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
            <scope>${scope.value}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.3</version>
            <scope>${scope.value}</scope>
            <exclusions>
                <exclusion>
                    <groupId>xml-apis</groupId>
                    <artifactId>xml-apis</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_${scala.version}</artifactId>
            <version>${flink.cluster.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.cluster.version}</version>
        </dependency>

        <!-- Logging components -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>

        <!-- For Redis access -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.5.0</version>
        </dependency>

        <!-- Alibaba Druid database connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.11</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.cluster.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>

    </dependencies>

That covers the ORC read/write utility classes and how to write ORC files from Flink; hopefully you now have a deeper understanding, and it is worth trying out in practice. This is the Yisu Cloud site; for more related content, browse the relevant channels and follow us to keep learning!
