中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Mapreduce如何掃描hbase表建立solr索引

發布時間:2021-12-08 15:09:09 來源:億速云 閱讀:119 作者:小新 欄目:云計算

小編給大家分享一下Mapreduce如何掃描hbase表建立solr索引,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

package com.hbase.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RebuildHbaseIndex {
    public static final Logger LOG = LoggerFactory
            .getLogger(RebuildHbaseIndex.class);

    public static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException {
        
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        //每次讀取100條數據
        conf.setInt("hbase.client.scanner.caching", 100);
        String[] tbNames={"Suggest"};
        for(int i=0;i<tbNames.length;i++){
            Job job = SolrIndexerMapper.createSubmittableJob(conf, tbNames[i]);
            
            if (job == null) {
                System.exit(-1);
            }
            job.waitForCompletion(true);
            Counter counter = job.getCounters().findCounter(SolrIndexerMapper.Counters.ROWS);
            LOG.info("tbNames[i]: Put " + counter.getValue() + " records to Solr!"); // 打印日志
        }
    }

}
package com.hbase.index;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class SolrIndexerMapper extends TableMapper<Text, Text> {
    
    public static final Logger LOG = LoggerFactory.getLogger(SolrIndexerMapper.class);
    //計數器
    public static enum Counters {ROWS}; 
    //只創建一個SolrServer實例
    private SolrServer solr;
    public String solrURL="http://192.168.1.79:8983/solr/IK_shard1_replica1";
    private int commitSize;
    private final List<SolrInputDocument> docs=new ArrayList<SolrInputDocument>();
    
    //任務開始調用
    protected void setup(Context context){
        Configuration conf=context.getConfiguration();
        solr=new HttpSolrServer(solrURL);
        //一次性添加文檔數
        commitSize=conf.getInt("solr.commit.size", 1000);
    }

    @Override
    protected void map(ImmutableBytesWritable row, Result values,Context context)throws IOException, InterruptedException {
        SolrInputDocument solrDoc = new SolrInputDocument();
        String rowkey=Bytes.toString(values.getRow());
        String id=Bytes.toString(values.getRow());
        String tableName="Suggest";
        
        solrDoc.addField("id", id);
        solrDoc.addField("rowkey", rowkey);
        //hbase里面需要增加tableName字段
        solrDoc.addField("tableName", tableName);   
        
          for (KeyValue kv : values.list()) {
            String fieldName = Bytes.toString(kv.getQualifier());
            String fieldValue = Bytes.toString(kv.getValue());
            solrDoc.addField(fieldName, fieldValue);
        }
          
          docs.add(solrDoc);
        if (docs.size() >= commitSize) {
            try {
                LOG.info("添加文檔:Adding " + Integer.toString(docs.size()) + " documents");
                solr.add(docs); // 索引文檔
            } catch (final SolrServerException e) {
                final IOException ioe = new IOException();
                ioe.initCause(e);
                throw ioe;
            }
            docs.clear();
        }
        context.getCounter(Counters.ROWS).increment(1);
    }
    
    //任務結束時候調用
    @Override
    protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
            throws IOException, InterruptedException {
        try {
            if(!docs.isEmpty()){
                LOG.info("清空隊列:Adding " + Integer.toString(docs.size()) + " documents");
                solr.add(docs);
                docs.clear();
            }
            
        } catch (final SolrServerException e) {
            final IOException ioe=new IOException();
            ioe.initCause(e);
            throw ioe;
        }
    }

    public static Job createSubmittableJob(Configuration conf, String tableName) throws IOException {
        Job job=Job.getInstance(conf,"SolrIndex_" + tableName);
        job.setJarByClass(SolrIndexerMapper.class);
        Scan scan=new Scan();
        //scan的數據不放在緩存中,一次性的
        scan.setCacheBlocks(false);
        job.setOutputFormatClass(NullOutputFormat.class);
        TableMapReduceUtil.initTableMapperJob(tableName, scan,
                SolrIndexerMapper.class, null, null, job); // 不需要輸出,鍵、值類型為null
        job.setNumReduceTasks(0); // 無reduce任務
        return job;
        
    }
}

看完了這篇文章,相信你對“Mapreduce如何掃描hbase表建立solr索引”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

始兴县| 高陵县| 邛崃市| 历史| 肃宁县| 花莲县| 冕宁县| 黄石市| 容城县| 张家界市| 东安县| 延边| 广灵县| 上犹县| 和硕县| 繁峙县| 龙海市| 安溪县| 凤台县| 松溪县| 沈丘县| 栖霞市| 淳安县| 安庆市| 玉龙| 康乐县| 宜君县| 盐源县| 楚雄市| 洪江市| 漳平市| 太仆寺旗| 林甸县| 拜城县| 临沭县| 炎陵县| 阜平县| 视频| 吐鲁番市| 修水县| 广南县|