您好,登錄后才能下訂單哦!
HBase的優化總結
總結起來:預分區,列族,批量讀寫,合并,鏈接池。詳細見下:
默認情況下,在創建HBase表的時候會自動創建一個region分區,當導入數據的時候,所有的HBase客戶端都向這一個region寫數據,直到這個region足夠大了才進行切分。一種可以加快批量寫入速度的方法是通過預先創建一些空的regions,這樣當數據寫入HBase時,會按照region分區情況,在集群內做數據的負載均衡。
有關預分區,詳情參見:Table Creation: Pre-Creating Regions,下面是一個例子:
public static boolean createTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits) throws IOException { try { admin.createTable(table, splits); return true; } catch (TableExistsException e) { logger.info("table " + table.getNameAsString() + " already exists"); // the table already exists... return false; } } //這個方法是傳入數據的起始和末尾key,以及想要分成幾個region,返回的是哪些數據分在哪些區里 //實際應用中可能需要對數據的特點進行分析,以免有些key對應的數據用戶傳入量很大,相鄰的key較為頻繁集中 public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) { byte[][] splits = new byte[numRegions-1][]; BigInteger lowestKey = new BigInteger(startKey, 16); BigInteger highestKey = new BigInteger(endKey, 16); BigInteger range = highestKey.subtract(lowestKey); BigInteger regionIncrement = range.divide(BigInteger.valueOf(numRegions)); lowestKey = lowestKey.add(regionIncrement); for(int i=0; i < numRegions-1;i++) { BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i))); byte[] b = String.format("%016x", key).getBytes(); splits[i] = b; } return splits; }
預分區是根據預估的數據量,進行預先的region分割,設計哪些rowKey的數據放在哪些region上,避免數據傾斜。
HBase中row key用來檢索表中的記錄,就是用來查找表中數據的,支持以下三種方式:
通過單個row key訪問:即按照某個row key鍵值進行get操作;
通過row key的range進行scan:即通過設置startRowKey和endRowKey,在這個范圍內進行掃描;
全表掃描:即直接掃描整張表中所有行記錄。
在HBase中,row key可以是任意字符串,最大長度64KB,實際應用中一般為10~100bytes,存為byte[]字節數組,一般設計成定長的。
row key是按照字典序存儲,因此,設計row key時,要充分利用這個排序特點,將經常一起讀取的數據存儲到一塊,將最近可能會被訪問的數據放在一塊。
舉個例子:如果最近寫入HBase表中的數據是最可能被訪問的,可以考慮將時間戳作為row key的一部分,由于是字典序排序,所以可以使用Long.MAX_VALUE - timestamp作為row key,這樣能保證新寫入的數據在讀取時可以被快速命中。表示就是最早插入的數據row key越大,越靠后,越晚插入的數據row key越小,越靠前,因此可以使得最近插入的數據最先被訪問到,因為hbase在存儲表中數據時是按row key升序排列的。外界查詢時,是一次查region。
不要在一張表里定義太多的column family(列族)。目前Hbase并不能很好的處理超過2~3個column family的表。因為某個column family在flush的時候,它鄰近的column family也會因關聯效應被觸發flush,最終導致系統產生更多的I/O。感興趣的同學可以對自己的HBase集群進行實際測試,從得到的測試結果數據驗證一下。
創建表的時候,可以通過HColumnDescriptor.setInMemory(true)將表放到RegionServer的緩存中,保證在讀取的時候被cache命中。
緩存的一個思考:最靠近用戶的地方做緩存,不可以太底層。
創建表的時候,可以通過列族HColumnDescriptor.setMaxVersions(int maxVersions)設置表中數據的最大版本,如果只需要保存最新版本的數據,那么可以設置setMaxVersions(1)。每一個列族都可以設置這個Max Version。
hbase自身在服務器基本不設置,除了設置下zookeeper所在的位置,為了hbase可以找到zookeeper,一般在程序端,可以動態的創建表,并設置表內的屬性,例如該表中,某一個列族的Max Version。
創建表的時候,可以通過HColumnDescriptor.setTimeToLive(int timeToLive)設置表中數據的存儲生命期,過期數據將自動被刪除,例如如果只需要存儲最近兩天的數據,那么可以設置setTimeToLive(2 * 24 * 60 * 60)。
在HBase中,數據在更新時首先寫入WAL 日志(HLog)和內存(MemStore)中,MemStore中的數據是排序的,當MemStore累計到一定閾值時,就會創建一個新的MemStore,并且將老的MemStore添加到flush隊列,由單獨的線程flush到磁盤上,成為一個StoreFile。于此同時, 系統會在zookeeper中記錄一個redo point,表示這個時刻之前的變更已經持久化了(minor compact)。
StoreFile是只讀的,一旦創建后就不可以再修改。因此Hbase的更新其實是不斷追加的操作。當一個Store中的StoreFile達到一定的閾值后,就會進行一次合并(major compact),將對同一個key的修改合并到一起,形成一個大的StoreFile,當StoreFile的大小達到一定閾值后,又會對 StoreFile進行分割(split),等分為兩個StoreFile。
由于對表的更新是不斷追加的,處理讀請求時,需要訪問Store中全部的StoreFile和MemStore,將它們按照row key進行合并,由于StoreFile和MemStore都是經過排序的,并且StoreFile帶有內存中索引,通常合并過程還是比較快的。
實際應用中,可以考慮必要時手動進行major compact,將同一個row key的修改進行合并形成一個大的StoreFile。同時,可以將StoreFile設置大些,減少split的發生。
用戶訪問時,查詢先從region開始,查詢對應的row_key。因為插入時,是按row_key來插的數據,依序分在region上。
major compaction是將每個分區(region)下的所有store(列族)里的storeFile進行合并,方便查詢和插入,很耗資源的一種操作,因此不要頻繁進行,應使用程序手動操作合并。
總體有三種方式有major_compaction命令;api操作(常用);region server自動運行,默認是24小時一次。其中region server自動的方式需要設置hbase.hregion.majorcompaction.jetter,默認為0.2,也就是為了防止多個regionserver在同一時間合并,設定合并的時間有個±0.2的浮動。
minor compaction是較小范圍的合并,因為消耗資源少,因此設置好參數后,可以交由hbase自動管理,其中幾個參數:
hbase.hstore.compaction.min默認為3,至少需要3個滿足條件的storefile,才會啟動;
hbase.hstore.compaction.max默認為10,表示最多一次合并10個;
hbase.hstore.compaction.min.size
hbase.hstore.compaction.max.size這兩個表示storefile文件大小在哪個范圍內才會加入合并;
hbase.hstore.compaction.ratio將storefle按年齡排序來合并,先合并老的。
創建多個HTable客戶端用于寫操作,提高寫數據的吞吐量,一個例子:
htable創建時可以單獨傳入row-key來鎖定一行查詢,也可以設置scan,查詢多行數據。
static final Configuration conf = HBaseConfiguration.create(); static final String table_log_name = “user_log”; wTableLog = new HTable[tableN]; for (int i = 0; i < tableN; i++) { wTableLog[i] = new HTable(conf, table_log_name); wTableLog[i].setWriteBufferSize(5 * 1024 * 1024); //5MB wTableLog[i].setAutoFlush(false); }
通過調用HTable.setAutoFlush(false)方法可以將HTable寫客戶端的自動flush關閉,這樣可以批量寫入數據到HBase,而不是有一條put就執行一次更新,只有當put填滿客戶端寫緩存時,才實際向HBase服務端發起寫請求。默認情況下auto flush是開啟的。
通過調用HTable.setWriteBufferSize(writeBufferSize)方法可以設置HTable客戶端的寫buffer大小,如果新設置的buffer小于當前寫buffer中的數據時,buffer將會被flush到服務端。其中,writeBufferSize的單位是byte字節數,可以根據實際寫入數據量的多少來設置該值。
在HBae中,客戶端向集群中的RegionServer提交數據時(Put/Delete操作),首先會先寫WAL(Write Ahead Log)日志(即HLog,一個RegionServer上的所有Region共享一個HLog),只有當WAL日志寫成功后,再接著寫MemStore,然后客戶端被通知提交數據成功;如果寫WAL日志失敗,客戶端則被通知提交失敗。這樣做的好處是可以做到RegionServer宕機后的數據恢復。
因此,對于相對不太重要的數據,可以在Put/Delete操作時,通過調用Put.setWriteToWAL(false)或Delete.setWriteToWAL(false)函數,放棄寫WAL日志,從而提高數據寫入的性能。
值得注意的是:謹慎選擇關閉WAL日志,因為這樣的話,一旦RegionServer宕機,Put/Delete的數據將會無法根據WAL日志進行恢復。
通過調用HTable.put(Put)方法可以將一個指定的row key記錄寫入HBase,同樣HBase提供了另一個方法:通過調用HTable.put(List<Put>)方法可以將指定的row key列表,批量寫入多行記錄,這樣做的好處是批量執行,只需要一次網絡I/O開銷,這對于對數據實時性要求高,網絡傳輸RTT高的情景下可能帶來明顯的性能提升。
在客戶端開啟多個HTable寫線程,每個寫線程負責一個HTable對象的flush操作,這樣結合定時flush和寫buffer(writeBufferSize),可以既保證在數據量小的時候,數據可以在較短時間內被flush(如1秒內),同時又保證在數據量大的時候,寫buffer一滿就及時進行flush。下面給個具體的例子:
for (int i = 0; i < threadN; i++) { Thread th = new Thread() { public void run() { while (true) { try { sleep(1000); //1 second } catch (InterruptedException e) { e.printStackTrace(); } synchronized (wTableLog[i]) { try { wTableLog[i].flushCommits(); } catch (IOException e) { e.printStackTrace(); } } } } }; th.setDaemon(true); th.start(); }
創建多個HTable客戶端用于讀操作,提高讀數據的吞吐量,一個例子:
static final Configuration conf = HBaseConfiguration.create(); static final String table_log_name = “user_log”; rTableLog = new HTable[tableN]; for (int i = 0; i < tableN; i++) { rTableLog[i] = new HTable(conf, table_log_name); //每次scan數據時讀50條數據 rTableLog[i].setScannerCaching(50); }
hbase.client.scanner.caching配置項可以設置HBase scanner一次從服務端抓取的數據條數,默認情況下一次一條。通過將其設置成一個合理的值,可以減少scan過程中next()的時間開銷,代價是scanner需要通過客戶端的內存來維持這些被cache的行記錄。
有三個地方可以進行配置:1)在HBase的conf配置文件中進行配置;2)通過調用HTable.setScannerCaching(int scannerCaching)進行配置;3)通過調用Scan.setCaching(int caching)進行配置。三者的優先級越來越高。
scan時指定需要的Column Family,可以減少網絡傳輸數據量,否則默認scan操作會返回整行所有Column Family的數據。
通過scan取完數據后,記得要關閉ResultScanner,否則RegionServer可能會出現問題(對應的Server資源無法釋放)。
通過調用HTable.get(Get)方法可以根據一個指定的row key獲取一行記錄,同樣HBase提供了另一個方法:通過調用HTable.get(List<Get>)方法可以根據一個指定的row key列表,批量獲取多行記錄,這樣做的好處是批量執行,只需要一次網絡I/O開銷,這對于對數據實時性要求高而且網絡傳輸RTT高的情景下可能帶來明顯的性能提升。
在客戶端開啟多個HTable讀線程,每個讀線程負責通過HTable對象進行get操作。下面是一個多線程并發讀取HBase,獲取店鋪一天內各分鐘PV值的例子:
public class DataReaderServer { //獲取店鋪一天內各分鐘PV值的入口函數 public static ConcurrentHashMap<String, String> getUnitMinutePV(long uid, long startStamp, long endStamp){ long min = startStamp; int count = (int)((endStamp - startStamp) / (60*1000)); List<String> lst = new ArrayList<String>(); for (int i = 0; i <= count; i++) { min = startStamp + i * 60 * 1000; lst.add(uid + "_" + min); } return parallelBatchMinutePV(lst); } //多線程并發查詢,獲取分鐘PV值 private static ConcurrentHashMap<String, String> parallelBatchMinutePV(List<String> lstKeys){ ConcurrentHashMap<String, String> hashRet = new ConcurrentHashMap<String, String>(); int parallel = 3; List<List<String>> lstBatchKeys = null; if (lstKeys.size() < parallel ){ lstBatchKeys = new ArrayList<List<String>>(1); lstBatchKeys.add(lstKeys); } else{ lstBatchKeys = new ArrayList<List<String>>(parallel); for(int i = 0; i < parallel; i++ ){ List<String> lst = new ArrayList<String>(); lstBatchKeys.add(lst); } for(int i = 0 ; i < lstKeys.size() ; i ++ ){ lstBatchKeys.get(i%parallel).add(lstKeys.get(i)); } } List<Future< ConcurrentHashMap<String, String> >> futures = new ArrayList<Future< ConcurrentHashMap<String, String> >>(5); ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); builder.setNameFormat("ParallelBatchQuery"); ThreadFactory factory = builder.build(); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(), factory); for(List<String> keys : lstBatchKeys){ Callable< ConcurrentHashMap<String, String> > callable = new BatchMinutePVCallable(keys); FutureTask< ConcurrentHashMap<String, String> > future = (FutureTask< ConcurrentHashMap<String, String> >) executor.submit(callable); futures.add(future); } executor.shutdown(); // Wait for all the tasks to finish try { boolean stillRunning = !executor.awaitTermination( 5000000, TimeUnit.MILLISECONDS); if (stillRunning) { try { executor.shutdownNow(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } catch (InterruptedException e) { try { Thread.currentThread().interrupt(); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } // Look for any exception for (Future f : futures) { try { if(f.get() != null) { hashRet.putAll((ConcurrentHashMap<String, String>)f.get()); } } catch (InterruptedException e) { try { Thread.currentThread().interrupt(); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } catch (ExecutionException e) { e.printStackTrace(); } } return hashRet; } //一個線程批量查詢,獲取分鐘PV值 protected static ConcurrentHashMap<String, String> getBatchMinutePV(List<String> lstKeys){ ConcurrentHashMap<String, String> hashRet = null; List<Get> lstGet = new ArrayList<Get>(); String[] splitValue = null; for (String s : lstKeys) { splitValue = s.split("_"); long uid = Long.parseLong(splitValue[0]); long min = Long.parseLong(splitValue[1]); byte[] key = new byte[16]; Bytes.putLong(key, 0, uid); Bytes.putLong(key, 8, min); Get g = new Get(key); g.addFamily(fp); lstGet.add(g); } Result[] res = null; try { res = tableMinutePV[rand.nextInt(tableN)].get(lstGet); } catch (IOException e1) { logger.error("tableMinutePV exception, e=" + e1.getStackTrace()); } if (res != null && res.length > 0) { hashRet = new ConcurrentHashMap<String, String>(res.length); for (Result re : res) { if (re != null && !re.isEmpty()) { try { byte[] key = re.getRow(); byte[] value = re.getValue(fp, cp); if (key != null && value != null) { hashRet.put(String.valueOf(Bytes.toLong(key, Bytes.SIZEOF_LONG)), String.valueOf(Bytes .toLong(value))); } } catch (Exception e2) { logger.error(e2.getStackTrace()); } } } } return hashRet; } } //調用接口類,實現Callable接口 class BatchMinutePVCallable implements Callable<ConcurrentHashMap<String, String>>{ private List<String> keys; public BatchMinutePVCallable(List<String> lstKeys ) { this.keys = lstKeys; } public ConcurrentHashMap<String, String> call() throws Exception { return DataReadServer.getBatchMinutePV(keys); } }
對于頻繁查詢HBase的應用場景,可以考慮在應用程序中做緩存,當有新的查詢請求時,首先在緩存中查找,如果存在則直接返回,不再查詢HBase;否則對HBase發起讀請求查詢,然后在應用程序中將查詢結果緩存起來。至于緩存的替換策略,可以考慮LRU等常用的策略。
也可以利用redis做緩存,就是從hbase查詢出的數據方式redis,外界訪問時,可以從redis里面去取。
HBase上Regionserver的內存分為兩個部分,一部分作為Memstore,主要用來寫;另外一部分作為BlockCache,主要用于讀。
寫請求會先寫入Memstore,Regionserver會給每個region提供一個Memstore,當Memstore滿64MB以后,會啟動 flush刷新到磁盤。當Memstore的總大小超過限制時(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),會強行啟動flush進程,從最大的Memstore開始flush直到低于限制。
讀請求先到Memstore中查數據,查不到就到BlockCache中查,再查不到就會到磁盤上讀,并把讀的結果放入BlockCache。由于BlockCache采用的是LRU策略,因此BlockCache達到上限(heapsize * hfile.block.cache.size * 0.85)后,會啟動淘汰機制,淘汰掉最老的一批數據。
一個Regionserver上有一個BlockCache和N個Memstore,它們的大小之和不能大于等于heapsize * 0.8,否則HBase不能啟動。默認BlockCache為0.2,而Memstore為0.4。對于注重讀響應時間的系統,可以將 BlockCache設大些,比如設置BlockCache=0.4,Memstore=0.39,以加大緩存的命中率。
有關鏈接可以參考對應鏈接的內部鏈接。
HTable是HBase客戶端與HBase服務端通訊的Java API對象,客戶端可以通過HTable對象與服務端進行CRUD操作(增刪改查)。它的創建很簡單(htable的創建):
Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "tablename");//TODO CRUD Operation……
HTable使用時的一些注意事項:
1. 規避HTable對象的創建開銷
因為客戶端創建HTable對象后,需要進行一系列的操作:檢查.META.表確認指定名稱的HBase表是否存在,表是否有效等等,整個時間開銷比較重,可能會耗時幾秒鐘之長,因此最好在程序啟動時一次性創建完成需要的HTable對象,如果使用Java API,一般來說是在構造函數中進行創建,程序啟動后直接重用。
2. HTable對象不是線程安全的
HTable對象對于客戶端讀寫數據來說不是線程安全的,因此多線程時,要為每個線程單獨創建復用一個HTable對象,不同對象間不要共享HTable對象使用,特別是在客戶端auto flash被置為false時,由于存在本地write buffer,可能導致數據不一致。
3. HTable對象之間共享Configuration
configuration不要創建太多,一個就夠了,通過zookeeper去連接hbase的類。
HTable對象共享Configuration對象,這樣的好處在于:
共享ZooKeeper的連接:每個客戶端需要與ZooKeeper建立連接,查詢用戶的table regions位置,這些信息可以在連接建立后緩存起來共享使用;
共享公共的資源:客戶端需要通過ZooKeeper查找-ROOT-和.META.表,這個需要網絡傳輸開銷,客戶端緩存這些公共資源后能夠減少后續的網絡傳輸開銷,加快查找過程速度。
因此,與以下這種方式相比:
HTable table1 = new HTable("table1"); HTable table2 = new HTable("table2");
下面的方式更有效些:
Configuration conf = HBaseConfiguration.create(); HTable table1 = new HTable(conf, "table1"); HTable table2 = new HTable(conf, "table2");
備注:即使是高負載的多線程程序,也并沒有發現因為共享Configuration而導致的性能問題;如果你的實際情況中不是如此,那么可以嘗試不共享Configuration。
HTablePool可以解決HTable存在的線程不安全問題,同時通過維護固定數量的HTable對象,能夠在程序運行期間復用這些HTable資源對象。
Configuration conf = HBaseConfiguration.create(); //創建池,使用時從池里去htable對象 HTablePool pool = new HTablePool(conf, 10);
1. HTablePool可以自動創建HTable對象,而且對客戶端來說使用上是完全透明的,可以避免多線程間數據并發修改問題。
2. HTablePool中的HTable對象之間是公用Configuration連接的,能夠可以減少網絡開銷。
HTablePool的使用很簡單:每次進行操作前,通過HTablePool的getTable方法取得一個HTable對象,然后進行put/get/scan/delete等操作,最后通過HTablePool的putTable方法將HTable對象放回到HTablePool中。
下面是個使用HTablePool的簡單例子:
public void createUser(String username, String firstName, String lastName, String email, String password, String roles) throws IOException { //從池里去htable對象 HTable table = rm.getTable(UserTable.NAME); Put put = new Put(Bytes.toBytes(username)); put.add(UserTable.DATA_FAMILY, UserTable.FIRSTNAME, Bytes.toBytes(firstName)); put.add(UserTable.DATA_FAMILY, UserTable.LASTNAME, Bytes.toBytes(lastName)); put.add(UserTable.DATA_FAMILY, UserTable.EMAIL, Bytes.toBytes(email)); //列族,列,數據 put.add(UserTable.DATA_FAMILY, UserTable.CREDENTIALS, Bytes.toBytes(password)); put.add(UserTable.DATA_FAMILY, UserTable.ROLES, Bytes.toBytes(roles)); table.put(put); table.flushCommits(); rm.putTable(table); }
至于多線程使用HTablePool的真實性能情況,需要通過實際的測試工作得到。
Coprocessor運行于HBase RegionServer服務端,各個Regions保持對與其相關的coprocessor實現類的引用,coprocessor類可以通過RegionServer上classpath中的本地jar或HDFS的classloader進行加載。
目前,已提供有幾種coprocessor:
Coprocessor:提供對于region管理的鉤子,例如region的open/close/split/flush/compact等;
RegionObserver:提供用于從客戶端監控表相關操作的鉤子,例如表的get/put/scan/delete等;
Endpoint:提供可以在region上執行任意函數的命令觸發器。一個使用例子是RegionServer端的列聚合,這里有代碼示例。
以上只是有關coprocessor的一些基本介紹,本人沒有對其實際使用的經驗,對它的可用性和性能數據不得而知。感興趣的同學可以嘗試一下,歡迎討論。
HBase本身可以看作是一個可以水平擴展的Key-Value存儲系統,但是其本身的計算能力有限(Coprocessor可以提供一定的服務端計算),因此,使用HBase時,往往需要從寫端或者讀端進行計算,然后將最終的計算結果返回給調用者。舉兩個簡單的例子:
PV計算:通過在HBase寫端內存中,累加計數,維護PV值的更新,同時為了做到持久化,定期(如1秒)將PV計算結果同步到HBase中,這樣查詢端最多會有1秒鐘的延遲,能看到秒級延遲的PV結果。
分鐘PV計算:與上面提到的PV計算方法相結合,每分鐘將當前的累計PV值,按照rowkey + minute作為新的rowkey寫入HBase中,然后在查詢端通過scan得到當天各個分鐘以前的累計PV值,然后順次將前后兩分鐘的累計PV值相減,就得到了當前一分鐘內的PV值,從而最終也就得到當天各個分鐘內的PV值。
對于UV的計算,就是個去重計算的例子。分兩種情況:
如果內存可以容納,那么可以在Hash表中維護所有已經存在的UV標識,每當新來一個標識時,通過快速查找Hash確定是否是一個新的UV,若是則UV值加1,否則UV值不變。另外,為了做到持久化或提供給查詢接口使用,可以定期(如1秒)將UV計算結果同步到HBase中。
如果內存不能容納,可以考慮采用Bloom Filter來實現,從而盡可能的減少內存的占用情況。除了UV的計算外,判斷URL是否存在也是個典型的應用場景。
如果對于響應時間要求比較苛刻的情況(如單次http請求要在毫秒級時間內返回),個人覺得讀端不宜做過多復雜的計算邏輯,盡量做到讀端功能單一化:即從HBase RegionServer讀到數據(scan或get方式)后,按照數據格式進行簡單的拼接,直接返回給前端使用。當然,如果對于響應時間要求一般,或者業務特點需要,也可以在讀端進行一些計算邏輯。
作為一個Key-Value存儲系統,HBase并不是萬能的,它有自己獨特的地方。因此,基于它來做應用時,我們往往需要從多方面進行優化改進(表設計、讀表操作、寫表操作、數據計算等),有時甚至還需要從系統級對HBase進行配置調優,更甚至可以對HBase本身進行優化。這屬于不同的層次范疇。
總之,概括來講,對系統進行優化時,首先定位到影響你的程序運行性能的瓶頸之處,然后有的放矢進行針對行
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。