欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python 实现批量查询IP并解析为归属地

程序员文章站 2024-01-29 20:57:10
环境配置db_udw.envdb_udw.env 如下: 最后 我接触Python时间也不是很久,实现方法可能会有疏漏。如果有什么疑问和见解,欢迎评论区交流。 ......
一、背景:
最近工作中做了一个小功能,目的是为了分析注册用户区域分布和订单的区域分布情况。所以需要将其对应的ip信息解析为归属地,并同步每天同步更新。
线上跑起来效率还是有优化的空间,优化的方向:在调用ip查询api过程可以调整为多线程并行解析ip。后续会更新这方便的调整。
技术: pyhton3
postgresql
env配置文件

附加信息:ip地址查询(ip138官方企业版):https://market.aliyun.com/products/56928004/cmapi015606.html#sku=yuncode960600002
.可提供免费的ip查询api.

二、实现思路: 1、 读取数据库ip信息
2、 调用第三方ip解析api进行解析
3、 将解析归属地信息存入数据库
三、几点说明: 1、环境信息等参数配置
2、日志输出
3、异常处理: 数据库连接异常
请求连接查询ip的url异常:http error 503
4、json,字典,数组等类型数据输入输出
5、分页查询并批量解析
5.功能实现很简单,所以就没有做详细的介绍了。详情可直接看完整代码,有详细的备注。

四、步骤简单介绍:
针对实现思路的3个步骤写了3个函数,彼此调用执行。
函数:
def get_ip_info(table_name):
def get_ip_area(table_name):
def ip_write_db(table_name):
调用:
ip_write_db("h_user_stat")

五、关键代码说明:
语法:urllib.request.urlopen(url, data=none, [timeout, ]*, cafile=none, capath=none, cadefault=false, context=none)
 # 对从数据库表中出查询的ip进行解析
       querys = 'callback&datatype=jsonp&ip=' + get_ip
       bodys = {}
       url = host + path + '?' + querys
       request = urllib.request.request(url)
       request.add_header('authorization', 'appcode ' + appcode)

       # 连接url时可能会出现 error: http error 503: service unavailable
       try: 
         response = urllib.request.urlopen(request)
       except exception as e:
         logging.error(e) # 输出异常日志信息 
         time.sleep(5)
         response = urllib.request.urlopen(request)
       finally:
         content = response.read()
         ip_area = content.decode('utf8')
         ip_area = json.loads(ip_area)['data'] # json类型转字典类型并取'data'健值
         arr.append([get_ip, ip_area]) # 将结果集存于二元数组
说明:从数据库分页查询固定数量的ip存入数组,并遍历该数组并将解析后的地区信息data健值存于二元数组中。

 

 
六、python代码实现如下:

  1 # 导入psycopg2包
  2 import psycopg2, time,datetime,sys
  3 import json
  4 import urllib, urllib.request
  5 import os
  6 import configparser
  7 import logging
  8                      # purpose: 连接数据库读取表ip信息
  9 def get_ip_info(table_name):
 10     # 全局变量作用局部作用域
 11     global pagesize # 每页查询数据条数
 12     global rows_count
 13 
 14     # 测试1
 15     starttime_1 = time.time()
 16     # 建立游标,用来执行数据库操作
 17     cur = conn.cursor()
 18     # 执行sql命令
 19     cur.execute("select remote_ip from (select remote_ip,min(created_at) from " + table_name + " group by remote_ip) h1 where remote_ip is not null and remote_ip <> '' and  not exists (select 1 from d_ip_area_mapping h2 where h1.remote_ip = h2.remote_ip) limit " + str(pagesize) + ";")
 20 
 21 
 22     # 获取结果集条数
 23     rows_count = cur.rowcount
 24    
 25     # print('解析用户ip的总数:' + str(rows_count))
 26 
 27      # 当有未解析的用户的ip,返回元组,否则退出程序
 28     if rows_count > 0:
 29       # 获取select返回的元组
 30       rows =  cur.fetchall()        # all rows in table
 31 
 32       for row in rows:
 33           tuple = rows
 34 
 35       conn.commit()
 36       # 关闭游标
 37       cur.close()
 38 
 39     else:
 40         tuple = []
 41     logging.info('每页查询秒数:' + str(time.time() - starttime_1))
 42     return tuple
 43     # 调用解析函数
 44 
 45 
 46 def get_ip_area(table_name):
 47   # 内包含用户id和ip的数组的元组
 48   tuple = get_ip_info(table_name)  
 49 
 50   # 测试2
 51   starttime_2 = time.time()
 52   host = 'http://ali.ip138.com'
 53   path = '/ip/'
 54   method = 'get'
 55   appcode = '917058e6d7c84104b7cab9819de54b6e'
 56   arr = []
 57   for row in tuple:
 58 
 59        get_ip = row[0]
 60        #get_user = "".join(str(row))
 61        #get_user = row[0]
 62 
 63             # 对从数据库表中出查询的ip进行解析
 64        querys = 'callback&datatype=jsonp&ip=' + get_ip
 65        bodys = {}
 66        url = host + path + '?' + querys
 67        request = urllib.request.request(url)
 68        request.add_header('authorization', 'appcode ' + appcode)
 69 
 70        # 连接url时可能会出现 error: http error 503: service unavailable
 71        try: 
 72          response = urllib.request.urlopen(request)
 73        except exception as e:
 74          logging.error(e) # 输出异常日志信息 
 75          time.sleep(5)
 76          response = urllib.request.urlopen(request)
 77        finally:
 78          content = response.read()
 79          ip_area = content.decode('utf8')
 80          ip_area = json.loads(ip_area)['data'] # json类型转字典类型并取'data'健值
 81          arr.append([get_ip, ip_area]) # 将结果集存于二元数组
 82   logging.info('每页解析秒数:' + str(time.time() - starttime_2))
 83   return  arr
 84 
 85 
 86 def ip_write_db(table_name):
 87    
 88     write_ip = get_ip_area(table_name)  # 内包含用户id和ip的数组的元组
 89 
 90 
 91     # 测试1
 92     starttime_3 = time.time()
 93 
 94      # 建立游标,用来执行数据库操作
 95     cur = conn.cursor()
 96     for row in write_ip:
 97         # get_user = row[0]  # 获取用户id
 98         get_ip = row[0]  # 获取用户对应的ip
 99         country = row[1][0]  # 获取ip解析后的地区:国家
100         province = row[1][1]  # 获取ip解析后的地区:省
101         city = row[1][2]  # 获取ip解析后的地区:市
102         isp = row[1][3]  # 获取ip解析后的服务提供商
103 
104         # 执行sql命令
105         sql = "insert into d_ip_area_mapping(remote_ip,country,province,city,isp,created_at,updated_at,job_id) values (%s,%s,%s,%s,%s,%s,%s,%s);"
106         val = [get_ip, country, province, city, isp, time.strftime("%y-%m-%d %h:%m:%s", time.localtime()),
107                time.strftime("%y-%m-%d %h:%m:%s", time.localtime()),time.strftime("%y-%m-%d",time.localtime())]
108         
109         cur.execute(sql, val)
110         conn.commit()    
111     # 关闭游标
112     cur.close()
113     logging.info('每页插入秒数:' + str(time.time() - starttime_3))
114 
115 
116 # 1.程序开始执行计时
117 starttime = time.time()
118 
119 
120      # 读取配置文件环境信息 
121 
122 # 项目路径
123 rootdir = os.path.split(os.path.realpath(__file__))[0]
124 
125 
126 ############################### config.env文件路径  #############################################################
127 
128 configfilepath = os.path.join(rootdir, 'db_udw.env')
129 config = configparser.configparser()
130 config.read(configfilepath)
131 
132 # 读取数据库环境信息
133 db_database = config.get('postgresql','database')
134 db_user = config.get('postgresql','user')
135 db_password = config.get('postgresql','password')
136 db_host = config.get('postgresql','host')
137 db_port = config.get('postgresql','port')
138 
139 # 读取输出日志路径
140 log = config.get('log','log_path')
141 
142 # 每页查询数据条数
143 pagesize = config.get('page','pagesize') 
144 
145 # 读取解析ip条数限制
146 ip_num_limit = config.get('ip_num','ip_num_limit') 
147 
148 # 配置输出日志格式
149 logging.basicconfig(level=logging.debug,#控制台打印的日志级别
150                       filename='{my_log_path}/ip_analyzer.log'.format(my_log_path=log),  # 指定日志文件及路径
151                       filemode='a',##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志 #a是追加模式,默认如果不写的话,就是追加模式
152                       format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'#日志格式
153                       )
154 
155 ###############################   程序开始执行  #############################################################
156 try:
157 
158   # 连接到一个给定的数据库
159   conn = psycopg2.connect(database=db_database, user=db_user, password=db_password, host=db_host, port=db_port)
160 except exception as e:
161   logging.error(e) # 输出连接异常日志信息
162 
163 # 返回查询行数 默认为0
164 rows_count = 0
165  # 用户表ip解析总数
166 user_ip_num = 0 
167  # 订单表ip解析总数
168 order_ip_num = 0 
169 
170 
171 
172 try:
173 
174   # 解析用户表注册ip信息
175   while user_ip_num <= eval(ip_num_limit):
176      i = 1  # 循环次数
177      ip_write_db("h_user_stat")
178      user_ip_num = user_ip_num + rows_count*i
179      i  = i + 1
180      if rows_count == 0 :
181          break
182 
183   # 解析订单表下单ip信息 
184   while user_ip_num <= eval(ip_num_limit):
185       # 解析用户表注册ip信息
186       i = 1  # 循环次数
187       ip_write_db("h_order")
188       order_ip_num = order_ip_num + rows_count*i
189       i = i + 1
190       if rows_count == 0 :
191          break
192 except exception as e:
193   logging.error(e) # 输出异常日志信息
194 finally:
195   # 关闭数据库连接
196   conn.close()
197 
198 # 2 程序结束执行计时
199   endtime = time.time()
200   
201   # print('解析用户ip的总数:' + str(user_ip_num))
202   # print('解析订单ip的总数:' + str(order_ip_num))
203   # # 打印程序执行总耗时
204   # print('解析总耗时秒数:' + str(endtime - starttime))
205   logging.info('解析用户ip的总数:' + str(user_ip_num))
206   logging.info('解析订单ip的总数:' + str(order_ip_num))
207   logging.info('解析总耗时秒数:' + str(endtime - starttime))


 环境配置db_udw.envdb_udw.env 如下:

# 数据库环境信息
[postgresql]
database = ahaschool_udw
user = admin
password = 123456
host = 127.0.0.0
port = 5432

# 设置日志文件路径
[log]
log_path = /home/hjmrunning/bi_etl_product/scripts/log

# 每页查询数据条数
[page]
pagesize = 1000  

# ip解析条数限制
[ip_num]
ip_num_limit = 150000

最后

    我接触python时间也不是很久,实现方法可能会有疏漏。如果有什么疑问和见解,欢迎评论区交流。