欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

windows 安装logstash配置、运行mysql、mongodb

程序员文章站 2022-03-28 20:22:57
...

1.下载zip

下载地址
https://www.elastic.co/cn/downloads/logstash
windows 安装logstash配置、运行mysql、mongodb

本地安装,解压即可。

2. mysql

2.1 配置mysql

需要新建一个xxx.conf文件,在文件中设置数据来源数据库(mysql)和接收数据目标(es)。

input {

     # 输出
    stdin {
    }

    # 事件名称event_spcn
    jdbc {
      # mysql 数据库链接,shop为数据库名
      jdbc_connection_string => "jdbc:mysql://xxx.17.xxx.160:3306/sqm_app_dev"
      # 用户名和密码
      jdbc_user => "root"
      jdbc_password => "root"
      # 驱动
      jdbc_driver_library => "../db/mysql-connector-java-5.1.41.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      # 执行的sql 文件路径+名称
      statement_filepath => "../db/event_spcn.sql"
      #timezone
      jdbc_default_timezone => "Asia/Shanghai"
      clean_run => "true"
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      schedule => "* * * * *"
      #type
      tags => ["sqm"]
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
  if "sqm" in [tags] {
    elasticsearch {
      hosts => ["xxx.17.xxx.158:9200"]
      index => "sqm"
      document_id => "%{no}"
    }
  }

   stdout {
      codec => json_lines
   }
}

此处需要准备两种文件
jdbc_driver_library => “…/db/mysql-connector-java-5.1.41.jar” // mysql 驱动
statement_filepath => “…/db/event_spcn.sql” // 查询语句文件;

  1. 新建的xxx.conf 放到bin目录下;
  2. 这里两个文件的路径可以自己定义;
  3. sql 查询语句自己定义,查询结果将放到es 中 sqm 索引下面;

2.2 启动

打开logstash bin目录,然后CMD;
执行命名即可

logstash -f xxx.conf

3.mongodb

3.1 配置mongodb

需要新建一个xxx_mongo.conf文件,在文件中设置数据来源数据库(mongodb)和接收数据目标(es)。

# 数据源
input {
	mongodb {
		uri => 'mongodb://xxx.17.xxx.160:27017/pfform'
		placeholder_db_dir => '../db'
		placeholder_db_name =>'issue.db'
		collection => 'issue'
		since_column => '_id'
		tags => ["process"]
    }
}

# 过滤器
filter
{
	ruby {
	#   设置时间的默认新增和修改时间,默认加8小时
		code => "
			event.set('createtime', event.get('createtime').time.localtime + 8*60*60)
			event.set('updatetime', event.get('updatetime').time.localtime + 8*60*60)
		"
	}

# 删除时间错和版本信息
	mutate {
		remove_field =>["@version"]
		remove_field =>["@timestamp"]
	}
}

# 从把数据写入es;
output {
	if "process" in [tags] {
		elasticsearch {
		   hosts => ["xxx.17.xxx.158:9200"] 
		   index => "process"
		   document_id => "%{id}"
		}
	}
}
  1. 此处需要指定一个路径和生成的db文件的文件名
    placeholder_db_dir => ‘…/db’ //路径自己定义
    placeholder_db_name =>‘issue.db’ // 指定文件名,以.db结尾即可
  2. xxx_mongo.conf 放到bin目录下;

然后,如上mysql,同样的命令启动logstash,然后就报错了。
因为,缺少mongodb插件

3.2 安装插件

在bin目录下,使用命令查看是否安装插件

logstash-plugin list

或者,在vendor/bundle/jruby/2.5.0/gems目录下可查看安装插件的目录;
windows 安装logstash配置、运行mysql、mongodb

在bin目录下,执行安装插件命名

logstash-plugin install logstash-input-mongodb

安装成功或提示 install sucess

3.3 启动

打开logstash bin目录,然后CMD;
执行命名即可

logstash -f xxx_mongo.conf

即可启动logstash ,将mongodb 中的数据抽取到es 的process索引下;

3.4 附加

如果有需要,也可以在logstash 中对抽取的数据进行进一步的处理;
这需要修改logstash中的一个ruby文件,文件位置
mysql

vendor/bundle/jruby/2.3.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/inputs/jdbc.rb

mongodb

vendor/bundle/jruby/2.3.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb

通常不要修改,下面是一个修改的mongodb.rb

# encoding: utf-8

require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/timestamp"
require "stud/interval"
require "socket" # for Socket.gethostname
require "json"
require "mongo"

include Mongo

class LogStash::Inputs::MongoDB < LogStash::Inputs::Base
  config_name "mongodb"

  # If undefined, Logstash will complain, even if codec is unused.
  default :codec, "plain"

  # Example URI: mongodb://mydb.host:27017/mydbname?ssl=true
  config :uri, :validate => :string, :required => true

  # The directory that will contain the sqlite database file.
  config :placeholder_db_dir, :validate => :string, :required => true

  # The name of the sqlite databse file
  config :placeholder_db_name, :validate => :string, :default => "logstash_sqlite.db"

  # Any table to exclude by name
  config :exclude_tables, :validate => :array, :default => []

  config :batch_size, :avlidate => :number, :default => 30

  config :since_table, :validate => :string, :default => "logstash_since"

  # This allows you to select the column you would like compare the since info
  config :since_column, :validate => :string, :default => "_id"

  # This allows you to select the type of since info, like "id", "date"
  config :since_type, :validate => :string, :default => "id"

  # The collection to use. Is turned into a regex so 'events' will match 'events_20150227'
  # Example collection: events_20150227 or events_
  config :collection, :validate => :string, :required => true

  # This allows you to select the method you would like to use to parse your data
  config :parse_method, :validate => :string, :default => 'flatten'

  # If not flattening you can dig to flatten select fields
  config :dig_fields, :validate => :array, :default => []

  # This is the second level of hash flattening
  config :dig_dig_fields, :validate => :array, :default => []

  # If true, store the @timestamp field in mongodb as an ISODate type instead
  # of an ISO8601 string.  For more information about this, see
  # http://www.mongodb.org/display/DOCS/Dates
  config :isodate, :validate => :boolean, :default => false

  # Number of seconds to wait after failure before retrying
  config :retry_delay, :validate => :number, :default => 3, :required => false

  # If true, an "_id" field will be added to the document before insertion.
  # The "_id" field will use the timestamp of the event and overwrite an existing
  # "_id" field in the event.
  config :generateId, :validate => :boolean, :default => false

  # The message string to use in the event.
  config :message, :validate => :string, :default => "Default message..."

  # Set how frequently messages should be sent.
  # The default, `1`, means send a message every second.
  config :interval, :validate => :number, :default => 1

  SINCE_TABLE = :since_table


  # todo 方法8 在sqlite 中创建一张临时表
  public
  def init_placeholder_table(sqlitedb)

    # 在sqlite 中创建一张临时表 since_table
    begin
      sqlitedb.create_table "#{SINCE_TABLE}" do
        String :table
        String :place
      end

    #  发生异常,临时表已经存在;
    rescue
      @logger.debug("since table already exists")
    end
  end


  # todo 方法10 重新初始化:sqlite中 获取表、插入数据
  public
  def init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)

    # 获取sqlite 数据库表中指定主键的数据
    @logger.debug("init placeholder for #{since_table}_#{mongo_collection_name}")

    # 获取整张表的数据
    since = sqlitedb[SINCE_TABLE]

    # 获取数据库集合
    mongo_collection = mongodb.collection(mongo_collection_name)

    # 根据主键查询mongodb 文档
    first_entry = mongo_collection.find({since_column => {:$regex => @@condition[mongo_collection_name]}}).sort(since_column => 1).limit(1).first

    # 如果文档是id
    if since_type == 'id'
      # 将id转化为字符串
      first_entry_id = first_entry[since_column].to_s

    #   其他类型,将id转化为整形
    else
      first_entry_id = first_entry[since_column].to_i
    end

    # sqlite 数据库表 since_table 中添加数:主键table的值为since_table_mongo_collection_name
    # 然后放入一条数据,place字段的值为(主键id)
    since.insert(:table => "#{since_table}_#{mongo_collection_name}", :place => first_entry_id)
    @logger.info("init placeholder for #{since_table}_#{mongo_collection_name}: #{first_entry}")

    # 返回主键id;
    return first_entry_id
  end


  # todo 方法9 xx
  public
  def get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)

    # 获取slite数据库表 since_table
    since = sqlitedb[SINCE_TABLE]

    # 查询sqlite 数据库表中的数据
    x = since.where(:table => "#{since_table}_#{mongo_collection_name}")

    # 获取表中的总数据量
    c = x.count;
    #y = x.order(:place).last

    # 如果没有数据
    if c == 0

      # todo 方法10 在sqlite中 插入数据, 返回数据id
      first_entry_id = init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
      @logger.debug("FIRST ENTRY ID for #{mongo_collection_name} is #{first_entry_id}")

      # 返回主键id,和1
      return first_entry_id, 1

    #  如果有数据
    else

      # 获取sqlite 数据库表中的数据,按place字段排序,取最后一条数据
      y = x.order(:place).last

      # 打印输出place属性
      @logger.debug("placeholder already exists, it is #{y[:place]}")
      return y[:place], 0
    end
  end


  # todo 方法11
  public
  def update_placeholder(sqlitedb, since_table, mongo_collection_name, place)

    # 打印 数据库表
    @logger.debug("updating placeholder for #{since_table}_#{mongo_collection_name} to #{place}")

    # 获取sqlite 数据库
    since = sqlitedb[SINCE_TABLE]

    # 更新sqlite数据库中保存的数据
    since.where(:table => "#{since_table}_#{mongo_collection_name}").update(:place => place)
  end


  public
  def get_all_tables(mongodb)
    return @mongodb.collection_names
  end

  # todo 方法8 设置数据库名称到collection_names
  public

  def get_collection_names(mongodb, collection)

    # 设置数据库名称到collection_names
    collection_names = []
    @mongodb.collection_names.each do |coll|
      if /#{collection}/ =~ coll
        collection_names.push(coll)
        @logger.debug("Added #{coll} to the collection list as it matches our collection search")
      end
    end
    return collection_names
  end

  # todo 方法3:获取应该刷新的数据,文档列表【?逻辑有两处问题】

  public

  def get_cursor_for_collection(mongodb, is_first_init, mongo_collection_name, last_id_object, batch_size)

    # 如果初始化下标为1;
    if is_first_init == 1

      #  根据集合名称获取集合
      collection = mongodb.collection(mongo_collection_name)

      # 查询文档列表,查询条件
      # 主键since_column, 大于上次查询的标记
      # 主键since_column, 匹配开头
      # 正序排序,然后截取指定数量的数据,需要取当条数据
      # Need to make this sort by date in object id then get the first of the series
      # db.events_20150320.find().limit(1).sort({ts:1})
      @logger.debug("======》get_cursor_for_collection: get the first")
      return collection.find({since_column => {:$gte => last_id_object}, since_column => {:$regex => @@condition[mongo_collection_name]}}).sort({since_column: 1}).limit(batch_size)

      # 如果下标不为1
    else
      collection = mongodb.collection(mongo_collection_name)

      # 直接查询,排序,截取数据;
      # 不匹配主键的正则表达式;不需要取当条数据
      # Need to make this sort by date in object id then get the first of the series
      # db.events_20150320.find().limit(1).sort({ts:1})
      @logger.debug("======》get_cursor_for_collection: get the next:" + last_id_object)
      return collection.find({since_column => {:$gt => last_id_object}}).sort({since_column: 1}).limit(batch_size)
    end
  end

  # todo 方法7 设置集合的抽数信息,用于标记下次抽数
  # mongodb: mongodb数据库
  # sqlitedb: sqlite数据库
  # collection: 拼接查询条件:各个文档主键对应的前缀
  public
  def update_watched_collections(mongodb, collection, sqlitedb)

    # 获取所有需要抽数的集合的名称
    collections = get_collection_names(mongodb, collection)
    collection_data = {}

    # 遍历集合名称
    collections.each do |my_collection|

      #  如果 集合匹配前缀条件,掠过
      if my_collection != collection then
        next
      end

      # 匹配前缀条件,在sqlite 中创建一张临时表
      init_placeholder_table(sqlitedb)

      # sqlite数据库插入数据(mongodb 数据id),插入成功返回数据id和1、已经存在返回0
      last_id, is_first_init = get_placeholder(sqlitedb, since_table, mongodb, my_collection)

      # 如果collection_data中不存在集合
      if !collection_data[my_collection]

        # 在collection_data中放入集合的抽数信息;
        collection_data[my_collection] = {:name => my_collection, :last_id => last_id, :is_first_init => is_first_init}
      end
    end
    return collection_data
  end

=begin
  public
  def update_watched_collections(mongodb, collection, sqlitedb)
      collection_data = {}
	  init_placeholder_table(sqlitedb)
	  last_id, is_first_init = get_placeholder(sqlitedb, since_table, mongodb, collection)
	  if !collection_data[collection]
		collection_data[collection] = { :name => collection, :last_id => last_id , :is_first_init => is_first_init}
	  end
    return collection_data
  end
=end

  # todo 1.方法 注册

  public
  def register

    # 导包
    require "jdbc/sqlite3"
    require "sequel"

    # 导入数据库路径
    placeholder_db_path = File.join(@placeholder_db_dir, @placeholder_db_name)

    # 获取数据库连接
    conn = Mongo::Client.new(@uri)

    # 获取主机名称
    @host = Socket.gethostname
    @logger.info("Registering MongoDB input")

    # 获取mongodb数据库对象
    @mongodb = conn.database
    # 获取logstash_sqlite数据库对象
    @sqlitedb = Sequel.connect("jdbc:sqlite:#{placeholder_db_path}")

    # todo 修改1 下面的常量值为修改部分
    # mongodb文档名称全名
    @@memuHash = {"issue" => "Quality Issue Management", "dfx" => "Quality DFx Review", "kdsa" => "KD SA", "Key_Part_Risk_Assessment" => "Key Part Risk Assessment", "limit_sample" => "Sample Application", "nud" => "NUD Process", "action" => "Action Tracking", "audit" => "Quality Audit", "cqp" => "Component Qualification"}
=begin	
	@@condition = {"issue" => "/^QIM/", "dfx" => "/^DFX/", "kdsa" => "/^KDSA/", "Key_Part_Risk_Assessment" => "/^KPRA/", "limit_sample" => "/^LS/", "nud" => "/^NUD/", "action" => "/^AT/", "audit" => "/^QA/", "cqp" => "/^CQP/"}
=end

    # 拼接查询条件:各个文档主键对应的前缀
    # ^DFX(DFx)
    @@condition = {"issue" => "^QIM", "dfx" => "^DFX", "kdsa" => "^KDSA", "Key_Part_Risk_Assessment" => "^KPRA", "limit_sample" => "^LS", "nud" => "^NUD", "action" => "^AT", "audit" => "^Audit", "cqp" => "^CQP"}

    # 设置集合的抽数信息,用于标记下次抽数
    # Should check to see if there are new matching tables at a predefined interval or on some trigger
    @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)
  end


  class BSON::OrderedHash
    def to_h
      inject({}) { |acc, element| k, v = element; acc[k] = (
      if v.class == BSON::OrderedHash then
        v.to_h
      else
        v
      end); acc }
    end

    def to_json
      JSON.parse(self.to_h.to_json, :allow_nan => true)
    end
  end

  def flatten(my_hash)
    new_hash = {}
    @logger.debug("Raw Hash: #{my_hash}")
    if my_hash.respond_to? :each
      my_hash.each do |k1, v1|
        if v1.is_a?(Hash)
          v1.each do |k2, v2|
            if v2.is_a?(Hash)
              # puts "Found a nested hash"
              result = flatten(v2)
              result.each do |k3, v3|
                new_hash[k1.to_s + "_" + k2.to_s + "_" + k3.to_s] = v3
              end
              # puts "result: "+result.to_s+" k2: "+k2.to_s+" v2: "+v2.to_s
            else
              new_hash[k1.to_s + "_" + k2.to_s] = v2
            end
          end
        else
          # puts "Key: "+k1.to_s+" is not a hash"
          new_hash[k1.to_s] = v1
        end
      end
    else
      @logger.debug("Flatten [ERROR]: hash did not respond to :each")
    end
    @logger.debug("Flattened Hash: #{new_hash}")
    return new_hash
  end


  # todo 2 方法一获取到了需要到数据的文档,此方法,需要增量刷数据;
  def run(queue)

    # 定义操作间隔时间
    sleep_min = 0.01
    sleep_max = 5
    sleeptime = sleep_min

    # 打印集合列表Xxx
    @logger.debug("Tailing MongoDB")
    @logger.debug("Collection data is: #{@collection_data}")

    # stop 为控制开关
    while true && !stop?
      begin

        # 遍历集合列表,定义两个变量,下标和每个集合对象
        @collection_data.each do |index, collection|

          # 获取集合名称
          collection_name = collection[:name]
          @logger.debug("collection_data is: #{@collection_data}")

          # 获取集合下标初始下标和最后下标
          last_id = @collection_data[index][:last_id]
          is_first_init = @collection_data[index][:is_first_init]

          # 赋值最后下标到最后下标对象,
          last_id_object = last_id

          # todo 遍历文档主键,默认是id
          if since_type == 'id'
            #    赋值最后下标到最后下标对象
            last_id_object = last_id

            # 如果是时间,最后时间不为空,获取最后的下标对象
          elsif since_type == 'time'
            if last_id != ''
              last_id_object = Time.at(last_id)
            end
          end

          #    打印输出最后的对象id值获取时间值
          @logger.debug("last_id is: " + last_id)

          #    todo 方法3:获取应该刷新的数据,
          cursor = get_cursor_for_collection(@mongodb, is_first_init, collection_name, last_id_object, batch_size)

          # 1 匹配主键正则,包含当条数据;设置 is_first_init 的目的是为了把主键为1的数据加载到,如果不设置会一直加载不到;
          # 0 不匹配主键正则,不含当条数据;
          # 设置下次取数时不匹配正则;
          if is_first_init == 1
            @collection_data[index][:is_first_init] = 0
          end

          # 遍历需要刷新的集合中获取的文档列表
          # 定义每个文档对象
          cursor.each do |doc|

            # 新建一个logstash 进程,用于处理文档
            #event = LogStash::Event.new("host" => @host)
            event = LogStash::Event.new()
            decorate(event)
            #log_entry = doc.to_h.to_s

            #event.set("log_entry",log_entry.force_encoding(Encoding::UTF_8))

            # todo 修改2 下面的设置值为修改部分
            #   获取文档的id字段值(t0_i:先转字符串),   设置文档的id字段值;
            begin
              event.set("id", doc[since_column].to_s)

                #  发生异常时,也设置,然后遍历下一个对象;
            rescue
              event.set("id", doc[since_column].to_s)
              next
            end

            #      设置集合名称
            event.set("processname", collection_name)

            #      设置文档的fulltext,同时打印
            event.set("fulltext", doc['_full_text'].to_s)
            @logger.debug("fulltext: " + doc['_full_text'].to_s)

            #      获取文档的_app字段值,同时设置和打印
            _app = doc['_app']

            #      获取appid的值
            event.set("appid", _app['appId'].to_s)
            @logger.debug("appId: " + _app['appId'].to_s)

            #      获取projectId的值
            event.set("projectid", _app['projectId'].to_s)
            @logger.debug("projectId: " + _app['projectId'].to_s)

            #      获取appInstanceId的值
            appinstanceid = _app['appInstanceId'].to_s
            event.set("appinstanceid", appinstanceid)
            @logger.debug("appInstanceId: " + _app['appInstanceId'].to_s)

            #      todo 查询任务的分配人
            approver = find_instance_assigness(@mongodb, appinstanceid)
            event.set("approver", approver)

            #      设置文档的全名作为菜单的名称
            event.set("menu", @@memuHash[collection_name])

            #      todo 方法4,获取文档各个字段的值,并设置到event中;
            setvalue(doc, event, collection_name)
=begin			
			_files = doc['_files']
			if !_files.empty?
				_files.each do |file|
					fileevent = LogStash::Event.new()
					realname = file['realName'].to_s
					fileevent.set("type","child_actions")
				    fileevent.set("fileid",file['id'])
					@logger.debug("fileid: "+ file['id'].to_s)
					fileevent.set("realname",realname)
					fileevent.set("originname",file['originName'])
					fileevent.set("processid",doc[since_column].to_s)	
					# filecontent = read_file_content(realname)
					fileevent.set("filecontent","")
					#fileevent.set("[relations][name]","file")
					#fileevent.set("[relations][parent]",doc[since_column].to_s)
					#queue << fileevent
				end
			end
=end

            # 把 数据的事件信息加入到队列中
            queue << event

            # 获取文档的主键
            since_id = doc[since_column]
            @logger.debug("======》since_id: " + since_id.to_s)

            # 如果主键是id,将id的值转化为字符串
            if since_type == 'id'
              since_id = doc[since_column].to_s

            #   如果是时间类型,转化为int类型
            elsif since_type == 'time'
              since_id = doc[since_column].to_i
            end

            # 主键id,记录到抽数成功的主键id,便于下次抽数;
            @collection_data[index][:last_id] = since_id
          end

          # todo 方法11
          # Store the last-seen doc in the database
          update_placeholder(@sqlitedb, since_table, collection_name, @collection_data[index][:last_id])
        end
        @logger.debug("Updating watch collections")

        # 设置集合的抽数信息,用于标记下次抽数
        @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)

        # 抽数完成后休眠
        # nothing found in that iteration
        # sleep a bit
        @logger.debug("No new rows. Sleeping.", :time => sleeptime)

        # 0.02s-5s
        sleeptime = [sleeptime * 2, sleep_max].min
        sleep(sleeptime)

      #   发现异常,输出
      rescue => e
        @logger.warn('MongoDB Input threw an exception, restarting', :exception => e)
      end
    end
  end
  # def run


  def close
    # If needed, use this to tidy up on shutdown
    @logger.debug("Shutting down...")
  end

  # todo 方法4 获取文档的值【?缺少三张表action/audit/cqp】
  def setvalue(doc, event, collection)

    # 处理issue表
    if collection == "issue"
      event.set("createby", "")
      event.set("createtime", Time.new)
      event.set("updateby", "")
      event.set("updatetime", Time.new)

      #   处理dfx 和Key_Part_Risk_Assessment 表
    elsif collection == "dfx" || collection == "Key_Part_Risk_Assessment"
      event.set("createby", doc['createBy'])
      # todo 方法5 格式化日期
      event.set("createtime", format_data(doc['createTime'].to_s))
      event.set("updateby", doc['updateBy'])
      event.set("updatetime", format_data(doc['updateTime'].to_s))

      #  处理kdsa 表
    elsif collection == "kdsa"
      basic_info = doc['basic_info']
      event.set("createby", basic_info['creator'])
      event.set("createtime", format_data(basic_info['create_date'].to_s))
      event.set("updateby", "")
      event.set("updatetime", Time.new)

      #  处理limit_sample表
    elsif collection == "limit_sample"
      info = doc['info']
      event.set("createby", "")
      event.set("createtime", form_data_other(info['create_date'].to_s))
      event.set("updateby", "")
      event.set("updatetime", Time.new)

      #  处理nud表
    elsif collection == "nud"
      basic_info = doc['basic_info']
      event.set("createby", basic_info['creator'])
      event.set("createtime", format_data(basic_info['create_time'].to_s))
      event.set("updateby", basic_info['update_by'])
      event.set("updatetime", format_data(basic_info['update_time'].to_s))
    end
  end

  # todo 方法5 格式化日期
  def format_data(data)

    # 截取月份,数据的前两位数
    month = data[0..1]
    month_int = 0
    if month.start_with?('0')
      month_int = month[1].to_i
    elsif month_int = month.to_i
    end

    # 截取日期, 数据的3-4位数
    day = data[3..4]
    day_int = 0
    if day.start_with?('0')
      day_int = day[1].to_i
    elsif day_int = day.to_i
    end

    # 截取年份,数据的6-9位数
    year = data[6..9].to_i

    # 截取小时,11-12 位数
    h = data[11..12]

    # 截取分钟,14-15 位数
    m = data[14..15]

    # 截取秒,数据的17-18位数
    s = data[17..18]

    # 返回格式化日期
    return Time.mktime(year, month_int, day_int, h, m, s)
  end


  def form_data_other(data)
    month = data[5..6]
    month_int = 0
    if month.start_with?('0')
      month_int = month[1].to_i
    elsif month_int = month.to_i
    end
    day = data[8..9]
    day_int = 0
    if day.start_with?('0')
      day_int = day[1].to_i
    elsif day_int = day.to_i
    end
    year = data[0..3].to_i
    h = data[11..12]
    m = data[14..15]
    s = data[17..18]

    return Time.mktime(year, month_int, day_int, h, m, s)
  end

  # todo 方法6 通过集合和appInstanceId 查询app assignee 任务发布人;
  def find_instance_assigness(mongodb, appinstanceid)

    # 获取集合
    collection = mongodb.collection("app_task_all")

    # 获取匹配条件的数据;
    # Need to make this sort by date in object id then get the first of the series
    # db.events_20150320.find().limit(1).sort({ts:1})
    ncount = collection.find("app.appInstanceId" => appinstanceid.to_i).count

    # 没有获取到时返回“”
    if ncount == 0
      return ""
    end

    # 否则 获取第一个对象
    entry = collection.find("app.appInstanceId" => appinstanceid.to_i).first

    assigness_array = []

    # 获取open 对象的值
    done = entry['done']

    # 获取 done 对象的值
    open = entry['open']

    # 遍历open 对象
    if open.length != 0
      open.each do |item|

        # 获取任务的所有发布人的id
        assignees = item['assignees']
        if !assignees.empty?

          # 获取所有的任务发布人的id,放入assigness_array
          assignees.each do |id|
            assigness_array.push(id)
          end
        end
      end
    end

    # 获取done 任务中,所有的任务发布人的id,放入assigness_array
    if done.length != 0
      done.each do |item|
        assignees = item['assignees']
        if !assignees.empty?
          assignees.each do |id|
            assigness_array.push(id)
          end
        end
      end
    end

    # 如果获取到的assigness_array不为空,拼接“,” 返回
    if !assigness_array.empty?
      return assigness_array.join(",")
    end

    # 没有获取到,返回“”
    return ""
  end

end # class LogStash::Inputs::Example