python集成mongo、mysql、redis、oss、apollo、mq实用简单案例
程序员文章站
2022-07-12 16:38:22
...
python集成mongo、mysql、redis、oss、apollo、mq实用简单案例
由于工作原因,需要临时使用python做一些业务,纠结一段时间配置环境,现在主要将
python集成mongo
python集成mysql以及连接池
python集成redis以及连接池
python集成oss
python集成apollo
python集成rocketmq
在这里做一个分享,纪念一下 顺便供需要用的同学少走弯路
以下的例子主要是在mac环境下基于python3.6使用,不过使用python3.7也是一样的只有微小差异,会在特殊之处添加注释
1、python集成mongo
1.1、python连接mongo环境准备
pip3 install pymongo
1.2、python连接mongo代码操作
#encoding: utf-8 from pymongo import MongoClient #端口这里python3.6这里需要加int().python3.7这里不需要 client = MongoClient('mongo地址', int('mongo端口')) db = client['具体库名'] # 进入数据库 db.authenticate('用户名', '密码') #刚才获取到了连接对象 赋值给db,接下来可以操作 item = db['tb_movie_info'].find({"info_id" : "YS0000001"}) for rows in item: print(rows)
2、python集成mysql
2.1、python连接mysql环境准备
pip3 install PyMySQL pip3 install DBUtils
2.2、python连接mysql包括连接池代码操作
import pymysql from DBUtils.PooledDB import PooledDB pool = PooledDB( creator=pymysql, mincached='最小连接数', host='地址', user='用户名', passwd='密码', db='数据库', #端口这里python3.6这里需要加int().python3.7这里不需要 port=int('端口'), #字符集这里需要用utf8 不能用utf-8 charset='字符集' ) conn = pool.connection(shareable=False) #刚才获取到了连接对象 赋值给conn,接下来可以操作 #print(dir()) #print(__file__) cursor =conn.cursor() #获取一个光标 sql ='SELECT name from tb_common_info' cursor.execute(sql) results = cursor.fetchall() #获取查询的所有记录 print("name") for row in results : name = row[0] print(name) #conn.commit() cursor.close() conn.close()
3、python集成redis
3.1、python连接redis环境准备
pip3 install redis pip3 install python-redis
3.2、python连接redis以及连接池代码操作
import redis pool = redis.ConnectionPool( host='地址', port='端口', max_connections='连接数辆', db='db', password='密码' ) conn = redis.Redis(connection_pool=pool) #刚才获取到了连接对象 赋值给conn,接下来可以操作 conn.set('name','111') #存储值 print(conn.get('name').decode('utf-8'))#获取值
4、python集成oss
4.1、python连接oss环境准备
pip3 install oss2(包较大 容易超时)
4.2、python连接oss代码操作
import oss2 auth = oss2.Auth('accessKey', 'accessKeySecret') bucket = oss2.Bucket(auth, 'url', 'bucket') #下面有下载和上传两个案例 #下载文件(下载路径就要指定好文件名,可以不一样,但不影响文件内容) #第一个参数是oss路径,第二个参数是本地文件路径 #get_result=bucket.get_object_to_file('dev/vision/image/a.png', '/Users/Documents/patronli/a.png') #if get_result.status == 200: # print("get sucess") #上传文件(下载路径就要指定好文件名,可以不一样,但不影响文件内容) #第一个参数是oss路径,第二个参数是本地文件路径 put_result = bucket.put_object_from_file('dev/vision/python/a.jpg', '/Users/Documents/patronli/a.png') if put_result.status == 200: print('put success')
5、python集成apollo
5.1、python连接apollo环境准备
pip3 install pyapollos
5.2、python连接apollo代码操作
from pyapollos import ApolloClient #config_server_url 我一直尝试用域名的方式不成功,所以这里暂时用ip+端口 apollo = ApolloClient(app_id='os-operation-platform',cluster='cluster',config_server_url='地址+端口') print(apollo.get_value('spider.mysql.datasource.url', namespace='application'))
6、python集成rocketmq(这个是阿里云的案例)
6.1、python连接rocketmq环境准备
pip3 install rocketmq-client-python
6.2、python连接rocketmq代码操作
6.2.1、consumer.py
from rocketmq.client import PushConsumer, ConsumeStatus import time # 发送消息时请设置您在阿里云 RocketMQ 控制台上申请的 Topic topic = 'topic' # 您在阿里云 RocketMQ 控制台上申请的 GID gid = 'groupId' # 设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取 name_srv = 'name_srv' # 您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证 ak = 'accessKey' # 您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证 sk = 'secretKey' # 用户渠道,默认值为:ALIYUN channel = ALIYUN def callback(msg): print(msg.id, msg.body) # 消费成功回复 CONSUME_SUCCESS,消费失败回复 RECONSUME_LATER。此时会触发消费重试 return ConsumeStatus.CONSUME_SUCCESS def start_consume_message(): consumer = PushConsumer(gid) consumer.set_name_server_address(name_srv) consumer.set_session_credentials(ak, sk, channel) consumer.subscribe(topic, callback) # ******************************************** # 1. 确保订阅关系的设置在启动之前完成 # 2. 确保相同 GID 下面的消费者的订阅关系一致 # ********************************************* print ('start consume message') consumer.start() # 请保持消费者一直处于运行状态 while True: time.sleep(3600) if __name__ == '__main__': start_consume_message()
6.2.2、producer.py
from rocketmq.client import Producer, Message from conf import config # 发送消息时请设置您在阿里云 RocketMQ 控制台上申请的 Topic topic ='topic' # 您在阿里云 RocketMQ 控制台上申请的 GID gid ='groupId' # 设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取 name_srv = 'name_srv' # 您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证 ak ='accessKey' # 您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证 sk ='secretKey' # 用户渠道,默认值为:ALIYUN channel = ALIYUN def create_message(): msg = Message(topic) msg.set_keys('YourKey') msg.set_tags('YourTags') msg.set_body('11111') return msg def send_message_sync(count): producer = Producer(gid) producer.set_name_server_address(name_srv) producer.set_session_credentials(ak,sk,channel) producer.start() for n in range(count): msg = create_message() ret = producer.send_sync(msg) print ('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) print ('send sync message done') producer.shutdown() producer.destroy() if __name__ == '__main__': send_message_sync(10)