博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
HBase + Solr Cloud实现HBase二级索引
阅读量:6702 次
发布时间:2019-06-25

本文共 12972 字,大约阅读时间需要 43 分钟。

1. 执行流程

 

 

2. Solr Cloud实现

3. HBase实现

1) 自定义Observer

① 代码

 

[java] 
 
 
  1. package cn.bfire.coprocessor;  
  2.   
  3. import com.typesafe.config.Config;  
  4. import com.typesafe.config.ConfigFactory;  
  5. import org.apache.hadoop.hbase.Cell;  
  6. import org.apache.hadoop.hbase.CellUtil;  
  7. import org.apache.hadoop.hbase.client.Delete;  
  8. import org.apache.hadoop.hbase.client.Durability;  
  9. import org.apache.hadoop.hbase.client.Put;  
  10. import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;  
  11. import org.apache.hadoop.hbase.coprocessor.ObserverContext;  
  12. import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;  
  13. import org.apache.hadoop.hbase.regionserver.wal.WALEdit;  
  14. import org.apache.hadoop.hbase.util.Bytes;  
  15. import org.apache.solr.common.SolrInputDocument;  
  16. import org.slf4j.Logger;  
  17. import org.slf4j.LoggerFactory;  
  18.   
  19. import java.io.IOException;  
  20. import java.util.List;  
  21.   
  22. /** 
  23.  * 为hbase提供二级索引的协处理器 Coprocesser 
  24.  */  
  25. public class UserDevPiSolrObserver extends BaseRegionObserver {  
  26.   
  27.     //加载配置文件属性  
  28.     static Config config = ConfigFactory.load("userdev_pi_solr.properties");  
  29.   
  30.     //log记录  
  31.     private static final Logger logger = LoggerFactory.getLogger(UserDevPiSolrObserver.class);  
  32.   
  33.     @Override  
  34.     public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {  
  35.         // 获取行键值  
  36.         String rowkey = Bytes.toString(put.getRow());  
  37.         //实例化 SolrDoc  
  38.         SolrInputDocument doc = new SolrInputDocument();  
  39.         //添加Solr uniqueKey值  
  40.         doc.addField("rowkey", rowkey);  
  41.         // 获取需要索引的列  
  42.         String[] hbase_columns = config.getString("hbase_column").split(",");  
  43.   
  44.         // 获取需要索引的列的值并将其添加到SolrDoc  
  45.         for (int i = 0; i < hbase_columns.length; i++) {  
  46.             String colName = hbase_columns[i];  
  47.             String colValue = "";  
  48.             // 获取指定列  
  49.             List<Cell> cells = put.get("cf".getBytes(), colName.getBytes());  
  50.             if (cells != null) {  
  51.                 try {  
  52.                     colValue = Bytes.toString(CellUtil.cloneValue(cells.get(0)));  
  53.                 } catch (Exception ex) {  
  54.                     logger.error("添加solrdoc错误", ex);  
  55.                 }  
  56.             }  
  57.   
  58.             doc.addField(colName, colValue);  
  59.         }  
  60.   
  61.         //发送数据到本地缓存  
  62.         SolrIndexTools.addDoc(doc);  
  63.     }  
  64.   
  65.     @Override  
  66.     public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {  
  67.         //得到rowkey  
  68.         String rowkey = Bytes.toString(delete.getRow());  
  69.         //发送数据本地缓存  
  70.         String solr_collection = config.getString("solr_collection");  
  71.         SolrIndexTools.delDoc(rowkey);  
  72.     }  
  73. }  

 

[java] 
 
 
  1. package cn.bfire.coprocessor;  
  2.   
  3. import com.typesafe.config.Config;  
  4. import com.typesafe.config.ConfigFactory;  
  5. import org.apache.solr.client.solrj.impl.CloudSolrClient;  
  6. import org.apache.solr.common.SolrInputDocument;  
  7. import org.slf4j.Logger;  
  8. import org.slf4j.LoggerFactory;  
  9.   
  10. import java.util.ArrayList;  
  11. import java.util.List;  
  12. import java.util.Timer;  
  13. import java.util.TimerTask;  
  14. import java.util.concurrent.Semaphore;  
  15.   
  16. /** 
  17.  * solr索引处理客户端 
  18.  * 注意问题,并发提交时,需要线程协作资源 
  19.  */  
  20. public class SolrIndexTools {  
  21.     //加载配置文件属性  
  22.     static Config config = ConfigFactory.load("userdev_pi_solr.properties");  
  23.     //log记录  
  24.     private static final Logger logger = LoggerFactory.getLogger(SolrIndexTools.class);  
  25.     //实例化solr的client  
  26.     static CloudSolrClient client = null;  
  27.     //添加批处理阈值  
  28.     static int add_batchCount = config.getInt("add_batchCount");  
  29.     //删除的批处理阈值  
  30.     static int del_batchCount = config.getInt("del_batchCount");  
  31.     //添加的集合缓冲  
  32.     static List<SolrInputDocument> add_docs = new ArrayList<SolrInputDocument>();  
  33.     //删除的集合缓冲  
  34.     static List<String> del_docs = new ArrayList<String>();  
  35.   
  36.   
  37.     static final List<String> zkHosts = new ArrayList<String>();  
  38.   
  39.     static {  
  40.         logger.info("初始化索引调度........");  
  41.         String zk_host = config.getString("zk_host");  
  42.         String[] data = zk_host.split(",");  
  43.         for (String zkHost : data) {  
  44.             zkHosts.add(zkHost);  
  45.         }  
  46.         client = new CloudSolrClient.Builder().withZkHost(zkHosts).build();  
  47.         // 获取Solr collection  
  48.         String solr_collection = config.getString("solr_collection");  
  49.         client.setDefaultCollection(solr_collection);  
  50.         client.setZkClientTimeout(10000);  
  51.         client.setZkConnectTimeout(10000);  
  52.   
  53.         //启动定时任务,第一次延迟1s执行,之后每隔指定时间30S执行一次  
  54.         Timer timer = new Timer();  
  55.         timer.schedule(new SolrCommit(), config.getInt("first_delay") * 1000, config.getInt("interval_commit_index") * 1000);  
  56.     }  
  57.   
  58.     public static class SolrCommit extends TimerTask {  
  59.         @Override  
  60.         public void run() {  
  61.   
  62.             logger.info("索引线程运行中........");  
  63.             //只有等于true时才执行下面的提交代码  
  64.             try {  
  65.                 semp.acquire();//获取信号量  
  66.                 if (add_docs.size() > 0) {  
  67.                     client.add(add_docs);//添加  
  68.                 }  
  69.                 if (del_docs.size() > 0) {  
  70.                     client.deleteById(del_docs);//删除  
  71.                 }  
  72.                 //确保都有数据才提交  
  73.                 if (add_docs.size() > 0 || del_docs.size() > 0) {  
  74.                     client.commit();//共用一个提交策略  
  75.                     //清空缓冲区的添加和删除数据  
  76.                     add_docs.clear();  
  77.                     del_docs.clear();  
  78.                 } else {  
  79.                     logger.info("暂无索引数据,跳过commit,继续监听......");  
  80.                 }  
  81.             } catch (Exception e) {  
  82.                 logger.error("间隔提交索引数据出错!", e);  
  83.             } finally {  
  84.                 semp.release();//释放信号量  
  85.             }  
  86.   
  87.   
  88.         }  
  89.     }  
  90.   
  91.   
  92.     /** 
  93.      * 添加数据到临时存储中,如果 
  94.      * 大于等于batchCount时,就提交一次, 
  95.      * 再清空集合,其他情况下走对应的时间间隔提交 
  96.      * 
  97.      * @param doc 单个document对象 
  98.      */  
  99.     public static void addDoc(SolrInputDocument doc) {  
  100.         commitIndex(add_docs, add_batchCount, doc, true);  
  101.     }  
  102.   
  103.   
  104.     /*** 
  105.      * 删除的数据添加到临时存储中,如果大于 
  106.      * 对应的批处理就直接提交,再清空集合, 
  107.      * 其他情况下走对应的时间间隔提交 
  108.      * 
  109.      * @param rowkey 删除的rowkey 
  110.      */  
  111.     public static void delDoc(String rowkey) {  
  112.         commitIndex(del_docs, del_batchCount, rowkey, false);  
  113.     }  
  114.   
  115.     // 任何时候,保证只能有一个线程在提交索引,并清空集合  
  116.     final static Semaphore semp = new Semaphore(1);  
  117.   
  118.   
  119.     /*** 
  120.      * 此方法需要加锁,并且提交索引时,与时间间隔提交是互斥的 
  121.      * 百分百确保不会丢失数据 
  122.      * 
  123.      * @param datas 用来提交的数据集合 
  124.      * @param count 对应的集合提交数量 
  125.      * @param doc   添加的单个doc 
  126.      * @param isAdd 是否为添加动作 
  127.      */  
  128.     public synchronized static void commitIndex(List datas, int count, Object doc, boolean isAdd) {  
  129.         try {  
  130.             semp.acquire();//获取信号量  
  131.             if (datas.size() >= count) {  
  132.   
  133.                 if (isAdd) {  
  134.                     client.add(datas);//添加数据到服务端中  
  135.                 } else {  
  136.                     client.deleteById(datas);//删除数据  
  137.                 }  
  138.                 client.commit();//提交数据  
  139.   
  140.                 datas.clear();//清空临时集合  
  141.   
  142.   
  143.             }  
  144.         } catch (Exception e) {  
  145.             e.printStackTrace();  
  146.             logger.error("按阈值" + (isAdd == true ? "添加" : "删除") + "操作索引数据出错!", e);  
  147.         } finally {  
  148.             datas.add(doc);//添加单条数据  
  149.             semp.release();//释放信号量  
  150.         }  
  151.   
  152.     }  
  153.   
  154. }  

 

[java] 
 
 
    1. <pre code_snippet_id="1962705" snippet_file_name="blog_20161102_1_8333418" style="font-family: Consolas; font-size: 11.3pt; background-color: rgb(255, 255, 255);">pom文件配置</pre>  
    2. <pre style="font-family:Consolas; font-size:11.3pt; background-color:rgb(255,255,255)"><pre code_snippet_id="1962705" snippet_file_name="blog_20161227_4_7977704" name="code" class="html"><?xml version="1.0" encoding="UTF-8"?>  
    3. <project xmlns="http://maven.apache.org/POM/4.0.0"  
    4.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    5.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
    6.     <modelVersion>4.0.0</modelVersion>  
    7.   
    8.     <groupId>cn.gcks</groupId>  
    9.     <artifactId>hbase</artifactId>  
    10.     <version>1.0-SNAPSHOT</version>  
    11.   
    12.     <dependencies>  
    13.         <!-- https://mvnrepository.com/artifact/org.apache.solr/solr-solrj -->  
    14.         <dependency>  
    15.             <groupId>org.apache.solr</groupId>  
    16.             <artifactId>solr-solrj</artifactId>  
    17.             <version>6.2.1</version>  
    18.             <exclusions>  
    19.                 <exclusion>  
    20.                     <groupId>org.slf4j</groupId>  
    21.                     <artifactId>slf4j-api</artifactId>  
    22.                 </exclusion>  
    23.             </exclusions>  
    24.         </dependency>  
    25.   
    26.         <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->  
    27.         <dependency>  
    28.             <groupId>org.apache.hbase</groupId>  
    29.             <artifactId>hbase-client</artifactId>  
    30.             <version>1.1.2</version>  
    31.             <exclusions>  
    32.                 <exclusion>  
    33.                     <groupId>org.apache.hadoop</groupId>  
    34.                     <artifactId>*</artifactId>  
    35.                 </exclusion>  
    36.             </exclusions>  
    37.         </dependency>  
    38.         <dependency>  
    39.             <groupId>org.apache.hbase</groupId>  
    40.             <artifactId>hbase-server</artifactId>  
    41.             <version>1.1.2</version>  
    42.             <exclusions>  
    43.                 <exclusion>  
    44.                     <groupId>org.apache.hadoop</groupId>  
    45.                     <artifactId>*</artifactId>  
    46.                 </exclusion>  
    47.             </exclusions>  
    48.         </dependency>  
    49.   
    50.         <!-- https://mvnrepository.com/artifact/com.typesafe/config -->  
    51.         <dependency>  
    52.             <groupId>com.typesafe</groupId>  
    53.             <artifactId>config</artifactId>  
    54.             <version>1.3.1</version>  
    55.         </dependency>  
    56.   
    57.     </dependencies>  
    58. </project></pre></pre>  
    59. <pre style="font-family:Consolas; font-size:11.3pt; background-color:rgb(255,255,255)"><p>  
    60. </p><p><span style="font-weight:bold; color:rgb(0,128,0); font-size:11.3pt; background-color:rgb(228,228,255)">userdev_pi_solr.properties</span></p><p></p><pre code_snippet_id="1962705" snippet_file_name="blog_20161227_5_7563783" name="code" class="plain">#需要建索引的列  
    61. hbase_column=oid,pi_id,statdate  
    62. # solr的collection名称  
    63. solr_collection=userdev_pi_day  
    64. #定义solr的url地址,如果是cloud模式,可以配置多个以逗号分隔  
    65. zk_host=1.1.1.1:2181,1.1.1.2:2181,1.1.1.3:2181  
    66. #调度第一次开始时,延迟多少秒执行  
    67. first_delay=10  
    68. #后台线程多久提交一次索引,单位秒  
    69. interval_commit_index=30  
    70. #添加索引的批处理数量  
    71. add_batchCount=10000  
    72. #删除索引的批处理数量  
    73. del_batchCount=2000</pre><br><br><p></p><p></p><p>② 打包代码并上传到<span style="font-family:Calibri">hdfs</span><span style="font-family:宋体">目录</span></p><p>③ 修改<span style="font-family:Calibri">HBase</span><span style="font-family:宋体">表(设置自定义</span><span style="font-family:Calibri">observer</span><span style="font-family:宋体">所在</span><span style="font-family:Calibri">hdfs</span><span style="font-family:宋体">位置,以及指定自定义</span><span style="font-family:Calibri">Observer</span><span style="font-family:宋体">全类名)</span></p><p>alter 'radius:raduserlog', 'coprocessor' => 'hdfs://<span style="color:rgb(0,112,192)">/apps/hbase/jars/hbase_solr.jar</span>|cn.bfire.coprocessor.UserDevPiSolrObserver|'</p><p>2) 数据查询代码</p><p></p><pre code_snippet_id="1962705" snippet_file_name="blog_20161102_4_5934630" name="code" class="java">package cn.bfire.solr;  
    74.   
    75. import org.apache.commons.logging.Log;  
    76. import org.apache.commons.logging.LogFactory;  
    77. import org.apache.hadoop.hbase.Cell;  
    78. import org.apache.hadoop.hbase.CellUtil;  
    79. import org.apache.hadoop.hbase.HBaseConfiguration;  
    80. import org.apache.hadoop.hbase.TableName;  
    81. import org.apache.hadoop.hbase.client.*;  
    82. import org.apache.hadoop.hbase.util.Bytes;  
    83. import org.apache.solr.client.solrj.SolrQuery;  
    84. import org.apache.solr.client.solrj.impl.CloudSolrClient;  
    85. import org.apache.solr.client.solrj.response.QueryResponse;  
    86. import org.apache.solr.common.SolrDocument;  
    87. import org.apache.solr.common.SolrDocumentList;  
    88. import org.apache.solr.common.SolrInputDocument;  
    89.   
    90. import java.util.ArrayList;  
    91. import java.util.Collection;  
    92. import java.util.List;  
    93.   
    94. public class SolrCloudTest {  
    95.     public static final Log LOG = LogFactory.getLog(SolrCloudTest.class);  
    96.     private static CloudSolrClient cloudSolrClient;  
    97.   
    98.     private static Connection connection;  
    99.     private static Table table;  
    100.     private static Get get;  
    101.     private static String defaultCollection = "userdev_pi_day";  
    102.     private static String hbaseTable = "<span style="font-family: Arial, Helvetica, sans-serif;">userdev_pi_day</span><span style="font-family: Arial, Helvetica, sans-serif;">";</span>  
    103.     List<Get> list = new ArrayList<Get>();  
    104.   
    105.     static {  
    106.         final List<String> zkHosts = new ArrayList<String>();  
    107.         zkHosts.add("1.1.1.1:2181");  
    108.         zkHosts.add("1.1.1.2:2181");  
    109.         zkHosts.add("1.1.1.3:2181");  
    110.         cloudSolrClient = new CloudSolrClient.Builder().withZkHost(zkHosts).build();  
    111.         final int zkClientTimeout = 10000;  
    112.         final int zkConnectTimeout = 10000;  
    113.         cloudSolrClient.setDefaultCollection(defaultCollection);  
    114.         cloudSolrClient.setZkClientTimeout(zkClientTimeout);  
    115.         cloudSolrClient.setZkConnectTimeout(zkConnectTimeout);  
    116.         try {  
    117.             connection = ConnectionFactory.createConnection(HBaseConfiguration.create());  
    118.             table = connection.getTable(TableName.valueOf(hbaseTable));  
    119.         } catch (Exception e) {  
    120.             e.printStackTrace();  
    121.         }  
    122.     }  
    123.   
    124.     private void addIndex(CloudSolrClient cloudSolrClient) throws Exception {  
    125.         Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();  
    126.         for (int i = 0; i <= 100; i++) {  
    127.             SolrInputDocument doc = new SolrInputDocument();  
    128.             String key = "";  
    129.             key = String.valueOf(i);  
    130.             doc.addField("rowkey", key);  
    131.             doc.addField("usermac", key + "usermac");  
    132.             doc.addField("userid", key + "userid");  
    133.             doc.addField("usertype", key + "usertype");  
    134.             doc.addField("city_id", key + "city_id");  
    135.             docs.add(doc);  
    136.         }  
    137.         LOG.info("docs info:" + docs + "\n");  
    138.         cloudSolrClient.add(docs);  
    139.         cloudSolrClient.commit();  
    140.     }  
    141.   
    142.     public void search(CloudSolrClient cloudSolrClient, String Str) throws Exception {  
    143.         SolrQuery query = new SolrQuery();  
    144.         query.setRows(100);  
    145.         query.setQuery(Str);  
    146.         LOG.info("query string: " + Str);  
    147.         QueryResponse response = cloudSolrClient.query(query);  
    148.         SolrDocumentList docs = response.getResults();  
    149.         System.out.println("文档个数:" + docs.getNumFound()); //数据总条数也可轻易获取  
    150.         System.out.println("查询时间:" + response.getQTime());  
    151.         System.out.println("查询总时间:" + response.getElapsedTime());  
    152.         for (SolrDocument doc : docs) {  
    153.             String rowkey = (String) doc.getFieldValue("rowkey");  
    154.             get = new Get(Bytes.toBytes(rowkey));  
    155.             list.add(get);  
    156.         }  
    157.   
    158.         Result[] res = table.get(list);  
    159.   
    160.         for (Result rs : res) {  
    161.             Cell[] cells = rs.rawCells();  
    162.   
    163.             for (Cell cell : cells) {  
    164.                 System.out.println("============");  
    165.                 System.out.println(new String(CellUtil.cloneRow(cell)));  
    166.                 System.out.println(new String(CellUtil.cloneFamily(cell)));  
    167.                 System.out.println(new String(CellUtil.cloneQualifier(cell)));  
    168.                 System.out.println(new String(CellUtil.cloneValue(cell)));  
    169.                 System.out.println("============");  
    170.                 break;  
    171.             }  
    172.         }  
    173.   
    174.   
    175.         table.close();  
    176.     }  
    177.   
    178.     public static void main(String[] args) throws Exception {  
    179.         cloudSolrClient.connect();  
    180.         SolrCloudTest solrt = new SolrCloudTest();  
    181. //            solrt.addIndex(cloudSolrClient);  
    182.         solrt.search(cloudSolrClient, "userid:11111");  
    183.         cloudSolrClient.close();  
    184.     }  
    185. }  
    186. </pre><br><br><p></p><p></p><pre></pre><pre></pre></pre>  
    187. <pre></pre>  
    188. <link rel="stylesheet" href="http://static.blog.csdn.net/public/res-min/markdown_views.css?v=2.0">  
    189.          

转载于:https://www.cnblogs.com/cxhfuujust/p/7755201.html

你可能感兴趣的文章
GNU make manual 翻译( 一百六十)
查看>>
POJ 2296 Map Labeler(二分+2SAT可行性判断)
查看>>
Entity Framework练习题
查看>>
SQL语句中用Parameters有什么好处
查看>>
SQL中的连接
查看>>
前沿设计推荐-使用jquery打造动感的浮动web界面
查看>>
mfc 类的定义
查看>>
FreeSWITCH 添加中文语音包
查看>>
Delegate如何进行类型转换?
查看>>
销售的最高境界竟然是聊天
查看>>
【原创】解决jquery在ie中不能解析字符串类型xml结构的xml字符串的问题
查看>>
高速排序算法
查看>>
MySql 触发器同步备份数据表记录
查看>>
Flex强制类型转换错误
查看>>
oracle中LAG()和LEAD()等分析统计函数的使用方法(统计月增长率)
查看>>
二分查找
查看>>
【进阶修炼】——改善C#程序质量(1)
查看>>
Ansible@一个高效的配置管理工具--Ansible configure management--翻译(八)
查看>>
Redis多机功能之Sentinel
查看>>
C# 利用WORD模板和标签(bookmark) 批量生成WORD
查看>>