最近在写一些并发爬虫程序,使用代理池来实现不同的 IP 来快速的执行网络请求,共试了以下三种方法。
第一种方法:使用 concurrent.futures 线程池。
import concurrent.futures
def request_task(url, proxy):
"""
:param url: url链接
:param proxy: 投票的代理
:return: 0 表示失败 1 成功
"""
try:
html = requests.get(
url,
proxies={"http": f"http://{proxy}", "https": f"https://{proxy}"},
timeout=30,
)
if html.status_code == 200:
message = json.loads(html.content, encoding="utf-8")
logger.info(f"{proxy} -> {message}")
if message["message"] == "成功!":
return 1
else:
return 0
else:
return 0
except Exception as e:
pass
return 0
def multi_request_from_proxy(url, proxys):
'''
开启 100 个线程,使用代理池中的代理,并发的对 url 执行函数 request_task
:param url: 待请求的 url
:param proxys: 代理池
:return: None
'''
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
executor.map(request_task, [url] * len(proxys), proxys)
第二种方法,使用 multiprocessing 线程池。
这种方法没有解决 request_task2 中的参数超过 1 个的情况,知道的朋友请赐教。
from multiprocessing.dummy import Pool as ThreadPool
def request_task2(proxy):
"""
:param proxy: 代理
:return: 0 表示失败 1 表示成功
"""
url = 'http://somesite.net.cn'
try:
html = requests.get(
url,
proxies={"http": f"http://{proxy}", "https": f"https://{proxy}"},
timeout=30,
)
# html = requests.get(vote_url, timeout=30)
if html.status_code == 200:
message = json.loads(html.content, encoding="utf-8")
logger.info(f"{proxy} -> {message}")
if message["message"] == "成功!":
return 1
else:
return 0
else:
return 0
except Exception as e:
pass
return 0
def multi_request_from_proxy2(proxys):
'''
使用代理池并发的执行请求函数 request_task2
:param proxys: 代理池
:return: None
'''
with ThreadPool(processes=100) as executor:
executor.map(request_task2, proxys)
第三种方法:采用协程,asyncio。
async def asyn_request_task(url,proxy):
proxyurl = f'http://{proxy}'
# print(proxy)
async with aiohttp.ClientSession() as session:
try:
for i in range(5):
async with session.get(url,proxy = proxyurl,timeout = 30) as resp:
message_str = await resp.text(encoding='utf-8')
message = json.loads(message_str)
if resp.status == 200:
if message["message"] == "成功!"
logger.info(f"{proxy} -> {message}")
else:
break
else:
break
# print("fail")
except Exception as e:
pass
# logger.info(e)
async def asyn_tasks(url,proxys):
tasks = [asyncio.create_task(asyn_request_task(url,proxy)) for proxy in proxys]
await asyncio.gather(*tasks)
asyncio.run(asyn_tasks(url,unused_proxys))
以上三种方法,第三种方法在请求量大时非常高效,第一种和第二种差不多,但第一种方法提供对线程的更多控制,以上供大家在写代码时做参考。
最后再分享一个免费的代理池项目,可以自己部署在本地电脑上使用:
网友评论