您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關使用python怎么爬取微信公眾號文章,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
TIMEOUT = 20 from requests import Request, Session, PreparedRequest import requests from selenium import webdriver from selenium.common.exceptions import NoSuchElementException from bs4 import BeautifulSoup as bs import pymysql # 要爬取的內容 keyword = '美女圖片' options = webdriver.ChromeOptions() # 設置中文 options.add_argument('lang=zh_CN.UTF-8') # 更換頭部 options.add_argument( 'user-agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36"') browser = webdriver.Chrome(chrome_options=options) REDIS_HOST = '192.168.1.248' REDIS_PORT = 6379 REDIS_PASSWORD = '*****' REDIS_KEY = 'requests' PROXY_POOL_URL = 'http://127.0.0.1:8080/random' MAX_FAILED_TIME = 5 MYSQL_HOST = 'localhost' MYSQL_PORT = 3306 MYSQL_USER = 'moxiao' MYSQL_PASSWORD = '******' class mysqlConn(): def __init__(self, host=MYSQL_HOST, username=MYSQL_USER, password=MYSQL_PASSWORD, port=MYSQL_PORT): """ mysql 初始化 :param host: :param username: :param password: :param port: """ try: self.db = pymysql.Connection(host=host, user=username, password=password, database='weixin_data', port=port) self.cursor = self.db.cursor() except pymysql.MySQLError as e: print(e.args) def insert(self, table, data): keys = ', '.join(data.keys()) values = ', '.join(['%s'] * len(data)) sql = 'insert into %s (%s) values (%s)' % (table, keys, values) try: self.cursor.execute(sql, tuple(data.values())) self.db.commit() except pymysql.MySQLError as e: print(e.args) self.db.rollback() class WeixinRequest(Request): def __init__(self, url, callback, method="GET", headers=None, need_proxy=False, fail_time=0, timeout=TIMEOUT): super(WeixinRequest, self).__init__(url=url, method=method, headers=headers) self.callback = callback self.need_proxy = need_proxy self.fail_time = fail_time self.timeout = timeout def prepare(self): p = PreparedRequest() p.prepare( method=self.method, url=self.url, headers=self.headers, ) return p class WeixinResponse(): def __init__(self, text): self.text = text def set_status_code(self, status_code): self.status_code = status_code import pickle from redis import StrictRedis class RedisQueue(): def __init__(self): """ 初始化redis """ self.db = StrictRedis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=3) def add(self, request): """ 向隊列添加序列化后的Request :param request:請求對象 :return:添加結果 """ if isinstance(request, WeixinRequest): return self.db.rpush(REDIS_KEY, pickle.dumps(request)) return False def pop(self): """ 取出下一個request并反序列化 :return: Request 或者 None """ if self.db.llen(REDIS_KEY): return pickle.loads(self.db.lpop(REDIS_KEY)) return False def empty(self): return self.db.llen(REDIS_KEY) == 0 def del_all(self): return self.db.delete(REDIS_KEY) def get_proxy(self): """ 從代理池獲取代理IP :return: """ try: response = requests.get(PROXY_POOL_URL) if response.status_code == 200: print('get Proxy', response.text) return response.text except requests.ConnectionError: return None from urllib.parse import urlencode from requests import ReadTimeout, ConnectionError from pyquery import PyQuery as pq VALD_STATUES = [200] class Spider(): base_url = 'http://weixin.sogou.com/weixin?' # 這里的page可以修改,即第幾頁,我本來想獲取所有的個數再除以10 這樣就能爬完了,但是我只是測試所以這里并沒有做 # 但如果需要做可以加到schedule方法的while循環內的最下面 即self.params['page']+=1 params = {'type': 2, 's_from': 'input', 'query': keyword, 'page': 1, 'ie': 'utf8', '_sug_': 'n', '_sug_type_': ''} headers = {'Host': 'weixin.sogou.com', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', 'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Referer': 'http: // weixin.sogou.com /', 'Cookie': '你的cookie'} # TODO 不可能把我的給你撒 session = Session() queue = RedisQueue() queue.del_all() mysql = mysqlConn() def start(self): """ 初始化工作 :return: """ # 全局更新headers # 如果你試過用這個方法修改headers,那么就知道這個在這里好像沒什么用,我在這里浪費了至少兩個小時 self.session.headers.update(self.headers) start_url = self.base_url + urlencode(self.params) # 這里我將need_proxy=False設為了False 即并沒有使用代理 ps:我也就是測試一下 # 真正修改了headers是在這里 weixin_request = WeixinRequest(url=start_url, callback=self.parse_index, headers=self.headers, need_proxy=False) # 調度第一個請求 self.queue.add(weixin_request) def schedule(self): """ 調度請求 :return: """ while not self.queue.empty(): weixin_request = self.queue.pop() callback = weixin_request.callback print('Schedule', weixin_request.url) response = self.request(weixin_request) if response and response.status_code in VALD_STATUES: results = list(callback(response)) if results: for result in results: print('New Result', result) if isinstance(result, WeixinRequest): # 將新的文章詳情的url也加入隊列 self.queue.add(result) if isinstance(result, dict): # 儲存到mysql self.mysql.insert('articles', result) else: self.error(weixin_request) else: self.error(weixin_request) def request(self, weixin_request): """ 執行請求 :param weixin_request:請求 :return: 響應 """ if not 'http://mp.weixin.qq.com/s?src' in weixin_request.url: try: if weixin_request.need_proxy: proxy = self.queue.get_proxy() if proxy: proxies = { 'http': 'http://' + proxy, 'https': 'https://' + proxy } return self.session.send(weixin_request.prepare(), timeout=weixin_request.timeout, allow_redirects=False, proxies=proxies) return self.session.send(weixin_request.prepare(), timeout=weixin_request.timeout, allow_redirects=False) except (ConnectionError, ReadTimeout) as e: print(e.args) return False else: print('-' * 20) browser.get(weixin_request.url) try: browser.find_element_by_class_name('rich_media_area_primary_inner') wr = WeixinResponse(browser.page_source) wr.set_status_code(200) return wr except NoSuchElementException: wr = WeixinResponse('') wr.set_status_code(403) return wr def parse_index(self, response): """ 解析索引頁 :param response: 響應 :return: 新的響應 """ doc = pq(response.text) items = doc('.news-box .news-list li .txt-box h4 a').items() for item in items: url = item.attr('href') weixin_request = WeixinRequest(url=url, callback=self.parse_detail) yield weixin_request def parse_detail(self, response): """ 解析詳情頁 :param response: 響應 :return: 微信公眾號文章 """ doc = pq(response.text) profile_inner = doc('.profile_inner') data = { 'title': doc('.rich_media_title').text(), 'content': doc('.rich_media_content').text(), 'date': doc('#publish_time').text(), # 'nickname':doc('#js_profile_qrcode > div > strong').text(), 'nickname': profile_inner.find('.profile_nickname').text(), 'wechat': [ns for ns in profile_inner.find('.profile_meta').find('.profile_meta_value').items()][ 0].text() } # 儲存圖片 print('#' * 30) soup = bs(response.text) wn = soup.find_all('img') for img in wn: if img.has_attr('_width') and img.has_attr('data-src'): print(img.attrs['data-src']) yield data def error(self, weixin_request): """ 錯誤處理 :param weixin_request:請求 :return: """ weixin_request.fail_time = weixin_request.fail_time + 1 print('Request Failed', weixin_request.fail_time, 'Times', weixin_request.url) if weixin_request.fail_time < MAX_FAILED_TIME: self.queue.add(weixin_request) def run(self): self.start() self.schedule() if __name__ == '__main__': spider = Spider() spider.run()
2018-10-6更新:
今天測試之后使用了cookie并不能登錄這個網站了,也許是騰訊使用了新的安全驗證,具體也無從得知,但使用瀏覽器訪問沒有問題
Python是一種編程語言,內置了許多有效的工具,Python幾乎無所不能,該語言通俗易懂、容易入門、功能強大,在許多領域中都有廣泛的應用,例如最熱門的大數據分析,人工智能,Web開發等。
上述就是小編為大家分享的使用python怎么爬取微信公眾號文章了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。