知识十遗篇,进程间通讯

 

多进程

经过之间是并行独立的,python是运营进度的时候,是运转的是原生进程。进度是没有GIL锁的,而且不设有锁的概念,进度之间的数据式不可能共享的,而线程是能够的。

当八线程制造落成之后,start并不曾了当下运维,依然要求和此外线程抢CPU的身份,只是
日子非常的短。
进程之间的通信分为三种,queue和pipe

写一个game 循环
game
loop是各种游戏的中央.它不停的拿走用户输入,更新游戏处境,渲染游戏结果到荧屏上.网游分为客户端和服务端两部分.两边的loop通过互联网连接起来.平时意况下,客户端获取用户输入,发送到服务端,服务端管理总计数据,更新游戏者状态,发送结果个客户端.比方游戏的使用者只怕游戏物体的地点.非凡首要的是,不要把客户端和服务端的功能混淆了,若是未有足够的说辞的话.
要是在客户端做游戏总计,那么不相同的客户端格外轻便就不一齐了.

进程

一、进度的定义

用muliprocessing那几个包中的Process来定义多进度,跟定义拾2线程类似

from multiprocessing import Process   # 导入进程模块
import time


def run(name):
    time.sleep(2)
    print("hello", name)

if __name__ == "__main__":
    p_obj_list = list()    # 存放进程对象
    for i in range(10):    # 启动10个进程
        p = Process(target=run, args=("QQ{0}".format(i),))  # 产生一个进程实例
        p.start()   # 启动进程
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()   # 等待进程结果
 1 import multiprocessing
 2 def foo(q):
 3     q.put([1,'hello',True])
 4 if __name__=='__main__':
 5     q=multiprocessing.Queue()#通过multiprocessing建立一个队列
 6     p=multiprocessing.Process(target=foo,args=(q,))
 7   #用multiprocessing在调用Process,建立一个子进程,定义函数名,将q作为参数传到foo函数,
 8     #foo函数就可以通过这个参数来与主进程做交互了。
 9     p.start()#激活这个子进程
10     print(q.get())#主进程

A game loop iteration is often called a tick. Tick is an event
meaning that current game loop iteration is over and the data for the
next frame(s) is ready.

一.意义:计算机中的程序关于某数码集合上的一遍运营活动,是系统举办能源分配和调整的核心单位。说白了就是一个先后的实践实例。

实行1个先后正是贰个经过,举个例子你张开浏览器看到本人的博客,浏览器本身是一个软件程序,你此时开采的浏览器就是三个进度。

 

二、进度中投入线程

from multiprocessing import Process
import time,threading


def thread_run(name):   # 定义线程执行的方法
    print("{0}:{1}".format(name, threading.get_ident()))  # thread.get_ident ()返回当前线程的标识符,标识符是一个非零整数


def run(name):
    time.sleep(2)
    print("hello", name)
    t = threading.Thread(target=thread_run, args=(name,))   # 嵌入线程
    t.start()   # 执行线程


if __name__ == "__main__":
    p_obj_list = list()
    for i in range(10):
        p = Process(target=run, args=("QQ{0}".format(i),))
        p.start()
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()

地点函数通过multiprocessing的queue来达成进程间通讯。

在下2个例子中,我们写1个客户端,这么些客户端通过WebSocket连接服务器,同时运维三个简便的loop,接受输入发送给服务器,回显信息.Client
source code is located
here.

二.过程的性状

  • ### 1个历程里能够有四个子进程

  • ### 新的经过的开创是一点一滴拷贝整个主进度

  • ### 进度里能够分包线程

  • ### 进度之间(包涵主进程和子进程)不设有数据共享,相互通讯(浏览器和python之间的多寡无法互通的),要通信则要信赖队列,管道之类的

 

叁、老爹和儿子进程

各类子进度都是由3个父进度运维的,每一种程序也是有叁个父进度

from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  # 获得父进程ID
    print('process id:', os.getpid())  # 获得子进程ID
    print("\n\n")


def f(name):
    info('\033[31;1m function f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1m main process line\033[0m')
    p = Process(target=f, args=('QQ',))
    p.start()
    p.join()

  

 

 

 1 from multiprocessing import  Pipe,Process
 2 def foo(sk):
 3     sk.send('hello world')#通过管道sk发送内容
 4     print(sk.recv())#打印接收到的内容
 5 if __name__ == '__main__':
 6     sock,conn=Pipe()#定义一个管道的两头
 7     p=Process(target=foo,args=(sock,))#由于上面已经通过multiprocessing导入了Process,
 8     # 所以这里直接就可以创建一个子进程,并将sock(管道的一头)作为参数给foo函数
 9     p.start()#激活这个进程
10     print(conn.recv())#打印接收到的内容,conn是管道的另一头
11     conn.send('hi son')#通过管道发送内容

3.1

Example 3.1 source
code

大家运用aiohttp来成立二个game
server.那几个库能够创制asyncio的client和server.这些库的裨益是同时帮助http请求和websocket.所以服务器就无需把结果管理成html了.
来看一下server怎么样运维:

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


async def wshandler(request):
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    app["sockets"].append(ws)

    while 1:
        msg = await ws.receive()
        if msg.tp == web.MsgType.text:
            print("Got message %s" % msg.data)
            ws.send_str("Pressed key code: {}".format(msg.data))
        elif msg.tp == web.MsgType.close or\
             msg.tp == web.MsgType.error:
            break

    app["sockets"].remove(ws)
    print("Closed connection")
    return ws

async def game_loop(app):
    while 1:
        for ws in app["sockets"]:
            ws.send_str("game loop says: tick")
        await asyncio.sleep(2)


app = web.Application()
app["sockets"] = []

asyncio.ensure_future(game_loop(app))

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

这一个代码就不翻译了,

3.经过和线程之间的分别

  • ### 线程共享地址空间,而经过之间有互动独立的半空中

  • ### 线程之间数据互通,互相操作,而经过不得以

  • ### 新的线程比新的长河创立轻巧,比开进程的支付小好些个

  • ### 主线程能够影响子线程,而主进程不能够影响子进度

 

 

经过间数据交互与共享

知晓差异进程之间内部存款和储蓄器是不共享的,要想完结三个进程间的通信必要使用multiprocessing库中的queue(队列)模块,那么些multiprocessing库中的queue模块跟单纯的queue库是区别等的。进度导入前者(这里的queue是专程为经过之间的通讯设计的)不失误,导入后者(这里的queue首如果线程间数据交互)出错。

下边代码通过Pipe来达成七个经过间的通讯。

三.2 有请求才起来loop

上面的事例,server是不停的loop.以后改成有请求才loop.
同时,server上可能存在多个room.3个player创设了贰个session(一场竞赛依然1个副本?),其余的player能够参加.

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


async def wshandler(request):
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    app["sockets"].append(ws)

    if app["game_is_running"] == False:
        asyncio.ensure_future(game_loop(app))
    while 1:
        msg = await ws.receive()
        if msg.tp == web.MsgType.text:
            print("Got message %s" % msg.data)
            ws.send_str("Pressed key code: {}".format(msg.data))
        elif msg.tp == web.MsgType.close or\
             msg.tp == web.MsgType.error:
            break

    app["sockets"].remove(ws)
    print("Closed connection")

    return ws

async def game_loop(app):
    app["game_is_running"] = True
    while 1:
        for ws in app["sockets"]:
            ws.send_str("game loop says: tick")
        if len(app["sockets"]) == 0:
            break
        await asyncio.sleep(2)
    app["game_is_running"] = False


app = web.Application()

app["sockets"] = []
app["game_is_running"] = False

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

四.在python中,进度与线程的用法就只是名字不一样,使用的章程也是没多大分裂

壹、线程访问queue

import queue,threading


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把这个q传给了子线程
    p = threading.Thread(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
[66, None, 'hello word']
 1 from multiprocessing import  Manager,Process
 2 def foo(l,i):#收到参数,l是Mlist,i是循环的i
 3     l.append(i*i)#将i平方添加到Mlist
 4 if __name__=='__main__':
 5     manager=Manager()
 6     Mlist=manager.list([11,22,33])#定义一个列表
 7 
 8     l=[]
 9     for i in range(5):#创建5个子进程
10         p=Process(target=foo,args=(Mlist,i))#定义一个进程,将Mlist和i作为参数传到foo
11         p.start()#激活这个进程,执行foo函数
12         l.append(p)#将5个进程添加到l这个列表
13     for i in l:
14         i.join()#循环这个列表,然后将每个进程join
15     print(Mlist)#当所有的子进程都结束,运行主进程

3.3 管理task

从来操作task对象.未有人的时候,能够cancel掉task.
注意!:
This cancel()
call tells scheduler not to pass execution to this coroutine anymore and
sets its state tocancelled
which then can be checked by cancelled()
method. And here is one caveat worth to mention: when you have external
references to a task object and exception happens in this task, this
exception will not be raised. Instead, an exception is set to this task
and may be checked by exception()
method. Such silent fails are not useful when debugging a code. Thus,
you may want to raise all exceptions instead. To do so you need to call
result()
method of unfinished task explicitly. This can be done in a callback:

假诺想要cancel掉,也不想触发exception,那么就检查一下canceled状态.
app["game_loop"].add_done_callback(lambda t: t.result() if not t.cancelled() else None)

5.粗略实例

一)创制2个大概的多进度:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

def func(name):
    time.sleep(1)
    print('hello',name,time.ctime())

ml = []
for i in range(3):
    p = multiprocessing.Process(target=func,args=('yang',))
    p.start()
    ml.append(p)

for i in ml:
    i.join() #注意这里,进程必须加join方法,不然会导致僵尸进程

  

运维结果:

美高梅开户网址 1

 

不管怎么说,反正报错了,同样的代码,在python自带的IDLE里搜求:

美高梅开户网址 2

并未有别的交事务物就谢世了。好的,这里要说下了,依照我个人的掌握,当你用pycharm大概IDLE时,pycharm只怕IDLE在您的计算机里本人也是多少个经过,并且私下认可是主进度。所以在pycharm会报错,而在IDLE里运转就是空手,个人通晓,对不对暂时不谈,早先时期学到子进度时再说。

 

化解办法正是,别的的不改变,加1个if __name == ‘__main__’推断就行:

美高梅开户网址 3

 

那样就一举成功了,好的,你今后得以回味到那句话了,进度与线程的用法就只是名字不一致,使用的秘技也是没多大不一致。不多说,自行体会。而运维结果看出的岁月是联合具名的,那么那进程才是真的含义上的互相运转。

 

知识十遗篇,进程间通讯。二)自定义类式进度

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

class myprocess(multiprocessing.Process):
    def __init__(self,name):
        super(myprocess,self).__init__()
        self.name = name

    def run(self):
        time.sleep(1)
        print('hello',self.name,time.ctime())

if __name__ == '__main__':
    ml = []
    for i in range(3):
        p = myprocess('yang')
        p.start()
        ml.append(p)

    for j in ml:
        j.join()

  

运营结果:

美高梅开户网址 4

 

 

然后setDaemon之类的章程和线程也是完全壹致的。

 

三)每3个进度都有根进度,换句话,每一个经过都有父进度

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time,os

def info():
    print('mudule name:',__name__)
    print('parent process:',os.getppid()) #父进程号
    print('son process:',os.getpid())     #子进程号

if __name__ == '__main__':
    info()
    print('-----')
    p = multiprocessing.Process(target=info,args=[])
    p.start()
    p.join()

  

运作结果:

 

美高梅开户网址 5

 

而查看自身本机的历程:

美高梅开户网址 6

 

能够知道,6204正是pycharm,就是那儿的根进度,而主进度即是自身那几个py文件(由__main__可以),接着再往下的子进程等等等的。

 

贰、进度访问queue

from multiprocessing import Process
import queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把这个q传给了子线程
    p = Process(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
Traceback (most recent call last):
  File "C:/Users/dell/PycharmProjects/untitled/process/进程的定义.py", line 77, in <module>
    p.start()
  File "C:\Python36\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Python36\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python36\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

地点代码通过Manger落成子进度间的通信。

三.4 等待四个事件

Example 3.4 source
code
在无数情况下,需求在服务器管理客户端的handler中,
等待七个事件.除了等候客户端的新闻,可能还索要拭目以俟不一致的消息爆发.比方,
游戏壹局的时日到了,供给三个timer的复信号.只怕,须求别的进度的音信,可能其余server的消息.(使用布满式音讯系统).
上边那个例子使用了Condition.这里不保留全体的socket,而是在每一趟循环结束通过Condition.notify_all来公告.那些应用pub/sub形式落成.
为了在三个handler中,等待三个事件,首先大家应用ensure_future来包装一下.

if not recv_task: 
  recv_task = asyncio.ensure_future(ws.receive())
if not tick_task: 
  await tick.acquire() 
  tick_task = asyncio.ensure_future(tick.wait())```

在我们调用Condition.call之前,我们需要获取一下锁.这个锁在调用了tick.wait之后就释放掉.这样其他的协程也可以用了.但是当我们得到一个notification, 会重新获取锁.所以我们在收到notification之后要release一下.

done, pending = await asyncio.wait( [recv_task, tick_task],
return_when=asyncio.FIRST_COMPLETED)“`
其1会阻塞住直到有3个任务完成,今年会回到五个列表,实现的和依旧在运作的.假诺task
is done,大家再安装为None,那样下3个循环里会再叁次创制.

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)



tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

async def game_loop():
    while 1:
        await tick.acquire()
        tick.notify_all()
        tick.release()
        await asyncio.sleep(1)

asyncio.ensure_future(game_loop())

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

(那个重大是asyncio.Condition的用法)

陆.多进度间的通讯和数目共享

第二大家都早已理解进度之间是单身的,不得以互通,并且数据交互独立,而在实际上付出中,一定会遇见须求经过间通讯的情状供给,那么大家怎么搞呢

有二种方法:

  • pipe
  • queue

1)使用queue通信

在102线程这里已经学过queue了,成立queue的格局,q =
queue.Queue(),那种创造是开创的线程queue,并不是经过queue。创设进度queue的法子是:

美高梅开户网址 7

 

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(q,name,age): #这里必须要把q对象作为参数传入才能实现进程之间通信
    q.put({'name':name,'age':age})

if __name__ == '__main__':
    q = multiprocessing.Queue() #创建进程queue对象
    ml = []
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(q,'yang',21))
        p.start()
        ml.append(p)
    print(q.get()) #获取queue信息
    print(q.get()) 
    print(q.get())
    for i in ml:
        i.join()

  

运作结果:

美高梅开户网址 8

 

好的,已经由此queue完结通讯,那么精心的相恋的人大概会想,此时的queue到底是同3个呢照旧copy的呢?起先测试,码如下:

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(q,name,age):
    q.put({'name':name,'age':age})
    print('id:',id(q))
if __name__ == '__main__':
    q = multiprocessing.Queue()
    ml = []
    print('id:',id(q))
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(q,'yang',21))
        p.start()
        ml.append(p)
    print(q.get())
    print(q.get())
    print(q.get())
    for i in ml:
        i.join()

  

在Windows平台运转结果:

美高梅开户网址 9

 

Linux的ubuntu下是如此的:

美高梅开户网址 10

 

那就不好怎么说了,小编个人的知晓,线程和进程那类与Computer硬件(CPU,RAM)等有联系的都有不鲜明因素,姑且认为在Windows平台里queue是copy的,在Linux里是同3个吧,并且据经验职员代表,在macbook上也是同2个。

 

还有个难点, 若是使用的queue是线程式的啊?

代码别的都没变,只改了这里:

美高梅开户网址 11

 

结果:

美高梅开户网址 12

 

尽管如此报错了,可是却有一个关键点,提醒的是不可能pickle线程锁对象,约等于说刚才大家选取的queue是经过对象,所以能够pickle,注意了,这里正是关键点,使用了pickle,那么也正是说,在Windows平台里是copy的,借使不是copy,就没有需求存在pickle对啊?直接拿来用就是呀,干嘛要pickle之后取的时候再反pickle呢对吧?

 

再看Linux下啊,由于Linux暗许是python2,所以模块包名稍微有点差异

美高梅开户网址 13

结果阻塞住了,但是后边的或许出来了,看到的id果然还是一如未来的。

 

此间就有三点须求小心:(个人知道,如有误望指正)

1.进度里的确无法使用线程式queue

二.Windows平台的进度式queue是copy的

叁.Linux平台的线程式和进程式都以同多个,可是假使在进程里使用线程式queue会阻塞住

但笔者个人感到copy更有安全性

 

2)使用pipe通信

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(conn):
    conn.send('约吗?')  #子进程发送数据
    print(conn.recv())  #接受数据,不能加参数1024之类的
    conn.close()        #子进程关闭连接
if __name__ == '__main__':
    parent_conn,son_conn = multiprocessing.Pipe() #创建pipe对象,父进程,子进程
    ml = []
    p = multiprocessing.Process(target=func,args=(son_conn,))
    p.start()
    print(parent_conn.recv())  #父进程接受数据,不能加参数1024之类的
    parent_conn.send('不约')    #发送数据
    p.join()                   #join方法是进程特有

 

  

运转结果:

美高梅开户网址 14

 

那样就关系上了,相信您发现了,主导和前边的socket差不多,可是唯1的不等是recv()方法不能够加参数,不信的话,你加来尝试

回看线程通讯,相信您会感觉进度比线程更有利于

 

当然pipe也可以有四个:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

def func(conn):
    conn.send('约吗?')  #子进程发送数据
    print(conn.recv())
    conn.close()        #子进程关闭连接
if __name__ == '__main__':
    parent_conn,son_conn = multiprocessing.Pipe() #创建pipe对象,父进程,子进程
    ml = []
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(son_conn,))
        p.start()
        ml.append(p)
        print(parent_conn.recv())  #父进程接受数据,不能加参数1024之类的
        parent_conn.send('不约')
    for i in ml:
        i.join()

  

运作结果:

美高梅开户网址 15

 

柒.进程之间数据共享——manager

比较轻易,就应用了经过里的manager对象下的依次数据类型,别的的非常的粗略的,小编就不注释了

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(l,d,num):
    l.append(num)
    d[num] = num

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        l = manager.list()
        d = manager.dict()
        ml = []
        for i in range(6):
            p = multiprocessing.Process(target=func,args=(l,d,i))
            p.start()
            ml.append(p)
        for i in ml:
            i.join()
        print('d:',d)
        print('l:',l)

  

运转结果:

美高梅开户网址 16

 

那样是否就兑现了多中国少年共产党享了?

 

好的,进度也剖析完了

 

三、进程访问`multiprocessing库中的Queue模块`

from multiprocessing import Process,Queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = Queue()   # 把这个q传给了子线程
    p = Process(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
[66, None, 'hello word']

父进度约等于克隆三个Q,把团结的Q克隆了壹份交给子进度,子进度那个时候往Q里面放了一份数据,然后父进度又能实际的获得到。可是你克隆了一份是还是不是就和父进度未有涉嫌了,为何还是能够维系在一块吗?然则实际:等于那五个Q里面包车型大巴数额又把它类别化了,系列化到叁个中等的地点,类似于翻译,然后反体系化给这些父进度那边来了,其实那五个Q就是通过pickle来体系化的,不是二个实在的Q。

总括:三个线程之间能够修改1个数额,不加锁,恐怕就能出错。未来进程中的Queue,是兑现了数码的传递,不是在改变同一份数据,只是完结二个经过的数额传给了此外一个历程。

美高梅开户网址 17

3.5 和线程一齐利用

Example 3.5 source
code

本条例子,大家把asyncio的loop放到别的三个独立线程中.上边也说过了,因为python的GIL的安插性,不容许同时运维八个code.所以使用三八线程来拍卖计算瓶颈的标题,并不是2个好主意.然后还有别的三个利用线程原因正是:
假如有些函数或许库不帮助asyncio,那么就能阻塞住主线程的运维.那种情形下唯一的诀要就是身处其它二个线程中.

要留心asyncio自己不是threadsafe的,可是提供了三个函数.call_soon_threadsafe和run_coroutine_threadsafe.
当您运营这一个事例的时候,你会看到notify的线程id正是主线程的id,这是因为notify协程运维在主线程中,sleep运营在别的二个线程,所以不会阻塞住主线程.

import asyncio
from aiohttp import web

from concurrent.futures import ThreadPoolExecutor
import threading
from time import sleep


async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

def game_loop(asyncio_loop):
    print("Game loop thread id {}".format(threading.get_ident()))
    # a coroutine to run in main thread
    async def notify():
        print("Notify thread id {}".format(threading.get_ident()))
        await tick.acquire()
        tick.notify_all()
        tick.release()

    while 1:
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        # blocking the thread
        sleep(1)
        # make sure the task has finished
        task.result()

print("Main thread id {}".format(threading.get_ident()))

asyncio_loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
asyncio_loop.run_in_executor(executor, game_loop, asyncio_loop)

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

 

 

3.陆 多进度和扩展scaling up

三个线程的server可以干活了,不过那个server唯有一个cpu可用.
为了增加,大家要求周转三个进度,各样进程包涵自个儿的eventloop.
所以我们要求经过之间通讯的方式.同时在玩耍领域,平常会有大量的猜测(寻路什么的).这么些职分日常不会火速到位(一个tick内).
在协程中张开大气耗费时间的持筹握算未有趣,因为会阻塞住新闻循环本身.所以在那种处境下,把大量的估测计算交给其余的长河就很有不可或缺了
最简易的情势正是开发银行多少个单线程的server.然后,能够应用haproxy那样的load
balancer,来把客户端的三番五次分散到差别的历程上去.进城之间的通信有数不尽方法.一种是依靠互连网连接,也得以扩张到四个server.未来曾经有大多兑现了音信和存款和储蓄系统的框架(基于asyncio).
举个例子:

aiomcache
for memcached client
aiozmq
for zeroMQ
aioredis
for Redis storage and pub/sub

还有别的的有的乱七八糟,在git上,超过53%是aio打头.
选用互连网音讯,能够万分有效的储存数据,或然沟通音讯.不过如若要管理多量实时的数额,而且有大气经过通讯的事态,就十一分了.在这种状态下,四个更确切的措施是行使正式的unix
pipe.asyncio has support for pipes and there is a very low-level
example of the server which uses
pipes
inaiohttp repository.
在那一个例子中,大家利用python的高档次的multiprocessing库来触发3个新的历程来展开测算,通过multiprocessing.Queue来打开进度间通讯.不幸的是,近来的multiprocessing完毕并不援救asyncio.所以阻塞的调用就能够卡住住event
loop.
那正是利用线程的最好案例.因为大家在其余四个线程运维multiprocessing的代码.看代码

import asyncio
from aiohttp import web

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Queue, Process
import os
from time import sleep


async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

def game_loop(asyncio_loop):
    # coroutine to run in main thread
    async def notify():
        await tick.acquire()
        tick.notify_all()
        tick.release()

    queue = Queue()

    # function to run in a different process
    def worker():
        while 1:
            print("doing heavy calculation in process {}".format(os.getpid()))
            sleep(1)
            queue.put("calculation result")

    Process(target=worker).start()

    while 1:
        # blocks this thread but not main thread with event loop
        result = queue.get()
        print("getting {} in process {}".format(result, os.getpid()))
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        task.result()

asyncio_loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
asyncio_loop.run_in_executor(executor, game_loop, asyncio_loop)

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

worker()在此外二个进程中运转.包涵了一部分耗费时间总计,把结果放在queue中.获得结果随后,通告主线程的主eventloop这3个等待的client.那一个例子十三分简陋,进程未有适度停止,同时worker恐怕必要此外二个queue来输入数据.

Important! If you are going to run anotherasyncio
event loop in a different thread or sub-process created from main
thread/process, you need to create a loop explicitly, using
asyncio.new_event_loop()
, otherwise, it will not work.

肆、通过Pipe()落成进度间的数额交互,manger实现多中国少年共产党享

地点的例子是经过进度中的Queue,来进展数据共享的,其实还有1种方式贯彻数量共享,那即是管道,pipe,以及数额共享manger。

4.1、Pipe()函数

管道函数会再次回到由管道双方连日来的一组连接对象,该管道暗中同意是双向的(双向的)。

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 发送消息给父进程
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # 接收子进程的消息
    p.join()

四.二、接受反复和出殡和埋葬数次

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 发送消息给父进程
    conn.send("QQ")  # 发送消息给父进程
    print(conn.recv())   # 接收父进程的消息
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())  # 接收两次
    parent_conn.send("微信")   # 发送给子进程
    p.join()

4.3、manger

manger能够产生多少间的共享。

from multiprocessing import Process, Manager
import os


def f(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()   # 声明一个字典,这个字典是用manger声明的,不是用dict()声明的
        # manger.dict()是用专门的语法生产一个可在多进程之间进行传递和共享的一个字典
        l = manager.list(range(5))  # 同样声明一个列表
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)

线程修改同一份数据的时候必要加锁,进程修改数据吧:不用加锁,因为那些manger已经帮你加锁了,它就私下认可不容许多个经过同时修改一份数据。七个经过没有办法同时修改1份数据,进度之间是单独的,它自身也要加锁,因为它把温馨的事物还要copy好几份,跟刚刚的不胜Queue同样,copy13个字典最后合成2个字典

 

 

 

协程
协程,又叫微线程,实际上正是单线程,通过python语法,或模块来贯彻产出。
实为上就是3个历程多个线程。

进程锁和进度池的运用

美高梅开户网址 18

1、进程锁

因此multiprocessing中的Lock模块来促成进度锁

from multiprocessing import Process,Lock   # 导入进程锁


def f(l, i):
    l.acquire()    # 加锁
    try:
        print("hello word", i)
    finally:
        l.release()   # 释放锁

if __name__ == "__main__":
    lock = Lock()     # 定义锁
    for num in range(10):
        Process(target=f, args=(lock, num,)).start()  # 把锁传入进程中

进度中不是互相独立的吧?为何还要加锁:纵然各种进程都以单身运作的,可是难题来了,它们共享1块显示器。这几个锁存在的意思就是显示器共享。即便经过一想着打字与印刷数据,而经过二想也想打字与印刷数据的意况,就有一点都不小可能率乱套了,然后经过那么些锁来决定,去打字与印刷的时候,那个显示器只有我独占,导致显示屏不会乱。

美高梅开户网址 19

2、进程池apply和apply_saync

2.1、appley

一路实行,也便是串行实施的

from multiprocessing import Pool  # 导入进程池模块pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印进程号


if __name__ == "__main__":
    pool = Pool(processes=5)   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply(func=foo, args=(i,))   # 同步执行挂起进程
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

2.2、apply_saync

异步施行,相当于并行实践。

from multiprocessing import Pool  # 导入进程池模块pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印进程号


if __name__ == "__main__":
    pool = Pool(processes=5)   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply_async(func=foo, args=(i,))   # 采用异步方式执行foo函数
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

二.三、异步下回调函数

程序施行完成之后,再回调过来推行那个Bar函数。

from multiprocessing import Process,Pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印子进程的进程号


def bar(arg):
    print('-->exec done:', arg, os.getpid())   # 打印进程号

if __name__ == "__main__":
    pool = Pool(processes=2)
    print("主进程", os.getpid())   # 主进程的进程号
    for i in range(3):
        pool.apply_async(func=foo, args=(i,), callback=bar)   # 执行回调函数callback=Bar
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

#执行结果
主进程 752
end
in process 2348
-->exec done: None 752
in process 8364
-->exec done: None 752
in process 2348
-->exec done: None 752

注:

  1. 回调函数表达fun=Foo干不完就不实践bar函数,等Foo施行完就去实行Bar
  2. 其3遍调函数是主进度去调用的,而不是各样子进程去调用的。
  3. 回调函数的用途:

      比方说你从各类机器上备份实现,在回调函数中自动写三个剧本,说备份落成

  4. 回调函数是主过程调用的缘由?

      假诺是子进程去调用那一个回调函数,有多少个子进程就有稍许个一连,假诺是主进程的话,只需求一次长连接就足以了,那几个频率就高了

  

 

上海教室是用yield实现了2个五个函数逇并发处理。

 1 from greenlet import greenlet#导入这个模块
 2 def foo():#定义一个函数
 3     print('ok1')#打印
 4     gr2.switch()#将程序切换到下面一个函数,按照名字切
 5     print('ok3')#打印
 6     gr2.switch()#将程序切换到下面一个函数,按照名字切
 7 def bar():
 8     print('ok2')#打印
 9     gr1.switch()#切到上面foo函数
10     print('ok4')
11 gr1=greenlet(foo)#实例化这个函数
12 gr2=greenlet(bar)
13 gr1.switch()#在外面写这个就执行了这个函数

美高梅开户网址 ,由此greenlet模块的switch来兑现协程的切换,greenlet模块须求手动去pycharm下载

 1 import gevent#导入这个模块
 2 def foo():
 3     print('running in foo')
 4     gevent.sleep(2)#打印之后睡一秒,模拟io操作
 5     print('switch to foo again')
 6 def bar():
 7     print('switch  to bar')
 8     gevent.sleep(1)#打印之后睡一秒,模拟io操作
 9     print('switch to bar again')
10 gevent.joinall([gevent.spawn(foo),gevent.spawn(bar)])
11 '''
12 这个程序的运行过程是,先执行foo函数,
13 打印之后遇到了IO操作,然后自动切换到下一个函数执行,
14 打印之后又遇到了IO操作,然后切回foo函数发现IO2秒还没有结束,
15 然后又切到了bar函数发现IO结束,打印,再切回foo函数打印
16 '''

地方代码通过gevent模块来达成写成的IO时期机动切换达成产出的次第。
gevent需要从pycharm下载。

美高梅开户网址 20

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图