IO多路复用

2018 年 10 月 12 日 • 阅读数: 100

IO多路复用

阻塞IO实现HTTP

  • 通过urlparse函数解析出url的域名和路径
  • 建立socket连接
  • 发送请求
  • 接收数据
import socket
from urllib.parse import urlparse
def get_url(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"
    '''建立socket连接'''
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, 80))
    '''发送数据'''
    client.send("GET {} HTTP/1.1\r\nHOST:{}\r\nConnection:close\r\n\r\n".
                        format(path, host).encode("utf-8"))
    '''接收数据'''
    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break
    data = data.decode("utf-8")
    client.close()
    return data

非阻赛IO实现HTTP

  • Http默认在建立连接的时候会阻塞
  • 当我们设置了setblocking(False)的时候,不会在建立连接的时候阻塞
  • 但会抛一个 BlockingIOError 的异常,需要进行异常处理
  • 然后因为连接还没建立成功,在发送数据的时候也会抛异常
  • 依然做一下异常处理,然后通过不停的循环请求,直到连接建立完成,成功发送数据
  • 数据发送成功后,在接收数据的时候也会出现异常
  • 再次做一下异常处理,如果抛异常,就continue掉
def get_url(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"
    '''建立socket连接'''
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.setblocking(False)
    try:
        client.connect((host, 80))
    except BlockingIOError as e:
        pass
    '''发送数据'''
    while True:
        try:
            client.send("GET {} HTTP/1.1\r\nHOST:{}\r\nConnection:close\r\n\r\n".
                        format(path, host).encode("utf-8"))
            break
        except OSError as e:
            pass
    '''接收数据'''
    data = b""
    while True:
        try:
            d = client.recv(1024)
        except BlockingIOError as e:
            continue
        if d:
            data += d
        else:
            break
    data = data.decode("utf-8")
    client.close()
    return data

使用select来实现HTTP

  • 在上面的测试中,非阻赛的请求时间并没有缩短,反而在cpu的运行时间上远远高于阻塞
  • select本身是不支持register模式的
  • socket状态变化以后的回调是由我们自己完成的(使用事件循环)
  • IO多路复用的实现方式就是:select(poll\epoll) + 回调 + 事件循环
from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ

selector = DefaultSelector()
urls = []
stop = False

class Fetcher:
    def get_url(self, url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"
        '''建立socket连接'''
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)
        try:
            self.client.connect((self.host, 80))
        except BlockingIOError as e:
            pass
        '''将socket的事件描述符注册到selector中'''
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
    
    '''发送数据的回调函数'''
    def connected(self,key):
        selector.unregister(key.fd)
        self.client.send("GET {} HTTP/1.1\r\nHOST:{}\r\nConnection:close\r\n\r\n".
                        format(self.path, self.host).encode("utf-8"))
        '''将接收数据的回调函数注册到selector中'''
        selector.register(self.client.fileno(), EVENT_READ, self.readable)
        
    '''接受数据的回调函数'''
    def readable(self,key):
        '''接收数据'''
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf-8")
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True
            print(data)
            return data
def loop():
    '''事件循环,不停的请求socket的状态并调用对应的回调函数'''
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)

回调之痛

  • 当我们使用select + 回调 + 事件循环 的模式后
  • 虽然我们的运行时间大大缩短
  • 但我们代码的可读性变差,共享状态管理变得困难,异常处理变得困难
  • 为了解决这些问题,Python中引入了协程
标签: IO

召唤伊斯特瓦尔