Python 并发编程

VogtMarlon 发布于7月前
0 条问题

资料推荐

这里只是学习资料的一个笔记与总结, 更详细、仔细的学习还请各位看官自行看看原始的资料。在此罗列一下参考到的有用的资料。

主要参考资料:

Python并行编程 中文版

译者非常的用心, 原著(英文版)的代码译者应该大部分都亲自测试过。 因为原著很多疑似疏漏的地方, 译者都做了特别的标识。在此鄙视一下由 张龙 翻译并出版成书的版本。 翻译狗屁不通, 上面的代码应该也没跑过。

使用Python进行并发编程-asyncio篇( 一 )

使用Python进行并发编程-asyncio篇( 二 )

异步编程讲得还是满详细的! 学到了很多东西!

基于线程的并行

Python Thread的定义

class threading.Thread(group=None,  # 一般设置为 None ,这是为以后的一些特性预留的
                       target=None, # 当线程启动的时候要执行的函数
                       name=None,   # 线程的名字,默认会分配一个唯一名字 Thread-N
                       args=(),     # 传递给 target 的参数,要使用tuple类型
                       kwargs={})   # 传递给 target 的参数,要使用dict类型
 

例子: 简单的Python Thread 示例代码

import threading
import time
 
def first_function():
    print(threading.currentThread().getName() + str(' is Starting '))
    time.sleep(2)
    print (threading.currentThread().getName() + str(' is Exiting '))
    return
 
def second_function():
    print(threading.currentThread().getName() + str(' is Starting '))
    time.sleep(2)
    print (threading.currentThread().getName() + str(' is Exiting '))
    return
 
def third_function():
    print(threading.currentThread().getName() + str(' is Starting '))
    time.sleep(2)
    print(threading.currentThread().getName() + str(' is Exiting '))
    return
 
if __name__ == "__main__":
    t1 = threading.Thread(name='first_function', target=first_function)
    t2 = threading.Thread(name='second_function', target=second_function)
    t3 = threading.Thread(name='third_function', target=third_function)
    t1.start()
    t2.start()
    t3.start()
    print("main thread!!!")
 

输出示例:

first_function is Starting 
second_function is Starting 
third_function is Starting 
main thread!!!
# 问题: 请问如果多执行几次, 下面的输出结果的顺序是固定的吗?
first_function is Exiting 
second_function is Exiting 
third_function is Exiting 
 
 
# 结论: 测试结果是固定的
 

问题2: 在最底部按如下顺序加入join()

t1.join()
t2.join()
t3.join()
 
"""
结果:第二部分的顺序看起来是随机的
"""
 

问题3: 按照以下顺序开始&join()

t1.start()
t1.join()
 
t2.start()
t2.join()
 
t3.start()
t3.join()
 
print("main thread!!!")
 
"""
输出顺序固定, 如下所示:
first_function is Starting 
first_function is Exiting 
second_function is Starting 
second_function is Exiting 
third_function is Starting 
third_function is Exiting 
main thread!!!
 
我的理解: join() 是让当前线程完成了再回到主线程, 多线程就成了“单线程”在执行了
"""
 

几个线程的事实

参考资料: https://blog.csdn.net/zhiyuan_2007/article/details/48807761

  1. python 默认参数创建线程后,不管主线程是否执行完毕,都会等待子线程执行完毕才一起退出,有无join结果一样
  2. 如果创建线程,并且设置了daemon为true,即thread.setDaemon(True), 则主线程执行完毕后自动退出,不会等待子线程的执行结果。而且随着主线程退出,子线程也消亡。

  3. join方法的作用是阻塞,等待子线程结束,join方法有一个参数是timeout,即如果主线程等待timeout,子线程还没有结束,则主线程强制结束子线程。

  4. 如果线程daemon属性为False, 则join里的timeout参数无效。主线程会一直等待子线程结束。

  5. 如果线程daemon属性为True, 则join里的timeout参数是有效的, 主线程会等待timeout时间后,结束子线程。此处有一个坑,即如果同时有N个子线程join(timeout),那么实际上主线程会等待的超时时间最长为 N * timeout, 因为每个子线程的超时开始时刻是上一个子线程超时结束的时刻。

自定义线程类的实现

  1. 定义一个 Thread 的子类
  2. 覆盖构造函数 __init__(self [,args]) , 可以添加更多的参数
  3. 覆盖 run() 函数, 实现该线程要做的事情
  4. 启动方法: 调用 start() 而不是 run()
  5. 同样的, 也可以调用 join() 函数

一个使用多线程来抓取豆瓣API的例子:

参考: https://zhuanlan.zhihu.com/p/34004447

import threading
import requests
from bs4 import BeautifulSoup
 
class MyThread(threading.Thread):
 
    def __init__(self, i):
        threading.Thread.__init__(self)
        self.i = i
 
    def run(self):
        url = 'https://movie.douban.com/top250?start={}&filter='.format(self.i*25)
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'html.parser')
        lis = soup.find('ol', class_='grid_view').find_all('li')
        title_lst = []
        for li in lis:
            title = li.find('span', class_="title").text
            title_lst.append(title)
        print( "thread-%s" % self.i, ",".join(title_lst))
 
for i in range(10):
    th = MyThread(i)
    th.start()
 

输出示例: (宽度有限, 用省略号来替代了)

thread-6 心迷宫,纵横四海,荒...
thread-7 燃情岁月,未麻的部屋...
thread-9 绿里奇迹,2001太...
thread-1 蝙蝠侠:黑暗骑士,乱...
thread-8 谍影重重,战争之王,...
thread-0 肖申克的救赎,霸王别...
thread-5 蝙蝠侠:黑暗骑士崛起...
thread-3 猫鼠游戏,沉默的羔羊...
thread-4 消失的爱人,大鱼,一...
thread-2 指环王2:双塔奇兵,...
 

多线程的应用场景

补充:

Python的GIL是什么鬼,多线程性能究竟如何

一些简单的结论:

  1. 全称 Global Interpreter Lock : 全局解释器锁
  2. 作用: CPython 引入的锁机制。 GIL在解释器层面阻止了真正的并行运行
  3. 因此, 如果是CPU密集型的任务, 多线程反而会拖累整体性能

  4. I/O 密集型才是CPython 解释器下多线程的正确应用场景

    注意: 如果读的是本地文件, 也需要读取不同的文件, 否则不一定能提高性能。

    典型应用: 多线程抓取网络数据 / 调用WebAPI 。。。

  5. Why IO: 因为在进行IO调用的时候, GIL会释放相应的锁!

多线程同步

书中介绍了多种同步机制: Lock / RLock / 信号量(semaphore) / 事件 / with / 队列(quene)

我没有细看, 暂时还不需要处理这种场景。 (当前处理的场景还不需要这么复杂。)

多线程运行结果汇总

举个例子: 上面我们通过多线程抓取了豆瓣Top250 的电影名称,那么我们如何直接得到汇总之后的运行结果呢?

基于上面使用线程类的方式

import threading
import requests
from bs4 import BeautifulSoup
 
class MyThread(threading.Thread):
 
    def __init__(self, i):
        threading.Thread.__init__(self)
        self.i = i
        self.result = []    # 新增一个变量用于存放结果
 
    def run(self):
        url = 'https://movie.douban.com/top250?start={}&filter='.format(self.i*25)
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'html.parser')
        lis = soup.find('ol', class_='grid_view').find_all('li')
        title_lst = []
        for li in lis:
            title = li.find('span', class_="title").text
            title_lst.append(title)
        print( "thread-%s" % self.i, ",".join(title_lst)[:10] + "...")
        self.result = title_lst # 存储运算结果
 
    def get_result(self):   # 专门用于返回运算结果
        return self.result
 
 
result_list = []
thread_list = []
for i in range(10):
    th = MyThread(i)
    th.start()
    thread_list.append(th) # 把全部线程类都放到一个list之中
 
for th in thread_list :     # 遍历线程类list
    th.join()               # 让每个线程都执行完
    result_list.extend(th.get_result()) # 获取线程类的执行结果
 
print("==== final result:", len(result_list))
print(result_list)
 

运行结果如下:

thread-4 消失的爱人,大鱼,一...
thread-8 谍影重重,战争之王,...
thread-1 蝙蝠侠:黑暗骑士,乱...
thread-3 猫鼠游戏,沉默的羔羊...
thread-7 燃情岁月,未麻的部屋...
thread-0 肖申克的救赎,霸王别...
thread-5 蝙蝠侠:黑暗骑士崛起...
thread-2 指环王2:双塔奇兵,...
thread-9 绿里奇迹,2001太...
thread-6 心迷宫,纵横四海,荒...
==== final result: 250
['肖申克的救赎', '霸王别姬', 。。。。。]
 
 

基于 threading.Thread 直接调用的方式

目前没有找到直接从 Thread 类返回结果的方式。

线程池ThreadPool的方式

from multiprocessing.pool import ThreadPool
import requests
from bs4 import BeautifulSoup
 
 
def run(page_no):
    url = 'https://movie.douban.com/top250?start={}&filter='.format(page_no * 25)
    r = requests.get(url)
    soup = BeautifulSoup(r.content, 'html.parser')
    lis = soup.find('ol', class_='grid_view').find_all('li')
    title_lst = []
    for li in lis:
        title = li.find('span', class_="title").text
        title_lst.append(title)
    print("thread-%s" % page_no, ",".join(title_lst)[:10] + "...")
    return title_lst
 
import time
t0 = time.time()
pool = ThreadPool(processes=4)
result_list = pool.map(run, [0,1])
print(len(result_list))
print(result_list)
t1 = time.time()
print("cost time:", (t1 - t0))
 

输出结果:

thread-1 蝙蝠侠:黑暗骑士,乱...
thread-0 肖申克的救赎,霸王别...
2
[['肖申克的救赎', '霸王别姬', ...], 
 ['蝙蝠侠:黑暗骑士', '乱世佳人', ...]]
cost time: 2.3958077430725098
 

注意: 刚才只抓了两页, 因此结果集的长度=2, 现在改成抓取4页, 即有如下改动:

# 原来
result_list = pool.map(run, [0,1])
 
# 改为
result_list = pool.map(run, [0,1,2,3])
 

输出结果:

thread-3 猫鼠游戏,沉默的羔羊...
thread-0 肖申克的救赎,霸王别...
thread-1 蝙蝠侠:黑暗骑士,乱...
thread-2 指环王2:双塔奇兵,...
4
[
    ['肖申克的救赎', '霸王别姬', ...], 
    ['蝙蝠侠:黑暗骑士', '乱世佳人', ...], 
    ['指环王2:双塔奇兵', '教父2', ...], 
    ['猫鼠游戏', '沉默的羔羊', ...]]
cost time: 2.5353810787200928
 

从运行结果上面来说, 时间并没有增加太多(网络开销无法固定)。 如果我们改成抓取10页, 执行速度就要慢得多了(毕竟我们设置线程池的数量为 4 )

线程池

ThreadPool VS Pool

from multiprocessing.pool import ThreadPool
from multiprocessing import Pool
from multiprocessing.dummy import Pool

具体选择哪个线程池, 两个线程池有什么区别? 我也在好奇之中。。。

貌似推荐 dummy.Pool 的人多一些。 两个线程池在Python官网的介绍都比较少。。。

之所以推荐 dummy.Pool 的人多一些, 原因总结如下:

  1. 实现了跟Pool 完全一致的API , 切换线程池、进程池比较方便
  2. ThreadPool 没有文档说明。。。

几种执行方式

  • apply_async
  • apply
  • map_async
  • map

简单说说他们的区别与联系:

  • map & apply

    他们都是同步/阻塞的

    即: map/apply之后直接运行线程/进程,运行结束后再执行之后语句

  • map 对比 apply 就是调用参数不太一样

    def apply(self, func, args=(), kwds={}):
    def map(self, func, iterable, chunksize=None):
  • 带 _async 就是原来同步的基础之上变成是异步执行。直到遇到 wait() 之后才阻塞

几种执行方式例子:

参考: python进程池multiprocessing.Pool和线程池multiprocessing.dummy.Pool实例

正确关闭pool的姿势

需要注意的是, 正确关闭Pool的方式:

pool = ThreadPool(x)
# do something ...
pool.close()    # 先close
pool.join()     # 再join
 

异步编程

一个关键概念: 协程

除了并发之外, 还有一种编程模式:异步。 其中很重要的一个概念就是: 协程 (Coroutine / 微线程,纤程)

引用一个文章的说法: 小米安全中心: 乱谈 Python 并发

Python的线程(包括进程)其实都是对系统原生内核级线程的包装,切换时,需要在内核与用户态空间来来回回,开销明显会大很多,而且多个线程完全由系统来调度,什么时候执行哪个线程是无法预知的。相比而言,协程就轻量级很多,是对线程的一种模拟,原理与内核级线程类似,只不过切换时,上下文环境保存在用户态的堆栈里,协程“挂起”的时候入栈,“唤醒”的时候出栈,所以其调度是可以人为控制的,这也是“协程”名字的来由,大伙协作着来,别总抢来抢去的,伤感情。

简单来说: 协程是一种用户态的轻量级线程。 因此在任务切换的时候要更轻量得多。

futures 做爬虫的例子

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,as_completed
import time 
import requests
 
def download(url):
    headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0',
                'Connection':'keep-alive',
                'Host':'example.webscraping.com'}
    response = requests.get(url, headers=headers)
    return response.status_code
 
if __name__ == '__main__':
    urllist = ['http://example.webscraping.com/places/default/view/Afghanistan-1',
               'http://example.webscraping.com/places/default/view/Aland-Islands-2']
 
    start = time.time()               
    pool = ProcessPoolExecutor(max_workers = 2)  # 也可以改成 ThreadPoolExecutor          
    futures = [pool.submit(download,url) for url in urllist]
    for future in futures:
        print('执行中:%s, 已完成:%s' % (future.running(), future.done()))
    print('#### 分界线 ####')
    for future in as_completed(futures, timeout=2):
        print('执行中:%s, 已完成:%s' % (future.running(), future.done()))
        print(future.result())
    end = time.time()
    print('使用多线程--timestamp:{:.3f}'.format(end-start))
 

注意:

  1. Python的这个 concurrent.futures 库感觉跟Java 的很像, 可能相互借鉴与学习的结果。
  2. 不过感觉这样写还略麻烦。 可以考虑使用 aiohttp 来做网络爬虫, 用法跟直接使用 requests 比较像, 但是需要接触到新的关键字: asycn wait / asycn with

  3. 使用了异步编程之后, 感觉整个人生观都变了, 需要注意时刻在你的代码中使用异步操作,你如果在代码中使用同步操作,爬虫并不会报错,但是速度可能会受影响。 (就像在写nodejs, 需要调整到回调思维那样)

异步编程个人经验小结

  1. 心智模式上, 线程池、进程池的思维模式我们还是比较熟悉的, 跟之前的同步思想差别不大

    但是异步编程就差别很多了, 而且很多地方都用到了 asycn / await 的关键字。

  2. 可维护性上,考虑到项目交接等等,我的选择: (IO密集型的任务)

    1. 单进程、单线程同步
    2. 单进程、多线程
    3. 异步编程

    也就是说: 除非性能扛不住, 否则就用最简单的编程模型。

  3. 如果是网络爬虫, aiohttp + asycnio 搭配的执行效率是最高的

    性能数据参考: 使用Python进行并发编程-asyncio篇(一)

  4. 你碰到了CPU密集型? 比如机器学习、数据分析等等~

    个人不推荐使用多进程,各种开销、维护很头疼。 推荐使用消息队列、Celery 等等。

  5. asyncio 还在快速发展之中, 在Python3.7 之中,新增了 asyncio.run 的写法, 否则我们得这样写:

    loop = asyncio.get_event_loop()
    loop.run_until_complete(target_function())
    loop.close()
     

aiohttp做爬虫的例子

import aiohttp
import asyncio
 
 
NUMBERS = range(12)
# URL = 'http://httpbin.org/get?a={}'
URL = "https://www.baidu.com/s?wd={}"
sema = asyncio.Semaphore(10)        # 通过信号量控制并发数
 
final_list = []
 
def update_dict(new_ele) :
    """
    更新final_list, 相当于把异步并发的结果合并到final_list之中。 
    并且在这里控制, 如果长度到达一定的标准, 就可以做下一步的事情。 这一步是同步的。
    """
    final_list.append(new_ele)
    print("============ list length:", len(final_list))
 
    if len(final_list) >= 3 :
        print("3333 sleep 3 seconds, could insert data list into db")
        time.sleep(3)
        final_list = []
 
async def fetch_async(a):
    """
    使用aiohttp发出异步请求
    """
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.text()
    return data
 
async def print_result(a):
    with (await sema):
        r = await fetch_async(a)
        update_dict(r)
        print('fetch({}) = {}'.format(a, r))
 
import time
 
t0 = time.time()
loop = asyncio.get_event_loop()
f = asyncio.wait([print_result(num) for num in NUMBERS])
loop.run_until_complete(f)
 
t1 = time.time()
print("============  cost time:", t1 - t0)
 

一个可以优化的地方: 官网建议重用 ClientSession , 使用方法跟 requests 的 session 比较类似。 在这里图省事, 暂时没有改造成这样的形式。重用Session, 根据我的理解, 抓取同一个域名会比较有效。(么有实际测试过~)

原文链接:https://www.flyml.net/2019/07/07/python-parallel-programming/

查看原文: Python 并发编程

  • smallelephant
  • goldenbutterfly
  • lazypanda
需要 登录 后回复方可回复, 如果你还没有账号你可以 注册 一个帐号。