python线程(二)代码部分

2019-04-25 06:58:49来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

使用threading创建线程:

from threading import Thread

def work(name):
    print(f"我是线程{name}")

if __name__ == "__main__":
    for i in range(3):
        t = Thread(target=work, args=(("aaa"+str(i)),))
        t.start()  # 开启线程
    print("主线程")

# 打印内容如下
我是线程aaa0
我是线程aaa1
我是线程aaa2
主线程

由上面的打印内容我们可以看出,在执行完所有线程后才执行的主线程print。如果是多进程的话会先执行主进程中的print然后才会执行子进程的print。主要是因为开启进程相比于开启线程更加耗费时间。

第二种开启线程的方式:

from threading import Thread

class test(Thread):  # 使用继承Thread的方式
    def __init__(self,name):
        # 如果要给对象封装属性,必须先调用父类
        super().__init__()
        self.name = name
    def run(self):  # 必须要有run类,因为start要调用
        print(f"我是线程:{self.name}")

if __name__ == "__main__":
    for i in range(3):
        t = test(("aaa"+str(i))) # 创建线程对象
        t.start()  # 开启线程
    print("主线程")

# 打印内容如下
我是线程:aaa0
我是线程:aaa1
我是线程:aaa2
主线程

对比多进程和多线程的效率:

from threading import Thread
from multiprocessing import Process
import time

def thread_work(name):
    print(f"{name}")
def process_work(name):
    print(f"{name}")

if __name__ == "__main__":
    pro = []
    start = time.time()
    for i in range(3):
        p = Process(target=process_work,args=(("进程-"+str(i)),))
        p.start()
        pro.append(p)
    for i in pro:
        i.join()
    end = time.time()
    print("进程运行了:%s" %(end - start))
    thread_l = []
    start = time.time()
    for i in range(3):
        t = Thread(target=process_work, args=(("线程-" + str(i)),))
        t.start()
        thread_l.append(t)
    for i in thread_l:
        i.join()
    end = time.time()
    print("进程运行了:%s" % (end - start))

# 打印内如下:
进程-1
进程-0
进程-2
进程运行了:0.15600895881652832
线程-0
线程-1
线程-2
进程运行了:0.0030002593994140625

我们可以从时间上看出,线程的效率是远远高于进程的。

守护线程

主线程会等待所有非守护线程执行完毕后,才结束主线程。主进程是进程内的代码结束后就结束主进程。

对比守护进程,代码执行完毕后立即关闭守护进程,因为在主进程看来代码执行完毕,主进程结束了,所以守护进程在代码结束后就被结束了。

守护线程 会等待主线程的结束而结束,这是因为如果主线程结束意味着程序即将退出,所以主线程会一直等着所有非守护线程结束,回收资源然后退出程序,所以当所有非守护线程结束后,守护线程结束,然后主线程回收资源后结束程序。

下面对比守护进程和守护线程的示例:

先来看守护进程:

from multiprocessing import Process

def process_work(name):
    print(f"{name}")

if __name__ == "__main__":
    p = Process(target=process_work,args=("守护进程"))
    p.daemon=True
    p.start()
    print("主进程")

# 打印内容如下
主进程

只打印了主进程,也就是说守护进程还没来得及被执行程序就结束了。

再来看守护线程:

from threading import Thread

def thread_work(name):
    print(name)

if __name__ == "__main__":
    t = Thread(target=thread_work,args=("守护线程",))
    t.daemo=True
    t.start()
    print("\n主线程")

# 打印内容如下
守护线程
主线程

也许你会说是由于线程太快了,所以才执行了守护线程。下面我们在线程中阻塞一段时间,在来看看会发生什么效果。

from threading import Thread
import time
def thread_work(name):
    time.sleep(3)  # 阻塞3秒
    print(name)

if __name__ == "__main__":
    t = Thread(target=thread_work,args=("守护线程",))
    t.daemo=True
    t.start()
    print("\n主线程")

# 打印内容如下
主线程
守护线程

守护线程还是被执行了,如果是守护进程,守护进程里的代码是不会被执行的。

线程锁

from threading import Thread
import time

def work():
    global n
    temp = n
    time.sleep(0.1)  # 由于线程太快了,所以这里停顿下
    n = temp -1

if __name__ == "__main__":
    n = 100
    t_l = []
    for i in range(100):
        t = Thread(target=work,args=())
        t.start()
        t_l.append(t)
    for i in t_l:
        i.join()
    print(n)

# 打印内容如下
99

在我们看来其实值应该是0的但却是99,就因为短暂停了0.1秒导致结果发生了变化。而我们这0.1秒的停留是模拟网络延迟或者进程调度等原因。造成了数据的结果的错乱。这个时候我们就需要线程锁来保证数据的安全性。

下面我们就通过给程序加锁,来保证数据的安全性:

from threading import Thread,Lock
import time

def work(lock):
    lock.acquire()  # 加锁
    global n
    temp = n
    time.sleep(0.1)  # 由于线程太快了,所以这里停顿下
    n = temp -1
    lock.release()  # 解锁

if __name__ == "__main__":
    n = 100
    t_l = []
    lock = Lock()  # 得到一把锁对象
    for i in range(100):
        t = Thread(target=work,args=(lock,))
        t.start()
        t_l.append(t)
    for i in t_l:
        i.join()
    print(n)

# 打印内容如下
0

我们会发现程序和上一个示例的运行效率上有着很大的差别。明显加锁后程序的运行效率降低了,我们管这种锁叫做线程同步锁,使原本并行的程序编程了串行所以程序的效率会慢很多。

死锁与递归锁:

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

queue队列 :使用import queue,用法与进程Queue一样

class queue.Queue(maxsize=0) #先进先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

# 打印内容如下
first
second
third

class queue.LifoQueue(maxsize=0) 

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

# 打印内容如下
third
second
first

class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())

# 打印内容如下
(10, 'b')
(20, 'a')
(30, 'c')

Python标准模块--concurrent.futures

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
基本方法
submit(fn, *args, **kwargs)       异步提交任务
map(func, *iterables, timeout=None, chunksize=1) 
shutdown(wait=True)   等待线程执行完毕
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前
result(timeout=None)   取得结果
add_done_callback(fn)  回调函数

done() 判断某一个线程是否完成

cancle() 取消某个任务
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
# 由于线程池和进程池的用法基本一样,所以只演示进程池的示例
import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':
    p_pool=ProcessPoolExecutor(max_workers=3)

    p_list=[]
    for i in range(3):
        p = p_pool.submit(task,i)
        p_list.append(p)
    p_pool.shutdown(True)
    for i in p_list:
        print(i.result(),end=" ")

# 打印内容如下
10012 is runing
6064 is runing
7308 is runing
0 1 4

使用map方法重新实现上面的功能

from concurrent.futures import ThreadPoolExecutor
import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2
if __name__ == '__main__':
    executor=ThreadPoolExecutor(max_workers=3)
    executor.map(task,range(3)) #map取代了for+submit

# 打印内容如下
9988 is runing
9988 is runing
9988 is runing

关于回调函数

from concurrent.futures import ProcessPoolExecutor
import requests
import os

def get_page(url):
    print('<进程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)

if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.openstack.org',
        'http://www.sina.com.cn/'
    ]
    p=ProcessPoolExecutor(3)
    for url in urls:
        # parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
        p.submit(get_page,url).add_done_callback(parse_page)

 

 

-------------------------------------------------------- concurrent.futures待续 ---------------------------------------------------------

 

 

 

 

 

 

 

 

 

 

 

 

 

 


原文链接:https://www.cnblogs.com/caesar-id/p/10764710.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:python的基本数据类型(一)

下一篇:Windows安装python3.x后,pip list警告!DEPRECATION: The defau