老师的好帮手——课堂考勤记录整合
程序员文章站
2022-06-15 20:25:43
文章目录问题背景研究背景目标结果代码问题背景线上课程积累了很多考勤记录,人为处理这些考勤记录既费时又费力,还容易导致遗漏。那么有没有一种方法可以通过一堆考勤记录和一个学生名单直接导出成为一个总的考勤记录表格呢?研究背景以腾讯课堂为例子根据线上课程的经验,在腾讯课堂进行上课的课程居多,因此本文考虑的考勤记录都是基于腾讯课堂的考勤记录。拥有一个点名册我们假设每个老师都有一个点名册,而且最终成绩应该只需要统计成绩单上的同学。不会有重名我们假设不会出现重复的唯一标识,简单起见,我们假...
问题背景
线上课程积累了很多考勤记录,人为处理这些考勤记录既费时又费力,还容易导致遗漏。
那么有没有一种方法可以通过一堆考勤记录和一个学生名单直接导出成为一个总的考勤记录表格呢?
研究背景
-
以腾讯课堂为例子
根据线上课程的经验,在腾讯课堂进行上课的课程居多,因此本文考虑的考勤记录都是基于腾讯课堂的考勤记录。 -
拥有一个点名册
我们假设每个老师都有一个点名册,而且最终成绩应该只需要统计成绩单上的同学。 -
不会有重名
我们假设不会出现重复的唯一标识,简单起见,我们假设为姓名。 -
合并相同姓名的时长
对于同学们使用多个账号的信息,我们将所有相同标识的上课时长进行相加,作为总计时长。
目标结果
目标结果为按照点名册上的序号进行统计的一个考勤结果
代码
具体的讲解在注释中有,如果没有特殊要求,直接复制这段代码然后运行,就可以得到上述的统计结果。
# -*- coding: utf-8 -*-
#获得姓名到序号的映射,其中rankpath为该映射所在文件路径,需要是xlsx类型。
#默认为采用成绩统计表单中的A列作为“序号”,C列为“姓名”。
def get_name2index(rankpath):
import pandas as pd
#读取rank中的名字的顺序,并映射到序号列表中,方便后续进行排序
rank = None
#进行一波尝试,尝试读取文件
while True:
try:
print('正在尝试打开文件:',rankpath)
rank = pd.read_excel(rankpath,usecols='A,C')
break
except Exception as e:
print('打开文件错误,请检查是否被占用\n错误信息为: ',e)
map_name2index = {rank['姓名'][i]:int(rank['序号'][i]) for i in range(0,len(rank['姓名']))}
return map_name2index
#将文件中学生的上课备注信息转化为该生的唯一表示,在本程序中使用字段的所有中文信息(默认为姓名)作为学生的唯一表示。
#当然该部分可以把转化的目标改成序号+姓名等多值组合,从而达到一个姓名可以出现多次的效果。
def toname(str1):
import re
#利用re的替换功能,完成汉语字符串的提取。也可以更改为其他模式,例如split
str0 = re.sub(r'[^\u4e00-\u9fa5]+','',str1)
return str0
#子demo,检查文件是不是满足考勤文件的文件名命名格式
#默认为只要满足‘XXXX.xlsx’格式即可
def check_file(filename):
if '.xlsx' in filename:
return True
else:
return False
#获取某节课的标准时长,默认为腾讯课堂的出勤记录整理模式。
def get_classtime(filepath):
import pandas as pd
remc = None
#尝试一下打开文件。
while True:
try:
print('正在尝试打开文件:',filepath)
remc = pd.read_excel(filepath,usecols='B',skiprows=1,nrows=1)
break
except Exception as e:
print('打开文件错误,请检查是否被占用\n错误信息为: ',e)
total_time0 = str(remc['直播时长'][0])
total_time0 = int(total_time0.replace('分钟',''))
return total_time0
#通过一个上课时长的字符串获取具体学生的上课时长
def get_stutime(strtime):
if strtime == r'不足一分钟':
strtime = 1
else:
strtime = int(strtime.replace('分钟',''))
return strtime
#填充缺失值,默认填充缺失值为均0
def fill_empty(indexlist):
#把每节课的上课时长都填充为0
addmp = {indexone:0 for indexone in indexlist}
stutime = 0
return addmp,stutime
#合并函数,把所有相同的信息合并起来
def merge_time(total_count,newindex,oldindex,classlist):
import pandas
for indexone in classlist:
total_count.loc[newindex,indexone] += total_count[indexone][oldindex]
total_count.loc[newindex,'总时长'] += total_count['总时长'][oldindex]
return total_count
#考勤文件所处文件夹, 最终统计结果输出的文件路径, 人员名称到序号的映射, 合并函数, 文件名检查条件, 不满足信息输出路径
#需要满足的是:考勤文件所处文件夹的excel文件尽量都是考勤文件,但不需要添加最后的‘\’,标准样例:.\考勤
#最终输出结果所在文件夹名称
#合并相同姓名的同学默认为merge_time,即多个相同姓名的学生的上课时长相加合并
#文件名检查条件默认为所有.xlsx文件,要求文件夹内全部存放考勤记录
def read_all(path,outputpath,name2index,merge=merge_time,filecheck=check_file):
import os
import pandas as pd
#获取姓名到序号的映射
map_name2index = name2index
#维护多个变量初始值,方便后续进行增量修改
indexlist = [] #维护课程名称的list,记录每一节课的名称
map_ID2record = {} #维护一个ID to 上课记录的映射,为后续生成记录表准备条件。
map_ID2name = {} #维护一个ID to 第一次出现的姓名的映射,以第一次上课的姓名作为标准姓名进行处理
total_time = 0 #维护一个课程的总时长,后续计算平时分的公式为 : 个人上课总时长 / 全部课时总时长 * 100
map_ID2totaltime = {} #维护一个ID 的总上课时长,免得最后axis计算一次
map_ID2index = {} #维护一个ID→姓名,姓名→序号的合成,ID→序号映射
#枚举path中的每个考勤文件,也就是每节课的考勤记录
for file in os.listdir(path):
#避免无关文件读取报错
if filecheck(file) is False:
continue
#展示一下进度
print('正在处理的文件为 : ',file)
#维护一下这节课的总时长,判定标准总时长-20
classtime = get_classtime(path+'/'+file)
total_time += classtime
#指明需要读取的具体列数,这里使用 C列:授课内容;D列学生姓名;E列学生ID;H列观看直播时长
#默认统计只使用观看直播时长,不考虑观看录播的时长
rem = pd.read_excel(path+'/'+file,skiprows=4,usecols="C:E,H")
#维护每节课的名字
class_name = rem['授课内容'][0]
indexlist.append(class_name)
#提取ID-时长序列
names = rem['学生姓名'] #姓名
ID = rem['学生ID'] #ID
time1 = rem['观看直播时长'] #所有时长
#枚举每个ID-时长键值对,增加到映射中
#构建列名为ID,行取值为time1的映射,然后insert即可
for i in range(0,len(ID)):
id = str(ID[i])
#如果这个ID没有在课堂中出现过,那么创建一个ID的记录
if id not in map_ID2name:
#需要记录的是ID对应的姓名和序号
#首先记录姓名,然后通过姓名获取序号
map_ID2name[id] = toname(names[i])
map_ID2index[id] = map_name2index.get(toname(names[i]),888)
#初始化一下后面需要累加的东西
map_ID2record[id] = {}
map_ID2totaltime[id] = 0
#单独对观看时长进行一个处理,转化成能记录的数字
time = get_stutime(str(time1[i]))
#统计总时长
map_ID2totaltime[id] += time
#统计每节课的时长
map_ID2record[id][class_name] = time
#展示一下进度
print('已经完成对 : ',file,' 的处理\n')
#展示进度
print('正在合并所有信息')
total_count = pd.DataFrame() #维护一个总的统计结果
#构建DataFrame,并利用自动合并功能合并所有信息
total_count = pd.DataFrame(map_ID2record)
total_count = total_count.append(pd.DataFrame(map_ID2name,index=['位置不对的姓名']))
total_count = total_count.append(pd.DataFrame(map_ID2totaltime,index=['总时长']))
total_count = total_count.append(pd.DataFrame(map_ID2index,index=['序号']))
#转置一下是因为现在生成的表格是竖着的,而不是横着的
total_count = total_count.T
#调整一下列的顺序
name = total_count['位置不对的姓名']
total_count.drop(labels=['位置不对的姓名'],axis=1,inplace=True)
total_count.insert(0,'姓名',name)
#展示进度
print('合并表格完成\n\n正在核对没有出勤的同学\n')
#填充一波没有出现过的姓名的同学
namelist = name.tolist()
for name,value in map_name2index.items():
#如果一个在最终名单的人没有在任何考勤记录里出现过,进行处理
if name not in namelist:
#输出一下,展示出来有啥问题吗
print('\t没有上过一次课的同学:name is ',name,'. value is ',value)
#缺失同学信息的填充
addmp,stu_time = fill_empty(indexlist)
#default是基础信息
map_default = {'姓名':name,'总时长':stu_time,'序号':value}
addmp.update(map_default)
#把信息更新到表格中
total_count = total_count.append(addmp,ignore_index=True)
#记录一下那些上了课但是找不到名字的同学
no_name_count = total_count.loc[total_count['序号']==888]
print('\n\t上过课但是没有姓名的同学: ',no_name_count['姓名'].tolist(),'\n')
#填充一波空值为0,在展示的时候好看,也是为了方便计算
for indexone in indexlist:
total_count[indexone].fillna(0,inplace=True)
#处理那些有多个号的同学
print('核对没有出勤同学完成\n\n正在整合具有多个账号的同学信息')
if merge != None:
del_rows = []
map_name2firstindex = {}
#遍历每个变量,看看是不是重了
for i in range(0,len(total_count)):
namenow = total_count['姓名'][i]
#出现了重复的情况
if namenow in map_name2firstindex:
firstindex = map_name2firstindex[namenow]
print('\t查找到一个 ',namenow,' \t同学有重复信息')
#维护一个新旧信息的合并过程,默认为把所有上课时长都加起来
total_count = merge(total_count, i, firstindex, indexlist)
#然后把之前的老序列加到删除序列中
del_rows.append(firstindex)
map_name2firstindex[namenow] = i
#最后进行一波总的删除
total_count.drop(del_rows,axis = 0,inplace = True)
else:
pass
#展示进度
print('整合信息完成\n\n正在计算考勤成绩')
#计算考勤成绩的公式为:学生上课总时长 / 整个课程的总时长 * 100
total_count['考勤成绩'] = total_count['总时长']*100.0/total_time
total_count.drop(labels=['总时长'],axis=1,inplace=True)
#展示进度
print('计算成绩完成\n\n正在排序')
#排个序
total_count.sort_values(by='序号',inplace=True)
#展示进度
print('排序完成\n\n正在整理符合名单的信息')
#把不是正规的同学给删了
not_in_list = total_count.loc[total_count['序号'] == 888]
total_count = total_count.loc[total_count['序号'] < 888]
#重新设置一下序号index。
total_count = total_count.set_index(['序号'])
not_in_list = not_in_list.set_index(['序号'])
print('整理信息完成\n')
outputname = outputpath + r'/考勤结果统计.xlsx'
notlistpath = outputpath + r'/不标准的考勤结果统计.xlsx'
while True:
try:
#展示进度2
print('正在将符合名单的信息导出到excel')
total_count.to_excel(outputname)
break
except Exception as e:
#显示重新尝试
print('导出错误,错误信息为: ',e)
print('随便输点东西,重试一下吧!')
input()
if notlistpath != None:
while True:
try:
#展示进度2
print('正在将不符合的信息导出到excel')
not_in_list.to_excel(notlistpath)
break
except Exception as e:
#显示重新尝试
print('导出错误,错误信息为: ',e)
print('随便输点东西,重试一下吧!')
input()
#展示进度
print('导出成绩完成\n')
print('总课时为: ',total_time,' 分钟')
return
#取得需要生成文件的路径
def get_paths(inputstr):
import time
from tkinter import filedialog
import tkinter as tk
#设置一个tkinter的窗口,然后设置为隐藏
root = tk.Tk()
root.withdraw()
print(inputstr)
#弹出提示之后询问路径
time.sleep(2)
folderpath = filedialog.askdirectory()
#输出一下输入的路径核对一下
print('\t', folderpath)
return folderpath
#读取具体文件的路径
def get_path(inputstr):
import time
from tkinter import filedialog
import tkinter as tk
#设置一个tkinter的窗口,然后设置为隐藏
root = tk.Tk()
root.withdraw()
#弹出提示之后询问路径
print(inputstr)
time.sleep(2)
filepath = filedialog.askopenfilename()
#输出一下输入的路径核对一下
print('\t', filepath)
return filepath
#延迟展示,防止看不到就没了
def deferdo():
import time
for i in range(0,5):
print(5-i,' 秒后退出该窗口!')
time.sleep(1)
if __name__ == '__main__':
#三个路径的选择
resourcepath = get_paths(r'请选择考勤文件所在的文件夹:')
outputpath = get_paths(r'请选择输出结果所处文件夹:')
rankpath = get_path('请选择标准成绩单的文件:\n如果选择为空则所有学生成绩都会到\'不标准的考勤结果统计\'文件中:')
#读取
read_all(resourcepath, outputpath, get_name2index(rankpath))
#延迟一下,看一下结果
deferdo()
鉴于可能部分电脑没有配置相关环境,或可能需要进行更加大范围的合并考勤记录。
因此可以考虑使用TCP客户端进行文件的传输,然后在服务器端完成合并的操作。
其中下面的代码引用了python socket传输文件(视频、图片、文本等)该文章的代码。
#socket TCP传递文件的客户端,传入参数为地址和端口
def socket_client_file(host='127.0.0.1',port=888):
import socket
import os
import sys
import struct
#开启tcp连接,并尝试连接
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
except socket.error as msg:
print (msg)
sys.exit(1)
print (s.recv(1024))
# 需要传输的文件路径
filepath = get_path('请选择文件:')
# 判断是否为文件
if os.path.isfile(filepath):
# 定义定义文件信息。128s表示文件名为128bytes长,l表示一个int或log文件类型,在此为文件大小
fileinfo_size = struct.calcsize('128sl')
# 定义文件头信息,包含文件名和文件大小
fhead = struct.pack('128sl', os.path.basename(filepath).encode('utf-8'), os.stat(filepath).st_size)
# 发送文件名称与文件大小
s.send(fhead)
# 将传输文件以二进制的形式分多次上传至服务器
fp = open(filepath, 'rb')
while 1:
data = fp.read(1024)
if not data:
print ('{0} file send over...'.format(os.path.basename(filepath)))
break
s.send(data)
# 关闭当期的套接字对象
s.close()
#socket TCP传递文件夹的客户端,传入参数为地址和端口
def socket_client_fold(host='127.0.0.1',port=888):
import socket
import os
import sys
import struct
#开启tcp连接,并尝试连接
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
except socket.error as msg:
print (msg)
sys.exit(1)
print (s.recv(1024))
# 需要传输的文件路径
foldpath = get_paths('请选择文件夹:')
for file in os.listdir(foldpath):
# 判断是否为文件
if os.path.isfile(file):
# 定义定义文件信息。128s表示文件名为128bytes长,l表示一个int或log文件类型,在此为文件大小
fileinfo_size = struct.calcsize('128sl')
# 定义文件头信息,包含文件名和文件大小
fhead = struct.pack('128sl', (os.path.basename(foldpath)+'/'+file).encode('utf-8'), os.stat(flodpath + '/' +file).st_size)
# 发送文件名称与文件大小
s.send(fhead)
# 将传输文件以二进制的形式分多次上传至服务器
fp = open(filepath, 'rb')
while 1:
data = fp.read(1024)
if not data:
print ('{0} file send over...'.format(os.path.basename(file)))
break
s.send(data)
# 关闭当期的套接字对象
s.close()
#TCP服务端,接收数据
def socket_service(host='127.0.0.1',port=888):
import threading
import socket
import struct
#建立socket套接字,等待连接
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定端口为9001
s.bind((host, port))
# 设置监听数
s.listen(10)
except socket.error as msg:
print (msg)
sys.exit(1)
print ('Waiting connection...')
#单独的接收数据的一个线程
def deal_data(conn, addr):
print ('Accept new connection from {0}'.format(addr))
# conn.settimeout(500)
# 收到请求后的回复
conn.send('Hi, Welcome to the server!'.encode('utf-8'))
while True:
# 申请相同大小的空间存放发送过来的文件名与文件大小信息
fileinfo_size = struct.calcsize('128sl')
# 接收文件名与文件大小信息
buf = conn.recv(fileinfo_size)
# 判断是否接收到文件头信息
if buf:
# 获取文件名和文件大小
filename, filesize = struct.unpack('128sl', buf)
fn = filename.strip(b'\00')
fn = fn.decode('utf-8')
print ('file new name is {0}, filesize if {1}'.format(str(fn),filesize))
recvd_size = 0 # 定义已接收文件的大小
# 存储在该脚本所在目录下面
fp = open('./' + str(fn), 'wb')
print ('start receiving...')
# 将分批次传输的二进制流依次写入到文件
while not recvd_size == filesize:
if filesize - recvd_size > 1024:
data = conn.recv(1024)
recvd_size += len(data)
else:
data = conn.recv(filesize - recvd_size)
recvd_size = filesize
fp.write(data)
fp.close()
print ('end receive...')
# 传输结束断开连接
conn.close()
break
#无限循环然后等待连接
while True:
# 等待请求并接受(程序会停留在这一旦收到连接请求即开启接受数据的线程)
conn, addr = s.accept()
# 接收数据
t = threading.Thread(target=deal_data, args=(conn, addr))
t.start()
本文地址:https://blog.csdn.net/H_18763886211/article/details/107632098