您好,登錄后才能下訂單哦!
本篇文章為大家展示了Python消費組怎么實時跨域監測多日志庫數據,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
本文主要介紹如何使用消費組實時監控多個域中的多個日志庫中的異常數據,并進行下一步告警動作。具備配置簡單、邏輯靈活、支持跨域多Region、實時監測,無需配置索引等特點,并且性能可靠、自動負載均衡與保持高可用性。
使用日志服務進行數據處理與傳遞的過程中,你是否遇到如下監測場景不能很好的解決:
特定數據上傳到日志服務中需要檢查數據內的異常情況,而沒有現成監控工具?
需要檢索數據里面的關鍵字,但數據沒有建立索引,無法使用日志服務的告警功能?
數據監測要求實時性(<5秒,例如Web訪問500錯誤),而特定功能都有一定延遲(1分鐘以上)?
存在多個域的多個日志庫(例如每個Region的錯誤文件對應的日志庫),數據量不大,但監控邏輯類似,每個目標都要監控與配置,比較繁瑣?
如果是的,您可以考慮使用日志服務Python消費組進行跨域實時數據監控,主要介紹如何使用消費組實時監控多個域中的多個日志庫中的異常數據,并進行下一步告警動作。可以很好解決以上問題,并利用消費組的特點,達到自動平衡、負載均衡和高可用性。
基本概念
協同消費庫(Consumer Library)是對日志服務中日志進行消費的高級模式,提供了消費組(ConsumerGroup)的概念對消費端進行抽象和管理,和直接使用SDK進行數據讀取的區別在于,用戶無需關心日志服務的實現細節,只需要專注于業務邏輯,另外,消費者之間的負載均衡、failover等用戶也都無需關心。
消費組(Consumer Group) - 一個消費組由多個消費者構成,同一個消費組下面的消費者共同消費一個logstore中的數據,消費者之間不會重復消費數據。
消費者(Consumer) - 消費組的構成單元,實際承擔消費任務,同一個消費組下面的消費者名稱必須不同。
在日志服務中,一個logstore下面會有多個shard,協同消費庫的功能就是將shard分配給一個消費組下面的消費者,分配方式遵循以下原則:
每個shard只會分配到一個消費者。
一個消費者可以同時擁有多個shard。
新的消費者加入一個消費組,這個消費組下面的shard從屬關系會調整,以達到消費負載均衡的目的,但是上面的分配原則不會變,分配過程對用戶透明。
協同消費庫的另一個功能是保存checkpoint,方便程序故障恢復時能接著從斷點繼續消費,從而保證數據不會被重復消費。
這里我們描述用Python使用消費組進行編程,實時跨域監測多個域的多個日志庫,全文或特定字段檢查
注意:文章的相關代碼可能會更新,最新版本在這里可以找到:Github樣例.
環境
建議程序運行在靠近源日志庫同Region下的ECS上,并使用局域網服務入口,這樣好處是網絡速度最快,其次是讀取沒有外網費用產生。
強烈推薦PyPy3來運行本程序,而不是使用標準CPython解釋器。
日志服務的Python SDK可以如下安裝:
pypy3 -m pip install aliyun-log-python-sdk -U
如下展示如何配置程序:
配置程序日志文件,以便后續測試或者診斷可能的問題(跳過,具體參考樣例)。
基本的日志服務連接與消費組的配置選項。
目標Logstore的一些連接信息
請仔細閱讀代碼中相關注釋并根據需要調整選項:
#encoding: utf8 def get_option(): ########################## # 基本選項 ########################## # 從環境變量中加載SLS參數與選項,endpoint、project、logstore可以多個并配對 endpoints = os.environ.get('SLS_ENDPOINTS', '').split(";") # ;分隔 projects = os.environ.get('SLS_PROJECTS', '').split(";") # ;分隔 logstores = os.environ.get('SLS_LOGSTORES', '').split(";") # ;分隔,同一個Project下的用,分隔 accessKeyId = os.environ.get('SLS_AK_ID', '') accessKey = os.environ.get('SLS_AK_KEY', '') consumer_group = os.environ.get('SLS_CG', '') # 消費的起點。這個參數在第一次跑程序的時候有效,后續再次運行將從上一次消費的保存點繼續。 # 可以使”begin“,”end“,或者特定的ISO時間格式。 cursor_start_time = "2018-12-26 0:0:0" # 一般不要修改消費者名,尤其是需要并發跑時 consumer_name = "{0}-{1}".format(consumer_group, current_process().pid) # 設定共享執行器 exeuctor = ThreadPoolExecutor(max_workers=2) # 構建多個消費組(每個logstore一個) options = [] for i in range(len(endpoints)): endpoint = endpoints[i].strip() project = projects[i].strip() if not endpoint or not project: logger.error("project: {0} or endpoint {1} is empty, skip".format(project, endpoint)) continue logstore_list = logstores[i].split(",") for logstore in logstore_list: logstore = logstore.strip() if not logstore: logger.error("logstore for project: {0} or endpoint {1} is empty, skip".format(project, endpoint)) continue option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, shared_executor=exeuctor) options.append(option) # 設定檢測目標字段與目標值,例如這里是檢測status字段是否有500等錯誤 keywords = {'status': r'5\d{2}'} return exeuctor, options, keywords
注意,配置了多個endpoint、project、logstore,需要用分號分隔,并且一一對應;如果一個project下有多個logstore需要檢測,可以將他們直接用逗號分隔。如下是一個檢測3個Region下的4個Logstore的配置:
export SLS_ENDPOINTS=cn-hangzhou.log.aliyuncs.com;cn-beijing.log.aliyuncs.com;cn-qingdao.log.aliyuncs.com export SLS_PROJECTS=project1;project2;project3 export SLS_LOGSTORES=logstore1;logstore2;logstore3_1,logstore3_2
如下代碼展示如何構建一個關鍵字檢測器,針對數據中的目標字段進行檢測,您也可以修改邏輯設定為符合需要的場景(例如多個字段的組合關系等)。
class KeywordMonitor(ConsumerProcessorBase): """ this consumer will keep monitor with k-v fields. like {"content": "error"} """ def __init__(self, keywords=None, logstore=None): super(KeywordMonitor, self).__init__() # remember to call base init self.keywords = keywords self.kw_check = {} for k, v in self.keywords.items(): self.kw_check[k] = re.compile(v) self.logstore = logstore def process(self, log_groups, check_point_tracker): logs = PullLogResponse.loggroups_to_flattern_list(log_groups) match_count = 0 sample_error_log = "" for log in logs: m = None for k, c in self.kw_check.items(): if k in log: m = c.search(log[k]) if m: logger.debug('Keyword detected for shard "{0}" with keyword: "{1}" in field "{2}", log: {3}' .format(self.shard_id, log[k], k, log)) if m: match_count += 1 sample_error_log = log if match_count: logger.info("Keyword detected for shard {0}, count: {1}, example: {2}".format(self.shard_id, match_count, sample_error_log)) # TODO: 這里添加通知下游的代碼 else: logger.debug("No keyword detected for shard {0}".format(self.shard_id)) self.save_checkpoint(check_point_tracker)
如下展示如何控制多個消費者,并管理退出命令:
def main(): exeuctor, options, keywords = get_monitor_option() logger.info("*** start to consume data...") workers = [] for option in options: worker = ConsumerWorker(KeywordMonitor, option, args=(keywords,) ) workers.append(worker) worker.start() try: for i, worker in enumerate(workers): while worker.is_alive(): worker.join(timeout=60) logger.info("worker project: {0} logstore: {1} exit unexpected, try to shutdown it".format( options[i].project, options[i].logstore)) worker.shutdown() except KeyboardInterrupt: logger.info("*** try to exit **** ") for worker in workers: worker.shutdown() # wait for all workers to shutdown before shutting down executor for worker in workers: while worker.is_alive(): worker.join(timeout=60) exeuctor.shutdown() if __name__ == '__main__': main()
假設程序命名為"monitor_keyword.py",可以如下啟動:
export SLS_ENDPOINTS=cn-hangzhou.log.aliyuncs.com;cn-beijing.log.aliyuncs.com;cn-qingdao.log.aliyuncs.com export SLS_PROJECTS=project1;project2;project3 export SLS_LOGSTORES=logstore1;logstore2;logstore3_1,logstore3_2 export SLS_AK_ID=<YOUR AK ID> export SLS_AK_KEY=<YOUR AK KEY> export SLS_CG=<消費組名,可以簡單命名為"dispatch_data"> pypy3 monitor_keyword.py
如果您的目標logstore存在多個shard,或者您的目標監測日志庫較多,您可以進行一定劃分并并啟動多次程序:
# export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES nohup pypy3 dispatch_data.py & # export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES nohup pypy3 dispatch_data.py & # export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES nohup pypy3 dispatch_data.py & ...
注意:
所有消費者使用了同一個消費組的名字和不同的消費者名字(因為消費者名以進程ID為后綴)。
但數據量較大或者目標日志庫較多時,單個消費者的速度可能無法滿足需求,且因為Python的GIL的原因,只能用到一個CPU核。強烈建議您根據目標日志庫的Shard數以及CPU的數量進行劃分,啟動多次以便重復利用CPU資源。
基于測試,在沒有帶寬限制、接收端速率限制(如Splunk端)的情況下,以推進硬件用pypy3
運行上述樣例,單個消費者占用大約10%的單核CPU
下可以消費達到5 MB/s
原始日志的速率。因此,理論上可以達到50 MB/s
原始日志每個CPU核
,也就是每個CPU核每天可以消費4TB原始日志
。
注意: 這個數據依賴帶寬、硬件參數等。
消費組會將檢測點(check-point)保存在服務器端,當一個消費者停止,另外一個消費者將自動接管并從斷點繼續消費。
可以在不同機器上啟動消費者,這樣當一臺機器停止或者損壞的清下,其他機器上的消費者可以自動接管并從斷點進行消費。
理論上,為了備用,也可以啟動大于shard數量的消費者。
每一個日志庫(logstore)最多可以配置10個消費組,如果遇到錯誤ConsumerGroupQuotaExceed
則表示遇到限制,建議在控制臺端刪除一些不用的消費組。
在控制臺查看消費組狀態
通過云監控查看消費組延遲,并配置報警
如果服務入口(endpoint)配置為https://
前綴,如https://cn-beijing.log.aliyuncs.com
,程序與SLS的連接將自動使用HTTPS加密。
服務器證書*.aliyuncs.com
是GlobalSign簽發,默認大多數Linux/Windows的機器會自動信任此證書。如果某些特殊情況,機器不信任此證書,可以參考這里下載并安裝此證書。
上述內容就是Python消費組怎么實時跨域監測多日志庫數據,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。