python线程

        Python中提供了threading模块来对多线程的操作,

1. 多线程实例

        线程是应用程序中工作的最小单元。

        多线程是现实有两种方式:

方法一

        将要执行的方法作为参数传给Thread的构造方法(和多进程类似)

1
t = threading.Thread(target=action, args=(i,))

方法二

        从Thread继承,并重写run()

1
2
3
4
5
6
7
8
9
class MyThread(threading.Thread):
def __init__(self, arg):
super(MyThread, self).__init__()#注意:一定要显式的调用父类的初始化函数。
self.arg = arg
def run(self):#定义每个线程要运行的函数
time.sleep(5)
with open('{0}.txt'.format(self.arg), 'wb') as f:
f.write(str(self.arg))
print 'the arg is:%s\r' % self.arg

        看源码:

1
2
3
4
5
6
P = threading.Thread
p.start() _start_new_thread(self.__bootstrap, ()) self.__bootstrap_inner() 
self.run()
try:
if self.__target:
self.__target(*self.__args, **self.__kwargs)

        所以如果重写了run,就直接调用run的函数了,如果run没有重新,就调用target函数。

2. 线程锁

        通过threading.Lock()来创建锁,函数在执行的只有先要获得锁,左后执行完以后要释放锁:

1
2
3
with lock:
lock.acquire()
lock.release()

        实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
def worker(name, lock):
with lock:
print("start {0}".format(name))
time.sleep(5)
print("end {0}".format(name))
if __name__ == "__main__":
lock = threading.Lock()
t1 = threading.Thread(target=worker, args=("worker1", lock))
t2 = threading.Thread(target=worker, args=("worker2", lock))
t1.start()
t2.start()

3. 线程共享变量

        多线程和多进程不同之处在于多线程本身就是可以和父进程共享内存的,这也是为什么其中一个线程挂掉以后,为什么其他线程也会死掉的道理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
l = list()
l += range(1, 10)
def worker():
l.append("ling")
l.append("shang")
l.append("hello")
if __name__ == "__main__":
t = threading.Thread(target=worker)
t.start()
print(l)

4. 线程池

        通过传入一个参数组来实现多线程,并且它的多线程是有序的,顺序与参数组中的参数顺序保持一致。

        安装包:

1
pip install threadpool

        调用格式:

1
2
3
4
5
from threadpool import *
pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threadpool
def hello(m, n, o):
""""""
print "m = %s, n = %s, o = %s" % (m, n, o)
if __name__ == '__main__':
# 方法1
lst_vars_1 = ['1', '2', '3']
lst_vars_2 = ['4', '5', '6']
func_var = [(lst_vars_1, None), (lst_vars_2, None)]
# 方法2
dict_vars_1 = {'m': '1', 'n': '2', 'o': '3'}
dict_vars_2 = {'m': '4', 'n': '5', 'o': '6'}
func_var = [(None, dict_vars_1), (None, dict_vars_2)]
pool = threadpool.ThreadPool(2)
requests = threadpool.makeRequests(hello, func_var)
[pool.putRequest(req) for req in requests]
pool.wait()

消息列队

“        ”消息队列”是在消息的传输过程中保存消息的容器。

        消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不通的进程。生产者往管道中写消息,消费者从管道中读消息。

        操作系统提供了很多机制来实现进程间的通信,multiprocessing模块就提供了Queue和Pipe两种方法来实现。

        使用multiprocessing里面的Queue来实现消息队列

1
2
3
4
from multiprocessing import Queue
q = Queue
q.put(data)
data = q.get(data)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Time : 2017/6/6 17:36
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()

        通过Mutiprocess里面的Pipe来实现消息队列:

  • Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
  • send和recv方法分别是发送和接受消息的方法。close方法表示关闭管道,当消息接受结束以后,关闭管道。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Time : 2017/6/6 16:32
import multiprocessing
import time
def proc1(pipe):
for i in xrange(5):
print "send: %s" %(i)
pipe.send(i)
# print(dir(pipe))
time.sleep(1)
def proc2(pipe):
n = 5
while n:
print "proc2 rev:", pipe.recv()
time.sleep(1)
n -= 1
if __name__ == "__main__":
pipe = multiprocessing.Pipe(duplex=False)
print(type(pipe))
print(pipe)
p1 = multiprocessing.Process(target=proc1, args=(pipe[1],))
p2 = multiprocessing.Process(target=proc2, args=(pipe[0],))
p1.start()
p2.start()
p1.join()
p2.join()
pipe[0].close()
pipe[1].close()

Queue模块

        Python提供了Queue模块来专门实现消息队列 Queue对象

        Queue对象实现一个fifo队列(其他的还有lifo、priority队列,这里不再介绍)。queue只有maxsize一个构造参数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:

  • Queue.qsize():返回消息队列的当前空间。返回的值不一定可靠。
  • Queue.empty():判断消息队列是否为空,返回True或False。同样不可靠。
  • Queue.full():类似上边,判断消息队列是否满
  • Queue.put(item, block=True, timeout=None):往消息队列中存放消息。block可以控制是否阻塞,timeout指定阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。
  • Queue.put_nowait(item):相当于put(item, False).
  • Queue.get(block=True, timeout=None):获取一个消息,其他同put。

        以下两个函数用来判断消息对应的任务是否完成。

  • Queue.task_done():接受消息的线程通过调用这个函数来说明消息对应的任务已完成。
  • Queue.join(): 实际上意味着等到队列为空,再执行别的操作

        例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import random,threading,time
from Queue import Queue
#Producer thread
class Producer(threading.Thread):
def __init__(self, t_name, queue):
threading.Thread.__init__(self,name=t_name)
self.data=queue
def run(self):
for i in range(10): #随机产生10个数字 ,可以修改为任意大小
# randomnum=random.randint(1,99)
print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), i)
self.data.put(i) #将数据依次存入队列
# time.sleep(1)
print "%s: %s finished!" %(time.ctime(), self.getName())
#Consumer thread
class Consumer_even(threading.Thread):
def __init__(self,t_name,queue):
threading.Thread.__init__(self,name=t_name)
self.data=queue
def run(self):
while 1:
try:
val_even = self.data.get(1,5) #get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒
if val_even%2==0:
print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even)
time.sleep(2)
else:
self.data.put(val_even)
time.sleep(2)
except: #等待输入,超过5秒 就报异常
print "%s: %s finished!" %(time.ctime(),self.getName())
break
class Consumer_odd(threading.Thread):
def __init__(self,t_name,queue):
threading.Thread.__init__(self, name=t_name)
self.data=queue
def run(self):
while 1:
try:
val_odd = self.data.get(1,5)
if val_odd%2!=0:
print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd)
time.sleep(2)
else:
self.data.put(val_odd)
time.sleep(2)
except:
print "%s: %s finished!" % (time.ctime(), self.getName())
break
#Main thread
def main():
queue = Queue()
producer = Producer('Pro.', queue)
consumer_even = Consumer_even('Con_even.', queue)
consumer_odd = Consumer_odd('Con_odd.',queue)
producer.start()
consumer_even.start()
consumer_odd.start()
producer.join()
consumer_even.join()
consumer_odd.join()
print 'All threads terminate!'
if __name__ == '__main__':
main()

Celery异步分布式

什么是celery

        Celery是一个python开发的异步分布式任务调度模块。

        Celery本身并不提供消息服务,使用第三方服务,也就是borker来传递任务,目前支持rebbimq,redis, 数据库等。

        这里使用redis

        连接url的格式为:

1
redis://:password@hostname:port/db_number

        例如:

1
BROKER_URL = 'redis://localhost:6379/0'

images

        安装celery

1
2
pip install celery
pip install redis

        在服务器上安装redis服务器,并启动redis

        第一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
[root@localhost celery]# cat ling.py
#/usr/bin/env python
#-*- coding:utf-8 -*-
from celery import Celery
broker="redis://192.168.48.131:6379/5"
backend="redis://192.168.48.131:6379/6"
app = Celery("ling", broker=broker, backend=backend)
@app.task
def add(x, y):
return x+y

        启动worker

1
#celery -A ling worker -l info

        生产者

1
2
3
4
5
6
form ling import add
re = add.delay(10, 20)
print(re.result) #获取结果
print(re.ready) #是否处理
print(re.get(timeout=1)) #获取结果
print(re.status) #是否处理

celery模块调用

        既然crlery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢,celery可以支持多台不通的计算机执行不同的任务或者相同的任务。

        如果说celery的分布式应用的是话,我觉得就要提到celery的消息路由机制,就是要提一下AMQP协议。具体的可以查看AMQP的文档。简单是说就是可以有多个消息队列(Message Queue),不同的消息可以指定发送给不同的Message Queue,而这是通过Exchange来实现的。发送消息到Message Queue中时,可以指定routiing key,Exchange通过routing key来把消息路由(routes)到不同的Message Queue中去。

images

        多worker,多队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
root@localhost celery]# cat tasks.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from celery import Celery
app = Celery()
app.config_from_object("celeryconfig")
@app.task
def taskA(x,y):
return x + y
@app.task
def taskB(x,y,z):
return x + y + z
@app.task
def add(x,y):
return x + y
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[root@localhost celery]# cat celeryconfig.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from kombu import Exchange,Queue
BROKER_URL = "redis://192.168.48.131:6379/1"
CELERY_RESULT_BACKEND = "redis://192.168.48.131:6379/2"
CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
)
CELERY_ROUTES = {
'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},
'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}
}

        配置文件一般单独写在一个文件中。

        启动一个worker来指定taskA

1
2
celery -A tasks worker -l info -n workerA.%h -Q for_task_A
celery -A tasks worker -l info -n workerB.%h -Q for_task_B
1
2
3
4
5
6
7
from tasks import *
re1 = taskA.delay(100, 200)
print(re1.result)
re2 = taskB.delay(1, 2, 3)
print(re2.result)
re3 = add.delay(1, 2, 3)
print(re3.status) #PENDING

        我们看到状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。下面,我们来启动一个worker来执行celery队列中的任务。

1
2
3
celery -A tasks worker -l info -n worker.%h -Q celery
print(re3.status) #SUCCESS

        Celery与定时任务

        下面我们接着在celeryconfig.py中添加CELERYBEAT_SCHEDULE变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULE = {
'taskA_schedule' : {
'task':'tasks.taskA',
'schedule':20,
'args':(5,6)
},
'taskB_scheduler' : {
'task':"tasks.taskB",
"schedule":200,
"args":(10,20,30)
},
'add_schedule': {
"task":"tasks.add",
"schedule":10,
"args":(1,2)
}
}

        注意格式,否则会有问题