深入理解多线程

2018 年 10 月 12 日 • 阅读数: 102

多线程

python中的GIL

  • global interpreter lock 全局解释器锁

  • python 中的一个线程对应于c语言中的一个线程

  • GIL使得同一个时刻只有一个线程在cpu上执行字节码

  • 也无法将多个线程映射到多个cpu上

  • 但是虽然又GIL也不代表一定是线程安全的

  • 在某些情况下cpython解释器会释放GIL

  • GIL会根据执行的字节码行数以及时间片释放GIL

  • 还有就是在遇到IO操作的时候也会主动释放GIL

import threading

total = 0
def add():
    global total
    for i in range(10000000):
        total += 1
def desc():
    global total
    for i in range(10000000):
        total -= 1
%%time
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)
-588606
CPU times: user 1.44 s, sys: 0 ns, total: 1.44 s
Wall time: 1.45 s

简单的多线程

  • 通过threading模块的Thread类实例化一个线程
  • 一般需要两个参数,target函数名称,args函数需要的参数,需要传递一个tuple
  • 通过start()来运行一个线程
import time

def get_detail_html(url):
    print('get detail html started')
    time.sleep(2)
    print('get detail html end')
def get_detail_url(url):
    print('get detail url started')
    time.sleep(2)
    print('get detail url end')
%%time
import threading

thread1 = threading.Thread(target=get_detail_html,args=("",))
thread2 = threading.Thread(target=get_detail_url,args=("",))
start_time = time.time()
thread1.start()
thread2.start()
print('')
print("last time: {}".format(time.time()-start_time))
get detail html started
get detail url started
last time: 0.002210378646850586
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 2.29 ms
  • 在上面的例子中会存在三个线程,一个是主线程,还有两个是我们自己创建的子进程
  • 默认情况下,主进程会等待子线程结束才结束
  • 可以通过setDaemon来将子进程设置为守护进程,守护进程当主进程结束,直接结束
  • 可以通过join来阻塞一个线程的执行
%%time
"""通过join来达到阻塞的功能,但线程之间依然是并发的执行的"""
thread3 = threading.Thread(target=get_detail_html,args=("",))
thread4 = threading.Thread(target=get_detail_url,args=("",))

thread3.setDaemon(True)
thread4.setDaemon(True)

thread3.start()
thread4.start()
thread3.join()
thread4.join()
get detail html started
get detail url started
get detail html end
get detail url end
get detail html end
get detail url end
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 2 s

继承Thread实现多线程

  • 可以重载init方法和run方法,不要去重载start方法
  • 通过类的方法和直接使用Thread类并无太大的差别
  • 只不过,通过继承Thread类,我们可以实现更多复杂的逻辑
class GetDetailHtml(threading.Thread):
    def __init__(self,name):
        super().__init__(name=name)
    def run(self):
        print('get detail html started')
        time.sleep(2)
        print('get detail html end')
        
class GetDetailUrl(threading.Thread):
    def __init__(self,name):
        super().__init__(name=name)
    def run(self):
        print('get detail url started')
        time.sleep(2)
        print('get detail url end')
%%time
thread1 = GetDetailHtml('get_detail_html')
thread2 = GetDetailUrl('get_detail_url')
thread1.start()
thread2.start()
thread1.join()
thread2.join()
get detail html started
get detail url started
get detail url end
get detail url started
get detail html started
get detail html started
get detail html started
get detail html started
get detail html started
get detail html started
get detail html started
get detail html started
get detail html started
get detail html started
get detail html end
get detail url end
CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 2.01 s

线程间通信

  • 通过全局变量的方式通信,在线程中直接通过global操作全局变量
  • 通过共享变量的方式通信,给每个线程传入相同的变量参数
  • 但多线程访问同一数据,并不是线程安全的,所以一般需要加一把锁来保证数据的安全性
  • 也可以使用线程安全的消息队列来实现线程间通信

队列

  • 队列是一种更加安全的线程间通信方式,其内部实现运用了双端队列
  • 从queue模块中导入Queue,实例化一个Queue,需要一个maxsize参数
  • queue中最基本的就是put和get方法
  • 还有一些基本的方法使用说明:
    qsize 获取队列长度
    empty 判断是否为空
    full 判断队列是否满了
    join 从queue的角度阻塞主线程
    task_done 结束阻塞,它和join一般是成对出现的
from queue import Queue

def get_detail_html(queue):
    while True:
        url = queue.get()
        print('get detail html started')
        time.sleep(2)
        print('get detail html end')       

def get_detail_url(queue):
    while True:
        print('get detail url started')
        time.sleep(4)
        for i in range(20):
            queue.put("http://projectsedu.com/{id}".format(id=i))
        print('get detail url end')
detail_url_queue = Queue(maxsize=1000)
thread_detail_url = threading.Thread(target=get_detail_url,args=(detail_url_queue,))
thread_detail_url.start()
for i in range(10):
    html_thread = threading.Thread(target=get_detail_html,args=(detail_url_queue,))
    html_thread.start()
get detail url started

线程同步

  • 通过 threading.Lock() 创建一把锁
  • 在操作共有数据的代码段之前通过 acquire 方法获取锁
  • 在操作完共有数据的代码段之后通过 release 方法释放锁
  • 但是加上锁之后,因为获取锁和释放锁都需要时间,所以运行速度会降低
  • 而且有可能造成死锁,所以在用锁的时候要非常小心
  • python中也提供了一种可重入的锁RLock
  • 在同一个线程里面,可以连续调用多次 acquire
  • 但一定要注意 acquire 的次数一定要和 release 一致
import threading

total = 0
lock = threading.Lock()
def add():
    global total
    global lock
    for i in range(10000000):
        lock.acquire()
        total += 1
        lock.release()
        
def desc():
    global total
    global lock
    for i in range(10000000):
        lock.acquire()
        total -= 1
        lock.release()
%%time
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)
0
CPU times: user 5.4 s, sys: 384 ms, total: 5.79 s
Wall time: 5.82 s

线程同步的另一个实现方式(条件变量)

  • threading 中的 Condition 可以用于复杂的线程间通信

  • Condition实现了 enterexit 魔法方法,所以可以用with语句控制获取锁和释放锁

  • 实际上 Condition 源码中也是通过 acquire 和 release 来实现的

  • Condition 重要的两个方法是 notify 和 wait 方法

  • wait 方法用于等待一个信号,只有当接收到信号之后,才会执行代码

  • notify 方法用于发生一个信号,由此也就实现了线程间的通信

  • Condition 的启动顺序很重要

  • 在调用 with cond 之后才能调用wait或者notify方法

  • condition 有两层锁,一把底层锁会在线程调用wait方法的时候释放

  • 上层的锁会在每次调用wait方法的时候分配一把,并放入到condition的等待队列中,等待notify方法的唤醒

class XiaoAi(threading.Thread):
    def __init__(self, cond):
        super().__init__(name='小爱同学')
        self.cond = cond

    def run(self):
        with self.cond:
            self.cond.wait()
            print("{} : 在".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 好啊".format(self.name))
            self.cond.notify()


class TianMao(threading.Thread):
    def __init__(self, cond):
        super().__init__(name='天猫精灵')
        self.cond = cond

    def run(self):
        with self.cond:
            print("{} : 小爱同学".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我们来对古诗吧".format(self.name))
            self.cond.notify()
            self.cond.wait()
cond = threading.Condition()
xiaoai = XiaoAi(cond)
tianmao = TianMao(cond)

xiaoai.start()
tianmao.start()
天猫精灵 : 小爱同学
小爱同学 : 在
天猫精灵 : 我们来对古诗吧
小爱同学 : 好啊

信号量

  • 控制并发数量
import time

class HtmlSpider(threading.Thread):
    def __init__(self,sem):
        super().__init__()
        self.sem = sem
    def run(self):
        time.sleep(2)
        print('get html text success')
        self.sem.release()
        
class UrlProducer(threading.Thread):
    def __init__(self,sem):
        super().__init__()
        self.sem = sem
    def run(self):
        for i in range(20):
            self.sem.acquire()
            html_thread = HtmlSpider(self.sem)
            html_thread.start()
sem = threading.Semaphore(5)
url_produce = UrlProducer(sem)
url_produce.start()
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
get html text success
标签: Python多线程

召唤伊斯特瓦尔