中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python實現簡易版的Web服務器(推薦)

發布時間:2020-08-21 13:02:26 來源:腳本之家 閱讀:379 作者:小單同桌 欄目:開發技術

下面給大家介紹python實現簡易版的web服務器,具體內容詳情大家通過本文學習吧!

Python實現簡易版的Web服務器(推薦)

1、請自行了解HTTP協議

https://www.jb51.net/article/133883.htm(點擊跳轉)

2、創建Socket服務,監聽指定IP和端口

Python實現簡易版的Web服務器(推薦)

3、以阻塞方式等待客戶端連接

Python實現簡易版的Web服務器(推薦)

4、讀取客戶端請求數據并進行解析

Python實現簡易版的Web服務器(推薦)

5、準備服務器運行上下文

Python實現簡易版的Web服務器(推薦)

6、處理客戶端請求數據

Python實現簡易版的Web服務器(推薦)

7、根據用戶請求路徑讀取文件

Python實現簡易版的Web服務器(推薦)

8、返回響應結果給客戶端

Python實現簡易版的Web服務器(推薦)

9、程序入口

Python實現簡易版的Web服務器(推薦)

10、目錄結構

Python實現簡易版的Web服務器(推薦)

11、運行

python wsgiserver.py app:run

12、源碼

a.wsgiserver.py文件

#encoding:utf-8
import socket
import StringIO
import sys
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class WSGIServer(object):
 address_family = socket.AF_INET
 socket_type = socket.SOCK_STREAM
 request_queue_size = 30
 recv_size = 1024
 def __init__(self, server_address):
  self._listen_socket = _listen_socket = socket.socket(self.address_family,
               self.socket_type)  _listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)  _listen_socket.bind(server_address) 
  _listen_socket.listen(self.request_queue_size)  
  _host, _port = _listen_socket.getsockname()
  self._server_name = socket.getfqdn(_host)
  self._server_port = _port
  self._headers_set = []
  self._application = None
  self._client = None
  self._request_data = None
  self._request_method = None
  self._path = None
  self._request_version = None
  self._start_response = None
 def set_application(self, application):
  self._application = application
 def server_forever(self): 
  _listen_socket = self._listen_socket
  logger.info('listen on %s:%s', self._server_name, self._server_port)  while 1:  
   try:
    self._client, _addr = _listen_socket.accept()
    self._handle_request(_addr)
   except KeyboardInterrupt as e:
    logger.info('interrupt')
    break
   except BaseException as e:
    logger.error(e)
 def _handle_request(self, client_addr):
  self._request_data = _request_data = self._client.recv(self.recv_size)
  self._parse_request_data(_request_data)  
  _env = self._get_environment(client_addr) 
  _result = self._application(_env, self.start_response)
  self._finish_response(_result)
 def _parse_request_data(self, request_data): 
  _request_line = str(request_data.splitlines()[0]).rstrip('\r\n')
  (self._request_method, self._path, self._request_version) = _request_line.split()
 def _get_environment(self, client_addr):  
  _env = {}  
  _env['wsgi.version'] = (1, 0)  
  _env['wsgi.url_scheme'] = 'http'
  _env['wsgi.input'] = StringIO.StringIO(self._request_data)    _env['wsgi.errors'] = sys.stderr  
  _env['wsgi.multithread'] = False
  _env['wsgi.multiprocess'] = False
  _env['wsgi.run_once'] = False
  _env['REQUEST_METHOD'] = self._request_method.upper()  
  _env['PATH_INFO'] = self._path
  _env['SERVER_NAME'] = self._server_name
  _env['SERVER_PORT'] = self._server_port
  _env['HTTP_CLIENT_IP'] = client_addr[0]
  logger.info('%s %s %s %s', _env['HTTP_CLIENT_IP'], datetime.now().strftime('%Y-%m-%d %H:%M:%S'), _env['REQUEST_METHOD'], _env['PATH_INFO'])
  return _env
 def start_response(self, status, response_headers, exc_info=None):   _server_headers = [
   ('Date', 'Sun, 7 Jun 2015 23:07:04 GMT'),
   ('Server', 'WSGIServer 0.1')
   ]
  self._headers_set = [status, response_headers + _server_headers]
 def _finish_response(self, result):  
  _status, _response_headers = self._headers_set
  _response = 'HTTP/1.1 {status}\r\n'.format(status=_status)    for _header in _response_headers:  
  _response += '{0}:{1}\r\n'.format(*_header)  
  _response += '\r\n'
  for _data in result:   
   _response += _data
  self._client.sendall(_response)
  self._client.close()
def make_server(server_address, application):
 server = WSGIServer(server_address)
 server.set_application(application)
 return server
if __name__ == '__main__':
 logging.basicConfig(level=logging.DEBUG)
 server_addr= ('0.0.0.0', 43002)
 app_path = sys.argv[1]
 module, application = app_path.split(':')
 module = __import__(module)
 application = getattr(module, application)
 httpd = make_server(server_addr, application)
 httpd.server_forever()

b.app.py文件

#encoding:utf-8
import os
class PageNotFoundException(BaseException):
 pass
def render(filename, dirname='html'):
 _path = os.path.join(dirname, filename)
 if os.path.exists(_path):  
  with open(_path, 'rb') as handler:  
   return handler.read()
 raise PageNotFoundException('file not found:%s' % _path)
def run(env, start_response):
 _path = env.get('PATH_INFO')
 response = ''
 try:
  _path = 'index.html' if _path == '/' else _path[1:]
  if _path.endswith('.css'):
   start_response('200 OK', [('Content-Type', 'text/css')])
  elif _path.endswith('.js'):
   start_response('200 OK', [('Content-Type', 'text/javascript')]
  elif _path.endswith('.html'):
   start_response('200 OK', [('Content-Type', 'text/html')])
  else:
   start_response('200 OK', [('Content-Type', 'text/plain'), ('Content-Disposition', 'attachment; filename=%s' % os.path.basename(_path))])
  response = render(_path) 
 except PageNotFoundException as e:
  response = render('404.html') 
 return [response, '\r\n']

總結

以上所述是小編給大家介紹的Python實現簡易版的Web服務器,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對億速云網站的支持!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

玛纳斯县| 淅川县| 通辽市| 沽源县| 景泰县| 南华县| 资兴市| 定西市| 华宁县| 江达县| 武夷山市| 桓仁| 兴城市| 尉氏县| 祥云县| 壶关县| 衢州市| 永新县| 南丹县| 大姚县| 城固县| 安康市| 喜德县| 屯昌县| 镇安县| 京山县| 平江县| 南平市| 方山县| 长武县| 泸西县| 溧水县| 奇台县| 普安县| 克什克腾旗| 东乡族自治县| 鹤岗市| 府谷县| 五大连池市| 神农架林区| 鸡东县|