Python 实现批量查询IP并解析为归属地
程序员文章站
2022-05-17 16:51:18
环境配置db_udw.envdb_udw.env 如下: 最后 我接触Python时间也不是很久,实现方法可能会有疏漏。如果有什么疑问和见解,欢迎评论区交流。 ......
一、背景:
最近工作中做了一个小功能,目的是为了分析注册用户区域分布和订单的区域分布情况。所以需要将其对应的ip信息解析为归属地,并同步每天同步更新。
线上跑起来效率还是有优化的空间,优化的方向:在调用ip查询api过程可以调整为多线程并行解析ip。后续会更新这方便的调整。
技术: pyhton3
postgresql
env配置文件
附加信息:ip地址查询(ip138官方企业版):https://market.aliyun.com/products/56928004/cmapi015606.html#sku=yuncode960600002
.可提供免费的ip查询api.
二、实现思路: 1、 读取数据库ip信息
2、 调用第三方ip解析api进行解析
3、 将解析归属地信息存入数据库
三、几点说明: 1、环境信息等参数配置
2、日志输出
3、异常处理: 数据库连接异常
请求连接查询ip的url异常:http error 503
4、json,字典,数组等类型数据输入输出
5、分页查询并批量解析
5.功能实现很简单,所以就没有做详细的介绍了。详情可直接看完整代码,有详细的备注。
四、步骤简单介绍:
针对实现思路的3个步骤写了3个函数,彼此调用执行。
函数:
def get_ip_info(table_name):
def get_ip_area(table_name):
def ip_write_db(table_name):
调用:
ip_write_db("h_user_stat")
五、关键代码说明:
语法:urllib.request.urlopen(url, data=none, [timeout, ]*, cafile=none, capath=none, cadefault=false, context=none)
# 对从数据库表中出查询的ip进行解析
querys = 'callback&datatype=jsonp&ip=' + get_ip
bodys = {}
url = host + path + '?' + querys
request = urllib.request.request(url)
request.add_header('authorization', 'appcode ' + appcode)
# 连接url时可能会出现 error: http error 503: service unavailable
try:
response = urllib.request.urlopen(request)
except exception as e:
logging.error(e) # 输出异常日志信息
time.sleep(5)
response = urllib.request.urlopen(request)
finally:
content = response.read()
ip_area = content.decode('utf8')
ip_area = json.loads(ip_area)['data'] # json类型转字典类型并取'data'健值
arr.append([get_ip, ip_area]) # 将结果集存于二元数组
说明:从数据库分页查询固定数量的ip存入数组,并遍历该数组并将解析后的地区信息data健值存于二元数组中。
六、python代码实现如下:
1 # 导入psycopg2包 2 import psycopg2, time,datetime,sys 3 import json 4 import urllib, urllib.request 5 import os 6 import configparser 7 import logging 8 # purpose: 连接数据库读取表ip信息 9 def get_ip_info(table_name): 10 # 全局变量作用局部作用域 11 global pagesize # 每页查询数据条数 12 global rows_count 13 14 # 测试1 15 starttime_1 = time.time() 16 # 建立游标,用来执行数据库操作 17 cur = conn.cursor() 18 # 执行sql命令 19 cur.execute("select remote_ip from (select remote_ip,min(created_at) from " + table_name + " group by remote_ip) h1 where remote_ip is not null and remote_ip <> '' and not exists (select 1 from d_ip_area_mapping h2 where h1.remote_ip = h2.remote_ip) limit " + str(pagesize) + ";") 20 21 22 # 获取结果集条数 23 rows_count = cur.rowcount 24 25 # print('解析用户ip的总数:' + str(rows_count)) 26 27 # 当有未解析的用户的ip,返回元组,否则退出程序 28 if rows_count > 0: 29 # 获取select返回的元组 30 rows = cur.fetchall() # all rows in table 31 32 for row in rows: 33 tuple = rows 34 35 conn.commit() 36 # 关闭游标 37 cur.close() 38 39 else: 40 tuple = [] 41 logging.info('每页查询秒数:' + str(time.time() - starttime_1)) 42 return tuple 43 # 调用解析函数 44 45 46 def get_ip_area(table_name): 47 # 内包含用户id和ip的数组的元组 48 tuple = get_ip_info(table_name) 49 50 # 测试2 51 starttime_2 = time.time() 52 host = 'http://ali.ip138.com' 53 path = '/ip/' 54 method = 'get' 55 appcode = '917058e6d7c84104b7cab9819de54b6e' 56 arr = [] 57 for row in tuple: 58 59 get_ip = row[0] 60 #get_user = "".join(str(row)) 61 #get_user = row[0] 62 63 # 对从数据库表中出查询的ip进行解析 64 querys = 'callback&datatype=jsonp&ip=' + get_ip 65 bodys = {} 66 url = host + path + '?' + querys 67 request = urllib.request.request(url) 68 request.add_header('authorization', 'appcode ' + appcode) 69 70 # 连接url时可能会出现 error: http error 503: service unavailable 71 try: 72 response = urllib.request.urlopen(request) 73 except exception as e: 74 logging.error(e) # 输出异常日志信息 75 time.sleep(5) 76 response = urllib.request.urlopen(request) 77 finally: 78 content = response.read() 79 ip_area = content.decode('utf8') 80 ip_area = json.loads(ip_area)['data'] # json类型转字典类型并取'data'健值 81 arr.append([get_ip, ip_area]) # 将结果集存于二元数组 82 logging.info('每页解析秒数:' + str(time.time() - starttime_2)) 83 return arr 84 85 86 def ip_write_db(table_name): 87 88 write_ip = get_ip_area(table_name) # 内包含用户id和ip的数组的元组 89 90 91 # 测试1 92 starttime_3 = time.time() 93 94 # 建立游标,用来执行数据库操作 95 cur = conn.cursor() 96 for row in write_ip: 97 # get_user = row[0] # 获取用户id 98 get_ip = row[0] # 获取用户对应的ip 99 country = row[1][0] # 获取ip解析后的地区:国家 100 province = row[1][1] # 获取ip解析后的地区:省 101 city = row[1][2] # 获取ip解析后的地区:市 102 isp = row[1][3] # 获取ip解析后的服务提供商 103 104 # 执行sql命令 105 sql = "insert into d_ip_area_mapping(remote_ip,country,province,city,isp,created_at,updated_at,job_id) values (%s,%s,%s,%s,%s,%s,%s,%s);" 106 val = [get_ip, country, province, city, isp, time.strftime("%y-%m-%d %h:%m:%s", time.localtime()), 107 time.strftime("%y-%m-%d %h:%m:%s", time.localtime()),time.strftime("%y-%m-%d",time.localtime())] 108 109 cur.execute(sql, val) 110 conn.commit() 111 # 关闭游标 112 cur.close() 113 logging.info('每页插入秒数:' + str(time.time() - starttime_3)) 114 115 116 # 1.程序开始执行计时 117 starttime = time.time() 118 119 120 # 读取配置文件环境信息 121 122 # 项目路径 123 rootdir = os.path.split(os.path.realpath(__file__))[0] 124 125 126 ############################### config.env文件路径 ############################################################# 127 128 configfilepath = os.path.join(rootdir, 'db_udw.env') 129 config = configparser.configparser() 130 config.read(configfilepath) 131 132 # 读取数据库环境信息 133 db_database = config.get('postgresql','database') 134 db_user = config.get('postgresql','user') 135 db_password = config.get('postgresql','password') 136 db_host = config.get('postgresql','host') 137 db_port = config.get('postgresql','port') 138 139 # 读取输出日志路径 140 log = config.get('log','log_path') 141 142 # 每页查询数据条数 143 pagesize = config.get('page','pagesize') 144 145 # 读取解析ip条数限制 146 ip_num_limit = config.get('ip_num','ip_num_limit') 147 148 # 配置输出日志格式 149 logging.basicconfig(level=logging.debug,#控制台打印的日志级别 150 filename='{my_log_path}/ip_analyzer.log'.format(my_log_path=log), # 指定日志文件及路径 151 filemode='a',##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志 #a是追加模式,默认如果不写的话,就是追加模式 152 format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'#日志格式 153 ) 154 155 ############################### 程序开始执行 ############################################################# 156 try: 157 158 # 连接到一个给定的数据库 159 conn = psycopg2.connect(database=db_database, user=db_user, password=db_password, host=db_host, port=db_port) 160 except exception as e: 161 logging.error(e) # 输出连接异常日志信息 162 163 # 返回查询行数 默认为0 164 rows_count = 0 165 # 用户表ip解析总数 166 user_ip_num = 0 167 # 订单表ip解析总数 168 order_ip_num = 0 169 170 171 172 try: 173 174 # 解析用户表注册ip信息 175 while user_ip_num <= eval(ip_num_limit): 176 i = 1 # 循环次数 177 ip_write_db("h_user_stat") 178 user_ip_num = user_ip_num + rows_count*i 179 i = i + 1 180 if rows_count == 0 : 181 break 182 183 # 解析订单表下单ip信息 184 while user_ip_num <= eval(ip_num_limit): 185 # 解析用户表注册ip信息 186 i = 1 # 循环次数 187 ip_write_db("h_order") 188 order_ip_num = order_ip_num + rows_count*i 189 i = i + 1 190 if rows_count == 0 : 191 break 192 except exception as e: 193 logging.error(e) # 输出异常日志信息 194 finally: 195 # 关闭数据库连接 196 conn.close() 197 198 # 2 程序结束执行计时 199 endtime = time.time() 200 201 # print('解析用户ip的总数:' + str(user_ip_num)) 202 # print('解析订单ip的总数:' + str(order_ip_num)) 203 # # 打印程序执行总耗时 204 # print('解析总耗时秒数:' + str(endtime - starttime)) 205 logging.info('解析用户ip的总数:' + str(user_ip_num)) 206 logging.info('解析订单ip的总数:' + str(order_ip_num)) 207 logging.info('解析总耗时秒数:' + str(endtime - starttime))
环境配置db_udw.envdb_udw.env 如下:
# 数据库环境信息
[postgresql]
database = ahaschool_udw
user = admin
password = 123456
host = 127.0.0.0
port = 5432
# 设置日志文件路径
[log]
log_path = /home/hjmrunning/bi_etl_product/scripts/log
# 每页查询数据条数
[page]
pagesize = 1000
# ip解析条数限制
[ip_num]
ip_num_limit = 150000
最后
我接触python时间也不是很久,实现方法可能会有疏漏。如果有什么疑问和见解,欢迎评论区交流。