欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Python 常用工具函数

程序员文章站 2022-06-19 11:58:45
...

File

获取同目录文件绝对路径

import os.path
os.path.join(os.path.abspath(os.path.dirname(__file__)), "../book_proects_list_config.txt")

读取文件, with open as工具可自动关闭文件

with open(filePath,'r') as fp:
    lines=[ line.strip() for line in fp.readlines()]
import os
import shutil
os.chdir(path)   # cd
os.makedirs(path)   # mul level, if exist error
os.mkdir(path)   # single level, if exist error
shutil.rmtree(path)   # delete file and subdir,只读文件无法删除

#删除只读文件
ShellUtil().remove_dir(path)

shutil.copyfile(src, dst)
shutil.copytree(srcDir, dstDir)  #dstDir must not exist

Shell

class ShellUtil:
    def excute_command(self, cmdStr):
        p = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        usefulMessage = ''
        for line in p.stdout.readlines():
            usefulMessage += line
        retval = p.wait()
        return retval, usefulMessage

    def remove_dir(self, path):  # del dir and subdir and files
        cmdStr = 'rd '+path+'/S /Q'
        retval, usefulMessage = self.excute_command(cmdStr)
        return retval, usefulMessage

参数

import getopt

options, args= getopt.getopt(sys.argv[1:], 'HhFfSsIiAaW:w:C:c:UuZ:z:Rr', ['help','full','ignore','auto-upload','workspace=','clearlevel=','upload-only','zip-path=','refresh-book-list','specified'])

异常退出

import sys
sys.exit(1)

系统

import platform
if platform.system()=='Windows':
    print ''
elif platform.system()=='Linux':
    print ''

私有方法变量

双下划线开头

__param1 = None
def __initWorkSpace(self, workSpaceRootPath, cleanWorkSpace):
    return

Zipfile

ZIP压缩目录以及目录下的子文件

import zipfile
zipBooksFile = zipfile.ZipFile(newZipFilePath),'w',zipfile.ZIP_DEFLATED)
for parent, dirnames, filenames in os.walk(toZipDir):
    for dirname in dirnames:
        fullPath = os.path.join(parent,dirname)
        zipBooksFile.write(fullPath, fullPath[len(toZipDir)+1:])
    for filename in filenames:
        fullPath = os.path.join(parent,filename)
        zipBooksFile.write(fullPath, fullPath[len(toZipDir)+1:])
zipBooksFile.close()

time

import time
ISOTIMEFORMAT='%Y-%m-%dT%H-%M-%S'
time.strftime(ISOTIMEFORMAT, time.localtime())
time.strptime('2017-09-08T09-12-48', ISOTIMEFORMAT)

Yaml

# pip install pyyaml
import yaml
yaml.load(file(path))

string

‘d:/tmp/test’.replace(‘/’,’\’)

gitlab

# pip install python-gitlab
from gitlab import Gitlab
class GitlabUtil:
    glInstance=None
    def __init__(self):
        self.__auth()
    def __auth(self):
        self.glInstance=Gitlab(url, private_token)
        self.glInstance.auth()
    def getAllGitbookProjects(self):
        projects = GitlabUtil().glInstance.groups.get(groupid).projects.list(all=True)
        return [(project.path, project.description) for project in projects]

encode

#coding: utf-8
description=u'DRDS\u5206\u5e03\u5f0f\u5173\u7cfb\u6570\u636e\u5e93\u8bbe\u8ba1\u6587\u6863'   # unicode编码
print description.encode('utf-8')

print 'aaa'+'\xe2\x80\xaa'.decode('utf-8')+'bbb'  #gbk被utf-8编码乱码

paramiko

#pip install paramkio
import paramiko

### ssh
sshClient = paramiko.SSHClient()
sshClient.load_system_host_keys()           sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy)
sshClient.connect(hostname=ip or host, port=22, username, password, timeout=5)

stdin, stdout, stderr = sshClient.exec_command(command=commandStr, timeout=5)
existStatus = stdout.channel.recv_exit_status()
print stdout.readlines()
print stderr.readlines()
sshClient.close()

###sftp
transport = paramiko.Transport((host or ip, 22))
transport.connect(username, password)
sftpClient = paramiko.SFTPClient.from_transport(transport)
sftpClient .put(localPath, remotePath)
sftpClient .close()

thread&lock

import thread
sshClientLock = thread.allocate_lock()
class Util:
    def method(self):
        sshClientLock.acquire()
        ...
        sshClientLock.release()

不同级目录调用

  • root
    • deploy.py
    • DeployDocsTool
      • init.py
      • deploy_book.py
#__init__文件很重要
from DeployDocsTool.deploy_book import DeployBook

随机数

import random
random.uniform(5,10)   #随机5到10内的小数,如 5.96334428515
random.randint(5,10)  #随机5到10内的整数
random.random() #随意0到1之内的小数

vara=random.uniform(5,10)
round(vara, 3) # 额外的,可以使用round函数选择保留几位小数

文件锁

Python使用fcntl模块支持文件锁功能

函数原型: int flock(int fp, int operation);

fcntl支持共享锁模式和排它锁模式,使用以下关键字创建不同类型的锁。

  • LOCK_SH:表示要创建一个共享锁,在任意时间内,一个文件的共享锁可以被多个进程拥有;
  • LOCK_EX:表示创建一个排他锁,在任意时间内,一个文件的排他锁只能被一个进程拥有;
  • LOCK_UN:表示删除该进程创建的锁;
  • LOCK_MAND:它主要是用于共享模式强制锁,它可以与 LOCK_READ 或者 LOCK_WRITE联合起来使用,从而表示是否允许并发的读操作或者并发的写操作;

fcntl的锁是劝告锁,即不是强制锁,它并不能够阻止linux系统的其他进程访问被加锁的文件;它只是在文件上设置一个锁标志。在程序代码中,如果要实现锁机制,应该在操作文件之前手动检查锁的状态,并根据锁状态决定是否对目标文件执行操作。

检查锁状态的方法也是加锁的方法。若某文件的锁已经被其他进程持有,则该加锁的请求会被阻塞,等待获取锁。

import fcntl
# 创建锁类,使用该类实现锁的持有与释放
class FileLock:

    def __init__(self, targetFilePath):
        self.targetFilePath = targetFilePath

    def getLock(self): #持有一个锁
        self.fp = open(self.targetFilePath,'r')
        fcntl.flock(self.fp, fcntl.LOCK_EX)  #如果无法获得锁,此处将阻塞

    def releaseLock(self): #释放一个锁
        fcntl.flock(self.fp, fcntl.LOCK_UN)
        try:
            self.fp.close()
        except:
            raise 'Close lock file handler error.'

lock=FileLock(targetFilePath)
lock.getLock()

with open('targetFilePath', 'r+') as fp:
    #对文件操作
with open('targetFilePath', 'w') as fp:
    #对文件操作

lock.releaseLock()
# 以上的一系列文件操作处于一个锁内

日志

Python使用logging模块提供日志记录功能

logging模块的配置具有进程共享性。既如果在一个python进程中,不同的module均引用了logging模块,则在其中一个module中配置了logging参数,则在其他module中,不需要再次进行配置,直接使用其它模块配置的logging即可。若再次配置,可能会出现日志重复打印的情况。

进一步的,我们需要在一个Python进程启动时,编写一个logging初始化的功能函数。

import logging
import os
# logger初始化函数
def initLogger(loglevel=logging.DEBUG, logname='deployLogger', \
             logfile='/var/log/deploy-doc.log'):

    if not os.path.exists(logfile):
        logfileParent = os.path.dirname(logfile)
        if not os.path.exists(logfileParent):   
            os.makedirs(logfileParent)

    logger = logging.getLogger(logname)
    logger.setLevel(loglevel)

    fh=logging.FileHandler(logfile)
    fh.setLevel(loglevel)

    ch = logging.StreamHandler()
    ch.setLevel(loglevel)

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)

#获取已经初始化的logger
def getLogger(logname='deployLogger'):
    return logging.getLogger(logname)


# main入口模块
import logger_util
import logging

class ScheduleUtil:
    def __init__(self):
        self.logger = logger_util.getLogger()  #此处进行了logger的初始化


#其他直接使用logger的模块
import logger_util

class ShellUtil:

    def __init__(self):
        self.logger = logger_util.getLogger()