网络编程,进程,线程,协程
守护线程
在python中: 守护线程执行效果是一样的,当非守护线程全部结束,守护线程也会跟着立即结束。import threading,time def run(n):print('task---',n,threading.current_thread())time.sleep(1)print('task done--',n) if __name__ == '__main__':for i in range(5):t=threading.Thread(target=run,args=(i,))t.setDaemon(True) #设置为后台线程或前台线程(默认false),通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用 t.start()print('therd done' ,threading.current_thread())
由上面可知道,1.函数的主流程,就是主线程。其他我们定义的线程,都是子线程
2.守护线程的语句要写在start()前,将线程定义为守护线程后,并开启。
3.非守护线程结束,守护线程也跟着结束。所以我们定义的print("task done-", n)语句未能执行。
thread方法说明
t.start() : 激活线程,
t.getName() : 获取线程的名称
t.setName() : 设置线程的名称
t.name : 获取或设置线程的名称
t.is_alive() : 判断线程是否为激活状态
t.isAlive() :判断线程是否为激活状态
t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
t.isDaemon() : 判断是否为守护线程
t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
t.run() :线程被cpu调度后自动执行线程对象的run方法
GIL全称Global Interpreter Lock(全局解释锁)
为什么存在GIL
由于物理上得限制,各CPU厂商在核心频率上的比赛已经被多核所取代。为了更有效的利用多核处理器的性能,就出现了多线程的编程方式,而随之带来的就是线程间数据一致性和状态同步的困难。即使在CPU内部的Cache也不例外,为了有效解决多份缓存之间的数据同步时各厂商花费了不少心思,也不可避免的带来了一定的性能损失。
Python为了完全支持多线程编程, 但是python解释器的C语言实现部分在完全并行执行时并不是线程安全的。 于是就有了GIL,解释器被全局解释器锁(GIL)保护着,它确保任何时候都只有一个Python线程执行。 GIL最大的问题就是Python的多线程程序并不能利用多核CPU的优势 (比如一个使用了多个线程的计算密集型程序只会在一个单CPU上面运行)。
但是有一点要强调的是GIL只会影响到那些严重依赖CPU的程序(比如计算型的)。 如果你的程序大部分只会涉及到I/O,比如网络交互,那么使用多线程就很合适, 因为它们大部分时间都在等待,在等待过程中,当前线程会释放GIL锁。所以,你完全可以放心的创建几千个Python线程, 现代操作系统运行这么多线程没有任何压力,没啥可担心的。
故:
1.计算密集型:CPU操作密集的,使用多进程
2.IO密集型:使用多线程
IO多路复用
I/O多路复用指:通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
Linux中的 select,poll,epoll 都是IO多路复用的机制。
select
select最早于
1983
年出现在
4.2BSD
中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为
1024
,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
poll
poll在
1986
年诞生于System V Release
3
,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。
poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。
epoll
直到Linux2.
6
才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.
6
下性能最好的多路I
/
O就绪通知方法。
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select
/
poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用。 Windows Python:提供: select Mac Python:提供: select Linux Python:提供: select、poll、epoll 注意:网络操作、文件操作、终端操作等均属于IO操作,对于windows只支持Socket操作,其他系统支持其他IO操作,但是无法检测 普通文件操作 自动上次读取是否已经变化。
socket
tcp(传输控制协议) 是面向连接的 面向流,提供了高可靠性服务
udp(用户数据报协议) 是无连接的,面向消息. 提供高效率服务
发送数据
tcp数据不能为空,会粘包
udp输入空格(回车),那也不是空消息,udp协议会帮你封装上消息头,不会粘包
SOCK_DGRAM 数据报协议
基于UCP的套接字
udp是无链接的,先启动哪一端都不会报错
udp服务端1 ss = socket() #创建一个服务器的套接字 2 ss.bind() #绑定服务器套接字 3 inf_loop: #服务器无限循环 4 cs = ss.recvfrom()/ss.sendto() # 对话(接收与发送) 5 ss.close() # 关闭服务器套接字
udp客户端cs = socket() # 创建客户套接字 comm_loop: # 通讯循环cs.sendto()/cs.recvfrom() # 对话(发送/接收) cs.close() # 关闭客户套接字
udp套接字简单示例
import socket ip_port=('127.0.0.1',9000) BUFSIZE=1024 udp_server_client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)udp_server_client.bind(ip_port)while True:msg,addr=udp_server_client.recvfrom(BUFSIZE)print(msg,addr)udp_server_client.sendto(msg.upper(),addr)udp服务端
import socket ip_port=('127.0.0.1',9000) BUFSIZE=1024 udp_server_client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)while True:msg=input('>>: ').strip()if not msg:continueudp_server_client.sendto(msg.encode('utf-8'),ip_port)back_msg,addr=udp_server_client.recvfrom(BUFSIZE)print(back_msg.decode('utf-8'),addr) 复制代码udp客户端
由于udp无连接,所以可以同时多个客户端去跟服务端通信
import socket ip_port=('127.0.0.1',8081) udp_server_sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) #买手机 udp_server_sock.bind(ip_port)while True:qq_msg,addr=udp_server_sock.recvfrom(1024)print('来自[%s:%s]的一条消息:\033[1;44m%s\033[0m' %(addr[0],addr[1],qq_msg.decode('utf-8')))back_msg=input('回复消息: ').strip()udp_server_sock.sendto(back_msg.encode('utf-8'),addr)udp服务端
import socket BUFSIZE=1024 udp_client_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)qq_name_dic={'狗哥alex':('127.0.0.1',8081),'瞎驴':('127.0.0.1',8081),'一棵树':('127.0.0.1',8081),'egon':('127.0.0.1',8081), }while True:qq_name=input('请选择聊天对象: ').strip()while True:msg=input('请输入消息,回车发送: ').strip()if msg == 'quit':breakif not msg or not qq_name or qq_name not in qq_name_dic:continueudp_client_socket.sendto(msg.encode('utf-8'),qq_name_dic[qq_name])back_msg,addr=udp_client_socket.recvfrom(BUFSIZE)print('来自[%s:%s]的一条消息:\033[1;44m%s\033[0m' %(addr[0],addr[1],back_msg.decode('utf-8')))udp_client_socket.close()udp客户端1
import socket BUFSIZE=1024 udp_client_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)qq_name_dic={'狗哥alex':('127.0.0.1',8081),'瞎驴':('127.0.0.1',8081),'一棵树':('127.0.0.1',8081),'egon':('127.0.0.1',8081), }while True:qq_name=input('请选择聊天对象: ').strip()while True:msg=input('请输入消息,回车发送: ').strip()if msg == 'quit':breakif not msg or not qq_name or qq_name not in qq_name_dic:continueudp_client_socket.sendto(msg.encode('utf-8'),qq_name_dic[qq_name])back_msg,addr=udp_client_socket.recvfrom(BUFSIZE)print('来自[%s:%s]的一条消息:\033[1;44m%s\033[0m' %(addr[0],addr[1],back_msg.decode('utf-8')))udp_client_socket.close()udp客户端2
基于UDP并发(udp最稳定512字节)
#server import socketserver class MyUDPhandler(socketserver.BaseRequestHandler):def handle(self):print(self.request) #request值如下: udp没有链接,只有发消息时,触发链接,#(b'a', <socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8100)>)self.request[1].sendto(self.request[0].upper(),self.client_address) #发消息给client端,发送data数据的大写形式 if __name__=="__main__":s=socketserver.ThreadingUDPServer(("127.0.0.1",8100),MyUDPhandler)s.serve_forever() #client from socket import * udp_client=socket(AF_INET,SOCK_DGRAM) while True:msg=input(">>:").strip()udp_client.sendto(msg.encode("utf-8"),("127.0.0.1",8100))data,server_addr=udp_client.recvfrom(1024)print(data.decode("utf-8"))#启动服务端 在启动客户端,(启动2个client端) #客户端直接回车(会显示返回数据为空,不会报错),再输入aa(会显示AA,)
进程与线程的历史
我们都知道计算机是由硬件和软件组成的。硬件中的CPU是计算机的核心,它承担计算机的所有任务。 操作系统是运行在硬件之上的软件,是计算机的管理者,它负责资源的管理和分配、任务的调度。 程序是运行在系统上的具有某种功能的软件,比如说浏览器,音乐播放器等。 每次执行程序的时候,都会完成一定的功能,比如说浏览器帮我们打开网页,为了保证其独立性,就需要一个专门的管理和控制执行程序的数据结构——进程控制块。 进程就是一个程序在一个数据集上的一次动态执行过程。 进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
在早期的操作系统里,计算机只有一个核心,进程执行程序的最小单位,任务调度采用时间片轮转的抢占式方式进行进程调度。每个进程都有各自的一块独立的内存,保证进程彼此间的内存地址空间的隔离。 随着计算机技术的发展,进程出现了很多弊端,一是进程的创建、撤销和切换的开销比较大,二是由于对称多处理机(对称多处理机(SymmetricalMulti-Processing)又叫SMP,是指在一个计算机上汇集了一组处理器(多CPU),各CPU之间共享内存子系统以及总线结构)的出现,可以满足多个运行单位,而多进程并行开销过大。 这个时候就引入了线程的概念。 线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合 和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发性能。 线程没有自己的系统资源,只拥有在运行时必不可少的资源。但线程可以与同属与同一进程的其他线程共享进程所拥有的其他资源。
进程与线程之间的关系
线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除。线程可与属于同一进程的其它线程共享进程所拥有的全部资源,但是其本身基本上不拥有系统资源,只拥有一点在运行中必不可少的信息(如程序计数器、一组寄存器和栈)。
进程与线程的区别:
1.进程,内存独立,线程共享同一进程的内存
2.进程是资源的整合,线程是执行单位
3.进程之间不能直接互相访问,线程可以互相通信
4.创建新进程非常消耗系统资源,线程非常轻量,只保存线程需要运行时的必要数据。如上下文,程序堆栈。
5.同一进程里的线程是可以互相控制,父进程是可以子进程。
python线程
Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。
应用场景程序====》至少有一个进程====》至少有一个线程
IO密集型:线程
计算密集型:进程
信号量(允许同一时间几个线程访问公共数据,最大限制是5)
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
from threading import Thread,current_thread,Semaphore #Semaphore限制数据库连接数 import time,random #randomrandom模块用于生成随机数。 sm=Semaphore(5) def work():sm.acquire()print('%s 去吃饭' %current_thread().getName())time.sleep(random.randint(1,3))sm.release() if __name__ == '__main__':for i in range(20):t=Thread(target=work)t.start()
事件锁(Event)
Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()为False时就阻塞当前线程,知道结束阻塞状态
Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。
构造方法:
Event()
实例方法:
is_Set(): 当内置标志为True时返回True。
set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。
clear(): 将标志设为False。
wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。
from threading import Thread,current_thread,Event import time event=Event() def conn_mysql():count=1while not event.is_set():if count>3:raise ConnectionAbortedError('链接失败!')print('%s 等待%s次链接mysql' %(current_thread().getName(),count))event.wait(0.5)count+=1print('%s 正在检查mysql状态' %current_thread().getName()) def check_mysql():print('%s 正在检查mysql状态' %current_thread().getName())time.sleep(1)event.set() if __name__ == '__main__':t1=Thread(target=conn_mysql)t2=Thread(target=check_mysql)check=Thread(target=check_mysql)t1.start()t2.start()check.start()
线程开启的两种方式:
#1.使用替换threading模块提供的Thread from threading import Thread def task():print('is runing.....') if __name__ == '__main__':t=Thread(target=task)t.start()print('主')
#2.自定义类,继承Thread from threading import Thread class MyThread(Thread):def __init__(self,name):super().__init__()self.name=namedef run(self):print('%s is runing.....' %self.name) if __name__ == '__main__':t=MyThread('gongxu')t.start()print('主')
#同一个进程内的多个线程的pid,都是相同的,是该进程的pid from multiprocessing import Process import os def task():print('%s is runing.....') if __name__ == '__main__':t1=Process(target=task)t2=Process(target=task)t1.start()t2.start()print('主',os.getpid())
多线程共享一个进程内的资源
from threading import Thread from multiprocessing import Process n=100 def work():global nn=0 if __name__ == '__main__':t=Thread(target=work)t.start() #同一进程内的多个线程,共享该进程的地址空间 t.join()print('主',n)p=Process(target=work)p.start()p.join() #多个进程地址空间是隔离的print('主',n)
同一进程内开启多线程
#强调:主线程从执行层面上代表了其所在进程的执行过程 from threading import Thread msg_l=[] fromat_l=[] def talk():while True:msg=input('>>>>>:').strip()msg_l.append(msg) def fromat():while True:if msg_l:data=msg_l.pop()fromat_l.append(data.uppper()) def save():while True:if fromat_l:data=fromat_l.pop()with open('db.txt','a') as f:f.write('%s\n' %data) if __name__ == '__main__':t1=Thread(target=talk)t2=Thread(target=fromat)t3=Thread(target=save)t1.start()t2.start()t3.start()
死锁(Lock)
from threading import Thread,Lock import time murexA=Lock() murexB=Lock() class Mythread(Thread):def run(self):self.f1()self.f2()def f1(self):murexA.acquire()print('\33[45m%s 抢到A锁\033[0m' %self.name)murexB.acquire()print('\38[45m%s 抢到B锁\038[0m' % self.name)murexA.release()murexB.release()def f2(self):murexB.acquire()print('\38[45m%s 抢到B锁\038[0m' % self.name)murexA.acquire()print('\33[45m%s 抢到A锁\033[0m' % self.name)murexB.release()murexA.release() if __name__ == '__main__':for i in range(20):t=Mythread()t.start()
递归锁(RLock,说白了就是在一个大锁中还要再包含子锁,在同一线程内,程序不会阻塞)
from threading import Thread,RLock import time murex=RLock() class Mythread(Thread):def run(self):self.f1()self.f2()def f1(self):murex.acquire()print('\33[45m%s 抢到A锁\033[0m' %self.name)murex.acquire()print('\38[45m%s 抢到B锁\038[0m' % self.name)murex.release()murex.release()def f2(self):murex.acquire()print('\38[45m%s 抢到B锁\038[0m' % self.name)time.sleep(1)murex.acquire()print('\33[45m%s 抢到A锁\033[0m' % self.name) murex.release()murex.release() if __name__ == '__main__':for i in range(20):t=Mythread()t.start()
综上所述,为了避免代码的错误,所以不论是需要多重锁还是不需要,都推荐直接写 rLock = threading.RLock()
Python标准模块--concurrent.futures
concurrent.futures模块是在Python3.2中添加的。根据Python的官方文档,concurrent.futures模块提供给开发者一个执行异步调用的高级接口。concurrent.futures基本上就是在Python的threading和multiprocessing模块之上构建的抽象层,更易于使用。尽管这个抽象层简化了这些模块的使用,但是也降低了很多灵活性,所以如果你需要处理一些定制化的任务,concurrent.futures或许并不适合你。
concurrent.futures包括抽象类Executor,它并不能直接被使用,所以你需要使用它的两个子类:ThreadPoolExecutor或者ProcessPoolExecutor。正如你所猜的,这两个子类分别对应着Python的threading和multiprocessing接口。这两个子类都提供了池,你可以将线程或者进程放入其中。
模块使用
#进程池 from concurrent.futures import ProcessPoolExecutor import os,time,random def work(n):print('%s is runing......' %os.getpid())time.sleep(random.randint(1,3))return n**2 if __name__ == '__main__':p=ProcessPoolExecutor()objs=[]for i in range(10):obj=p.submit(work,i)objs.append(obj)p.shutdown()for obj in objs:print(obj.result())
#线程池 from concurrent.futures import ThreadPoolExecutor import os,time,random def work(n):print('%s is runing......' %os.getpid())time.sleep(random.randint(1,3))return n**2 if __name__ == '__main__':p=ThreadPoolExecutor()objs=[]for i in range(10):obj=p.submit(work,i)objs.append(obj)p.shutdown()for obj in objs:print(obj.result())
1、threading模块
threading 模块建立在 _thread 模块之上。thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。
import threading import timedef worker(num):"""thread worker function:return:"""time.sleep(1)print("The num is %d" % num)returnfor i in range(20):t = threading.Thread(target=worker,args=(i,),name=“t.%d” % i)t.start()
上述代码创建了20个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。
Thread方法说明
t.start() : 激活线程,
t.getName() : 获取线程的名称
t.setName() : 设置线程的名称
t.name : 获取或设置线程的名称
t.is_alive() : 判断线程是否为激活状态
t.isAlive() :判断线程是否为激活状态
t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之前才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
t.isDaemon() : 判断是否为守护线程
t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
t.run() :线程被cpu调度后自动执行线程对象的run方法
2、线程锁threading.RLock和threading.Lock
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。为了保证数据的准确性,引入了锁的概念。所以,可能出现如下问题:
例:假设列表A的所有元素就为0,当一个线程从前向后打印列表的所有元素,另外一个线程则从后向前修改列表的元素为1,那么输出的时候,列表的元素就会一部分为0,一部分为1,这就导致了数据的不一致。锁的出现解决了这个问题。
import threading import timeglobals_num = 0lock = threading.RLock()def Func():lock.acquire() # 获得锁 global globals_numglobals_num += 1time.sleep(1)print(globals_num)lock.release() # 释放锁 for i in range(10):t = threading.Thread(target=Func)t.start()
threading.RLock和threading.Lock 的区别
RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁。
import threading lock = threading.Lock() #Lock对象 lock.acquire() lock.acquire() #产生了死琐。 lock.release() lock.release() import threading rLock = threading.RLock() #RLock对象 rLock.acquire() rLock.acquire() #在同一线程内,程序不会堵塞。 rLock.release() rLock.release()
threading.Condition
一个condition变量总是与某些类型的锁相联系,这个可以使用默认的情况或创建一个,当几个condition变量必须共享和同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪。
condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。 acquire() 和 release() 会调用与锁相关联的相应的方法。
其他和锁关联的方法必须被调用,wait()方法会释放锁,当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,
Condition类实现了一个conditon变量。 这个conditiaon变量允许一个或多个线程等待,直到他们被另一个线程通知。 如果lock参数,被给定一个非空的值,,那么他必须是一个lock或者Rlock对象,它用来做底层锁。否则,会创建一个新的Rlock对象,用来做底层锁。
- wait(timeout=None) : 等待通知,或者等到设定的超时时间。当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError 异常。 wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前 会一直阻塞。wait() 还可以指定一个超时时间。
如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。
注意: notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会立刻返回他们的wait() 调用。除非线程调用notify()和notify_all()之后放弃了锁的所有权。
在典型的设计风格里,利用condition变量用锁去通许访问一些共享状态,线程在获取到它想得到的状态前,会反复调用wait()。修改状态的线程在他们状态改变时调用 notify() or notify_all(),用这种方式,线程会尽可能的获取到想要的一个等待者状态。
例子: 生产者-消费者模型
import threading import time def consumer(cond):with cond:print("consumer before wait")cond.wait()print("consumer after wait")def producer(cond):with cond:print("producer before notifyAll")cond.notifyAll()print("producer after notifyAll")condition = threading.Condition() c1 = threading.Thread(name="c1", target=consumer, args=(condition,)) c2 = threading.Thread(name="c2", target=consumer, args=(condition,))p = threading.Thread(name="p", target=producer, args=(condition,))c1.start() time.sleep(2) c2.start() time.sleep(2) p.start()
生产者消费者模型
在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产 生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商 品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模型。结构图如下:
生产者消费者模型的优点:
1、解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化, 可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
举个例子,我们去邮局投递信件,如果不使用邮筒(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他(光凭身上穿的制服,万一有人假冒,就惨了)。这就产生和你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮筒相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。
2、支持并发
由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区作为桥梁连接,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区了拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。
接上面的例子,如果我们不使用邮筒,我们就得在邮局等邮递员,直到他回来,我们把信件交给他,这期间我们啥事儿都不能干(也就是生产者阻塞),或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。
3、支持忙闲不均
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。 等生产者的制造速度慢下来,消费者再慢慢处理掉。
为了充分复用,我们再拿寄信的例子来说事。假设邮递员一次只能带走1000封信。万一某次碰上情人节(也可能是圣诞节)送贺卡,需要寄出去的信超过1000封,这时 候邮筒这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮筒中,等下次过来 时再拿走。
在一个程序中实现又有生产者又有消费者,生产者不断生产,消费者不断消费,达到并行数据安全完整交互的目的。所以会有消息队列的关键字产生,队列是典型的生产者消费者模型
多线程中的生产者和消费者模型:
生产者和消费者可以用多线程实现,它们通过Queue队列进行通信。
import queue import threading import time q = queue.Queue() # 生成者(client) def productor(arg):# 序号加包子,将做好的包子放到篮子(队列)里q.put(str(arg) + '包子')print("product",arg,"包子") """ join() 保持阻塞状态,直到处理了队列中的所有项目为止。在将一个项目添加到该队列时,未完成的任务的总数就会增加。 当使用者线程调用 task_done() 以表示检索了该项目、并完成了所有的工作时,那么未完成的任务的总数就会减少。当未完成的任务的总数减少到零时,join()就会结束阻塞状态。 """q.join() #用来判断队列中是否为空,如果为空,则继续执行下面的代码。否则下面的代码阻塞print( "包子被吃完") # 创建30个包子 for i in range(4):t = threading.Thread(target=productor, args=(i,))t.start() # ============================================================== # # 消费者(server) def consumer(arg):while True:# arg(0-3)吃包子得人, q.get()从篮子(队列)里取包子,包子有序号print(arg, "eat",q.get())time.sleep(1.5)q.task_done() #用来判断队列中是否为空 # 三个线程一起吃包子 for j in range(2):t = threading.Thread(target=consumer, args=(j,))t.start()
queue模块
如前所述,当多个线程需要共享数据或者资源的时候,可能会使得线程的使用变得复杂。线程模块提供了许多同步原语,包括信号量、事件和锁。当这些选项存在时,最佳实践是转而关注于使用队列。相比较而言,队列更容易处理,并且可以使得线程编程更加安全,因为它们能够有效地传送单个线程对资源的所有访问,并支持更加清晰的、可读性更强的设计模式。
那什么是队列呢?
举例来说,我们去加油站加油。此时车就会自动排成一排形成这一排就是队列。
如果此时加油站就一个加油台(相当于线程),那么所有的车(相当于数据)就只能排着,同时人(相当于线程)也要跟随车在这儿等着。这种队列会形成线程的同步阻塞。
那么又如我们去人员爆满的高端酒店住宿,我们到了酒店门口车太多,就得排队。但是因为酒店有专门的3位停靠司机(处理队列数据的多个线程),所以我们(需要数据的线程)可以直接下车(数据、资源)并去酒店办理业务,而让车一直在那儿排着就行,由3位司机依次将队伍的车停完。停完了车然后再把钥匙交给我们,我们需要车的时候再去取就行。这种队列就是我们python中的队列。
特点:1.队列的数据可以由指定的多个线程分别同步处理
2.发送数据进队列的线程不用等待数据被执行。
3.线程安全
queue的作用:
1.解耦合(解除线程间的关系),
2.提高效率
queue队列模式:
class queue.Queue(maxsize=0)
-
构造一个FIFO(first in first out 先进先出)队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
- class queue.LifoQueue(maxsize=0)
-
构造一个LIFO(先进后出)队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
出现于版本2.6.
- class queue.PriorityQueue(maxsize=0)
-
构造一个优先队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
下面以先进先出队列的为例(另外两个都一样):
在 Python 2中,queue模块被重命名为 Queue 。其他都一样。请猛戳:queue
import queueq = queue.Queue(maxsize=0) # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。 q.join() # 等到队列为kong的时候,在执行别的操作 q.qsize() # 返回队列的大小 (不可靠) q.empty() # 当队列为空的时候,返回True 否则返回False (不可靠) q.full() # 当队列满的时候,返回True,否则返回False (不可靠) q.put(item, block=True, timeout=None) # 将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置, 为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后,如果队列无法给出放入item的位置,则引发 queue.Full 异常 q.get(block=True, timeout=None) # 移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞, 若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。 q.put_nowait(item) # 等效于 put(item,block=False) q.get_nowait() # 等效于 get(item,block=False)
看下面的例子如果队列里没有值了怎么办?他会等待直到有数据为止:
import queue q = queue.Queue() # 调用队列生成对象 q.put(1) #存放第一个值到队列 q.put(2) #存放第二个值到队列 a = q.get() # 获取队列的第一个值 print('get frist one:%s' % a) b = q.get() # 获取队列的第二个值 print('get second one:%s' % b) c = q.get() # 获取队列的第三个值 #因为此时没有第三个值所以就会形成阻塞,一直等到put()进去第三个值 print('get third one:%s' % c) #结果: ''' get frist one:1 get second one:2 #这里没有获取到值堵塞住,一直在等待着值进来~ '''
- 如果不想让他等待,不管是否队列里都取数据,可以使用
get_nowait()
,但是如果队列中没有数据就会报错!
import queue q = queue.Queue() # 调用队列生成对象 q.put(1) #存放第一个值到队列 q.put(2) #存放第二个值到队列try:a = q.get() # 获取队列的第一个值print('get frist one:%s' % a)b = q.get() # 获取队列的第二个值print('get second one:%s' % b)c = q.get_nowait() # 获取队列的第三个值,使用:get_nowait() 不堵塞! #等同于q.get(block=False)print('get third one:%s' % c) except queue.Empty as q_error:print('The Queue is empty!')
- 如果队列为空的时候可以通过异常处理进行捕获:
import queue q = queue.Queue() # 调用队列生成对象 q.put(1) #存放第一个值到队列 q.put(2) #存放第二个值到队列 try:a = q.get() # 获取队列的第一个值print('get frist one:%s' % a)b = q.get() # 获取队列的第二个值print('get second one:%s' % b)c = q.get_nowait() # 获取队列的第三个值,使用:get_nowait() 不堵塞!print('get third one:%s' % c) except queue.Empty as q_error:print('The Queue is empty!')
- 同样的如果队列长度为2,如果队列满了之后,同样他也是等待,直到有位置才会继续如下代码:
import queue q = queue.Queue(2) # 调用队列生成对象,2:设置队列长度为2 q.put(1) # 存放第一个值到队列 print('put value 1 done') q.put(2) # 存放第二个值到队列 print('put vlaue 2 done') q.put(3) # 存放第三个值到队列 print('put value 3 done') #结果: ''' put value 1 done put vlaue 2 done #这里会一直等待~ '''
- 同样如果存放数值的时候如果不想让他等待,使用
put_nowait()
但是队列无法存放后会报错!
import queue q = queue.Queue(2) # 调用队列生成对象,2:设置队列长度为2 q.put(1) # 存放第一个值到队列 print('put value 1 done') q.put(2) # 存放第二个值到队列 print('put vlaue 2 done') # q.put(33, block=False) # 不堵塞 # q.put(33, block=False, timeout=2) # 不堵塞,等待2秒 q.put_nowait(3) # 存放第三个值到队列,使用:put_nowait() 不堵塞! #等同于q.put(block=False) print('put value 3 done')
自己做个线程池
# 简单往队列中传输线程数 import threading import time import queueclass Threadingpool():def __init__(self,max_num = 10):self.queue = queue.Queue(max_num)for i in range(max_num):self.queue.put(threading.Thread)def getthreading(self):return self.queue.get()def addthreading(self):self.queue.put(threading.Thread)def func(p,i):time.sleep(1)print(i)p.addthreading()if __name__ == "__main__":p = Threadingpool()for i in range(20):thread = p.getthreading()t = thread(target = func, args = (p,i))t.start()方法一
#往队列中无限添加任务 import queue import threading import contextlib import timeStopEvent = object()class ThreadPool(object):def __init__(self, max_num):self.q = queue.Queue()self.max_num = max_numself.terminal = Falseself.generate_list = []self.free_list = []def run(self, func, args, callback=None):"""线程池执行一个任务:param func: 任务函数:param args: 任务函数所需参数:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数):return: 如果线程池已经终止,则返回True否则None"""if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:self.generate_thread()w = (func, args, callback,)self.q.put(w)def generate_thread(self):"""创建一个线程"""t = threading.Thread(target=self.call)t.start()def call(self):"""循环去获取任务函数并执行任务函数"""current_thread = threading.currentThreadself.generate_list.append(current_thread)event = self.q.get() # 获取线程while event != StopEvent: # 判断获取的线程数不等于全局变量 func, arguments, callback = event # 拆分元祖,获得执行函数,参数,回调函数try:result = func(*arguments) # 执行函数status = Trueexcept Exception as e: # 函数执行失败status = Falseresult = eif callback is not None:try:callback(status, result)except Exception as e:pass# self.free_list.append(current_thread)# event = self.q.get()# self.free_list.remove(current_thread) with self.work_state():event = self.q.get()else:self.generate_list.remove(current_thread)def close(self):"""关闭线程,给传输全局非元祖的变量来进行关闭:return:"""for i in range(len(self.generate_list)):self.q.put(StopEvent)def terminate(self):"""突然关闭线程:return:"""self.terminal = Truewhile self.generate_list:self.q.put(StopEvent)self.q.empty()@contextlib.contextmanagerdef work_state(self):self.free_list.append(threading.currentThread)try:yieldfinally:self.free_list.remove(threading.currentThread)def work(i):print(i)return i +1 # 返回给回调函数def callback(ret):print(ret)pool = ThreadPool(10) for item in range(50):pool.run(func=work, args=(item,),callback=callback)pool.terminate() # pool.close()方法二
python进程
multiprocessing模块在Python2.6中引入。最初的multiprocessing是由Jesse Noller和Richard Oudkerk在PEP 371中定义。就像你可以在threading模块中使用多个线程一样,multiprocessing模块允许你使用多个进程。当你使用多个进程时,你可以避免GIL锁,并充分利用机器的多处理器。multiprocessing是python的多进程管理包,和threading.Thread类似。
1、multiprocessing模块
直接从侧面用subprocesses替换线程使用GIL的方式,由于这一点,multiprocessing模块可以让程序员在给定的机器上充分的利用CPU。在multiprocessing中,通过创建Process对象生成进程,然后调用它的start()方法,
from multiprocessing import Processdef func(name):print('hello', name)if __name__ == "__main__":p = Process(target=func,args=('zhangyanlin',))p.start()p.join() # 等待进程执行完毕
在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用多进程的时候。 如果你真有需要 要共享数据, multiprocessing提供了两种方式。
(1)multiprocessing,Array,Value
数据可以用Value或Array存储在一个共享内存地图里,如下:
from multiprocessing import Array,Value,Processdef func(a,b):a.value = 3.333333333333333for i in range(len(b)):b[i] = -b[i]if __name__ == "__main__":num = Value('d',0.0)arr = Array('i',range(11))c = Process(target=func,args=(num,arr))d= Process(target=func,args=(num,arr))c.start()d.start()c.join()d.join()print(num.value)for i in arr:print(i) 输出:3.1415927[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
创建num和arr时,“d”和“i”参数由Array模块使用的typecodes创建:“d”表示一个双精度的浮点数,“i”表示一个有符号的整数,这些共享对象将被线程安全的处理。
Array(‘i’, range(10))中的‘i’参数:
‘c’: ctypes.c_char ‘u’: ctypes.c_wchar ‘b’: ctypes.c_byte ‘B’: ctypes.c_ubyte ‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int ‘I’: ctypes.c_uint ‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong ‘f’: ctypes.c_float ‘d’: ctypes.c_double
)multiprocessing,Manager
由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持。
from multiprocessing import Process,Manager def f(d,l):d["name"] = "zhangyanlin"d["age"] = 18d["Job"] = "pythoner"l.reverse()if __name__ == "__main__":with Manager() as man:d = man.dict()l = man.list(range(10))p = Process(target=f,args=(d,l))p.start()p.join()print(d)print(l)输出:{0.25: None, 1: '1', '2': 2}[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Server process manager比 shared memory 更灵活,因为它可以支持任意的对象类型。另外,一个单独的manager可以通过进程在网络上不同的计算机之间共享,不过他比shared memory要慢。
paramiko模块
paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实。基于SSH用于连接远程服务器并执行相关操作。
一,安装
pip3 install paramiko
二,使用
import paramiko # 创建SSH对象 ssh = paramiko.SSHClient() #key连接方式 #private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') # 允许连接不在know_hosts文件中的主机 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接服务器 # #password='admin_1234' 改成key=private_key 就是key连接方式 ssh.connect(hostname='172.16.94.138', port=22, username='root', password='admin_1234') # 执行命令 stdin, stdout, stderr = ssh.exec_command('sdf') print(stderr.read().decode()) #看获取命令结果 # 执行命令 stdin, stdout, stderr = ssh.exec_command('df') print(stdout.read().decode()) #看获取命令结果 # 获取命令结果 很重要 #stdout 正确输出,#stderr错误输出,stdin正确输入 read(读取 decode()解码 # 关闭连接 ssh.close()
2、进程池(Using a pool of workers)
Pool类描述了一个工作进程池,他有几种不同的方法让任务卸载工作进程。
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
我们可以用Pool类创建一个进程池, 展开提交的任务给进程池。 例:
#apply from multiprocessing import Pool import timedef f1(i):time.sleep(0.5)print(i)return i + 100if __name__ == "__main__":pool = Pool(5)for i in range(1,31):pool.apply(func=f1,args=(i,))#apply_async def f1(i):time.sleep(0.5)print(i)return i + 100 def f2(arg):print(arg)if __name__ == "__main__":pool = Pool(5)for i in range(1,31):pool.apply_async(func=f1,args=(i,),callback=f2)pool.close()pool.join()
一个进程池对象可以控制工作进程池的哪些工作可以被提交,它支持超时和回调的异步结果,有一个类似map的实现。
- processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
- initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
- maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个心的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
- context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context
注意:Pool对象的方法只可以被创建pool的进程所调用。
New in version 3.2: maxtasksperchild
New in version 3.4: context
进程池的方法
-
apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。
-
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。
-
close() : 阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
-
terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。
-
join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。
-
map(func, iterable[, chunksize])¶
-
map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
-
imap(func, iterable[, chunksize])¶
-
imap_unordered(func, iterable[, chunksize])
-
starmap(func, iterable[, chunksize])¶
-
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
python协程
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
event loop是协程执行的控制点, 如果你希望执行协程, 就需要用到它们。
event loop提供了如下的特性:
- 注册、执行、取消延时调用(异步函数)
- 创建用于通信的client和server协议(工具)
- 创建和别的程序通信的子进程和协议(工具)
- 把函数调用送入线程池中
协程示例:
import asyncioasync def cor1():print("COR1 start")await cor2()print("COR1 end")async def cor2():print("COR2")loop = asyncio.get_event_loop() loop.run_until_complete(cor1()) loop.close()
最后三行是重点。
- asyncio.get_event_loop() : asyncio启动默认的event loop
- run_until_complete() : 这个函数是阻塞执行的,知道所有的异步函数执行完成,
- close() : 关闭event loop。
1、greenlet
import greenletdef fun1():print("12")gr2.switch()print("56")gr2.switch()def fun2():print("34")gr1.switch()print("78")gr1 = greenlet.greenlet(fun1) gr2 = greenlet.greenlet(fun2) gr1.switch()
gevent
gevent属于第三方模块需要下载安装包
-
pip3 install --upgrade pip3
- pip3 install gevent
import geventdef fun1():print("www.baidu") # 第一步 gevent.sleep(0)print("end the baidu") # 第三步def fun2():print("www.zhihu") # 第二步 gevent.sleep(0)print("end th zhihu") # 第四步 gevent.joinall([gevent.spawn(fun1),gevent.spawn(fun2), ])
遇到IO操作自动切换
import gevent import requestsdef func(url):print("get: %s"%url)gevent.sleep(0)date =requests.get(url)ret = date.textprint(url,len(ret))gevent.joinall([gevent.spawn(func, '/'),gevent.spawn(func, '/'),gevent.spawn(func, '/'), ])工作中用到协程的地方
转载于:.html
网络编程,进程,线程,协程
守护线程
在python中: 守护线程执行效果是一样的,当非守护线程全部结束,守护线程也会跟着立即结束。import threading,time def run(n):print('task---',n,threading.current_thread())time.sleep(1)print('task done--',n) if __name__ == '__main__':for i in range(5):t=threading.Thread(target=run,args=(i,))t.setDaemon(True) #设置为后台线程或前台线程(默认false),通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用 t.start()print('therd done' ,threading.current_thread())
由上面可知道,1.函数的主流程,就是主线程。其他我们定义的线程,都是子线程
2.守护线程的语句要写在start()前,将线程定义为守护线程后,并开启。
3.非守护线程结束,守护线程也跟着结束。所以我们定义的print("task done-", n)语句未能执行。
thread方法说明
t.start() : 激活线程,
t.getName() : 获取线程的名称
t.setName() : 设置线程的名称
t.name : 获取或设置线程的名称
t.is_alive() : 判断线程是否为激活状态
t.isAlive() :判断线程是否为激活状态
t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
t.isDaemon() : 判断是否为守护线程
t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
t.run() :线程被cpu调度后自动执行线程对象的run方法
GIL全称Global Interpreter Lock(全局解释锁)
为什么存在GIL
由于物理上得限制,各CPU厂商在核心频率上的比赛已经被多核所取代。为了更有效的利用多核处理器的性能,就出现了多线程的编程方式,而随之带来的就是线程间数据一致性和状态同步的困难。即使在CPU内部的Cache也不例外,为了有效解决多份缓存之间的数据同步时各厂商花费了不少心思,也不可避免的带来了一定的性能损失。
Python为了完全支持多线程编程, 但是python解释器的C语言实现部分在完全并行执行时并不是线程安全的。 于是就有了GIL,解释器被全局解释器锁(GIL)保护着,它确保任何时候都只有一个Python线程执行。 GIL最大的问题就是Python的多线程程序并不能利用多核CPU的优势 (比如一个使用了多个线程的计算密集型程序只会在一个单CPU上面运行)。
但是有一点要强调的是GIL只会影响到那些严重依赖CPU的程序(比如计算型的)。 如果你的程序大部分只会涉及到I/O,比如网络交互,那么使用多线程就很合适, 因为它们大部分时间都在等待,在等待过程中,当前线程会释放GIL锁。所以,你完全可以放心的创建几千个Python线程, 现代操作系统运行这么多线程没有任何压力,没啥可担心的。
故:
1.计算密集型:CPU操作密集的,使用多进程
2.IO密集型:使用多线程
IO多路复用
I/O多路复用指:通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
Linux中的 select,poll,epoll 都是IO多路复用的机制。
select
select最早于
1983
年出现在
4.2BSD
中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为
1024
,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
poll
poll在
1986
年诞生于System V Release
3
,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。
poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。
epoll
直到Linux2.
6
才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.
6
下性能最好的多路I
/
O就绪通知方法。
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select
/
poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用。 Windows Python:提供: select Mac Python:提供: select Linux Python:提供: select、poll、epoll 注意:网络操作、文件操作、终端操作等均属于IO操作,对于windows只支持Socket操作,其他系统支持其他IO操作,但是无法检测 普通文件操作 自动上次读取是否已经变化。
socket
tcp(传输控制协议) 是面向连接的 面向流,提供了高可靠性服务
udp(用户数据报协议) 是无连接的,面向消息. 提供高效率服务
发送数据
tcp数据不能为空,会粘包
udp输入空格(回车),那也不是空消息,udp协议会帮你封装上消息头,不会粘包
SOCK_DGRAM 数据报协议
基于UCP的套接字
udp是无链接的,先启动哪一端都不会报错
udp服务端1 ss = socket() #创建一个服务器的套接字 2 ss.bind() #绑定服务器套接字 3 inf_loop: #服务器无限循环 4 cs = ss.recvfrom()/ss.sendto() # 对话(接收与发送) 5 ss.close() # 关闭服务器套接字
udp客户端cs = socket() # 创建客户套接字 comm_loop: # 通讯循环cs.sendto()/cs.recvfrom() # 对话(发送/接收) cs.close() # 关闭客户套接字
udp套接字简单示例
import socket ip_port=('127.0.0.1',9000) BUFSIZE=1024 udp_server_client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)udp_server_client.bind(ip_port)while True:msg,addr=udp_server_client.recvfrom(BUFSIZE)print(msg,addr)udp_server_client.sendto(msg.upper(),addr)udp服务端
import socket ip_port=('127.0.0.1',9000) BUFSIZE=1024 udp_server_client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)while True:msg=input('>>: ').strip()if not msg:continueudp_server_client.sendto(msg.encode('utf-8'),ip_port)back_msg,addr=udp_server_client.recvfrom(BUFSIZE)print(back_msg.decode('utf-8'),addr) 复制代码udp客户端
由于udp无连接,所以可以同时多个客户端去跟服务端通信
import socket ip_port=('127.0.0.1',8081) udp_server_sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) #买手机 udp_server_sock.bind(ip_port)while True:qq_msg,addr=udp_server_sock.recvfrom(1024)print('来自[%s:%s]的一条消息:\033[1;44m%s\033[0m' %(addr[0],addr[1],qq_msg.decode('utf-8')))back_msg=input('回复消息: ').strip()udp_server_sock.sendto(back_msg.encode('utf-8'),addr)udp服务端
import socket BUFSIZE=1024 udp_client_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)qq_name_dic={'狗哥alex':('127.0.0.1',8081),'瞎驴':('127.0.0.1',8081),'一棵树':('127.0.0.1',8081),'egon':('127.0.0.1',8081), }while True:qq_name=input('请选择聊天对象: ').strip()while True:msg=input('请输入消息,回车发送: ').strip()if msg == 'quit':breakif not msg or not qq_name or qq_name not in qq_name_dic:continueudp_client_socket.sendto(msg.encode('utf-8'),qq_name_dic[qq_name])back_msg,addr=udp_client_socket.recvfrom(BUFSIZE)print('来自[%s:%s]的一条消息:\033[1;44m%s\033[0m' %(addr[0],addr[1],back_msg.decode('utf-8')))udp_client_socket.close()udp客户端1
import socket BUFSIZE=1024 udp_client_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)qq_name_dic={'狗哥alex':('127.0.0.1',8081),'瞎驴':('127.0.0.1',8081),'一棵树':('127.0.0.1',8081),'egon':('127.0.0.1',8081), }while True:qq_name=input('请选择聊天对象: ').strip()while True:msg=input('请输入消息,回车发送: ').strip()if msg == 'quit':breakif not msg or not qq_name or qq_name not in qq_name_dic:continueudp_client_socket.sendto(msg.encode('utf-8'),qq_name_dic[qq_name])back_msg,addr=udp_client_socket.recvfrom(BUFSIZE)print('来自[%s:%s]的一条消息:\033[1;44m%s\033[0m' %(addr[0],addr[1],back_msg.decode('utf-8')))udp_client_socket.close()udp客户端2
基于UDP并发(udp最稳定512字节)
#server import socketserver class MyUDPhandler(socketserver.BaseRequestHandler):def handle(self):print(self.request) #request值如下: udp没有链接,只有发消息时,触发链接,#(b'a', <socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8100)>)self.request[1].sendto(self.request[0].upper(),self.client_address) #发消息给client端,发送data数据的大写形式 if __name__=="__main__":s=socketserver.ThreadingUDPServer(("127.0.0.1",8100),MyUDPhandler)s.serve_forever() #client from socket import * udp_client=socket(AF_INET,SOCK_DGRAM) while True:msg=input(">>:").strip()udp_client.sendto(msg.encode("utf-8"),("127.0.0.1",8100))data,server_addr=udp_client.recvfrom(1024)print(data.decode("utf-8"))#启动服务端 在启动客户端,(启动2个client端) #客户端直接回车(会显示返回数据为空,不会报错),再输入aa(会显示AA,)
进程与线程的历史
我们都知道计算机是由硬件和软件组成的。硬件中的CPU是计算机的核心,它承担计算机的所有任务。 操作系统是运行在硬件之上的软件,是计算机的管理者,它负责资源的管理和分配、任务的调度。 程序是运行在系统上的具有某种功能的软件,比如说浏览器,音乐播放器等。 每次执行程序的时候,都会完成一定的功能,比如说浏览器帮我们打开网页,为了保证其独立性,就需要一个专门的管理和控制执行程序的数据结构——进程控制块。 进程就是一个程序在一个数据集上的一次动态执行过程。 进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
在早期的操作系统里,计算机只有一个核心,进程执行程序的最小单位,任务调度采用时间片轮转的抢占式方式进行进程调度。每个进程都有各自的一块独立的内存,保证进程彼此间的内存地址空间的隔离。 随着计算机技术的发展,进程出现了很多弊端,一是进程的创建、撤销和切换的开销比较大,二是由于对称多处理机(对称多处理机(SymmetricalMulti-Processing)又叫SMP,是指在一个计算机上汇集了一组处理器(多CPU),各CPU之间共享内存子系统以及总线结构)的出现,可以满足多个运行单位,而多进程并行开销过大。 这个时候就引入了线程的概念。 线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合 和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发性能。 线程没有自己的系统资源,只拥有在运行时必不可少的资源。但线程可以与同属与同一进程的其他线程共享进程所拥有的其他资源。
进程与线程之间的关系
线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除。线程可与属于同一进程的其它线程共享进程所拥有的全部资源,但是其本身基本上不拥有系统资源,只拥有一点在运行中必不可少的信息(如程序计数器、一组寄存器和栈)。
进程与线程的区别:
1.进程,内存独立,线程共享同一进程的内存
2.进程是资源的整合,线程是执行单位
3.进程之间不能直接互相访问,线程可以互相通信
4.创建新进程非常消耗系统资源,线程非常轻量,只保存线程需要运行时的必要数据。如上下文,程序堆栈。
5.同一进程里的线程是可以互相控制,父进程是可以子进程。
python线程
Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。
应用场景程序====》至少有一个进程====》至少有一个线程
IO密集型:线程
计算密集型:进程
信号量(允许同一时间几个线程访问公共数据,最大限制是5)
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
from threading import Thread,current_thread,Semaphore #Semaphore限制数据库连接数 import time,random #randomrandom模块用于生成随机数。 sm=Semaphore(5) def work():sm.acquire()print('%s 去吃饭' %current_thread().getName())time.sleep(random.randint(1,3))sm.release() if __name__ == '__main__':for i in range(20):t=Thread(target=work)t.start()
事件锁(Event)
Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()为False时就阻塞当前线程,知道结束阻塞状态
Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。
构造方法:
Event()
实例方法:
is_Set(): 当内置标志为True时返回True。
set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。
clear(): 将标志设为False。
wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。
from threading import Thread,current_thread,Event import time event=Event() def conn_mysql():count=1while not event.is_set():if count>3:raise ConnectionAbortedError('链接失败!')print('%s 等待%s次链接mysql' %(current_thread().getName(),count))event.wait(0.5)count+=1print('%s 正在检查mysql状态' %current_thread().getName()) def check_mysql():print('%s 正在检查mysql状态' %current_thread().getName())time.sleep(1)event.set() if __name__ == '__main__':t1=Thread(target=conn_mysql)t2=Thread(target=check_mysql)check=Thread(target=check_mysql)t1.start()t2.start()check.start()
线程开启的两种方式:
#1.使用替换threading模块提供的Thread from threading import Thread def task():print('is runing.....') if __name__ == '__main__':t=Thread(target=task)t.start()print('主')
#2.自定义类,继承Thread from threading import Thread class MyThread(Thread):def __init__(self,name):super().__init__()self.name=namedef run(self):print('%s is runing.....' %self.name) if __name__ == '__main__':t=MyThread('gongxu')t.start()print('主')
#同一个进程内的多个线程的pid,都是相同的,是该进程的pid from multiprocessing import Process import os def task():print('%s is runing.....') if __name__ == '__main__':t1=Process(target=task)t2=Process(target=task)t1.start()t2.start()print('主',os.getpid())
多线程共享一个进程内的资源
from threading import Thread from multiprocessing import Process n=100 def work():global nn=0 if __name__ == '__main__':t=Thread(target=work)t.start() #同一进程内的多个线程,共享该进程的地址空间 t.join()print('主',n)p=Process(target=work)p.start()p.join() #多个进程地址空间是隔离的print('主',n)
同一进程内开启多线程
#强调:主线程从执行层面上代表了其所在进程的执行过程 from threading import Thread msg_l=[] fromat_l=[] def talk():while True:msg=input('>>>>>:').strip()msg_l.append(msg) def fromat():while True:if msg_l:data=msg_l.pop()fromat_l.append(data.uppper()) def save():while True:if fromat_l:data=fromat_l.pop()with open('db.txt','a') as f:f.write('%s\n' %data) if __name__ == '__main__':t1=Thread(target=talk)t2=Thread(target=fromat)t3=Thread(target=save)t1.start()t2.start()t3.start()
死锁(Lock)
from threading import Thread,Lock import time murexA=Lock() murexB=Lock() class Mythread(Thread):def run(self):self.f1()self.f2()def f1(self):murexA.acquire()print('\33[45m%s 抢到A锁\033[0m' %self.name)murexB.acquire()print('\38[45m%s 抢到B锁\038[0m' % self.name)murexA.release()murexB.release()def f2(self):murexB.acquire()print('\38[45m%s 抢到B锁\038[0m' % self.name)murexA.acquire()print('\33[45m%s 抢到A锁\033[0m' % self.name)murexB.release()murexA.release() if __name__ == '__main__':for i in range(20):t=Mythread()t.start()
递归锁(RLock,说白了就是在一个大锁中还要再包含子锁,在同一线程内,程序不会阻塞)
from threading import Thread,RLock import time murex=RLock() class Mythread(Thread):def run(self):self.f1()self.f2()def f1(self):murex.acquire()print('\33[45m%s 抢到A锁\033[0m' %self.name)murex.acquire()print('\38[45m%s 抢到B锁\038[0m' % self.name)murex.release()murex.release()def f2(self):murex.acquire()print('\38[45m%s 抢到B锁\038[0m' % self.name)time.sleep(1)murex.acquire()print('\33[45m%s 抢到A锁\033[0m' % self.name) murex.release()murex.release() if __name__ == '__main__':for i in range(20):t=Mythread()t.start()
综上所述,为了避免代码的错误,所以不论是需要多重锁还是不需要,都推荐直接写 rLock = threading.RLock()
Python标准模块--concurrent.futures
concurrent.futures模块是在Python3.2中添加的。根据Python的官方文档,concurrent.futures模块提供给开发者一个执行异步调用的高级接口。concurrent.futures基本上就是在Python的threading和multiprocessing模块之上构建的抽象层,更易于使用。尽管这个抽象层简化了这些模块的使用,但是也降低了很多灵活性,所以如果你需要处理一些定制化的任务,concurrent.futures或许并不适合你。
concurrent.futures包括抽象类Executor,它并不能直接被使用,所以你需要使用它的两个子类:ThreadPoolExecutor或者ProcessPoolExecutor。正如你所猜的,这两个子类分别对应着Python的threading和multiprocessing接口。这两个子类都提供了池,你可以将线程或者进程放入其中。
模块使用
#进程池 from concurrent.futures import ProcessPoolExecutor import os,time,random def work(n):print('%s is runing......' %os.getpid())time.sleep(random.randint(1,3))return n**2 if __name__ == '__main__':p=ProcessPoolExecutor()objs=[]for i in range(10):obj=p.submit(work,i)objs.append(obj)p.shutdown()for obj in objs:print(obj.result())
#线程池 from concurrent.futures import ThreadPoolExecutor import os,time,random def work(n):print('%s is runing......' %os.getpid())time.sleep(random.randint(1,3))return n**2 if __name__ == '__main__':p=ThreadPoolExecutor()objs=[]for i in range(10):obj=p.submit(work,i)objs.append(obj)p.shutdown()for obj in objs:print(obj.result())
1、threading模块
threading 模块建立在 _thread 模块之上。thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。
import threading import timedef worker(num):"""thread worker function:return:"""time.sleep(1)print("The num is %d" % num)returnfor i in range(20):t = threading.Thread(target=worker,args=(i,),name=“t.%d” % i)t.start()
上述代码创建了20个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。
Thread方法说明
t.start() : 激活线程,
t.getName() : 获取线程的名称
t.setName() : 设置线程的名称
t.name : 获取或设置线程的名称
t.is_alive() : 判断线程是否为激活状态
t.isAlive() :判断线程是否为激活状态
t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之前才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
t.isDaemon() : 判断是否为守护线程
t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
t.run() :线程被cpu调度后自动执行线程对象的run方法
2、线程锁threading.RLock和threading.Lock
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。为了保证数据的准确性,引入了锁的概念。所以,可能出现如下问题:
例:假设列表A的所有元素就为0,当一个线程从前向后打印列表的所有元素,另外一个线程则从后向前修改列表的元素为1,那么输出的时候,列表的元素就会一部分为0,一部分为1,这就导致了数据的不一致。锁的出现解决了这个问题。
import threading import timeglobals_num = 0lock = threading.RLock()def Func():lock.acquire() # 获得锁 global globals_numglobals_num += 1time.sleep(1)print(globals_num)lock.release() # 释放锁 for i in range(10):t = threading.Thread(target=Func)t.start()
threading.RLock和threading.Lock 的区别
RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁。
import threading lock = threading.Lock() #Lock对象 lock.acquire() lock.acquire() #产生了死琐。 lock.release() lock.release() import threading rLock = threading.RLock() #RLock对象 rLock.acquire() rLock.acquire() #在同一线程内,程序不会堵塞。 rLock.release() rLock.release()
threading.Condition
一个condition变量总是与某些类型的锁相联系,这个可以使用默认的情况或创建一个,当几个condition变量必须共享和同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪。
condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。 acquire() 和 release() 会调用与锁相关联的相应的方法。
其他和锁关联的方法必须被调用,wait()方法会释放锁,当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,
Condition类实现了一个conditon变量。 这个conditiaon变量允许一个或多个线程等待,直到他们被另一个线程通知。 如果lock参数,被给定一个非空的值,,那么他必须是一个lock或者Rlock对象,它用来做底层锁。否则,会创建一个新的Rlock对象,用来做底层锁。
- wait(timeout=None) : 等待通知,或者等到设定的超时时间。当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError 异常。 wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前 会一直阻塞。wait() 还可以指定一个超时时间。
如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。
注意: notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会立刻返回他们的wait() 调用。除非线程调用notify()和notify_all()之后放弃了锁的所有权。
在典型的设计风格里,利用condition变量用锁去通许访问一些共享状态,线程在获取到它想得到的状态前,会反复调用wait()。修改状态的线程在他们状态改变时调用 notify() or notify_all(),用这种方式,线程会尽可能的获取到想要的一个等待者状态。
例子: 生产者-消费者模型
import threading import time def consumer(cond):with cond:print("consumer before wait")cond.wait()print("consumer after wait")def producer(cond):with cond:print("producer before notifyAll")cond.notifyAll()print("producer after notifyAll")condition = threading.Condition() c1 = threading.Thread(name="c1", target=consumer, args=(condition,)) c2 = threading.Thread(name="c2", target=consumer, args=(condition,))p = threading.Thread(name="p", target=producer, args=(condition,))c1.start() time.sleep(2) c2.start() time.sleep(2) p.start()
生产者消费者模型
在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产 生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商 品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模型。结构图如下:
生产者消费者模型的优点:
1、解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化, 可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
举个例子,我们去邮局投递信件,如果不使用邮筒(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他(光凭身上穿的制服,万一有人假冒,就惨了)。这就产生和你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮筒相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。
2、支持并发
由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区作为桥梁连接,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区了拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。
接上面的例子,如果我们不使用邮筒,我们就得在邮局等邮递员,直到他回来,我们把信件交给他,这期间我们啥事儿都不能干(也就是生产者阻塞),或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。
3、支持忙闲不均
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。 等生产者的制造速度慢下来,消费者再慢慢处理掉。
为了充分复用,我们再拿寄信的例子来说事。假设邮递员一次只能带走1000封信。万一某次碰上情人节(也可能是圣诞节)送贺卡,需要寄出去的信超过1000封,这时 候邮筒这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮筒中,等下次过来 时再拿走。
在一个程序中实现又有生产者又有消费者,生产者不断生产,消费者不断消费,达到并行数据安全完整交互的目的。所以会有消息队列的关键字产生,队列是典型的生产者消费者模型
多线程中的生产者和消费者模型:
生产者和消费者可以用多线程实现,它们通过Queue队列进行通信。
import queue import threading import time q = queue.Queue() # 生成者(client) def productor(arg):# 序号加包子,将做好的包子放到篮子(队列)里q.put(str(arg) + '包子')print("product",arg,"包子") """ join() 保持阻塞状态,直到处理了队列中的所有项目为止。在将一个项目添加到该队列时,未完成的任务的总数就会增加。 当使用者线程调用 task_done() 以表示检索了该项目、并完成了所有的工作时,那么未完成的任务的总数就会减少。当未完成的任务的总数减少到零时,join()就会结束阻塞状态。 """q.join() #用来判断队列中是否为空,如果为空,则继续执行下面的代码。否则下面的代码阻塞print( "包子被吃完") # 创建30个包子 for i in range(4):t = threading.Thread(target=productor, args=(i,))t.start() # ============================================================== # # 消费者(server) def consumer(arg):while True:# arg(0-3)吃包子得人, q.get()从篮子(队列)里取包子,包子有序号print(arg, "eat",q.get())time.sleep(1.5)q.task_done() #用来判断队列中是否为空 # 三个线程一起吃包子 for j in range(2):t = threading.Thread(target=consumer, args=(j,))t.start()
queue模块
如前所述,当多个线程需要共享数据或者资源的时候,可能会使得线程的使用变得复杂。线程模块提供了许多同步原语,包括信号量、事件和锁。当这些选项存在时,最佳实践是转而关注于使用队列。相比较而言,队列更容易处理,并且可以使得线程编程更加安全,因为它们能够有效地传送单个线程对资源的所有访问,并支持更加清晰的、可读性更强的设计模式。
那什么是队列呢?
举例来说,我们去加油站加油。此时车就会自动排成一排形成这一排就是队列。
如果此时加油站就一个加油台(相当于线程),那么所有的车(相当于数据)就只能排着,同时人(相当于线程)也要跟随车在这儿等着。这种队列会形成线程的同步阻塞。
那么又如我们去人员爆满的高端酒店住宿,我们到了酒店门口车太多,就得排队。但是因为酒店有专门的3位停靠司机(处理队列数据的多个线程),所以我们(需要数据的线程)可以直接下车(数据、资源)并去酒店办理业务,而让车一直在那儿排着就行,由3位司机依次将队伍的车停完。停完了车然后再把钥匙交给我们,我们需要车的时候再去取就行。这种队列就是我们python中的队列。
特点:1.队列的数据可以由指定的多个线程分别同步处理
2.发送数据进队列的线程不用等待数据被执行。
3.线程安全
queue的作用:
1.解耦合(解除线程间的关系),
2.提高效率
queue队列模式:
class queue.Queue(maxsize=0)
-
构造一个FIFO(first in first out 先进先出)队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
- class queue.LifoQueue(maxsize=0)
-
构造一个LIFO(先进后出)队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
出现于版本2.6.
- class queue.PriorityQueue(maxsize=0)
-
构造一个优先队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
下面以先进先出队列的为例(另外两个都一样):
在 Python 2中,queue模块被重命名为 Queue 。其他都一样。请猛戳:queue
import queueq = queue.Queue(maxsize=0) # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。 q.join() # 等到队列为kong的时候,在执行别的操作 q.qsize() # 返回队列的大小 (不可靠) q.empty() # 当队列为空的时候,返回True 否则返回False (不可靠) q.full() # 当队列满的时候,返回True,否则返回False (不可靠) q.put(item, block=True, timeout=None) # 将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置, 为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后,如果队列无法给出放入item的位置,则引发 queue.Full 异常 q.get(block=True, timeout=None) # 移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞, 若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。 q.put_nowait(item) # 等效于 put(item,block=False) q.get_nowait() # 等效于 get(item,block=False)
看下面的例子如果队列里没有值了怎么办?他会等待直到有数据为止:
import queue q = queue.Queue() # 调用队列生成对象 q.put(1) #存放第一个值到队列 q.put(2) #存放第二个值到队列 a = q.get() # 获取队列的第一个值 print('get frist one:%s' % a) b = q.get() # 获取队列的第二个值 print('get second one:%s' % b) c = q.get() # 获取队列的第三个值 #因为此时没有第三个值所以就会形成阻塞,一直等到put()进去第三个值 print('get third one:%s' % c) #结果: ''' get frist one:1 get second one:2 #这里没有获取到值堵塞住,一直在等待着值进来~ '''
- 如果不想让他等待,不管是否队列里都取数据,可以使用
get_nowait()
,但是如果队列中没有数据就会报错!
import queue q = queue.Queue() # 调用队列生成对象 q.put(1) #存放第一个值到队列 q.put(2) #存放第二个值到队列try:a = q.get() # 获取队列的第一个值print('get frist one:%s' % a)b = q.get() # 获取队列的第二个值print('get second one:%s' % b)c = q.get_nowait() # 获取队列的第三个值,使用:get_nowait() 不堵塞! #等同于q.get(block=False)print('get third one:%s' % c) except queue.Empty as q_error:print('The Queue is empty!')
- 如果队列为空的时候可以通过异常处理进行捕获:
import queue q = queue.Queue() # 调用队列生成对象 q.put(1) #存放第一个值到队列 q.put(2) #存放第二个值到队列 try:a = q.get() # 获取队列的第一个值print('get frist one:%s' % a)b = q.get() # 获取队列的第二个值print('get second one:%s' % b)c = q.get_nowait() # 获取队列的第三个值,使用:get_nowait() 不堵塞!print('get third one:%s' % c) except queue.Empty as q_error:print('The Queue is empty!')
- 同样的如果队列长度为2,如果队列满了之后,同样他也是等待,直到有位置才会继续如下代码:
import queue q = queue.Queue(2) # 调用队列生成对象,2:设置队列长度为2 q.put(1) # 存放第一个值到队列 print('put value 1 done') q.put(2) # 存放第二个值到队列 print('put vlaue 2 done') q.put(3) # 存放第三个值到队列 print('put value 3 done') #结果: ''' put value 1 done put vlaue 2 done #这里会一直等待~ '''
- 同样如果存放数值的时候如果不想让他等待,使用
put_nowait()
但是队列无法存放后会报错!
import queue q = queue.Queue(2) # 调用队列生成对象,2:设置队列长度为2 q.put(1) # 存放第一个值到队列 print('put value 1 done') q.put(2) # 存放第二个值到队列 print('put vlaue 2 done') # q.put(33, block=False) # 不堵塞 # q.put(33, block=False, timeout=2) # 不堵塞,等待2秒 q.put_nowait(3) # 存放第三个值到队列,使用:put_nowait() 不堵塞! #等同于q.put(block=False) print('put value 3 done')
自己做个线程池
# 简单往队列中传输线程数 import threading import time import queueclass Threadingpool():def __init__(self,max_num = 10):self.queue = queue.Queue(max_num)for i in range(max_num):self.queue.put(threading.Thread)def getthreading(self):return self.queue.get()def addthreading(self):self.queue.put(threading.Thread)def func(p,i):time.sleep(1)print(i)p.addthreading()if __name__ == "__main__":p = Threadingpool()for i in range(20):thread = p.getthreading()t = thread(target = func, args = (p,i))t.start()方法一
#往队列中无限添加任务 import queue import threading import contextlib import timeStopEvent = object()class ThreadPool(object):def __init__(self, max_num):self.q = queue.Queue()self.max_num = max_numself.terminal = Falseself.generate_list = []self.free_list = []def run(self, func, args, callback=None):"""线程池执行一个任务:param func: 任务函数:param args: 任务函数所需参数:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数):return: 如果线程池已经终止,则返回True否则None"""if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:self.generate_thread()w = (func, args, callback,)self.q.put(w)def generate_thread(self):"""创建一个线程"""t = threading.Thread(target=self.call)t.start()def call(self):"""循环去获取任务函数并执行任务函数"""current_thread = threading.currentThreadself.generate_list.append(current_thread)event = self.q.get() # 获取线程while event != StopEvent: # 判断获取的线程数不等于全局变量 func, arguments, callback = event # 拆分元祖,获得执行函数,参数,回调函数try:result = func(*arguments) # 执行函数status = Trueexcept Exception as e: # 函数执行失败status = Falseresult = eif callback is not None:try:callback(status, result)except Exception as e:pass# self.free_list.append(current_thread)# event = self.q.get()# self.free_list.remove(current_thread) with self.work_state():event = self.q.get()else:self.generate_list.remove(current_thread)def close(self):"""关闭线程,给传输全局非元祖的变量来进行关闭:return:"""for i in range(len(self.generate_list)):self.q.put(StopEvent)def terminate(self):"""突然关闭线程:return:"""self.terminal = Truewhile self.generate_list:self.q.put(StopEvent)self.q.empty()@contextlib.contextmanagerdef work_state(self):self.free_list.append(threading.currentThread)try:yieldfinally:self.free_list.remove(threading.currentThread)def work(i):print(i)return i +1 # 返回给回调函数def callback(ret):print(ret)pool = ThreadPool(10) for item in range(50):pool.run(func=work, args=(item,),callback=callback)pool.terminate() # pool.close()方法二
python进程
multiprocessing模块在Python2.6中引入。最初的multiprocessing是由Jesse Noller和Richard Oudkerk在PEP 371中定义。就像你可以在threading模块中使用多个线程一样,multiprocessing模块允许你使用多个进程。当你使用多个进程时,你可以避免GIL锁,并充分利用机器的多处理器。multiprocessing是python的多进程管理包,和threading.Thread类似。
1、multiprocessing模块
直接从侧面用subprocesses替换线程使用GIL的方式,由于这一点,multiprocessing模块可以让程序员在给定的机器上充分的利用CPU。在multiprocessing中,通过创建Process对象生成进程,然后调用它的start()方法,
from multiprocessing import Processdef func(name):print('hello', name)if __name__ == "__main__":p = Process(target=func,args=('zhangyanlin',))p.start()p.join() # 等待进程执行完毕
在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用多进程的时候。 如果你真有需要 要共享数据, multiprocessing提供了两种方式。
(1)multiprocessing,Array,Value
数据可以用Value或Array存储在一个共享内存地图里,如下:
from multiprocessing import Array,Value,Processdef func(a,b):a.value = 3.333333333333333for i in range(len(b)):b[i] = -b[i]if __name__ == "__main__":num = Value('d',0.0)arr = Array('i',range(11))c = Process(target=func,args=(num,arr))d= Process(target=func,args=(num,arr))c.start()d.start()c.join()d.join()print(num.value)for i in arr:print(i) 输出:3.1415927[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
创建num和arr时,“d”和“i”参数由Array模块使用的typecodes创建:“d”表示一个双精度的浮点数,“i”表示一个有符号的整数,这些共享对象将被线程安全的处理。
Array(‘i’, range(10))中的‘i’参数:
‘c’: ctypes.c_char ‘u’: ctypes.c_wchar ‘b’: ctypes.c_byte ‘B’: ctypes.c_ubyte ‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int ‘I’: ctypes.c_uint ‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong ‘f’: ctypes.c_float ‘d’: ctypes.c_double
)multiprocessing,Manager
由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持。
from multiprocessing import Process,Manager def f(d,l):d["name"] = "zhangyanlin"d["age"] = 18d["Job"] = "pythoner"l.reverse()if __name__ == "__main__":with Manager() as man:d = man.dict()l = man.list(range(10))p = Process(target=f,args=(d,l))p.start()p.join()print(d)print(l)输出:{0.25: None, 1: '1', '2': 2}[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Server process manager比 shared memory 更灵活,因为它可以支持任意的对象类型。另外,一个单独的manager可以通过进程在网络上不同的计算机之间共享,不过他比shared memory要慢。
paramiko模块
paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实。基于SSH用于连接远程服务器并执行相关操作。
一,安装
pip3 install paramiko
二,使用
import paramiko # 创建SSH对象 ssh = paramiko.SSHClient() #key连接方式 #private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') # 允许连接不在know_hosts文件中的主机 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接服务器 # #password='admin_1234' 改成key=private_key 就是key连接方式 ssh.connect(hostname='172.16.94.138', port=22, username='root', password='admin_1234') # 执行命令 stdin, stdout, stderr = ssh.exec_command('sdf') print(stderr.read().decode()) #看获取命令结果 # 执行命令 stdin, stdout, stderr = ssh.exec_command('df') print(stdout.read().decode()) #看获取命令结果 # 获取命令结果 很重要 #stdout 正确输出,#stderr错误输出,stdin正确输入 read(读取 decode()解码 # 关闭连接 ssh.close()
2、进程池(Using a pool of workers)
Pool类描述了一个工作进程池,他有几种不同的方法让任务卸载工作进程。
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
我们可以用Pool类创建一个进程池, 展开提交的任务给进程池。 例:
#apply from multiprocessing import Pool import timedef f1(i):time.sleep(0.5)print(i)return i + 100if __name__ == "__main__":pool = Pool(5)for i in range(1,31):pool.apply(func=f1,args=(i,))#apply_async def f1(i):time.sleep(0.5)print(i)return i + 100 def f2(arg):print(arg)if __name__ == "__main__":pool = Pool(5)for i in range(1,31):pool.apply_async(func=f1,args=(i,),callback=f2)pool.close()pool.join()
一个进程池对象可以控制工作进程池的哪些工作可以被提交,它支持超时和回调的异步结果,有一个类似map的实现。
- processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
- initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
- maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个心的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
- context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context
注意:Pool对象的方法只可以被创建pool的进程所调用。
New in version 3.2: maxtasksperchild
New in version 3.4: context
进程池的方法
-
apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。
-
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。
-
close() : 阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
-
terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。
-
join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。
-
map(func, iterable[, chunksize])¶
-
map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
-
imap(func, iterable[, chunksize])¶
-
imap_unordered(func, iterable[, chunksize])
-
starmap(func, iterable[, chunksize])¶
-
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
python协程
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
event loop是协程执行的控制点, 如果你希望执行协程, 就需要用到它们。
event loop提供了如下的特性:
- 注册、执行、取消延时调用(异步函数)
- 创建用于通信的client和server协议(工具)
- 创建和别的程序通信的子进程和协议(工具)
- 把函数调用送入线程池中
协程示例:
import asyncioasync def cor1():print("COR1 start")await cor2()print("COR1 end")async def cor2():print("COR2")loop = asyncio.get_event_loop() loop.run_until_complete(cor1()) loop.close()
最后三行是重点。
- asyncio.get_event_loop() : asyncio启动默认的event loop
- run_until_complete() : 这个函数是阻塞执行的,知道所有的异步函数执行完成,
- close() : 关闭event loop。
1、greenlet
import greenletdef fun1():print("12")gr2.switch()print("56")gr2.switch()def fun2():print("34")gr1.switch()print("78")gr1 = greenlet.greenlet(fun1) gr2 = greenlet.greenlet(fun2) gr1.switch()
gevent
gevent属于第三方模块需要下载安装包
-
pip3 install --upgrade pip3
- pip3 install gevent
import geventdef fun1():print("www.baidu") # 第一步 gevent.sleep(0)print("end the baidu") # 第三步def fun2():print("www.zhihu") # 第二步 gevent.sleep(0)print("end th zhihu") # 第四步 gevent.joinall([gevent.spawn(fun1),gevent.spawn(fun2), ])
遇到IO操作自动切换
import gevent import requestsdef func(url):print("get: %s"%url)gevent.sleep(0)date =requests.get(url)ret = date.textprint(url,len(ret))gevent.joinall([gevent.spawn(func, '/'),gevent.spawn(func, '/'),gevent.spawn(func, '/'), ])工作中用到协程的地方
转载于:.html