中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

發布時間:2020-07-29 14:51:09 來源:網絡 閱讀:216 作者:wx5d9ed7c8443c3 欄目:編程語言

最近的一個項目是風控過程數據實時統計分析和聚合的一個 OLAP 分析監控平臺,日流量峰值在 10 到 12 億上下,每年數據約 4000 億條,占用空間大概 200T。

面對這樣一個數據量級的需求,我們的數據如何存儲和實現實時查詢將是一個嚴峻的挑戰。

經過對 Elasticsearch 多方調研和超過幾百億條數據的插入和聚合查詢的驗證之后,我們總結出以下幾種能夠有效提升性能和解決這一問題的方案:

  1. 集群規劃
  2. 存儲策略
  3. 索引拆分
  4. 壓縮
  5. 冷熱分區等

本文所使用的 Elasticsearch 版本為 5.3.3。

百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

什么是時序索引?其主要特點體現在如下兩個方面:

存,以時間為軸,數據只有增加,沒有變更,并且必須包含 timestamp(日期時間,名稱隨意)字段。

其作用和意義要大于數據的 id 字段,常見的數據比如我們通常要記錄的操作日志、用戶行為日志、或股市行情數據、服務器 CPU、內存、網絡的使用率等。

取,一定是以時間范圍為第一過濾條件,然后是其他查詢條件,比如近一天、一周、本月等等,然后在這個范圍內進行二次過濾。

比如性別或地域等,查詢結果中比較關注的是每條數據和 timestamp 字段具體發生的時間點,而非 id。

此類數據一般用于 OLAP、監控分析等場景。

一、集群部署規劃

百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

我們都知道在 Elasticsearch(下稱 ES)集群中有兩個主要角色:Master Node 和 Data Node,其他如 Tribe Node 等節點可根據業務需要另行設立。

為了讓集群有更好的性能表現,我們應該對這兩個角色有一個更好的規劃,在 Nodes 之間做讀取分離,保證集群的穩定性和快速響應,在大規模的數據存儲和查詢的壓力之下能夠坦然面對,各自愉快的協作。

1、Master Nodes

Master Node,整個集群的管理者,負有對 index 的管理、shards 的分配,以及整個集群拓撲信息的管理等功能。

眾所周知,Master Node 可以通過 Data Node 兼任,但是,如果對群集規模和穩定要求很高的話,就要職責分離,Master Node 推薦獨立,它的狀態關乎整個集群的存活。

Master 的配置:

        node.master: true

        node.data: false

        node.ingest: false

這樣 Master 不參與 I、O,從數據的搜索和索引操作中解脫出來,專門負責集群的管理工作,因此 Master Node 的節點配置可以相對低一些。

另外防止 ES 集群 split brain(腦裂),合理配置 discovery.zen.minimum_master_nodes 參數,官方推薦 master-eligible nodes / 2 + 1 向下取整的個數。

這個參數決定選舉 Master 的 Node 個數,太小容易發生“腦裂”,可能會出現多個 Master,太大 Master 將無法選舉。

2、Data Nodes

Data Node 是數據的承載者,對索引的數據存儲、查詢、聚合等操作提供支持。

這些操作嚴重消耗系統的 CPU、內存、IO 等資源,因此,應該把最好的資源分配給 Data Node,因為它們是真正干累活的角色,同樣 Data Node 也不兼任 Master 的功能。

Data 的配置:

        node.master: false

        node.data: true

        node.ingest: false
3、Coordinating Only Nodes

百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

ES 本身是一個分布式的計算集群,每個 Node 都可以響應用戶的請求,包括 Master Node、Data Node,它們都有完整的 Cluster State 信息。

正如我們知道的一樣,在某個 Node 收到用戶請求的時候,會將請求轉發到集群中所有索引相關的 Node 上,之后將每個 Node 的計算結果合并后返回給請求方。

我們暫且將這個 Node 稱為查詢節點,整個過程跟分布式數據庫原理類似。那問題來了,這個查詢節點如果在并發和數據量比較大的情況下,由于數據的聚合可能會讓內存和網絡出現瓶頸。

因此,在職責分離指導思想的前提下,這些操作我們也應該從這些角色中剝離出來,官方稱它是 Coordinating Nodes,只負責路由用戶的請求,包括讀、寫等操作,對內存、網絡和 CPU 要求比較高。

本質上,Coordinating Only Nodes 可以籠統的理解為是一個負載均衡器,或者反向代理,只負責讀,本身不寫數據。

它的配置是:

? ? ? ? node.master: false

? ? ? ? node.data: false

? ? ? ? node.ingest: false

? ? ? ? search.remote.connect: false

增加 Coordinating Nodes 的數量可以提高 API 請求響應的性能,我們也可以針對不同量級的 Index 分配獨立的 Coordinating Nodes 來滿足請求性能。

那是不是越多越好呢?在一定范圍內是肯定的,但凡事有個度,過了負作用就會突顯,太多的話會給集群增加負擔。

在做 Master 選舉的時候會先確保所有 Node 的 Cluster State 是一致的,同步的時候會等待每個 Node 的 Acknowledgement 確認,所以適量分配可以讓集群暢快的工作。

search.remote.connect 是禁用跨集群查詢,防止在進行集群之間查詢時發生二次路由:

二、Routing

類似于分布式數據庫中的分片原則,將符合規則的數據存儲到同一分片。ES 通過哈希算法來決定數據存儲于哪個 Shard:

shard_num = hash(_routing) % num_primary_shards

其中 hash(_routing) 得出一個數字,然后除以主 Shards 的數量得到一個余數,余數的范圍是 0 到 number_of_primary_shards - 1,這個數字就是文檔所在的 Shard。

Routing 默認是 id 值,當然可以自定義,合理指定 Routing 能夠大幅提升查詢效率,Routing 支持 GET、Delete、Update、Post、Put 等操作。

如:

PUT my_index/my_type/1?routing=user1

{

"title": "This is a document"

}

GET my_index/my_type/1?routing=user1

不指定 Routing 的查詢過程:

百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

簡單的來說,一個查詢請求過來以后會查詢每個 Shard,然后做結果聚合,總的時間大概就是所有 Shard 查詢所消耗的時間之和。

指定 Routing 以后:

百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

會根據 Routing 查詢特定的一個或多個 Shard,這樣就大大減少了查詢時間,提高了查詢效率。

當然,如何設置 Routing 是一個難點,需要一點技巧,要根據業務特點合理組合 Routing 的值,來劃分 Shard 的存儲,最終保持數據量相對均衡。

可以組合幾個維度做為 Routing ,有點類似于 Hbase Key,例如不同的業務線加不同的類別,不同的城市和不同的類型等等,如:

? ? ? ? _search?routing=beijing:按城市。

? ? ? ? _search?routing=beijing_user123:按城市和用戶。

? ? ? ? _search?routing=beijing_android,shanghai_android:按城市和手機類型等。

數據不均衡?假如你的業務在北京、上海的數據遠遠大于其他二三線城市的數據。

再例如我們的業務場景,A 業務線的數據量級遠遠大于 B 業務線,有時候很難通過 Routing 指定一個值保證數據在所有 Shards 上均勻分布,會讓部分 Shard 變的越來越大,影響查詢性能,怎么辦?

一種解決辦法是單獨為這些數據量大的渠道創建獨立的 Index

這樣可以根據需要在不同 Index 之間查詢,然而每個 Index 中 Shards 的數據可以做到相對均衡。

另一種辦法是指定 Index 參數 index.routing_partition_size,來解決最終可能產生群集不均衡的問題,指定這個參數后新的算法如下:

   shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards

index.routing_partition_size 應具有大于 1 且小于 index.number_of_shards 的值。

最終數據會在 routing_partition_size 幾個 Shard 上均勻存儲,是哪個 Shard 取決于 hash(_id) % routing_partition_size 的計算結果。

指定參數 index.routing_partition_size 后,索引中的 Mappings 必須指定 _routing 為 "required": true,另外 Mappings 不支持 parent-child 父子關系。

很多情況下,指定 Routing 后會大幅提升查詢性能,畢竟查詢的 Shard 只有那么幾個,但是如何設置 Routing 是個難題,可根據業務特性巧妙組合。

三、索引拆分

Index 通過橫向擴展 Shards 實現分布式存儲,這樣可以解決 Index 大數據存儲的問題。

但在一個 Index 變的越來越大,單個 Shard 也越來越大,查詢和存儲的速度也越來越慢。

更重要的是一個 Index 其實是有存儲上限的(除非你設置足夠多的 Shards 和機器),如官方聲明單個 Shard 的文檔數不能超過 20 億(受限于 Lucene index,每個 Shard 是一個 Lucene index)。

考慮到 I、O,針對 Index 每個 Node 的 Shards 數最好不超過 3 個,那面對這樣一個龐大的 Index,我們是采用更多的 Shards,還是更多的 Index,我們如何選擇?

Index 的 Shards 總量也不宜太多,更多的 Shards 會帶來更多的 I、O 開銷,其實答案就已經很明確,除非你能接受長時間的查詢等待。

Index 拆分的思路很簡單,時序索引有一個好處就是只有增加,沒有變更,按時間累積,天然對索引的拆分友好支持,可以按照時間和數據量做任意時間段的拆分。

ES 提供的 Rollover Api + Index Template 可以非常便捷和友好的實現 Index 的拆分工作,把單個 index docs 數量控制在百億內,也就是一個 Index 默認 5 個 Shards 左右即可,保證查詢的即時響應。

簡單介紹一下 Rollover API 和 Index Template 這兩個東西,如何實現 index 的拆分。

1、Index Template

我們知道 ES 可以為同一目的或同一類索引創建一個 Index Template,之后創建的索引只要符合匹配規則就會套用這個 Template,不必每次指定 Settings 和 Mappings 等屬性。

一個 Index 可以被多個 Template 匹配,那 Settings 和 Mappings 就是多個 Template 合并后的結果。

有沖突通過 Template 的屬性"order" : 0 從低到高覆蓋(這部分據說會在 ES6 中會做調整,更好的解決 Template 匹配沖突問題)。

示例:

? ? ? ? PUT _template/template_1{

? ? ? ? "index_patterns" : ["log-*"],

? ? ? ? "order" : 0,

? ? ? ? "settings" : {

? ? ? ? ? ? "number_of_shards" : 5

? ? ? ? },

? ? "aliases" : {

? ? ? ? ?"alias1" : {}

? ? ? ? }

? ? }
2、Rollover Index

百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

Rollover Index 可以將現有的索引通過一定的規則,如數據量和時間,索引的命名必須是 logs-000001 這種格式,并指定 aliases,示例:

? ? PUT /logs-000001

? ? {

? ? ? ? ?"aliases": {

? ? ? ? ?"logs_write": {}

? ? ? ? }

? ? }

? ? # Add > 1000 documents to logs-000001

? ? ?POST /logs_write/_rollover

? ? ?{

? ? ? "conditions": {

? ? ? ? ? ? "max_age": "7d",

? ? ? ? ? ? ?"max_docs": 1000

? ? ? ? }

? ? }

先創建索引并指定別名 logs_write,插入 1000 條數據,然后請求 _rollover api 并指定拆分規則。

? ? 如果索引中的數據大于規則中指定的數據量或者時間過時,新的索引將被創建,索引名稱為 logs-000002,并根據規則套用 Index Template, 同時別名 logs_write 也將被變更到 logs-000002。

注意事項:

索引命名規則必須如同:logs-000001。

索引必須指定 aliases。

Rollover Index API 調用時才去檢查索引是否超出指定規則,不會自動觸發,需要手動調用,可以通過 Curator 實現自動化。

如果符合條件會創建新的索引,老索引的數據不會發生變化,如果你已經插入 2000 條,拆分后還是 2000 條。

插入數據時一定要用別名,否則你可能一直在往一個索引里追加數據。

技巧是按日期滾動索引:

PUT /<logs-{now/d}-1>

{

"aliases": {

"logs_write": {}

}

}

假如生成的索引名為 logs-2017.04.13-1,如果你在當天執行 Rollover 會生成 logs-2017.04.13-000001,次日的話是 logs-2017.04.14-000001。

這樣就會按日期進行切割索引,那如果你想查詢 3 天內的數據可以通過日期規則來匹配索引名,如:

GET /<logs-{now/d}-*>,<logs-{now/d-1d}-*>,<logs-{now/d-2d}-*>/_search

到此,我們就可以通過 Index Template 和 Rollover API 實現對 Index 的任意拆分,并按照需要進行任意時間段的合并查詢,這樣只要你的時間跨度不是很大,查詢速度一般可以控制在毫秒級,存儲性能也不會遇到瓶頸。

四、Hot-Warm 架構

百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

冷熱架構,為了保證大規模時序索引實時數據分析的時效性,可以根據資源配置不同將 Data Nodes 進行分類形成分層或分組架構。

一部分支持新數據的讀寫,另一部分僅支持歷史數據的存儲,存放一些查詢發生機率較低的數據。

即 Hot-Warm 架構,對 CPU,磁盤、內存等硬件資源合理的規劃和利用,達到性能和效率的最大化。

我們可以通過 ES 的 Shard Allocation Filtering 來實現 Hot-Warm 的架構。

實現思路如下:

將 Data Node 根據不同的資源配比打上標簽,如:Host、Warm。

定義 2 個時序索引的 Index Template,包括 Hot Template 和 Warm Template,Hot Template 可以多分配一些 Shard 和擁有更好資源的 Hot Node。

用 Hot Template 創建一個 Active Index 名為 active-logs-1,別名 active-logs,支持索引切割。

插入一定數據后,通過 roller over api 將 active-logs 切割,并將切割前的 Index 移動到 Warm Nodes 上,如 active-logs-1,并阻止寫入。

通過 Shrinking API 收縮索引 active-logs-1 為 inactive-logs-1,原 Shard 為 5,適當收縮到 2 或 3,可以在 Warm Template 中指定,減少檢索的 Shard,使查詢更快。

通過 force-merging api 合并 inactive-logs-1 索引每個 Shard 的 Segment,節省存儲空間。

刪除 active-logs-1。

1、Hot,Warm Nodes

Hot Nodes

擁有最好資源的 Data Nodes,如更高性能的 CPU、SSD 磁盤、內存等資源,這些特殊的 Nodes 支持索引所有的讀、寫操作。

如果你計劃以 100 億為單位來切割 Index,那至少需要三個這樣的 Data Nodes,Index 的 Shard 數為 5,每個 Shard 支持 20 億 Documents 數據的存儲。

為這類 Data Nodes 打上標簽,以便我們在 Template 中識別和指定,啟動參數如下:

./bin/elasticsearch -Enode.attr.box_type=hot

或者配置文件:

node.attr.box_type: hot

Warm Nodes

存儲只讀數據,并且查詢量較少,但用于存儲長多時間歷史數據的 Data Nodes,這類 Nodes 相對 Hot Nodes 較差的硬件配置,根據需求配置稍差的 CPU、機械磁盤和其他硬件資源,至于數量根據需要保留多長時間的數據來配比,同樣需要打上標簽,方法跟 Hot Nodes 一樣,指定為 Warm,box_type: warm。

2、Hot,Warm Template

Hot Template

我們可以通過指定參數"routing.allocation.include.box_type": "hot",讓所有符合命名規則索引的 Shard 都將被分配到 Hot Nodes 上:

PUT _template/active-logs

{

"template": "active-logs-*",

"settings": {

"number_of_shards": 5,

"number_of_replicas": 1,

"routing.allocation.include.box_type": "hot",

"routing.allocation.total_shards_per_node": 2

},

"aliases": {

"active-logs": {}

}

}
Warm Template

同樣符合命名規則索引的 Shard 會被分配到 Warm Nodes 上,我們指定了更少的 Shards 數量和復本數。

注意,這里的復本數為 0,和 best_compression 級別的壓縮,方便做遷移等操作,以及進行一些數據的壓縮:

PUT _template/inactive-logs

{

"template": "inactive-logs-*",

"settings": {

"number_of_shards": 1,

"number_of_replicas": 0,

"routing.allocation.include.box_type": "warm",

"codec": "best_compression"

}

}

假如我們已經創建了索引 active-logs-1 ,當然你可以通過 _bulk API 快速寫入測試的數據,然后參考上文中介紹的 Rollover API 進行切割。

3、Shrink Index
Rollover API 切割以后,active-logs-1 將變成一個冷索引,我們將它移動到 Warm Nodes 上。

先將索引置為只讀狀態,拒絕任何寫入操作,然后修改 index.routing.allocation.include.box_type 屬性,ES 會自動移動所有 Shards 到目標 Data Nodes 上:

PUT active-logs-1/_settings

{

"index.blocks.write": true,

"index.routing.allocation.include.box_type": "warm"

}

Cluster Health API 可以查看遷移狀態,完成后進行收縮操作,其實相當于復制出來一個新的索引,舊的索引還存在。

POST active-logs-1/_shrink/inactive-logs-1

我們可以通過 Head 插件查看整個集群索引的變化情況。

Forcemerge

到目前為止我們已經實現了索引的冷熱分離,和索引的收縮,我們知道每個 Shard 下面由多個 Segment 組成,那 inactive-logs-1 的 Shard 數是 1,但 Segment 還是多個。

這類索引不會在接受寫入操作,為了節約空間和改善查詢性能,通過 Forcemerge API 將 Segment 適量合并:

PUT inactive-logs-1/_settings

{ "number_of_replicas": 1 }

ES 的 Forcemerge 過程是先創建新的 Segment 刪除舊的,所以舊 Segment 的壓縮方式 best_compression 不會在新的 Segment 中生效,新的 Segment 還是默認的壓縮方式。

現在 inactive-logs-1 的復本還是 0,如果有需要的話,可以分配新的復本數:

PUT inactive-logs-1/_settings

{ "number_of_replicas": 1 }

最后刪除 active-logs-1,因為我們已經為它做了一個查詢復本 inactive-logs-1。

DELETE active-logs-1

走到這里,我們就已經實現了 Index 的 Hot-Warm 架構,根據業務特點將一段時間的數據放在 Hot Nodes,更多的歷史數據存儲于 Warm Nodes。

五、其他優化方案

這一部分我們會展示更多的優化方案。

1、索引隔離

百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

在多條業務線的索引共用一個 ES 集群時會發生流量被獨吃獨占的情況,因為大家都共用相同的集群資源,流量大的索引會占用大部分計算資源而流量小的也會被拖慢,得不到即時響應,或者說業務流量大的索引可以按天拆分,幾個流量小的索引可以按周或月拆分。

這種情況下我們可以將規模大的索引和其他相對小規模的索引獨立存儲,分開查詢或合并查詢。

除了 Master Nodes 以外,Data Nodes 和 Coordinating Nodes 都可以獨立使用(其實如果每個索引的量都特別大也應該采用這種架構),還有一個好處是對方的某個 Node 掛掉,自己不受影響。

同樣利用 ES 支持 Shard Allocation Filtering 功能來實現索引的資源獨立分配,先將 Nodes 進行打標簽,劃分區域,類似于 Hot-Warm 架構:

node.attr.zone=zone_a、node.attr.zone=zone_b

或者:

node.attr.zone =zone_hot_a、node.attr.zone=zone_hot_b

等打標簽的方式來區別對應不同的索引,然后在各自的 Index Template 中指定不同的 node.attr.zone 即可。

如"index.routing.allocation.include.zone" : "zone_a,zone_hot_a",或者排除法"index.routing.allocation.exclude.size": "zone_hot_b"分配到 zone_hot_b 以外的所有 Data Nodes 上。

更多用法可以參考 Hot-Warm 架構,或 shard-allocation-filtering:

2、跨數據中心

百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

如果你的業務遍布全國各地,四海八荒,如果你數據要存儲到多個機房,如果你的 Index 有幾萬個甚至更多( Index 特別多,集群龐大會導致 Cluster State 信息量特別大,因為 Cluster State 包含了所有 Shard、Index、Node 等所有相關信息,它存儲在每個 Node 上,這些數據發生變化都會實時同步到所有 Node 上,當這個數據很大的時候會對集群的性能造成影響)。

這些情況下我們會考慮部署多個 ES Cluster,那我們將如何解決跨集群查詢的問題呢?

目前 ES 針對跨集群操作提供了兩種方案 Tribe Node 和 Cross Cluster Search。

Tribe Node

百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

需要一個獨立的 Node 節點,加入其他 ES Cluster,用法有點類似于 Coordinating Only Node。

所不同的是 Tribe 是針對多個 ES 集群之間的所有節點,Tribe Node 收到請求廣播到相關集群中所有節點,將結果合并處理后返回。

表面上看起來 Tribe Node 將多個集群串連成了一個整體,遇到同名 Index 發生沖突,會選擇其中一個 Index,也可以指定:

tribe:

on_conflict: prefer_t1

t1:

cluster.name: us-cluster

discovery.zen.ping.multicast.enabled: false

discovery.zen.ping.unicast.hosts: ['usm1','usm2','usm3']

t2:

cluster.name: eu-cluster

discovery.zen.ping.multicast.enabled: false

discovery.zen.ping.unicast.hosts: ['eum1','eum2','eum3']
Cross Cluster Search

百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

Cross Cluster Search 可以讓集群中的任意一個節點聯合查詢其他集群中的數據, 通過配置 elasticsearch.yml 或者 API 來啟用這個功能,API 示例:

PUT _cluster/settings

{

"persistent": {

"search": {

"remote": {

"cluster_one": {

"seeds": [

"127.0.0.1:9300"

]

...

}

}

}

}

}

提交以后整個集群所有節點都會生效,都可以做為代理去做跨集群聯合查詢,不過我們最好還是通過 Coordinating Only Nodes 去發起請求。

POST /cluster_one:decision,decision/_search

{

"match_all": {}

}

對集群 cluster_one 和本集群中名為 Decision 的索引聯合查詢。

目前這個功能還在測試階段,但未來可能會取代 Tribe Node,之間的最大的差異是 Tribe Node 需要設置獨立的節點,而 Cross Cluster Search 不需要,集群中的任意一個節點都可以兼任。

比如可以用我們的 Coordinating Only Nodes 做為聯合查詢節點,另一個優點是配置是動態的,不需要重啟節點。

實際上可以理解為是一個 ES 集群之間特定的動態代理工具,支持所有操作,包括 Index 的創建和修改,并且通過 Namespace 對 Index 進行隔離,也解決了 Tribe Node 之 Index 名稱沖突的問題。

六、總結

我們在文中介紹了幾種方案用來解決時序索引的海量數據存儲和查詢的問題,根據業務特點和使用場景來單獨或組合使用能夠發揮出意想不到的效果。

特別是 Nodes 之間的讀寫分離、索引拆分、Hot-Warm 等方案的組合應用對索引的查詢和存儲性能有顯著的提升。

另外 Routing 在新版本中增加了 routing_partition_size,解決了 Shard 難以均衡的問題。

如果你的索引 Mapping 中沒有 parent-child 關聯關系可以考慮使用,對查詢的性能提升非常有效。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

涞水县| 米泉市| 靖江市| 和硕县| 洛浦县| 九台市| 剑川县| 太白县| 丹江口市| 七台河市| 宁晋县| 山丹县| 厦门市| 澎湖县| 武城县| 泾川县| 兴国县| 内乡县| 华宁县| 五峰| 吐鲁番市| 五河县| 蚌埠市| 上林县| 广东省| 丽江市| 蓝山县| 沂源县| 武城县| 许昌县| 财经| 横山县| 冷水江市| 新建县| 武强县| 柳江县| 土默特右旗| 招远市| 民乐县| 如东县| 庄河市|