网络编程 学Python的重点
程序员文章站
2022-06-05 20:22:12
网络编程(UPD Socket编程、上传文件实例、计算圆的面积实例)是学Python的重点。一、UPD Socket 编程socket 对象中与UDP Socket 服务器编程有关的方法是bind() ,注意不需要listen() 和accept() , 这是因为UDP 通信不需要像TCP 一样监听端口,建立连接。socket.recvfrom(buffsize) :接收UDP Socket 数据,该方法返回 二元组对象(data,address) ,data 是接收的字节序列对象;addre....
网络编程(UPD Socket编程、上传文件实例、计算圆的面积实例)是学Python的重点。
一、UPD Socket 编程
socket 对象中与UDP Socket 服务器编程有关的方法是bind() ,注意不需要listen() 和accept() , 这是因为UDP 通信不需要像TCP 一样监听端口,建立连接。
socket.recvfrom(buffsize) : 接收UDP Socket 数据,该方法返回 二元组对象(data,address) ,data 是接收的字节序列对象;address 是发送数据的远程Socket 地址。参数buffsize 指定一次接收的最大字节数,因此如果要接收的数据量大于buffsize ,则需要多次调用该方法进行接收。
socket.sendto(bytes, address) : 发送UDP Socket 数据,将bytes 数据发送到地址为address 的远程Socket ,返回成功发送的字节数。如果要发送的数据量很大,则需要多次调用该方法发送数据。
举例:
# 服务器 import socket s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.bind(('', 8888)) print(' 服务器启动...') # 从客户端接收数据 data, address = s.recvfrom(1024) print(' 从客户端接收消息:{0}'.format(data.decode())) # 给客户端发送数据 s.sendto(' 你好!'.encode(), address) # 释放资源 s.close() 复制代码
# 客户端 import socket s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 服务器地址 address=('127.0.0.1', 8888) # 给服务器发送数据 s.sendto(b'Hello', address) # 从服务器接收数据 data, _ = s.recvfrom(1024) print(' 从服务器接收消息:{0}'.format(data.decode())) # 释放资源 s.close() 复制代码
二、TCP 上传文件
== 服务器端: ==
import wx import socket class MyFrame(wx.Frame): def __init__(self): super().__init__(parent=None,size=(300,150),title="服务器接收文件") self.Center() panel=wx.Panel(self) self.fileText = wx.TextCtrl(panel,id=-1,pos=(15,5),size=(200,25)) openButton = wx.Button(panel, label="开启服务器") upButton = wx.Button(panel, label="保存" ) openButton.Bind(wx.EVT_BUTTON, self.acceptFile) upButton.Bind(wx.EVT_BUTTON, self.saveFile) box = wx.BoxSizer() mainbox=wx.BoxSizer(wx.VERTICAL) mainbox.Add(self.fileText, 1, flag= wx.ALL|wx.CENTER,border=10) box.Add(openButton, 1, flag=wx.ALL|wx.CENTER,border=10) box.Add(upButton, 1, flag=wx.ALL | wx.CENTER, border=10) mainbox.Add(box, 1, flag=wx.ALL | wx.CENTER, border=10) panel.SetSizer(mainbox) def acceptFile(self, event): self.s=socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.s.bind(("", 8888)) self.s.listen(5) print("服务器启动····") with self.s.accept()[0] as conn: buffer = [] while True: data = conn.recv(1024) if data: buffer.append(data) else: break self.reust = bytes().join(buffer) self.fileText.SetValue("接收完全,可选择位置保存!") def saveFile(self, event): dlg = wx.FileDialog(self,message="select the Save file style",defaultFile="",) if dlg.ShowModal() == wx.ID_OK: filename = "" paths = dlg.GetPaths() for path in paths: filename = filename + path self.fileText.SetValue(filename) with open(filename, "wb") as file: file.write(self.reust) print("服务器文件保存完成····") dlg.Destroy() class App(wx.App): def OnInit(self): frame=MyFrame() frame.Show() return True if __name__=="__main__": app=App() app.MainLoop() 复制代码
==客户端:==
import socket import wx class MyFrame(wx.Frame): def __init__(self): super().__init__(parent=None,size=(300,150),title="客户端上传文件") self.Center() panel=wx.Panel(self) self.fileText = wx.TextCtrl(panel,id=-1,pos=(15,5),size=(200,25)) openButton = wx.Button(panel, label="打开")###### upButton = wx.Button(panel, label="上传" ) openButton.Bind(wx.EVT_BUTTON, self.onOpenFile)#### upButton.Bind(wx.EVT_BUTTON, self.upLoad) box = wx.BoxSizer() mainbox=wx.BoxSizer(wx.VERTICAL) mainbox.Add(self.fileText, 1, flag= wx.ALL|wx.CENTER,border=10) box.Add(openButton, 1, flag=wx.ALL|wx.CENTER,border=10) box.Add(upButton, 1, flag=wx.ALL | wx.CENTER, border=10) mainbox.Add(box, 1, flag=wx.ALL | wx.CENTER, border=10) panel.SetSizer(mainbox) def onOpenFile(self, event): dlg = wx.FileDialog( self, message="Choose a file", defaultFile="",) if dlg.ShowModal() == wx.ID_OK: tmp = "" paths = dlg.GetPaths() for path in paths: tmp=tmp+path self.fileText.SetValue(tmp) dlg.Destroy() def upLoad(self,event): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.connect(("127.0.0.1", 8888)) with open(self.fileText.GetValue(), "rb") as file: while True: data = file.read(1024) if data: s.send(data) else: break print("上传完成") class App(wx.App): def OnInit(self): frame=MyFrame() frame.Show() return True if __name__=="__main__": app=App() app.MainLoop() 复制代码
三、TCP 计算圆的面积
== 服务器端: ==
import socket import wx class MyFrame(wx.Frame): def __init__(self): super().__init__(parent=None,size=(350,250),title="服务器:计算圆的面积") self.Center() panel=wx.Panel(self) self.text = wx.TextCtrl(parent=panel, id=-1, style=wx.TE_MULTILINE,size=(100,150)) mainbox=wx.BoxSizer(wx.VERTICAL) mainbox.Add(self.text,flag=wx.EXPAND) panel.SetSizer(mainbox) # 应用程序 class App(wx.App): def OnInit(self): frame=MyFrame() frame.Show() with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", 8888)) s.listen(5) with s.accept()[0] as conn: while True: data = conn.recv(1024) if len(data) != 0: frame.text.LabelText += "你的半径为:" + data.decode() + "\r\n" msg=str(3.14*float(data.decode())**2) conn.send(msg.encode()) frame.text.LabelText += "计算结果:" + msg + "\r\n" else: break return True # 进入main函数运行:循环 if __name__=="__main__": app=App() app.MainLoop() 复制代码
==客户端:==
import socket import wx s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) class MyFrame(wx.Frame): def __init__(self): super().__init__(parent=None, size=(350, 250), title="客户端:计算圆的面积") self.Center() panel = wx.Panel(self) self.text = wx.TextCtrl(parent=panel, id=-1, style=wx.TE_MULTILINE, size=(100, 150)) startbutton = wx.Button(parent=panel, id=1, label="连接服务器") self.inputtext = wx.TextCtrl(parent=panel, id=3,style=wx.TE_PROCESS_ENTER) button = wx.Button(parent=panel, id=2, label="发送") self.Bind(wx.EVT_BUTTON, self.Talk, id=1) self.Bind(wx.EVT_BUTTON, self.on_button, id=2) self.Bind(wx.EVT_TEXT_ENTER, self.on_button, self.inputtext) box1 = wx.BoxSizer() box1.Add(startbutton, 1, flag=wx.ALL | wx.EXPAND, border=10) box1.Add(self.inputtext, 1, flag=wx.ALL | wx.EXPAND, border=10) box1.Add(button, 1, flag=wx.ALL | wx.EXPAND, border=10) mainbox = wx.BoxSizer(wx.VERTICAL) mainbox.Add(self.text, flag=wx.EXPAND) mainbox.Add(box1, flag=wx.EXPAND) panel.SetSizer(mainbox) # 连接服务器函数 def Talk(self, event): s.connect(("127.0.0.1", 8888)) return # 发送信息给服务器端 def on_button(self, event): msg = self.inputtext.GetValue()# 获取要发送的文本 s.send(msg.encode())# 给服务器端发送信息 self.text.LabelText += "请输入半径:" + msg + "\r\n"# 将发送信息写入纪录文本框 data = s.recv(1024)# 等待接收服务器端信息 if len(data) != 0: self.text.LabelText += "服务器计算结果为:" + data.decode() + "\r\n"# 将发送信息写入纪录文本框 class App(wx.App): def OnInit(self): frame = MyFrame() frame.Show() return True if __name__=="__main__": app=App() app.MainLoop() 复制代码
本文地址:https://blog.csdn.net/m0_49079037/article/details/107348448
上一篇: 缓存有那么多种,分别是干什么的?