进度间数据交互与共享,知识十遗篇

 

多进程

进程之间是互为独立的,python是运维进度的时候,是运行的是原生进度。进程是不曾GIL锁的,而且不设有锁的概念,进度之间的数据式无法共享的,而线程是能够的。

当二十多线程创制落成之后,start并从未了当下运营,如故需求和别的线程抢CPU的身价,只是
时光十分的短。
进程之间的通讯分为两种,queue和pipe

写一个game 循环
game
loop是每一个游戏的焦点.它不停的获得用户输入,更新游戏状态,渲染游戏结果到显示屏上.网络电游分为客户端和服务端两部分.两边的loop通过网络连接起来.日常状态下,客户端获取用户输入,发送到服务端,服务端管理计算数据,更新游戏的使用者状态,发送结果个客户端.举个例子游戏的使用者也许游戏物体的地方.异常重大的是,不要把客户端和服务端的功力混淆了,借使未有丰盛的理由的话.
要是在客户端做游戏总结,那么不一样的客户端万分轻松就区别步了.

进程

壹、进度的概念

用muliprocessing那一个包中的Process来定义多进程,跟定义八线程类似

from multiprocessing import Process   # 导入进程模块
import time


def run(name):
    time.sleep(2)
    print("hello", name)

if __name__ == "__main__":
    p_obj_list = list()    # 存放进程对象
    for i in range(10):    # 启动10个进程
        p = Process(target=run, args=("QQ{0}".format(i),))  # 产生一个进程实例
        p.start()   # 启动进程
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()   # 等待进程结果
 1 import multiprocessing
 2 def foo(q):
 3     q.put([1,'hello',True])
 4 if __name__=='__main__':
 5     q=multiprocessing.Queue()#通过multiprocessing建立一个队列
 6     p=multiprocessing.Process(target=foo,args=(q,))
 7   #用multiprocessing在调用Process,建立一个子进程,定义函数名,将q作为参数传到foo函数,
 8     #foo函数就可以通过这个参数来与主进程做交互了。
 9     p.start()#激活这个子进程
10     print(q.get())#主进程

A game loop iteration is often called a tick. Tick is an event
meaning that current game loop iteration is over and the data for the
next frame(s) is ready.

1.意义:Computer中的程序关于某数码集结上的二回运转活动,是系统进行财富分配和调节的焦点单位。说白了就是1个先后的实行实例。

实践多个程序正是一个进程,比方您张开浏览器看到作者的博客,浏览器本人是三个软件程序,你此时张开的浏览器便是二个历程。

 

二、进程中投入线程

from multiprocessing import Process
import time,threading


def thread_run(name):   # 定义线程执行的方法
    print("{0}:{1}".format(name, threading.get_ident()))  # thread.get_ident ()返回当前线程的标识符,标识符是一个非零整数


def run(name):
    time.sleep(2)
    print("hello", name)
    t = threading.Thread(target=thread_run, args=(name,))   # 嵌入线程
    t.start()   # 执行线程


if __name__ == "__main__":
    p_obj_list = list()
    for i in range(10):
        p = Process(target=run, args=("QQ{0}".format(i),))
        p.start()
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()

地方函数通过multiprocessing的queue来落成进程间通讯。

在下一个例证中,我们写3个客户端,那么些客户端通过WebSocket连接服务器,同时运维二个简便的loop,接受输入发送给服务器,回显新闻.Client
source code is located
here.

贰.历程的特色

  • ### 3个经过里能够有几个子进度

  • ### 新的经过的创制是一点1滴拷贝整个主进度

  • ### 进程里可以分包线程

  • ### 进程之间(包罗主进度和子进度)不设有数量共享,互相通讯(浏览器和python之间的数码无法互通的),要通讯则要依赖队列,管道之类的

 

三、老爹和儿子进度

每一种子进度都以由3个父进度运营的,各样程序也是有2个父进度

from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  # 获得父进程ID
    print('process id:', os.getpid())  # 获得子进程ID
    print("\n\n")


def f(name):
    info('\033[31;1m function f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1m main process line\033[0m')
    p = Process(target=f, args=('QQ',))
    p.start()
    p.join()

  

 

 

 1 from multiprocessing import  Pipe,Process
 2 def foo(sk):
 3     sk.send('hello world')#通过管道sk发送内容
 4     print(sk.recv())#打印接收到的内容
 5 if __name__ == '__main__':
 6     sock,conn=Pipe()#定义一个管道的两头
 7     p=Process(target=foo,args=(sock,))#由于上面已经通过multiprocessing导入了Process,
 8     # 所以这里直接就可以创建一个子进程,并将sock(管道的一头)作为参数给foo函数
 9     p.start()#激活这个进程
10     print(conn.recv())#打印接收到的内容,conn是管道的另一头
11     conn.send('hi son')#通过管道发送内容

3.1

Example 3.1 source
code

大家使用aiohttp来创设三个game
server.这几个库能够成立asyncio的client和server.那么些库的好处是同时协助http请求和websocket.所以服务器就不须求把结果管理成html了.
来看一下server如何运营:

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


async def wshandler(request):
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    app["sockets"].append(ws)

    while 1:
        msg = await ws.receive()
        if msg.tp == web.MsgType.text:
            print("Got message %s" % msg.data)
            ws.send_str("Pressed key code: {}".format(msg.data))
        elif msg.tp == web.MsgType.close or\
             msg.tp == web.MsgType.error:
            break

    app["sockets"].remove(ws)
    print("Closed connection")
    return ws

async def game_loop(app):
    while 1:
        for ws in app["sockets"]:
            ws.send_str("game loop says: tick")
        await asyncio.sleep(2)


app = web.Application()
app["sockets"] = []

asyncio.ensure_future(game_loop(app))

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

其一代码就不翻译了,

三.进度和线程之间的分歧

  • ### 线程共享地址空间,而经过之间有相互独立的半空中

  • ### 线程之间数据互通,相互操作,而经过不得以

  • ### 新的线程比新的历程成立简单,比开进度的开垦小大多

  • ### 主线程能够影响子线程,而主进度不可能影响子进度

 

 

进度间数据交互与共享

知道分化进度之间内部存款和储蓄器是不共享的,要想完结五个经过间的通讯供给采用multiprocessing库中的queue(队列)模块,那些multiprocessing库中的queue模块跟单纯的queue库是不均等的。进程导入前者(这里的queue是特地为经过之间的通讯设计的)不失误,导入后者(那里的queue首倘若线程间数据交互)出错。

地点代码通过Pipe来促成七个进度间的通讯。

三.二 有请求才起来loop

地点的例证,server是不停的loop.现在改成有请求才loop.
同时,server上或许存在几个room.1个player创设了二个session(一场较量依然三个别本?),别的的player能够参与.

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


async def wshandler(request):
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    app["sockets"].append(ws)

    if app["game_is_running"] == False:
        asyncio.ensure_future(game_loop(app))
    while 1:
        msg = await ws.receive()
        if msg.tp == web.MsgType.text:
            print("Got message %s" % msg.data)
            ws.send_str("Pressed key code: {}".format(msg.data))
        elif msg.tp == web.MsgType.close or\
             msg.tp == web.MsgType.error:
            break

    app["sockets"].remove(ws)
    print("Closed connection")

    return ws

async def game_loop(app):
    app["game_is_running"] = True
    while 1:
        for ws in app["sockets"]:
            ws.send_str("game loop says: tick")
        if len(app["sockets"]) == 0:
            break
        await asyncio.sleep(2)
    app["game_is_running"] = False


app = web.Application()

app["sockets"] = []
app["game_is_running"] = False

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

四.在python中,进程与线程的用法就只是名字分化,使用的章程也是没多大分歧

1、线程访问queue

import queue,threading


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把这个q传给了子线程
    p = threading.Thread(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
[66, None, 'hello word']
 1 from multiprocessing import  Manager,Process
 2 def foo(l,i):#收到参数,l是Mlist,i是循环的i
 3     l.append(i*i)#将i平方添加到Mlist
 4 if __name__=='__main__':
 5     manager=Manager()
 6     Mlist=manager.list([11,22,33])#定义一个列表
 7 
 8     l=[]
 9     for i in range(5):#创建5个子进程
10         p=Process(target=foo,args=(Mlist,i))#定义一个进程,将Mlist和i作为参数传到foo
11         p.start()#激活这个进程,执行foo函数
12         l.append(p)#将5个进程添加到l这个列表
13     for i in l:
14         i.join()#循环这个列表,然后将每个进程join
15     print(Mlist)#当所有的子进程都结束,运行主进程

3.3 管理task

直接操作task对象.没有人的时候,能够cancel掉task.
注意!:
This cancel()
call tells scheduler not to pass execution to this coroutine anymore and
sets its state tocancelled
which then can be checked by cancelled()
method. And here is one caveat worth to mention: when you have external
references to a task object and exception happens in this task, this
exception will not be raised. Instead, an exception is set to this task
and may be checked by exception()
method. Such silent fails are not useful when debugging a code. Thus,
you may want to raise all exceptions instead. To do so you need to call
result()
method of unfinished task explicitly. This can be done in a callback:

只要想要cancel掉,也不想触发exception,那么就检查一下canceled状态.
app["game_loop"].add_done_callback(lambda t: t.result() if not t.cancelled() else None)

五.简易实例

1)创设三个粗略的多进度:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

def func(name):
    time.sleep(1)
    print('hello',name,time.ctime())

ml = []
for i in range(3):
    p = multiprocessing.Process(target=func,args=('yang',))
    p.start()
    ml.append(p)

for i in ml:
    i.join() #注意这里,进程必须加join方法,不然会导致僵尸进程

  

运行结果:

美高梅开户网址 1

 

不管怎么说,反正报错了,一样的代码,在python自带的IDLE里尝试:

美高梅开户网址 2

尚未别的交事务物就长逝了。好的,那里要说下了,根据我个人的明白,当你用pycharm也许IDLE时,pycharm或许IDLE在您的Computer里本人也是多少个经过,并且默许是主进度。所以在pycharm会报错,而在IDLE里运转正是空手,个人通晓,对不对临时不谈,早先时期学到子进程时再说。

 

消除办法正是,别的的不改变,加二个if __进度间数据交互与共享,知识十遗篇。name == ‘__main__’决断就行:

美高梅开户网址 3

 

这样就消除了,好的,你今后能够体会到那句话了,经过与线程的用法就只是名字区别,使用的主意也是没多大分别。不多说,自行体会。而运营结果来看的时日是一起的,那么那进程才是的确含义上的并行运转。

 

2)自定义类式进度

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

class myprocess(multiprocessing.Process):
    def __init__(self,name):
        super(myprocess,self).__init__()
        self.name = name

    def run(self):
        time.sleep(1)
        print('hello',self.name,time.ctime())

if __name__ == '__main__':
    ml = []
    for i in range(3):
        p = myprocess('yang')
        p.start()
        ml.append(p)

    for j in ml:
        j.join()

  

运营结果:

美高梅开户网址 4

 

 

接下来setDaemon之类的措施和线程也是完全1致的。

 

三)每1个历程都有根进程,换句话,每八个进度都有父进度

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time,os

def info():
    print('mudule name:',__name__)
    print('parent process:',os.getppid()) #父进程号
    print('son process:',os.getpid())     #子进程号

if __name__ == '__main__':
    info()
    print('-----')
    p = multiprocessing.Process(target=info,args=[])
    p.start()
    p.join()

  

运作结果:

 

美高梅开户网址 5

 

而查看自身本机的进程:

美高梅开户网址 6

 

能够领悟,620四正是pycharm,正是那时的根进度,而主进度正是本身这么些py文件(由__main__可见),接着再往下的子进度等等等的。

 

贰、进程访问queue

from multiprocessing import Process
import queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把这个q传给了子线程
    p = Process(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
Traceback (most recent call last):
  File "C:/Users/dell/PycharmProjects/untitled/process/进程的定义.py", line 77, in <module>
    p.start()
  File "C:\Python36\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Python36\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python36\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

上边代码通过Manger达成子进度间的通讯。

叁.4 守候八个事件

Example 3.4 source
code
在多数气象下,须求在服务器处理客户端的handler中,
等待八个事件.除了等候客户端的新闻,也许还索要静观其变不相同的音讯发生.比方,
游戏1局的时辰到了,必要一个timer的实信号.或然,供给任何进度的音信,也许别的server的音讯.(使用布满式音讯系统).
上边那几个事例使用了Condition.那里不保留全体的socket,而是在每回循环甘休通过Condition.notify_all来通告.这么些利用pub/sub情势完毕.
为了在1个handler中,等待多少个事件,首先大家使用ensure_future来包装一下.

if not recv_task: 
  recv_task = asyncio.ensure_future(ws.receive())
if not tick_task: 
  await tick.acquire() 
  tick_task = asyncio.ensure_future(tick.wait())```

在我们调用Condition.call之前,我们需要获取一下锁.这个锁在调用了tick.wait之后就释放掉.这样其他的协程也可以用了.但是当我们得到一个notification, 会重新获取锁.所以我们在收到notification之后要release一下.

done, pending = await asyncio.wait( [recv_task, tick_task],
return_when=asyncio.FIRST_COMPLETED)“`
以此会阻塞住直到有一个职分完结,那一年会回到八个列表,实现的和照旧在运作的.假诺task
is done,我们再设置为None,这样下二个循环里会再2次成立.

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)



tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

async def game_loop():
    while 1:
        await tick.acquire()
        tick.notify_all()
        tick.release()
        await asyncio.sleep(1)

asyncio.ensure_future(game_loop())

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

(这几个关键是asyncio.Condition的用法)

六.多进度间的通讯和数量共享

第二大家都早已明白进程之间是单身的,不得以互通,并且数据交互独立,而在实际付出中,一定会遇见要求进度间通讯的场景须要,那么大家怎么搞呢

有二种方法:

  • pipe
  • queue

1)使用queue通信

在拾2线程那里已经学过queue了,创立queue的章程,q =
queue.Queue(),那种创立是成立的线程queue,并不是经过queue。创建进度queue的艺术是:

美高梅开户网址 7

 

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(q,name,age): #这里必须要把q对象作为参数传入才能实现进程之间通信
    q.put({'name':name,'age':age})

if __name__ == '__main__':
    q = multiprocessing.Queue() #创建进程queue对象
    ml = []
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(q,'yang',21))
        p.start()
        ml.append(p)
    print(q.get()) #获取queue信息
    print(q.get()) 
    print(q.get())
    for i in ml:
        i.join()

  

运营结果:

美高梅开户网址 8

 

好的,已经经过queue落成通讯,那么精心的恋人可能会想,此时的queue到底是同一个啊依然copy的吗?开头测试,码如下:

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(q,name,age):
    q.put({'name':name,'age':age})
    print('id:',id(q))
if __name__ == '__main__':
    q = multiprocessing.Queue()
    ml = []
    print('id:',id(q))
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(q,'yang',21))
        p.start()
        ml.append(p)
    print(q.get())
    print(q.get())
    print(q.get())
    for i in ml:
        i.join()

  

在Windows平台运维结果:

美高梅开户网址 9

 

Linux的ubuntu下是如此的:

美高梅开户网址 10

 

那就不佳怎么说了,笔者个人的驾驭,线程和进程那类与计算机硬件(CPU,RAM)等有挂钩的都有不显著因素,姑且以为在Windows平台里queue是copy的,在Linux里是同3个吗,并且据经验人员表示,在macbook上也是同一个。

 

还有个难点, 假设使用的queue是线程式的吧?

代码其余都没变,只改了那里:

美高梅开户网址 11

 

结果:

美高梅开户网址 12

 

尽管报错了,不过却有1个关键点,提醒的是不可能pickle线程锁对象,也便是说刚才我们利用的queue是经过对象,所以能够pickle,注意了,那里就是关键点,使用了pickle,那么也便是说,在Windows平台里是copy的,假如不是copy,就不须求存在pickle对吧?直接拿来用正是呀,干嘛要pickle之后取的时候再反pickle呢对吗?

 

再看Linux下啊,由于Linux私下认可是python二,所以模块包名稍微有点差异

美高梅开户网址 13

结果阻塞住了,可是后面的恐怕出来了,看到的id果然照旧一如既往的。

 

此地就有③点需求注意:(个人驾驭,如有误望指正)

一.进度里的确不能使用线程式queue

2.Windows平台的进程式queue是copy的

3.Linux平台的线程式和进度式都以同1个,不过假如在经过里使用线程式queue会阻塞住

但作者个人以为copy更有安全性

美高梅开户网址, 

2)使用pipe通信

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(conn):
    conn.send('约吗?')  #子进程发送数据
    print(conn.recv())  #接受数据,不能加参数1024之类的
    conn.close()        #子进程关闭连接
if __name__ == '__main__':
    parent_conn,son_conn = multiprocessing.Pipe() #创建pipe对象,父进程,子进程
    ml = []
    p = multiprocessing.Process(target=func,args=(son_conn,))
    p.start()
    print(parent_conn.recv())  #父进程接受数据,不能加参数1024之类的
    parent_conn.send('不约')    #发送数据
    p.join()                   #join方法是进程特有

 

  

运维结果:

美高梅开户网址 14

 

如此那般就联系上了,相信你开掘了,骨干和眼下的socket大概,不过唯壹的差别是recv()方法不能够加参数,不信的话,你加来试试看

回过头看线程通讯,相信你会感到进程比线程更利于

 

理所当然pipe也足以有多个:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

def func(conn):
    conn.send('约吗?')  #子进程发送数据
    print(conn.recv())
    conn.close()        #子进程关闭连接
if __name__ == '__main__':
    parent_conn,son_conn = multiprocessing.Pipe() #创建pipe对象,父进程,子进程
    ml = []
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(son_conn,))
        p.start()
        ml.append(p)
        print(parent_conn.recv())  #父进程接受数据,不能加参数1024之类的
        parent_conn.send('不约')
    for i in ml:
        i.join()

  

运转结果:

美高梅开户网址 15

 

柒.历程之间数据共享——manager

相比轻便,就应用了经过里的manager对象下的逐条数据类型,其余的很简单的,笔者就不注释了

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(l,d,num):
    l.append(num)
    d[num] = num

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        l = manager.list()
        d = manager.dict()
        ml = []
        for i in range(6):
            p = multiprocessing.Process(target=func,args=(l,d,i))
            p.start()
            ml.append(p)
        for i in ml:
            i.join()
        print('d:',d)
        print('l:',l)

  

运行结果:

美高梅开户网址 16

 

如此是或不是就得以完成了数据共享了?

 

好的,进度也剖析完了

 

三、进度访问`multiprocessing库中的Queue模块`

from multiprocessing import Process,Queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = Queue()   # 把这个q传给了子线程
    p = Process(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
[66, None, 'hello word']

父进度相当于克隆3个Q,把自个儿的Q克隆了壹份交给子过程,子进程那个时候往Q里面放了1份数据,然后父进度又能实际的收获到。但是你克隆了1份是否就和父进程未有关联了,为什么还是能够维系在同步啊?不过事实上:等于那八个Q里面包车型大巴数额又把它种类化了,种类化到贰当中路的地点,类似于翻译,然后反体系化给那些父进度那边来了,其实那八个Q就是通过pickle来连串化的,不是二个实在的Q。

总计:五个线程之间可以修改一个数额,不加锁,大概就会出错。今后经过中的Queue,是得以达成了多少的传递,不是在修改同1份数据,只是达成2个进度的多寡传给了其它二个经过。

美高梅开户网址 17

三.5 和线程一齐使用

Example 3.5 source
code

这些例子,大家把asyncio的loop放到别的三个单身线程中.上边也说过了,因为python的GIL的宏图,不可能还要运营八个code.所以使用八线程来拍卖总括瓶颈的标题,并不是3个好主意.然后还有其它一个行使线程原因就是:
假使有的函数大概库不援助asyncio,那么就会阻塞住主线程的运营.那种情状下唯一的不2秘技便是位于其它三个线程中.

要留意asyncio本人不是threadsafe的,不过提供了四个函数.call_soon_threadsafe和run_coroutine_threadsafe.
当您运维这么些例子的时候,你会看出notify的线程id正是主线程的id,那是因为notify协程运营在主线程中,sleep运维在其余一个线程,所以不会阻塞住主线程.

import asyncio
from aiohttp import web

from concurrent.futures import ThreadPoolExecutor
import threading
from time import sleep


async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

def game_loop(asyncio_loop):
    print("Game loop thread id {}".format(threading.get_ident()))
    # a coroutine to run in main thread
    async def notify():
        print("Notify thread id {}".format(threading.get_ident()))
        await tick.acquire()
        tick.notify_all()
        tick.release()

    while 1:
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        # blocking the thread
        sleep(1)
        # make sure the task has finished
        task.result()

print("Main thread id {}".format(threading.get_ident()))

asyncio_loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
asyncio_loop.run_in_executor(executor, game_loop, asyncio_loop)

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

 

 

三.陆 多过程和增加scaling up

四个线程的server能够干活了,不过这么些server只有贰个cpu可用.
为了扩大,大家须求周转多个进程,各个进程蕴涵自个儿的eventloop.
所以大家要求经过之间通讯的形式.同时在打闹领域,日常会有大批量的企图(寻路什么的).这几个职务平日不会火速到位(二个tick内).
在协程中开展大气耗费时间的乘除未有意义,因为会阻塞住音信循环本身.所以在这种地方下,把大气的估测计算交给别的的经过就很有供给了
最简便易行的方法正是运营三个单线程的server.然后,能够接纳haproxy这样的load
balancer,来把客户端的几次三番分散到不相同的历程上去.进城之间的通讯有为数不少方法.一种是基于网络连接,也足以扩大到多个server.未来已经有广大兑现了新闻和存款和储蓄系统的框架(基于asyncio).
比方:

aiomcache
for memcached client
aiozmq
for zeroMQ
aioredis
for Redis storage and pub/sub

还有其余的片段乱七八糟,在git上,抢先八分之四是aio打头.
采用网络新闻,能够格外有效的贮存数据,大概调换音讯.可是假诺要管理大量实时的数额,而且有雅量经过通信的气象,就丰硕了.在那种景观下,3个更适合的秘技是接纳标准的unix
pipe.asyncio has support for pipes and there is a very low-level
example of the server which uses
pipes
inaiohttp repository.
在这么些例子中,咱们选用python的高等级次序的multiprocessing库来触发1个新的进度来进展测算,通过multiprocessing.Queue来开始展览进程间通讯.不幸的是,近日的multiprocessing落成并不协理asyncio.所以阻塞的调用就会阻塞住event
loop.
那多亏利用线程的最棒案例.因为大家在别的五个线程运维multiprocessing的代码.看代码

import asyncio
from aiohttp import web

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Queue, Process
import os
from time import sleep


async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

def game_loop(asyncio_loop):
    # coroutine to run in main thread
    async def notify():
        await tick.acquire()
        tick.notify_all()
        tick.release()

    queue = Queue()

    # function to run in a different process
    def worker():
        while 1:
            print("doing heavy calculation in process {}".format(os.getpid()))
            sleep(1)
            queue.put("calculation result")

    Process(target=worker).start()

    while 1:
        # blocks this thread but not main thread with event loop
        result = queue.get()
        print("getting {} in process {}".format(result, os.getpid()))
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        task.result()

asyncio_loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
asyncio_loop.run_in_executor(executor, game_loop, asyncio_loop)

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

worker()在别的2个进程中运转.包蕴了部分耗费时间总计,把结果放在queue中.获得结果之后,文告主线程的主eventloop那多少个等待的client.那么些例子十二分简陋,进度没有适当甘休,同时worker恐怕要求其它贰个queue来输入数据.

Important! If you are going to run anotherasyncio
event loop in a different thread or sub-process created from main
thread/process, you need to create a loop explicitly, using
asyncio.new_event_loop()
, otherwise, it will not work.

4、通过Pipe()完毕进度间的多寡交互,manger实现数量共享

地点的例证是经过进度中的Queue,来实行数量共享的,其实还有一种格局贯彻数据共享,那便是管道,pipe,以及数据共享manger。

4.1、Pipe()函数

管道函数会回去由管道双方连日来的一组连接对象,该管道私下认可是双向的(双向的)。

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 发送消息给父进程
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # 接收子进程的消息
    p.join()

四.2、接受反复和发送数次

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 发送消息给父进程
    conn.send("QQ")  # 发送消息给父进程
    print(conn.recv())   # 接收父进程的消息
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())  # 接收两次
    parent_conn.send("微信")   # 发送给子进程
    p.join()

4.3、manger

manger能够成功多少间的共享。

from multiprocessing import Process, Manager
import os


def f(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()   # 声明一个字典,这个字典是用manger声明的,不是用dict()声明的
        # manger.dict()是用专门的语法生产一个可在多进程之间进行传递和共享的一个字典
        l = manager.list(range(5))  # 同样声明一个列表
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)

线程修改同1份数据的时候供给加锁,进程修改数据吧:不用加锁,因为那一个manger已经帮你加锁了,它就暗中认可不容许四个经过同时修改一份数据。多少个进度未有主意同时修改1份数据,进度之间是独立的,它和睦也要加锁,因为它把温馨的东西还要copy好几份,跟刚刚的百般Queue同样,copy拾个字典最后合成1个字典

 

 

 

协程
协程,又叫微线程,实际上便是单线程,通过python语法,或模块来贯彻产出。
实质上正是叁个经过一个线程。

进度锁和进度池的运用

美高梅开户网址 18

1、进程锁

由此multiprocessing中的Lock模块来兑现进程锁

from multiprocessing import Process,Lock   # 导入进程锁


def f(l, i):
    l.acquire()    # 加锁
    try:
        print("hello word", i)
    finally:
        l.release()   # 释放锁

if __name__ == "__main__":
    lock = Lock()     # 定义锁
    for num in range(10):
        Process(target=f, args=(lock, num,)).start()  # 把锁传入进程中

进程中不是互相独立的吧?为啥还要加锁:即便各样进程都以独自运营的,不过难题来了,它们共享一块显示屏。这些锁存在的意思正是荧屏共享。如若经过一想着打印数据,而经过贰想也想打字与印刷数据的状态,就有希望乱套了,然后通过这些锁来决定,去打字与印刷的时候,那几个荧屏只有本身独占,导致显示屏不会乱。

美高梅开户网址 19

2、进程池apply和apply_saync

2.1、appley

手拉手实行,也正是串行施行的

from multiprocessing import Pool  # 导入进程池模块pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印进程号


if __name__ == "__main__":
    pool = Pool(processes=5)   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply(func=foo, args=(i,))   # 同步执行挂起进程
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

2.2、apply_saync

异步施行,也便是并行推行。

from multiprocessing import Pool  # 导入进程池模块pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印进程号


if __name__ == "__main__":
    pool = Pool(processes=5)   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply_async(func=foo, args=(i,))   # 采用异步方式执行foo函数
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

二.三、异步下回调函数

程序试行落成之后,再回调过来推行那么些Bar函数。

from multiprocessing import Process,Pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印子进程的进程号


def bar(arg):
    print('-->exec done:', arg, os.getpid())   # 打印进程号

if __name__ == "__main__":
    pool = Pool(processes=2)
    print("主进程", os.getpid())   # 主进程的进程号
    for i in range(3):
        pool.apply_async(func=foo, args=(i,), callback=bar)   # 执行回调函数callback=Bar
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

#执行结果
主进程 752
end
in process 2348
-->exec done: None 752
in process 8364
-->exec done: None 752
in process 2348
-->exec done: None 752

注:

  1. 回调函数表达fun=Foo干不完就不实践bar函数,等Foo执行完就去实行Bar
  2. 本条回调函数是主进度去调用的,而不是各类子进度去调用的。
  3. 回调函数的用处:

      比方说你从各类机器上备份完成,在回调函数中活动写2个剧本,说备份实现

  4. 回调函数是主进度调用的原故?

      假如是子进程去调用那些回调函数,有微微个子进程就有微微个再三再四,如若是主进度的话,只须求三次长连接就足以了,这一个作用就高了

  

 

上海教室是用yield实现了三个七个函数逇并发管理。

 1 from greenlet import greenlet#导入这个模块
 2 def foo():#定义一个函数
 3     print('ok1')#打印
 4     gr2.switch()#将程序切换到下面一个函数,按照名字切
 5     print('ok3')#打印
 6     gr2.switch()#将程序切换到下面一个函数,按照名字切
 7 def bar():
 8     print('ok2')#打印
 9     gr1.switch()#切到上面foo函数
10     print('ok4')
11 gr1=greenlet(foo)#实例化这个函数
12 gr2=greenlet(bar)
13 gr1.switch()#在外面写这个就执行了这个函数

经过greenlet模块的switch来兑现协程的切换,greenlet模块需求手动去pycharm下载

 1 import gevent#导入这个模块
 2 def foo():
 3     print('running in foo')
 4     gevent.sleep(2)#打印之后睡一秒,模拟io操作
 5     print('switch to foo again')
 6 def bar():
 7     print('switch  to bar')
 8     gevent.sleep(1)#打印之后睡一秒,模拟io操作
 9     print('switch to bar again')
10 gevent.joinall([gevent.spawn(foo),gevent.spawn(bar)])
11 '''
12 这个程序的运行过程是,先执行foo函数,
13 打印之后遇到了IO操作,然后自动切换到下一个函数执行,
14 打印之后又遇到了IO操作,然后切回foo函数发现IO2秒还没有结束,
15 然后又切到了bar函数发现IO结束,打印,再切回foo函数打印
16 '''

位置代码通过gevent模块来完成写成的IO时期机动切换落成产出的次第。
gevent需要从pycharm下载。

美高梅开户网址 20

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图