您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關Python中的區塊鏈怎么利用POS算法實現,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
區塊鏈中的共識算法
在比特幣公鏈架構解析中,就曾提到過為了實現去中介化的設計,比特幣設計了一套共識協議,并通過此協議來保證系統的穩定性和防攻擊性。 并且我們知道,截止目前使用最廣泛,也是最被大家接受的共識算法,是我們先前介紹過的POW(proof of work)工作量證明算法。目前市值排名前二的比特幣和以太坊也是采用的此算法。
雖然POW共識算法取得了巨大的成功,但對它的質疑也從來未曾停止過。 其中最主要的一個原因就是電力消耗。據不完全統計,基于POW的挖礦機制所消耗的電量是非常巨大的,甚至比絕大多數國家耗電量還要多。這對我們的資源造成了極大的浪費,此外隨著比特大陸等公司的強勢崛起,造成了算力的高度集中。
基于以上種種原因,更多的共識算法被提出來 POS、DPOS、BPFT等等。 今天我們就來認識POS(proof of stake)算法。
Proof of stake,譯為權益證明。你可能已經猜到了,權益證明簡單理解就是擁有更多token的人,有更大的概率獲得記賬權利,然后獲得獎勵。 這個概率具體有多大呢? 下面我們在代碼實現中會展示,分析也放在后面。 當然,POS是會比POW更好嗎? 會更去中心化嗎? 現在看來未必,所以我們這里也不去對比誰優誰劣。 我們站在中立的角度,單純的來討論討論POS這種算法。
代碼實戰
生成一個Block
既然要實現POS算法,那么就難免要生成一條鏈,鏈又是由一個個Block生成的,所以下面我們首先來看看如何生成Block,當然在前面的內容里面,關于如何生成Block,以及交易、UTXO等等都已經介紹過了。由于今天我們的核心是實現POS,所以關于Block的生成,我們就用最簡單的實現方式,好讓大家把目光聚焦在核心的內容上面。
我們用三個方法來實現生成一個合法的區塊
calculate_hash 計算區塊的hash值
is_block_valid 校驗區塊是否合法
generate_block 生成一個區塊
from hashlib import sha256 from datetime import datetime def generate_block(oldblock, bpm, address): """ :param oldblock: :param bpm: :param address: :return: """ newblock = { "Index": oldblock["Index"] + 1, "BPM": bpm, "Timestamp": str(datetime.now()), "PrevHash": oldblock["Hash"], "Validator": address } newblock["Hash"] = calculate_hash(newblock) return newblock def calculate_hash(block): record = "".join([ str(block["Index"]), str(block["BPM"]), block["Timestamp"], block["PrevHash"] ]) return sha256(record.encode()).hexdigest() def is_block_valid(newblock, oldblock): """ :param newblock: :param oldblock: :return: """ if oldblock["Index"] + 1 != newblock["Index"]: return False if oldblock["Hash"] != newblock["PrevHash"]: return False if calculate_hash(newblock) != newblock["Hash"]: return False return True
這里為了更靈活,我們沒有用類的實現方式,直接采用函數來實現了Block生成,相信很容易看懂。
創建一個TCP服務器
由于我們需要用權益證明算法來選擇記賬人,所以需要從很多Node(節點)中選擇記賬人,也就是需要一個server讓節點鏈接上來,同時要同步信息給節點。因此需要一個TCP長鏈接。
from socketserver import BaseRequestHandler, ThreadingTCPServer def run(): # start a tcp server serv = ThreadingTCPServer(('', 9090), HandleConn) serv.serve_forever()
在這里我們用了python內庫socketserver來創建了一個TCPServer。 需要注意的是,這里我們是采用的多線程的創建方式,這樣可以保證有多個客戶端同時連接上來,而不至于被阻塞。當然,這里這個server也是存在問題的,那就是有多少個客戶端連接,就會創建多少個線程,更好的方式是創建一個線程池。由于這里是測試,所以就采用更簡單的方式了。
相信大家已經看到了,在我們創建TCPServer的時候,使用到了HandleConn,但是我們還沒有定義,所以接下來我們就來定義一個HandleConn
消息處理器
下面我們來實現Handler函數,Handler函數在跟Client Node通信的時候,需要我們的Node實現下面的功能
Node可以輸入balance(token數量) 也就是股權數目
Node需要能夠接收廣播,方便Server同步區塊以及記賬人信息
添加自己到候選人名單 (候選人為持有token的人)
輸入BPM生成Block
驗證一個區塊的合法性
感覺任務還是蠻多的,接下來我們看代碼實現
import threading from queue import Queue, Empty # 定義變量 block_chain = [] temp_blocks = [] candidate_blocks = Queue() # 創建隊列,用于線程間通信 announcements = Queue() validators = {} My_Lock = threading.Lock() class HandleConn(BaseRequestHandler): def handle(self): print("Got connection from", self.client_address) # validator address self.request.send(b"Enter token balance:") balance = self.request.recv(8192) try: balance = int(balance) except Exception as e: print(e) t = str(datetime.now()) address = sha256(t.encode()).hexdigest() validators[address] = balance print(validators) while True: announce_winner_t = threading.Thread(target=annouce_winner, args=(announcements, self.request,), daemon=True) announce_winner_t.start() self.request.send(b"\nEnter a new BPM:") bpm = self.request.recv(8192) try: bpm = int(bpm) except Exception as e: print(e) del validators[address] break # with My_Lock: last_block = block_chain[-1] new_block = generate_block(last_block, bpm, address) if is_block_valid(new_block, last_block): print("new block is valid!") candidate_blocks.put(new_block) self.request.send(b"\nEnter a new BPM:\n") annouce_blockchain_t = threading.Thread(target=annouce_blockchain, args=(self.request,), daemon=True) annouce_blockchain_t.start()
這段代碼,可能對大多數同學來說是有難度的,在這里我們采用了多線程的方式,同時為了能夠讓消息在線程間通信,我們使用了隊列。 這里使用隊列,也是為了我們的系統可以更好的拓展,后面如果可能,這一節的程序很容易拓展為分布式系統。 將多線程里面處理的任務拆分出去成獨立的服務,然后用消息隊列進行通信,就是一個簡單的分布式系統啦。(是不是很激動?)
由于這里有難度,所以代碼還是講一講吧
# validator address self.request.send(b"Enter token balance:") balance = self.request.recv(8192) try: balance = int(balance) except Exception as e: print(e) t = str(datetime.now()) address = sha256(t.encode()).hexdigest() validators[address] = balance print(validators)
這一段就是我們提到的Node 客戶端添加自己到候選人的代碼,每鏈接一個客戶端,就會添加一個候選人。 這里我們用添加的時間戳的hash來記錄候選人。 當然也可以用其他的方式,比如我們代碼里面的client_address
announce_winner_t = threading.Thread(target=annouce_winner, args=(announcements, self.request,), daemon=True) announce_winner_t.start() def annouce_winner(announcements, request): """ :param announcements: :param request: :return: """ while True: try: msg = announcements.get(block=False) request.send(msg.encode()) request.send(b'\n') except Empty: time.sleep(3) continue
然后接下來我們起了一個線程去廣播獲得記賬權的節點信息到所有節點。
self.request.send(b"\nEnter a new BPM:") bpm = self.request.recv(8192) try: bpm = int(bpm) except Exception as e: print(e) del validators[address] break # with My_Lock: last_block = block_chain[-1] new_block = generate_block(last_block, bpm, address) if is_block_valid(new_block, last_block): print("new block is valid!") candidate_blocks.put(new_block)
根據節點輸入的BPM值生成一個區塊,并校驗區塊的有效性。 將有效的區塊放到候選區塊當中,等待記賬人將區塊添加到鏈上。
annouce_blockchain_t = threading.Thread(target=annouce_blockchain, args=(self.request,), daemon=True) annouce_blockchain_t.start() def annouce_blockchain(request): """ :param request: :return: """ while True: time.sleep(30) with My_Lock: output = json.dumps(block_chain) try: request.send(output.encode()) request.send(b'\n') except OSError: pass
最后起一個線程,同步區塊鏈到所有節點。
看完了,節點跟Server交互的部分,接下來是最重要的部分,
POS算法實現
def pick_winner(announcements): """ 選擇記賬人 :param announcements: :return: """ time.sleep(10) while True: with My_Lock: temp = temp_blocks lottery_pool = [] # if temp: for block in temp: if block["Validator"] not in lottery_pool: set_validators = validators k = set_validators.get(block["Validator"]) if k: for i in range(k): lottery_pool.append(block["Validator"]) lottery_winner = choice(lottery_pool) print(lottery_winner) # add block of winner to blockchain and let all the other nodes known for block in temp: if block["Validator"] == lottery_winner: with My_Lock: block_chain.append(block) # write message in queue. msg = "\n{0} 贏得了記賬權利\n".format(lottery_winner) announcements.put(msg) break with My_Lock: temp_blocks.clear()
這里我們用pick_winner 來選擇記賬權利,我們根據token數量構造了一個列表。 一個人獲得記賬權利的概率為:
p = mount['NodeA']/mount['All']
文字描述就是其token數目在總數中的占比。 比如總數有100個,他有10個,那么其獲得記賬權的概率就是0.1, 到這里核心的部分就寫的差不多了,接下來,我們來添加節點,開始測試吧
測試POS的記賬方式
在測試之前,起始還有一部分工作要做,前面我們的run方法需要完善下,代碼如下:
def run(): # create a genesis block t = str(datetime.now()) genesis_block = { "Index": 0, "Timestamp": t, "BPM": 0, "PrevHash": "", "Validator": "" } genesis_block["Hash"] = calculate_hash(genesis_block) print(genesis_block) block_chain.append(genesis_block) thread_canditate = threading.Thread(target=candidate, args=(candidate_blocks,), daemon=True) thread_pick = threading.Thread(target=pick_winner, args=(announcements,), daemon=True) thread_canditate.start() thread_pick.start() # start a tcp server serv = ThreadingTCPServer(('', 9090), HandleConn) serv.serve_forever() def candidate(candidate_blocks): """ :param candidate_blocks: :return: """ while True: try: candi = candidate_blocks.get(block=False) except Empty: time.sleep(5) continue temp_blocks.append(candi) if __name__ == '__main__': run()
添加節點連接到TCPServer
為了充分減少程序的復雜性,tcp client我們這里就不實現了,可以放在后面拓展部分。 畢竟我們這個系統是很容易擴展的,后面我們拆分了多線程的部分,在實現tcp client就是一個完整的分布式系統了。
所以,我們這里用linux自帶的命令 nc,不知道nc怎么用的同學可以google或者 man nc
啟動服務 運行 python pos.py
打開3個終端
分別輸入下面命令
nc localhost 9090
終端如果輸出
Enter token balance:
說明你client已經鏈接服務器ok啦.
測試POS的記賬方式
接下來依次按照提示操作。 balance可以按心情來操作,因為這里是測試,我們輸入100,
緊接著會提示輸入BPM,我們前面提到過,輸入BPM是為了生成Block,那么就輸入吧,隨便輸入個9. ok, 接下來就稍等片刻,等待記賬。
輸出如同所示
依次在不同的終端,根據提示輸入數字,等待消息同步。
生成區塊鏈
下面是我這邊獲得的3個block信息。
上述就是小編為大家分享的Python中的區塊鏈怎么利用POS算法實現了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。