您好,登錄后才能下訂單哦!
python是怎么實現skywalking的trace模塊過濾和報警,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
skywalking本身的報警功能,用起來視乎不是特別好用,目前想實現對skywalking的trace中的錯誤接口進行過濾并報警通知管理員和開發。所以自己就用python對skywalking做了二次數據清洗實現。項目方在了自己了github(https://github.com/shygit-dev/skywalking-cli-python)上了,有興趣的同學可以做二次改造,共同學習。下面簡單列出了代碼內容:
sw-trace.py
#!/usr/bin/env python # _*_ coding: utf-8 _*_ # Tile: # Author:shy import requests import time import smtplib from email.mime.text import MIMEText import re def interface_content_filter(trace_id): ''' 對詳細日志內容(業務邏輯報錯)進行過濾 :param trace_id: :return: 【1|0】 ''' url = "http://172.16.53.232:50001/query" params = { "trace_id": trace_id } detail_trace_id_log = requests.request(method="GET",url=url,params=params) detail_trace_id_log = detail_trace_id_log.text print(detail_trace_id_log) print(type(detail_trace_id_log)) with open("blackname_keyword_list","r",encoding="utf-8") as f: for line in f: print(line) result = re.search(line.strip(),detail_trace_id_log) print(result) if result != None: print("哥們匹配到日志黑名單關鍵字了:%s" % line) return 0 print("提示:%s不在關鍵字黑名單中" % trace_id) return 1 def interface_filter(endpointName): """ 設置接口黑名單 :param endpointName: :return: 【1|0】 """ endpointName = re.sub("\(|\)",".",endpointName) with open("blackname_list","r",encoding="utf-8") as f: bn_list = f.read() match_result = re.search(endpointName.strip(),bn_list) if match_result == None: print("提示:接口不存在黑名單中") return 1 print("提示:接口在黑名單中") return 0 def trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr): """ skywalking trace功能對錯誤接口進行過濾,默認最大一次獲取2000條數據,每分鐘執行一次 :param start_time: :param end_time: :return: """ url = sw_url data = { "query": "query queryTraces($condition: TraceQueryCondition) {\n data: queryBasicTraces(condition: $condition) {\n traces {\n key: segmentId\n endpointNames\n duration\n start\n isError\n traceIds\n }\n total\n }}", "variables": { "condition": { "queryDuration": { "start": start_time, #"2021-12-07 1734" "end": end_time, "step": "MINUTE" }, "traceState": "ERROR", "paging": { "pageNum": 1, "pageSize": per_page_size, "needTotal": "true" }, "queryOrder": "BY_START_TIME" # "traceId": "b669d0069be84fce82261901de412e7c.430.16388637511348105" } } } result = requests.request(method="post",url=url,json=data) i = 0 # print(result.content) # print(time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:]))))) with open("mail.html","w",encoding="utf-8") as f: f.write('<head><meta charset="UTF-8"><title>Title</title><style>.t {border-right: 2px solid black;border-bottom: 2px solid black;}.t th,td {border-top: 2px solid black;border-left: 2px solid black;font-size: 10px;}</style></head><body><div >最近15分鐘統計:</div><table class="t" border="0" cellspacing="0" cellpadding="10px"><thead><tr width: 100px;">時間</th><th>持續時長</th><th>接口名稱</th><th>追蹤ID</th></tr></thead><tbody>') for trace in result.json()["data"]["data"]["traces"]: # print(trace["endpointNames"]) print("時間:%s\n" % time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:])))), "持續時長:%s\n" % trace["duration"], "接口名稱:%s\n" % trace["endpointNames"][0], "跟蹤ID:%s" % trace["traceIds"][0]) # print(time.localtime(1638869640.194)) i+=1 print(i) s_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:])))) dur_time = trace["duration"] endpointName = trace["endpointNames"][0] trace_id = trace["traceIds"][0] # 調用接口黑名單過濾功能 result = interface_filter(endpointName) if result == 0: print("哥們進入黑名單了!",endpointName) continue # 調用關鍵字黑名單過濾功能 keyword_result = interface_content_filter(trace_id) if keyword_result == 0: print("哥們進入關鍵字黑名單了!", trace_id) continue with open("mail.html","a",encoding="utf-8") as f: f.write('<tr><td>%s</td><td>%s</td><td>%s</td><td><a href="http://%s/query?trace_id=%s" rel="external nofollow" >%s</a></td></tr>' %(s_time,dur_time,endpointName,trace_detail_addr,trace_id,trace_id)) with open("mail.html","a",encoding="utf-8") as f: f.write('</tbody></table></body>') def send_mail(receiver): """ 發送報錯接口郵件 :return: """ server = "mail.test.com" sender = "sa@test.com" sender_pwd = "1qaz@WSX" send_addr = "sa@test.com" receiver = receiver with open("mail.html","r",encoding="utf-8") as f: content = f.read() if re.search("<td>",content) == None: print("無報錯接口!",content) return 0 print("郵件前",content) msg_mail = MIMEText(content,"html","utf-8") msg_mail["Subject"] = "Skywalking報錯接口統計" msg_mail["From"] = sender msg_mail["To"] = receiver server_obj = smtplib.SMTP_SSL(server) server_obj.connect(server,465) server_obj.login(sender,sender_pwd) server_obj.sendmail(send_addr,receiver,msg_mail.as_string()) if __name__ == "__main__": # 設定查詢時間間隔,默認900s(15min) end_time = time.time() start_time = end_time - 900 start_time=time.strftime("%Y-%m-%d %H%M",time.localtime(start_time)) end_time = time.strftime("%Y-%m-%d %H%M", time.localtime(end_time)) print(start_time) print(end_time) sw_url = "http://172.16.53.232:9412/graphql" # skywalking的前端服務的地址和端口 per_page_size = 5000 #指定一次獲取endpoint接口的數目 trace_detail_addr = "127.0.0.1:5000" #指定查詢指定trace_id詳細日志 receiver = "shy@test.com" #報警郵件接收人地址 trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr) send_mail(receiver) # interface_filter() # interface_content_filter("3c4212dd2dd548d394ba312c4619405d.104.16390380592724487")
sw-trace-id.py
#!/usr/bin/env python # _*_ coding: utf-8 _*_ # Tile: # Author:shy import requests import time from flask import Flask,request app = Flask(__name__) @app.route("/query",methods=["get"]) def trace_id_query(): """ 查詢指定trace_id詳細日志信息 :return: f.read() """ trace_id = request.args.get("trace_id") url="http://172.16.53.232:9412/graphql" # url="http://skywalking.roulw.com/graphql" data = { "query": "query queryTrace($traceId: ID!) {\n trace: queryTrace(traceId: $traceId) {\n spans {\n traceId\n segmentId\n spanId\n parentSpanId\n refs {\n traceId\n parentSegmentId\n parentSpanId\n type\n }\n serviceCode\n serviceInstanceName\n startTime\n endTime\n endpointName\n type\n peer\n component\n isError\n layer\n tags {\n key\n value\n }\n logs {\n time\n data {\n key\n value\n }\n }\n }\n }\n }", "variables": { "traceId": trace_id } } result = requests.request(method="post",url=url,json=data) with open("detail_log", "w", encoding="utf-8") as f: f.write("<div style='color: red;font-size: 30px;'>生產Skywalking報錯接口跟蹤日志日志:<br /></div>") for trace_id in result.json()["data"]["trace"]["spans"]: if trace_id["isError"]: # print(trace_id) print("服務名稱:%s\n" % trace_id["serviceCode"], "開始時間:%s\n" % trace_id["startTime"], "接口名稱:%s\n" % trace_id["endpointName"], "peer名稱:%s\n" % trace_id["peer"], "tags名稱:%s\n" % trace_id["tags"], "詳細日志:%s" % trace_id["logs"]) content = "服務名稱:%s<br />開始時間:%s<br />接口名稱:%s<br />peer名稱:%s<br />tags名稱:%s" % (trace_id["serviceCode"],trace_id["startTime"],trace_id["endpointName"],trace_id["peer"],trace_id["tags"]) with open("detail_log","a",encoding="utf-8") as f: f.write(content) f.write("<br />********詳細日志**********<br />") for logs in trace_id["logs"]: for log in logs["data"]: if log["key"] == "message": print(log["value"]) with open("detail_log","a",encoding="utf-8") as f: f.write(log["value"]) # return log["value"] elif log["key"] == "stack": print(log["value"]) with open("detail_log","a",encoding="utf-8") as f: f.write(log["value"]) with open("detail_log", "a", encoding="utf-8") as f: f.write("<div style='color: red;font-size: 20px;'><br />========下一個接口信息=========<br /></div>") with open("detail_log","r",encoding="utf-8") as f: return f.read() if __name__ == "__main__": # trace_id = "14447ae7199c40a2b9862411daba180b.2142.16388920322367785" # trace_id_query(trace_id) app.run()
關于python是怎么實現skywalking的trace模塊過濾和報警問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。