中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python大數據量文本文件問題怎么解決

發布時間:2022-12-28 15:57:04 來源:億速云 閱讀:87 作者:iii 欄目:開發技術

這篇文章主要介紹“Python大數據量文本文件問題怎么解決”,在日常操作中,相信很多人在Python大數據量文本文件問題怎么解決問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Python大數據量文本文件問題怎么解決”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

    測試環境

    Python 3.6.2

    Win 10 內存 8G,CPU I5 1.6 GHz

    背景描述

    這個作品來源于一個日志解析工具的開發,這個開發過程中遇到的一個痛點,就是日志文件多,日志數據量大,解析耗時長。在這種情況下,尋思一種高效解析數據解析方案。

    解決方案描述

    1、采用多線程讀取文件

    2、采用按塊讀取文件替代按行讀取文件

    由于日志文件都是文本文件,需要讀取其中每一行進行解析,所以一開始會很自然想到采用按行讀取,后面發現合理配置下,按塊讀取,會比按行讀取更高效。

    按塊讀取來的問題就是,可能導致完整的數據行分散在不同數據塊中,那怎么解決這個問題呢?解答如下:

    將數據塊按換行符\n切分得到日志行列表,列表第一個元素可能是一個完整的日志行,也可能是上一個數據塊末尾日志行的組成部分,列表最后一個元素可能是不完整的日志行(即下一個數據塊開頭日志行的組成部分),也可能是空字符串(日志塊中的日志行數據全部是完整的),根據這個規律,得出以下公式,通過該公式,可以得到一個新的數據塊,對該數據塊二次切分,可以得到數據完整的日志行

    上一個日志塊首部日志行 +\n + 尾部日志行 + 下一個數據塊首部日志行 + \n + 尾部日志行 + ...

    3、將數據解析操作拆分為可并行解析部分和不可并行解析部分

    數據解析往往涉及一些不可并行的操作,比如數據求和,最值統計等,如果不進行拆分,并行解析時勢必需要添加互斥鎖,避免數據覆蓋,這樣就會大大降低執行的效率,特別是不可并行操作占比較大的情況下。

    對數據解析操作進行拆分后,可并行解析操作部分不用加鎖。考慮到Python GIL的問題,不可并行解析部分替換為單進程解析。

    4、采用多進程解析替代多線程解析

    采用多進程解析替代多線程解析,可以避開Python GIL全局解釋鎖帶來的執行效率問題,從而提高解析效率。

    5、采用隊列實現“協同”效果

    引入隊列機制,實現一邊讀取日志,一邊進行數據解析:

    • 日志讀取線程將日志塊存儲到隊列,解析進程從隊列獲取已讀取日志塊,執行可并行解析操作

    • 并行解析操作進程將解析后的結果存儲到另一個隊列,另一個解析進程從隊列獲取數據,執行不可并行解析操作。

    代碼實現

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import re
    import time
    from datetime import datetime
    from joblib import Parallel, delayed, parallel_backend
    from collections import deque
    from multiprocessing import cpu_count
    import threading
    
    
    class LogParser(object):
        def __init__(self, chunk_size=1024*1024*10, process_num_for_log_parsing=cpu_count()):
            self.log_unparsed_queue = deque() # 用于存儲未解析日志
            self.log_line_parsed_queue = deque()  # 用于存儲已解析日志行
            self.is_all_files_read = False  # 標識是否已讀取所有日志文件
            self.process_num_for_log_parsing = process_num_for_log_parsing # 并發解析日志文件進程數
            self.chunk_size = chunk_size # 每次讀取日志的日志塊大小
            self.files_read_list = [] # 存放已讀取日志文件
            self.log_parsing_finished = False # 標識是否完成日志解析
    
    
        def read_in_chunks(self, filePath, chunk_size=1024*1024):
            """
            惰性函數(生成器),用于逐塊讀取文件。
            默認區塊大小:1M
            """
    
            with open(filePath, 'r', encoding='utf-8') as f:            
                while True:
                    chunk_data = f.read(chunk_size)
                    if not chunk_data:
                        break
                    yield chunk_data
    
    
        def read_log_file(self, logfile_path):
            '''
            讀取日志文件
            這里假設日志文件都是文本文件,按塊讀取后,可按換行符進行二次切分,以便獲取行日志
            '''
    
            temp_list = []  # 二次切分后,頭,尾行日志可能是不完整的,所以需要將日志塊頭尾行日志相連接,進行拼接
            for chunk in self.read_in_chunks(logfile_path, self.chunk_size):
                log_chunk = chunk.split('\n')
                temp_list.extend([log_chunk[0], '\n'])
                temp_list.append(log_chunk[-1])
                self.log_unparsed_queue.append(log_chunk[1:-1])
            self.log_unparsed_queue.append(''.join(temp_list).split('\n'))
            self.files_read_list.remove(logfile_path)
    
    
        def start_processes_for_log_parsing(self):
            '''啟動日志解析進程'''
    
            with parallel_backend("multiprocessing", n_jobs=self.process_num_for_log_parsing):
                Parallel(require='sharedmem')(delayed(self.parse_logs)() for i in range(self.process_num_for_log_parsing))
    
            self.log_parsing_finished = True
    
        def parse_logs(self):
            '''解析日志'''
    
            method_url_re_pattern = re.compile('(HEAD|POST|GET)\s+([^\s]+?)\s+',re.DOTALL)
            url_time_taken_extractor = re.compile('HTTP/1\.1.+\|(.+)\|\d+\|', re.DOTALL)
    
            while self.log_unparsed_queue or self.files_read_list:
                if not self.log_unparsed_queue:
                    continue
                log_line_list = self.log_unparsed_queue.popleft()
                for log_line in log_line_list:
                    #### do something with log_line
                    if not log_line.strip():
                        continue
    
                    res = method_url_re_pattern.findall(log_line)
                    if not res:
                        print('日志未匹配到請求URL,已忽略:\n%s' % log_line)
                        continue
                    method = res[0][0]
                    url = res[0][1].split('?')[0]  # 去掉了 ?及后面的url參數
    
                    # 提取耗時
                    res = url_time_taken_extractor.findall(log_line)
                    if res:
                        time_taken = float(res[0])
                    else:
                        print('未從日志提取到請求耗時,已忽略日志:\n%s' % log_line)
                        continue
    
                    # 存儲解析后的日志信息
                    self.log_line_parsed_queue.append({'method': method,
                                                       'url': url,
                                                       'time_taken': time_taken,
                                                       })
    
    
        def collect_statistics(self):
            '''收集統計數據'''
    
            def _collect_statistics():
                while self.log_line_parsed_queue or not self.log_parsing_finished:
                    if not self.log_line_parsed_queue:
                        continue
                    log_info = self.log_line_parsed_queue.popleft()
                    # do something with log_info
           
            with parallel_backend("multiprocessing", n_jobs=1):
                Parallel()(delayed(_collect_statistics)() for i in range(1))
    
        def run(self, file_path_list):
            # 多線程讀取日志文件
            for file_path in file_path_list:
                thread = threading.Thread(target=self.read_log_file,
                                          name="read_log_file",
                                          args=(file_path,))
                thread.start()
                self.files_read_list.append(file_path)
    
            # 啟動日志解析進程
            thread = threading.Thread(target=self.start_processes_for_log_parsing, name="start_processes_for_log_parsing")
            thread.start()
    
            # 啟動日志統計數據收集進程
            thread = threading.Thread(target=self.collect_statistics, name="collect_statistics")
            thread.start()
    
            start = datetime.now()
            while threading.active_count() > 1:
                print('程序正在努力解析日志...')
                time.sleep(0.5)
    
            end = datetime.now()
            print('解析完成', 'start', start, 'end', end, '耗時', end - start)
    
    if __name__ == "__main__":
        log_parser = LogParser()
        log_parser.run(['access.log', 'access2.log'])

    注意:

    需要合理的配置單次讀取文件數據塊的大小,不能過大,或者過小,否則都可能會導致數據讀取速度變慢。筆者實踐環境下,發現10M~15M每次是一個比較高效的配置。

    到此,關于“Python大數據量文本文件問題怎么解決”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    响水县| 黄石市| 洪江市| 江都市| 五大连池市| 始兴县| 黑河市| 渭南市| 利川市| 山丹县| 肇东市| 盐津县| 靖安县| 东莞市| 青海省| 天津市| 台安县| 通化县| 高青县| 即墨市| 德州市| 茶陵县| 沙坪坝区| 共和县| 彝良县| 都江堰市| 南部县| 社旗县| 郎溪县| 靖州| 洱源县| 德江县| 定陶县| 溧阳市| 二连浩特市| 运城市| 乐山市| 满城县| 金川县| 沙田区| 古田县|