中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python 關于日志的分析

發布時間:2020-09-29 18:57:12 來源:網絡 閱讀:1143 作者:ghbSunny 欄目:編程語言

項目情況介紹:
基于Python 3.6.6 ,實現對nginx訪問的日志分析代碼,實現了對日志中code的占比統計和瀏覽器類型和訪問情況統計
實現的代碼段有:
1.編寫窗戶函數,實現在一定的時間內對數據進行分析
2.通過正則表達式對日志進行匹配,加載日志文件,提取出文本里每行的日志信息
3.編寫消費端代碼,即使得提取到的數據能夠按照消費端的代碼進行處理
4.消息分發代碼實現,通過queue,將提取的的文本放到隊列里,供消費端代碼處理
項目代碼如下

import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path
from user_agents import parse
"""
這段代碼,實現了再一段時間內獲得數據,通過不同的handler(即消費端函數)
對獲取到的同一份數據進行處理,主要是兩段消費函數,網頁返回的code的統計和瀏覽器的分析
這段代碼,窗口函數中,data = src.get(),使得沒有新的數據產生時,該代碼會阻塞,直到有新的數據生成,再次進行處理
"""
pattern = '''(?P<remote>[\d.]{7,}\s-\s-\s\[(?P<datetime>[^\[\]]+)\])\s\
"(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''

#編譯
regex = re.compile(pattern)

#構造字典
ops = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int,
    'useragent': lambda ua: parse(ua)
}

#提取信息
def extract(line: str) -> dict:
    matcher = regex.match(line)
    if matcher:
        return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()}

# 打開文件
def openfile(path: str):
    """裝載日志文件"""
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                continue

#裝載文件,判斷文件類型已經是否存在
def load(*paths):
    for item in paths:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():
                    yield from openfile(str(file))
        elif p.is_file():
            yield from openfile(str(p))

# 隨機生成100以內的數字
def source(second=1):
    """生成數據"""
    while True:
        yield {
            'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
            'value': random.randint(1, 100)
        }
        time.sleep(second)

# 滑動窗口函數
def window(src: Queue, handler, width: int, interval: int):
    '''
    窗口函數,表示間隔一段時間取出一定的數據進行處理
    :param src:數據源,這里是緩存隊列,用于獲取數據
    :param handler:數據處理的函數
    :param width:時間窗口函數,秒
    :param interval:處理時間間隔,秒
    '''
    start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
    current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')
    buffer = []
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        # 從數據源獲取數據
        data = src.get()  # 這個代碼會阻塞,等待數據輸入,沒有數據輸入就阻塞
        if data:
            buffer.append(data)
            current = data['datetime']  # 存入臨時緩沖等待計算

        # 每隔interval重新計算buffer中的一次數據
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            start = current
            # 清除超出width的數據
            buffer = [x for x in buffer if x['datetime'] > current - delta]

# 隨機數平均的測算函數
source()
def handler(iterable):
    #return sum(map(lambda x: x['value'], iterable)) / len(iterable)
    print(sum(map(lambda x:x['value'],iterable))/len(iterable))

# 測試函數
def donothing_handler(iterable):
    #return iterable
    print(iterable)

# 狀態碼占比
def status_handler(iterable):
    # 時間窗口內的一批數據
    status = {}
    for item in iterable:
        key = item['status']
        status[key] = status.get(key, 0) + 1
    total = len(iterable)
    print({k:float( "{:.2f}".format(status[k] / total)) for k, v in status.items()})
    return {k: status[k] / total for k, v in status.items()}

# 瀏覽器分析
allbrowsers = {}

def browser_handler(iterable):
    browsers = {}
    for item in iterable:
        ua = item['useragent']
        key = (ua.browser.family, ua.browser.version_string)
        browsers[key] = browsers.get(key, 0) + 1
        allbrowsers[key] = allbrowsers.get(key, 0) + 1

    print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10])
    return browsers

# 分發器
def dispatcher(src):
    # 分發器中記錄handler,同時保存各自的隊列
    handlers = []
    queues = []

    def reg(handler, width: int, interval: int):
        """
        注冊窗口處理函數
        :param handler:注冊數據處理函數
        :param width:時間窗口寬度
        :param interval:時間間隔
        """
        q = Queue()
        queues.append(q)
        # 多線程,數據并行
        h = threading.Thread(target=window, args=(q, handler, width, interval))
        handlers.append(h)

    def run():
        # 啟動線程處理數據
        for t in handlers:
            t.start()

        # 將獲取到的數據分發到所有的隊列中
        for item in src:
            for q in queues:
                q.put(item)
                # print(q.get())

    return reg, run

if __name__ == "__main__":
    import sys

    path = '/tmp/test.log'
    """
    以下的代碼為測試用的,用于統計每隔5s統計10s內的隨機數字的平均值
    reg, run = dispatcher(source())
    reg(handler, 10, 5)
    """

    reg, run = dispatcher(load(path))

    #每隔5s返回過去10s的數據,但是不做處理
    reg(donothing_handler, 10, 5)
    #每隔5s統計10s內的返回狀態碼的占比情況
    reg(status_handler, 10, 5)
    # 每隔5s統計10s內的瀏覽器類型占比情況,展示排行10s內訪問量前十的瀏覽器
    reg(browser_handler,10,5)
    run()
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

紫云| 淮安市| 平和县| 平舆县| 新安县| 旅游| 砚山县| 会理县| 广德县| 庆元县| 锦屏县| 浦东新区| 镇坪县| 乐陵市| 甘德县| 静乐县| 灌阳县| 浪卡子县| 通榆县| 贺州市| 民和| 将乐县| 隆昌县| 黔南| 永仁县| 鄂尔多斯市| 星子县| 德清县| 江源县| 历史| 亚东县| 伊川县| 偃师市| 永济市| 甘德县| 永福县| 池州市| 罗城| 桓台县| 靖远县| 尤溪县|