您好,登錄后才能下訂單哦!
本文實例為大家分享了python實現微信消息防撤回的具體代碼,供大家參考,具體內容如下
使用了sqlite3保存數據,當有人撤回消息時取出數據發送到文件傳輸助手。
文件的話會先保存到本地,語音會以文件的方式發送。
wxpy 和 itchat很久沒更新了,有些功能沒法用了,web微信也不知道什么時候會涼。
幫助信息在注釋里。
# -*- coding: utf-8 -*- # 使用sqlite3保存message,當有人撤回消息時在數據庫中通過ID檢索該消息是否存在,如果存在則將撤回的消息發送到文件助手里。 # 目前只支持 text picture map sharing recording video attachment 類型的消息。 import wxpy import sqlite3 import os import re # 準備工作 # 創建attachment目錄用于存儲 圖像、地圖/位置、分享、語音、視頻、文件 if not os.path.isdir('attachment'): os.mkdir('attachment') attachment_path = os.path.join(os.getcwd(), 'attachment') bot = wxpy.Bot() # 用于獲取msg ID pattern = re.compile(r'\d{19}') # 測試wxpy能否正常工作 myself = bot.friends()[0] myself.send('Hello?') # 創建數據庫和message表 try: conn = sqlite3.connect('wxpy.db') cursor = conn.cursor() # cursor.execute('DROP TABLE MESSAGES') cursor.execute("""CREATE TABLE IF NOT EXISTS MESSAGES (id INTEGER PRIMARY KEY AUTOINCREMENT, msg_id INTEGER NOT NULL, msg_text TEXT, create_time DATE NOT NULL, revoke_time DATE, attachment_path TEXT, msg_sender TEXT NOT NULL, msg_type TEXT NOT NULL, msg_url TEXT, msg_raw_data TEXT NOT NULL)""") # print('establish successfully') finally: conn.commit() cursor.close() conn.close() # 注冊所有消息,在程序運行期間將插入所有支持的信息 @bot.register() def store_data(msg): # print(msg.raw) # 如果消息是支持的類型就將數據插入數據庫 if msg.type in [wxpy.TEXT, wxpy.RECORDING, wxpy.PICTURE, wxpy.ATTACHMENT, wxpy.VIDEO, wxpy.SHARING, wxpy.MAP]: insert_data(msg) # 撤回的消息類型是note elif msg.type == wxpy.NOTE: send_revoke(msg) # 插入數據 def insert_data(msg): try: conn = sqlite3.connect('wxpy.db') cursor = conn.cursor() if msg.type == wxpy.TEXT: cursor.execute("INSERT INTO MESSAGES (msg_id, msg_text, create_time, msg_sender, msg_type, msg_raw_data)\ values (?, ?, ?, ?, ?, ?)", (msg.id, msg.text, msg.create_time, str(msg.sender)[9:-1], msg.type, str(msg.raw))) # 將錄音/圖像/文件/視頻下載到本地,插入保存路徑。 elif msg.type in [wxpy.RECORDING, wxpy.PICTURE, wxpy.ATTACHMENT, wxpy.VIDEO]: save_path = os.path.join(attachment_path, msg.file_name) msg.get_file(save_path) cursor.execute('INSERT INTO MESSAGES (msg_id, create_time, attachment_path, msg_sender, msg_type,\ msg_raw_data) values (?, ?, ?, ?, ?, ?)', (msg.id, msg.create_time, save_path, str(msg.sender)[9:-1], msg.type, str(msg.raw))) # 插入分享/位置鏈接 elif msg.type in [wxpy.SHARING, wxpy.MAP]: cursor.execute('INSERT INTO MESSAGES (msg_id, msg_text, create_time, msg_sender, msg_type, msg_url,\ msg_raw_data) values (?, ?, ?, ?, ?, ?, ?)', (msg.id, msg.text, msg.create_time, str(msg.sender)[9:-1], msg.type, str(msg.url), str(msg.raw))) # print('insert data successfully') finally: conn.commit() cursor.close() conn.close() # 在數據庫中檢索消息是否存在,如果存在則將被撤回的消息發送到文件傳輸助手。 def send_revoke(message): msg_id = pattern.search(message.raw['Content']).group() try: conn = sqlite3.connect('wxpy.db') cursor = conn.cursor() cursor.execute('INSERT INTO MESSAGES (msg_id, create_time, msg_sender, msg_type, msg_raw_data)\ values (?, ?, ?, ?, ?)', (message.id, message.create_time, str(message.sender)[9:-1], message.type, str(message.raw))) msg_data = cursor.execute('SELECT * FROM MESSAGES WHERE msg_id=?', (msg_id, )).fetchall() # print('take out data successfully') finally: conn.commit() cursor.close() conn.close() if msg_data[0][7] == 'Text': msg_info = '告訴你一個秘密 {} 在 {} 撤回了文本\n{}'.format(msg_data[0][6], msg_data[0][3], msg_data[0][2]) bot.file_helper.send(msg_info) else: send_revoke_nontext(msg_data) # 非文本信息發送 def send_revoke_nontext(msg_data): if msg_data[0][7] == 'Picture': if msg_data[0][5][-4:] == '.gif': # 現在wxpy & itchat發不了GIF了 bot.file_helper('很抱歉,暫時不支持表情(gif)的撤回重發。') else: msg_info = '告訴你一個秘密 {} 在 {} 撤回了圖像'.format(msg_data[0][6], msg_data[0][3]) bot.file_helper.send(msg_info) bot.file_helper.send_image(msg_data[0][5]) elif msg_data[0][7] == 'Recording': msg_info = '告訴你一個秘密 {} 在 {} 撤回了語音'.format(msg_data[0][6], msg_data[0][3]) bot.file_helper.send(msg_info) bot.file_helper.send_file(msg_data[0][5]) elif msg_data[0][7] == 'Attachment': msg_info = '告訴你一個秘密 {} 在 {} 撤回了文件'.format(msg_data[0][6], msg_data[0][3]) bot.file_helper.send(msg_info) bot.file_helper.send_file(msg_data[0][5]) elif msg_data[0][7] == 'Video': msg_info = '告訴你一個秘密 {} 在 {} 撤回了視頻'.format(msg_data[0][6], msg_data[0][3]) bot.file_helper.send(msg_info) bot.file_helper.send_video(msg_data[0][5]) elif msg_data[0][7] == 'Sharing': msg_info = '告訴你一個秘密 {} 在 {} 撤回了分享\n{}\n{}'.format(msg_data[0][6], msg_data[0][3], msg_data[0][2],\ msg_data[0][8]) bot.file_helper.send(msg_info) elif msg_data[0][7] == 'Map': msg_info = '告訴你一個秘密 {} 在 {} 撤回了位置\n{}\n{}'.format(msg_data[0][6], msg_data[0][3], msg_data[0][2],\ msg_data[0][8]) bot.file_helper.send(msg_info) wxpy.embed()
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。