Python学习笔记【第十五篇】:Python网络编程三ftp案例练习--断点续传
开发一个支持多用户在线的FTP程序-------------------主要是学习思路
实现功能点
1:用户登陆验证(用户名、密码)
2:实现多用户登陆
3:实现简单的cmd命令操作
4:文件的上传(断点续传)
程序文件结构
说明:
客户端文件夹为TFTP_Client, 服务端文件夹为TFTP_Server,bin目录下的文件为启动文件。核心代码在core文件夹中,服务端home文件夹为每个账号的家目录,已登陆名为文件夹名,conf文件夹为配置文件,logger为日志文件夹(未实现)
一:启动服务端。启动文件为ftp_server.py 文件
首先将编译器定位到启动文件目录中 cd demo/tftp_server/bin(根据创建文件路径)
启动服务:python ftp_server.py start
代码:
# -*- coding: utf-8 -*- # 声明字符编码 # coding:utf-8 import os, sys # 手动添加环境变量(找到TFTP_Server这层) PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(PATH) # 引入core层中main模块 from core import main if __name__ == "__main__": # main模块调用AravHandler main.AravHandler()
二:启动客户端。启动文件为ftp_Client.py 文件
首先定位到bin目录:cd demo/tftp_client/bin
连接服务器:python ftp_client.py -s 127.0.0.1 -P 8888 -u root -p root
看看客户端反应
客户端启动代码
# -*- coding: utf-8 -*- # 声明字符编码 # coding:utf-8 import os, sys # 手动添加环境变量(找到TFTP_Server这层) PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(PATH) # 引入core层中main模块 from core import main if __name__ == "__main__": # main模块调用AravHandler main.ClientHandler()
三:服务端main.py 文件和 客户端的main.py 文件-------------(核心代码)
服务端:
# -*- coding: utf-8 -*- # 声明字符编码 # coding:utf-8 import sys # 解析命令行参数 import optparse import socketserver from conf import settings from core import MySocketServer class AravHandler(object): def __init__(self): self.opt = optparse.OptionParser() # options返回的是对象 args:命令参数 options, args = self.opt.parse_args() self.verify_args(options, args) def verify_args(self, options, args): cmd = args[0] # 通过反射处理指令 if hasattr(self, cmd): func = getattr(self, cmd) func() else: print("系统暂无【%s】指令" % cmd) def start(self): print("服务器开始启动....") server = socketserver.ThreadingTCPServer((settings.IP, settings.PORT), MySocketServer.ServerHandler) server.serve_forever()
服务端:MySocketServer.py 文件
# -*- coding: utf-8 -*- # 声明字符编码 # coding:utf-8 import socketserver import json from conf import settings import subprocess import configparser import os import struct BUFFER_SIZE = 1024 STATUS_CODE = { 250: "Invalid cmd format, e.g: {'action':'get','filename':'test.py','size':344}", 251: "Invalid cmd ", 252: "Invalid auth data", 253: "Wrong username or password", 254: "Passed authentication", 255: "Filename doesn't provided", 256: "File doesn't exist on server", 257: "ready to send file", 258: "md5 verification", 800: "the file exist,but not enough ,is continue? ", 801: "the file exist !", 802: " ready to receive datas", 900: "md5 valdate success" } class ServerHandler(socketserver.BaseRequestHandler): # 读取账号配置文件进行验证 def authenticate(self, user, pwd): conf = configparser.ConfigParser() print("账号配置文件路径:", settings.ACCOUNT_PATH) conf.read(settings.ACCOUNT_PATH) # 判断当前用户是否存在 if user in conf.sections(): if conf[user]["Password"] == pwd: self.user = user self.file_write_path = os.path.join(settings.BASE_DIR, "home", user) return user # 不满足条件,函数返回None # 验证方法 def auth(self, **kwargs): print("服务器准备验证用户信息.....") user_name = kwargs["user"] user_pwd = kwargs["pwd"] print("用户输入的用户名:%s 密码:%s " % (user_name, user_pwd)) user = self.authenticate(user_name, user_pwd) print("验证后用户名为:%s " % user) if user: self.send_response(254) else: self.send_response(253) # 响应客户端 def send_response(self, status_code): response = {"status_code": status_code} self.request.sendall(json.dumps(response).encode("utf-8")) def handle(self): self.ip, self.port = self.client_address print("客户端[%s:%s]已连接到服务器" % (self.ip, self.port)) # 处理用户发送的信息 while True: try: client_msg = self.request.recv(BUFFER_SIZE) if not client_msg: break print("客户端【%s】>>%s" % (self.client_address, client_msg)) data = json.loads(client_msg.decode('utf-8')) """ 客户端与服务端通讯格式 { "action":"执行的方法", "user":"用户名", "pwd":"密码” } """ if data.get('action'): # 方法分发调用 if hasattr(self, data.get('action')): func = getattr(self, data.get('action')) func(**data) else: print("'%s' 不是内部或外部命令,也不是可运行的程序或批处理文件。" % data.get('action')) else: print("Invalid cmd") except Exception as e: print(e) break # 解析写入数据 def put(self, **kwargs): file_name = kwargs.get("file_name") file_size = kwargs.get("file_size") target_path = kwargs.get("target_path") abs_path = os.path.join(self.file_write_path, target_path, file_name) print("文件写入路径:", abs_path) # 判断当前上传的文件服务器是否有 write_size = 0 if os.path.exists(abs_path): # ===================文件在服务器存在的情况===================== server_file_size = os.stat(abs_path).st_size if server_file_size < file_size: # 进行断点续传 self.request.sendall("800".encode('utf-8')) yorn = self.request.recv(BUFFER_SIZE).decode('utf-8') if yorn == "Y": # 继续上传 self.request.sendall(str(server_file_size).encode('utf-8')) write_size += server_file_size f = open(abs_path, "ab") elif yorn == "N": # 不续传,重新上传 f = open(abs_path, "wb") else: # 文件存在并且大小相等提示用户即可 self.request.sendall("801".encode("utf-8")) return else: # ==================文件为空直接写入========================= self.request.sendall("802".encode("utf-8")) f = open(abs_path, "wb") while write_size < file_size: try: data = self.request.recv(BUFFER_SIZE) except Exception as e: print(e) break f.write(data) write_size += len(data) f.close() print("===========文件上传完成===========") def ls(self, **kwargs): print("接收客户端[%s:%s]命令[%s]" % (self.ip, self.port, "ls")) # 处理执行的命令 res = subprocess.Popen("dir", shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, stdin=subprocess.PIPE) err = res.stderr.read() if err: cmd_err = err else: cmd_err = res.stdout.read() # # 第一种方式:解决粘包问题 # msg_len = len(cmd_err) # print("数据长度为:", msg_len) # client_socket.send(str(msg_len).encode('utf-8')) # # 马上等待回复 # is_ok = client_socket.recv(BUFFER_SIZE) # if is_ok == b"OK": # client_socket.send(cmd_err) # 第二种方式:解决粘包问题 msg_len = len(cmd_err) msg_len = struct.pack('i', msg_len) # 下面两次发送,在客户端会当成一次接收 self.request.send(msg_len) self.request.send(cmd_err) # print(msg_len) # print(cmd_err)
客户端:
# -*- coding: utf-8 -*- # 声明字符编码 # coding:utf-8 import optparse import socket import configparser import json import os import sys import struct # 服务队与客户端交互状态码 STATUS_CODE = { 250: "Invalid cmd format, e.g: {'action':'get','filename':'test.py','size':344}", 251: "Invalid cmd ", 252: "Invalid auth data", 253: "Wrong username or password", 254: "Passed authentication", 255: "Filename doesn't provided", 256: "File doesn't exist on server", 257: "ready to send file", 258: "md5 verification", 800: "the file exist,but not enough ,is continue? ", 801: "the file exist !", 802: " ready to receive datas", 900: "md5 valdate success" } class ClientHandler(object): def __init__(self): self.opt = optparse.OptionParser() # # 这里有两种方式可以获取启动文件后面跟的参数 1:通过索引获取。2:通过optparse构建对象。 # # 第一种 获取命令列表 # print(sys.argv) # # 第二种 self.opt.add_option("-s", "--s", dest="server") self.opt.add_option("-P", "--P", dest="port") self.opt.add_option("-u", "--u", dest="user") self.opt.add_option("-p", "--p", dest="pwd") self.options, self.args = self.opt.parse_args() self.port_verification() self.client_connect() self.upload_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # print(options) # print(args) # cmd = sys.argv[1] # print(cmd) # 服务端应答处理 def server_answer(self): data = self.sock.recv(1024).decode('utf-8') if data is not None: data = json.loads(data) return data # 账号发送至服务端(服务器验证账号密码) def account_verification(self, user, pwd): """ 客户端与服务端通讯格式 { "action":"执行的方法", "user":"用户名", "pwd":"密码” } """ data = {"action": "auth", "user": user, "pwd": pwd} self.sock.send(json.dumps(data).encode('utf-8')) # 等待服务端回消息 response = self.server_answer() print("服务器<<:", response) if response["status_code"] == 254: self.user = user print("status_code<<:", STATUS_CODE[254]) return True else: print(STATUS_CODE[response["status_code"]]) # 账号参数验证 def user_info_verification(self): if self.options.user is None or self.options.pwd is None: user_name = input("user: ") user_pwd = input("pwd: ") return self.account_verification(user_name, user_pwd) else: return self.account_verification(self.options.user, self.options.pwd) # 端口号校验 def port_verification(self): if int(self.options.port) > 0: if int(self.options.port) < 65535: return True else: exit("端口号的取值范围因该在0-65535") else: exit("端口号的取值范围因该在0-65535") # 客户端连接服务器 def client_connect(self): print("正在连接服务器....") self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.options.server, int(self.options.port))) # 交互 def interactive(self): # 账号参数验证 if self.user_info_verification(): while True: print("begin to interactive.......") cmd_info = input("[%s]" % self.user).strip() # put txt.png images cmd_list = cmd_info.split() print("cmd 命令:", cmd_list) if hasattr(self, cmd_list[0]): func = getattr(self, cmd_list[0]) func(*cmd_list) else: print("'%s' 不是内部或外部命令,也不是可运行的程序或批处理文件。" % cmd_list[0]) # 打印进度条 # 上传功能 def put(self, *args): action, local_path, target_path = args # 读取本地路径资源(默认读取TFTP_Client/files) local_path = os.path.join(self.upload_path, "files", local_path) print("文件读取路径:", local_path) upload_file_size = os.stat(local_path).st_size print("上传文件:[%s][%d]" % (os.path.basename(local_path), upload_file_size)) data = { "action": "put", "file_name": os.path.basename(local_path), "file_size": upload_file_size, "target_path": target_path } self.sock.send(json.dumps(data).encode("utf-8")) is_exit = self.sock.recv(1024).decode('utf-8') client_size = 0 if is_exit == "800": # 文件不完整 yorn = input("文件有未完成记录是否继续上传【y/n】").strip().upper() if yorn == "Y": # 继续上传 self.sock.sendall(yorn.encode("utf-8")) seck_size = self.sock.recv(1024).decode("utf-8") client_size += int(seck_size) elif yorn == "N": # 不续传,重新上传 self.sock.sendall(yorn.encode("utf-8")) elif is_exit == "801": # 文件完全存在 print("文件[%s]已存在" % os.path.basename(local_path)) return else: pass f = open(local_path, "rb") f.seek(client_size) while client_size < upload_file_size: data = f.read(1024) self.sock.sendall(data) client_size += len(data) self.show_progress(client_size, upload_file_size) # 打印进度条 def show_progress(self, number, total): rate = float(number) / float(total) rate_num = int(rate * 100) sys.stdout.write("%s%% %s\r" % (rate_num, "#" * rate_num)) def ls(self, *args): data = { "action": "ls" } self.sock.sendall(json.dumps(data).encode('utf-8')) # 第二种方式:解决粘包问题 # 先接收四个字节 length_data = self.sock.recv(4) content_length = struct.unpack('i', length_data)[0] print("准备接收%d大小的数据" % content_length) recv_size = 0 recv_msg = b'' # 循环获取数据 while recv_size < content_length: recv_msg += self.sock.recv(1024) recv_size = len(recv_msg) print("<<%s" % (recv_msg.decode('gbk'))) client = ClientHandler() client.interactive()
四:服务端配置文件 accounts.cfg 和 settings.py
[DEFAULT]
[admin]
Password = 123
Quotation = 100
[root]
Password = root
Quatation = 100
# -*- coding: utf-8 -*- # 声明字符编码 # coding:utf-8 import os, sys # 项目根目录 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # 账号文件路径 ACCOUNT_PATH = os.path.join(BASE_DIR, "conf", "accounts.cfg") IP = "127.0.0.1" PORT = 8888
五:简单演示
上传文件:
断点续传:
六 总结:
整个程序就是一个服务端和客户端之间的简单通讯,通过约定好的内容来做相应事情(调用哪个方法),当客户端向服务端发送一同指令,服务端接收后通过反射来判断当前服务中又没有对应指令的方法,有则获取调用,没有就提示客户端。断点续传则是,客户端先发送这次上传的文件信息(约定格式为JSON内容 data = { "action": "put", "file_name": os.path.basename(local_path), "file_size": upload_file_size,"target_path": target_path},服务端收到后解析内容,然后判断文件在服务器这边的状态(文件已存在、文件不存在、文件存在并且大小不相等提示用户是否继续上传等)返回给客户端。客户端根据服务器返回的状态码经行相应的读取文件发送给服务端。