November 5, 2022

Python 3 multithreaded crawler

A multithreaded crawler involves the queue module and the threading module. Since I covered the threading module earlier, here I will just briefly go over the basics of the queue module.

1. The queue module (details: http://blog.csdn.net/iamaiearner/article/details/9363837)

import queue
myqueue = queue.Queue(maxsize=10)

The queue.Queue class is a synchronized queue implementation. Its length can be bounded or unbounded, set via the optional maxsize argument of the constructor. If maxsize is less than 1 (e.g. queue.Queue() with no argument), the queue is unbounded.

Putting a value into the queue:
myqueue.put(10)
The put() method inserts an item at the tail of the queue. Its first argument, item, is required and is the value to insert; the second, block, is optional and defaults to True. If the queue is currently full and block is True, put() suspends the calling thread until a slot frees up. If block is False and the queue is full, put() raises the queue.Full exception immediately.

Taking a value out of the queue:
myqueue.get()
The get() method removes and returns an item from the head of the queue. Its optional block argument also defaults to True. If the queue is empty and block is True, get() suspends the calling thread until an item becomes available. If the queue is empty and block is False, get() raises the queue.Empty exception.
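
To make the blocking rules concrete, here is a minimal sketch of the exceptions the non-blocking calls raise (a throwaway queue of size 1; names are illustrative):

Python

import queue

q = queue.Queue(maxsize=1)
q.put(1)                   # fits: the queue has one free slot

try:
    q.put(2, block=False)  # queue is full, so this raises at once
except queue.Full:
    print("queue is full")

print(q.get())             # -> 1
try:
    q.get_nowait()         # same as get(block=False); the queue is now empty
except queue.Empty:
    print("queue is empty")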

queue.Queue.qsize() returns the approximate size of the queue
queue.Queue.empty() returns True if the queue is empty, False otherwise
queue.Queue.full() returns True if the queue is full, False otherwise (full() is judged against maxsize)
queue.Queue.get([block[, timeout]]) takes an item from the queue; timeout is how long to wait
queue.Queue.get_nowait() is equivalent to queue.Queue.get(False), i.e. non-blocking
queue.Queue.put(item[, block[, timeout]]) writes an item to the queue; timeout is how long to wait
queue.Queue.put_nowait(item) is equivalent to queue.Queue.put(item, False)
queue.Queue.task_done() is called after finishing a piece of work; it signals the queue that the task is complete
queue.Queue.join() in effect waits until the queue is empty and every task has been marked done before anything else runs
Note that qsize(), empty() and full() only report a snapshot: in multithreaded code the state can change before you act on it, so rely on the blocking calls or the exceptions above for flow control.

Now for the practical part:

Python

from threading import Thread
from queue import Queue
from time import sleep

# q is the task queue
# NUM is the number of concurrent threads
# JOBS is the total number of tasks
q = Queue()
NUM = 4
JOBS = 16

# The actual handler, responsible for processing a single task
def do_something_using(arguments):
    print(arguments)

# The worker, which keeps pulling data off the queue and processing it
def working():
    while True:
        arguments = q.get()  # by default this blocks while the queue is empty
        do_something_using(arguments)
        sleep(1)
        q.task_done()

# Start the threads
threads = []
for i in range(NUM):
    t = Thread(target=working)  # each thread executes working()
    threads.append(t)
for item in threads:
    item.daemon = True  # daemon threads die with the main thread
    item.start()

# Enqueue the JOBS tasks
for i in range(JOBS):
    q.put(i)

# Wait until the queue is drained (all tasks done) before running anything else
q.join()

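Two details make this worker-pool pattern work: the workers are daemon threads, so they cannot keep the process alive on their own, and q.join() returns only once task_done() has been called for every item that was put(), so the main thread moves past q.join() exactly when all 16 jobs have been processed.
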
With these basics in place, we can move on to a multithreaded crawler. Good learning material includes the assorted crawlers at http://www.pythonclub.org/python-network-application/observer-spider and http://blog.sina.com.cn/s/articlelist_1549622495_6_1.html, and the regular-expression matching that crawlers rely on is covered at http://blog.sina.com.cn/s/blog_5c5d5cdf0101jqke.html and http://www.cnblogs.com/huxi/archive/2010/07/04/1771073.html. Below is my first multithreaded crawler program:

Python

# coding=utf-8
import queue
import os
import urllib.request as request
import re
import threading

# Create the queue
all_net = queue.Queue()
count = 0
threads = []
myLock = threading.RLock()

# Fetch a page, store it, and push the links it contains into all_net.
# A stop condition (count < 5 in thread() below) keeps the crawl from looping forever.
def obtain_net(url):
    # Path setup
    global count
    path = 'D:\\test\\2'
    if not os.path.isdir(path):
        os.makedirs(path)
    # Read the URL's data
    urlData = request.urlopen(url).read()
    data = urlData.decode('GBK')  # decode the current page
    myLock.acquire()  # lock around mutations of the shared counter count
    net_path = path + '\\' + '{}.html'.format(count)
    print(count)
    count += 1
    with open(net_path, 'wb') as file:
        # write the raw bytes from before decoding; the decoded str
        # would not match the binary file mode, so data cannot be used here
        file.write(urlData)
    myLock.release()  # release the lock
    # Match the links inside the current page and store them in the queue
    link_object = re.compile(r'<a href="(http://.+?)" ')
    for item in link_object.findall(data):
        all_net.put(item)  # store the URLs in the queue

def thread(number):
    global count
    while count < 5:  # thread loop
        print('aaaaa: {}'.format(count))
        if all_net.qsize() >= number:
            for i in range(number):
                t = threading.Thread(target=obtain_net, args=(all_net.get(),))
                t.daemon = True
                t.start()
                # threads.append(t)
            # for item in threads:
            #     item.setDaemon(True)
            #     item.start()
            #     item.join()  # wait for the thread to terminate

def main():
    URL = r'http://www.taobao.com/'
    obtain_net(URL)  # fetch the first URL up front
    number = 3
    thread(number)

if __name__ == "__main__":
    main()

Accessing pages through a proxy: http://blog.csdn.net/vah101/article/details/6279423, http://wenku.baidu.com/view/4c30a74fff00bed5b8f31d45.html and
http://mayulin.blog.51cto.com/1628315/543559/

Python

import urllib.request as request

# Proxy requiring HTTP basic auth (user:passwd) at www.baidu.com:3128
proxy_handler = request.ProxyHandler({'http': 'user:passwd@www.baidu.com:3128'})
proxy_auth_handler = request.ProxyBasicAuthHandler()
proxy_auth_handler.add_password('realm', 'www.baidu.com', 'user', 'passwd')

# Build an opener that routes requests through the authenticated proxy
opener = request.build_opener(proxy_handler, proxy_auth_handler)
f = opener.open('http://www.baidu.com/')
a = f.read()

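If the proxy does not require authentication, a plain ProxyHandler on its own is enough. Here is a minimal sketch; the address 127.0.0.1:8080 is just a placeholder for whatever proxy you actually have:

Python

import urllib.request as request

# Route http traffic through an unauthenticated proxy; no auth handler needed
opener = request.build_opener(request.ProxyHandler({'http': 'http://127.0.0.1:8080'}))
with opener.open('http://www.baidu.com/') as f:
    print(f.read()[:100])
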
Simulating a Baidu login:

Python

# -*- coding: utf-8 -*-
'''
Created on 2014-01-10
@author: hhdys
'''
import urllib.request, urllib.parse, http.cookiejar, re

class Baidu:

    def login(self):
        # Cookie jar plus an opener that stores every cookie the site sets
        cj = http.cookiejar.CookieJar()
        opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))
        opener.addheaders = [('User-agent', 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36')]
        resp = opener.open('http://weigou.baidu.com/')
        for c in cj:
            print(c.name, "====", c.value)
        # Fetch the login API page and extract the login token from it
        getapiUrl = "https://passport.baidu.com/v2/api/?getapi&class=login&tpl=mn&tangram=true"
        resp2 = opener.open(getapiUrl)
        getapiRespHtml = resp2.read().decode("utf-8")
        foundTokenVal = re.search(r"bdPass\.api\.params\.login_token='(?P<tokenVal>\w+)';", getapiRespHtml)
        if foundTokenVal:
            tokenVal = foundTokenVal.group("tokenVal")
            print(tokenVal)

            staticpage = "http://zhixin.baidu.com/Jump/index?module=onesite"
            baiduMainLoginUrl = "https://passport.baidu.com/v2/api/?login"
            postDict = {
                'charset': "utf-8",
                'token': tokenVal,
                'isPhone': "false",
                'index': "0",
                'staticpage': staticpage,
                'loginType': "1",
                'tpl': "mn",
                'callback': "parent.bd__pcbs__n1a3bg",
                'username': "*****",  # username
                'password': "*****",  # password
                'mem_pass': "on",
                "apiver": "v3",
                "logintype": "basicLogin"
            }
            postData = urllib.parse.urlencode(postDict)
            postData = postData.encode('utf-8')
            resp3 = opener.open(baiduMainLoginUrl, data=postData)
            for c in cj:
                print(c.name, "=" * 6, c.value)

if __name__ == "__main__":
    print("=" * 10, "start")
    bd = Baidu()
    bd.login()

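A quick way to verify the login is to inspect the cookies printed from cj after resp3: historically, a successful Baidu login sets a session cookie named BDUSS, though the exact cookie names are Baidu-specific.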