欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python 串行执行和并行执行实例

程序员文章站 2022-06-03 12:53:26
我就废话不多说了,大家还是直接看代码吧!#coding=utf-8 import threading import time import cx_oracle from pprint import p...

我就废话不多说了,大家还是直接看代码吧!

#coding=utf-8
 
import threading
 
import time
 
import cx_oracle
 
from pprint import pprint
 
import csv
 
print time.asctime()
 
table_name = "dbtest.csv"
 
f = open(table_name + ".csv", "w")
 
conn = cx_oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
 
cursor = conn.cursor()
 
def query01():
 
  tname = threading.current_thread()
 
  aa=10
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
def query02():
 
  tname = threading.current_thread()
 
  aa=20
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
def query03():
 
  tname = threading.current_thread()
 
  aa=30
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
def query04():
 
  tname = threading.current_thread()
 
  aa=40
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
def query05():
 
  tname = threading.current_thread()
 
  aa=50
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
def query06():
 
  tname = threading.current_thread()
 
  aa=60
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
def query07():
 
  tname = threading.current_thread()
 
  aa=70
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  return row
 
  cursor.close()
 
def query08():
 
  tname = threading.current_thread()
 
  aa=80
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
def query09():
 
  tname = threading.current_thread()
 
  aa=90
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
def query10():
 
  tname = threading.current_thread()
 
  aa=100
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close() 
 
 
threads = []
 
t1 = threading.thread(target=query01, name='query01')
 
threads.append(t1)
 
t2 = threading.thread(target=query02, name='query02')
 
threads.append(t2)
 
t2 = threading.thread(target=query03, name='query03')
 
threads.append(t2)
 
t2 = threading.thread(target=query04, name='query04')
 
threads.append(t2)
 
t2 = threading.thread(target=query05, name='query05')
 
threads.append(t2)
 
t2 = threading.thread(target=query06, name='query06')
 
threads.append(t2)
 
t2 = threading.thread(target=query07, name='query07')
 
threads.append(t2)
 
t2 = threading.thread(target=query08, name='query08')
 
threads.append(t2)
 
t2 = threading.thread(target=query09, name='query09')
 
threads.append(t2)
 
t2 = threading.thread(target=query10, name='query10')
 
threads.append(t2) 
 
if __name__ == '__main__':
 
  for t in threads:
 
    # t.setdaemon(true)
 
    t.start()
 
    # t.run()
 
    # t.start()
 
    # print '3333333'
 
    print threading.current_thread()
 
    # print t.is_alive()
 
    # print '3333333'
 
    t.join()
 
  print "all over "
 
  print time.asctime() 
 
 
c:\python27\python.exe c:/users/tlcb/pycharmprojects/untitled/a2.py
 
wed mar 28 11:08:19 2018
 
<_mainthread(mainthread, started 18744)>
 
[(10, '10boobook10', '10aaaaaaaaaaaa10', '10bbbbbbbbbbbbbbbbb10'), (10, '10sssssssss10', 'tlcb', 'tlcb'), (10, '10boobook10', '10aaaaaaaaaaaa10', '10bbbbbbbbbbbbbbbbb10')]
 
<_mainthread(mainthread, started 18744)>
 
[(20, '20boobook20', '20aaaaaaaaaaaa20', '20bbbbbbbbbbbbbbbbb20'), (20, '20boobook20', '20aaaaaaaaaaaa20', '20bbbbbbbbbbbbbbbbb20'), (20, '20boobook20', '20aaaaaaaaaaaa20', '20bbbbbbbbbbbbbbbbb20')]
 
<_mainthread(mainthread, started 18744)>
 
[(30, '30boobook30', '30aaaaaaaaaaaa30', '30bbbbbbbbbbbbbbbbb30'), (30, '30boobook30', '30aaaaaaaaaaaa30', '30bbbbbbbbbbbbbbbbb30'), (30, '30boobook30', '30aaaaaaaaaaaa30', '30bbbbbbbbbbbbbbbbb30')]
 
<_mainthread(mainthread, started 18744)>
 
[(40, '40boobook40', '40aaaaaaaaaaaa40', '40bbbbbbbbbbbbbbbbb40'), (40, '40boobook40', '40aaaaaaaaaaaa40', '40bbbbbbbbbbbbbbbbb40'), (40, '40boobook40', '40aaaaaaaaaaaa40', '40bbbbbbbbbbbbbbbbb40')]
 
<_mainthread(mainthread, started 18744)>
 
[(50, '50boobook50', '50aaaaaaaaaaaa50', '50bbbbbbbbbbbbbbbbb50'), (50, '50boobook50', '50aaaaaaaaaaaa50', '50bbbbbbbbbbbbbbbbb50'), (50, '50boobook50', '50aaaaaaaaaaaa50', '50bbbbbbbbbbbbbbbbb50')]
 
<_mainthread(mainthread, started 18744)>
 
[(60, '60boobook60', '60aaaaaaaaaaaa60', '60bbbbbbbbbbbbbbbbb60'), (60, '60boobook60', '60aaaaaaaaaaaa60', '60bbbbbbbbbbbbbbbbb60'), (60, '60boobook60', '60aaaaaaaaaaaa60', '60bbbbbbbbbbbbbbbbb60')]
 
<_mainthread(mainthread, started 18744)>
 
<_mainthread(mainthread, started 18744)>
 
[(80, '80boobook80', '80aaaaaaaaaaaa80', '80bbbbbbbbbbbbbbbbb80'), (80, '80boobook80', '80aaaaaaaaaaaa80', '80bbbbbbbbbbbbbbbbb80'), (80, '80boobook80', '80aaaaaaaaaaaa80', '80bbbbbbbbbbbbbbbbb80')]
 
<_mainthread(mainthread, started 18744)>
 
[(90, '90boobook90', '90aaaaaaaaaaaa90', '90bbbbbbbbbbbbbbbbb90'), (90, '90boobook90', '90aaaaaaaaaaaa90', '90bbbbbbbbbbbbbbbbb90'), (90, '90boobook90', '90aaaaaaaaaaaa90', '90bbbbbbbbbbbbbbbbb90')]
 
<_mainthread(mainthread, started 18744)>
 
[(100, '100boobook100', '100aaaaaaaaaaaa100', '100bbbbbbbbbbbbbbbbb100'), (100, '100boobook100', '100aaaaaaaaaaaa100', '100bbbbbbbbbbbbbbbbb100'), (100, '100boobook100', '100aaaaaaaaaaaa100', '100bbbbbbbbbbbbbbbbb100')]
 
all over 
 
wed mar 28 11:08:34 2018 
 
process finished with exit code 0 
 
 
这个时候是串行 花费了15秒 
 
 
多线程跑:
 
#coding=utf-8
 
import threading
 
import time
 
import cx_oracle
 
from pprint import pprint
 
import csv
 
print time.asctime()
 
table_name = "dbtest.csv"
 
f = open(table_name + ".csv", "w") 
 
 
def query01():
 
  tname = threading.current_thread()
 
  aa=10
 
  conn = cx_oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
 
  cursor = conn.cursor()
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
  conn.close()
 
def query02():
 
  tname = threading.current_thread()
 
  aa=20
 
  conn = cx_oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
 
  cursor = conn.cursor()
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
  conn.close()
 
def query03():
 
  tname = threading.current_thread()
 
  aa=30
 
  conn = cx_oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
 
  cursor = conn.cursor()
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
  conn.close()
 
def query04():
 
  conn = cx_oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
 
  cursor = conn.cursor()
 
  tname = threading.current_thread()
 
  aa=40
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
  conn.close()
 
def query05():
 
  conn = cx_oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
 
  cursor = conn.cursor()
 
  tname = threading.current_thread()
 
  aa=50
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
  conn.close()
 
def query06():
 
  conn = cx_oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
 
  cursor = conn.cursor()
 
  tname = threading.current_thread()
 
  aa=60
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
  conn.close()
 
def query07():
 
  conn = cx_oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
 
  cursor = conn.cursor()
 
  tname = threading.current_thread()
 
  aa=70
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  return row
 
  cursor.close()
 
  conn.close()
 
def query08():
 
  conn = cx_oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
 
  cursor = conn.cursor()
 
  tname = threading.current_thread()
 
  aa=80
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
  conn.close()
 
def query09():
 
  conn = cx_oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
 
  cursor = conn.cursor()
 
  tname = threading.current_thread()
 
  aa=90
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
  conn.close()
 
def query10():
 
  conn = cx_oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
 
  cursor = conn.cursor()
 
  tname = threading.current_thread()
 
  aa=100
 
  # cursor.execute("select * from test100 where id = %s", [aa])
 
  cursor.prepare("""select * from test100 where id=:id""")
 
  cursor.execute(none,{'id':aa})
 
  row = cursor.fetchall()
 
  print row
 
  return row
 
  cursor.close()
 
  conn.close() 
 
 
threads = []
 
t1 = threading.thread(target=query01, name='query01')
 
threads.append(t1)
 
t2 = threading.thread(target=query02, name='query02')
 
threads.append(t2)
 
t2 = threading.thread(target=query03, name='query03')
 
threads.append(t2)
 
t2 = threading.thread(target=query04, name='query04')
 
threads.append(t2)
 
t2 = threading.thread(target=query05, name='query05')
 
threads.append(t2)
 
t2 = threading.thread(target=query06, name='query06')
 
threads.append(t2)
 
t2 = threading.thread(target=query07, name='query07')
 
threads.append(t2)
 
t2 = threading.thread(target=query08, name='query08')
 
threads.append(t2)
 
t2 = threading.thread(target=query09, name='query09')
 
threads.append(t2)
 
t2 = threading.thread(target=query10, name='query10')
 
threads.append(t2) 
 
 
if __name__ == '__main__':
 
  for t in threads:
 
    # t.setdaemon(true)
 
    t.start()
 
    # t.run()
 
    # t.start()
 
    # print '3333333'
 
    print threading.current_thread()
 
    # print t.is_alive()
 
    # print '3333333'
 
  t.join()
 
  print "all over "
 
  print time.asctime() 
 
 
c:\python27\python.exe c:/users/tlcb/pycharmprojects/untitled/a2.py
 
wed mar 28 11:12:47 2018
 
<_mainthread(mainthread, started 22500)>
 
<_mainthread(mainthread, started 22500)>
 
<_mainthread(mainthread, started 22500)>
 
<_mainthread(mainthread, started 22500)>
 
<_mainthread(mainthread, started 22500)>
 
<_mainthread(mainthread, started 22500)>
 
<_mainthread(mainthread, started 22500)>
 
<_mainthread(mainthread, started 22500)>
 
<_mainthread(mainthread, started 22500)>
 
<_mainthread(mainthread, started 22500)>
 
[(40, '40boobook40', '40aaaaaaaaaaaa40', '40bbbbbbbbbbbbbbbbb40'), (40, '40boobook40', '40aaaaaaaaaaaa40', '40bbbbbbbbbbbbbbbbb40'), (40, '40boobook40', '40aaaaaaaaaaaa40', '40bbbbbbbbbbbbbbbbb40')]
 
[(60, '60boobook60', '60aaaaaaaaaaaa60', '60bbbbbbbbbbbbbbbbb60'), (60, '60boobook60', '60aaaaaaaaaaaa60', '60bbbbbbbbbbbbbbbbb60'), (60, '60boobook60', '60aaaaaaaaaaaa60', '60bbbbbbbbbbbbbbbbb60')]
 
[(80, '80boobook80', '80aaaaaaaaaaaa80', '80bbbbbbbbbbbbbbbbb80'), (80, '80boobook80', '80aaaaaaaaaaaa80', '80bbbbbbbbbbbbbbbbb80'), (80, '80boobook80', '80aaaaaaaaaaaa80', '80bbbbbbbbbbbbbbbbb80')]
 
[(50, '50boobook50', '50aaaaaaaaaaaa50', '50bbbbbbbbbbbbbbbbb50'), (50, '50boobook50', '50aaaaaaaaaaaa50', '50bbbbbbbbbbbbbbbbb50'), (50, '50boobook50', '50aaaaaaaaaaaa50', '50bbbbbbbbbbbbbbbbb50')]
 
[(10, '10boobook10', '10aaaaaaaaaaaa10', '10bbbbbbbbbbbbbbbbb10'), (10, '10sssssssss10', 'tlcb', 'tlcb'), (10, '10boobook10', '10aaaaaaaaaaaa10', '10bbbbbbbbbbbbbbbbb10')]
 
[(20, '20boobook20', '20aaaaaaaaaaaa20', '20bbbbbbbbbbbbbbbbb20'), (20, '20boobook20', '20aaaaaaaaaaaa20', '20bbbbbbbbbbbbbbbbb20'), (20, '20boobook20', '20aaaaaaaaaaaa20', '20bbbbbbbbbbbbbbbbb20')]
 
[(30, '30boobook30', '30aaaaaaaaaaaa30', '30bbbbbbbbbbbbbbbbb30'), (30, '30boobook30', '30aaaaaaaaaaaa30', '30bbbbbbbbbbbbbbbbb30'), (30, '30boobook30', '30aaaaaaaaaaaa30', '30bbbbbbbbbbbbbbbbb30')]
 
[(100, '100boobook100', '100aaaaaaaaaaaa100', '100bbbbbbbbbbbbbbbbb100'), (100, '100boobook100', '100aaaaaaaaaaaa100', '100bbbbbbbbbbbbbbbbb100'), (100, '100boobook100', '100aaaaaaaaaaaa100', '100bbbbbbbbbbbbbbbbb100')]
 
[(90, '90boobook90', '90aaaaaaaaaaaa90', '90bbbbbbbbbbbbbbbbb90'), (90, '90boobook90', '90aaaaaaaaaaaa90', '90bbbbbbbbbbbbbbbbb90'), (90, '90boobook90', '90aaaaaaaaaaaa90', '90bbbbbbbbbbbbbbbbb90')]
 
all over 
 
wed mar 28 11:12:55 2018 
 
process finished with exit code 0 
 
此时花了 8秒

补充知识:python logging定制logstash的json日志格式

最近一直在折腾日志的收集,现在算是收尾了。 写一篇算python优化logstash的方案。

其实大家都知道logstash调用grok来解析日志的话,是要消耗cpu的成本的,毕竟是需要正则的匹配的。

根据logstash调优的方案,咱们可以预先生成json的格式。 我这边基本是python的程序,怎么搞尼 ?

有两种方法,第一种方法是生成json后,直接打入logstash的端口。 还有一种是生成json写入文件,让logstash做tail操作的时候,把一行的日志数据直接载入json就可以了。

python下的日志调试用得时logging,改成json也是很好改得。 另外不少老外已经考虑到这样的需求,已经做了python logstash的模块。

import logging
import logstash
import sys

host = 'localhost'

test_logger = logging.getlogger('python-logstash-logger')
test_logger.setlevel(logging.info)
test_logger.addhandler(logstash.logstashhandler(host, 5959, version=1))
# test_logger.addhandler(logstash.tcplogstashhandler(host, 5959, version=1))

test_logger.error('python-logstash: test logstash error message.')
test_logger.info('python-logstash: test logstash info message.')
test_logger.warning('python-logstash: test logstash warning message.')

# add extra field to logstash message
extra = {
  'test_string': 'python version: ' + repr(sys.version_info),
  'test_boolean': true,
  'test_dict': {'a': 1, 'b': 'c'},
  'test_float': 1.23,
  'test_integer': 123,
  'test_list': [1, 2, '3'],
}
test_logger.info('python-logstash: test extra fields', extra=extra)

python-logstash自带了amqp的方案

import logging
import logstash

# amqp parameters
host = 'localhost'
username = 'guest'
password= 'guest'
exchange = 'logstash.py'

# get a logger and set logging level
test_logger = logging.getlogger('python-logstash-logger')
test_logger.setlevel(logging.info)

# add the handler
test_logger.addhandler(logstash.amqplogstashhandler(version=1,
                          host=host,
                          durable=true,
                          username=username,
                          password=password,
                          exchange=exchange))

# log
test_logger.error('python-logstash: test logstash error message.')
test_logger.info('python-logstash: test logstash info message.')
test_logger.warning('python-logstash: test logstash warning message.')

try:
  1/0
except:
  test_logger.exception('python-logstash: test logstash exception with stack trace')

不管怎么说,最后生成的格式是这样就可以了。

{
  "@source"=>"unknown",
  "@type"=>"nginx",
  "@tags"=>[],
  "@fields"=>{
    "remote_addr"=>"192.168.0.1",
    "remote_user"=>"-",
    "body_bytes_sent"=>"13988",
    "request_time"=>"0.122",
    "status"=>"200",
    "request"=>"get /some/url http/1.1",
    "request_method"=>"get",
    "http_referrer"=>"http://www.example.org/some/url",
    "http_user_agent"=>"mozilla/5.0 (x11; linux x86_64) applewebkit/537.1 (khtml, like gecko) chrome/21.0.1180.79 safari/537.1"
  },
  "@timestamp"=>"2012-08-23t10:49:14+02:00"
}

我这里简单提一下,这个模块用的不是很满意,我在python下把日志打成了json字符串,我原本以为会像grok那样,在es里面,我的这条日志是个字段的结构,而不是这个日志都在message里面…. 我想大家应该明白了我的意思,这样很是不容易在kibana的搜索…

在kibana搜索,我经常上 source:xxx and level:info 结果正像上面描述的那样,整条日志,都在@message里面。

以上这篇python 串行执行和并行执行实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。