欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

python集成mongo、mysql、redis、oss、apollo、mq实用简单案例

程序员文章站 2022-07-12 16:37:40
...
python集成mongo、mysql、redis、oss、apollo、mq实用简单案例

     由于工作原因,需要临时使用python做一些业务,纠结一段时间配置环境,现在主要将
python集成mongo

python集成mysql以及连接池

python集成redis以及连接池

python集成oss

python集成apollo

python集成rocketmq

在这里做一个分享,纪念一下 顺便供需要用的同学少走弯路
以下的例子主要是在mac环境下基于python3.6使用,不过使用python3.7也是一样的只有微小差异,会在特殊之处添加注释

1、python集成mongo
1.1、python连接mongo环境准备
pip3 install pymongo

1.2、python连接mongo代码操作
#encoding: utf-8
from pymongo import MongoClient
#端口这里python3.6这里需要加int().python3.7这里不需要
client = MongoClient('mongo地址', int('mongo端口'))
db = client['具体库名']  # 进入数据库
db.authenticate('用户名', '密码')

#刚才获取到了连接对象 赋值给db,接下来可以操作
item = db['tb_movie_info'].find({"info_id" : "YS0000001"})
for rows in item:
    print(rows)



2、python集成mysql
2.1、python连接mysql环境准备
pip3 install PyMySQL
pip3 install DBUtils

2.2、python连接mysql包括连接池代码操作
import pymysql
from DBUtils.PooledDB import PooledDB

pool = PooledDB(
       creator=pymysql,
       mincached='最小连接数',
       host='地址',
       user='用户名',
       passwd='密码',
       db='数据库',
      #端口这里python3.6这里需要加int().python3.7这里不需要
       port=int('端口'),
      #字符集这里需要用utf8 不能用utf-8
       charset='字符集'
       )

conn = pool.connection(shareable=False)

#刚才获取到了连接对象 赋值给conn,接下来可以操作
#print(dir())
#print(__file__)
cursor =conn.cursor()  #获取一个光标
sql ='SELECT name from tb_common_info'

cursor.execute(sql)
results = cursor.fetchall()    #获取查询的所有记录
print("name")
for row in results :
    name = row[0]
    print(name)
#conn.commit()
cursor.close()
conn.close()


3、python集成redis
3.1、python连接redis环境准备
pip3 install redis
pip3 install python-redis

3.2、python连接redis以及连接池代码操作
import redis

pool = redis.ConnectionPool(
        host='地址',
        port='端口',
        max_connections='连接数辆',
        db='db',
        password='密码'
        )

conn = redis.Redis(connection_pool=pool)
#刚才获取到了连接对象 赋值给conn,接下来可以操作
conn.set('name','111') #存储值
print(conn.get('name').decode('utf-8'))#获取值


4、python集成oss
4.1、python连接oss环境准备
pip3 install oss2(包较大 容易超时)

4.2、python连接oss代码操作
import oss2

auth = oss2.Auth('accessKey', 'accessKeySecret')
bucket = oss2.Bucket(auth, 'url', 'bucket')

#下面有下载和上传两个案例

#下载文件(下载路径就要指定好文件名,可以不一样,但不影响文件内容)
#第一个参数是oss路径,第二个参数是本地文件路径
#get_result=bucket.get_object_to_file('dev/vision/image/a.png', '/Users/Documents/patronli/a.png')
#if get_result.status == 200:
#    print("get sucess")

#上传文件(下载路径就要指定好文件名,可以不一样,但不影响文件内容)
#第一个参数是oss路径,第二个参数是本地文件路径
put_result = bucket.put_object_from_file('dev/vision/python/a.jpg', '/Users/Documents/patronli/a.png')
if put_result.status == 200:
    print('put success')


5、python集成apollo
5.1、python连接apollo环境准备
pip3 install pyapollos

5.2、python连接apollo代码操作
from pyapollos import ApolloClient
#config_server_url 我一直尝试用域名的方式不成功,所以这里暂时用ip+端口
apollo = ApolloClient(app_id='os-operation-platform',cluster='cluster',config_server_url='地址+端口')

print(apollo.get_value('spider.mysql.datasource.url', namespace='application'))


6、python集成rocketmq(这个是阿里云的案例)
6.1、python连接rocketmq环境准备
pip3 install rocketmq-client-python

6.2、python连接rocketmq代码操作
6.2.1、consumer.py
from rocketmq.client import PushConsumer, ConsumeStatus

import time

# 发送消息时请设置您在阿里云 RocketMQ 控制台上申请的 Topic
topic = 'topic'
# 您在阿里云 RocketMQ 控制台上申请的 GID
gid =  'groupId'
# 设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取
name_srv =  'name_srv'
# 您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证
ak =  'accessKey'
# 您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证
sk =  'secretKey'
# 用户渠道,默认值为:ALIYUN
channel =  ALIYUN

def callback(msg):
    print(msg.id, msg.body)
    # 消费成功回复 CONSUME_SUCCESS,消费失败回复 RECONSUME_LATER。此时会触发消费重试
    return ConsumeStatus.CONSUME_SUCCESS


def start_consume_message():
    consumer = PushConsumer(gid)
    consumer.set_name_server_address(name_srv)
    consumer.set_session_credentials(ak, sk, channel)
    consumer.subscribe(topic, callback)
    # ********************************************
    # 1. 确保订阅关系的设置在启动之前完成
    # 2. 确保相同 GID 下面的消费者的订阅关系一致
    # *********************************************
    print ('start consume message')
    consumer.start()

    # 请保持消费者一直处于运行状态
    while True:
       time.sleep(3600)


if __name__ == '__main__':
    start_consume_message()

6.2.2、producer.py
from rocketmq.client import Producer, Message
from conf import config


# 发送消息时请设置您在阿里云 RocketMQ 控制台上申请的 Topic
topic ='topic'
# 您在阿里云 RocketMQ 控制台上申请的 GID
gid ='groupId'
# 设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取
name_srv = 'name_srv'
# 您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证
ak ='accessKey'
# 您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证
sk ='secretKey'
# 用户渠道,默认值为:ALIYUN
channel = ALIYUN


def create_message():
    msg = Message(topic)
    msg.set_keys('YourKey')
    msg.set_tags('YourTags')
    msg.set_body('11111')
    return msg


def send_message_sync(count):
    producer = Producer(gid)
    producer.set_name_server_address(name_srv)
    producer.set_session_credentials(ak,sk,channel)
    producer.start()
    for n in range(count):
        msg = create_message()
        ret = producer.send_sync(msg)
        print ('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
    print ('send sync message done')
    producer.shutdown()
    producer.destroy()

if __name__ == '__main__':
    send_message_sync(10)