欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python实现简易版的Web服务器(推荐)

程序员文章站 2022-05-30 08:15:47
下面给大家介绍python实现简易版的web服务器,具体内容详情大家通过本文学习吧! 1、请自行了解HTTP协议 http://www.cnblogs.com/re...

下面给大家介绍python实现简易版的web服务器,具体内容详情大家通过本文学习吧!

Python实现简易版的Web服务器(推荐)

1、请自行了解HTTP协议

http://www.cnblogs.com/reboot51/p/8358129.html(点击跳转)

2、创建Socket服务,监听指定IP和端口

Python实现简易版的Web服务器(推荐)

3、以阻塞方式等待客户端连接

Python实现简易版的Web服务器(推荐)

4、读取客户端请求数据并进行解析

Python实现简易版的Web服务器(推荐)

5、准备服务器运行上下文

Python实现简易版的Web服务器(推荐)

6、处理客户端请求数据

Python实现简易版的Web服务器(推荐)

7、根据用户请求路径读取文件

Python实现简易版的Web服务器(推荐)

8、返回响应结果给客户端

Python实现简易版的Web服务器(推荐)

9、程序入口

Python实现简易版的Web服务器(推荐)

10、目录结构

Python实现简易版的Web服务器(推荐)

11、运行

python wsgiserver.py app:run

12、源码

a.wsgiserver.py文件

#encoding:utf-8
import socket
import StringIO
import sys
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class WSGIServer(object):
  address_family = socket.AF_INET
  socket_type = socket.SOCK_STREAM
  request_queue_size = 30
  recv_size = 1024
  def __init__(self, server_address):
    self._listen_socket = _listen_socket = socket.socket(self.address_family,
                             self.socket_type)    _listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)    _listen_socket.bind(server_address)  
    _listen_socket.listen(self.request_queue_size)   
    _host, _port = _listen_socket.getsockname()
    self._server_name = socket.getfqdn(_host)
    self._server_port = _port
    self._headers_set = []
    self._application = None
    self._client = None
    self._request_data = None
    self._request_method = None
    self._path = None
    self._request_version = None
    self._start_response = None
  def set_application(self, application):
    self._application = application
  def server_forever(self):  
    _listen_socket = self._listen_socket
    logger.info('listen on %s:%s', self._server_name, self._server_port)   while 1:   
      try:
        self._client, _addr = _listen_socket.accept()
        self._handle_request(_addr)
      except KeyboardInterrupt as e:
        logger.info('interrupt')
        break
      except BaseException as e:
        logger.error(e)
  def _handle_request(self, client_addr):
    self._request_data = _request_data = self._client.recv(self.recv_size)
    self._parse_request_data(_request_data)    
    _env = self._get_environment(client_addr)  
    _result = self._application(_env, self.start_response)
    self._finish_response(_result)
  def _parse_request_data(self, request_data):  
    _request_line = str(request_data.splitlines()[0]).rstrip('\r\n')
    (self._request_method, self._path, self._request_version) = _request_line.split()
  def _get_environment(self, client_addr):   
    _env = {}    
    _env['wsgi.version'] = (1, 0)    
    _env['wsgi.url_scheme'] = 'http'
    _env['wsgi.input'] = StringIO.StringIO(self._request_data)        _env['wsgi.errors'] = sys.stderr    
    _env['wsgi.multithread'] = False
    _env['wsgi.multiprocess'] = False
    _env['wsgi.run_once'] = False
    _env['REQUEST_METHOD'] = self._request_method.upper()    
    _env['PATH_INFO'] = self._path
    _env['SERVER_NAME'] = self._server_name
    _env['SERVER_PORT'] = self._server_port
    _env['HTTP_CLIENT_IP'] = client_addr[0]
    logger.info('%s %s %s %s', _env['HTTP_CLIENT_IP'], datetime.now().strftime('%Y-%m-%d %H:%M:%S'), _env['REQUEST_METHOD'], _env['PATH_INFO'])
    return _env
  def start_response(self, status, response_headers, exc_info=None):      _server_headers = [
      ('Date', 'Sun, 7 Jun 2015 23:07:04 GMT'),
      ('Server', 'WSGIServer 0.1')
      ]
    self._headers_set = [status, response_headers + _server_headers]
  def _finish_response(self, result):   
    _status, _response_headers = self._headers_set
    _response = 'HTTP/1.1 {status}\r\n'.format(status=_status)        for _header in _response_headers:    
    _response += '{0}:{1}\r\n'.format(*_header)    
    _response += '\r\n'
    for _data in result:      
      _response += _data
    self._client.sendall(_response)
    self._client.close()
def make_server(server_address, application):
  server = WSGIServer(server_address)
  server.set_application(application)
  return server
if __name__ == '__main__':
  logging.basicConfig(level=logging.DEBUG)
  server_addr= ('0.0.0.0', 43002)
  app_path = sys.argv[1]
  module, application = app_path.split(':')
  module = __import__(module)
  application = getattr(module, application)
  httpd = make_server(server_addr, application)
  httpd.server_forever()

b.app.py文件

#encoding:utf-8
import os
class PageNotFoundException(BaseException):
  pass
def render(filename, dirname='html'):
  _path = os.path.join(dirname, filename)
  if os.path.exists(_path):   
    with open(_path, 'rb') as handler:   
      return handler.read()
  raise PageNotFoundException('file not found:%s' % _path)
def run(env, start_response):
  _path = env.get('PATH_INFO')
  response = ''
  try:
    _path = 'index.html' if _path == '/' else _path[1:]
    if _path.endswith('.css'):
      start_response('200 OK', [('Content-Type', 'text/css')])
    elif _path.endswith('.js'):
      start_response('200 OK', [('Content-Type', 'text/javascript')]
    elif _path.endswith('.html'):
      start_response('200 OK', [('Content-Type', 'text/html')])
    else:
      start_response('200 OK', [('Content-Type', 'text/plain'), ('Content-Disposition', 'attachment; filename=%s' % os.path.basename(_path))])
    response = render(_path) 
  except PageNotFoundException as e:
    response = render('404.html')  
  return [response, '\r\n']

总结

以上所述是小编给大家介绍的Python实现简易版的Web服务器,希望对大家有所帮助