欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  后端开发

Python基于Socket实现异步非阻塞

程序员文章站 2022-04-25 23:29:11
...
本篇将使用200行代码完成一个微型异步非阻塞Web框架:Snow。具有很好的参考价值,下面跟着小编一起来看下吧

Python的Web框架中Tornado以异步非阻塞而闻名。本篇将使用200行代码完成一个微型异步非阻塞Web框架:Snow。

一、源码

本文基于非阻塞的Socket以及IO多路复用从而实现异步非阻塞的Web框架,其中便是众多异步非阻塞Web框架内部原理。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import re
import socket
import select
import time
class HttpResponse(object):
 """
 封装响应信息
 """
 def init(self, content=''):
  self.content = content
  self.headers = {}
  self.cookies = {}
 def response(self):
  return bytes(self.content, encoding='utf-8')
class HttpNotFound(HttpResponse):
 """
 404时的错误提示
 """
 def init(self):
  super(HttpNotFound, self).init('404 Not Found')
class HttpRequest(object):
 """
 用户封装用户请求信息
 """
 def init(self, conn):
  self.conn = conn
  self.header_bytes = bytes()
  self.header_dict = {}
  self.body_bytes = bytes()
  self.method = ""
  self.url = ""
  self.protocol = ""
  self.initialize()
  self.initialize_headers()
 def initialize(self):
  header_flag = False
  while True:
   try:
    received = self.conn.recv(8096)
   except Exception as e:
    received = None
   if not received:
    break
   if header_flag:
    self.body_bytes += received
    continue
   temp = received.split(b'\r\n\r\n', 1)
   if len(temp) == 1:
    self.header_bytes += temp
   else:
    h, b = temp
    self.header_bytes += h
    self.body_bytes += b
    header_flag = True
 @property
 def header_str(self):
  return str(self.header_bytes, encoding='utf-8')
 def initialize_headers(self):
  headers = self.header_str.split('\r\n')
  first_line = headers[0].split(' ')
  if len(first_line) == 3:
   self.method, self.url, self.protocol = headers[0].split(' ')
   for line in headers:
    kv = line.split(':')
    if len(kv) == 2:
     k, v = kv
     self.header_dict[k] = v
class Future(object):
 """
 异步非阻塞模式时封装回调函数以及是否准备就绪
 """
 def init(self, callback):
  self.callback = callback
  self._ready = False
  self.value = None
 def set_result(self, value=None):
  self.value = value
  self._ready = True
 @property
 def ready(self):
  return self._ready
class TimeoutFuture(Future):
 """
 异步非阻塞超时
 """
 def init(self, timeout):
  super(TimeoutFuture, self).init(callback=None)
  self.timeout = timeout
  self.start_time = time.time()
 @property
 def ready(self):
  current_time = time.time()
  if current_time > self.start_time + self.timeout:
   self._ready = True
  return self._ready
class Snow(object):
 """
 微型Web框架类
 """
 def init(self, routes):
  self.routes = routes
  self.inputs = set()
  self.request = None
  self.async_request_handler = {}
 def run(self, host='localhost', port=9999):
  """
  事件循环
  :param host:
  :param port:
  :return:
  """
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.bind((host, port,))
  sock.setblocking(False)
  sock.listen(128)
  sock.setblocking(0)
  self.inputs.add(sock)
  try:
   while True:
    readable_list, writeable_list, error_list = select.select(self.inputs, [], self.inputs,0.005)
    for conn in readable_list:
     if sock == conn:
      client, address = conn.accept()
      client.setblocking(False)
      self.inputs.add(client)
     else:
      gen = self.process(conn)
      if isinstance(gen, HttpResponse):
       conn.sendall(gen.response())
       self.inputs.remove(conn)
       conn.close()
      else:
       yielded = next(gen)
self.async_request_handler[conn] = yielded
    self.polling_callback()
except Exception as e:
   pass
  finally:
   sock.close()
def polling_callback(self):
  """
  遍历触发异步非阻塞的回调函数
  :return:
  """
  for conn in list(self.async_request_handler.keys()):
   yielded = self.async_request_handler[conn]
   if not yielded.ready:
    continue
   if yielded.callback:
    ret = yielded.callback(self.request, yielded)
    conn.sendall(ret.response())
   self.inputs.remove(conn)
   del self.async_request_handler[conn]
   conn.close()
 def process(self, conn):
  """
  处理路由系统以及执行函数
  :param conn:
  :return:
  """
  self.request = HttpRequest(conn)
  func = None
  for route in self.routes:
   if re.match(route[0], self.request.url):
    func = route[1]
    break
  if not func:
   return HttpNotFound()
  else:
   return func(self.request)
snow.py

二、使用

1. 基本使用

from snow import Snow
from snow import HttpResponse
def index(request):
return HttpResponse('OK')
routes = [
 (r'/index/', index),
]
app = Snow(routes)
app.run(port=8012)

2.异步非阻塞:超时

from snow import Snow
from snow import HttpResponse
from snow import TimeoutFuture
request_list = []
def async(request):
 obj = TimeoutFuture(5)
 yield obj
def home(request):
 return HttpResponse('home')
routes = [
 (r'/home/', home),
 (r'/async/', async),
]
app = Snow(routes)
app.run(port=8012)

3.异步非阻塞:等待

基于等待模式可以完成自定制操作

from snow import Snow
from snow import HttpResponse
from snow import Future
request_list = []
def callback(request, future):
 return HttpResponse(future.value)
def req(request):
 obj = Future(callback=callback)
 request_list.append(obj)
 yield obj
def stop(request):
 obj = request_list[0]
 del request_list[0]
 obj.set_result('done')
 return HttpResponse('stop')
routes = [
 (r'/req/', req),
 (r'/stop/', stop),
]
app = Snow(routes)
app.run(port=8012)

【相关推荐】

1. Python免费视频教程

2. Python学习手册

3. Python面向对象视频教程

以上就是Python基于Socket实现异步非阻塞的详细内容,更多请关注其它相关文章!