中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

使用pandas怎么對大文件進行計數處理

發布時間:2021-05-19 17:29:24 來源:億速云 閱讀:198 作者:Leah 欄目:開發技術

這期內容當中小編將會給大家帶來有關使用pandas怎么對大文件進行計數處理,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

Pandas讀取大文件

要處理的是由探測器讀出的脈沖信號,一組數據為兩列,一列為時間,一列為脈沖能量,數據量在千萬級,為了有一個直接的認識,先使用Pandas讀取一些

import pandas as pd
data = pd.read_table('filename.txt', iterator=True)
chunk = data.get_chunk(5)

而輸出是這樣的:

Out[4]: 
332.977889999979 -0.0164794921875 
0 332.97790 -0.022278 
1 332.97791 -0.026855 
2 332.97792 -0.030518 
3 332.97793 -0.045776 
4 332.97794 -0.032654

DataFram基本用法

這里,data只是個容器,pandas.io.parsers.TextFileReader。

使用astype可以實現dataframe字段類型轉換

輸出數據中,每組數據會多處一行,因為get_chunk返回的是pandas.core.frame.DataFrame格式, 而data在讀取過程中并沒有指定DataFrame的columns,因此在get_chunk過程中,默認將第一組數據作為columns。因此需要在讀取過程中指定names即DataFrame的columns。

import pandas as pd
data = pd.read_table('filename.txt', iterator=True, names=['time', 'energe'])
chunk = data.get_chunk(5) 
data['energe'] = df['energe'].astype('int')

輸出為

Out[6]:

indextimeenerge
0332.97789-0.016479
1332.97790-0.022278
2332.97791-0.026855
3332.97792-0.030518
4332.97793-0.045776

DataFram存儲和索引

這里講一下DataFrame這個格式,與一般二維數據不同(二維列表等),DataFrame既有行索引又有列索引,因此在建立一個DataFrame數據是

DataFrame(data, columns=[‘year', ‘month', ‘day'], 
index=[‘one', ‘two', ‘three'])

yearmonthday
0201041
1201152
2201263
3201375
4201489

而pd.read_table中的names就是指定DataFrame的columns,而index自動設置。 而DataFrame的索引格式有很多




類型說明例子
obj[val]選取單列或者一組列
obj.ix[val]選取單個行或者一組行
obj.ix[:,val]選取單個列或列子集
obj.ix[val1, val2]同時選取行和列
reindex方法將一個或多個軸匹配到新索引
xs方法根據標簽選取單行或單列,返回一個Series
icol,lrow方法根據整數位置選取單列或單行,返回一個Series
get_value,set_value根據行標簽列標簽選取單個值

exp: In[1]:data[:2]

Out[2]:


yearmonthday
0201041
1201152

In[2]:data[data[‘month']>5]

Out[2]:


yearmonthday
2201263
4201489

如果我們直接把data拿來比較的話,相當于data中所有的標量元素

In[3]:data[data<6]=0

Out[3]:


yearmonthday
0201000
1201100
2201260
3201370
4201489

Pandas運算

series = data.ix[0]
data - series

Out:


yearmonthday
0000
1111
2222
3334
4448

DataFrame與Series之間運算會將Series索引匹配到DataFrame的列,然后沿行一直向下廣播

如果令series1 = data[‘year']

data.sub(series1,axis=0)

則每一列都減去該series1,axis為希望匹配的軸,=0行索引,即匹配列,=1列索引,則按行匹配。

DataFrame的一些函數方法

這個就有很多了,比如排序和排名;求和、平均數以及方差、協方差等數學方法;還有就是唯一值(類似于集合)、值計數和成員資格等方法。

當然還有一些更高級的屬性,用的時候再看吧

數據處理

在得到數據樣式后我們先一次性讀取數據

start = time.time()
data = pd.read_table('Eu155_Na22_K40_MR_0CM_3Min.csv', names=['time', 'energe'])
end = time.time()
data.index
print("The time is %f s" % (end - start))
plus = data['energe']
plus[plus < 0] = 0
The time is 29.403917 s 
RangeIndex(start=0, stop=68319232, step=1)

對于一個2G大小,千萬級的數據,這個讀取速度還是挺快的。之前使用matlab load用時160多s,但是不知道這個是否把數據完全讀取了。然后只抽取脈沖信號,將負值歸0,因為會出現一定的電子噪聲從而產生一定負值。

然后就需要定位脈沖信號中的能峰了,也就是findpeaks

這里用到了scipy.signal中的find_peaks_cwt,具體用法可以參見官方文檔

peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10)),它返回找到的peaks的位置,輸入第一個為數據,第二個為窗函數,也就是在這個寬度的能窗內尋找峰,我是這樣理解的。剛開始以為是數據的另一維坐標,結果找了半天沒結果。不過事實上這個找的確定也挺慢的。

50w條的數據,找了足足7分鐘,我這一個數據3000w條不得找半個多小時,而各種數據有好幾十,恩。。這樣是不行的,于是想到了并行的方法。這個下篇文章會講到,也就是把數據按照chunksize讀取,然后同時交給(map)幾個進程同時尋峰,尋完后返回(reduce)一起計數,計數的同時,子進程再此尋峰。

在處理的時候碰到我自己的破 筆記本由于內存原因不能load這個數據,并且想著每次copy這么大數據好麻煩,就把一個整體數據文件分割成了幾個部分,先對方法進行一定的實驗,時間快,比較方便。

import pandas as pd


def split_file(filename, size):
 name = filename.split('.')[0]
 data = pd.read_table(filename, chunksize=size, names=['time', 'intension'])
 i = 1
 for piece in data:
 outname = name + str(i) + '.csv'
 piece.to_csv(outname, index=False, names = ['time', 'intension'])
 i += 1

def split_csvfile(filename, size):
 name = filename.split('.')[0]
 data = pd.read_csv(filename, chunksize=size, names=['time', 'intension'])
 i = 1
 for piece in data:
 outname = name + str(i) + '.csv'
 piece = piece['intension']
 piece.to_csv(outname, index=False)
 i += 1

額..使用并行尋峰通過map/reduce的思想來解決提升效率這個想法,很早就實現了,但是,由于效果不是特別理想,所以放那也就忘了,今天整理代碼來看了下當時記的些筆記,然后竟然發現有個評論…..我唯一收到的評論竟然是“催稿”=。=。想一想還是把下面的工作記錄下來,免得自己后來完全忘記了。

rom scipy import signal
import os
import time
import pandas as pd
import numpy as np
from multiprocessing import Pool
import matplotlib.pylab as plt
from functools import partial


def findpeak(pluse):
 pluse[pluse < 0.05] = 0
 print('Sub process %s.' % os.getpid())
 start = time.time()
 peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10)) # 返回一個列表
 end = time.time()
 print("The time is %f s" % (end - start))
 pks = [pluse[x] for x in peaks]
 return pks


def histcnt(pks, edge=None, channel=None):
 cnt = plt.hist(pks, edge)
 res = pd.DataFrame(cnt[0], index=channel, columns=['cnt'])
 return res


if __name__ == '__main__':
 with Pool(processes=8) as p:
 start = time.time()
 print('Parent process %s.' % os.getpid())
 pluse = pd.read_csv('data/samples.csv', chunksize=50000, names=['time', 'energe'])
 channel = pd.read_csv('data/channels.txt', names=['value'])
 edges = channel * 2
 edges = pd.DataFrame({'value': [0]}).append(edges, ignore_index=True)
 specal = []
 for data in pluse:
 total = p.apply_async(findpeak, (data['energe'],),
   callback=partial(histcnt, edge=edges['value'], channel=channel['value']))
 specal.append(total)
 print('Waiting for all subprocesses done...')
 p.close()
 p.join()
 print('All subprocesses done.')
 spec = sum(specal)
 plt.figure()
 plt.plot(spec['cnt'])
 spec.to_csv('data/spec1.csv', header=False)
 print('every is OK')
 end = time.time()
 print("The time is %f s" % (end - start))

由于對對進程線程的編程不是很了解,其中走了很多彎路,嘗試了很多方法也,這個是最終效果相對較好的。

首先,通過 pd.readtable以chunksize=50000分塊讀取,edges為hist過程中的下統計box。

然后,apply_async為非阻塞調用findpeak,然后將結果返回給回調函數histcnt,但是由于回調函數除了進程返回結果還有額外的參數,因此使用partial,對特定的參數賦予固定的值(edge和channel)并返回了一個全新的可調用對象,這個新的可調用對象仍然需要通過制定那些未被賦值的參數(findpeak返回的值)來調用。這個新的課調用對象將傳遞給partial()的固定參數結合起來,同一將所有參數傳遞給原始函數(histcnt)。(至于為啥不在histcnt中確定那兩個參數,主要是為了避免一直打開文件。。當然,有更好的辦法只是懶得思考=。=),還有個原因就是,apply_async返回的是一個對象,需要通過該對象的get方法才能獲取值。。

對于 apply_async官方上是這樣解釋的

Apply_async((func[, args[, kwds[, callback[, error_callback]]]])),apply()方法的一個變體,返回一個結果對象

如果指定回調,那么它應該是一個可調用的接受一個參數。結果準備好回調時,除非調用失敗,在這種情況下,應用error_callback代替。

如果error_callback被指定,那么它應該是一個可調用的接受一個參數。如果目標函數失敗,那么error_callback叫做除了實例。

回調應立即完成以來,否則線程處理結果將被封鎖。

不使用回調函數的版本如下,即先將所有子進程得到的數據都存入peaks列表中,然后所有進程完畢后在進行統計計數。

import pandas as pd
import time
import scipy.signal as signal
import numpy as np
from multiprocessing import Pool
import os
import matplotlib.pyplot as plt


def findpeak(pluse):
 pluse[pluse < 0] = 0
 pluse[pluse > 100] = 0
 print('Sub process %s.' % os.getpid())
 start = time.time()
 peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10))
 end = time.time()
 print("The time is %f s" % (end - start))
 res = [pluse[x] for x in peaks]
 return res


if __name__ == '__main__':
 with Pool(processes=8) as p:
 start = time.time()
 print('Parent process %s.' % os.getpid())
 pluse = pd.read_csv('data/sample.csv', chunksize=200000, names=['time', 'energe'])
 pks = []
 for data in pluse:
 pks.append(p.apply_async(findpeak, (data['energe'],)))
 print('Waiting for all subprocesses done...')
 p.close()
 p.join()
 print('All subprocesses done.')
 peaks = []
 for i, ele in enumerate(pks):
 peaks.extend(ele.get())
 peaks = pd.DataFrame(peaks, columns=['energe'])
 peaks.to_csv('peaks.csv', index=False, header=False, chunksize=50000)
 channel = pd.read_csv('data/channels.txt', names=['value'])
 channel *= 2
 channel = pd.DataFrame({'value': [0]}).append(channel, ignore_index=True)
 plt.figure()
 spec = plt.hist(peaks['energe'], channel['value'])
 # out.plot.hist(bins=1024)
 # print(out)
 # cnt = peaks.value_counts(bins=1024)
 # cnt.to_csv('data/cnt.csv', index=False, header=False)
 print('every is OK')
 end = time.time()
 print("The time is %f s" % (end - start))

上述就是小編為大家分享的使用pandas怎么對大文件進行計數處理了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

建宁县| 沙坪坝区| 高邮市| 辽阳市| 包头市| 海阳市| 新乐市| 雷波县| 五台县| 德庆县| 康保县| 蕲春县| 阿克陶县| 石城县| 墨竹工卡县| 通辽市| 蒙自县| 长乐市| 上思县| 阿合奇县| 虹口区| 衢州市| 平乐县| 安丘市| 南部县| 西华县| 清丰县| 徐州市| 城步| 龙川县| 万载县| 垣曲县| 三都| 炎陵县| 积石山| 依安县| 衡东县| 新竹县| 禹城市| 阆中市| 米脂县|