在上一篇文章《神器!五分钟完成大型爬虫项目!》,我们介绍了一个类似于 scrapy 的开源爬虫框架——feapder,并着重介绍了该框架的一种应用——airspider,它是一个轻量级的爬虫。

接下来我们再来介绍另一种爬虫应用——spider,它是是一款基于 redis 的分布式爬虫,适用于海量数据采集,支持断点续爬、爬虫报警、数据自动入库等功能。

安装

和 airspider 一样,我们也是通过命令行安装。

由于 spider 是分布式爬虫,可能涉及到多个爬虫,所以最好以项目的方式来创建。

创建项目

我们首先来创建项目:

feapder create -p spider-project

创建的项目目录是这样的:

创建好项目后,开发时我们需要将项目设置为工作区间,否则引入非同级目录下的文件时,编译器会报错。

设置工作区间方式(以pycharm为例):项目->右键->mark directory as -> sources root。

创建爬虫

创建爬虫的命令行语句为:

feapder create -s <spider_name> <spider_type>

  • airspider 对应的 spider_type 值为 1
  • spider 对应的 spider_type 值为 2
  • batchspider 对应的 spider_type 值为 3

默认 spider_type 值为 1。

所以创建 spider 的语句为:

feapder create -s spider_test 2

运行语句后,我们可以看到在 spiders 目录下生成了 spider_test.py 文件。

对应的文件内容为:


import feapder


class spidertest(feapder.spider):
    # 自定义数据库,若项目中有setting.py文件,此自定义可删除
    __custom_setting__ = dict(
        redisdb_ip_ports="localhost:6379", redisdb_user_pass="", redisdb_db=0
    )

    def start_requests(self):
        yield feapder.request("https://www.baidu.com")

    def parse(self, request, response):
        print(response)


if __name__ == "__main__":
    spidertest(redis_key="xxx:xxx").start()

因spider是基于redis做的分布式,因此模板代码默认给了redis的配置方式。关于 redis 的配置信息:

  • redisdb_ip_ports:连接地址,若为集群或哨兵模式,多个连接地址用逗号分开,若为哨兵模式,需要加个redisdb_service_name参数。
  • redisdb_user_pass:连接密码。
  • redisdb_db:数据库。

在 main 函数中,我们可以看到有个 redis_key 的参数,这个参数是redis中存储任务等信息的 key 前缀,如 redis_key=”feapder:spider_test”, 则redis中会生成如下:

特性

我们在 airspider 里面讲的方法,在 spider 这里都支持,下面我们来看看 spider 相对于 airspider 的不同之处。

数据自动入库

写过爬虫的人都知道,如果要将数据持久化到 mysql 数据库,如果碰到字段特别多的情况,就会很烦人,需要解析之后手写好多字段,拼凑 sql 语句。

这个问题,spider 帮我们想到了,我们可以利用框架帮我们自动入库。

建表

第一步,我们需要在数据库中创建一张数据表,这个大家都会,这里就不说了。

配置 setting

将 setting.py 里面的数据库配置改为自己的配置:

# # mysql
mysql_ip = ""
mysql_port = 
mysql_db = ""
mysql_user_name = ""
mysql_user_pass = ""

也就是这几个配置。

生成实体类 item

接着,我们将命令行切换到我们项目的 items 目录,运行命令:

feapder create -i <item_name>

我这里数据库里使用的是 report 表,所以命令为:

feapder create -i report

然后,我们就可以在 items 目录下看到生成的 report_item.py 实体类了。我这里生成的实体类内容是:

from feapder import item


class reportitem(item):
    """
    this class was generated by feapder.
    command: feapder create -i report.
    """

    __table_name__ = "report"

    def __init__(self, *args, **kwargs):
        self.count = none
        self.emratingname = none  # 评级名称
        self.emratingvalue = none  # 评级代码
        self.encodeurl = none  # 链接
        # self.id = none
        self.indvinducode = none  # 行业代码
        self.indvinduname = none  # 行业名称
        self.lastemratingname = none  # 上次评级名称
        self.lastemratingvalue = none  # 上次评级代码
        self.orgcode = none  # 机构代码
        self.orgname = none  # 机构名称
        self.orgsname = none  # 机构简称
        self.predictnexttwoyeareps = none
        self.predictnexttwoyearpe = none
        self.predictnextyeareps = none
        self.predictnextyearpe = none
        self.predictthisyeareps = none
        self.predictthisyearpe = none
        self.publishdate = none  # 发表时间
        self.ratingchange = none  # 评级变动
        self.researcher = none  # 研究员
        self.stockcode = none  # 股票代码
        self.stockname = none  # 股票简称
        self.title = none  # 报告名称

若字段有默认值或者自增,则默认注释掉,可按需打开。大家可以看到我这张表的 id 字段在这里被注释了。

若item字段过多,不想逐一赋值,可通过如下方式创建:

feapder create -i report 1

这时候生成的实体类是这样的:

class reportitem(item):
    """
    this class was generated by feapder.
    command: feapder create -i report 1.
    """

    __table_name__ = "report 1"

    def __init__(self, *args, **kwargs):
        self.count = kwargs.get('count')
        self.emratingname = kwargs.get('emratingname')  # 评级名称
        self.emratingvalue = kwargs.get('emratingvalue')  # 评级代码
        self.encodeurl = kwargs.get('encodeurl')  # 链接
        # self.id = kwargs.get('id')
        self.indvinducode = kwargs.get('indvinducode')  # 行业代码
        self.indvinduname = kwargs.get('indvinduname')  # 行业名称
        self.lastemratingname = kwargs.get('lastemratingname')  # 上次评级名称
        self.lastemratingvalue = kwargs.get('lastemratingvalue')  # 上次评级代码
        self.orgcode = kwargs.get('orgcode')  # 机构代码
        self.orgname = kwargs.get('orgname')  # 机构名称
        self.orgsname = kwargs.get('orgsname')  # 机构简称
        self.predictnexttwoyeareps = kwargs.get('predictnexttwoyeareps')
        self.predictnexttwoyearpe = kwargs.get('predictnexttwoyearpe')
        self.predictnextyeareps = kwargs.get('predictnextyeareps')
        self.predictnextyearpe = kwargs.get('predictnextyearpe')
        self.predictthisyeareps = kwargs.get('predictthisyeareps')
        self.predictthisyearpe = kwargs.get('predictthisyearpe')
        self.publishdate = kwargs.get('publishdate')  # 发表时间
        self.ratingchange = kwargs.get('ratingchange')  # 评级变动
        self.researcher = kwargs.get('researcher')  # 研究员
        self.stockcode = kwargs.get('stockcode')  # 股票代码
        self.stockname = kwargs.get('stockname')  # 股票简称
        self.title = kwargs.get('title')  # 报告名称

这样当我们请求回来的json数据时,可直接赋值,如:

response_data = {"title":" 测试"} # 模拟请求回来的数据
item = spiderdataitem(**response_data)

想要数据自动入库也比较简单,在解析完数据之后,将数据赋值给 item,然后 yield 就行了:

def parse(self, request, response):
        html = response.content.decode("utf-8")
        if len(html):
            content = html.replace('datatable1351846(', '')[:-1]
            content_json = json.loads(content)
            print(content_json)

            for obj in content_json['data']:
                result = reportitem()
                result['orgname'] = obj['orgname'] #机构名称
                result['orgsname'] = obj['orgsname'] #机构简称
                result['publishdate'] = obj['publishdate'] #发布日期
                result['predictnexttwoyeareps'] = obj['predictnexttwoyeareps'] #后年每股盈利
                result['title'] = obj['title'] #报告名称
                result['stockname'] = obj['stockname'] #股票名称
                result['stockcode'] = obj['stockcode'] #股票code
                result['orgcode'] = obj['stockcode'] #机构code
                result['predictnexttwoyearpe'] = obj['predictnexttwoyearpe'] #后年市盈率
                result['predictnextyeareps'] = obj['predictnextyeareps'] # 明年每股盈利
                result['predictnextyearpe'] = obj['predictnextyearpe'] # 明年市盈率
                result['predictthisyeareps'] = obj['predictthisyeareps'] #今年每股盈利
                result['predictthisyearpe'] = obj['predictthisyearpe'] #今年市盈率
                result['indvinducode'] = obj['indvinducode'] # 行业代码
                result['indvinduname'] = obj['indvinduname'] # 行业名称
                result['lastemratingname'] = obj['lastemratingname'] # 上次评级名称
                result['lastemratingvalue'] = obj['lastemratingvalue'] # 上次评级代码
                result['emratingvalue'] = obj['emratingvalue'] # 评级代码
                result['emratingname'] = obj['emratingname'] # 评级名称
                result['ratingchange'] = obj['ratingchange'] # 评级变动
                result['researcher'] = obj['researcher'] # 研究员
                result['encodeurl'] = obj['encodeurl'] # 链接
                result['count'] = int(obj['count']) # 近一月个股研报数
                yield result

返回item后,item 会流进到框架的 itembuffer, itembuffer 每.05秒或当item数量积攒到5000个,便会批量将这些 item 批量入库。表名为类名去掉 item 的小写,如 reportitem 数据会落入到 report 表。

调试

开发过程中,我们可能需要针对某个请求进行调试,常规的做法是修改下发任务的代码。但这样并不好,改来改去可能把之前写好的逻辑搞乱了,或者忘记改回来直接发布了,又或者调试的数据入库了,污染了库里已有的数据,造成了很多本来不应该发生的问题。

本框架支持debug爬虫,可针对某条任务进行调试,写法如下:

if __name__ == "__main__":
    spider = spidertest.to_debugspider(
        redis_key="feapder:spider_test", request=feapder.request("http://www.baidu.com")
    )
    spider.start()

对比之前的启动方式:

spider = spidertest(redis_key="feapder:spider_test")
spider.start()

可以看到,代码中 to_debugspider 方法可以将原爬虫直接转为 debug 爬虫,然后通过传递 request 参数抓取指定的任务。

通常结合断点来进行调试,debug 模式下,运行产生的数据默认不入库。

除了指定 request 参数外,还可以指定 request_dict 参数,request_dict 接收字典类型,如 request_dict={“url”:”http://www.baidu.com”}, 其作用于传递 request 一致。request 与 request_dict 二者选一传递即可。

运行多个爬虫

通常,一个项目下可能存在多个爬虫,为了规范,建议启动入口统一放到项目下的 main.py 中,然后以命令行的方式运行指定的文件。

例如如下项目:

项目中包含了两个spider,main.py写法如下:

from spiders import *
from feapder import request
from feapder import argumentparser


def test_spider():
    spider = test_spider.testspider(redis_key="spider:report")
    spider.start()


def test_spider2():
    spider = test_spider.testspider2(redis_key="spider:report")
    spider.start()


if __name__ == "__main__":
    parser = argumentparser(description="spider测试")

    parser.add_argument(
        "--test_spider", action="store_true", help="测试spider", function=test_spider
    )
    parser.add_argument(
        "--test_spider2", action="store_true", help="测试spider2", function=test_spider2
    )
   
    parser.start()

这里使用了 argumentparser 模块,使其支持命令行参数,如运行 test_spider:

python3 main.py –test_spider

分布式

分布式说白了就是启动多个进程,处理同一批任务。spider 支持启动多份,且不会重复发下任务,我们可以在多个服务器上部署启动,也可以在同一个机器上启动多次。

总结

到这里, spider 分布式爬虫咱就讲完了,还有一些细节的东西,大家在用的时候还需要琢磨一下。总体来说,这个框架还是比较好用的,上手简单,应对一些不是很复杂的场景绰绰有余,大家可以尝试着将自己的爬虫重构一下,试试这款框架