SELECT版FTP的实例详解
程序员文章站
2022-04-25 16:45:53
...
SELECT版FTP:
使用SELECT或SELECTORS模块实现并发简单版FTP
允许多用户并发上传下载文件
View Code
View Code
View Code
View Code
View Code
View Code
View Code
View Code
使用SELECT或SELECTORS模块实现并发简单版FTP
允许多用户并发上传下载文件
必须使用select or selectors模块支持多并发,禁止使用多线程或多进程
REDMAE
用户登陆 1、查看共享目录文件 2、上传文件, 3、下载方件 4、退出 程序结构: socket_server_client/#程序目录 |- - -clients/#client程序主目录 | |- - -__init__.py | |- - -bin/#启用目录 | | |- - - __init__.py | | |- - -socket_client.py#客户端启动 | | | |- - -cfg/#配置文件目录 | | |- - - __init__.py | | |- - -config.py#配置文件 | | | |- - -core/#主要程序目录 | | |- - - __init__.py | | |- - -client_func.py#主要函数 | | | |- - -home/#客户端下载文件目录 | |- - -servers/#server程序主目录 | |- - -__init__.py | |- - -bin/#启用目录 | | |- - - __init__.py | | |- - -registration.py#用户注册 | | |- - -server.py#服务端启动(selectors版) | | |- - -socket_server.py#服务端启动(select版) | | | |- - -cfg/#配置文件目录 | | |- - - __init__.py | | |- - -config.py#配置文件 | | | |- - -core/#主要程序目录 | | |- - - __init__.py | | |- - -server_classc.py#主要函数 | | | |- - -db/#用户上传文件主目录 | |- - -user_file/#用户上传目录(共享) | |- - -user_names#注册用户文件 |
程序结构: socket_server_client/#程序目录 |- - -clients/#client程序主目录 | |- - -__init__.py | |- - -bin/#启用目录 | | |- - - __init__.py | | |- - -socket_client.py#客户端启动
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 5 import socket,os,json,sys 6 BASE_DIR=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 7 sys.path.append(BASE_DIR)#增加环境变量 8 from core.client_func import user_pwd 9 #from core.client_func import show_process 10 from cfg import config 11 12 #进度条 13 def show_process(lens): 14 received_size=0#定义大小 15 current_percent=0#当前大小百分比 16 while received_size<lens: 17 if int((received_size/lens)*100)>current_percent: 18 print('#',end='',flush=True) 19 current_percent=int((received_size/lens)*100) 20 new_size=yield 21 received_size+=new_size 22 23 server_addr=('localhost',9500)#设置绑定的 IP 端口 24 #server_addr=('192.168.11.50',9500)#设置绑定的 IP 端口 25 client=socket.socket() 26 client.connect(server_addr) 27 while True: 28 data_d=user_pwd(client) 29 if data_d['tag']:#运行#用户名登陆成功 30 while True: 31 print('''=====指令提示==== 32 查看目录文件: ls 33 下载文件: get 文件名 或 文件编号 如: get test.txt 或 get 1 34 上传方件: put 路径/文件名 如 put e:/test.txt 35 退出:exit 36 ''') 37 cho=input('指令 >>:').strip() 38 if len(cho)==0:continue 39 if cho=='exit':exit()#退出指令 40 cmd_list=cho.split() 41 if cmd_list[0]=='put':#如果等于下载指令 42 if len(cmd_list)==1: 43 print('没有输入相关文件名') 44 continue 45 filename=cmd_list[1] 46 file_dir=config.USER_DIR+'/'+filename 47 if os.path.isfile(file_dir):#如果文件存在 48 file_obj=open(file_dir,"rb")#打开文件 49 name=file_obj.name.split('/')[-1]#文件名 50 #name=filename.split("\\")[-1]#文件名 51 sez=os.path.getsize(file_dir)#获取文件大小 52 if sez<1: 53 print('\033[41;1m文件为空!,不能上传\033[0m') 54 continue 55 progress = show_process(sez) #进度条 传入文件大小 56 progress.__next__() 57 rat=0 58 file_obj.seek(rat)#移动到位置 59 data_header={ 60 "action":"put", 61 "filename":name, 62 "size":sez 63 } 64 client.send(json.dumps(data_header).encode())#用json 序列化后,发送相关 信息 65 66 print("文件[%s]发送中...."%data_header["filename"]) 67 68 while rat<sez: 69 line=file_obj.read(4096) 70 client.send(line) 71 try: 72 progress.send(len(line))#传入当前数据大小 73 except StopIteration as e: 74 print("100%") 75 break 76 print("文件[%s]发送完毕!"%data_header["filename"]) 77 else: 78 print('\033[41;1m该文件不存在或为目录\033[0m') 79 continue 80 elif cmd_list[0]=='get':#如果等于get 上传指令 81 if len(cmd_list)==1: 82 print('没有输入相关文件名') 83 continue 84 filename=cmd_list[1] 85 print(filename) 86 data_header={ 87 "action":"get", 88 "filename":filename, 89 "size":'' 90 } 91 client.send(json.dumps(data_header).encode())#用json 序列化后,发送相关 信息 92 datas=client.recv(4096)#接收数据 指令 93 data_l= json.loads(datas.decode())#反序列 94 # print(data_l) 95 # print(data_l['size']) 96 if data_l['filename']==False: 97 print('\033[41;1m文件不存在或者出错\033[0m') 98 continue 99 prten=show_process(data_l["size"])100 prten.__next__()101 file_dir=config.USER_DIR+'/'+data_l["filename"]102 file_obj=open(file_dir,'wb')#打开新建 这个文件103 rece_size=0#定义 文件大小值104 105 106 while rece_size<data_l["size"]:#小于接收的文件大小时,107 recv_data=client.recv(4096)108 file_obj.write(recv_data)#写入文件109 rece_size+=len(recv_data)#增加文件大小计算110 try:111 prten.send(len(recv_data))112 except StopIteration as e:113 print('100%')114 115 else:116 print("文件[%s]接收完毕!"%data_l["filename"])117 file_obj.flush()118 file_obj.close()#关闭文件119 elif cmd_list[0]=='ls':#查看目录文件120 data_header={121 "action":"ls",122 "filename":'',123 "size":''124 }125 client.send(json.dumps(data_header).encode())#用json 序列化后,发送相关 信息126 datas=client.recv(4096)#接收数据 指令127 data_l= json.loads(datas.decode())#反序列128 for k,v in enumerate(data_l):129 print('编号: %s 文件名:%s'%(k,v))130 131 else:132 print(data_d['mag'])
| |- - -cfg/#配置文件目录 | | |- - - __init__.py | | |- - -config.py#配置文件
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 5 import os ,sys 6 BASE_DIR=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 7 sys.path.append(BASE_DIR)#增加环境变量 8 9 10 USER_DIR=BASE_DIR+'/home'#定义用户目录文件路径变量11 IP='192.168.11.50'12 PORST=9500
| |- - -core/#主要程序目录 | | |- - - __init__.py | | |- - -client_func.py#主要函数
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 import socket,os,json,sys 5 #用户名登陆函数 6 def user_pwd(client): 7 user_=input('请输入用户名:').strip() 8 pwd_=input('请输入密码:').strip() 9 data_header={10 "action":"user",11 "name":user_,12 "pwd":pwd_13 }14 client.send(json.dumps(data_header).encode())#用json 序列化后,发送相关 信息15 data=client.recv(4096)#接收数据 指令16 data_s=json.loads(data.decode('utf-8'))#反序列17 return data_s
|- - -servers/#server程序主目录 | |- - -__init__.py | |- - -bin/#启用目录 | | |- - - __init__.py | | |- - -registration.py#用户注册
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 import socket,os,json,sys,pickle 5 6 BASE_DIR=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 7 sys.path.append(BASE_DIR)#增加环境变量 8 from cfg import config 9 print('用户注册'.center(60,'='))10 while True:11 user_=input('请输入您要注册的用户名:').strip()12 user_dir=os.path.join(config.USER_DIR,user_)#拼接用户目录路径13 if os.path.isdir(user_dir):# 判断一个目录是否存在14 print('用户已经存在请重输!')15 continue16 else:17 pwd_=input('请输入密码:').strip()18 pwd_two=input('请确认密码:').strip()19 if pwd_==pwd_two:20 21 22 if not os.path.isfile(config.USER_FILE):23 with open(config.USER_FILE,'w',encoding='utf-8') as f:24 f.write('{}')25 with open(config.USER_FILE,'r+',encoding='utf-8') as f:26 data=eval(f.readline())27 data[user_]=pwd_28 f.seek(0)29 f.write(str(data))30 print('用户[%s]注册成功!'%user_)31 exit()
| | |- - -server.py#服务端启动(selectors版)
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 #python 5 #2017/6/24 19:34 6 #__author__='Administrator' 7 import select,socket,sys ,queue,json,os 8 BASE_DIR=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 9 sys.path.append(BASE_DIR)#增加环境变量10 11 import core12 from core.server_class import socket_server13 14 s=socket.socket()#实例化一个连接对象15 s.setblocking(0)#设置成非阻塞16 server_addr=('localhost',9500)#设置绑定的 IP 端口17 s.bind(server_addr)#连接对象绑定IP 端口18 s.listen(100)#队列 可连接数量19 inputs=[s,]#首先要监测本身20 21 outputs=[]#发送列表22 23 meg_queues={} #发送 连接对象的队列集合 字典24 25 while True:26 print('监听中......')27 readable,writeable,exeptional=select.select(inputs,outputs,inputs)#生成select 对象,返回三个列表 连接,发关,错误28 29 for i in readable: #i为一个socket30 if i is s:#如果i 是s 表示有新 连接 进来31 conn,client_addr=i.accept()#建立一个新连接32 print('接入一个新连接...',client_addr)33 conn.setblocking(0)#也设成非阻塞34 inputs.append(conn)#加入select,的连接列表,避免出现阻塞35 meg_queues[conn]=queue.Queue()#创建一个队列 添加到字典36 else:37 try:38 data=i.recv(1024)#如果不是新连接就收数据39 except Exception as e:40 print(e)41 if data: #如果数据不为空42 print('[%s] 发来的数据 [%s]'%(i.getpeername,data))43 meg_queues[i].put(data)#当前连接的消息队列加入数据44 if i not in outputs:#如果当前连接没有在发送列表内,就加入发送列表45 outputs.append(i)46 else:47 print('客户端已经断开了....')#开始清理工作48 if i in outputs:#在发送列表49 outputs.remove(i)#在发送列表内删除50 inputs.remove(i)#在连接列表内删除51 del meg_queues[i]#在队列字典内删除52 53 for w in writeable:#循环发送列表54 try:55 msg=meg_queues[w].get_nowait()#取出队列中的数据,判断56 except queue.Empty:#如果数据为空57 outputs.remove(w)##从发送列表内删除58 else:59 data = json.loads(msg.decode())#反序列60 serv=socket_server(data,w)61 if data['action']=='user':#如果是用户名,进行认证\62 #serv=socket_server(data,conn)63 ret=serv.ret_l()64 if ret['tag']:65 pass66 else:67 break68 #print('echoing', repr(data), 'to', conn)69 #data=json.loads(data)70 if data['action']=="put":#如果接收的字典中是put,就是进行接收71 #serv=socket_server(data,conn)72 serv.put_file(serv.open_f())#调对象方法73 elif data['action']=='get':#下载74 #serv=socket_server(data,conn)#实例化75 serv.send_file(serv.open_f())#调 用方法76 elif data['action']=='ls':#查看77 #serv=socket_server(data,conn)78 serv.ls_file(serv.open_f())79 break80 81 #w.send(msg)#发送82 83 84 85 for e in exeptional:#循环错误列表86 print('连接[%s]出错!'%e.getpeername)87 inputs.remove(e)##从发送列表内删除88 if e in outputs:#在发送列表89 outputs.remove(e)#在发送列表内删除90 e.close()91 del meg_queues[e]#在队列字典内删除
| | |- - -socket_server.py#服务端启动(select版)
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 import socket,os,json 5 import sys 6 import selectors 7 8 BASE_DIR=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 9 sys.path.append(BASE_DIR)#增加环境变量10 11 from core.server_class import socket_server12 from core.server_class import open_file_list13 14 15 16 17 18 def accept(sock, mask):19 conn, addr = sock.accept() # 建立新连接20 print('accepted', conn, 'from', addr)21 conn.setblocking(False)#设成非阻塞22 sel.register(conn, selectors.EVENT_READ, read)#注册 连接,回调函数 read23 24 25 def read(conn,mask):26 #gevent.spawn(handle_request, cli)#创建一个新协程来27 data = conn.recv(1024) # 接收数据28 if data:#不为空29 print('接收的数据:')30 #print(mask)31 if len(data)==0:32 return33 data = json.loads(data.decode())#反序列34 serv=socket_server(data,conn)35 if data['action']=='user':#如果是用户名,进行认证\36 #serv=socket_server(data,conn)37 ret=serv.ret_l()38 if ret['tag']:39 pass40 else:41 return42 if data['action']=="put":#如果接收的字典中是put,就是进行接收43 #serv=socket_server(data,conn)44 serv.put_file(serv.open_f())#调对象方法45 elif data['action']=='get':#下载46 #serv=socket_server(data,conn)#实例化47 serv.send_file(serv.open_f())#调 用方法48 elif data['action']=='ls':#查看49 #serv=socket_server(data,conn)50 serv.ls_file(serv.open_f())51 return52 else:#如果为空53 print('closing', conn)54 sel.unregister(conn)#取消注册55 conn.close()#关闭连接56 57 server_addr=('0.0.0.0',9501)#设置绑定的 IP 端口58 s=socket.socket()#定义59 s.bind(server_addr)#绑定IP 端口60 s.listen(5)#对列561 s.setblocking(False)#非阻塞62 print('正在监听中')63 64 sel = selectors.DefaultSelector()#生成一个创建一个selectors对象65 sel.register(s, selectors.EVENT_READ, accept)#注册连接 返调函数为accepts66 67 while True:68 events = sel.select()#默认为阻塞模式69 for key, mask in events:#如果有连接,接入70 callback = key.data#新建连接句柄71 callback(key.fileobj, mask)
| |- - -cfg/#配置文件目录 | | |- - - __init__.py | | |- - -config.py#配置文件
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 import os ,sys 5 BASE_DIR=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 6 sys.path.append(BASE_DIR)#增加环境变量 7 8 9 USER_DIR=BASE_DIR+'/db/user_file/'#定义用户目录文件路径变量10 11 USER_FILE=BASE_DIR+'/db/user_names'#定义用户名密码文件路径变量12 IP='localhost'13 PORST=9501
| |- - -core/#主要程序目录 | | |- - - __init__.py | | |- - -server_classc.py#主要函数
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 import socket,os,json,sys,pickle 5 import selectors 6 7 BASE_DIR=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 8 sys.path.append(BASE_DIR)#增加环境变量 9 10 from cfg import config 11 12 13 #用户名检测函数 14 15 def open_file_list(name,pas):#传入当前类 16 with open(config.USER_FILE,'r',encoding='utf-8') as f: 17 data=eval(f.readline()) 18 print(data) 19 if name in data and pas==data[name]: 20 return True 21 else: 22 return False 23 24 25 26 27 #连接类 28 class socket_server(object): 29 '''连接类''' 30 file_path=config.USER_DIR#用户路经变量 31 def __init__(self,data,conn):#传入 32 33 self.DATA=data 34 self.conn=conn 35 36 37 def ret_l(self): 38 self.ret=self.login(self.DATA["name"],self.DATA['pwd'],self.conn)#用户名检测 39 return self.ret 40 def open_f(self):#打开目录 41 42 file_dir=os.path.join(config.USER_DIR)#用户目录 43 print(file_dir) 44 file_name=os.listdir(file_dir)#目录文件列表 45 f=file_dir+'/'+self.DATA['filename']##上传的文件名 46 return file_dir,file_name,f#返回 47 48 def ls_file(self,data):#查看文件 49 self.conn.send(json.dumps(data[1]).encode()) 50 51 def send_file(self,data): 52 53 if self.DATA['filename'] in data[1]:#如果是输入文件名 54 f=data[0]+'/'+self.DATA['filename'] 55 file_obj=open(f,"rb")#打开文件 56 name=file_obj.name.split('/')[-1]#文件名 57 sez=os.path.getsize(f)#获取文件大小 58 if sez<1: 59 print('文件错误!') 60 data={'filename':False} 61 self.conn.send(json.dumps(data).encode()) 62 print(''.center(30,'=')) 63 print(sez) 64 print(''.center(30,'=')) 65 data_header={ 66 "action":"put", 67 "filename":name, 68 "size":sez 69 } 70 self.conn.send(json.dumps(data_header).encode())#用json 序列化后,发送相关 信息 71 for line in file_obj: 72 self.conn.send(line)#发送数据 73 74 elif self.DATA['filename'].isdigit():#如果是输入编号 75 num=int(self.DATA['filename'])#转为数字 76 try: 77 f=data[0]+'/'+data[1][num]# 78 file_obj=open(f,"rb")#打开文件 79 name=file_obj.name.split('/')[-1]#文件名 80 sez=os.path.getsize(f)#获取文件大小 81 if sez<1: 82 print('文件错误!') 83 data={'filename':False} 84 self.conn.send(json.dumps(data).encode()) 85 print(sez) 86 data_header={ 87 "action":"put", 88 "filename":name, 89 "size":sez 90 } 91 self.conn.send(json.dumps(data_header).encode())#用json 序列化后,发送相关 信息 92 for line in file_obj: 93 self.conn.send(line)#发送数据 94 self.conn.send(json.dumps(f).encode())#发送文件 95 except Exception as e: 96 data={'filename':False} 97 self.conn.send(json.dumps(data).encode()) 98 else: 99 data={'filename':False}100 self.conn.send(json.dumps(data).encode())101 def put_file(self,data):#上传文件102 file_obj=open(data[2],'wb')#打开新建 这个文件103 rece_size=0#定义 文件大小值104 while rece_size<self.DATA["size"]:#小于接收的文件大小时,105 recv_data=self.conn.recv(4096)106 file_obj.write(recv_data)#写入文件107 rece_size+=len(recv_data)#增加文件大小计算108 else:109 print("文件[%s]接收完毕!"%self.DATA["filename"])110 file_obj.flush()111 file_obj.close()#关闭文件112 #@staticmethod113 def login(self,name,pas,conn):#用户检测 函数114 try:115 if open_file_list(name,pas):116 tag=True117 error=''118 datas={'user':name}119 data={'mag':'用户认证通过','tag':True}120 print(json.dumps(data).encode())121 conn.send(json.dumps(data).encode())122 else:123 raise Exception('\033[41;1m用户名或密码错误\033[0m' %name)124 except Exception as e:125 tag=False126 error=str(e)127 datas=''128 data={'mag':'用户或密码错误','tag':False}129 print('发送数据%s'%data)130 conn.send(json.dumps(data).encode())131 return {'tag':tag,'error':error,'data':datas}
以上就是SELECT版FTP的实例详解的详细内容,更多请关注其它相关文章!