使用Python爬取公号文章(上)

阅读文本大概须要 10 分钟。数据库


01 抓取目标

场景:有时候咱们想爬取某个大 V 的发布的所有的文章进行学习或者分析。json

这个爬虫任务咱们须要借助「 Charles 」这个抓包工具,设置好手机代理 IP 去请求某个页面,经过分析,模拟请求,获取到实际的数据。bash

咱们要爬取文章的做者、文章标题、封面图、推送时间、文件内容、阅读量、点赞数、评论数、文章实际连接等数据,最后要把数据存储到「 MongoDB 」数据库中。微信


02 准备工做

首先,在 PC 上下载 Charles,并获取本地的 IP 地址。cookie


而后,手机连上同一个网段,并手动设置代理 IP,端口号默认填 8888 。最后配置 PC 和手机上的证书及 SSL Proxying,保证能顺利地抓到 HTTPS 的请求。具体的方法能够参考下面的文章。session


「https://www.jianshu.com/p/595e8b556a60?from=timeline&isappinstalled=0 」app



03 爬取思路

首先咱们选中一个微信公众号,依次点击右上角的头像、历史消息,就能够进入到所有消息的主界面。默认展现的是前 10 天历史消息。框架

而后能够查看 Charles 抓取的请求数据,能够经过「 mp.weixin.qq.com 」去过滤请求,获取到消息首页发送的请求及请求方式及响应内容。ide

继续往下滚动页面,能够加载到下一页的数据,一样能够获取到请求和响应的数据。工具


爬取的数据最后要保存在 MongoDB 文档型数据库中,因此不须要创建数据模型,只须要安装软件和开启服务就能够了。MongoDB 的使用教程能够参考下面的连接:


「 https://www.jianshu.com/p/4c5deb1b7e7c 」



为了操做 MongoDB 数据库,这里使用「 MongoEngine 」这个相似于关系型数据库中的 ORM 框架来方便咱们处理数据。

pip3 install mongoengine
复制代码


04 代码实现

从上面的分析中能够知道首页消息、更多页面消息的请求 URL 规律以下:

# 因为微信屏蔽的关键字, 字段 netloc + path 用 ** 代替
# 首页请求url
https://**?action=home&__biz=MzIxNzYxMTU0OQ==&scene=126&bizpsid=0&sessionid=1545633855&subscene=0&devicetype=iOS12.1.2&version=17000027&lang=zh_CN&nettype=WIFI&a8scene=0&fontScale=100&pass_ticket=U30O32QRMK6dba2iJ3ls6A3PRbrhksX%2B7D8pF3%2Bu3uXSKvSAa1hnHzfsSClawjKg&wx_header=1

# 第二页请求url
https://**?action=getmsg&__biz=MzIxNzYxMTU0OQ==&f=json&offset=10&count=10&is_ok=1&scene=126&uin=777&key=777&pass_ticket=U30O32QRMK6dba2iJ3ls6A3PRbrhksX%2B7D8pF3%2Bu3uXSKvSAa1hnHzfsSClawjKg&wxtoken=&appmsg_token=988_rETfljlGIZqE%252F6MobN1rEtqBx5Ai9wBDbbH_sw~~&x5=0&f=json

# 第三页请求url
https://**?action=getmsg&__biz=MzIxNzYxMTU0OQ==&f=json&offset=21&count=10&is_ok=1&scene=126&uin=777&key=777&pass_ticket=U30O32QRMK6dba2iJ3ls6A3PRbrhksX%2B7D8pF3%2Bu3uXSKvSAa1hnHzfsSClawjKg&wxtoken=&appmsg_token=988_rETfljlGIZqE%252F6MobN1rEtqBx5Ai9wBDbbH_sw~~&x5=0&f=json
复制代码

能够经过把 offset 设置为可变数据,请求全部页面的数据 URL能够写成下面的方式:

https://**?action=getmsg&__biz=MzIxNzYxMTU0OQ==&f=json&offset={}&count=10&is_ok=1&scene=126&uin=777&key=777&pass_ticket=U30O32QRMK6dba2iJ3ls6A3PRbrhksX%2B7D8pF3%2Bu3uXSKvSAa1hnHzfsSClawjKg&wxtoken=&appmsg_token=988_rETfljlGIZqE%252F6MobN1rEtqBx5Ai9wBDbbH_sw~~&x5=0&f=json
复制代码

另外,经过 Charles 获取到请求头。因为微信的反爬机制,这里的 Cookie 和 Referer 有必定的时效性,须要定时更换。

self.headers = {
            'Host': 'mp.weixin.qq.com',
            'Connection': 'keep-alive',
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 12_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/16B92 MicroMessenger/6.7.4(0x1607042c) NetType/WIFI Language/zh_CN',
            'Accept-Language': 'zh-cn',
            'X-Requested-With': 'XMLHttpRequest',
            'Cookie': 'devicetype=iOS12.1; lang=zh_CN; pass_ticket=fXbGiNdtFY050x9wsyhMnmaSyaGbSIXNzubjPBqiD+c8P/2GyKpUSimrtIKQJsQt; version=16070430; wap_sid2=CMOw8aYBElx2TWQtOGJfNkp3dmZHb3dyRnpRajZsVlVGX0pQem4ycWZSNzNFRmY3Vk9zaXZUM0Y5b0ZpbThVeWgzWER6Z0RBbmxqVGFiQ01ndFJyN01LNU9PREs3OXNEQUFBfjC409ngBTgNQJVO; wxuin=349984835; wxtokenkey=777; rewardsn=; pac_uid=0_f82bd5abff9aa; pgv_pvid=2237276040; tvfe_boss_uuid=05faefd1e90836f4',
            'Accept': '*/*',
            'Referer': 'https://**?action=home&__biz=MzIxNzYxMTU0OQ==&scene=126&sessionid=1544890100&subscene=0&devicetype=iOS12.1&version=16070430&lang=zh_CN&nettype=WIFI&a8scene=0&fontScale=100&pass_ticket=pg%2B0C5hdqENXGO6Fq1rED9Ypx20C2vuodaL8DCwZwVe22sv9OtWgeL5YLjUujPOR&wx_header=1'
        }
复制代码

最后经过 requests 去模拟发送请求。

response = requests.get(current_request_url, headers=self.headers, verify=False)
result = response.json()
复制代码

经过 Charles 返回的数据格式能够得知消息列表的数据存储在 general_msg_list 这个 Key 下面。所以能够须要拿到数据后进行解析操做。


has_next_page 字段能够判断是否存在下一页的数据;若是有下一页的数据,能够继续爬取,不然终止爬虫程序。

ps:因为 Wx 反爬作的很完善,因此尽可能下降爬取的速度。


response = requests.get(current_request_url, headers=self.headers, verify=False)
        result = response.json()

        if result.get("ret") == 0:
            msg_list = result.get('general_msg_list')

            # 保存数据
            self._save(msg_list)
            self.logger.info("获取到一页数据成功, data=%s" % (msg_list))

            # 获取下一页数据
            has_next_page = result.get('can_msg_continue')
            if has_next_page == 1:
                # 继续爬取写一页的数据【经过next_offset】
                next_offset = result.get('next_offset')

                # 休眠2秒,继续爬下一页
                time.sleep(2)
                self.spider_more(next_offset)
            else:  # 当 has_next 为 0 时,说明已经到了最后一页,这时才算爬完了一个公众号的全部历史文章
                print('爬取公号完成!')
        else:
            self.logger.info('没法获取到更多内容,请更新cookie或其余请求头信息')
复制代码

因为获取到的列表数据是一个字符串,须要经过 json 库去解析,获取有用的数据。

def _save(self, msg_list):
        """ 数据解析 :param msg_list: :return: """
        # 1.去掉多余的斜线,使【连接地址】可用
        msg_list = msg_list.replace("\/", "/")
        data = json.loads(msg_list)

        # 2.获取列表数据
        msg_list = data.get("list")
        for msg in msg_list:
            # 3.发布时间
            p_date = msg.get('comm_msg_info').get('datetime')

            # 注意:非图文消息没有此字段
            msg_info = msg.get("app_msg_ext_info")

            if msg_info:  # 图文消息
                # 若是是多图文推送,把第二条第三条也保存
                multi_msg_info = msg_info.get("multi_app_msg_item_list")

                # 若是是多图文,就从multi_msg_info中获取数据插入;反之直接从app_msg_ext_info中插入
                if multi_msg_info:
                    for multi_msg_item in multi_msg_info:
                        self._insert(multi_msg_item, p_date)
                else:
                    self._insert(msg_info, p_date)
            else:
                # 非图文消息
                # 转换为字符串再打印出来
                self.logger.warning(u"此消息不是图文推送,data=%s" % json.dumps(msg.get("comm_msg_info")))
复制代码

最后一步是将数据保存保存到 MongoDB 数据库中。

首先要建立一个 Model 保存咱们须要的数据。

from datetime import datetime

from mongoengine import connect
from mongoengine import DateTimeField
from mongoengine import Document
from mongoengine import IntField
from mongoengine import StringField
from mongoengine import URLField

__author__ = 'xag'

# 权限链接数据库【数据库设置了权限,这里必须指定用户名和密码】
response = connect('admin', host='localhost', port=27017,username='root', password='xag')

class Post(Document):
    """ 文章【模型】 """
    title = StringField()  # 标题
    content_url = StringField()  # 文章连接
    source_url = StringField()  # 原文连接
    digest = StringField()  # 文章摘要
    cover = URLField(validation=None)  # 封面图
    p_date = DateTimeField()  # 推送时间
    author = StringField()  # 做者

    content = StringField()  # 文章内容

    read_num = IntField(default=0)  # 阅读量
    like_num = IntField(default=0)  # 点赞数
    comment_num = IntField(default=0)  # 评论数
    reward_num = IntField(default=0)  # 点赞数

    c_date = DateTimeField(default=datetime.now)  # 数据生成时间
    u_date = DateTimeField(default=datetime.now)  # 数据最后更新时间
复制代码

使用命令行开启数据库服务,而后就能够往数据库写入数据了。


def _insert(self, msg_info, p_date):
        """ 数据插入到 MongoDB 数据库中 :param msg_info: :param p_date: :return: """
        keys = ['title', 'author', 'content_url', 'digest', 'cover', 'source_url']

        # 获取有用的数据,构建数据模型
        data = sub_dict(msg_info, keys)
        post = Post(**data)

        # 时间格式化
        date_pretty = datetime.fromtimestamp(p_date)
        post["p_date"] = date_pretty

        self.logger.info('save data %s ' % post.title)

        # 保存数据
        try:
            post.save()
        except Exception as e:
            self.logger.error("保存失败 data=%s" % post.to_json(), exc_info=True)
复制代码

05 爬取结果

推荐使用工具 Robo3T 链接 MongoDB 数据库,能够查看到公号文章数据已经所有保存到数据库中。

本文首发于公众号「 AirPython 」,后台回复「公号1」便可获取完整代码。

相关文章
相关标签/搜索