中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python如何處理復雜CSV文件

發布時間:2022-07-20 09:37:55 來源:億速云 閱讀:149 作者:iii 欄目:開發技術

這篇“Python如何處理復雜CSV文件”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Python如何處理復雜CSV文件”文章吧。

    項目簡介

    鑒于項目保密的需要,不便透露太多項目的信息,因此,簡單介紹一下項目存在的難點:

    • 海量數據:項目是對CSV文件中的數據進行處理,而特點是數據量大...真的大!!!拿到的第一個CSV示例文件是110多萬行(小CASE),而第二個文件就到了4500萬行,等到第三個文件......好吧,一直沒見到第三個完整示例文件,因為太大了,據說是第二個示例文件的40多倍,大概二十億行......

    • 業務邏輯復雜:項目是需要對CSV文件的每一行數據的各種組合可能性進行判斷,而判斷的業務邏輯較為復雜,如何在解決復雜邏輯的同時保證較高的處理效率是難點之一。

    項目筆記與心得

    1.分批處理與多進程及多線程加速

    • 因為數據量太大,肯定是要分批對數據進行處理,否則,效率低不談,大概率也沒有足夠的內存能夠支撐,需要用到chunksize,此外,為了節約內存,以及提高處理效率,可以將文本類的數據存儲為“category”格式:

    • 項目整體是計算密集型的任務,因此,需要用到多進程,充分利用CPU的多核性能;

    • 多線程進行讀取與寫入,其中,寫入使用to_csv的增量寫入方法,mode參數設置為'a';

    • 多進程與多線程開啟一般為死循環,需要在合適的位置,放入結束循環的信號,以便處理完畢后退出多進程或多線程

    """鑒于項目保密需要,以下代碼僅為示例"""
    import time
    import pathlib as pl
    import pandas as pd
    from threading import Thread
    from multiprocessing import Queue, Process, cpu_count
    # 導入多線程Thread,多進程的隊列Queue,多進程Process,CPU核數cpu_count
    # 存放分段讀取的數據隊列,注:maxsize控制隊列的最大數量,避免一次性讀取到內存中的數據量太大
    data_queue = Queue(maxsize=cpu_count() * 2)  
    # 存放等待寫入磁盤的數據隊列
    write_queue = Queue()  
    def read_data(path: pl.Path, data_queue: Queue, size: int = 10000):
        """
        讀取數據放入隊列的方法
        :return:
        """
        data_obj = pd.read_csv(path, sep=',', header=0, chunksize=size, dtype='category')
        for idx, df in enumerate(data_obj):
            while data_queue.full():  # 如果隊列滿了,那就等待
                time.sleep(1)
            data_queue.put((idx + 1, df))
        data_queue.put((None, None))  # 放入結束信號
    def write_data(out_path: pl.Path, write_queue: Queue):
        """
        將數據增量寫入CSV的方法
        :return:
        """
        while True:
            while write_queue.empty():
                time.sleep(1)
            idx, df = write_queue.get()
            if df is None:
                return  # 結束退出
            df.to_csv(out_path, mode='a', header=None, index=False, encoding='ansi')  # 輸出CSV
    def parse_data(data_queue: Queue, write_queue: Queue):
        """
        從隊列中取出數據,并加工的方法
        :return:
        """
        while True:
            while write_queue.empty():
                time.sleep(1)
            idx, df = data_queue.get()
            if df is None:  # 如果是空的結束信號,則結束退出進程,
            # 特別注意結束前把結束信號放回隊列,以便其他進程也能接收到結束信號!!!
                data_queue.put((idx, df))
                return
            """處理數據的業務邏輯略過"""
            write_queue.put((idx, df))  # 將處理后的數據放入寫隊列
    # 創建一個讀取數據的線程
    read_pool = Thread(target=read_data, args=(read_data_queue, *args))
    read_pool.start()  # 開啟讀取線程
    # 創建一個增量寫入CSV數據的線程
    write_pool = Thread(target=write_data, args=(write_data_queue, *args))
    write_pool.start()  # 開啟寫進程
    pools = []  # 存放解析進程的隊列
    for i in range(cpu_count()):  # 循環開啟多進程,不確定開多少個進程合適的情況下,那么按CPU的核數開比較合理
        pool = Process(target=parse_data, args=(read_data_queue, write_data_queue, *args))
        pool.start()  # 啟動進程
        pools.append(pool)  # 加入隊列
    for pool in pools:
        pool.join()  # 等待所有解析進程完成
    # 所有解析進程完成后,在寫隊列放入結束寫線程的信號
    write_data_queue.put((None, None))  
    write_pool.join()  # 等待寫線程結束
    print('任務完成')

    2.優化算法提高效率

    將類對象存入dataframe列

    在嘗試了n種方案之后,最終使用了將類對象存到dataframe的列中,使用map方法,運行類方法,最后,將運行結果展開到多列中的方式。該方案本項目中取得了最佳的處理效率。

    """鑒于保密需要,以下代碼僅為示例"""
    class Obj:
        def __init__(self, ser: pd.Series):
            """
            初始化類對象
            :param ser: 傳入series
            """
            self.ser = ser  # 行數據
            self.attrs1 = []  # 屬性1
            self.attrs2 = []  # 屬性2
            self.attrs3 = []  # 屬性3
        def __repr__(self):
            """
            自定義輸出
            """
            attrs1 = '_'.join([str(a) for a in self.attrs1])
            attrs2 = '_'.join([str(a) for a in self.attrs2])
            attrs3 = '_'.join([str(a) for a in self.attrs3])
            return '_'.join([attrs1, attrs2, attrs3])
        def run(self):
            """運行業務邏輯"""
    # 創建obj列,存入類對象
    data['obj'] = data.apply(lambda x: Obj(x), axis=1)
    # 運行obj列中的類方法獲得判斷結果
    data['obj'] = data['obj'].map(lambda x: x.run())
    # 鏈式調用,1將類對象文本化->2拆分到多列->3刪除空列->4轉換為category格式
    data[['col1', 'col2', 'col3', ...省略]] = data['obj'].map(str).str.split('_', expand=True).dropna(axis=1).astype('category')
    # 刪除obj列
    data.drop(columns='obj', inplace=True)

    減少計算次數以提高運行效率

    在整個優化過程中,對運行效率產生最大優化效果的有兩項:

    • 一是改變遍歷算法,采用直接對整行數據進行綜合判斷的方法,使原需要遍歷22個組合的計算與判斷大大減少

    • 二是提前計算特征組合,制作成字典,后續直接查詢結果,而不再進行重復計算

    使用numpy加速計算

    numpy還是數據處理上的神器,使用numpy的方法,比自己實現的方法效率要高非常多,本項目中就用到了:bincount、argsort,argmax、flipud、in1d、all等,即提高了運行效率,又解決了邏輯判斷的問題:

    """numpy方法使用示例"""
    import numpy as np
    # 計算數字的個數組合bincount
    np.bincount([9, 2, 13, 12, 9, 10, 11])
    # 輸出結果:array([0, 0, 1, 0, 0, 0, 0, 0, 0, 2, 1, 1, 1, 1], dtype=int64)
    # 取得個數最多的數字argmax
    np.argmax(np.bincount([9, 2, 13, 12, 9, 10, 11]))
    # 輸出結果: 9
    # 將數字按照個數優先,其次大小進行排序argsort
    np.argsort(np.bincount([9, 2, 13, 12, 9, 10, 11]))
    # 輸出結果:array([ 0,  1,  3,  4,  5,  6,  7,  8,  2, 10, 11, 12, 13,  9], dtype=int64)
    # 翻轉列表flipud
    np.flipud(np.argsort(np.bincount([9, 2, 13, 12, 9, 10, 11])))
    # 輸出結果: array([ 9, 13, 12, 11, 10,  2,  8,  7,  6,  5,  4,  3,  1,  0], dtype=int64)
    # 查找相同值in1d
    np.in1d([2, 3, 4], [2, 9, 3])
    # 輸出結果: array([ True,  True, False]) 注:指2,3True,4False
    np.all(np.in1d([2, 3], [2, 9, 3]))
    # 輸出結果: array([ True,  True])
    # 是否全是all
    np.all(np.in1d([2, 3, 4], [2, 9, 3]))  # 判斷組合1是否包含在組合2中
    # 輸出結果: False
    np.all(np.in1d([2, 3], [2, 9, 3]))
    # 輸出結果: True

    優化前后的效率對比

    Python如何處理復雜CSV文件

    以上就是關于“Python如何處理復雜CSV文件”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    拉孜县| 扶余县| 巴青县| 永新县| 射洪县| 大埔区| 稻城县| 宜春市| 西乌珠穆沁旗| 宽城| 彝良县| 上蔡县| 苏州市| 景东| 英德市| 农安县| 威信县| 亳州市| 中宁县| 临武县| 中西区| 怀宁县| 鲁山县| 宁远县| 会泽县| 宁强县| 库尔勒市| 东乌珠穆沁旗| 永平县| 克拉玛依市| 高碑店市| 五大连池市| 盖州市| 台州市| 衡水市| 孝义市| 大厂| 涪陵区| 余干县| 霍州市| 蒲江县|