欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

python3 数据库的常用操作

程序员文章站 2024-02-23 20:57:28
...

一、Mysql数据库:

安装pymysql: 

pip install pymysql

1、数据库连接对象 connect 的常用方法:

cursor() # 创建一个游标对象

commit() # 提交事务

rollback() # 事务回滚

close() # 关闭数据库连接

2、游标对象 cursor 的常用方法:

execute()  # 执行SQL语句

executemany()  # 用来执行多天SQL语句

close() # 用来关闭游标

fetchone() # 用来从结果取出一条记录,并将游标指向下一条记录

fetchall()  # 从结果中取出所有记录

scroll()  # 用于游标滚动

示例如下:

from myconn import myconn

class Sql:
	def __init__(self,db=None):
		# 初始化连接数据库
		# self.conn = pymysql.connect(db=dataName, user="user", passwd="passwd", host="host",charset='utf8')
		# 这是我自己写的方法,平常的话,安装pymysql,用上面一句连接就可
		self.conn = myconn.get_conn(db) if db!=None else myconn.get_conn()

	def get_databases(self):
		'''  获取所有数据库名 '''
		return self.getSqlData("show databases")
	def get_tables(self):
		'''  获取所有表名 '''
		return self.getSqlData("show tables")

	def create_table(self,table):
		''' 创建表 '''
		cur = self.conn.cursor()
		# id:无符号整数,主键,自增;name:字符串;age:无符号整数
		sql = """create table {} (id int unsigned primary key auto_increment name varchar(10),age int unsigned)""".format(table)
		cur.execute(sql)
		self.conn.commit()

	def insert_data(self,table):
		''' 增加一条数据到数据表 '''
		cur = self.conn.cursor()
		sql = "insert into {}(name,age) values(%s,%s)"
		cur.execute(sql,('myName',80))
		self.conn.commit()

	def update_data(self,table):
		''' 修改表中的数据 '''
		cur = self.conn.cursor()
		sql = "update {} set age=18 where name='myName'".format(table)
		cur.execute(sql)
		self.conn.commit()

	def select_data(self, table):
		'''  从数据库表查询数据 '''
		cur = self.conn.cursor()
		sql = "select name,age from {}".format(table)
		cur.execute(sql)
		return cur.fetchall()

	def delete_data(self, table):
		'''  从数据库表删除数据 '''
		cur = self.conn.cursor()
		sql = "delete from {} where name='myName'".format(table)
		cur.execute(sql)
		self.conn.commit()


	def get_fields(self,table):
		'''  获取指定表的字段 '''
		cur = self.conn.cursor()
		sql = "SELECT * FROM {} LIMIT 1".format(table)
		cur.execute(sql)
		v = cur.description
		zds = [i[0] for i in v]
		self.conn.commit()
		return zds


	def unique(self,table,*fields):
		''' 唯一设置 
		table:表名,fields:字段名列表; '''
		cur = self.conn.cursor()
		if len(fields) == 1:
			sql = "ALTER TABLE {} ADD unique(".format(table)
		else:
			sql = "ALTER TABLE {} ADD UNIQUE KEY(".format(table)
		for i in fields:
			sql += i
			if i != fields[-1]:
				sql += ','
			else:
				sql += ')'
		try:
			cur.execute(sql)
		except Exception as exc:
			print(exc)
		else:
			self.conn.commit()
		
	def closeSql(self):
		''' 关闭数据库连接 '''
		self.conn.close()

二、MongoDB 数据库

1,安装 mongodb:

到MongoDB官网下载对应版本的安装包: https://www.mongodb.com/download-center?jmp=nav#community

1,把MongoDB安装到C盘或者D盘都可以

2,在C盘建立data\db文件夹作为数据文件的存储路径,建立data\log文件夹存储日志文件。

3,安装服务:cmd 进入到MongoDB的bin目录,执行:mongod --dbpath "C:\data\db"  --logpath "C:\data\log\log.txt"  --install -serviceName "MongoDB"

4,开启服务:net start MongoDB

注:有时由于没有正常关闭MongoDB,导致开启服务失败,可尝试删除C:\data\db下的mongod.lock再开启服务

mongod --dbpath "C:\data\db"  --logpath "C:\data\log\log.txt"  --install -serviceName "MongoDB"  

2,安装 pymongo:

pip install pymongo

示例如下:

import pymongo
import datetime
 
class Mongodb:
	""" Python MangoDB 的简单操作 """
 
	def __init__(self):
		# 1、建立连接
		# client = pymongo.MongoClient('localhost',27017) # 第一种方法
		client = pymongo.MongoClient('mongodb://localhost:27017') # 第二种方法
 
		# 2、获取数据库
		# db = client.db_name # 第一种方法
		db = client['db_name'] # 第二种方法
 
		# 3、获取一个集合
		# self.collection = db.table # 第一种方法
		self.collection = db['table'] # 第二种方法
 
	def insert_data(self):
		''' 插入文档 '''
		# name:姓名;age:年龄;datetime:存储时间
		user = {
			"name":"myName",
			"age":18,
			"datetime":datetime.datetime.utcnow()
		}
		# 插入后,如果文档内没有_id这个键值,则系统会自动添加_id;其中user可以是多个user组成的列表
		user_id = self.collection.insert(user)
		return user_id
 
	def find_one_data(self):
		''' 查询一条文档 '''
		data = self.collection.find_one({"name":"myName"}) # 查询name为myName的
		return data
 
	def find_data(self):
		''' 查询多个文档 '''
		# data = [d for d in self.collection.find()] # 查询所有
		data = [d for d in self.collection.find({"name":"myName"})] # 查询所有指定name的文档
		return data

	def find_limit_data(self):
		''' 根据条件查询数据:
			MongoDB中条件操作符有:
			(>) 	  大于 - $gt
			(<) 	  小于 - $lt
			(>=)  大于等于 - $gte
			(<= ) 小于等于 - $lte '''
		data = self.collection.find({"age":{"$gt": 12}}) # 查询年龄大于12岁的
		return data
 
	def get_size(self):
		''' 查询符合条件的文档条数 '''
		size = self.collection.find({"name":"myName"}).count()
		return size

	def get_names(self):
		''' 查询所有 name 的值,不重复的。返回list '''
		names = self.collection.distinct('name')
		return names
 
	def update_data(self):
		''' 修改文档 '''
		self.collection.update({"name":"myName"},{"$set":{"age":28}}) # 修改name为myName的年龄
 
	def delete_data(self):
		''' 删除文档 '''
		self.collection.remove({"name":"myName"}) # 删除name为myName的文档

三、SQLite 数据库

import sqlite3

class Sqlite:
	""" Sqlite 数据库的简单使用 """

	def __init__(self):
		# 1、打开 mydb.db 数据库,若不存在则创建
		self.conn = sqlite3.connect('D:\\mydb.db')
		# self.conn = sqlite3.connect(':memory:') # 在内存中创建数据库文件
		
	
	def create_table(self):
		''' 建表 '''
		# 创建游标对象
		cur = self.conn.cursor()
		sql = "create table tb_name (id integer primary key,name varchar(10),age integer)"
		cur.execute(sql)  	# 执行
		self.conn.commit() 	# 提交

	def insert_data(self):
		''' 插入数据 '''
		cur = self.conn.cursor()
		sql = "insert into tb_name values(?,?,?)"
		cur.execute(sql,(1,'myName',20))  	# 执行
		self.conn.commit() 					# 提交

	def select_data(self):
		''' 查询数据 '''
		cur = self.conn.cursor()
		sql = "select * from tb_name"
		cur.execute(sql)  	# 执行
		data = cur.fetchall() # 返回所有
		return data

	def update_data(self):
		''' 修改数据 '''
		cur = self.conn.cursor()
		sql = "update tb_name set name=? where id=?"
		cur.execute(sql,('isName',1))  	# 执行
		self.conn.commit()

	def delete_data(self):
		''' 删除数据 '''
		cur = self.conn.cursor()
		sql = "delete from tb_name where id=?"
		cur.execute(sql,(1,))  	# 执行
		self.conn.commit()

	def close_db(self):
		''' 关闭数据库连接 '''
		self.conn.close()

 

四、Redis 数据库

#在Redis中设置值,默认不存在则创建,存在则修改
安装 redis:pip install redis
import redis
1、存入数据
r = redis.Redis(host='localhost',port=6379,db=0)
r.set('name', 'zhangsan')
'''参数:
     set(name, value, ex=None, px=None, nx=False, xx=False)
     ex,过期时间(秒)
     px,过期时间(毫秒)
     nx,如果设置为True,则只有name不存在时,当前set操作才执行,同setnx(name, value)
     xx,如果设置为True,则只有name存在时,当前set操作才执行'''
setex(name, value, time)
#设置过期时间(秒)


psetex(name, time_ms, value)
#设置过期时间(豪秒)
mset()


#批量设置值
r.mset(name1='zhangsan', name2='lisi')
#或
r.mget({"name1":'zhangsan', "name2":'lisi'})
get(name)


2、获取数据


mget(keys, *args)
#批量获取
print(r.mget("name1","name2"))
#或
li=["name1","name2"]
print(r.mget(li))


getset(name, value)
#设置新值,打印原值
print(r.getset("name1","wangwu")) #输出:zhangsan
print(r.get("name1")) #输出:wangwu


getrange(key, start, end)
#根据字节获取子序列
r.set("name","zhangsan")
print(r.getrange("name",0,3))#输出:zhan


setrange(name, offset, value)
#修改字符串内容,从指定字符串索引开始向后替换,如果新值太长时,则向后添加
r.set("name","zhangsan")
r.setrange("name",1,"z")
print(r.get("name")) #输出:zzangsan
r.setrange("name",6,"xxx")

print(r.get("name")) #输出:zzangsxxx

import redis

class MyRedis:
	""" Redis 数据库简单使用 """
	def __init__(self):
	        # 1、建立 Redis连接
		# 第一种方式
		# self.red = redis.Redis(host='localhost',port=6379,db=0)
		# 第二种方式(以连接池的方式)
		pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
                self.red = redis.Redis(connection_pool=pool)  

	def insert_data(self):
		''' 插入数据 '''
		self.red.set("key","value")

	def get_data(self):
		''' 查询数据 '''
		data = self.red.get("key")
		return data

	def delete_data(self):
		''' 删除数据 '''
		self.red.delete('key')

 

Redis 之 消息推送

1、创建服务端用以发布数据

import redis
import time
from datetime import datetime

def resis_server(host,port):
	pool = redis.ConnectionPool(host=host,port=port,db=1)
	r = redis.StrictRedis(connection_pool=pool)
	count = 1
	while True:
		dt = datetime.now()
		print(dt) # 要发布的数据
		# 发布到指定频道
		r.publish('channel_name',dt)
		if count >= 10:
			# 只是为了测试,所以限制为10次后,发布结束通知
			r.publish('channel_name','end') 
			print('停止发布')
			break
		time.sleep(1)
		count += 1
		

if __name__ == '__main__':
	resis_server('192.168.88.64',6379)

2、创建客户端以接收数据

import redis

def redis_client(host,port):
	pool = redis.ConnectionPool(host=host,port=port,db=1)
	r = redis.StrictRedis(connection_pool=pool)
	p = r.pubsub()
	p.subscribe('channel_name') # 订阅频道
	for item in p.listen():
		print(item)  # 输出字典
		if item['type'] == 'message':
			data = item['data']
			print(data)  # 输出发送过来的消息
			if item['data'] == b'end':
				break
	p.unsubscribe('channel_name')
	print('取消订阅')

if __name__ == '__main__':
	redis_client('192.168.88.64',6379)

3、分别运行服务端与客户端,最终执行结果如下:(左边:服务端发布的消息,右边:客户的接收的消息)

python3 数据库的常用操作

 

 

 

五、使用 hdf5 快速存取数据

安装 h5py:pip install h5py

# w  以写入方式打开文件
# r  只读,文件必须已存在
# r+ 读写,文件必须已存在
# w  新建文件,若存在覆盖
# w- 或x,新建文件,若存在报错

# a  如存在则读写,不存在则创建(默认)

import h5py

def write_h5():
	''' 写入数据,写入一个list '''
	h5 = h5py.File('test.hdf5','w')
	h5['/test/t1'] = [1,2,3,4,5]
	h5.close()

def read_h5():
	''' 读取数据 '''
	h5 = h5py.File('test.hdf5','r')
	v = h5['/test/t1'].value
	h5.close()
	return v

if __name__ == '__main__':
	write_h5()
	v = read_h5()
	print(v)

执行结果:

[1 2 3 4 5]