来自 威尼斯国际官方网站 2019-11-03 04:19 的文章
当前位置: 威尼斯国际官方网站 > 威尼斯国际官方网站 > 正文

21天打造分布式爬虫,小白爬虫之爬虫快跑

5.1.threading模块

简单使用

import threading,time

def coding():
    for x in range(3):
        print('正在写代码%s'%x)
        time.sleep(2)

def drawing():
    for x in range(3):
        print('正在画画%s'%x)
        time.sleep(2)

def main():
    t1 = threading.Thread(target=coding)
    t2 = threading.Thread(target=drawing)
    t1.start()
    t2.start()

if __name__ == '__main__':
    main()

使用多线程时好像在目录切换的问题上存在问题,可以给线程加个锁试试

5.2.生产者和消费者

Lock模式的生产者和消费者

import threading
import random,time

gMoney = 1000
gLock = threading.Lock()
gTotalTimes = 10
gTimes = 0


class Producer(threading.Thread):
    def run(self):
        global gMoney
        global gTimes
        while True:
            money = random.randint(100,1000)
            gLock.acquire()
            #只生产10次,超过就停止,必须把锁给释放掉,否则产生死锁
            if gTimes >= gTotalTimes:
                gLock.release()
                break
            gMoney += money
            print('%s生产了%d元钱,剩余%d元钱' % (threading.current_thread(), money, gMoney))
            #生产一次,次数加1,总共10次
            gTimes += 1
            gLock.release()
            time.sleep(0.5)


class Consumer(threading.Thread):
    def run(self):
        global gMoney
        while True:
            money = random.randint(100,1000)
            gLock.acquire()
            if gMoney >= money:
                gMoney -= money
                print('%s消费了%d元钱,剩余%d元钱' % (threading.current_thread(), money,gMoney))
            else:
                if gTimes >= gTotalTimes:
                    gLock.release()
                    break
            gLock.release()
            time.sleep(0.5)


def main():
    for x in range(5):
        t1 = Producer()
        t1.start()

    for x in range(2):
        t2 = Consumer()
        t2.start()

if __name__ == '__main__':
    main()

Hello 大家好!我又来了。

5.3.下载表情包

网址:

解析:xpath

不用多线程,速度相对会很慢

import requests
from lxml import etree
from urllib import request
import os
import re

def parse_page(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36',
        'Referer': 'https://movie.douban.com/'
    }
    response = requests.get(url,headers=headers)
    text = response.text
    html = etree.HTML(text)
    imgs = html.xpath("//div[@class='page-content text-center']//img[@class!='gif']")
    for img in imgs:
        # print(etree.tostring(img))
        #图片地址
        img_url = img.get('data-original')
        #图片名字
        alt = img.get('alt')
        #替换掉名字里面的特殊字符
        alt = re.sub(r'[??.,。!!*]','',alt)
        #获取图片的后缀名(.gif .jpg)
        suffix = os.path.splitext(img_url)[1]
        #保存的时候完整的图片名字
        filename = alt + suffix
        request.urlretrieve(img_url,'C:/Users/Administrator/Desktop/images/'+filename)

def main():
    for x in range(1,10):
        url = 'http://www.doutula.com/photo/list/?page=%d'%x
        parse_page(url)

if __name__ == '__main__':
    main()

利用多线程

 main()

  • 定义两个队列,和创建多线程
  • page_queue():存放每一页的url
  • img_queue():存放每一页里面所有的表情的url

Producer()

  • 从page_queue()队列中去每一页的url,直到队列为空则break
  • 用xpath提取出每一页的所有图片的url
  • 把每个图片的url和名字存放到img_queue()队列里面

Consumer()

  • 从img_queue()队列中取出图片的url和名字
  • 下载保存
  • 直到page_queue()和img_queue()两个队列都为空则break

代码

import requests
from lxml import etree
from urllib import request
import os
import re
import threading
from queue import Queue

class Producer(threading.Thread):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36',
        'Referer': 'https://movie.douban.com/'
    }

    def __init__(self, page_queue, img_queue, *args, **kwargs):
        super(Producer, self).__init__(*args, **kwargs)
        self.page_queue = page_queue
        self.img_queue = img_queue

    def run(self):
        while True:
            if self.page_queue.empty():
                break
            url = self.page_queue.get()
            self.parse_page(url)

    def parse_page(self,url):
        response = requests.get(url,headers=self.headers)
        text = response.text
        html = etree.HTML(text)
        imgs = html.xpath("//div[@class='page-content text-center']//img[@class!='gif']")
        for img in imgs:
            # print(etree.tostring(img))
            #图片地址
            img_url = img.get('data-original')
            #图片名字
            alt = img.get('alt')
            #替换掉名字里面的特殊字符
            alt = re.sub(r'[??.,。!!*]','',alt)
            #获取图片的后缀名(.gif .jpg)
            suffix = os.path.splitext(img_url)[1]
            #保存的时候完整的图片名字
            filename = alt + suffix
            self.img_queue.put((img_url,filename))


class Consumer(threading.Thread):
    def __init__(self,page_queue,img_queue,*args,**kwargs):
        super(Consumer, self).__init__(*args,**kwargs)
        self.page_queue = page_queue
        self.img_queue = img_queue

    def run(self):
        while True:
            if self.img_queue.empty() and self.page_queue.empty():
                break
            img_url,filename = self.img_queue.get()
            request.urlretrieve(img_url, 'C:/Users/Administrator/Desktop/images/' + filename)
            print("已下载完一张图片")


def main():
    page_queue = Queue(1000)
    img_queue = Queue(10000)

    for x in range(1,1758):
        url = 'http://www.doutula.com/photo/list/?page=%d'%x
        page_queue.put(url)

    for x in range(10):
        t = Producer(page_queue,img_queue)
        t.start()

    for x in range(10):
        t = Consumer(page_queue,img_queue)
        t.start()

if __name__ == '__main__':
    main()

结果:

图片 1

 

你是不是发现下载图片速度特别慢、难以忍受啊!对于这种问题一般解决办法就是多进程了!一个进程速度慢!我就用十个进程,相当于十个人一起干。速度就会快很多啦!(为什么不说多线程?懂点Python的小伙伴都知道、GIL的存在 导致Python的多线程有点坑啊!)今天就教大家来做一个多进程的爬虫(其实吧、可以用来做一个超简化版的分布式爬虫)

其实吧!还有一种加速的方法叫做“异步”!不过这玩意儿我没怎么整明白就不出来误人子弟了!(因为爬虫大部分时间都是在等待response中!‘异步’则能让程序在等待response的时间去做的其他事情。)

学过Python基础的同学都知道、在多进程中,进程之间是不能相互通信的,这就有一个很坑爹的问题的出现了!多个进程怎么知道那那些需要爬取、哪些已经被爬取了!

这就涉及到一个东西!这玩意儿叫做队列!!队列!!队列!!其实吧正常来说应该给大家用队列来完成这个教程的,比如Tornado 的queue模块。(如果需要更为稳定健壮的队列,则请考虑使用Celery这一类的专用消息传递工具)

不过为了简化技术种类啊!(才不会告诉你们是我懒,嫌麻烦呢!)这次我们继续使用MongoDB。

好了!先来理一下思路:

每个进程需要知道那些URL爬取过了、哪些URL需要爬取!我们来给每个URL设置两种状态:

outstanding:等待爬取的URL

complete:爬取完成的URL

诶!等等我们好像忘了啥?失败的URL的怎么办啊?我们在增加一种状态:

processing:正在进行的URL。

嗯!当一个所有初始的URL状态都为outstanding;当开始爬取的时候状态改为:processing;爬取完成状态改为:complete;失败的URL重置状态为:outstanding。为了能够处理URL进程被终止的情况、我们设置一个计时参数,当超过这个值时;我们则将状态重置为outstanding。

下面开整Go Go Go!

首先我们需要一个模块:datetime(这个模块比内置time模块要好使一点)不会装??不是吧!  pip install datetime

还有上一篇博文我们已经使用过的pymongo

下面是队列的代码:

Python

fromdatetimeimportdatetime,timedelta

frompymongoimportMongoClient,errors

classMogoQueue():

OUTSTANDING=1##初始状态

PROCESSING=2##正在下载状态

COMPLETE=3##下载完成状态

def__init__(self,db,collection,timeout=300):##初始mongodb连接

self.client=MongoClient()

self.Client=self.client[db]

self.db=self.Client[collection]

self.timeout=timeout

def__bool__(self):

"""

这个函数,我的理解是如果下面的表达为真,则整个类为真

至于有什么用,后面我会注明的(如果我的理解有误,请指点出来谢谢,我也是Python新手)

$ne的意思是不匹配

"""

record=self.db.find_one(

{'status':{'$ne':self.COMPLETE}}

)

returnTrueifrecordelseFalse

defpush(self,url,title):##这个函数用来添加新的URL进队列

try:

self.db.insert({'_id':url,'status':self.OUTSTANDING,'主题':title})

print(url,'插入队列成功')

excepterrors.DuplicateKeyErrorase:##报错则代表已经存在于队列之中了

print(url,'已经存在于队列中了')

pass

defpush_imgurl(self,title,url):

try:

self.db.insert({'_id':title,'statue':self.OUTSTANDING,'url':url})

print('图片地址插入成功')

excepterrors.DuplicateKeyErrorase:

print('地址已经存在了')

pass

defpop(self):

"""

这个函数会查询队列中的所有状态为OUTSTANDING的值,

更改状态,(query后面是查询)(update后面是更新)

并返回_id(就是我们的URL),MongDB好使吧,^_^

如果没有OUTSTANDING的值则调用repair()函数重置所有超时的状态为OUTSTANDING,

$set是设置的意思,和MySQL的set语法一个意思

"""

record=self.db.find_and_modify(

query={'status':self.OUTSTANDING},

update={'$set':{'status':self.PROCESSING,'timestamp':datetime.now()}}

)

ifrecord:

returnrecord['_id']

else:

self.repair()

raiseKeyError

defpop_title(self,url):

record=self.db.find_one({'_id':url})

returnrecord['主题']

defpeek(self):

"""这个函数是取出状态为 OUTSTANDING的文档并返回_id(URL)"""

record=self.db.find_one({'status':self.OUTSTANDING})

ifrecord:

returnrecord['_id']

defcomplete(self,url):

"""这个函数是更新已完成的URL完成"""

self.db.update({'_id':url},{'$set':{'status':self.COMPLETE}})

defrepair(self):

"""这个函数是重置状态$lt是比较"""

record=self.db.find_and_modify(

query={

'timestamp':{'$lt':datetime.now()-timedelta(seconds=self.timeout)},

'status':{'$ne':self.COMPLETE}

},

update={'$set':{'status':self.OUTSTANDING}}

)

ifrecord:

print('重置URL状态',record['_id'])

defclear(self):

"""这个函数只有第一次才调用、后续不要调用、因为这是删库啊!"""

self.db.drop()

好了,队列我们做好了,下面是获取所有页面的代码:

Python

fromDownloadimportrequest

frommongodb_queueimportMogoQueue

frombs4importBeautifulSoup

本文由威尼斯国际官方网站发布于威尼斯国际官方网站,转载请注明出处:21天打造分布式爬虫,小白爬虫之爬虫快跑

关键词: