您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“Python怎么使用ClickHouse”,內容詳細,步驟清晰,細節處理妥當,希望這篇“Python怎么使用ClickHouse”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
ClickHouse是近年來備受關注的開源列式數據庫(DBMS),主要用于數據聯機分析(OLAP)領域,于2016年開源。目前國內社區火熱,各個大廠紛紛跟進大規模使用。
今日頭條,內部用ClickHouse來做用戶行為分析,內部一共幾千個ClickHouse節點,單集群最大1200節點,總數據量幾十PB,日增原始數據300TB左右。
騰訊內部用ClickHouse做游戲數據分析,并且為之建立了一整套監控運維體系。
攜程內部從2018年7月份開始接入試用,目前80%的業務都跑在ClickHouse上。每天數據增量十多億,近百萬次查詢請求。
快手內部也在使用ClickHouse,存儲總量大約10PB, 每天新增200TB, 90%查詢小于3S。
在國外,Yandex內部有數百節點用于做用戶點擊行為分析,CloudFlare、Spotify等頭部公司也在使用。
ClickHouse最初是為 YandexMetrica 世界第二大Web分析平臺 而開發的。多年來一直作為該系統的核心組件被該系統持續使用著。
首先,我們回顧一些基礎概念:
OLTP
:是傳統的關系型數據庫,主要操作增刪改查,強調事務一致性,比如銀行系統、電商系統。
OLAP
:是倉庫型數據庫,主要是讀取數據,做復雜數據分析,側重技術決策支持,提供直觀簡單的結果。
ClickHouse做為列式數據庫,列式數據庫更適合OLAP場景,OLAP場景的關鍵特征:
絕大多數是讀請求
數據以相當大的批次(> 1000行)更新,而不是單行更新;或者根本沒有更新。
已添加到數據庫的數據不能修改。
對于讀取,從數據庫中提取相當多的行,但只提取列的一小部分。
寬表,即每個表包含著大量的列
查詢相對較少(通常每臺服務器每秒查詢數百次或更少)
對于簡單查詢,允許延遲大約50毫秒
列中的數據相對較小:數字和短字符串(例如,每個URL 60個字節)
處理單個查詢時需要高吞吐量(每臺服務器每秒可達數十億行)
事務不是必須的
對數據一致性要求低
每個查詢有一個大表。除了他以外,其他的都很小。
查詢結果明顯小于源數據。換句話說,數據經過過濾或聚合,因此結果適合于單個服務器的RAM中
Clickhouse客戶端工具為dbeaver,官網為https://dbeaver.io/。
dbeaver是免費和開源(GPL)為開發人員和數據庫管理員通用數據庫工具。[百度百科]
易用性是該項目的主要目標,是經過精心設計和開發的數據庫管理工具。免費、跨平臺、基于開源框架和允許各種擴展寫作(插件)。
它支持任何具有一個JDBC驅動程序數據庫。
它可以處理任何的外部數據源。
通過操作界面菜單中“數據庫”創建配置新連接,如下圖所示,選擇并下載ClickHouse驅動(默認不帶驅動)。
DBeaver配置是基于Jdbc方式,一般默認URL和端口如下:
jdbc:clickhouse://192.168.17.61:8123
如下圖所示。
在是用DBeaver連接Clickhouse做查詢時,有時候會出現連接或查詢超時的情況,這個時候可以在連接的參數中添加設置socket_timeout參數來解決問題。
jdbc:clickhouse://{host}:{port}[/{database}]?socket_timeout=600000
環境簡要說明:
硬件資源有限,僅有16G內存,交易數據為億級。
本應用是某交易大數據,主要包括交易主表、相關客戶信息、物料信息、歷史價格、優惠及積分信息等,其中主交易表為自關聯樹狀表結構。
為了分析客戶交易行為,在有限資源的條件下,按日和交易點抽取、匯集交易明細為交易記錄,如下圖所示。
其中,在ClickHouse上,交易數據結構由60個列(字段)組成,截取部分如下所示:
針對頻繁出現“would use 10.20 GiB , maximum: 9.31 GiB”等內存不足的情況,基于ClickHouse的SQL,編寫了提取聚合數據集SQL語句,如下所示。
大約60s返回結果,如下所示:
ClickHouse沒有提供官方Python接口驅動,常用第三方驅動接口為clickhouse_driver,可以使用pip方式安裝,如下所示:
pip install clickhouse_driver Collecting clickhouse_driver Downloading https://files.pythonhosted.org/packages/88/59/c570218bfca84bd0ece896c0f9ac0bf1e11543f3c01d8409f5e4f801f992/clickhouse_driver-0.2.1-cp36-cp36m-win_amd64.whl (173kB) 100% |████████████████████████████████| 174kB 27kB/s Collecting tzlocal<3.0 (from clickhouse_driver) Downloading https://files.pythonhosted.org/packages/5d/94/d47b0fd5988e6b7059de05720a646a2930920fff247a826f61674d436ba4/tzlocal-2.1-py2.py3-none-any.whl Requirement already satisfied: pytz in d:\python\python36\lib\site-packages (from clickhouse_driver) (2020.4) Installing collected packages: tzlocal, clickhouse-driver Successfully installed clickhouse-driver-0.2.1 tzlocal-2.1
使用的client api不能用了,報錯如下:
File "clickhouse_driver\varint.pyx", line 62, in clickhouse_driver.varint.read_varint
File "clickhouse_driver\bufferedreader.pyx", line 55, in clickhouse_driver.bufferedreader.BufferedReader.read_one
File "clickhouse_driver\bufferedreader.pyx", line 240, in clickhouse_driver.bufferedreader.BufferedSocketReader.read_into_buffer
EOFError: Unexpected EOF while reading bytes
Python驅動使用ClickHouse端口9000。
ClickHouse服務器和客戶端之間的通信有兩種協議:http(端口8123)和本機(端口9000)。DBeaver驅動配置使用jdbc驅動方式,端口為8123。
ClickHouse接口返回數據類型為元組,也可以返回Pandas的DataFrame,本文代碼使用的為返回DataFrame。
collection = self.client.query_dataframe(self.query_sql)
由于我本機最初資源為8G內存(現擴到16G),以及實際可操作性,分批次取數據保存到多個文件中,每個文件大約為1G。
# -*- coding: utf-8 -*- ''' Created on 2021年3月1日 @author: xiaoyw ''' import pandas as pd import json import numpy as np import datetime from clickhouse_driver import Client #from clickhouse_driver import connect # 基于Clickhouse數據庫基礎數據對象類 class DB_Obj(object): ''' 192.168.17.61:9000 ebd_all_b04.card_tbl_trade_m_orc ''' def __init__(self, db_name): self.db_name = db_name host='192.168.17.61' #服務器地址 port ='9000' #'8123' #端口 user='***' #用戶名 password='***' #密碼 database=db_name #數據庫 send_receive_timeout = 25 #超時時間 self.client = Client(host=host, port=port, database=database) #, send_receive_timeout=send_receive_timeout) #self.conn = connect(host=host, port=port, database=database) #, send_receive_timeout=send_receive_timeout) def setPriceTable(self,df): self.pricetable = df def get_trade(self,df_trade,filename): print('Trade join price!') df_trade = pd.merge(left=df_trade,right=self.pricetable[['occurday','DIM_DATE','END_DATE','V_0','V_92','V_95','ZDE_0','ZDE_92', 'ZDE_95']],how="left",on=['occurday']) df_trade.to_csv(filename,mode='a',encoding='utf-8',index=False) def get_datas(self,query_sql): n = 0 # 累計處理卡客戶數據 k = 0 # 取每次DataFrame數據量 batch = 100000 #100000 # 分批次處理 i = 0 # 文件標題順序累加 flag=True # 數據處理解釋標志 filename = 'card_trade_all_{}.csv' while flag: self.query_sql = query_sql.format(n, n+batch) print('query started') collection = self.client.query_dataframe(self.query_sql) print('return query result') df_trade = collection #pd.DataFrame(collection) i=i+1 k = len(df_trade) if k > 0: self.get_trade(df_trade, filename.format(i)) n = n + batch if k == 0: flag=False print('Completed ' + str(k) + 'trade details!') print('Usercard count ' + str(n) ) return n # 價格變動數據集 class Price_Table(object): def __init__(self, cityname, startdate): self.cityname = cityname self.startdate = startdate self.filename = 'price20210531.csv' def get_price(self): df_price = pd.read_csv(self.filename) ...... self.price_table=self.price_table.append(data_dict, ignore_index=True) print('generate price table!') class CardTradeDB(object): def __init__(self,db_obj): self.db_obj = db_obj def insertDatasByCSV(self,filename): # 存在數據混合類型 df = pd.read_csv(filename,low_memory=False) # 獲取交易記錄 def getTradeDatasByID(self,ID_list=None): # 字符串過長,需要使用''' query_sql = '''select C.carduser_id,C.org_id,C.cardasn,C.occurday as ...... limit {},{}) group by C.carduser_id,C.org_id,C.cardasn,C.occurday order by C.carduser_id,C.occurday''' n = self.db_obj.get_datas(query_sql) return n if __name__ == '__main__': PTable = Price_Table('湖北','2015-12-01') PTable.get_price() db_obj = DB_Obj('ebd_all_b04') db_obj.setPriceTable(PTable.price_table) CTD = CardTradeDB(db_obj) df = CTD.getTradeDatasByID()
返回本地文件為:
ClickHouse在OLAP場景下應用,查詢速度非常快,需要大內存支持。Python第三方clickhouse-driver 驅動基本滿足數據處理需求,如果能返回Pandas DataFrame最好。
ClickHouse和Pandas聚合都是非常快的,ClickHouse聚合函數也較為豐富(例如文中anyLast(x)返回最后遇到的值),如果能通過SQL聚合的,還是在ClickHouse中完成比較理想,把更小的結果集反饋給Python進行機器學習。
def info_del2(i): client = click_client(host='地址', port=端口, user='用戶名', password='密碼', database='數據庫') sql_detail='alter table SS_GOODS_ORDER_ALL delete where order_id='+str(i)+';' try: client.execute(sql_detail) except Exception as e: print(e,'刪除商品數據失敗')
在進行數據刪除的時候,python操作clickhou和mysql的方式不太一樣,這里不能使用以往常用的%s然后添加數據的方式,必須完整的編輯一條語句,如同上面方法所寫的一樣,傳進去的參數統一使用str類型
讀到這里,這篇“Python怎么使用ClickHouse”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。