Understanding Multiprocessing in Depth

October 12, 2018 • Views: 124

Multiprocessing

How the Use Cases of Multithreading and Multiprocessing Differ

  • For CPU-bound work, multiprocessing beats multithreading (in CPython the GIL keeps threads from executing Python bytecode in parallel)
  • For I/O-bound work, multithreading is usually the better fit, since the threads spend most of their time waiting anyway (a small comparison sketch follows this list)
  • The operating system pays more to switch between processes than to switch between threads
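To illustrate the I/O point, here is a minimal comparison sketch (not from the original notebook): fake_io is a made-up stand-in that only sleeps, so it behaves like a network or disk wait. On a single core both pools finish in roughly one second, with the process pool paying a little extra for worker startup.
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def fake_io(n):
    # simulated I/O: the worker just waits, no CPU work
    time.sleep(n)
    return n

if __name__ == "__main__":
    for pool_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.time()
        with pool_cls(5) as executor:
            list(executor.map(fake_io, [1] * 5))
        # the five 1-second waits overlap, so each pool takes about 1 s total
        print(pool_cls.__name__, round(time.time() - start, 2))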

Computing Fibonacci Numbers

  • First run: a thread pool with at most 5 threads
  • Second run: a process pool with at most 5 processes
  • Third run: plain serial computation
  • The timings all come out about the same ㄟ( ▔, ▔ )ㄏ
  • Since this machine has only a single core, multiprocessing gains nothing here; it even trails multithreading because process switching costs more
def fib(n):
    # naive recursive Fibonacci: deliberately CPU-bound
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)
from concurrent.futures import ThreadPoolExecutor, as_completed
%%time
with ThreadPoolExecutor(5) as executor:
    all_task = [executor.submit(fib, num) for num in range(30, 40)]
    for future in as_completed(all_task):
        print("exe result: {}".format(future.result()))
exe result: 832040
exe result: 1346269
exe result: 2178309
exe result: 3524578
exe result: 5702887
exe result: 9227465
exe result: 14930352
exe result: 24157817
exe result: 39088169
exe result: 63245986
CPU times: user 36.7 s, sys: 28 ms, total: 36.7 s
Wall time: 36.9 s
from concurrent.futures import ProcessPoolExecutor
%%time
with ProcessPoolExecutor(5) as executor:
    all_task = [executor.submit(fib, num) for num in range(30, 40)]
    for future in as_completed(all_task):
        print("exe result: {}".format(future.result()))
exe result: 832040
exe result: 1346269
exe result: 2178309
exe result: 3524578
exe result: 5702887
exe result: 9227465
exe result: 14930352
exe result: 24157817
exe result: 39088169
exe result: 63245986
CPU times: user 16 ms, sys: 40 ms, total: 56 ms
Wall time: 37 s
%%time
all_task = [fib(num) for num in range(30,40)]
for data in all_task:
    print(data)
832040
1346269
2178309
3524578
5702887
9227465
14930352
24157817
39088169
63245986
CPU times: user 36.1 s, sys: 8 ms, total: 36.1 s
Wall time: 36.2 s

Child Processes

  • On Linux, the os.fork() function in the os module creates a child process
  • The child process gets an exact copy of the parent process's data
  • After the fork, the child starts executing from the statement right after fork(); it does not re-execute the code before fork()
  • The duplicated output and the nonsensical negative CPU times below are a Jupyter Notebook quirk (╯‵□′)╯︵┻━┻ : after fork() both the parent and the forked child kernel keep executing the rest of the cell
  • Even after the parent process exits, the child can keep running on its own (a plain-script version of the usual fork-and-wait pattern is sketched after the example)
import os
import time
%%time
print("start")
pid = os.fork()
print("amor")
if pid == 0:
    print("子进程 {},父进程是:{}".format(os.getpid(),os.getppid()))
else:
    print("我是父进程:{}".format(pid))
time.sleep(2)
start
amor
I am the parent process, child pid: 31717
CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 18.3 ms
start
amor
child process 31717, parent is 31493
CPU times: user -5.26e+11 ns, sys: -4.64e+08 ns, total: -5.26e+11 ns
Wall time: 74.2 ms
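Outside the notebook, the usual fork pattern has the child finish its work and leave with os._exit(), while the parent reaps it with os.waitpid(); a minimal plain-script sketch (Linux only, not the cell above):
import os

pid = os.fork()
if pid == 0:
    # child: do its work, then exit without falling through to the parent's code path
    print("child {} (parent {})".format(os.getpid(), os.getppid()))
    os._exit(0)
else:
    # parent: wait for the child so it does not linger as a zombie
    os.waitpid(pid, 0)
    print("parent {} reaped child {}".format(os.getpid(), pid))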

Multiprocess Programming

  • We can do multiprocess programming with the ProcessPoolExecutor process pool used above; its API is very close to the thread pool's
  • Alternatively, the multiprocessing module can be used directly, which closely mirrors how the threading module is used for multithreading (a threading version of the example is sketched below for comparison)
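For comparison, the threading version of the example below looks almost the same, only with threading.Thread in place of multiprocessing.Process (a minimal sketch; get_html is re-declared here so the snippet stands on its own):
import time
import threading

def get_html(n):
    # same simulated task as in the multiprocessing example that follows
    time.sleep(n)
    return n

thread1 = threading.Thread(target=get_html, args=(2,))
thread2 = threading.Thread(target=get_html, args=(3,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("main thread end")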
import multiprocessing
def get_html(n):
    # pretend to fetch a page: just sleep for n seconds
    time.sleep(n)
    print("sub_progress success {}".format(os.getpid()))
    return n
%%time
progress1 = multiprocessing.Process(target=get_html, args=(2,))
progress2 = multiprocessing.Process(target=get_html, args=(3,))
progress1.start()
progress2.start()
progress1.join()
progress2.join()
print("main progress end")
sub_progress success 31766
sub_progress success 31767
main progress end
CPU times: user 0 ns, sys: 8 ms, total: 8 ms
Wall time: 3.1 s
multiprocessing.cpu_count()
1

Using a Process Pool

  • multiprocessing also ships its own process pool
  • Instantiate one with multiprocessing.Pool; apply_async, imap and imap_unordered are shown below, and a map() sketch follows the examples
%%time
pool = multiprocessing.Pool(2)
result = pool.apply_async(get_html, args=(3,))
pool.close()
pool.join()
print(result.get())
sub_progress success 31948
3
CPU times: user 4 ms, sys: 12 ms, total: 16 ms
Wall time: 3.15 s
%%time
pool = multiprocessing.Pool(2)
for result in pool.imap(get_html, [1,5,3,4]):
    print(result)
sub_progress success 32351
1
sub_progress success 32351
sub_progress success 32352
5
3
sub_progress success 32351
4
CPU times: user 16 ms, sys: 12 ms, total: 28 ms
Wall time: 8.1 s
%%time
pool = multiprocessing.Pool(2)
for result in pool.imap_unordered(get_html, [1,5,3,4]):
    print(result)
sub_progress success 32364
1
sub_progress success 32364
3
sub_progress success 32365
5
sub_progress success 32364
4
CPU times: user 28 ms, sys: 8 ms, total: 36 ms
Wall time: 8.11 s
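Besides apply_async, imap and imap_unordered, Pool also offers a blocking map() that hands out the whole iterable and returns the results as a list in input order; a minimal sketch (with a simplified get_html that skips the print):
import time
import multiprocessing

def get_html(n):
    time.sleep(n)
    return n

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    # map() blocks until every task finishes and preserves input order
    results = pool.map(get_html, [1, 2])
    pool.close()
    pool.join()
    print(results)  # [1, 2]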

Inter-Process Communication

  • Shared global variables do not work across processes
  • The Queue used for communication between threads (queue.Queue) cannot be used between processes either, but multiprocessing provides a Queue with a similar interface
  • However, multiprocessing.Queue cannot be handed to the workers of a Pool; in that case you need the Queue from Manager (a sketch follows the Queue example below)
import time
from multiprocessing import Process, Queue, Manager, Pipe
def producer(queue):
    queue.put("a")
    queue.put("b")
    time.sleep(2)
def consumer(queue):
    time.sleep(2)
    res = queue.get()
    print(res)
%%time
queue = Queue(10)
my_producer = Process(target=producer, args=(queue,))
my_consumer1 = Process(target=consumer, args=(queue,))
my_consumer2 = Process(target=consumer, args=(queue,))
my_producer.start()
my_consumer1.start()
my_consumer2.start()
my_producer.join()
my_consumer1.join()
my_consumer2.join()
a
b
CPU times: user 16 ms, sys: 0 ns, total: 16 ms
Wall time: 2.1 s
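As noted above, a plain multiprocessing.Queue cannot be handed to Pool workers, so the Queue obtained from a Manager is used instead; a minimal sketch reusing the producer/consumer idea:
import time
from multiprocessing import Pool, Manager

def producer(queue):
    queue.put("a")
    time.sleep(1)

def consumer(queue):
    time.sleep(1)
    print(queue.get())

if __name__ == "__main__":
    queue = Manager().Queue(10)  # a proxy queue that Pool workers can share
    pool = Pool(2)
    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))
    pool.close()
    pool.join()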

Inter-Process Communication with Pipe

  • Pipe only works for communication between two processes
  • Pipe has lower overhead than Queue, so it performs better
def producer(pipe):
    pipe.send("amor")
def consumer(pipe):
    print(pipe.recv())
receive_pipe, send_pipe = Pipe()
my_producer = Process(target=producer, args=(send_pipe,))
my_consumer = Process(target=consumer, args=(receive_pipe,))
my_producer.start()
my_consumer.start()
my_producer.join()
my_consumer.join()
amor

Shared Memory Between Processes

  • The Manager in multiprocessing makes it possible to share variables between processes
  • Manager provides many data structures (dict, list, Queue, Lock, ...), all of which can be shared across processes (a locking sketch follows the example below)
def add_data(p_dict, key, value):
    p_dict[key] = value
progress_dict = Manager().dict()
first_progress = Process(target=add_data, args=(progress_dict, "amor", 22))
second_progress = Process(target=add_data, args=(progress_dict, "zxy", 23))
first_progress.start()
second_progress.start()
first_progress.join()
second_progress.join()
print(progress_dict)
{'amor': 22, 'zxy': 23}
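The example above works because each process writes a different key. When several processes modify the same shared value, the read-modify-write is not atomic, so a lock from the Manager is needed; a minimal sketch with a shared counter:
from multiprocessing import Process, Manager

def add_one(p_dict, lock):
    for _ in range(1000):
        with lock:  # serialize the read-modify-write on the shared dict
            p_dict["count"] += 1

if __name__ == "__main__":
    manager = Manager()
    p_dict = manager.dict(count=0)
    lock = manager.Lock()
    workers = [Process(target=add_one, args=(p_dict, lock)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(p_dict["count"])  # 2000 with the lock; often less without it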
Tags: Python multiprocessing
