概念 进程就是操作系统中执行的一个程序,操作系统以进程为单位分配存储空间,每个进程都有自己的地址空间、数据栈以及其他用于跟踪进程执行的辅助数据,操作系统管理所有进程的执行,为它们合理的分配资源。进程可以通过fork或spawn的方式来创建新的进程来执行其他的任务,不过新的进程也有自己独立的内存空间,因此必须通过进程间通信机制(IPC,Inter-Process Communication)来实现数据共享,具体的方式包括管道、信号、套接字、共享内存区等。 一个进程还可以拥有多个并发的执行线索,简单的说就是拥有多个可以获得CPU调度的执行单元,这就是所谓的线程。由于线程在同一个进程下,它们可以共享相同的上下文,因此对于进程而言,线程间的信息共享和通信更加容易。当然在单核DDPU系统中,真正的并发是不可能的,因为在某个时刻能够获得CPU的只有唯一的一个线程,多个线程共享了CPU的执行时间 。使用多线程实现并发编程为程序带来的好处是不言而喻的,最主要的体现是提升程序的性能和改善用户体验。
Python中的多进程 Unix和Linux操作系统上提供了fork()系统调用来创建进程,调用fork()函数的是父进程,创建出的是子进程,子进程是父进程的一个拷贝,但是子进程拥有自己的PID。fork()函数非常特殊,它会返回两次,父进程中可以通过fork()函数的返回值得到子进程的PID,而子进程中的返回值永远都是0.Python的OS模块提供了fork()函数。由于Windows系统没有fork()调用,因此要实现跨平台的多进程编程,可以使用multiprocessing模块的Process类来创建子进程,而且该模块还提供了更高级的封装,例如批量启动进程的进程池(Pool)、用于进程间通信的队列(Queue)和管道(Pipe)等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from random import randintfrom time import time ,sleepdef download_task(filename): print ('开始下载%s...' % filename) time_to_download = randint (5 ,10 ) sleep (time_to_download) print ('%s下载完成!耗费了%d秒' % (filename,time_to_download)) def main (): start = time () download_task ('Python从入门到住院.pdf' ) download_task ('Peking Hot.avi' ) end = time () print ('总共耗费了%.2f秒.' % (end-start)) if __name__ == '__main__' : main ()
这个例子可以得出,如果程序中的代码只能按顺序一点点的往下执行,那么即使执行两个互不相关的下载任务,也需要先等待一个文件下载完成后才能开始下一个下载任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from multiprocessing import Processfrom os import getpgidfrom random import randintfrom time import time ,sleepdef download_task(filename): print ('启动下载进程,进程号[%d].' % getpid ()) print ('开始下载%s...' % filename) time_to_download = randint (5 ,10 ) sleep (time_to_download) print ('%s下载完成!耗费了%d秒' % (filename,time_to_download)) def main (): start = time () p1 = Process (target=download_task,args=('Python从入门到住院.pdf' )) p1.start () p2 = Process (target=download_task,args=('Peking Hot.avi' )) p2.start () p1.join () p2.join () end = time () print ('总共耗费了%.2f秒.' % (end-start)) if __name__ == '__main__' : main ()
上面通过Process类创建了进程对象,通过target参数传入一个函数表示进程启动后要执行的代码,后面args是一个元祖,代表传递给函数的参数。Process和start方法用来启动进程,而join方法表示等待进程执行结束。两个任务同时启动了。
也可以使用subprocess模块中的类和函数来创建和启动子进程,然后通过管道来和子进程通讯
Python中的多线程 Python通过thread模块(现在名为_thread)来实现多线程编程,该模块过于底层,而且很多功能都没有提供,推荐threading模块,该模块对多线程编程提供了更好的面对对象的封装。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from random import randintfrom threading import Threadfrom time import time ,sleepdef download(filename): print ('开始下载%s...' % filename) time_to_download = randint (5 ,10 ) sleep (time_to_download) print ('%s下载完成!耗费%d秒' % (filename,time_to_download)) def main (): start = time () t1 = Thread (target=download,args=('Python从入门到住院.pdf' )) t1.start () t2 = Thread (target=download,args=('Peking Hot.avi' ,)) t2.start () t1.join () t2.join () end = time () print ('总共耗费了%.3f秒' % (end-start)) if __name__ == '__main__' : main ()
直接使用threading模块的Thread类来创建进程,但是我们之前讲过一个非常重要的概念叫“继承”,我们可以从已有的类创建类,因此可以通过继承Thread类的方式来创建自定义的线程类,然后再创建线程对象并启动线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from random import randintfrom threading import Threadfrom time import time,sleepclass DownloadTask (Thread ): def __init__(self , filename ): super().__init__() self._filename = filename def run(self ): print('开始下载%s ...' % self ._filename ) time_to_download = randint(5,10) sleep(time_to_download ) print('%s 下载完成!耗费%d 秒' % (self ._filename ,time_to_download )) def main(): start = time() t1 = DownloadTask ('Python 从入门到住院.pdf' ) t1.start() t2 = DownloadTask ('Peking Hot .avi' ) t2.start() t1.join() t2.join() end = time() print('总共耗费了%.2f 秒' % (end -start )) if __name__ == '__main__': main()
多个线程可以共享进程的内存空间,要实现多个线程间的通信相对简单,设置一个全局变量,多个线程共享这个全局变量即可。但是当多个线程共享同一个变量(称之为资源)的时候,很有可能产生不可控的结果导致程序失效甚至崩溃。如果一个资源被多个线程竞争使用,通常称之为“临界资源”,对“临界资源”的访问需要加上保护,否则资源会处于“混乱”状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 from time import sleepfrom threading import Thread,Lockclass Account (object ): def __init__ (self ): self._balance = 0 self._lock = Lock() def deposit (self,money ): self._lock.acquire() try : new_balance = self._balance + money sleep(0.01 ) self._balance = new_balance finally : self._lock.release() @property def balance (self ): return self._balance class AddMoneyThread (Thread ): def __init__ (self,account,money ): super ().__init__() self._account = account self._money = money def run (self ): self._account.deposit(self._money) def main (): account = Account() threads = [] for _ in range (100 ): t = AddMoneyThread(account,1 ) threads.append(t) t.start() for t in threads: t.join() print ('账户余额为:¥%d元' % account.balance) if __name__ == '__main__' : main()
多进程还是多线程 无论是多进程还是多线程,只要数量一多,效率肯定上不去,为什么呢?我们打个比方,假设你不幸正在准备中考,每天晚上需要做语文、数学、英语、物理、化学这5科的作业,每项作业耗时1小时。如果你先花1小时做语文作业,做完了,再花1小时做数学作业,这样,依次全部做完,一共花5小时,这种方式称为单任务模型。如果你打算切换到多任务模型,可以先做1分钟语文,再切换到数学作业,做1分钟,再切换到英语,以此类推,只要切换速度足够快,这种方式就和单核CPU执行多任务是一样的了,以旁观者的角度来看,你就正在同时写5科作业。
但是,切换作业是有代价的,比如从语文切到数学,要先收拾桌子上的语文书本、钢笔(这叫保存现场),然后,打开数学课本、找出圆规直尺(这叫准备新环境),才能开始做数学作业。操作系统在切换进程或者线程时也是一样的,它需要先保存当前执行的现场环境(CPU寄存器状态、内存页等),然后,把新任务的执行环境准备好(恢复上次的寄存器状态,切换内存页等),才能开始执行。这个切换过程虽然很快,但是也需要耗费时间。如果有几千个任务同时进行,操作系统可能就主要忙着切换任务,根本没有多少时间去执行任务了,这种情况最常见的就是硬盘狂响,点窗口无反应,系统处于假死状态。所以,多任务一旦多到一个限度,反而会使得系统性能急剧下降,最终导致所有任务都做不好。
是否采用多任务的第二个考虑是任务的类型,可以把任务分为计算密集型和I/O密集型。计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如对视频进行编码解码或者格式转换等等,这种任务全靠CPU的运算能力,虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低。计算密集型任务由于主要消耗CPU资源,这类任务用Python这样的脚本语言去执行效率通常很低,最能胜任这类任务的是C语言,我们之前提到了Python中有嵌入C/C++代码的机制。
除了计算密集型任务,其他的涉及到网络、存储介质I/O的任务都可以视为I/O密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待I/O操作完成(因为I/O的速度远远低于CPU和内存的速度)。对于I/O密集型任务,如果启动多任务,就可以减少I/O等待时间从而让CPU高效率的运转。
单线程+异步I/O 现代操作系统对I/O操作的改进中最为重要的就是支持异步I/O。如果充分利用操作系统提供的异步I/O支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型。Nginx就是支持异步I/O的Web服务器,它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上,可以运行多个进程(数量与CPU核心数相同),充分利用多核CPU。
在Python语言中,单线程+异步I/O的编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。协程最大的优势就是极高的执行效率,因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销。协程的第二个优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不用加锁,只需要判断状态就好了,所以执行效率比多线程高很多。如果想要充分利用CPU的多核特性,最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
例1:将耗时间的任务放到线程中以获得更好的用户体验 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import time import tkinter import tkinter.messagebox from threading import Threaddef main (): class DownloadTaskHandler (Thread): def run (self): time.sleep (10 ) tkinter.messagebox.showinfo ('提示' ,'下载完成!' ) # 启用下载按钮 button1.config (state = tkinter.NORMAL) def download (): # 禁用下载按钮 button1.config (state = tkinter.DISABLED) # 通过daemon参数将参数设置为守护线程(主程序退出就不再保留执行) # 在线程中处理耗时间的下载任务 DownLoadTaskHandler (daemon=True.start ()) def show_about (): tkinter.messagebox.showinfo ('关于' ,'作者:张三' ) top = tkinter.TK () top.title ('单线程' ) top.geometry ('200×150' ) top.wm_attributes ('-topmost' ,True) panel = tkinter.Frame (top) button1 = tkinter.Button (panel,text='下载' ,command=download) button1.pack (side='left' ) button2 = tkinter.Button (panel,text='关于' ,command=show_about) button2.pack (side='right' ) panel.pack (side='bottom' ) tkinter.mainloop () if __name__ == '__main__' : main ()
例2:使用多进程对复杂任务进行“分而治之” 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from multiprocessing import Process,Queuefrom random import randintfrom time import timedef task_handler(curr_list,result_queue): total = 0 for number in curr_list: total += number result_queue.put( total) def main( ): processes = [] number_lisst = [x for x in range( 1,100000001)] result_queue = Queue() index = 0 # 启动8个进程将数据切片后进行运算 for _ in range( 8): p = Process(target=task_handler,args=(number_lisst[index :index +122500000],result_queue)) index += 12500000 processes.append(p) p.start() # 开始记录所有进程执行完成花费的时间 start = time( ) for p in processes: p.join( ) # 合并执行结果 total = 0 while not result_queue.empty(): total += result_queue.get() print( total) end = time( ) print( 'Execution time:' ,(end -start),'s' ,sep='' ) if __name__ == '__main__' : main( )