中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

實戰:Streaming data into HBase using Flume

發布時間:2020-07-24 15:05:04 來源:網絡 閱讀:1995 作者:forestwater 欄目:大數據

1. Create an IntelliJ IDEA project for the AsyncHbaseEventSerializer implementation

實戰:Streaming data into HBase using Flume

添加 dependency 到 pom.xml

<!-- Flume HBase sink module. It provides org.apache.flume.sink.hbase.AsyncHbaseEventSerializer,
     which SplittingSerializer implements. Keep the version in line with the Flume
     installation the jar is copied into (apache-flume-1.6.0-bin in step 4). -->
<dependency>

   <groupId>org.apache.flume.flume-ng-sinks</groupId>

   <artifactId>flume-ng-hbase-sink</artifactId>

   <version>1.6.0</version>

</dependency>

Implement AsyncHbaseEventSerializer according to the business requirements.

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.conf.ComponentConfiguration;

import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;

import org.hbase.async.AtomicIncrementRequest;

import org.hbase.async.PutRequest;

import java.nio.charset.Charset;

import java.util.ArrayList;

import java.util.List;

/**

* Created by root on 12/5/17.

*/

public class SplittingSerializer implements AsyncHbaseEventSerializer {

   private byte[] table;

   private byte[] colFam;

   private Event currentEvent;

   private byte[][]rentRowKey;

   private final byte[] eventCountCol = "eventCount".getBytes();

   columnNames;

   private final List<PutRequest> puts = new ArrayList<PutRequest>();

   private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();

   private byte[] cur

   public void initialize(byte[] table, byte[] cf) {

       this.table = table;

       this.colFam = cf;

       //Can not get the columns from context in configure method. Had to hard coded here.

       columnNames = new byte[3][];

       columnNames[0] = "name".getBytes();

       columnNames[1] = "id".getBytes();

       columnNames[2] = "phone".getBytes();

   }

   public void setEvent(Event event) {

       // Set the event and verify that the rowKey is not present

       this.currentEvent = event;

/*

       //Don't know how to set the key of event header.

       String rowKeyStr = currentEvent.getHeaders().get("rowKey");

       if (rowKeyStr == null) {

           throw new FlumeException("No row key found in headers!");

       }

       currentRowKey = rowKeyStr.getBytes();*/

   }

   public List<PutRequest> getActions() {

       // Split the event body and get the values for the columns

       String eventStr = new String(currentEvent.getBody());

       String[] cols = eventStr.split(",");

       Long currTime = System.currentTimeMillis();

       long revTs = Long.MAX_VALUE - currTime;

       currentRowKey = (Long.toString(revTs) + cols[0]).getBytes();

       puts.clear();

       for (int i = 0; i < cols.length; i++) {

           //Generate a PutRequest for each column.

           PutRequest req = new PutRequest(table, currentRowKey, colFam,

                   columnNames[i], cols[i].getBytes());

           puts.add(req);

       }

       return puts;

   }

   public List<AtomicIncrementRequest> getIncrements() {

       incs.clear();

       //Increment the number of events received

       incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));

       return incs;

   }

   public void cleanUp() {

       table = null;

       colFam = null;

       currentEvent = null;

       columnNames = null;

       currentRowKey = null;

   }

   public void configure(Context context) {

       //Get the column names from the configuration

       //Did not work. Don't know how to use it.

       String cols = new String(context.getString("columns"));

       String[] names = cols.split(",");

       byte[][] columnNames = new byte[names.length][];

       int i = 0;

       System.out.println("getting columnNames");

       for(String name : names) {

           columnNames[i++] = name.getBytes();

       }

   }

   public void configure(ComponentConfiguration componentConfiguration) {

   }

}

Build and deploy the jar file.

實戰:Streaming data into HBase using Flume

實戰:Streaming data into HBase using Flume

Build --> Build Artifacts

Copy the jar to Flume's lib directory. Here I use scp to upload it to the Flume installation on another host.

實戰:Streaming data into HBase using Flume

2. Configure Flume

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 sink2

# NOTE: channel c1 and sink k1 are declared above but not configured in this excerpt
# (presumably the HDFS part of crawler-hdfs-conf.properties). Configure them, or remove
# them from the lists above, before running this excerpt on its own.

# Copy every event from r1 into all of its channels (c1 and c2).
a1.sources.r1.selector.type = replicating

#NetCat TCP source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1 c2

#channel
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000

#HBase sink
a1.sinks.sink2.type = org.apache.flume.sink.hbase.AsyncHBaseSink
a1.sinks.sink2.channel = c2
a1.sinks.sink2.table = law
a1.sinks.sink2.columnFamily = lawfile
# Must not exceed c2's transactionCapacity (1000): the sink takes up to batchSize events
# in one channel transaction.
a1.sinks.sink2.batchSize = 1000

#The serializer to use
a1.sinks.sink2.serializer = ifre.flume.hbase.SplittingSerializer

#List of columns each event writes to.
a1.sinks.sink2.serializer.columns = name,id,phone

3. Create the HBase table

# hbase shell

create 'law', 'lawfile'

4. Run the Flume agent

[root@ifrebigsearch2 apache-flume-1.6.0-bin]# bin/flume-ng agent --conf conf --conf-file conf/crawler-hdfs-conf.properties --name a1 -Dflume.root.logger=INFO,console

5. Run nc

[root@ifrebigsearch0 dkh]# nc ifrebigsearch2 6666

zhangsan,10110198806054561,13812345678

OK

6. Result

hbase(main):002:0> scan 'law'

ROW                   COLUMN+CELL                                              

9223370524386395508z column=lawfile:id, timestamp=1512468380362, value=10110198

hangsan              806054561                                                

9223370524386395508z column=lawfile:name, timestamp=1512468380361, value=zhangs

hangsan              an                                                        

9223370524386395508z column=lawfile:phone, timestamp=1512468380363, value=13812

hangsan              345678                                                    


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

安阳县| 神农架林区| 隆德县| 定结县| 扎鲁特旗| 和田县| 天全县| 衡山县| 鸡东县| 永年县| 县级市| 西林县| 华池县| 滨海县| 陆河县| 辽中县| 蒲江县| 昌黎县| 固安县| 山东省| 潢川县| 钟祥市| 韶关市| 临沭县| 茶陵县| 江山市| 浦江县| 浦东新区| 锡林郭勒盟| 侯马市| 沙湾县| 英吉沙县| 洱源县| 分宜县| 祁连县| 老河口市| 霍州市| 勐海县| 安远县| 新巴尔虎右旗| 淮北市|