欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

mysql数据库自动填充数据,性能测试

程序员文章站 2022-06-01 16:34:00
...

import string
import argparse
import random
import threading
import time
from contextlib import contextmanager

import pymysql

DB_NAME=‘test_insert_data_db’
TABLE_NAME=‘test_insert_data_table’
CREATE_TABLE_STATEMENT=""“create table {0} (id int(10) AUTO_INCREMENT,
name varchar(255),
datetime double,
PRIMARY KEY(id))”"".format(TABLE_NAME)

def _argparse():
parse=argparse.ArgumentParser(description=‘benchmark tool for mysql database’)
parse.add_argument(’–host’,action=‘store’,dest=‘host’,required=True,help=‘connect to host’)
parse.add_argument(’–user’,action=‘store’,dest=‘user’,required=True,help=‘user for login’)
parse.add_argument(’–password’,action=‘store’,dest=‘password’,required=True,help=‘password to use when connecting to server’)
parse.add_argument("–port",action=‘store’,dest=‘port’,default=3306,type=int,help=‘port number to use for connection or 3306’)
parse.add_argument(’–thread_size’,action=‘store’,dest=‘thread_size’,default=5,type=int,help=‘how much connection for database’)
parse.add_argument(’–row_size’,action=‘store’,dest=‘row_size’,default=5000,type=int,help=‘how much rows’)

parse.add_argument('-v','--version',action='version',version='%(prog)s 0.1')
return  parse.parse_args()

@contextmanager
def get_conn(**kwargs):
conn=pymysql.connect(**kwargs)
try:
yield conn
finally:
conn.close()
def create_db_and_table(conn):
with conn as cur:
for sql in [‘drop database if exists {0}’.format(DB_NAME),
‘create database {0}’.format(DB_NAME),
‘use {0}’.format(DB_NAME),
CREATE_TABLE_STATEMENT]:
print(sql)
cur.execute(sql)

def random_string(length=10):
s=string.ascii_letters+string.digits
return “”.join(random.sample(s,length))
def add_row(cursor):
SQL_FORMAT=“INSERT INTO {0} (name,datetime) values (’{1}’,’{2}’)”
sql=SQL_FORMAT.format(TABLE_NAME,random_string(),time.time())
cursor.execute(sql)
def insert_data(conn_args,row_size):
with get_conn(**conn_args) as conn:
with conn as c:
c.execute(‘use {0}’.format(DB_NAME))
with conn as c:
for i in range(row_size):
add_row©
conn.commit()

def main():
parse=_argparse()
conn_args=dict(host=parse.host,user=parse.user,password=parse.password,port=parse.port)
with get_conn(**conn_args) as conn:
create_db_and_table(conn)
thread=[]
for i in range(parse.thread_size):
t=threading.Thread(target=insert_data,args=(conn_args,parse.row_size))
thread.append(t)
t.start()
for t in thread:
t.join()

if name == ‘main’:
main()