Scrapy Project Notes

June 4, 2020


Initializing the Project

Creating the project

Create a folder named Scrapy, switch into it, and create a new virtual environment:

pipenv install

Activate the newly created virtual environment (mine is named Scrapy-AmnJqBH4 here; with pipenv alone, pipenv shell does the same job):

workon Scrapy-AmnJqBH4

Installing dependencies

pipenv install scrapy

Create a new Scrapy project (named ArticleSpider here):

scrapy startproject ArticleSpider

Creating a Spider

Switch into the project directory and create a new spider:

scrapy genspider jobbole blog.jobbole.com

Create a main.py in the project root for debugging the project:

import os
import sys
from scrapy.cmdline import execute

# Add the project directory to the Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Run the spider
execute(['scrapy', 'crawl', 'jobbole'])

Set ROBOTSTXT_OBEY to False in settings.py.
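For reference, this is the relevant line in settings.py (the default project template already contains it, set to True):

# settings.py
# Ignore robots.txt so Scrapy does not filter out requests based on it
ROBOTSTXT_OBEY = False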

Crawling Jobbole Articles

Implementing the Spider

Scrapy first fetches the URLs listed in start_urls and then calls the default callback, parse.

parse extracts each article's detail URL and cover-image URL from the listing page, builds a new Request from the detail URL with parse_detail as the callback, and passes the cover-image URL along as request meta.

It also extracts the next-page URL to continue crawling; the next page is again handled by the parse callback.

In parse_detail we use an ItemLoader to extract the fields and feed them straight into the corresponding item.

import scrapy
from scrapy import Request
from urllib import parse

from ..items import JobboleArticleItem, ArticleItemLoader
from ..utils.common import get_md5


class JobboleSpider(scrapy.Spider):
    name = 'jobbole'
    allowed_domains = ['blog.jobbole.com']
    start_urls = ['http://blog.jobbole.com/all-posts/']

    def parse(self, response):

        post_nodes = response.css('#archive .floated-thumb .post-thumb a')
        for post_node in post_nodes:
            image_url = post_node.css('img::attr(src)').extract_first()
            post_url = post_node.css('::attr(href)').extract_first()
            yield Request(url=parse.urljoin(response.url, post_url),
                          callback=self.parse_detail,
                          meta={'front_image_url': image_url})

        next_url = response.css('.next.page-numbers::attr(href)').extract_first()
        if next_url:
            yield Request(url=parse.urljoin(response.url, next_url), callback=self.parse)

    def parse_detail(self, response):

        item_loader = ArticleItemLoader(JobboleArticleItem(), response=response)
        
        item_loader.add_css('title', '.entry-header h1::text')
        item_loader.add_css('create_date', 'p.entry-meta-hide-on-mobile::text')
        item_loader.add_css('tags', 'p.entry-meta-hide-on-mobile a::text')
        item_loader.add_css('content', 'div.entry')
        item_loader.add_css('praise_nums', '.vote-post-up h10::text')
        item_loader.add_css('fav_nums', '.bookmark-btn::text')
        item_loader.add_css('comment_nums', 'a[href="#article-comment"] span::text')
        item_loader.add_value('url', response.url)
        item_loader.add_value('url_object_id', get_md5(response.url))
        item_loader.add_value('front_image_url', response.meta.get('front_image_url',
                                                                   None))

        article_item = item_loader.load_item()

        yield article_item

The url_object_id is generated as follows:

import hashlib


def get_md5(url):
    if isinstance(url, str):
        url = url.encode('utf8')
    m = hashlib.md5()
    m.update(url)
    return m.hexdigest()
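A quick usage sketch: the function accepts either str or bytes and always returns a 32-character hex digest, which is convenient as a fixed-length primary key.

if __name__ == '__main__':
    # Both calls yield the same 32-character hexadecimal digest
    print(get_md5('http://blog.jobbole.com/all-posts/'))
    print(get_md5(b'http://blog.jobbole.com/all-posts/'))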

Implementing the Items

Note that we implement our own ArticleItemLoader, which inherits from ItemLoader. The helper functions used as processors are:

  • date_convert: parses the date string into a datetime
  • get_nums: extracts a number from a string
  • remove_comment_tags: drops values that are just the comment label ("评论")

import re

import scrapy
import datetime
from scrapy.loader import ItemLoader
from scrapy.loader.processors import MapCompose, TakeFirst, Join


def date_convert(value):
    try:
        value = value.strip().replace(' ·', '')
        value = datetime.datetime.strptime(value, '%Y/%m/%d')
    except Exception as e:
        print(e)
        value = datetime.datetime.now().date()
    return value


def get_nums(value):
    match_re = re.match(r".*?(\d+).*", value)
    if match_re:
        nums = int(match_re.group(1))
    else:
        nums = 0

    return nums

class ArticleItemLoader(ItemLoader):

    default_output_processor = TakeFirst()


def remove_comment_tags(value):
    if value.endswith('评论'):
        pass
    else:
        return value


class JobboleArticleItem(scrapy.Item):
    title = scrapy.Field()
    tags = scrapy.Field(input_processor=MapCompose(remove_comment_tags),
                        output_processor=Join(','))
    url = scrapy.Field()
    url_object_id = scrapy.Field()
    create_date = scrapy.Field(input_processor=MapCompose(date_convert))
    content = scrapy.Field()
    comment_nums = scrapy.Field(input_processor=MapCompose(get_nums))
    praise_nums = scrapy.Field(input_processor=MapCompose(get_nums))
    fav_nums = scrapy.Field(input_processor=MapCompose(get_nums))
    front_image_url = scrapy.Field(output_processor=MapCompose(lambda x: [x]))
    front_image_path = scrapy.Field()

Implementing the Model

Here we use a third-party library, peewee, a lightweight ORM framework that has to be installed separately.

Usage is as follows: define a database connection db, then define a class that inherits from Model. Fields are declared much like in Django models.

In the Meta class, specify the database and the table name.

from peewee import *


db = MySQLDatabase('scrapy_project', host='127.0.0.1', port=3306,
                   user='...', passwd='...')


class JobboleArticleModel(Model):
    title = CharField(max_length=255)
    tags = CharField(max_length=255)
    url = CharField(max_length=255)
    url_object_id = CharField(max_length=255)
    create_date = DateTimeField()
    content = TextField()
    comment_nums = IntegerField()
    praise_nums = IntegerField()
    fav_nums = IntegerField()
    front_image_url = CharField(max_length=255)
    front_image_path = CharField(max_length=255)

    class Meta:
        database = db
        db_table = 'jobbole_article'
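As a quick usage sketch (assuming the scrapy_project MySQL database already exists), the table can be created once with peewee itself:

if __name__ == '__main__':
    db.connect()
    # safe=True issues CREATE TABLE IF NOT EXISTS, so re-running is harmless
    db.create_tables([JobboleArticleModel], safe=True)
    db.close()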

Implementing the Pipelines

We implement two pipelines here: one writes to the database, the other saves the cover images.

MysqlPipeline creates and saves a record through the Model class, JobboleArticleModel.

ArticleImagePipeline inherits from ImagesPipeline, a pipeline built into Scrapy, and overrides its item_completed() method. There we read the actual path where the image was saved and assign it to the item's front_image_path field, which is why this pipeline has to run before the database pipeline.

from scrapy.pipelines.images import ImagesPipeline

from .models import JobboleArticleModel


class MysqlPipeline(object):

    def process_item(self, item, spider):
        if JobboleArticleModel.table_exists() is False:
            JobboleArticleModel.create_table()
        try:
            JobboleArticleModel.create(title=item['title'],
                                       tags=item['tags'],
                                       url=item['url'],
                                       url_object_id=item['url_object_id'],
                                       create_date=item['create_date'],
                                       content=item['content'],
                                       comment_nums=item['comment_nums'],
                                       praise_nums=item['praise_nums'],
                                       fav_nums=item['fav_nums'],
                                       front_image_url=item['front_image_url'],
                                       front_image_path=item['front_image_path'])
        except Exception as e:
            print(e)
        return item


class ArticleImagePipeline(ImagesPipeline):
    def item_completed(self, results, item, info):
        for res, value in results:
            image_file_path = value['path']
            item['front_image_path'] = image_file_path

        return item

Configuring the pipelines

Note that ArticleImagePipeline must run before MysqlPipeline (lower numbers run first):

ITEM_PIPELINES = {
    'ArticleSpider.pipelines.MysqlPipeline': 2,
    'ArticleSpider.pipelines.ArticleImagePipeline': 1,
}

ArticleImagePipeline also needs a few extra settings. Note that ImagesPipeline expects the URL field to contain a list, which is why front_image_url above uses an output processor that wraps the value in a list.

import os

IMAGES_URLS_FIELD = 'front_image_url'
project_dir = os.path.abspath(os.path.dirname(__file__))
IMAGES_STORE = os.path.join(project_dir, 'images')

Crawling Zhihu Questions and Answers

Implementing the Spider

In the spider's entry point, start_requests(), we first call the login() function; the goal of this step is to obtain cookies.

We also need to enable cookies in the settings by setting COOKIES_ENABLED = True.

class ZhihuSpider(scrapy.Spider):
    name = 'zhihu'
    allowed_domains = ['www.zhihu.com']
    start_urls = ['https://www.zhihu.com/']

    start_answer_url ='https://www.zhihu.com/api/v4/questions/{0}/...'

    headers = {
        "HOST": "www.zhihu.com",
        "Referer": "https://www.zhizhu.com",
        'User-Agent': "Mozilla / 5.0(Windows NT 10.0;Win64;x64;rv: 61.0)..."
    }

    def start_requests(self):
        cookie_dict = self.login(True)
        return [Request(url=self.start_urls[0], dont_filter=True,
                        cookies=cookie_dict, headers=self.headers)]

The login() function obtains the user's cookies. The original plan was to get them through a simulated login (which no longer works), so here I copied the cookies from the browser, wrote them to files, and read them back from those files:

    @staticmethod
    def login(cookie):
        if cookie:
            cookie_dict = {'_xsrf': '', '_zap': '', 'capsion_ticket': '',
                           'd_c0': '', 'q_c1': '', 'tgw_l7_route': '', 'z_c0': ''}
            for cookie in cookie_dict:
                f = open('ArticleSpider/cookies/zhihu/' + cookie + '.zhihu', 'r')
                cookie_dict[cookie] = f.read()
            return cookie_dict
        else:
            browser = webdriver.Chrome(executable_path='...chromedriver.exe')
            browser.get('https://www.zhihu.com/signin')

            browser.find_element_by_css_selector(
                '.SignFlow input[name="username"]').send_keys('...')
            browser.find_element_by_css_selector(
                '.SignFlow input[name="password"]').send_keys('...')

            time.sleep(10)
            cookies = browser.get_cookies()
            cookie_dict = {}
            for cookie in cookies:
                f = open('ArticleSpider/cookies/zhihu/' + cookie['name'] + '.zhihu',
                         'w')
                f.write(cookie['value'])
                f.close()
                cookie_dict[cookie['name']] = cookie['value']
            browser.close()
            return cookie_dict

The default parse callback

After obtaining the cookies in start_requests() we request the Zhihu home page directly, parse it, extract every URL on the page, and then filter the URLs with the following logic:

    def parse(self, response):
        """
        提取出页面中的所有URL,并跟踪这些URL进行下一步爬取
        如果提取的URL是问题的URL,下载之后直接进入解析函数
        """
        all_urls = response.css('a::attr(href)').extract()
        all_urls = [parse.urljoin(response.url, url) for url in all_urls]
        all_urls = filter(lambda x: True if x.startswith('https') else False, all_urls)
        for url in all_urls:
            match_obj = re.match('(.*zhihu.com/question/(\d+))(/|$).*', url)
            if match_obj:
                request_url = match_obj.group(1)
                question_id = match_obj.group(2)

                yield Request(request_url, headers=self.headers,
                              meta={'question_id': question_id},
                              callback=self.parse_question)
            else:
                yield Request(url, headers=self.headers, callback=self.parse)

Parsing question details

We again use an ItemLoader to extract the data:

    def parse_question(self, response):
        item_loader = ZhihuItemLoader(item=ZhihuQuestionItem(), response=response)
        question_id = response.meta.get('question_id', None)

        item_loader.add_css('title', 'h1.QuestionHeader-title::text')
        item_loader.add_css('content', 'div.QuestionHeader-detail')
        item_loader.add_css('answer_num', 'h4.List-headerText span::text')
        item_loader.add_css('comments_num', 'div.QuestionHeader-Comment button::text')
        item_loader.add_css('watch_user_num', '.NumberBoard-itemValue::text')
        item_loader.add_css('topics', 'div.QuestionHeader-topics .Popover div::text')
        item_loader.add_value('url', response.url)
        item_loader.add_value('zhihu_id', int(question_id))
        item_loader.add_value('crawl_time', datetime.datetime.now())

        question_item = item_loader.load_item()

        yield Request(self.start_answer_url.format(question_id),
                      headers=self.headers, callback=self.parse_answer)

        yield question_item

Parsing answer details

The answer details are available directly as JSON, so parsing them is much simpler:

    def parse_answer(self, response):
        ans_json = json.loads(response.text)
        is_end = ans_json['paging']['is_end']
        next_url = ans_json['paging']['next']
        print(ans_json['paging']['totals'])

        for answer in ans_json['data']:
            answer_item = ZhihuAnswerItem()

            answer_item['zhihu_id'] = answer['id']
            answer_item['url'] = answer['url']
            answer_item['question_id'] = answer['question']['id']
            answer_item['author_id'] = answer['author']['id']
            answer_item['content'] = answer['content']
            answer_item['praise_num'] = answer['voteup_count']
            answer_item['comments_num'] = answer['comment_count']
            answer_item['create_time'] = answer['created_time']
            answer_item['update_time'] = answer['updated_time']
            answer_item['crawl_time'] = datetime.datetime.now()

            yield answer_item

        if not is_end:
            yield Request(next_url, headers=self.headers, callback=self.parse_answer)

Implementing the Items

The question Item

Note the insert_db() method implemented here. It is worth implementing this method on every Item: the pipeline can then simply call the instance's insert_db() method to write to the database without caring which concrete Item it is handling. The earlier Jobbole article Item therefore needs a corresponding change as well (see the sketch below).
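For example, the insert_db() method on JobboleArticleItem could look roughly like this — a sketch that simply moves the field mapping out of the MysqlPipeline shown earlier, with the image path passed in as a keyword argument:

    def insert_db(self, **kwargs):
        # Same logic that previously lived in MysqlPipeline.process_item()
        if JobboleArticleModel.table_exists() is False:
            JobboleArticleModel.create_table()
        try:
            JobboleArticleModel.create(title=self['title'],
                                       tags=self['tags'],
                                       url=self['url'],
                                       url_object_id=self['url_object_id'],
                                       create_date=self['create_date'],
                                       content=self['content'],
                                       comment_nums=self['comment_nums'],
                                       praise_nums=self['praise_nums'],
                                       fav_nums=self['fav_nums'],
                                       front_image_url=self['front_image_url'],
                                       front_image_path=kwargs.get('path', ''))
        except Exception as e:
            print(e)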

class ZhihuQuestionItem(scrapy.Item):
    zhihu_id = scrapy.Field()
    topics = scrapy.Field(output_processor=Join(','))
    url = scrapy.Field()
    title = scrapy.Field()
    content = scrapy.Field()
    answer_num = scrapy.Field(input_processor=MapCompose(get_nums))
    comments_num = scrapy.Field(
        input_processor=MapCompose(lambda x: x.split(' '), get_nums))
    watch_user_num = scrapy.Field(
        input_processor=MapCompose(lambda x: int(x.replace(',', ''))),
        output_processor=Identity())
    click_num = scrapy.Field()
    crawl_time = scrapy.Field()

    def insert_db(self):

        ex_question = ZhihuQuestionModel.filter(zhihu_id=self['zhihu_id'])
        if ex_question:
            question = ex_question[0]
            question.content = self['content']
            question.answer_num = self['answer_num']
            question.comments_num = self['comments_num']
            question.watch_user_num = self['watch_user_num'][0]
            question.click_num = self['watch_user_num'][1]
            question.crawl_update_time = datetime.datetime.now()
            try:
                question.save()
            except Exception as e:
                print(e)
        else:
            try:
                ZhihuQuestionModel.create(zhihu_id=self['zhihu_id'],
                                          topics=self['topics'], url=self['url'],
                                          title=self['title'], content=self['content'],
                                          answer_num=self['answer_num'],
                                          comments_num=self['comments_num'],
                                          watch_user_num=self['watch_user_num'][0],
                                          click_num=self['watch_user_num'][1],
                                          crawl_time=self['crawl_time'])
            except Exception as e:
                print(e)

The answer Item

class ZhihuAnswerItem(scrapy.Item):
    zhihu_id = scrapy.Field()
    url = scrapy.Field()
    question_id = scrapy.Field()
    author_id = scrapy.Field()
    content = scrapy.Field()
    praise_num = scrapy.Field()
    comments_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()
    crawl_update_time = scrapy.Field()

    def insert_db(self):

        ex_answer = ZhihuAnswerModel.filter(zhihu_id=self['zhihu_id'])
        if ex_answer:
            answer = ex_answer[0]
            answer.praise_num = self['praise_num']
            answer.content = self['content']
            answer.comments_num = self['comments_num']
            answer.crawl_update_time = datetime.datetime.now()
            answer.save()
        else:
            try:
                create_time = datetime.datetime.fromtimestamp(self['create_time'])
                update_time = datetime.datetime.fromtimestamp(self['update_time'])
                question_id = int(self['question_id'])
                ZhihuAnswerModel.create(zhihu_id=self['zhihu_id'],
                                        question_id=question_id, url=self['url'],
                                        author_id=self['author_id'],
                                        praise_num=self['praise_num'],
                                        content=self['content'],
                                        comments_num=self['comments_num'],
                                        create_time=create_time,
                                        update_time=update_time,
                                        crawl_time=self['crawl_time'])
            except Exception as e:
                print(e)

Implementing the Models

class ZhihuQuestionModel(Model):
    zhihu_id = IntegerField(primary_key=True)
    topics = CharField(max_length=255, null=True, verbose_name='主题')
    url = CharField(max_length=300, verbose_name='URL地址')
    title = CharField(max_length=200, verbose_name='标题')
    content = TextField(verbose_name='内容')
    create_time = DateTimeField(null=True, verbose_name='创建时间')
    update_time = DateTimeField(null=True, verbose_name='修改时间')
    answer_num = IntegerField(default=0, verbose_name='回答数')
    comments_num = IntegerField(default=0, verbose_name='评论数')
    watch_user_num = IntegerField(default=0, verbose_name='关注数')
    click_num = IntegerField(default=0, verbose_name='点击数')
    crawl_time = DateTimeField(verbose_name='爬取时间')
    crawl_update_time = DateTimeField(null=True, verbose_name='更新时间')

    class Meta:
        database = db
        db_table = 'zhihu_question'


class ZhihuAnswerModel(Model):
    zhihu_id = IntegerField(primary_key=True)
    url = CharField(max_length=300, verbose_name='URL地址')
    question_id = IntegerField()
    author_id = CharField(null=True, max_length=100, verbose_name='用户ID')
    content = TextField(verbose_name='内容')
    praise_num = IntegerField(default=0, verbose_name='点赞数')
    comments_num = IntegerField(default=0, verbose_name='评论数')
    create_time = DateTimeField(null=True, verbose_name='创建时间')
    update_time = DateTimeField(null=True, verbose_name='修改时间')
    crawl_time = DateTimeField(verbose_name='爬取时间')
    crawl_update_time = DateTimeField(null=True, verbose_name='更新时间')

    class Meta:
        database = db
        db_table = 'zhihu_answer'

Implementing the Pipeline

Since the insert logic already lives in the Item classes, the pipeline only needs to call the item's insert_db() method. Because the Jobbole article item needs an extra argument (the image path), there is a small branch here:

class MysqlPipeline(object):

    def process_item(self, item, spider):
        if item.get('front_image_path', None):
            item.insert_db(path=item['front_image_path'])
        else:
            item.insert_db()
        return item

Crawling Lagou Job Listings

Implementing the Spider

We use a CrawlSpider to crawl Lagou in depth.

import datetime

import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from items import LagouJobItemLoader, LagouJobItem
from utils.common import get_md5


class LagouSpider(CrawlSpider):
    name = 'lagou'
    allowed_domains = ['www.lagou.com']
    start_urls = ['https://www.lagou.com/']

    custom_settings = {
        "COOKIES_ENABLED": False,
        "DOWNLOAD_DELAY": 1,
        'DEFAULT_REQUEST_HEADERS': {
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'zh-CN,zh;q=0.8',
            'Connection': 'keep-alive',
            'Cookie': '...',
            'Host': 'www.lagou.com',
            'Origin': 'https://www.lagou.com',
            'Referer': 'https://www.lagou.com/',
            'User-Agent': '...',
        }
    }

    def start_requests(self):
        return [scrapy.Request(url=self.start_urls[0], dont_filter=True)]

    rules = (
        Rule(LinkExtractor(allow=r'zhaopin/.*'), follow=True),
        Rule(LinkExtractor(allow=r'gongsi/j\d+.html'), follow=True),
        Rule(LinkExtractor(allow=r'jobs/\d+.html'), callback='parse_job', follow=True),
    )

    def parse_job(self, response):
        """
        解析拉勾网的职位
        """
        salary = response.css('.job_request .salary::text').extract_first()
        salary = salary.split('-') if salary else ['0', '0']
        work_years = response.xpath(
            '//*[@class="job_request"]/p[1]/span[3]/text()').extract_first().split('-')
        if len(work_years) != 2:
            if work_years[0] == '经验1年以下 /':
                work_years = ['0', '1']
            elif work_years[0] == '经验10年以上 /':
                work_years = ['10', '10']
            else:
                work_years = ['0', '0']

        item_loader = LagouJobItemLoader(item=LagouJobItem(), response=response)

        item_loader.add_css('title', '.job-name::attr(title)')
        item_loader.add_css('publish_time', '.publish_time::text')
        item_loader.add_css('tags', '.position-label li::text')
        item_loader.add_css('job_advantage', '.job-advantage p::text')
        item_loader.add_css('job_desc', '.job_bt')
        item_loader.add_css('job_address', '.work_addr')
        item_loader.add_css('company_name', '.job_company a img::attr(alt)')

        item_loader.add_xpath(
            'job_city', '//*[@class="job_request"]/p[1]/span[2]/text()')
        item_loader.add_xpath(
            'degree_need', '//*[@class="job_request"]/p[1]/span[4]/text()')
        item_loader.add_xpath(
            'job_type', '//*[@class="job_request"]/p[1]/span[5]/text()')
        item_loader.add_xpath(
            'company_url', '//*[@class="c_feature"]/li[4]/a/text()')

        item_loader.add_value('url', response.url)
        item_loader.add_value('url_object_id', get_md5(response.url))
        item_loader.add_value('salary_min', salary[0])
        item_loader.add_value('salary_max', salary[1])
        item_loader.add_value('work_years_min', work_years[0])
        item_loader.add_value('work_years_max', work_years[1])
        item_loader.add_value('crawl_time', datetime.datetime.now())

        job_item = item_loader.load_item()

        return job_item

Implementing the Item

Here we use remove_tags from w3lib.html to strip HTML tags from the extracted strings.

from w3lib.html import remove_tags


class LagouJobItemLoader(ItemLoader):

    default_output_processor = TakeFirst()


def handle_jobaddr(value):
    addr_list = value.split('\n')
    addr_list = [i.strip() for i in addr_list]
    addr_str = ''.join(addr_list[0:-2])
    return addr_str


class LagouJobItem(scrapy.Item):
    """
    拉勾的职位
    """
    url = scrapy.Field()
    url_object_id = scrapy.Field()
    title = scrapy.Field()
    salary_min = scrapy.Field(input_processor=MapCompose(
        lambda x: int(x.replace('k', '000').replace('K', '000'))))
    salary_max = scrapy.Field(input_processor=MapCompose(
        lambda x: int(x.replace('k', '000').replace('K', '000'))))
    job_city = scrapy.Field(input_processor=MapCompose(
        lambda x: x.replace('/', '')))
    work_years_min = scrapy.Field(input_processor=MapCompose(get_nums))
    work_years_max = scrapy.Field(input_processor=MapCompose(get_nums))
    degree_need = scrapy.Field(input_processor=MapCompose(
        lambda x: x.replace('/', '')))
    job_type = scrapy.Field()
    publish_time = scrapy.Field()
    tags = scrapy.Field(output_processor=Join(','))
    job_advantage = scrapy.Field()
    job_desc = scrapy.Field()
    job_address = scrapy.Field(input_processor=MapCompose(remove_tags, handle_jobaddr))
    company_url = scrapy.Field()
    company_name = scrapy.Field()
    crawl_time = scrapy.Field()
    crawl_update_time = scrapy.Field()

    def insert_db(self):
        try:
            LagouModel.create(url_object_id=self['url_object_id'],
                              url=self['url'],
                              title=self['title'],
                              salary_min=self['salary_min'],
                              salary_max=self['salary_max'],
                              job_city=self['job_city'],
                              work_years_min=self['work_years_min'],
                              work_years_max=self['work_years_max'],
                              degree_need=self['degree_need'],
                              job_type=self['job_type'],
                              publish_time=self['publish_time'],
                              tags=self['tags'],
                              job_advantage=self['job_advantage'],
                              job_desc=self['job_desc'],
                              job_address=self['job_address'],
                              company_url=self['company_url'],
                              company_name=self['company_name'],
                              crawl_time=self['crawl_time'])
        except Exception as e:
            print(e)

Implementing the Model

class LagouModel(Model):
    url = CharField(max_length=300, verbose_name='URL地址')
    url_object_id = CharField(max_length=50, primary_key=True)
    title = CharField(max_length=100, verbose_name='标题')
    salary_min = IntegerField(verbose_name='薪资最低值')
    salary_max = IntegerField(verbose_name='薪资最高值')
    job_city = CharField(max_length=10)
    work_years_min = IntegerField(verbose_name='工作年限最低值')
    work_years_max = IntegerField(verbose_name='工作年限最高值')
    degree_need = CharField(max_length=30, verbose_name='学历')
    job_type = CharField(max_length=20, verbose_name='工作类型')
    publish_time = CharField(verbose_name='发布时间')
    tags = CharField(verbose_name='标签')
    job_advantage = CharField(max_length=1000, verbose_name='职位诱惑')
    job_desc = TextField(verbose_name='职位描述')
    job_address = CharField(max_length=50, verbose_name='工作地址')
    company_url = CharField(null=True, max_length=100, verbose_name='公司网站')
    company_name = CharField(max_length=100, verbose_name='公司名称')
    crawl_time = DateTimeField(verbose_name='爬取时间')
    crawl_update_time = DateTimeField(null=True, verbose_name='更新时间')

    class Meta:
        database = db
        db_table = 'lagou_job'

Implementing the Pipeline

All the logic lives in the Item, so the pipeline is unchanged:

class MysqlPipeline(object):

    def process_item(self, item, spider):
        if item.get('front_image_path', None):
            item.insert_db(path=item['front_image_path'])
        else:
            item.insert_db()
        return item

A Few Tricks for Getting Around Anti-Crawler Measures

Random User-Agent

We can send a different User-Agent with every request, which goes a long way toward getting past a site's anti-crawler checks.

To attach a different User-Agent to each request, we can write a custom downloader middleware.

Here we use the third-party library fake_useragent, which provides a pool of random User-Agent strings.

We also add a setting, RANDOM_UA_TYPE = 'random', to the settings file.

Changing this setting selects which browser type the random User-Agent is drawn from; the default 'random' draws from all of them.

from fake_useragent import UserAgent


class RandomUserAgentMiddleware(object):

    def __init__(self, crawler):
        super(RandomUserAgentMiddleware, self).__init__()
        self.ua = UserAgent()
        self.ua_type = crawler.settings.get('RANDOM_UA_TYPE', 'random')

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def process_request(self, request, spider):

        def get_ua():
            return getattr(self.ua, self.ua_type)

        ua = get_ua()
        request.headers.setdefault('User-Agent', ua)

After defining the downloader middleware, enable it in the settings:

DOWNLOADER_MIDDLEWARES = {
    'ArticleSpider.middlewares.RandomUserAgentMiddleware': 1,
}

RANDOM_UA_TYPE = 'random'

**Note:** don't forget the RANDOM_UA_TYPE setting.

IP Proxy Pool

There are many ways to get around IP bans. Using proxy IPs is not the most elegant one, but it is fairly simple; besides proxies, you can also route traffic through Tor to hide your real IP, but that is not covered here.

There are plenty of open-source IP proxy pools on GitHub, but here we build a simple one ourselves.

First we write a small standalone script that scrapes free proxy IPs from the web and saves them to the database.

import requests
import MySQLdb
from scrapy.selector import Selector


conn = MySQLdb.connect(host='127.0.0.1', user='root', passwd='...',
                       db='scrapy_project', charset='utf8')
cursor = conn.cursor()


def crawl_ips():

    headers = {'User-Agent': '...'}

    re = requests.get('http://www.xicidaili.com/nn/', headers=headers)

    selector = Selector(text=re.text)
    all_trs = selector.css('#ip_list tr')

    for tr in all_trs[1:]:
        speed = 0
        speed_str = tr.css('.bar::attr(title)').extract_first()
        if speed_str:
            speed = float(speed_str.replace('秒', ''))

        all_texts = tr.css('td::text').extract()
        if all_texts:
            ip = all_texts[0]
            port = all_texts[1]
            proxy_type = all_texts[5]
            try:
                cursor.execute('insert proxy_ip(ip, port, speed, proxy_type)\
                VALUES("{0}", "{1}", {2}, "{3}")'.format(ip, port, speed, proxy_type))
                conn.commit()
            except Exception as e:
                conn.rollback()
                print(e)
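The script above assumes a proxy_ip table already exists in scrapy_project. The notes don't show its schema, so the column types below are an assumption; it can be created once through the same connection:

def create_table():
    # Assumed schema: adjust column names/sizes to taste
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS proxy_ip (
            ip VARCHAR(20) NOT NULL PRIMARY KEY,
            port VARCHAR(10) NOT NULL,
            speed FLOAT,
            proxy_type VARCHAR(10)
        )
    """)
    conn.commit()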

Define a RandomIp class that returns a usable proxy IP. The logic is simple:

  • Fetch a random record from the database

  • Validate it to check whether the proxy still works

  • If it no longer works, delete it from the database and fetch another random record

  • If validation succeeds, return it

class RandomIp(object):

    def __init__(self):
        self.ip = None
        self.port = None
        self.proxy = None
        self.get_random_ip()

    def get_random_ip(self):
        random = 'SELECT ip, port, proxy_type FROM proxy_ip ORDER BY RAND() LIMIT 1'
        cursor.execute(random)
        for ip, port, proxy_type in cursor.fetchall():
            if self.judge_ip(ip, port, proxy_type):
                self.ip = ip
                self.port = port
                self.proxy = '{0}://{1}:{2}'.format(proxy_type, ip, port)
            else:
                self.get_random_ip()

    def judge_ip(self, ip, port, proxy_type):
        http_url = 'http://www.baidu.com'
        proxy_url = '{0}://{1}:{2}'.format(proxy_type, ip, port)
        try:
            proxy_dict = {proxy_type: proxy_url}
            response = requests.get(http_url, proxies=proxy_dict)
            code = response.status_code
            if 200 <= code < 300:
                return True
            else:
                print('invalid ip and port')
                self.delete_ip(ip)
                return False
        except Exception as e:
            print('invalid ip and port', e)
            self.delete_ip(ip)
            return False

    @staticmethod
    def delete_ip(ip):
        delete_sql = "delete from proxy_ip where ip='{0}'".format(ip)
        cursor.execute(delete_sql)
        conn.commit()
        return True

To use this in a middleware, we could simply set the proxy at the end of the random User-Agent middleware above, or write a separate middleware; here a new middleware is defined:

class RandomProxyIpMiddleware(object):

    def process_request(self, request, spider):
        random_proxy = RandomIp()
        request.meta['proxy'] = random_proxy.proxy

Remember to register the new middleware in the settings file:

DOWNLOADER_MIDDLEWARES = {
    ...
    'ArticleSpider.middlewares.RandomProxyIpMiddleware': 2,
}

Writing Data to Elasticsearch

elasticsearch_dsl

Python has a third-party Elasticsearch library, elasticsearch_dsl, which lets us work with Elasticsearch much like an ORM lets us work with a database.

Install it (here via pipenv):

pipenv install elasticsearch_dsl

Creating the model

As with an ORM, we create a model corresponding to the index: a Python class that inherits from DocType.

Before creating the model, a little initialization is needed:

from elasticsearch_dsl import *
from elasticsearch_dsl.connections import connections
from elasticsearch_dsl.analysis import CustomAnalyzer as _CustomAnalyzer

__author__ = '骆杨'


es = connections.create_connection(hosts=['127.0.0.1'])


class CustomAnalyzer(_CustomAnalyzer):
    def get_analysis_definition(self):
        return {}


ik_analyser = CustomAnalyzer('ik_max_word', filter=['lowercase'])

Then create the model class:

class ArticleType(DocType):
    """
    伯乐在线文章类型
    """
    suggest = Completion(analyzer=ik_analyser)
    title = Text(analyzer='ik_max_word')
    tags = Text(analyzer='ik_max_word')
    url = Keyword()
    url_object_id = Keyword()
    create_date = Date()
    content = Text(analyzer='ik_max_word')
    comment_nums = Integer()
    praise_nums = Integer()
    fav_nums = Integer()
    front_image_url = Keyword()
    front_image_path = Keyword()

    class Meta:
        doc_type = 'article'
        index = 'jobbole'

    class Index:
        doc_type = 'article'
        index = 'jobbole'

Finally, call ArticleType.init() to create the index and its mapping.
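A minimal sketch of creating the index, e.g. by running the models module directly:

if __name__ == '__main__':
    # Creates the 'jobbole' index with the mapping defined by ArticleType
    ArticleType.init()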

Saving data

As before, we don't save data directly in the pipeline; the logic lives in the item, and the pipeline only needs to call the item's save_to_es() method (a minimal pipeline sketch follows).
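A pipeline along those lines could look like this — a sketch only; the class name ElasticsearchPipeline is mine, and it would need to be registered in ITEM_PIPELINES like the others:

class ElasticsearchPipeline(object):

    def process_item(self, item, spider):
        # Each Item knows how to persist itself to Elasticsearch;
        # the Jobbole article item additionally needs the image path
        if item.get('front_image_path', None):
            item.save_to_es(path=item['front_image_path'])
        else:
            item.save_to_es()
        return item

The save_to_es() method itself, on the Jobbole article item, is shown below.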

    def save_to_es(self, **kwargs):
        try:
            article = ArticleType()
            article.title = self['title']
            article.tags = self['tags']
            article.url = self['url']
            article.url_object_id = self['url_object_id']
            article.create_date = self['create_date']
            article.content = remove_tags(self['content'])
            article.comment_nums = self['comment_nums']
            article.praise_nums = self['praise_nums']
            article.fav_nums = self['fav_nums']
            article.front_image_url = self['front_image_url']
            article.front_image_path = kwargs['path']
            # Build search-as-you-type suggestions
            article.suggest = gen_suggests('jobbole',
                                           ((article.title, 10), (article.tags, 7)))

            article.save(index='jobbole')
        except Exception as e:
            print(e)
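gen_suggests() is not shown in the notes. A sketch of how it could be implemented on top of the es connection created earlier — the exact tokenization and weighting details here are my assumptions:

def gen_suggests(index, info_tuple):
    # Tokenize each (text, weight) pair with the ik_max_word analyzer and
    # collect the tokens as weighted inputs for the Completion suggest field
    used_words = set()
    suggests = []
    for text, weight in info_tuple:
        if not text:
            continue
        result = es.indices.analyze(index=index,
                                    body={'analyzer': 'ik_max_word', 'text': text})
        words = {t['token'] for t in result['tokens'] if len(t['token']) > 1}
        new_words = words - used_words
        used_words.update(new_words)
        if new_words:
            suggests.append({'input': list(new_words), 'weight': weight})
    return suggests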

Tags: Scrapy crawler