欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【案例】构建动态 sql 实现查询数据写入 excel 模版

程序员文章站 2022-07-04 16:25:37
...

工具

Python 3.6
Win 7
Spyder
SQL Server 2014 Management Studio

Python 实现代码


import os
import time
import functools
import pandas as pd
from auto_ave import getPath
from datetime import datetime, timedelta, date


now = lambda : time.perf_counter()
dat = lambda n: date.today() - timedelta(n)

D = dat(1)

def log(func):
    @functools.wraps(func)
    def wrapper(*args, **kw):
        print("Call %s():" % func.__name__)
        return func(*args, **kw)
    return wrapper

def getQ(dat):
    # 返回给定日期所在季度
    # 目的:剔除非本季度的消费
    if isinstance(dat, date):
        m = dat.month
        if m in (1, 2, 3):
            return 'Q1'
        elif m in (4, 5, 6):
            return 'Q2'
        elif m in (7, 8, 9):
            return 'Q3'
        else:
            return 'Q4'
    else:
        raise

def connect():
    def loginInfo(section):
        from configparser import ConfigParser
        path = r'H:\SZ_数据\Python\c.s.conf'
        #path = r'C:\users\chen.huaiyu\Chinasearch\c.s.conf'
        conf = ConfigParser()
        conf.read(path)
        return (conf.get(section, 'acc'), conf.get(section, 'pw')
                , conf.get(section, 'ip'), conf.get(section, 'port')
                , conf.get(section, 'dbname'))
    
    from sqlalchemy import create_engine
    try:
        engine = create_engine(
            'mssql+pymssql://%s:%[email protected]%s:%s/%s' % loginInfo('Output'))
    except Exception as e:
        print('连接失败: %s' % e)
        raise
    else:
        return engine

@log
def output(version):
    
    def outputFilList(version):
        '''
        构建 output 文件列表
        以模版为基础新建文件夹、复制文件、修改名称
    
        Returns
        -------
        None.
    
        '''
        def sFilName():
            for d in range(2, 60):
                filName = dat(d).strftime('%Y%m%d v1')
                if os.path.isdir(os.path.join(path, filName)):
                    return filName, dat(d).year, getQ(dat(d))
        
        def source():
            return os.path.join(path, sFilName()[0])
        
        def tFilName():
            return D.strftime('%Y%m%d ' + version)
        
        def target():
            return os.path.join(path, tFilName())
        
        def cpDir():
            import shutil
            lis = []
            sName, sYear, sQuar = sFilName()
            sourceDir = source()
            if not os.path.exists(target()):
                shutil.copytree(sourceDir, target())    
            for f in os.listdir(sourceDir):
                s = os.path.join(target(), f)
                # modify new name
                f = f.replace(sName, tFilName())
                f = f.replace(str(sYear), str(D.year))   # year
                f = f.replace(sQuar, getQ(D))   # quarter
                t = os.path.join(target(), f)
                lis.append(t)
                os.rename(s, t)
            return lis
        
        path = r'H:\SZ_数据\Output'
        return cpDir()
    
    def data(conn, sql):
        return list(map(lambda x: list(x), conn.execute(sql).fetchall()))
    
    def shtNameLis(table):
        strDat = D.strftime('%Y%m%d')
        return (table + strDat, table+strDat+'_1', table+strDat+'_2')
    
    def getRows(sht, row, col):
        # 获取行数
        return sht[row, col].current_region.rows.count
    
    def getCols(sht, row, col):
        # 获取列数
        return sht[row, col].current_region.columns.count
    
    def getRng(sht, row, col):
        # 数据首行区域 & 全部区域
        cntRow = getRows(sht, row, col)
        cntCol = getCols(sht, row, col)
        return sht[row, col:col+cntCol], sht[row:row+cntRow, col:col+cntCol]
    
    def getRng2(sht, newsht, row, col):
        # 外部表与当前表对应区域
        cntRow = getRows(sht, row, col)
        cntCol = getCols(sht, row, col)
        if getRows(newsht, row, col) < cntRow:
            return newsht[row, col:col+cntCol], newsht[row:row+cntRow-1
                                                       , col:col+cntCol]
        
    def getRng3(sht, row, col1, col2):
        # 获取指定区域
        cntRow = getRows(sht, row, col1)
        return sht[row, col1:col2], sht[row:row+cntRow-3, col1:col2]
    
    def clear2(sht, row, col1, col2):
        # 清空 指定区域
        cntRow = getRows(sht, row, col1)
        rng = sht[row:row+cntRow, col1:col2]
        rng.clear()
    
    def clear(sht, row, col):
        # 清空 所有区域
        _, rng = getRng(sht, row, col)
        rng.clear()
        
    def write(sht, row, col, conn, sql):
        # 指定位置写入数据
        sht[row, col].value = data(conn, sql)
        
    def sqlSpendForecast(lis):
        return "EXEC dbo.pr_spendForecast '%s', '%s', '%s'" % lis
    
    def sqlSpendForecastV(lis):
        return "EXEC dbo.pr_spendForecastV1 '%s', '%s' " % lis
    
    def sqlCashForecast(lis):
        return ''' EXEC dbo.pr_cashSpendForecast '%s', '%s', '%s', '%s', '%s'
                , '%s', '%s' ''' % lis
    
    def sqlHandlingFee(lis):
        return ''' EXEC dbo.pr_handlingFee '%s', '%s', '%s', '%s', '%s', '%s'
                , '%s' ''' % lis
    
    def sqlHandlingFeeDetails(t):
        return ''' EXEC dbo.pr_handlingFeeDetails %s ''' % t
    
    def sqlSalesForecast(lis):
        return ''' EXEC dbo.pr_salesForecast '%s', '%s' ''' % lis
    
    def sqlGPRatio(lis):
        return ''' EXEC dbo.pr_GPRatio '%s', '%s' ''' % lis
    
    def sqlSalesTracking(lis):
        return ''' EXEC dbo.pr_salesTracking '%s', '%s', '%s', '%s'
                , '%s' ''' % lis
    
    def sqlAMTracking(lis):
        return ''' EXEC dbo.pr_AMTracking '%s', '%s', '%s', '%s', '%s'
                , '%s' ''' % lis
    
    def sqlCreateFAF(lis):
        return ''' EXEC dbo.pr_createFAF '%s', '%s', '%s', '%s', '%s', '%s'
                , '%s' ''' % lis
    
    def sqlFAF(lis):
        return ''' EXEC dbo.pr_FAF '%s', '%s', '%s', '%s', '%s', '%s' ''' % lis
    
    def getMonth():
        q = getQ(D)
        if q == 'Q1':
            return 'Jan', 'Feb', 'Mar'
        elif q == 'Q2':
            return 'Apr', 'May', 'Jun'
        elif q == 'Q3':
            return 'Jul', 'Aug', 'Sep'
        else:
            return 'Oct', 'Nov', 'Dec'
    
    def index(sht, newsht, row, col):
        # 更新首列:用户名 & 端口
        cntRow = getRows(sht, row, col)
        newsht[row, col].options(transpose=True).value = sht[1:cntRow
                                                             , col].value
    
    def fillFormula(sht, newsht, row, col):
        # 填充公式
        if getRng2(sht, newsht, row, col):
            rng0, rng1 = getRng2(sht, newsht, row, col)
            rng0.api.AutoFill(rng1.api, constants.AutoFillType.xlFillCopy)
    
    def fillFormula2(sht, row, col1, col2):
        # 填充指定区域
        rng0, rng1 = getRng3(sht, row, col1, col2)
        rng0.api.AutoFill(rng1.api, constants.AutoFillType.xlFillCopy)
    
    import xlwings as xw
    from xlwings import constants
    #
    with connect().begin() as conn:
        # 合并
        conn.execute("EXEC newAve '%s', '%s', '%s'" % shtNameLis('P4P_'))
        conn.execute("EXEC newAve '%s', '%s', '%s'" % shtNameLis('NP_'))
        conn.execute("EXEC newAve '%s', '%s', '%s'" % shtNameLis('Infeeds_'))
        # 写入excel
        filLis = outputFilList('v1')
        p4p, np, inf = [shtNameLis(i)[0] for i in ['P4P_', 'NP_', 'Infeeds_']]
        year = D.strftime('%Y')
        q = getQ(D)
        m1, m2, m3 = getMonth()
        for path in filLis:
            # 打开
            wb = xw.Book(path)
            if 'Spending Forecast' in path and '_v1' not in path:
                # 清除表中数据
                # 更新数据
                #
                sht1 = wb.sheets['P4P']
                clear(sht1, 1, 0)
                write(sht1, 1, 0, conn, sqlSpendForecast((p4p, '端口', year)))
                clear(sht1, 1, 14)
                write(sht1, 1, 14, conn, sqlSpendForecast((p4p, '用户名'
                                                           , year)))
                #
                sht = wb.sheets['NP']
                clear(sht, 1, 0)
                write(sht, 1, 0, conn, sqlSpendForecast((np, '端口', year)))
                clear(sht, 1, 14)
                write(sht, 1, 14, conn, sqlSpendForecast((np, '用户名', year)))
                #
                sht = wb.sheets['Infeeds']
                clear(sht, 1, 0)
                write(sht, 1, 0, conn, sqlSpendForecast((inf, '端口', year)))
                clear(sht, 1, 14)
                write(sht, 1, 14, conn, sqlSpendForecast((inf, '用户名'
                                                          , year)))
                #
                sht = wb.sheets['Region']
                write(sht, 2, 0, conn, sqlSpendForecast((p4p, '区域', year)))
                write(sht, 11, 0, conn, sqlSpendForecast((np, '区域', year)))
                write(sht, 20, 0, conn, sqlSpendForecast((inf, '区域', year)))
                #
                sht = wb.sheets['All']
                fillFormula(sht1, sht, 1, 1)
                fillFormula(sht1, sht, 1, 15)
                index(sht1, sht, 1, 0)
                index(sht1, sht, 1, 14)
                #
                sht = wb.sheets['Infeeds(35%)']
                fillFormula(sht1, sht, 1, 1)
                fillFormula(sht1, sht, 1, 15)
                index(sht1, sht, 1, 0)
                index(sht1, sht, 1, 14)
                # 保存、关闭
                wb.save()
                wb.close()
            elif 'Spending Forecast' in path and '_v1' in path:
                #
                sht1 = wb.sheets['P4P']
                write(sht1, 1, 0, conn, sqlSpendForecastV((p4p, year)))
                #
                sht = wb.sheets['NP']
                write(sht, 1, 0, conn, sqlSpendForecastV((np, year)))
                #
                sht = wb.sheets['Infeeds']
                write(sht, 1, 0, conn, sqlSpendForecastV((inf, year)))
                #
                sht = wb.sheets['All']
                clear2(sht, 3, 0, 18)
                fillFormula(sht1, sht, 1, 0)
                #
                wb.save()
                wb.close()
            elif 'Cash' in path:
                #
                sht = wb.sheets['Region']
                write(sht, 2, 0, conn, sqlCashForecast((p4p, '区域'
                                                        , year, q
                                                        , m1, m2, m3)))
                write(sht, 12, 0, conn, sqlCashForecast((np, '区域'
                                                         , year, q
                                                         , m1, m2, m3)))
                write(sht, 22, 0, conn, sqlCashForecast((inf, '区域'
                                                         , year, q
                                                         , m1, m2, m3)))
                #
                sht = wb.sheets['Finance Region']
                write(sht, 2, 0, conn, sqlCashForecast((p4p, '财务'
                                                        , year, q
                                                        , m1, m2, m3)))
                write(sht, 18, 0, conn, sqlCashForecast((np, '财务'
                                                         , year, q
                                                         , m1, m2, m3)))
                write(sht, 34, 0, conn, sqlCashForecast((inf, '财务'
                                                         , year, q
                                                         , m1, m2, m3)))
                # 
                wb.save()
                wb.close()
            elif 'Handling' in path and 'Details' not in path:
                #
                sht = wb.sheets['AM']
                write(sht, 2, 0, conn, sqlHandlingFee((p4p, 'AM'
                                                       , year, q
                                                       , m1, m2, m3)))
                write(sht, 19, 0, conn, sqlHandlingFee((np, 'AM'
                                                        , year, q
                                                        , m1, m2, m3)))
                write(sht, 36, 0, conn, sqlHandlingFee((inf, 'AM'
                                                        , year, q
                                                        , m1, m2, m3)))
                #
                sht = wb.sheets['Sales']
                write(sht, 2, 0, conn, sqlHandlingFee((p4p, 'Sales'
                                                       , year, q
                                                       , m1, m2, m3)))
                write(sht, 25, 0, conn, sqlHandlingFee((np, 'Sales'
                                                        , year, q
                                                        , m1, m2, m3)))
                write(sht, 49, 0, conn, sqlHandlingFee((inf, 'Sales'
                                                        , year, q
                                                        , m1, m2, m3)))
                #
                wb.save()
                wb.close()
            elif 'Handling' in path and 'Details' in path:
                #
                sht = wb.sheets['P4P']
                clear(sht, 1, 0)
                write(sht, 1, 0, conn, sqlHandlingFeeDetails(p4p))
                #
                sht = wb.sheets['NP']
                clear(sht, 1, 0)
                write(sht, 1, 0, conn, sqlHandlingFeeDetails(np))
                #
                sht = wb.sheets['Infeeds']
                clear(sht, 1, 0)
                write(sht, 1, 0, conn, sqlHandlingFeeDetails(inf))
                #
                wb.save()
                wb.close()
            elif 'Sales Forecast' in path:
                #
                sht = wb.sheets['Data']
                clear2(sht, 3, 0, 12)
                clear2(sht, 4, 12, 18)
                write(sht, 3, 0, conn, sqlSalesForecast((p4p, q)))
                write(sht, 3, 4, conn, sqlSalesForecast((np, q)))
                write(sht, 3, 7, conn, sqlSalesForecast((inf, q)))
                fillFormula2(sht, 3, 12, 18)
                #
                wb.save()
                wb.close()
            elif 'GP' in path:
                #
                sht = wb.sheets[0]
                write(sht, 2, 0, conn, sqlGPRatio((p4p, q)))
                write(sht, 13, 0, conn, sqlGPRatio((np, q)))
                write(sht, 24, 0, conn, sqlGPRatio((inf, q)))
                #
                wb.save()
                wb.close()
            elif 'Sales Tracking' in path:
                #
                sht = wb.sheets[0]
                write(sht, 2, 0, conn, sqlSalesTracking((p4p, q, m1, m2, m3)))
                write(sht, 27, 0, conn, sqlSalesTracking((np, q, m1, m2, m3)))
                write(sht, 52, 0, conn, sqlSalesTracking((inf, q
                                                          , m1, m2, m3)))
                # 
                wb.save()
                wb.close()
            elif 'AM Tracking' in path:
                #
                sht = wb.sheets[0]
                #
                write(sht, 2, 0, conn
                      , sqlAMTracking((p4p, year, q, m1, m2, m3)))
                write(sht, 17, 0
                      , conn, sqlAMTracking((np, year, q, m1, m2, m3)))
                write(sht, 32, 0
                      , conn, sqlAMTracking((inf, year, q, m1, m2, m3)))
                #
                wb.save()
                wb.close()
            elif 'FAF' in path:
                # createFAF
                conn.execute(sqlCreateFAF((p4p, np, inf, year, m1, m2, m3)))
                # select
                # GP
                sht = wb.sheets['Weekly GP Report']
                write(sht, 2, 0, conn, sqlFAF(('region', year
                                               , q, m1, m2, m3)))
                # KPI
                sht = wb.sheets['KPI']
                write(sht, 2, 0, conn, sqlFAF(('am', year, q, m1, m2, m3)))
                #
                wb.save()
                wb.close()  




if __name__ == '__main__':
    
    st = now()
    
    output('v1')
    
    print('Runtime %.3f min' % ((now() - st)/60))

动态 T-SQL


USE output
GO
-- 检查存储过程 newAve,如存在,则删除
-- EXEC pr_check 'newAve'
-- 存储过程名;
IF OBJECT_ID('pr_check') IS NOT NULL
	DROP PROC pr_check
GO
CREATE PROC pr_check
	@obj nvarchar(50)
AS
BEGIN
	DECLARE @sql nvarchar(max)
	SET @sql = 'IF OBJECT_ID(''' + @obj + ''') IS NOT NULL DROP PROC ' + @obj
	EXEC sp_executesql @sql, N'@obj nvarchar(50)', @obj
END
GO

-- 检查表 t,如存在,则删除
-- EXEC pr_checkTable 't'
-- 表名;
EXEC pr_check 'pr_checkTable'
GO
CREATE PROC pr_checkTable
	@t nvarchar(30)
AS
BEGIN
	DECLARE @sql nvarchar(max)
	SET @sql = 'IF OBJECT_ID(''' + @t + ''') IS NOT NULL DROP TABLE ' + QUOTENAME(@t)
	EXEC sp_executesql @sql, N'@t nvarchar(30)', @t
END
GO

-- 查询 Spending Forecast  _v1
-- EXEC [Output].[dbo].pr_spendForecastV1 'Infeeds_20200427', '2020'
-- 表名;年;
EXEC pr_check 'pr_spendForecastV1'
GO
CREATE PROC pr_spendForecastV1
	@t nvarchar(30),
	@Y nvarchar(10)
AS
BEGIN
	DECLARE @sql nvarchar(max)
	SET @sql = 'SELECT AM ' + '
			, sum([Ave. Daily Workday]) AS avg_daily_workday
			, sum([Ave. Daily Holiday]) AS avg_daily_holiday
			, sum([Jan Spending Forecast]) AS Jan_' +RIGHT(@Y, 2)+ '
			, sum([Feb Spending Forecast]) AS Feb_' +RIGHT(@Y, 2)+ '
			, sum([Mar Spending Forecast]) AS Mar_' +RIGHT(@Y, 2)+ '
			, sum([Apr Spending Forecast]) AS Apr_' +RIGHT(@Y, 2)+ '
			, sum([May Spending Forecast]) AS May_' +RIGHT(@Y, 2)+ '
			, sum([Jun Spending Forecast]) AS Jun_' +RIGHT(@Y, 2)+ '
			, sum([Jul Spending Forecast]) AS Jul_' +RIGHT(@Y, 2)+ '
			, sum([Aug Spending Forecast]) AS Aug_' +RIGHT(@Y, 2)+ '
			, sum([Sep Spending Forecast]) AS Sep_' +RIGHT(@Y, 2)+ '
			, sum([Oct Spending Forecast]) AS Oct_' +RIGHT(@Y, 2)+ '
			, sum([Nov Spending Forecast]) AS Nov_' +RIGHT(@Y, 2)+ '
			, sum([Dec Spending Forecast]) AS Dec_' +RIGHT(@Y, 2)+ '
		From ' + QUOTENAME(@t) + ' 
		WHERE 端口 NOT LIKE ''%wrong%'' 
		GROUP BY AM
		ORDER BY AM'
	PRINT @sql
	EXEC sp_executesql @sql, N'@t nvarchar(30), @Y nvarchar(10)', @t, @Y
END
GO

Q/A

1.使用 sp_executesql 时,数据类型为 nvarchar

相关标签: Python应用 SQL