【案例】构建动态 sql 实现查询数据写入 excel 模版
程序员文章站
2022-07-04 16:25:37
...
工具
Python 3.6
Win 7
Spyder
SQL Server 2014 Management Studio
Python 实现代码
import os
import time
import functools
import pandas as pd
from auto_ave import getPath
from datetime import datetime, timedelta, date
now = lambda : time.perf_counter()
dat = lambda n: date.today() - timedelta(n)
D = dat(1)
def log(func):
@functools.wraps(func)
def wrapper(*args, **kw):
print("Call %s():" % func.__name__)
return func(*args, **kw)
return wrapper
def getQ(dat):
# 返回给定日期所在季度
# 目的:剔除非本季度的消费
if isinstance(dat, date):
m = dat.month
if m in (1, 2, 3):
return 'Q1'
elif m in (4, 5, 6):
return 'Q2'
elif m in (7, 8, 9):
return 'Q3'
else:
return 'Q4'
else:
raise
def connect():
def loginInfo(section):
from configparser import ConfigParser
path = r'H:\SZ_数据\Python\c.s.conf'
#path = r'C:\users\chen.huaiyu\Chinasearch\c.s.conf'
conf = ConfigParser()
conf.read(path)
return (conf.get(section, 'acc'), conf.get(section, 'pw')
, conf.get(section, 'ip'), conf.get(section, 'port')
, conf.get(section, 'dbname'))
from sqlalchemy import create_engine
try:
engine = create_engine(
'mssql+pymssql://%s:%[email protected]%s:%s/%s' % loginInfo('Output'))
except Exception as e:
print('连接失败: %s' % e)
raise
else:
return engine
@log
def output(version):
def outputFilList(version):
'''
构建 output 文件列表
以模版为基础新建文件夹、复制文件、修改名称
Returns
-------
None.
'''
def sFilName():
for d in range(2, 60):
filName = dat(d).strftime('%Y%m%d v1')
if os.path.isdir(os.path.join(path, filName)):
return filName, dat(d).year, getQ(dat(d))
def source():
return os.path.join(path, sFilName()[0])
def tFilName():
return D.strftime('%Y%m%d ' + version)
def target():
return os.path.join(path, tFilName())
def cpDir():
import shutil
lis = []
sName, sYear, sQuar = sFilName()
sourceDir = source()
if not os.path.exists(target()):
shutil.copytree(sourceDir, target())
for f in os.listdir(sourceDir):
s = os.path.join(target(), f)
# modify new name
f = f.replace(sName, tFilName())
f = f.replace(str(sYear), str(D.year)) # year
f = f.replace(sQuar, getQ(D)) # quarter
t = os.path.join(target(), f)
lis.append(t)
os.rename(s, t)
return lis
path = r'H:\SZ_数据\Output'
return cpDir()
def data(conn, sql):
return list(map(lambda x: list(x), conn.execute(sql).fetchall()))
def shtNameLis(table):
strDat = D.strftime('%Y%m%d')
return (table + strDat, table+strDat+'_1', table+strDat+'_2')
def getRows(sht, row, col):
# 获取行数
return sht[row, col].current_region.rows.count
def getCols(sht, row, col):
# 获取列数
return sht[row, col].current_region.columns.count
def getRng(sht, row, col):
# 数据首行区域 & 全部区域
cntRow = getRows(sht, row, col)
cntCol = getCols(sht, row, col)
return sht[row, col:col+cntCol], sht[row:row+cntRow, col:col+cntCol]
def getRng2(sht, newsht, row, col):
# 外部表与当前表对应区域
cntRow = getRows(sht, row, col)
cntCol = getCols(sht, row, col)
if getRows(newsht, row, col) < cntRow:
return newsht[row, col:col+cntCol], newsht[row:row+cntRow-1
, col:col+cntCol]
def getRng3(sht, row, col1, col2):
# 获取指定区域
cntRow = getRows(sht, row, col1)
return sht[row, col1:col2], sht[row:row+cntRow-3, col1:col2]
def clear2(sht, row, col1, col2):
# 清空 指定区域
cntRow = getRows(sht, row, col1)
rng = sht[row:row+cntRow, col1:col2]
rng.clear()
def clear(sht, row, col):
# 清空 所有区域
_, rng = getRng(sht, row, col)
rng.clear()
def write(sht, row, col, conn, sql):
# 指定位置写入数据
sht[row, col].value = data(conn, sql)
def sqlSpendForecast(lis):
return "EXEC dbo.pr_spendForecast '%s', '%s', '%s'" % lis
def sqlSpendForecastV(lis):
return "EXEC dbo.pr_spendForecastV1 '%s', '%s' " % lis
def sqlCashForecast(lis):
return ''' EXEC dbo.pr_cashSpendForecast '%s', '%s', '%s', '%s', '%s'
, '%s', '%s' ''' % lis
def sqlHandlingFee(lis):
return ''' EXEC dbo.pr_handlingFee '%s', '%s', '%s', '%s', '%s', '%s'
, '%s' ''' % lis
def sqlHandlingFeeDetails(t):
return ''' EXEC dbo.pr_handlingFeeDetails %s ''' % t
def sqlSalesForecast(lis):
return ''' EXEC dbo.pr_salesForecast '%s', '%s' ''' % lis
def sqlGPRatio(lis):
return ''' EXEC dbo.pr_GPRatio '%s', '%s' ''' % lis
def sqlSalesTracking(lis):
return ''' EXEC dbo.pr_salesTracking '%s', '%s', '%s', '%s'
, '%s' ''' % lis
def sqlAMTracking(lis):
return ''' EXEC dbo.pr_AMTracking '%s', '%s', '%s', '%s', '%s'
, '%s' ''' % lis
def sqlCreateFAF(lis):
return ''' EXEC dbo.pr_createFAF '%s', '%s', '%s', '%s', '%s', '%s'
, '%s' ''' % lis
def sqlFAF(lis):
return ''' EXEC dbo.pr_FAF '%s', '%s', '%s', '%s', '%s', '%s' ''' % lis
def getMonth():
q = getQ(D)
if q == 'Q1':
return 'Jan', 'Feb', 'Mar'
elif q == 'Q2':
return 'Apr', 'May', 'Jun'
elif q == 'Q3':
return 'Jul', 'Aug', 'Sep'
else:
return 'Oct', 'Nov', 'Dec'
def index(sht, newsht, row, col):
# 更新首列:用户名 & 端口
cntRow = getRows(sht, row, col)
newsht[row, col].options(transpose=True).value = sht[1:cntRow
, col].value
def fillFormula(sht, newsht, row, col):
# 填充公式
if getRng2(sht, newsht, row, col):
rng0, rng1 = getRng2(sht, newsht, row, col)
rng0.api.AutoFill(rng1.api, constants.AutoFillType.xlFillCopy)
def fillFormula2(sht, row, col1, col2):
# 填充指定区域
rng0, rng1 = getRng3(sht, row, col1, col2)
rng0.api.AutoFill(rng1.api, constants.AutoFillType.xlFillCopy)
import xlwings as xw
from xlwings import constants
#
with connect().begin() as conn:
# 合并
conn.execute("EXEC newAve '%s', '%s', '%s'" % shtNameLis('P4P_'))
conn.execute("EXEC newAve '%s', '%s', '%s'" % shtNameLis('NP_'))
conn.execute("EXEC newAve '%s', '%s', '%s'" % shtNameLis('Infeeds_'))
# 写入excel
filLis = outputFilList('v1')
p4p, np, inf = [shtNameLis(i)[0] for i in ['P4P_', 'NP_', 'Infeeds_']]
year = D.strftime('%Y')
q = getQ(D)
m1, m2, m3 = getMonth()
for path in filLis:
# 打开
wb = xw.Book(path)
if 'Spending Forecast' in path and '_v1' not in path:
# 清除表中数据
# 更新数据
#
sht1 = wb.sheets['P4P']
clear(sht1, 1, 0)
write(sht1, 1, 0, conn, sqlSpendForecast((p4p, '端口', year)))
clear(sht1, 1, 14)
write(sht1, 1, 14, conn, sqlSpendForecast((p4p, '用户名'
, year)))
#
sht = wb.sheets['NP']
clear(sht, 1, 0)
write(sht, 1, 0, conn, sqlSpendForecast((np, '端口', year)))
clear(sht, 1, 14)
write(sht, 1, 14, conn, sqlSpendForecast((np, '用户名', year)))
#
sht = wb.sheets['Infeeds']
clear(sht, 1, 0)
write(sht, 1, 0, conn, sqlSpendForecast((inf, '端口', year)))
clear(sht, 1, 14)
write(sht, 1, 14, conn, sqlSpendForecast((inf, '用户名'
, year)))
#
sht = wb.sheets['Region']
write(sht, 2, 0, conn, sqlSpendForecast((p4p, '区域', year)))
write(sht, 11, 0, conn, sqlSpendForecast((np, '区域', year)))
write(sht, 20, 0, conn, sqlSpendForecast((inf, '区域', year)))
#
sht = wb.sheets['All']
fillFormula(sht1, sht, 1, 1)
fillFormula(sht1, sht, 1, 15)
index(sht1, sht, 1, 0)
index(sht1, sht, 1, 14)
#
sht = wb.sheets['Infeeds(35%)']
fillFormula(sht1, sht, 1, 1)
fillFormula(sht1, sht, 1, 15)
index(sht1, sht, 1, 0)
index(sht1, sht, 1, 14)
# 保存、关闭
wb.save()
wb.close()
elif 'Spending Forecast' in path and '_v1' in path:
#
sht1 = wb.sheets['P4P']
write(sht1, 1, 0, conn, sqlSpendForecastV((p4p, year)))
#
sht = wb.sheets['NP']
write(sht, 1, 0, conn, sqlSpendForecastV((np, year)))
#
sht = wb.sheets['Infeeds']
write(sht, 1, 0, conn, sqlSpendForecastV((inf, year)))
#
sht = wb.sheets['All']
clear2(sht, 3, 0, 18)
fillFormula(sht1, sht, 1, 0)
#
wb.save()
wb.close()
elif 'Cash' in path:
#
sht = wb.sheets['Region']
write(sht, 2, 0, conn, sqlCashForecast((p4p, '区域'
, year, q
, m1, m2, m3)))
write(sht, 12, 0, conn, sqlCashForecast((np, '区域'
, year, q
, m1, m2, m3)))
write(sht, 22, 0, conn, sqlCashForecast((inf, '区域'
, year, q
, m1, m2, m3)))
#
sht = wb.sheets['Finance Region']
write(sht, 2, 0, conn, sqlCashForecast((p4p, '财务'
, year, q
, m1, m2, m3)))
write(sht, 18, 0, conn, sqlCashForecast((np, '财务'
, year, q
, m1, m2, m3)))
write(sht, 34, 0, conn, sqlCashForecast((inf, '财务'
, year, q
, m1, m2, m3)))
#
wb.save()
wb.close()
elif 'Handling' in path and 'Details' not in path:
#
sht = wb.sheets['AM']
write(sht, 2, 0, conn, sqlHandlingFee((p4p, 'AM'
, year, q
, m1, m2, m3)))
write(sht, 19, 0, conn, sqlHandlingFee((np, 'AM'
, year, q
, m1, m2, m3)))
write(sht, 36, 0, conn, sqlHandlingFee((inf, 'AM'
, year, q
, m1, m2, m3)))
#
sht = wb.sheets['Sales']
write(sht, 2, 0, conn, sqlHandlingFee((p4p, 'Sales'
, year, q
, m1, m2, m3)))
write(sht, 25, 0, conn, sqlHandlingFee((np, 'Sales'
, year, q
, m1, m2, m3)))
write(sht, 49, 0, conn, sqlHandlingFee((inf, 'Sales'
, year, q
, m1, m2, m3)))
#
wb.save()
wb.close()
elif 'Handling' in path and 'Details' in path:
#
sht = wb.sheets['P4P']
clear(sht, 1, 0)
write(sht, 1, 0, conn, sqlHandlingFeeDetails(p4p))
#
sht = wb.sheets['NP']
clear(sht, 1, 0)
write(sht, 1, 0, conn, sqlHandlingFeeDetails(np))
#
sht = wb.sheets['Infeeds']
clear(sht, 1, 0)
write(sht, 1, 0, conn, sqlHandlingFeeDetails(inf))
#
wb.save()
wb.close()
elif 'Sales Forecast' in path:
#
sht = wb.sheets['Data']
clear2(sht, 3, 0, 12)
clear2(sht, 4, 12, 18)
write(sht, 3, 0, conn, sqlSalesForecast((p4p, q)))
write(sht, 3, 4, conn, sqlSalesForecast((np, q)))
write(sht, 3, 7, conn, sqlSalesForecast((inf, q)))
fillFormula2(sht, 3, 12, 18)
#
wb.save()
wb.close()
elif 'GP' in path:
#
sht = wb.sheets[0]
write(sht, 2, 0, conn, sqlGPRatio((p4p, q)))
write(sht, 13, 0, conn, sqlGPRatio((np, q)))
write(sht, 24, 0, conn, sqlGPRatio((inf, q)))
#
wb.save()
wb.close()
elif 'Sales Tracking' in path:
#
sht = wb.sheets[0]
write(sht, 2, 0, conn, sqlSalesTracking((p4p, q, m1, m2, m3)))
write(sht, 27, 0, conn, sqlSalesTracking((np, q, m1, m2, m3)))
write(sht, 52, 0, conn, sqlSalesTracking((inf, q
, m1, m2, m3)))
#
wb.save()
wb.close()
elif 'AM Tracking' in path:
#
sht = wb.sheets[0]
#
write(sht, 2, 0, conn
, sqlAMTracking((p4p, year, q, m1, m2, m3)))
write(sht, 17, 0
, conn, sqlAMTracking((np, year, q, m1, m2, m3)))
write(sht, 32, 0
, conn, sqlAMTracking((inf, year, q, m1, m2, m3)))
#
wb.save()
wb.close()
elif 'FAF' in path:
# createFAF
conn.execute(sqlCreateFAF((p4p, np, inf, year, m1, m2, m3)))
# select
# GP
sht = wb.sheets['Weekly GP Report']
write(sht, 2, 0, conn, sqlFAF(('region', year
, q, m1, m2, m3)))
# KPI
sht = wb.sheets['KPI']
write(sht, 2, 0, conn, sqlFAF(('am', year, q, m1, m2, m3)))
#
wb.save()
wb.close()
if __name__ == '__main__':
st = now()
output('v1')
print('Runtime %.3f min' % ((now() - st)/60))
动态 T-SQL
USE output
GO
-- 检查存储过程 newAve,如存在,则删除
-- EXEC pr_check 'newAve'
-- 存储过程名;
IF OBJECT_ID('pr_check') IS NOT NULL
DROP PROC pr_check
GO
CREATE PROC pr_check
@obj nvarchar(50)
AS
BEGIN
DECLARE @sql nvarchar(max)
SET @sql = 'IF OBJECT_ID(''' + @obj + ''') IS NOT NULL DROP PROC ' + @obj
EXEC sp_executesql @sql, N'@obj nvarchar(50)', @obj
END
GO
-- 检查表 t,如存在,则删除
-- EXEC pr_checkTable 't'
-- 表名;
EXEC pr_check 'pr_checkTable'
GO
CREATE PROC pr_checkTable
@t nvarchar(30)
AS
BEGIN
DECLARE @sql nvarchar(max)
SET @sql = 'IF OBJECT_ID(''' + @t + ''') IS NOT NULL DROP TABLE ' + QUOTENAME(@t)
EXEC sp_executesql @sql, N'@t nvarchar(30)', @t
END
GO
-- 查询 Spending Forecast _v1
-- EXEC [Output].[dbo].pr_spendForecastV1 'Infeeds_20200427', '2020'
-- 表名;年;
EXEC pr_check 'pr_spendForecastV1'
GO
CREATE PROC pr_spendForecastV1
@t nvarchar(30),
@Y nvarchar(10)
AS
BEGIN
DECLARE @sql nvarchar(max)
SET @sql = 'SELECT AM ' + '
, sum([Ave. Daily Workday]) AS avg_daily_workday
, sum([Ave. Daily Holiday]) AS avg_daily_holiday
, sum([Jan Spending Forecast]) AS Jan_' +RIGHT(@Y, 2)+ '
, sum([Feb Spending Forecast]) AS Feb_' +RIGHT(@Y, 2)+ '
, sum([Mar Spending Forecast]) AS Mar_' +RIGHT(@Y, 2)+ '
, sum([Apr Spending Forecast]) AS Apr_' +RIGHT(@Y, 2)+ '
, sum([May Spending Forecast]) AS May_' +RIGHT(@Y, 2)+ '
, sum([Jun Spending Forecast]) AS Jun_' +RIGHT(@Y, 2)+ '
, sum([Jul Spending Forecast]) AS Jul_' +RIGHT(@Y, 2)+ '
, sum([Aug Spending Forecast]) AS Aug_' +RIGHT(@Y, 2)+ '
, sum([Sep Spending Forecast]) AS Sep_' +RIGHT(@Y, 2)+ '
, sum([Oct Spending Forecast]) AS Oct_' +RIGHT(@Y, 2)+ '
, sum([Nov Spending Forecast]) AS Nov_' +RIGHT(@Y, 2)+ '
, sum([Dec Spending Forecast]) AS Dec_' +RIGHT(@Y, 2)+ '
From ' + QUOTENAME(@t) + '
WHERE 端口 NOT LIKE ''%wrong%''
GROUP BY AM
ORDER BY AM'
PRINT @sql
EXEC sp_executesql @sql, N'@t nvarchar(30), @Y nvarchar(10)', @t, @Y
END
GO
Q/A
1.使用 sp_executesql 时,数据类型为 nvarchar
;
上一篇: Android中自定义Spinner字体大小、颜色等样式 改变下拉选项的高度
下一篇: 综合案例