tensorflow、多GPU、多线程训练VGG19来做cifar-10分类
程序员文章站
2022-07-13 11:42:30
...
背景:几天前需要写一个多GPU训练的算法模型,于是在tensorflow官网上反复研读了cifar-10的官方代码,花了三天时间去掉其中的冗余部分,并改写成自己的风格。
代码共由6部分构成:
1、data_input.py 由于cifar-10官方训练集和验证集都是.bin格式文件,故使用同一份数据读入代码
2、network.py 搭建VGG19,返回带weight decay的变量loss和交叉熵之和作为总loss
3、train.py 在每个GPU中建立tower,并行训练
4、val.py 多线程进行模型验证
5、toTFRecords.py 由于使用多线程无法读入label,故将图像和图像名(作label)制作成TFRecords
6、test_tfrecords.py 读TFRecords文件,计算模型输出并写入csv
1、data_input.py:坑点为多线程读入数据时,如果num_threads==1,则万事大吉,否则,由于不同线程读取数据快慢不同,读入num_threads个数据的时候会出现一定程度的顺序打乱
import os
import tensorflow as tf
class data_input(object):
    """Queue-based reader for the CIFAR-10 binary batch files.

    Training and evaluation share this class because both splits use the same
    fixed-length ``.bin`` record layout. After construction the batched
    tensors are exposed as ``image_batch`` and ``label_batch``.

    Args:
        data_dir: Directory holding ``data_batch_1.bin`` .. ``data_batch_5.bin``
            (training) or ``test_batch.bin`` (evaluation).
        batch_size: Number of samples per batch.
        num_classes: Depth of the one-hot label vectors.
        is_training: When true, read the training files, augment and shuffle;
            otherwise read the test file and batch it without shuffling.
    """

    def __init__(self, data_dir, batch_size, num_classes, is_training):
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.num_classes = num_classes
        self.is_training = is_training
        self.image_batch, self.label_batch = self.load_data()

    def load_data(self):
        """Build the input pipeline and return ``(image_batch, label_batch)``.

        Returns:
            A 28x28x3 float image batch and a ``[batch_size, num_classes]``
            one-hot label batch.

        Raises:
            ValueError: If one of the expected ``.bin`` files does not exist.
        """
        if self.is_training:
            filenames = [os.path.join(self.data_dir, 'data_batch_%d.bin' % i) for i in range(1, 6)]
        else:
            filenames = [os.path.join(self.data_dir, 'test_batch.bin')]
        for f in filenames:
            if not tf.gfile.Exists(f):
                raise ValueError('Failed to find file: ' + f)

        filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
        image, label = self.load_one_sample(filename_queue)
        image.set_shape([32, 32, 3])
        label.set_shape([self.num_classes])

        # Author's note: with num_threads > 1 the threads run at different
        # speeds, so samples can come out locally reordered.
        if self.is_training:
            # Training: augmentation plus shuffled batching.
            image = self.data_augmentation(image)
            image_batch, label_batch = tf.train.shuffle_batch(
                [image, label],
                batch_size=self.batch_size,
                num_threads=16,
                capacity=20400,
                min_after_dequeue=20000)
        else:
            image = tf.image.resize_image_with_crop_or_pad(image, 28, 28)
            # Evaluation images must be normalised the same way as the
            # training images (see data_augmentation); otherwise the network
            # is evaluated on raw 0-255 pixel values it was never trained on.
            image = tf.image.per_image_standardization(image)
            image_batch, label_batch = tf.train.batch(
                [image, label],
                batch_size=self.batch_size,
                num_threads=16,
                capacity=20400)
        return image_batch, tf.reshape(label_batch, [self.batch_size, self.num_classes])

    def load_one_sample(self, filename_queue):
        """Read one record from the queue and decode it.

        Each record is 1 label byte followed by 32*32*3 image bytes.

        Returns:
            ``(image, label)``: a float32 ``[32, 32, 3]`` image and a one-hot
            label of length ``num_classes``.
        """
        image_bytes = 32 * 32 * 3
        label_bytes = 1
        record_bytes = image_bytes + label_bytes
        reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
        key, value = reader.read(filename_queue)
        record = tf.decode_raw(value, tf.uint8)

        label = tf.cast(tf.strided_slice(record, [0], [label_bytes]), tf.int32)
        label = tf.one_hot(label, self.num_classes)

        # The image bytes are stored as [depth, height, width]; transpose to
        # [height, width, depth].
        image = tf.reshape(
            tf.strided_slice(record, [label_bytes], [label_bytes + image_bytes]),
            [3, 32, 32])
        image = tf.transpose(image, [1, 2, 0])
        image = tf.cast(image, tf.float32)
        return image, tf.reshape(label, [self.num_classes])

    def data_augmentation(self, image):
        """Apply the training-time distortions and standardise the image.

        Random 28x28 crop, random horizontal flip, random brightness and
        contrast, then per-image standardisation.
        """
        image = tf.random_crop(image, [28, 28, 3])
        image = tf.image.random_flip_left_right(image)
        image = tf.image.random_brightness(image, max_delta=63)
        image = tf.image.random_contrast(image, lower=0.2, upper=1.8)
        image = tf.image.per_image_standardization(image)
        return image
2、network.py:坑点为不要忘记在验证集和测试集上关闭dropout。。。。
import tensorflow as tf
class NETWORK(object):
    """VGG19-style classifier with weight-decay and cross-entropy losses.

    Building an instance creates the whole graph: ``self.logits`` holds the
    softmax class probabilities, ``self.unscaled_logits`` the pre-softmax
    scores, and ``self.losses`` the sum of everything in the ``'losses'``
    collection.

    Args:
        image_batch: Batch of input images.
        label_batch: Batch of one-hot labels.
        keep_prob: Dropout keep probability; used only when ``is_training``
            is exactly ``True``, otherwise dropout is disabled.
        num_classes: Number of output classes.
        is_training: Whether the graph is built for training.
    """

    def __init__(self, image_batch, label_batch, keep_prob, num_classes, is_training):
        self.image_batch = image_batch
        self.label_batch = label_batch
        # Dropout must be switched off for validation and testing.
        if is_training is True:
            self.keep_prob = keep_prob
        else:
            self.keep_prob = 1
        self.num_classes = num_classes
        self.logits = self.inference()
        self.losses = self.loss()

    def inference(self):
        """Build the forward pass and return the softmax class probabilities.

        The pre-softmax scores are kept in ``self.unscaled_logits`` so that
        ``loss()`` can hand them to the cross-entropy op.
        """
        conv1_1 = conv(self.image_batch, 3, 64, 1, 'SAME', 0.0004, 'conv1_1')
        # NOTE(review): stride 2 here is the only strided convolution in the
        # stack - confirm it is intentional.
        conv1_2 = conv(conv1_1, 3, 64, 2, 'SAME', None, 'conv1_2')
        pool1 = max_pool(conv1_2, 3, 2, 'SAME', 'pool1')

        conv2_1 = conv(pool1, 3, 128, 1, 'SAME', 0.0004, 'conv2_1')
        conv2_2 = conv(conv2_1, 3, 128, 1, 'SAME', None, 'conv2_2')
        pool2 = max_pool(conv2_2, 3, 2, 'SAME', 'pool2')

        conv3_1 = conv(pool2, 3, 256, 1, 'SAME', 0.0004, 'conv3_1')
        conv3_2 = conv(conv3_1, 3, 256, 1, 'SAME', None, 'conv3_2')
        conv3_3 = conv(conv3_2, 3, 256, 1, 'SAME', None, 'conv3_3')
        conv3_4 = conv(conv3_3, 3, 256, 1, 'SAME', None, 'conv3_4')
        pool3 = max_pool(conv3_4, 3, 2, 'SAME', 'pool3')

        conv4_1 = conv(pool3, 3, 512, 1, 'SAME', 0.0004, 'conv4_1')
        conv4_2 = conv(conv4_1, 3, 512, 1, 'SAME', None, 'conv4_2')
        conv4_3 = conv(conv4_2, 3, 512, 1, 'SAME', None, 'conv4_3')
        conv4_4 = conv(conv4_3, 3, 512, 1, 'SAME', None, 'conv4_4')
        pool4 = max_pool(conv4_4, 3, 2, 'SAME', 'pool4')

        conv5_1 = conv(pool4, 3, 512, 1, 'SAME', 0.0004, 'conv5_1')
        conv5_2 = conv(conv5_1, 3, 512, 1, 'SAME', None, 'conv5_2')
        conv5_3 = conv(conv5_2, 3, 512, 1, 'SAME', None, 'conv5_3')
        conv5_4 = conv(conv5_3, 3, 512, 1, 'SAME', None, 'conv5_4')
        pool5 = max_pool(conv5_4, 3, 2, 'SAME', 'pool5')

        # Flatten to [batch, features] for the fully connected layers.
        reshape = tf.reshape(pool5, [pool5.get_shape().as_list()[0], -1])
        dim = reshape.get_shape()[1].value
        fc6 = fc(reshape, dim, 4096, 0.0004, 'fc6')
        dropout6 = dropout(fc6, self.keep_prob)
        fc7 = fc(dropout6, 4096, 4096, 0.0004, 'fc7')
        dropout7 = dropout(fc7, self.keep_prob)
        fc8 = fc(dropout7, 4096, self.num_classes, None, 'fc8')

        # Keep the raw scores: the cross-entropy op applies softmax itself.
        self.unscaled_logits = fc8
        logits = tf.nn.softmax(fc8)
        return logits

    def loss(self):
        """Add the mean cross-entropy to ``'losses'`` and return the total.

        The cross-entropy is computed from the pre-softmax scores; passing
        the softmax output instead would apply softmax twice and flatten the
        gradients.
        """
        cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
            labels=self.label_batch,
            logits=self.unscaled_logits,
            name='cross_entropy_per_example')
        cross_entropy_mean = tf.reduce_mean(cross_entropy, name='cross_entropy')
        tf.add_to_collection('losses', cross_entropy_mean)
        return tf.add_n(tf.get_collection('losses'), name='total_loss')
def _variable_on_cpu(name, shape, initializer):
    """Create (or reuse) a float32 variable pinned to ``/cpu:0``."""
    with tf.device('/cpu:0'):
        return tf.get_variable(name, shape, initializer=initializer, dtype=tf.float32)
def _variable_with_weight_decay(name, shape, stddev, wd):
    """Create a truncated-normal variable, optionally with an L2 penalty.

    When ``wd`` is not ``None``, ``wd * l2_loss(var)`` is added to the
    ``'losses'`` collection.
    """
    initializer = tf.truncated_normal_initializer(stddev=stddev, dtype=tf.float32)
    var = _variable_on_cpu(name, shape, initializer)
    if wd is None:
        return var
    penalty = tf.multiply(tf.nn.l2_loss(var), wd, name='weight_loss')
    tf.add_to_collection('losses', penalty)
    return var
def conv(x, filter_wh, out_channels, stride_yx, padding, wd, name):
    """Square 2-D convolution followed by bias add and ReLU.

    Args:
        x: Input tensor; its last dimension is taken as the channel count.
        filter_wh: Kernel height and width.
        out_channels: Number of output feature maps.
        stride_yx: Stride used for both spatial dimensions.
        padding: Padding mode passed to ``tf.nn.conv2d``.
        wd: Weight-decay factor for the kernel, or ``None`` for no penalty.
        name: Variable scope name of the layer.
    """
    in_channels = int(x.get_shape()[-1])
    kernel_shape = [filter_wh, filter_wh, in_channels, out_channels]
    strides = [1, stride_yx, stride_yx, 1]
    with tf.variable_scope(name) as scope:
        kernel = _variable_with_weight_decay('weights', shape=kernel_shape, stddev=5e-2, wd=wd)
        feature_map = tf.nn.conv2d(x, kernel, strides, padding=padding)
        biases = _variable_on_cpu('biases', [out_channels], tf.constant_initializer(0.0))
        pre_activation = tf.nn.bias_add(feature_map, biases)
        return tf.nn.relu(pre_activation, name=scope.name)
def max_pool(x, ksize_hw, stride_yx, padding, name):
    """Max pooling with a square window and equal strides on both axes."""
    window = [1, ksize_hw, ksize_hw, 1]
    strides = [1, stride_yx, stride_yx, 1]
    return tf.nn.max_pool(x, ksize=window, strides=strides, padding=padding, name=name)
def lrn(x, depth_radius, alpha, beta, bias, name):
    """Thin wrapper around ``tf.nn.local_response_normalization``."""
    options = {'bias': bias, 'alpha': alpha, 'beta': beta, 'name': name}
    return tf.nn.local_response_normalization(x, depth_radius, **options)
def fc(x, num_in, num_out, wd, name, relu=True):
    """Fully connected layer ``x @ weights + biases`` with optional ReLU.

    Args:
        x: Input tensor of shape ``[batch, num_in]``.
        num_in: Number of input features.
        num_out: Number of output units.
        wd: Weight-decay factor for the weights, or ``None`` for no penalty.
        name: Variable scope name of the layer.
        relu: Apply ReLU to the output (default, matching the previous
            behaviour). Pass ``False`` for a linear layer, e.g. a final
            classification layer whose scores should be allowed to go
            negative.

    Returns:
        The layer output, named after the scope.
    """
    with tf.variable_scope(name) as scope:
        weights = _variable_with_weight_decay('weights', shape=[num_in, num_out], stddev=0.04, wd=wd)
        biases = _variable_on_cpu('biases', [num_out], tf.constant_initializer(0.1))
        if relu:
            linear = tf.nn.xw_plus_b(x, weights, biases)
            return tf.nn.relu(linear, name=scope.name)
        return tf.nn.xw_plus_b(x, weights, biases, name=scope.name)
def dropout(x, keep_prob):
    """Apply dropout to ``x``, keeping each unit with probability ``keep_prob``."""
    dropped = tf.nn.dropout(x, keep_prob)
    return dropped
3、train.py:暂无坑点,多GPU并行训练就是在每个GPU上建立tower,并分别计算每个GPU中每个tower的loss和gradient,保留5个checkpoint。
import argparse
import sys, os
import re
import tensorflow as tf
from datetime import datetime
import time
from data_input import data_input
from network import NETWORK
def tower_loss(scope, images, labels, keep_prob, num_classes, is_training):
    """Build the model for one tower and return that tower's total loss.

    Args:
        scope: Name scope of the tower; used to pick this tower's entries
            out of the ``'losses'`` collection.
        images: Image batch for this tower.
        labels: One-hot label batch for this tower.
        keep_prob: Dropout keep probability.
        num_classes: Number of output classes.
        is_training: Whether the model is built for training.

    Returns:
        Sum of all losses registered under ``scope``.
    """
    # Constructing NETWORK already calls loss(), which registers the
    # cross-entropy in the 'losses' collection. Calling model.loss() again
    # here would register it a second time and double its weight relative
    # to the weight-decay terms.
    NETWORK(images, labels, keep_prob, num_classes, is_training)

    # Only the losses created inside this tower's scope.
    losses = tf.get_collection('losses', scope)
    total_loss = tf.add_n(losses, name='total_loss')

    for l in losses + [total_loss]:
        # Strip the tower prefix so all towers share one summary name.
        loss_name = re.sub('TOWER_[0-9]*/', '', l.op.name)
        tf.summary.scalar(loss_name, l)
    return total_loss
def average_gradients(tower_grads):
    """Average the gradient of every shared variable across all towers.

    Args:
        tower_grads: One list of ``(gradient, variable)`` pairs per tower,
            all in the same variable order.

    Returns:
        A list of ``(mean_gradient, variable)`` pairs, where the variable is
        taken from the first tower's pair.
    """
    averaged = []
    for per_variable in zip(*tower_grads):
        # Add a leading tower axis to each gradient, then average over it.
        stacked = tf.concat(
            axis=0,
            values=[tf.expand_dims(g, 0) for g, _ in per_variable])
        mean_grad = tf.reduce_mean(stacked, 0)
        shared_var = per_variable[0][1]
        averaged.append((mean_grad, shared_var))
    return averaged
def train(args):
    """Train the model on ``args.num_gpus`` towers with averaged gradients.

    Variables, gradient averaging and the update ops are placed on the CPU;
    each GPU tower computes its own loss and gradients. Progress is printed
    every 10 steps, summaries are written every 100 steps and a checkpoint
    is saved every 1000 steps and on the last step (at most 5 are kept).

    Args:
        args: Parsed command-line arguments (see ``parse_arguments``).

    Raises:
        ValueError: If ``args.num_gpus`` is smaller than 1.
    """
    if args.num_gpus < 1:
        raise ValueError('num_gpus must be at least 1, got %d' % args.num_gpus)

    with tf.Graph().as_default(), tf.device('/cpu: 0'):
        # Incremented once per apply_gradients call below.
        global_step = tf.get_variable('global_step', [], initializer=tf.constant_initializer(0), trainable=False)
        lr = tf.train.exponential_decay(args.base_lr, global_step, 10000, 0.9, staircase=True)
        opt = tf.train.GradientDescentOptimizer(lr)

        samples = data_input(args.train_data, args.batch_size, args.num_classes, args.is_training)
        batch_queue = tf.contrib.slim.prefetch_queue.prefetch_queue(
            [samples.image_batch, samples.label_batch], capacity=2 * args.num_gpus)

        tower_grads = []
        with tf.variable_scope(tf.get_variable_scope()):
            for i in range(args.num_gpus):
                with tf.device('/gpu: %d' % i):
                    with tf.name_scope('TOWER_%d' % i) as scope:
                        images, labels = batch_queue.dequeue()
                        loss = tower_loss(scope, images, labels, args.dropout_rate, args.num_classes, args.is_training)
                        # Later towers reuse the variables created by the first one.
                        tf.get_variable_scope().reuse_variables()
                        # Only the summaries of the last tower are retained.
                        summaries = tf.get_collection(tf.GraphKeys.SUMMARIES, scope)
                        grads = opt.compute_gradients(loss)
                        tower_grads.append(grads)

        grads = average_gradients(tower_grads)
        summaries.append(tf.summary.scalar('learning_rate', lr))
        for grad, var in grads:
            if grad is not None:
                summaries.append(tf.summary.histogram(var.op.name + '/gradients', grad))
        apply_gradients_op = opt.apply_gradients(grads, global_step=global_step)
        for var in tf.trainable_variables():
            summaries.append(tf.summary.histogram(var.op.name, var))

        # Track the moving averages of all trainable variables.
        variable_averages = tf.train.ExponentialMovingAverage(0.9999, global_step)
        variable_averages_op = variable_averages.apply(tf.trainable_variables())
        # Group all updates into a single train op.
        train_op = tf.group(apply_gradients_op, variable_averages_op)

        saver = tf.train.Saver(tf.global_variables(), max_to_keep=5)
        summary_op = tf.summary.merge(summaries)
        init = tf.global_variables_initializer()

        # Make sure the checkpoint directory exists before the first save.
        tf.gfile.MakeDirs(args.ckpt_dir)
        checkpoint_path = os.path.join(args.ckpt_dir, 'model.ckpt')

        config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
        with tf.Session(config=config) as sess:
            sess.run(init)
            # Start the queue runners under a coordinator so that their
            # threads can be stopped cleanly when training ends or fails.
            coord = tf.train.Coordinator()
            threads = tf.train.start_queue_runners(sess=sess, coord=coord)
            summary_writer = tf.summary.FileWriter(args.train_logs, sess.graph)
            try:
                for step in range(args.max_iter):
                    start_time = time.time()
                    _, loss_value = sess.run([train_op, loss])
                    duration = time.time() - start_time

                    if step % 10 == 0:
                        num_examples_per_step = args.batch_size * args.num_gpus
                        examples_per_sec = num_examples_per_step / duration
                        sec_per_batch = duration / args.num_gpus
                        format_str = ('%s: step %d, loss = %.2f (%.1f examples/sec; %.3f sec/batch)')
                        print(format_str % (datetime.now(), step, loss_value, examples_per_sec, sec_per_batch))
                    if step % 100 == 0:
                        summary_str = sess.run(summary_op)
                        summary_writer.add_summary(summary_str, step)
                    if step % 1000 == 0 or (step + 1) == args.max_iter:
                        saver.save(sess, checkpoint_path, global_step=step)
            finally:
                # Flush pending summaries and stop the input threads before
                # the session is closed by the enclosing ``with``.
                summary_writer.close()
                coord.request_stop()
                coord.join(threads, stop_grace_period_secs=10)
def main(args):
    """Reset the summary directory, then start training."""
    log_dir = args.train_logs
    # Remove summaries left over from an earlier run.
    if tf.gfile.Exists(log_dir):
        tf.gfile.DeleteRecursively(log_dir)
    tf.gfile.MakeDirs(log_dir)
    train(args)
def parse_arguments(argv):
    """Parse the training script's command-line options.

    Args:
        argv: Argument strings, without the program name.

    Returns:
        The populated ``argparse.Namespace``.
    """
    def _str_to_bool(value):
        # ``type=bool`` would turn every non-empty string, including
        # "False", into True; parse the text explicitly instead.
        text = value.strip().lower()
        if text in ('true', 't', 'yes', 'y', '1'):
            return True
        if text in ('false', 'f', 'no', 'n', '0', ''):
            return False
        raise argparse.ArgumentTypeError('Expected a boolean value, got %r.' % value)

    parser = argparse.ArgumentParser()
    parser.add_argument('--train_data', type=str, help='Train data directory.', default='./train_data')
    parser.add_argument('--ckpt_dir', type=str, help='Directory of trained models.', default='./ckpts')
    parser.add_argument('--train_logs', type=str, help='Directory for saving summary.', default='./train_logs')
    parser.add_argument('--num_gpus', type=int, help='Number of GPU.', default=1)
    parser.add_argument('--max_iter', type=int, help='Iteration number for training.', default=100000)
    parser.add_argument('--batch_size', type=int, help='Batch size.', default=128)
    parser.add_argument('--num_classes', type=int, help='Number of classes.', default=10)
    parser.add_argument('--base_lr', type=float, help='Base learning rate to start with.', default=0.01)
    parser.add_argument('--weight_decay', type=float, help='Weight decay for L2 loss.', default=0.0001)
    parser.add_argument('--momentum', type=float, help='Momentum for optimization.', default=0.9)
    parser.add_argument('--dropout_rate', type=float, help='Dropout rate.', default=0.7)
    parser.add_argument('--is_training', type=_str_to_bool, help='Training mode.', default=True)
    return parser.parse_args(argv)
if __name__ == '__main__':
    # Everything after the script name is handed to the argument parser.
    cli_args = parse_arguments(sys.argv[1:])
    main(cli_args)
4、val.py:有个巨大的坑点。。。。那就是千万不要用tf.nn.in_top_k()函数,因为当预测值都为0时,该函数会显示你所有情况都预测对了。。。。
from datetime import datetime
import math, sys
import argparse
import numpy as np
import tensorflow as tf
from data_input import data_input
from network import NETWORK
def validation(args):
    """Evaluate saved checkpoints on the validation set and rank the best three.

    Checkpoints for steps 2000, 3000, ..., 6000 are restored from
    ``args.ckpt_dir`` one after another. For each one the mean batch accuracy
    is printed and written to ``args.val_logs`` as ``Precision @ 1``.

    Args:
        args: Parsed command-line arguments (see ``parse_arguments``).
    """
    # Imported locally because this script does not import os at module level.
    import os

    with tf.Graph().as_default() as g:
        samples = data_input(args.val_data, args.batch_size, args.num_classes, args.is_training)
        model = NETWORK(samples.image_batch, samples.label_batch, args.dropout_rate, args.num_classes, args.is_training)
        logits = model.logits

        # Author's note: tf.nn.in_top_k() is avoided on purpose because it
        # reports every sample as correct when all predictions are 0.
        correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(samples.label_batch, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))

        # Restore the moving-averaged variables.
        variable_averages = tf.train.ExponentialMovingAverage(0.9999)
        variables_to_restore = variable_averages.variables_to_restore()
        saver = tf.train.Saver(variables_to_restore)

        summary_op = tf.summary.merge_all()
        summary_writer = tf.summary.FileWriter(args.val_logs, g)

        with tf.Session() as sess:
            # The coordinator stops the input threads together.
            coord = tf.train.Coordinator()
            threads = []
            try:
                for qr in tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS):
                    threads.extend(qr.create_threads(sess, coord=coord, daemon=True, start=True))

                # [step, precision] pairs, best first; the initial entries are
                # placeholders that real results push out.
                top_three_ckpt = [[1000, 0.1], [2000, 0.05], [3000, 0.0]]
                for global_step in range(2000, 7000, 1000):
                    # Honour --ckpt_dir instead of a hard-coded './ckpts'.
                    ckpt_path = os.path.join(args.ckpt_dir, 'model.ckpt-' + str(global_step))
                    saver.restore(sess, ckpt_path)

                    num_iter = int(math.ceil(10000 / args.batch_size))
                    true_count = 0
                    step = 0
                    while step < num_iter and not coord.should_stop():
                        predictions = sess.run([accuracy])
                        true_count += np.sum(predictions)
                        step += 1
                    # Mean of the per-batch accuracies.
                    precision = true_count / num_iter
                    print('%s: precision @ 1 = %.3f' % (datetime.now(), precision))

                    # Insert at the first rank this result beats or ties,
                    # then drop the entry that falls off the end.
                    for rank, (_, best_precision) in enumerate(top_three_ckpt):
                        if precision >= best_precision:
                            top_three_ckpt.insert(rank, [int(global_step), precision])
                            top_three_ckpt.pop()
                            break
                    print(top_three_ckpt)

                    summary = tf.Summary()
                    summary.ParseFromString(sess.run(summary_op))
                    summary.value.add(tag='Precision @ 1', simple_value=precision)
                    summary_writer.add_summary(summary, global_step)
            except Exception as e:
                coord.request_stop(e)
            finally:
                # Flush and release the event file even when evaluation fails.
                summary_writer.close()
            coord.request_stop()
            coord.join(threads, stop_grace_period_secs=10)
def main(args):
    """Reset the evaluation summary directory, then run validation."""
    log_dir = args.val_logs
    # Start from an empty directory so old event files are not mixed in.
    if tf.gfile.Exists(log_dir):
        tf.gfile.DeleteRecursively(log_dir)
    tf.gfile.MakeDirs(log_dir)
    validation(args)
def parse_arguments(argv):
    """Parse the validation script's command-line options.

    Args:
        argv: Argument strings, without the program name.

    Returns:
        The populated ``argparse.Namespace``.
    """
    def _str_to_bool(value):
        # ``type=bool`` would turn every non-empty string, including
        # "False", into True; parse the text explicitly instead.
        text = value.strip().lower()
        if text in ('true', 't', 'yes', 'y', '1'):
            return True
        if text in ('false', 'f', 'no', 'n', '0', ''):
            return False
        raise argparse.ArgumentTypeError('Expected a boolean value, got %r.' % value)

    parser = argparse.ArgumentParser()
    parser.add_argument('--val_data', type=str, help='Validation data directory.', default='./val_data')
    parser.add_argument('--ckpt_dir', type=str, help='Directory of trained models.', default='./ckpts')
    parser.add_argument('--val_logs', type=str, help='Directory for saving evaluation summary.', default='./val_logs')
    parser.add_argument('--num_gpus', type=int, help='Number of GPU.', default=1)
    parser.add_argument('--max_iter', type=int, help='Iteration number for training.', default=100000)
    parser.add_argument('--batch_size', type=int, help='Batch size.', default=128)
    parser.add_argument('--num_classes', type=int, help='Number of classes.', default=10)
    parser.add_argument('--base_lr', type=float, help='Base learning rate to start with.', default=0.01)
    parser.add_argument('--weight_decay', type=float, help='Weight decay for L2 loss.', default=0.0001)
    parser.add_argument('--momentum', type=float, help='Momentum for optimization.', default=0.9)
    parser.add_argument('--dropout_rate', type=float, help='Dropout rate.', default=0.7)
    parser.add_argument('--is_training', type=_str_to_bool, help='Training mode.', default=False)
    return parser.parse_args(argv)
if __name__ == '__main__':
    # Everything after the script name is handed to the argument parser.
    cli_args = parse_arguments(sys.argv[1:])
    main(cli_args)
5、toTFRecords.py:暂无坑点,网上有关制作TFRecords格式文件一大堆。。。。就只想说,使用多线程tf.train.string_input_producer()函数是无法读取图像的label的!!!
import tensorflow as tf
from PIL import Image
# The ten category names.
classes = [
    'airplane',
    'automobile',
    'bird',
    'cat',
    'deer',
    'dog',
    'frog',
    'horse',
    'ship',
    'truck',
]
# Directory containing the numbered PNG files to convert.
image_path = "./test_data"
# Output TFRecords file.
TFRecord_path = "./test.tfrecords"
# Label CSV path.
csv_path = "./trainLabels.csv"
def main():
    """Pack ``1.png`` .. ``120.png`` from ``image_path`` into one TFRecords file.

    Each record stores the raw pixel bytes under ``img_raw`` and the image
    number under ``label``, so predictions can later be matched to files.
    """
    writer = tf.python_io.TFRecordWriter(TFRecord_path)
    try:
        for i in range(1, 121, 1):
            # The image number doubles as the record's "label".
            label_int = i
            image_name = image_path + '/' + str(i) + '.png'
            # Close each image file as soon as its bytes have been read.
            with Image.open(image_name) as image:
                # The reader reshapes the bytes to [32, 32, 3], so force a
                # three-channel layout; this is a no-op for RGB files.
                image_raw = image.convert('RGB').tobytes()
            example = tf.train.Example(features=tf.train.Features(feature={
                "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label_int])),
                'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_raw]))
            }))
            writer.write(example.SerializeToString())
    finally:
        # Always flush and close the output, even if an image is missing.
        writer.close()
if __name__ == '__main__':
    # Script entry point: build the TFRecords file.
    main()
6、test_tfrecords.py:暂无坑点,老老实实读入tfrecords得到预测logits并写入csv。
import sys, csv
import argparse
import tensorflow as tf
from network import NETWORK
# Class names, indexed by the predicted class id.
dataset = [
    'airplane',
    'automobile',
    'bird',
    'cat',
    'deer',
    'dog',
    'frog',
    'horse',
    'ship',
    'truck',
]
def read_and_decode(filename):
    """Read one example from a TFRecords file and preprocess it for the model.

    Args:
        filename: Path of the TFRecords file.

    Returns:
        ``(img, label)``: a float32 ``[28, 28, 3]`` standardised image and
        the int32 value stored under ``label``.
    """
    filename_queue = tf.train.string_input_producer([filename], shuffle=False)
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    features = tf.parse_single_example(serialized_example, features={
        'label': tf.FixedLenFeature([], tf.int64),
        'img_raw': tf.FixedLenFeature([], tf.string),
    })

    img = tf.decode_raw(features['img_raw'], tf.uint8)
    img = tf.reshape(img, [32, 32, 3])
    img = tf.cast(img, tf.float32)
    # Match the training pipeline (28x28 crop followed by per-image
    # standardisation); scaling to [-0.5, 0.5] instead would feed the network
    # inputs on a different scale than it was trained on.
    img = tf.image.resize_image_with_crop_or_pad(img, 28, 28)
    img = tf.image.per_image_standardization(img)

    label = tf.cast(features['label'], tf.int32)
    return img, label
def test(args):
    """Run the latest checkpoint on the TFRecords test set and write ``./test.csv``.

    The CSV has one ``id,label`` row per image: ``id`` is the value stored in
    the record's ``label`` feature and ``label`` is the predicted class name.

    Args:
        args: Parsed command-line arguments (see ``parse_arguments``).
    """
    with tf.Graph().as_default():
        # NOTE(review): machine-specific absolute path - confirm where the
        # TFRecords file should be read from.
        img, label = read_and_decode("/home/gz04025/Desktop/Syntax/test.tfrecords")
        img_batch, label_batch = tf.train.batch([img, label], batch_size=args.batch_size, capacity=2000)

        # label_batch holds image ids of shape [batch], not one-hot classes,
        # yet NETWORK builds its cross-entropy loss on construction. Give it
        # a shape-compatible dummy target; the loss is never evaluated here.
        dummy_labels = tf.one_hot(tf.zeros_like(label_batch), args.num_classes)
        model = NETWORK(img_batch, dummy_labels, args.dropout_rate, args.num_classes, args.is_training)
        logits = model.logits
        num_logits = tf.argmax(logits, 1)

        # Restore the moving-averaged variables.
        variable_averages = tf.train.ExponentialMovingAverage(0.9999)
        variables_to_restore = variable_averages.variables_to_restore()
        saver = tf.train.Saver(variables_to_restore)

        # Build the initializer only after the model exists; created earlier
        # it would not cover any of the model's variables.
        init = tf.global_variables_initializer()

        with tf.Session() as sess:
            sess.run(init)
            # Restore the most recent checkpoint.
            ckpt = tf.train.get_checkpoint_state(args.ckpt_dir)
            if ckpt and ckpt.model_checkpoint_path:
                saver.restore(sess, ckpt.model_checkpoint_path)
            else:
                print('No checkpoint file found.')
                return

            coord = tf.train.Coordinator()
            threads = []
            # image id -> predicted class name, accumulated over all batches.
            predictions = {}
            try:
                for qr in tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS):
                    threads.extend(qr.create_threads(sess, coord=coord, daemon=True, start=True))

                iter_num = 0
                while iter_num < (10000 / args.batch_size) and not coord.should_stop():
                    # Fetch ids and predictions in one run so they stay aligned.
                    class_ids, image_ids = sess.run([num_logits, label_batch])
                    for image_id, class_id in zip(image_ids, class_ids):
                        # The filename queue has no epoch limit, so records
                        # may repeat; keep the first prediction per image.
                        predictions.setdefault(int(image_id), dataset[class_id])
                    iter_num += 1

                # Write the file once, after all batches have been collected.
                with open('./test.csv', 'w', newline='') as csvfile:
                    writer = csv.writer(csvfile)
                    writer.writerow(['id', 'label'])
                    writer.writerows(sorted(predictions.items()))
            except Exception as e:
                coord.request_stop(e)
            coord.request_stop()
            coord.join(threads, stop_grace_period_secs=10)
def main(args):
    """Run the test pass with the parsed command-line arguments."""
    test(args)
def parse_arguments(argv):
    """Parse the test script's command-line options.

    Args:
        argv: Argument strings, without the program name.

    Returns:
        The populated ``argparse.Namespace``.
    """
    def _str_to_bool(value):
        # ``type=bool`` would turn every non-empty string, including
        # "False", into True; parse the text explicitly instead.
        text = value.strip().lower()
        if text in ('true', 't', 'yes', 'y', '1'):
            return True
        if text in ('false', 'f', 'no', 'n', '0', ''):
            return False
        raise argparse.ArgumentTypeError('Expected a boolean value, got %r.' % value)

    parser = argparse.ArgumentParser()
    parser.add_argument('--test_data', type=str, help='Test data directory.', default='./test_data')
    parser.add_argument('--ckpt_dir', type=str, help='Directory of trained models.', default='./ckpts')
    parser.add_argument('--test_result', type=str, help='Directory for saving test result.', default='./')
    parser.add_argument('--num_gpus', type=int, help='Number of GPU.', default=1)
    parser.add_argument('--max_iter', type=int, help='Iteration number for training.', default=100000)
    parser.add_argument('--batch_size', type=int, help='Batch size.', default=12)
    parser.add_argument('--num_classes', type=int, help='Number of classes.', default=10)
    parser.add_argument('--base_lr', type=float, help='Base learning rate to start with.', default=0.01)
    parser.add_argument('--weight_decay', type=float, help='Weight decay for L2 loss.', default=0.0001)
    parser.add_argument('--momentum', type=float, help='Momentum for optimization.', default=0.9)
    parser.add_argument('--dropout_rate', type=float, help='Dropout rate.', default=0.7)
    parser.add_argument('--is_training', type=_str_to_bool, help='Training mode.', default=False)
    return parser.parse_args(argv)
if __name__ == '__main__':
    # Everything after the script name is handed to the argument parser.
    cli_args = parse_arguments(sys.argv[1:])
    main(cli_args)
下一篇: Flink window 的类型