欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python测试用例服务器与客户端

程序员文章站 2022-03-11 21:19:11
Python测试用例服务器与客户端客户端:无限发,可以多个客户端同步进行服务器:根据进程确定接收客户端数量,根据信息头进行选择型输出import socketfrom multiprocessing import Processimport threadingimport sys, osimport sysimport iodef setup_io(): sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detac...

Python测试用例服务器与客户端

客户端:无限发,可以多个客户端同步进行
服务器:根据进程确定接收客户端数量,根据信息头进行选择型输出

import socket
from multiprocessing import Process
import threading
import sys, os
import sys
import io

def setup_io():
    sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
    sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
setup_io()

def my_fun(self):
    try:
        for i in range(10):
            # 变量声明
            PORT = 9993
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            #重复使用绑定信息
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            address = ("192.168.56.1", PORT)
            server_socket.bind(address)

            receive_data_array1 = []
            receive_data_array2 = []
            for i in range(2):
                print('server waiting')
                receive_data, client_address = server_socket.recvfrom(1024)
                print("接收到了客户端 %s 传来的数据: %s" % (client_address, receive_data.decode()))

                receive_data_array =[]
                receive_data_array.append(receive_data.decode())
                receive_data_array1 = receive_data_array1 + receive_data_array
                receive_data_array2 = receive_data_array2 + receive_data_array

            msg = input("请输入检索设备:")
            if(msg==(receive_data_array1[0][0:1])):
                    print(receive_data_array1[0])
                    receive_data_array1 = []
                    # server_socket.sendto("hello".encode(), client_address)
            elif(msg==(receive_data_array1[1][0:1])):
                    print(receive_data_array1[1])
                    receive_data_array2 = []
                    # server_socket.sendto("hello".encode(), client_address)
            else:
                    # print(receive_data_array)
                    # server_socket.sendto(msg.encode(), client_address)
                    # print(receive_data_array)
                    print(receive_data_array1)
                    # print(receive_data_array2)
                    # print(receive_data_array3)

    finally:
            server_socket.close()

#进程
def process_works(func, arg, worknum):
    proc_record = []
    for i in range(worknum):
        p = Process(target=func, args=(arg,))
        p.start()
        proc_record.append(p)
    for p in proc_record:
        p.join()
#线程
def thread_works(func, arg, worknum):
    thread_record = []
    for i in range(worknum):
        t = threading.Thread(target=func, args=(arg,))
        t.start()
        thread_record.append(t)
        print(thread_record)
    for t in thread_record:
        t.join()




if __name__ == '__main__':
    arg = 5
    procs=3
    thread_num=3
    #procs = 2   #进程个数
    #process_works(my_fun, arg, procs)
    thread_works(my_fun, arg, thread_num)
    #process_works(my_fun, arg, worknum):












#!/usr/bin/env python
#coding:utf-8
import socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

while True:
    msg = input("请输入要发送的内容:")
    server_address = ("192.168.56.1", 9993)
    client_socket.sendto(msg.encode(), server_address)
    # server_reply,server_address=client_socket.recvfrom(1024)
    # 打印接受的数据
    # print(str(server_reply, 'utf8'))
    # if (msg == 'quit'):
    #     break
    for i in range(4):
        if i > 3:
            break

# 关闭连接
client_socket.close()



本文地址:https://blog.csdn.net/qq_41519058/article/details/110421604