【美高梅开户网址】python多进度计算,Python多进度编制程序

序. multiprocessing
python中的二10八线程其实并不是真正的八线程,若是想要丰裕地应用多核CPU的财富,在python中山大学部场地必要使用多进度。Python提供了特别好用的多进度包multiprocessing,只必要定义2个函数,Python会完毕此外全部职业。借助那些包,能够轻便做到从单进度到并发施行的改变。multiprocessing补助子进程、通信和共享数据、实施不一方式的协同,提供了Process、Queue、Pipe、Lock等零件。

读书目录

  python中的10二线程其实并不是确实的十贰线程,假使想要充足地动用多核CPU的能源,在python中山大学部情况需求运用多进程。Python提供了万分好用的多进程包multiprocessing,只必要定义多少个函数,Python会完成别的具备业务。借助那几个包,能够轻易完结从单进程到并发执行的更动。multiprocessing协理子进度、通讯和共享数据、实施不1款式的同步,提供了Process、Queue、Pipe、Lock等零件。

原文:

 

    1. Process
    1. Lock
    1. Semaphore
    1. Event
    1. Queue
    1. Pipe
    1. Pool

1、Process

读书目录

1. Process

创立进程的类【美高梅开户网址】python多进度计算,Python多进度编制程序。:Process([group [, target [, name [, args [,
kwargs]]]]]),target表示调用对象,args表示调用对象的岗位参数元组。kwargs表示调用对象的字典。name为外号。group实质上不选拔。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()运维某些进程。

属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、假使为–N,表示被非实信号N截止)、name、pid。当中daemon是父进程终止后自行终止,且自个儿无法发生新进度,必须在start()在此以前设置。

 

例一.一:创设函数并将其看作单个进度

import multiprocessing
import time

def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print "p.pid:", p.pid
    print "p.name:", p.name
    print "p.is_alive:", p.is_alive()

结果

p.pid: 8736
p.name: Process-1
p.is_alive: True
The time is Tue Apr 21 20:55:12 2015
The time is Tue Apr 21 20:55:15 2015
The time is Tue Apr 21 20:55:18 2015
The time is Tue Apr 21 20:55:21 2015
The time is Tue Apr 21 20:55:24 2015

 

例一.二:创建函数并将其视作两个经过

import multiprocessing
import time

def worker_1(interval):
    print "worker_1"
    time.sleep(interval)
    print "end worker_1"

def worker_2(interval):
    print "worker_2"
    time.sleep(interval)
    print "end worker_2"

def worker_3(interval):
    print "worker_3"
    time.sleep(interval)
    print "end worker_3"

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = (2,))
    p2 = multiprocessing.Process(target = worker_2, args = (3,))
    p3 = multiprocessing.Process(target = worker_3, args = (4,))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print "END!!!!!!!!!!!!!!!!!"

结果

The number of CPU is:4
child   p.name:Process-3 p.id7992
child   p.name:Process-2 p.id4204
child   p.name:Process-1 p.id6380
END!!!!!!!!!!!!!!!!!
worker_1
worker_3
worker_2
end worker_1
end worker_2
end worker_3

 

例1.三:将经过定义为类

import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()      

:进程p调用start()时,自动调用run()

结果

the time is Tue Apr 21 20:31:30 2015
the time is Tue Apr 21 20:31:33 2015
the time is Tue Apr 21 20:31:36 2015
the time is Tue Apr 21 20:31:39 2015
the time is Tue Apr 21 20:31:42 2015

 

例一.4:daemon程序相比较结果

#1.4-1 不加daemon属性

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print "end!"

结果

end!
work start:Tue Apr 21 21:29:10 2015
work end:Tue Apr 21 21:29:13 2015

#1.4-2 加上daemon属性

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    print "end!"

结果

end!

:因子进度设置了daemon属性,主进程结束,它们就趁着停止了。

#1.4-3 设置daemon试行完停止的点子

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    p.join()
    print "end!"

结果

work start:Tue Apr 21 22:16:32 2015
work end:Tue Apr 21 22:16:35 2015
end!

 

序. multiprocessing
python中的二十四线程其实并不是实在的多线程,要是想要充足地应用多核CPU的财富,在python中山高校部地方要求选择多进度。Python提供了十一分好用的多进程包multiprocessing,只要求定义二个函数,Python会达成其他全数业务。借助这些包,可以轻易做到从单进度到并发执行的转移。multiprocessing支持子进度、通信和共享数据、实行不1格局的一道,提供了Process、Queue、Pipe、Lock等零件。

制程的类:Process([group [, target [, name [, args [,
kwargs]]]]]),target表示调用对象,args表示调用对象的职位参数元组。kwargs表示调用对象的字典。name为别称。group实质上不选取。

  • 1.
    Process
  • 2.
    Lock
  • 3.
    Semaphore
  • 4.
    Event
  • 5.
    Queue
  • 6.
    Pipe
  • 7.
    Pool

2. Lock

当七个进程供给访问共享能源的时候,Lock能够用来防止访问的争辨。

import multiprocessing
import sys

def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()

def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print "end"

结果(输出文件)

Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly

 

 

方法:is_alive()、join([timeout])、run()、start()、terminate()。个中,Process以start()运营有个别进度。

序. multiprocessing
python中的二十拾2线程其实并不是实在的四线程,假若想要丰硕地行使多核CPU的能源,在python中大部场地必要接纳多进度。Python提供了要命好用的多进度包multiprocessing,只须求定义3个函数,Python会达成别的全部职业。借助那么些包,能够轻巧做到从单进程到出现试行的转移。multiprocessing协助子进程、通讯和共享数据、施行差异方式的同台,提供了Process、Queue、Pipe、Lock等零件。

3. Semaphore

Semaphore用来决定对共享资源的拜访数量,举例池的最浦那接数。

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

结果

Process-1acquire
Process-1release

Process-2acquire
Process-3acquire
Process-2release

Process-5acquire
Process-3release

Process-4acquire
Process-5release

Process-4release

 

属性:authkey、daemon(要经过start()设置)、exitcode(进度在运行时为None、假若为–N,表示被能量信号N甘休)、name、pid。当中daemon是父进程终止后自动截止,且自身不可能发生新进度,必须在start()在此以前安装。

 

4. Event

伊夫nt用来促成进程间1块通讯。

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")

结果

wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True

 

1. Process

创设进程的类:Process([group [, target [, name [, args [,
kwargs]]]]]),

  target代表调用对象,

  args代表调用对象的职分参数元组。

  kwargs表示调用对象的字典。

  name为别名。

  group实质上不采纳。
方法:is_alive()、join([timeout])、run()、start()、terminate()。

  在那之中,Process以start()运维有个别进度。

属性:authkey、daemon(要经过start()设置)、exitcode(进度在运营时为None、假若为–N,表示被时域信号N甘休)、name、pid。

  当中daemon是父进程终止后自动终止,且自个儿不能发生新进度,必须在start()从前设置。

 

例一.1:创立函数并将其看成单个进程

美高梅开户网址 1

import multiprocessing
import time

def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print("p.pid:", p.pid)
    print("p.name:", p.name)
    print("p.is_alive:", p.is_alive())

美高梅开户网址 2

结果

1
2
3
4
5
6
7
8
p.pid: 8736
p.name: Process-1
p.is_alive: True
The time is Tue Apr 21 20:55:12 2015
The time is Tue Apr 21 20:55:15 2015
The time is Tue Apr 21 20:55:18 2015
The time is Tue Apr 21 20:55:21 2015
The time is Tue Apr 21 20:55:24 2015

 

例壹.二:创造函数并将其用作多少个进程

美高梅开户网址 3

import multiprocessing
import time

def worker_1(interval):
    print("worker_1")
    time.sleep(interval)
    print("end worker_1")

def worker_2(interval):
    print("worker_2")
    time.sleep(interval)
    print("end worker_2")

def worker_3(interval):
    print("worker_3")
    time.sleep(interval)
    print("end worker_3")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = (2,))
    p2 = multiprocessing.Process(target = worker_2, args = (3,))
    p3 = multiprocessing.Process(target = worker_3, args = (4,))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END!!!!!!!!!!!!!!!!!")

美高梅开户网址 4

结果

1
2
3
4
5
6
7
8
9
10
11
The number of CPU is:4
child   p.name:Process-3    p.id7992
child   p.name:Process-2    p.id4204
child   p.name:Process-1    p.id6380
END!!!!!!!!!!!!!!!!!
worker_1
worker_3
worker_2
end worker_1
end worker_2
end worker_3

 

例1.三:将经过定义为类

美高梅开户网址 5

import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()      

美高梅开户网址 6

:进度p调用start()时,自动调用run()

结果

1
2
3
4
5
the time is Tue Apr 21 20:31:30 2015
the time is Tue Apr 21 20:31:33 2015
the time is Tue Apr 21 20:31:36 2015
the time is Tue Apr 21 20:31:39 2015
the time is Tue Apr 21 20:31:42 2015

 

例一.四:daemon程序相比较结果

#1.4-1 不加daemon属性

美高梅开户网址 7

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print("end!")

美高梅开户网址 8

结果

1
2
3
end!
work start:Tue Apr 21 21:29:10 2015
work end:Tue Apr 21 21:29:13 2015

#1.4-2 加上daemon属性

美高梅开户网址 9

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    print("end!")

美高梅开户网址 10

结果

1
end!

:因子进度设置了daemon属性,主进程甘休,它们就趁机结束了。

#1.4-3 设置daemon施行完截至的主意

美高梅开户网址 11

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    p.join()
    print("end!")

美高梅开户网址 12

结果

1
2
3
work start:Tue Apr 21 22:16:32 2015
work end:Tue Apr 21 22:16:35 2015
end!

 

注:

回去顶部

5. Queue

Queue是多进度安全的行列,能够采纳Queue达成多进度之间的数目传递。put方法用以插入数据到行列中,put方法还有多个可选参数:blocked和timeout。借使blocked为True(暗中同意值),并且timeout为正在,该方法会阻塞timeout钦定的时光,直到该队列有结余的半空中。假若超时,会抛出Queue.Full万分。如若blocked为False,但该Queue已满,会立即抛出Queue.Full相当。

 

get方法能够从队列读取并且删除2个要素。同样,get方法有多少个可选参数:blocked和timeout。假若blocked为True(暗中认可值),并且timeout为正在,那么在守候时间内未有取到任何因素,会抛出Queue.Empty卓殊。如若blocked为False,有三种状态存在,假设Queue有2个值可用,则随即回到该值,不然,假诺队列为空,则立时抛出Queue.Empty至极。Queue的1段示例代码:

import multiprocessing

def writer_proc(q):      
    try:         
        q.put(1, block = False) 
    except:         
        pass   

def reader_proc(q):      
    try:         
        print q.get(block = False) 
    except:         
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))  
    writer.start()   

    reader = multiprocessing.Process(target=reader_proc, args=(q,))  
    reader.start()  

    reader.join()  
    writer.join()

结果

1

 

2. Lock

当七个经过必要访问共享财富的时候,Lock可以用来制止访问的争辩。

美高梅开户网址 13

import multiprocessing
import sys

def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()

def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print("end")

美高梅开户网址 14

结果(输出文件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly

 

is_live()用来查看进程的情况

1. Process

制程的类:Process([group [, target [, name [, args [,
kwargs]]]]]),target表示调用对象,args表示调用对象的地点参数元组。kwargs表示调用对象的字典。name为外号。group实质上不行使。
方法:is_alive()、join([timeout])、run()、start()、terminate()。当中,Process以start()运营某些进度。

属性:authkey、daemon(要因而start()设置)、exitcode(进度在运作时为None、即使为–N,表示被非非确定性信号N结束)、name、pid。在那之中daemon是父进度终止后活动结束,且自身不能够发出新历程,必须在start()在此以前设置。

 

例一.1:创立函数并将其看成单个进度

美高梅开户网址 15

import multiprocessing
import time

def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print "p.pid:", p.pid
    print "p.name:", p.name
    print "p.is_alive:", p.is_alive()

美高梅开户网址 16

结果

1
2
3
4
5
6
7
8
p.pid: 8736
p.name: Process-1
p.is_alive: True
The time is Tue Apr 21 20:55:12 2015
The time is Tue Apr 21 20:55:15 2015
The time is Tue Apr 21 20:55:18 2015
The time is Tue Apr 21 20:55:21 2015
The time is Tue Apr 21 20:55:24 2015

 

例壹.二:创设函数并将其看做几个经过

美高梅开户网址 17

import multiprocessing
import time

def worker_1(interval):
    print "worker_1"
    time.sleep(interval)
    print "end worker_1"

def worker_2(interval):
    print "worker_2"
    time.sleep(interval)
    print "end worker_2"

def worker_3(interval):
    print "worker_3"
    time.sleep(interval)
    print "end worker_3"

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = (2,))
    p2 = multiprocessing.Process(target = worker_2, args = (3,))
    p3 = multiprocessing.Process(target = worker_3, args = (4,))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print "END!!!!!!!!!!!!!!!!!"

美高梅开户网址 18

结果

1
2
3
4
5
6
7
8
9
10
11
The number of CPU is:4
child   p.name:Process-3    p.id7992
child   p.name:Process-2    p.id4204
child   p.name:Process-1    p.id6380
END!!!!!!!!!!!!!!!!!
worker_1
worker_3
worker_2
end worker_1
end worker_2
end worker_3

 

例一.三:将经过定义为类

美高梅开户网址 19

import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()      

美高梅开户网址 20

:进程p调用start()时,自动调用run()

结果

1
2
3
4
5
the time is Tue Apr 21 20:31:30 2015
the time is Tue Apr 21 20:31:33 2015
the time is Tue Apr 21 20:31:36 2015
the time is Tue Apr 21 20:31:39 2015
the time is Tue Apr 21 20:31:42 2015

 

例一.四:daemon程序比较结果

#1.4-1 不加daemon属性

美高梅开户网址 21

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print "end!"

美高梅开户网址 22

结果

1
2
3
end!
work start:Tue Apr 21 21:29:10 2015
work end:Tue Apr 21 21:29:13 2015

#1.4-2 加上daemon属性

美高梅开户网址 23

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    print "end!"

美高梅开户网址 24

结果

1
end!

:因子进度设置了daemon属性,主进程甘休,它们就趁早截至了。

#1.4-3 设置daemon推行完截止的章程

美高梅开户网址 25

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    p.join()
    print "end!"

美高梅开户网址 26

结果

1
2
3
work start:Tue Apr 21 22:16:32 2015
work end:Tue Apr 21 22:16:35 2015
end!

 

美高梅开户网址,回来顶部

6. Pipe

Pipe方法重临(conn一,
conn2)代表贰个管道的七个端。Pipe方法有duplex参数,要是duplex参数为True(私下认可值),那么这几个管道是全双工格局,也正是说conn①和conn2均可收发。duplex为False,conn三头承担接受新闻,conn二只承担发送新闻。

 

send和recv方法分别是出殡和埋葬和承受音信的方法。比如,在全双工格局下,能够调用conn壹.send出殡和埋葬新闻,conn一.recv接收新闻。倘若未有新闻可收取,recv方法会一向不通。假诺管道已经被关闭,那么recv方法会抛出EOFError。

import multiprocessing
import time

def proc1(pipe):
    while True:
        for i in xrange(10000):
            print "send: %s" %(i)
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print "proc2 rev:", pipe.recv()
        time.sleep(1)

def proc3(pipe):
    while True:
        print "PROC3 rev:", pipe.recv()
        time.sleep(1)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    #p3.start()

    p1.join()
    p2.join()
    #p3.join()

结果

美高梅开户网址 27

 

3. Semaphore

Semaphore用来支配对共享能源的走访数量,比如池的最浦那接数。

美高梅开户网址 28

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

美高梅开户网址 29

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Process-1acquire
Process-1release
 
Process-2acquire
Process-3acquire
Process-2release
 
Process-5acquire
Process-3release
 
Process-4acquire
Process-5release
 
Process-4release

 

terminate()用来终止进度。

2. Lock

当四个经过要求访问共享财富的时候,Lock能够用来防止访问的冲突。

美高梅开户网址 30

import multiprocessing
import sys

def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()

def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print "end"

美高梅开户网址 31

结果(输出文件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly

 

回到顶部

7. Pool

在应用Python进行系统管理的时候,尤其是同时操作四个文件目录,大概远程序调控制多台主机,并行操作能够省去多量的光阴。当被操作对象数目非常的小时,能够一向利用multiprocessing中的Process动态成生多少个进度,十多个幸好,但只倘使无数个,上千个目的,手动的去界定进度数量却又太过繁琐,此时能够公布进度池的效果。
Pool能够提供内定数量的长河,供用户调用,当有新的请求提交到pool中时,假若池还不曾满,那么就能够创设三个新的进度用来实施该请求;但倘若池中的进度数已经达到规定最大值,那么该请求就能等待,直到池中有经过截至,才会成立新的进程来它。

 

例7.一:使用进度池(非阻塞)

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print "Sub-process(es) done."

2次实行结果

mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0

msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.

函数解释:

  • apply_async(func[, args[, kwds[, callback]]])
    它是非阻塞,apply(func[, args[,
    kwds]])是阻塞的(精晓差距,看例1例二结果区别)
  • close()    关闭pool,使其不在接受新的职责。
  • terminate()    甘休职业进度,不在管理未成功的天职。
  • join()    主进度阻塞,等待子进度的退出,
    join方法要在close或terminate之后采取。

进行表达:创造二个进度池pool,并设定进度的数码为三,xrange(四)会相继爆发八个目的[0,
1, 2,
4],多个对象被交付到pool中,因pool钦赐进度数为3,所以0、一、二会间接送到进程中实施,当个中一个试行到位后才空出三个进程管理对象3,所以会师世出口“msg:
hello
三”出现在”end”后。因为为非阻塞,主函数会自个儿奉行自个的,不搭理进度的举行,所以运营完for循环后一贯出口“mMsg:
hark~ Mark~
Mark~~~~~~”,主程序在pool.join()处等待各样进度的落成。

 

例七.二:使用进度池(阻塞)

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print "Sub-process(es) done."

三回执行的结果

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

  

例7.三:使用进度池,并关怀结果

import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print ":::", res.get()
    print "Sub-process(es) done."

二遍实施结果

msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

 

例七.4:使用多少个进程池

#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
    end = time.time()
    print 'Task Lee, runs %0.2f seconds.' %(end - start)

def Marlon():
    print "\nRun task Marlon-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print 'Task Marlon runs %0.2f seconds.' %(end - start)

def Allen():
    print "\nRun task Allen-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print 'Task Allen runs %0.2f seconds.' %(end - start)

def Frank():
    print "\nRun task Frank-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print 'Task Frank runs %0.2f seconds.' %(end - start)

if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank] 
    print "parent process %s" %(os.getpid())

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print 'Waiting for all subprocesses done...'
    pool.close()
    pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print 'All subprocesses done.'

1次举办结果

parent process 7704

Waiting for all subprocesses done...
Run task Lee-6948

Run task Marlon-2896

Run task Allen-7304

Run task Frank-3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.

4. Event

伊芙nt用来贯彻进度间共同通讯。

美高梅开户网址 32

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")

美高梅开户网址 33

结果

1
2
3
4
5
wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True

 

单进程:

3. Semaphore

Semaphore用来支配对共享财富的访问数量,比方池的最辛辛那提接数。

美高梅开户网址 34

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

美高梅开户网址 35

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Process-1acquire
Process-1release
 
Process-2acquire
Process-3acquire
Process-2release
 
Process-5acquire
Process-3release
 
Process-4acquire
Process-5release
 
Process-4release

 

回去顶部

5. Queue

Queue是多进度安全的类别,能够利用Queue达成多进度之间的多寡传递。put方法用以插入数据到行列中,put方法还有七个可选参数:blocked和timeout。假诺blocked为True(暗中同意值),并且timeout为正在,该方法会阻塞timeout钦命的流年,直到该队列有结余的长空。假如超时,会抛出Queue.Full十分。如果blocked为False,但该Queue已满,会立时抛出Queue.Full万分。

 

get方法能够从队列读取并且删除1个因素。一样,get方法有五个可选参数:blocked和timeout。假若blocked为True(默许值),并且timeout为正在,那么在守候时间内并未有取到任何因素,会抛出Queue.Empty至极。假设blocked为False,有三种状态存在,借使Queue有二个值可用,则随即回到该值,不然,假若队列为空,则霎时抛出Queue.Empty分外。Queue的一段示例代码:

美高梅开户网址 36

import multiprocessing

def writer_proc(q):      
    try:         
        q.put(1, block = False) 
    except:         
        pass   

def reader_proc(q):      
    try:         
        print(q.get(block = False) )
    except:         
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))  
    writer.start()   

    reader = multiprocessing.Process(target=reader_proc, args=(q,))  
    reader.start()  

    reader.join()  
    writer.join()

美高梅开户网址 37

结果

1
1

 

 1 import multiprocessing
 2 import time
 3 def worker(interval):
 4     n=5
 5     while n > 0:
 6         print("The time is {0}".format(time.ctime()))
 7         time.sleep(interval)
 8         n -=1
 9 
10 if __name__ == "__main__":
11     p = multiprocessing.Process(target=worker,args=(3,))
12     p.start()
13     print("p.pid:",p.pid)
14     print("p.name:",p.name)
15     print("p.is_alive:",p.is_alive())

4. Event

伊夫nt用来促成进度间壹块通信。

美高梅开户网址 38

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")

美高梅开户网址 39

结果

1
2
3
4
5
wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True

 

归来顶部

6. Pipe

Pipe方法再次回到(conn一,
conn2)代表1个管道的多个端。Pipe方法有duplex参数,假设duplex参数为True(暗中认可值),那么那几个管道是全双工情势,也便是说conn一和conn2均可收发。duplex为False,conn叁头肩负接受新闻,conn二只担任发送新闻。

 

send和recv方法分别是发送和收受新闻的法子。例如,在全双工情势下,能够调用conn一.send出殡和埋葬消息,conn1.recv接收音信。倘使未有新闻可接收,recv方法会一向不通。如果管道已经被关门,那么recv方法会抛出EOFError。

美高梅开户网址 40

import multiprocessing
import time

def proc1(pipe):
    while True:
        for i in xrange(10000):
            print("send: %s" %(i))
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print("proc2 rev:", pipe.recv())
        time.sleep(1)

def proc3(pipe):
    while True:
        print("PROC3 rev:", pipe.recv())
        time.sleep(1)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    #p3.start()

    p1.join()
    p2.join()
    #p3.join()

美高梅开户网址 41

结果

美高梅开户网址 42

 

多进程:

5. Queue

Queue是多进度安全的种类,能够使用Queue达成多进度之间的数额传递。put方法用以插入数据到行列中,put方法还有多少个可选参数:blocked和timeout。要是blocked为True(默许值),并且timeout为正在,该方法会阻塞timeout钦命的时光,直到该队列有多余的上空。假诺超时,会抛出Queue.Full非凡。假如blocked为False,但该Queue已满,会立马抛出Queue.Full非常。

 

get方法能够从队列读取并且删除四个成分。相同,get方法有四个可选参数:blocked和timeout。如若blocked为True(暗许值),并且timeout为正值,那么在等候时间内并未有取到任何因素,会抛出Queue.Empty异常。要是blocked为False,有二种景况存在,假如Queue有一个值可用,则马上回去该值,不然,假使队列为空,则马上抛出Queue.Empty卓殊。Queue的壹段示例代码:

美高梅开户网址 43

import multiprocessing

def writer_proc(q):      
    try:         
        q.put(1, block = False) 
    except:         
        pass   

def reader_proc(q):      
    try:         
        print q.get(block = False) 
    except:         
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))  
    writer.start()   

    reader = multiprocessing.Process(target=reader_proc, args=(q,))  
    reader.start()  

    reader.join()  
    writer.join()

美高梅开户网址 44

结果

1
1

 

回去顶部

7. Pool

在采纳Python举行系统管理的时候,特别是同时操作三个文件目录,或许远程序调整制多台主机,并行操作能够省去大量的时日。当被操作对象数目不大时,能够一向利用multiprocessing中的Process动态成生多个经过,二十个幸亏,但倘如若过多少个,上千个对象,手动的去界定进度数量却又太过繁琐,此时能够公布进度池的成效。
Pool能够提供内定数量的历程,供用户调用,当有新的伸手提交到pool中时,如若池还向来不满,那么就能成立2个新的进程用来施行该请求;但万一池中的进程数壹度达到规定的标准规定最大值,那么该请求就能等待,直到池中有经过停止,才会制造新的长河来它。

 

例七.一:使用进度池(非阻塞)

美高梅开户网址 45

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

美高梅开户网址 46

二遍举办结果

1
2
3
4
5
6
7
8
9
10
mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
 
msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.

函数解释:

  • apply_async(func[, args[, kwds[, callback]]])
    它是非阻塞,apply(func[, args[,
    kwds]])是阻塞的(驾驭差距,看例一例2结实不一致)
  • close()    关闭pool,使其不在接受新的天职。
  • terminate()    甘休职业进度,不在管理未到位的职责。
  • join()    主进程阻塞,等待子进程的淡出,
    join方法要在close或terminate之后选择。

实行表达:创造二个进度池pool,并设定进度的数码为三,xrange(四)会挨个发出多个对象[0,
1, 2,
4],多个目标被交付到pool中,因pool钦点进程数为三,所以0、1、二会间接送到进程中实践,当其中2个实践到位后才空出一个经过管理目的三,所以晤面世出口“msg:
hello
叁”出现在”end”后。因为为非阻塞,主函数会本人施行自个的,不搭理进度的进行,所以运维完for循环后直接出口“mMsg:
hark~ Mark~
Mark~~~~~~”,主程序在pool.join()处等待各样过程的终结。

 

例7.二:使用进度池(阻塞)

美高梅开户网址 47

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

美高梅开户网址 48

三回试行的结果

1
2
3
4
5
6
7
8
9
10
msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

  

例7.三:使用进程池,并关心结果

美高梅开户网址 49

import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print(":::", res.get())
    print(Sub-process(es) done.")

美高梅开户网址 50

一次实行结果

1
2
3
4
5
6
7
8
9
10
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

 

例7.四:使用四个过程池

美高梅开户网址 51

#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print("\nRun task Lee-%s" %(os.getpid())) #os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
    end = time.time()
    print('Task Lee, runs %0.2f seconds.' %(end - start))

def Marlon():
    print("\nRun task Marlon-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print('Task Marlon runs %0.2f seconds.' %(end - start))

def Allen():
    print("\nRun task Allen-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' %(end - start))

def Frank():
    print("\nRun task Frank-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task Frank runs %0.2f seconds.' %(end - start))

if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank] 
    print("parent process %s" %(os.getpid()))

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print('All subprocesses done.')

美高梅开户网址 52

3回推行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
parent process 7704
 
Waiting for all subprocesses done...
Run task Lee-6948
 
Run task Marlon-2896
 
Run task Allen-7304
 
Run task Frank-3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.
 1 import multiprocessing
 2 import time
 3 
 4 def worker_1(interval):
 5     print ("worker_1")
 6     time.sleep(interval)
 7     print ("end worker_1")
 8 
 9 def worker_2(interval):
10     print ("worker_2")
11     time.sleep(interval)
12     print ("end worker_2")
13 
14 def worker_3(interval):
15     print ("worker_3")
16     time.sleep(interval)
17     print ("end worker_3")
18 
19 if __name__ == "__main__":
20     p1 = multiprocessing.Process(target = worker_1, args = (2,))
21     p2 = multiprocessing.Process(target = worker_2, args = (3,))
22     p3 = multiprocessing.Process(target = worker_3, args = (4,))
23 
24     p1.start()
25     p2.start()
26     p3.start()
27     # 用来获得当前的CPU的核数,可以用来设置接下来子进程的个数。
28     # 用来获得当前所有的子进程,包括daemon和非daemon子进程。
29     # p.name,p.pid分别表示进程的名字,进程id。 
30     print("The number of CPU is:" + str(multiprocessing.cpu_count()))
31     for p in multiprocessing.active_children():
32         print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
33     print ("END!!!!!!!!!!!!!!!!!")

6. Pipe

Pipe方法重回(conn1,
conn二)代表三个管道的五个端。Pipe方法有duplex参数,假若duplex参数为True(暗中认可值),那么这几个管道是全双工方式,也便是说conn壹和conn贰均可收发。duplex为False,conn三头肩负接受消息,conn3头承担发送音信。

 

send和recv方法分别是发送和承受音讯的办法。比如,在全双工方式下,能够调用conn一.send发送音讯,conn一.recv接收音讯。借使未有音讯可接收,recv方法会一向不通。固然管道已经被关闭,那么recv方法会抛出EOFError。

美高梅开户网址 53

import multiprocessing
import time

def proc1(pipe):
    while True:
        for i in xrange(10000):
            print "send: %s" %(i)
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print "proc2 rev:", pipe.recv()
        time.sleep(1)

def proc3(pipe):
    while True:
        print "PROC3 rev:", pipe.recv()
        time.sleep(1)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    #p3.start()

    p1.join()
    p2.join()
    #p3.join()

美高梅开户网址 54

结果

美高梅开户网址 55

 

归来顶部

将经过定义为类:

7. Pool

在采用Python进行系统处理的时候,特别是同时操作多个文件目录,也许远程序调节制多台主机,并行操作能够省去大量的小运。当被操作对象数目非常的小时,能够一向运用multiprocessing中的Process动态成生两个进度,2十二个辛亏,但一旦是累累个,上千个目的,手动的去界定进程数量却又太过繁琐,此时能够表明进度池的机能。
Pool能够提供钦点数量的经过,供用户调用,当有新的呼吁提交到pool中时,要是池还没有满,那么就能够创建二个新的进程用来实行该请求;但借使池中的进度数已经达到规定的标准规定最大值,那么该请求就能够等待,直到池中有进程截止,才会创制新的历程来它。

 

例柒.一:使用进度池

美高梅开户网址 56

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print "Sub-process(es) done."

美高梅开户网址 57

二遍施行结果

1
2
3
4
5
6
7
8
9
10
mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
 
msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.

函数解释:

  • apply_async(func[, args[, kwds[, callback]]])
    它是非阻塞,apply(func[, args[,
    kwds]])是阻塞的(精通不一致,看例1例二结出不相同)
  • close()    关闭pool,使其不在接受新的职务。
  • terminate()    截止工作历程,不在处理未成功的义务。
  • join()    主进程阻塞,等待子进度的脱离,
    join方法要在close或terminate之后采取。

实施表明:成立1个历程池pool,并设定进度的数量为3,xrange(四)会挨个发出四个目的[0,
1, 2,
4],四个对象被提交到pool中,因pool钦点进程数为三,所以0、一、二会直接送到进程中实施,当个中三个实施到位后才空出3个经过管理对象三,所以会并发出口“msg:
hello
三”出现在”end”后。因为为非阻塞,主函数会本人施行自个的,不搭理进度的施行,所以运营完for循环后一向出口“mMsg:
hark~ Mark~
Mark~~~~~~”,主程序在pool.join()处等待各样进程的停止。

 

例⑦.二:使用进程池(阻塞)

美高梅开户网址 58

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print "Sub-process(es) done."

美高梅开户网址 59

1回实行的结果

1
2
3
4
5
6
7
8
9
10
msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

  

例7.三:使用进度池,并关切结果

美高梅开户网址 60

import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print ":::", res.get()
    print "Sub-process(es) done."

美高梅开户网址 61

一遍实践结果

1
2
3
4
5
6
7
8
9
10
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

 

例7.四:使用八个进度池

美高梅开户网址 62

#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
    end = time.time()
    print 'Task Lee, runs %0.2f seconds.' %(end - start)

def Marlon():
    print "\nRun task Marlon-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print 'Task Marlon runs %0.2f seconds.' %(end - start)

def Allen():
    print "\nRun task Allen-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print 'Task Allen runs %0.2f seconds.' %(end - start)

def Frank():
    print "\nRun task Frank-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print 'Task Frank runs %0.2f seconds.' %(end - start)

if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank] 
    print "parent process %s" %(os.getpid())

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print 'Waiting for all subprocesses done...'
    pool.close()
    pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print 'All subprocesses done.'

美高梅开户网址 63

二遍举办结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
parent process 7704
 
Waiting for all subprocesses done...
Run task Lee-6948
 
Run task Marlon-2896
 
Run task Allen-7304
 
Run task Frank-3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.
 1 import multiprocessing
 2 import time
 3 
 4 class ClockProcess(multiprocessing.Process):
 5     def __init__(self, interval):
 6         multiprocessing.Process.__init__(self)
 7         self.interval = interval
 8 
 9     def run(self):
10         n = 5
11         while n > 0:
12             print("the time is {0}".format(time.ctime()))
13             time.sleep(self.interval)
14             n -= 1
15 
16 if __name__ == '__main__':
17     p = ClockProcess(3)
18     p.start()

daemon程序相比结果:
1.不加daemon

 1 import multiprocessing
 2 import time
 3 
 4 def worker(interval):
 5     print("work start:{0}".format(time.ctime()));
 6     time.sleep(interval)
 7     print("work end:{0}".format(time.ctime()));
 8 
 9 if __name__ == "__main__":
10     p = multiprocessing.Process(target = worker, args = (3,))
11     p.start()
12     print ("end!")
13 
14 #程序运行结果
15 '''
16 end!
17 work start:Wed Jun 28 00:07:57 2017
18 work end:Wed Jun 28 00:08:00 2017
19 '''

2.加daemon

 1 import multiprocessing
 2 import time
 3 
 4 def worker(interval):
 5     print("work start:{0}".format(time.ctime()));
 6     time.sleep(interval)
 7     print("work end:{0}".format(time.ctime()));
 8 
 9 if __name__ == "__main__":
10     p = multiprocessing.Process(target = worker, args = (3,))
11     p.daemon = True
12     p.start()
13     print ("end!")
14 
15 #程序运行结果
16 '''
17 end!
18 
19 '''

PS:因子进程设置了daemon属性,主过程结束,它们就趁着甘休了。
3.装置daemon施行完停止的情势

 1 import multiprocessing
 2 import time
 3 
 4 def worker(interval):
 5     print("work start:{0}".format(time.ctime()));
 6     time.sleep(interval)
 7     print("work end:{0}".format(time.ctime()));
 8 
 9 if __name__ == "__main__":
10     p = multiprocessing.Process(target = worker, args = (3,))
11     p.daemon = True
12     p.start()
13     p.join()
14     print "end!"
15 
16 # 结果
17 '''
18 work start:Tue Apr 21 22:16:32 2015
19 work end:Tue Apr 21 22:16:35 2015
20 end!
21 '''

2、Lock
当多少个经过需求访问共享财富的时候,Lock能够用来制止访问的争论。

 1 import multiprocessing
 2 import sys
 3 
 4 def worker_with(lock, f):
 5     with lock:
 6         fs = open(f, 'a+')
 7         n = 10
 8         while n > 1:
 9             fs.write("Lockd acquired via with\n")
10             n -= 1
11         fs.close()
12         
13 def worker_no_with(lock, f):
14     lock.acquire()
15     try:
16         fs = open(f, 'a+')
17         n = 10
18         while n > 1:
19             fs.write("Lock acquired directly\n")
20             n -= 1
21         fs.close()
22     finally:
23         lock.release()
24     
25 if __name__ == "__main__":
26     lock = multiprocessing.Lock()
27     f = "file.txt"
28     w = multiprocessing.Process(target = worker_with, args=(lock, f))
29     nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
30     w.start()
31     nw.start()
32     print ("end")

3、Semaphore
Semaphore用来调节对共享能源的拜会数量,例如池的最达累斯萨Lamb接数。

 1 import multiprocessing
 2 import time
 3 
 4 def worker(s, i):
 5     s.acquire()
 6     print(multiprocessing.current_process().name + "acquire")
 7     time.sleep(i)
 8     print(multiprocessing.current_process().name + "release\n")
 9     s.release()
10 
11 if __name__ == "__main__":
12     s = multiprocessing.Semaphore(2)   # 限制最多有两个进程同时执行
13     for i in range(5):
14         p = multiprocessing.Process(target = worker, args=(s, i*2))
15         p.start()

运维结果:

 1 Process-4acquire
 2 Process-2acquire
 3 Process-2release
 4 
 5 Process-1acquire
 6 Process-1release
 7 
 8 Process-3acquire
 9 Process-4release
10 
11 Process-5acquire
12 Process-3release
13 
14 Process-5release

4、Event
伊夫nt兑现进程间协同通讯

 1 import multiprocessing
 2 import time
 3 
 4 def wait_for_event(e):
 5     print("wait_for_event: starting")
 6     e.wait()
 7     print("wairt_for_event: e.is_set()->" + str(e.is_set()))
 8 
 9 def wait_for_event_timeout(e, t):
10     print("wait_for_event_timeout:starting")
11     e.wait(t)
12     print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))
13 
14 if __name__ == "__main__":
15     e = multiprocessing.Event()
16     w1 = multiprocessing.Process(name = "block",
17             target = wait_for_event,
18             args = (e,))
19 
20     w2 = multiprocessing.Process(name = "non-block",
21             target = wait_for_event_timeout,
22             args = (e, 2))
23     w1.start()
24     w2.start()
25 
26     time.sleep(3)
27 
28     e.set()
29     print("main: event is set")
30 
31 # 运行结果
32 '''
33 
34 wait_for_event: starting
35 
36 wait_for_event_timeout:starting
37 
38 wait_for_event_timeout:e.is_set->False
39 
40 main: event is set
41 
42 wairt_for_event: e.is_set()->True
43 
44 '''

5、Queue

Queue是多进度安全的队列,能够运用Queue完毕多进度之间的数目传递。put方法用以插入数据到行列中,put方法还有四个可选参数:blocked和timeout。假使blocked为True(私下认可值),并且timeout为正在,该方法会阻塞timeout钦命的小时,直到该队列有结余的长空。要是超时,会抛出Queue.Full卓殊。假若blocked为False,但该Queue已满,会立即抛出Queue.Full非凡。

 

get方法能够从队列读取并且删除1个要素。同样,get方法有多少个可选参数:blocked和timeout。如若blocked为True(默许值),并且timeout为正值,那么在等候时间内未有取到任何因素,会抛出Queue.Empty万分。假诺blocked为False,有二种状态存在,假设Queue有二个值可用,则随即回去该值,不然,若是队列为空,则立即抛出Queue.Empty分外。

 1 import multiprocessing
 2 def writer_proc(q):
 3     try:
 4         q.put(1, block = False)
 5     except:
 6         pass
 7 
 8 def reader_proc(q):
 9     try:
10         print (q.get(block = False))
11     except:
12         pass
13 
14 if __name__ == "__main__":
15     q = multiprocessing.Queue()
16     writer = multiprocessing.Process(target=writer_proc, args=(q,))
17     writer.start()
18 
19     reader = multiprocessing.Process(target=reader_proc, args=(q,))
20     reader.start()
21 
22     reader.join()
23     writer.join()
24 
25 # 运行结果
26 # 1

6、Pipe

Pipe方法再次来到(conn一,
conn二)代表多个管道的八个端。Pipe方法有duplex参数,假设duplex参数为True(暗许值),那么这么些管道是全双工形式,也正是说conn壹和conn二均可收发。duplex为False,conn一只担当接受音信,conn3只承担发送音信。

 

send和recv方法分别是发送和经受新闻的法子。举例,在全双工格局下,能够调用conn一.send出殡和埋葬音信,conn一.recv接收新闻。假使未有新闻可吸收接纳,recv方法会一直不通。假使管道已经被关门,那么recv方法会抛出EOFError。

Pipe能够是单向(half-duplex),也能够是双向(duplex)。大家通过mutiprocessing.Pipe(duplex=False)创制单向管道
(默感到双向)。五个进度从PIPE一端输入对象,然后被PIPE另壹端的进度接收,单向管道只同意管道一端的进度输入,而双向管道则允许从两端输入。

 1 # proc1 发送消息,proc2,proc3轮流接收消息
 2 import multiprocessing
 3 import time
 4 
 5 def proc1(pipe):
 6     while True:
 7         for i in range(100):
 8             print ("send: %s" %(i))
 9             pipe.send(i)
10             time.sleep(1)
11 
12 def proc2(pipe):
13     while True:
14         print ("proc2 rev:", pipe.recv())
15         time.sleep(1)
16 
17 def proc3(pipe):
18     while True:
19         print ("proc3 rev:", pipe.recv())
20         time.sleep(1)
21 
22 if __name__ == "__main__":
23     pipe = multiprocessing.Pipe()
24     p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
25     p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
26     p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))
27 
28     p1.start()
29     p2.start()
30     p3.start()
31 
32     p1.join()
33     p2.join()
34     p3.join()
35 
36 # 运行结果
37 '''
38 send: 0
39 proc2 rev: 0
40 send: 1
41 proc3 rev: 1
42 send: 2
43 proc2 rev: 2
44 send: 3
45 proc3 rev: 3
46 send: 4
47 proc2 rev: 4
48 send: 5
49 proc3 rev: 5
50 send: 6
51 proc2 rev: 6
52 send: 7
53 proc3 rev: 7
54 send: 8
55 proc2 rev: 8
56 send: 9
57 proc3 rev: 9
58 send: 10
59 proc2 rev: 10
60 ......
61 '''

7、Pool
在利用Python举行系统管理的时候,尤其是同时操作多少个文件目录,只怕远程序调控制多台主机,并行操作能够省去多量的光阴。当被操作对象数目相当小时,能够一贯运用multiprocessing中的Process动态成生多少个进程,二10个幸好,但一旦是多多益善个,上千个目的,手动的去界定进度数量却又太过繁琐,此时能够公布进程池的作用。
Pool能够提供钦赐数量的长河,供用户调用,当有新的请求提交到pool中时,要是池还不曾满,那么就能创建一个新的经过用来试行该请求;但假设池中的进程数已经到达规定最大值,那么该请求就能够等待,直到池中有经过停止,才会创立新的进度来进行它。

利用进度池(非阻塞)

 1 import multiprocessing
 2 import time
 3 
 4 def func(msg):
 5     print ("msg:", msg)
 6     time.sleep(3)
 7     print ("end")
 8 
 9 if __name__ == "__main__":
10     pool = multiprocessing.Pool(processes = 3)   # 池中最大进程数为3
11     for i in range(10):
12         msg = "hello %d" %(i)
13         pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
14 
15     print ("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
16     pool.close()
17     pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
18     print ("Sub-process(es) done.")

运作结果:

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
msg: hello 2
end
msg: hello 3
end
msg: hello 4
end
msg: hello 5
end
msg: hello 6
end
msg: hello 7
end
msg: hello 8
end
msg: hello 9
end
end
end
Sub-process(es) done.

函数解释:

  • apply_async(func[, args[, kwds[, callback]]])
    它是非阻塞,apply(func[, args[,
    kwds]])是阻塞的(精晓差异,看例壹例二结出差别)
  • close()    关闭pool,使其不在接受新的职务。
  • terminate()    甘休工作经过,不在管理未成功的任务。
  • join()    主进度阻塞,等待子进程的脱离,
    join方法要在close或terminate之后选用。

进行表达:创造贰个经过池pool,并设定进度的数目为3,range(四)会挨个发出多少个目的[0,
1, 2,
3,4,5,6,7,8,9],十个目的被提交到pool中,因pool内定进度数为三,所以0、一、2会直接送到进度中实施,当在那之中2个推行到位后才空出叁个进度管理对象叁,所以会面世出口“msg:
hello
3”出现在”end”后。因为为非阻塞,主函数会本人推行自个的,不搭理进程的进行,所以运维完for循环后间接出口“mMsg:
hark~ Mark~
Mark~~~~~~”,主程序在pool.join()处等待各样进度的告竣。

 

使用线程池(阻塞)

 1 import multiprocessing
 2 import time
 3 
 4 def func(msg):
 5     print ("msg:", msg)
 6     time.sleep(3)
 7     print ("end")
 8 
 9 if __name__ == "__main__":
10     pool = multiprocessing.Pool(processes = 3)   # 池中最大进程数为3
11     for i in range(10):
12         msg = "hello %d" %(i)
13         pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
14 
15     print ("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
16     pool.close()
17     pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
18     print ("Sub-process(es) done.")
19 
20 # 运行结果
21 '''
22 msg: hello 0
23 end
24 msg: hello 1
25 end
26 msg: hello 2
27 end
28 msg: hello 3
29 end
30 msg: hello 4
31 end
32 msg: hello 5
33 end
34 msg: hello 6
35 end
36 msg: hello 7
37 end
38 msg: hello 8
39 end
40 msg: hello 9
41 end
42 Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
43 Sub-process(es) done.
44 '''

运用四个进度池

 1 import multiprocessing
 2 import os, time, random
 3 
 4 
 5 def Lee():
 6     print("\nRun task Lee-%s" % (os.getpid()))  # os.getpid()获取当前的进程的ID
 7     start = time.time()
 8     time.sleep(random.random() * 10)  # random.random()随机生成0-1之间的小数
 9     end = time.time()
10     print( 'Task Lee, runs %0.2f seconds.' % (end - start))
11 
12 
13 def Marlon():
14     print("\nRun task Marlon-%s" % (os.getpid()))
15     start = time.time()
16     time.sleep(random.random() * 40)
17     end = time.time()
18     print('Task Marlon runs %0.2f seconds.' % (end - start))
19 
20 
21 def Allen():
22     print("\nRun task Allen-%s" % (os.getpid()))
23     start = time.time()
24     time.sleep(random.random() * 30)
25     end = time.time()
26     print('Task Allen runs %0.2f seconds.' % (end - start))
27 
28 
29 def Frank():
30     print( "\nRun task Frank-%s" % (os.getpid()))
31     start = time.time()
32     time.sleep(random.random() * 20)
33     end = time.time()
34     print( 'Task Frank runs %0.2f seconds.' % (end - start))
35 
36 
37 if __name__ == '__main__':
38     function_list = [Lee, Marlon, Allen, Frank]
39     print("parent process %s" % (os.getpid()))
40 
41     pool = multiprocessing.Pool(4)
42     for func in function_list:
43         pool.apply_async(func)  # Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
44 
45     print('Waiting for all subprocesses done...')
46     pool.close()
47     pool.join()  # 调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
48     print( 'All subprocesses done.')
49 
50 # 运行结果
51 '''
52 parent process 3256
53 Waiting for all subprocesses done...
54 
55 Run task Lee-2196
56 
57 Run task Marlon-4580
58 
59 Run task Allen-5920
60 
61 Run task Frank-6384
62 Task Allen runs 2.15 seconds.
63 Task Lee, runs 9.99 seconds.
64 Task Frank runs 14.14 seconds.
65 Task Marlon runs 32.74 seconds.
66 All subprocesses done.
67 
68 '''

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图