中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么使用Python實現tail

發布時間:2023-03-01 16:09:13 來源:億速云 閱讀:99 作者:iii 欄目:開發技術

本篇內容介紹了“怎么使用Python實現tail”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

1.第一版--從文件尾部讀取實時數據

主要思路是: 打開文件, 把指針移動到文件最后, 然后有數據則輸出數據, 無數據則休眠一段時間.

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            f.seek(0, 2)  # 從文件結尾處開始seek
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都會每次都打印新的一行
                else:
                    time.sleep(self.interval)


if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()

之后只要做如下調用即可:

python xxx.py filename 

2.第二版--實現tail -f

tail -f默認先讀取最后10行數據,再從文件尾部讀取實時數據.如果對于小文件,可以先讀取所有文件內容,并輸出最后10行, 但是讀取全文再獲取最后10行的性能不高, 而從后滾10行的邊界條件也很復雜, 先看先讀取全文再獲取最后10行的實現:

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            self.read_last_line(f)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都會每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, f):
        last_lines = f.readlines()[-10:]
        for line in last_lines:
            self.output(line)

if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()

可以看到實現很簡單, 相比第一版只多了個read_last_line的函數, 接下來就要解決性能的問題了, 當文件很大的時候, 這個邏輯是不行的, 特別是有些日志文件經常有幾個G大, 如果全讀出來內存就爆了. 而在Linux系統中, 沒有一個接口可以指定指針跳到倒數10行, 只能使用如下方法來模擬輸出倒數10行:

  • 首先游標跳轉到最新的字符, 保存當前游標, 然后預估一行數據的字符長度, 最好偏多, 這里我按1024字符長度為一行來處理

  • 然后利用seek的方法,跳轉到seek(-1024 * 10, 2)的字符, 這就是我們預估的倒數10行內的內容

  • 接著對內容進行判斷, 如果跳轉的字符長度小于 10 * 1024, 則證明整個文件沒有10行, 則采用原來的read_last_line方法.

  • 如果跳轉到字符長度等于1024 * 10, 則利用換行符計算已取字符長度共有多少行,如果行數大于10,那只輸出最后10行,如果只讀了4行,則繼續讀6*1024,直到讀滿10行為止

通過以上步奏, 就把倒數10行的數據計算好了可以打印出來, 可以進入追加數據了, 但是這時候文件內容可能發生改變了, 我們的游標也發生改變了, 這時候要把游標跳回到剛才保存的游標,防止漏打或者重復打印數據.

分析完畢后, 就可以開始重構read_last_line函數了.

import time
import sys

from typing import Callable, List, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

    def __call__(self, n: int = 10):
        with open(self.file_name) as f:
            self.read_last_line(f, n)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都會每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 跳轉游標到最后
        file.seek(0, 2)
        # 獲取當前結尾的游標位置
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳轉的字符長度大于原來文件長度,那就把所有文件內容打印出來
                file.seek(0) # 由于read方法是按照游標進行打印, 所以要重置游標
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新獲取游標位置
                now_tell: int = file.tell()
                break
            # 跳轉到我們預估的字符位置
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果獲取的行數大于要求的行數,則獲取前n行的行數
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果獲取的行數小于要求的行數,則預估需要獲取的行數,繼續獲取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游標,確保接下來打印的數據不重復
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

3.第三版--優雅的讀取輸出日志文件

可以發現實時讀取那塊的邏輯性能還是很差, 如果每秒讀一次文件,實時性就太慢了,把間隔改小了,則處理器占用太多. 性能最好的情況是如果能得知文件更新再進行打印文件, 那性能就能得到保障了.慶幸的是,在Linux中inotify提供了這樣的功能. 此外,日志文件有一個特點就是會進行logrotate,如果日志被logrotate了,那我們就需要重新打開文件,并進一步讀取數據, 這種情況也可以利用到inotify, 當inotify獲取到文件重新打開的事件時,我們就重新打開文件,再讀取.

import os
import sys

from typing import Callable, List, NoReturn

import pyinotify

multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF  # 監控多個事件


class InotifyEventHandler(pyinotify.ProcessEvent):  # 定制化事件處理類,注意繼承
    """
    執行inotify event的封裝
    """
    f: 'open()'
    filename: str
    path: str
    wm: 'pyinotify.WatchManager'
    output: Callable

    def my_init(self, **kargs):
        """pyinotify.ProcessEvent要求不能直接繼承__init__, 而是要重寫my_init, 我們重寫這一段并進行初始化"""

        # 獲取文件
        filename: str = kargs.pop('filename')
        if not os.path.exists(filename):
            raise RuntimeError('Not Found filename')
        if '/' not in filename:
            filename = os.getcwd() + '/' + filename
        index = filename.rfind('/')
        if index == len(filename) - 1 or index == -1:
            raise RuntimeError('Not a legal path')

        self.f = None
        self.filename = filename
        self.output: Callable = kargs.pop('output')
        self.wm = kargs.pop('wm')
        # 只監控路徑,這樣就能知道文件是否移動
        self.path = filename[:index]
        self.wm.add_watch(self.path, multi_event)

    def read_line(self):
        """統一的輸出方法"""
        for line in self.f.readlines():
            self.output(line)

    def process_IN_MODIFY(self, event):
        """必須為process_事件名稱,event表示事件對象, 這里表示監控到文件發生變化, 進行文件讀取"""
        if event.pathname == self.filename:
            self.read_line()

    def process_IN_MOVE_SELF(self, event):
        """必須為process_事件名稱,event表示事件對象, 這里表示監控到文件發生重新打開, 進行文件讀取"""
        if event.pathname == self.filename:
            # 檢測到文件被移動重新打開文件
            self.f.close()
            self.f = open(self.filename)
            self.read_line()

    def __enter__(self) -> 'InotifyEventHandler':
        self.f = open(self.filename)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.f.close()


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

        wm = pyinotify.WatchManager()  # 創建WatchManager對象
        inotify_event_handler = InotifyEventHandler(
            **dict(filename=file_name, wm=wm, output=output)
        )  # 實例化我們定制化后的事件處理類, 采用**dict傳參數
        wm.add_watch('/tmp', multi_event)  # 添加監控的目錄,及事件
        self.notifier = pyinotify.Notifier(wm, inotify_event_handler)  # 在notifier實例化時傳入,notifier會自動執行
        self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler

    def __call__(self, n: int = 10):
        """通過inotify的with管理打開文件"""
        with self.inotify_event_handle as i:
            # 先讀取指定的行數
            self.read_last_line(i.f, n)
            # 啟用inotify的監聽
            self.notifier.loop()

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 獲取當前結尾的游標位置
        file.seek(0, 2)
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳轉的字符長度大于原來文件長度,那就把所有文件內容打印出來
                file.seek(0)
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新獲取游標位置
                now_tell: int = file.tell()
                break
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果獲取的行數大于要求的行數,則獲取前n行的行數
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果獲取的行數小于要求的行數,則預估需要獲取的行數,繼續獲取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游標,確保接下來打印的數據不重復
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

可以看到, 從原本的open打開文件改為用inotify打開文件(這時候會調用my_init方法進行初始化), 打開后還是運行我們打開原來n行的代碼, 然后就交給inotify運行. 在inotify運行之前, 我們把重新打開文件方法和打印文件方法都掛載在inotifiy對應的事件里, 之后inotify運行時, 會根據對應的事件執行對應的方法。

“怎么使用Python實現tail”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

泌阳县| 宁海县| 娄烦县| 夹江县| 沾益县| 武隆县| 锡林郭勒盟| 乐清市| 连州市| 招远市| 新兴县| 张家口市| 长治市| 信宜市| 手机| 淮安市| 内黄县| 鲁甸县| 页游| 定陶县| 宁津县| 新泰市| 沅陵县| 枝江市| 舟曲县| 赤城县| 淮南市| 四平市| 永济市| 乾安县| 洱源县| 巩义市| 兰溪市| 延吉市| 天等县| 临湘市| 扬中市| 琼结县| 莱芜市| 亚东县| 鄄城县|