windows 安装logstash配置、运行mysql、mongodb
1.下载zip
下载地址
https://www.elastic.co/cn/downloads/logstash
本地安装,解压即可。
2. mysql
2.1 配置mysql
需要新建一个xxx.conf文件,在文件中设置数据来源数据库(mysql)和接收数据目标(es)。
input {
# 输出
stdin {
}
# 事件名称event_spcn
jdbc {
# mysql 数据库链接,shop为数据库名
jdbc_connection_string => "jdbc:mysql://xxx.17.xxx.160:3306/sqm_app_dev"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "root"
# 驱动
jdbc_driver_library => "../db/mysql-connector-java-5.1.41.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 文件路径+名称
statement_filepath => "../db/event_spcn.sql"
#timezone
jdbc_default_timezone => "Asia/Shanghai"
clean_run => "true"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
#type
tags => ["sqm"]
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
if "sqm" in [tags] {
elasticsearch {
hosts => ["xxx.17.xxx.158:9200"]
index => "sqm"
document_id => "%{no}"
}
}
stdout {
codec => json_lines
}
}
此处需要准备两种文件
jdbc_driver_library => “…/db/mysql-connector-java-5.1.41.jar” // mysql 驱动
statement_filepath => “…/db/event_spcn.sql” // 查询语句文件;
- 新建的xxx.conf 放到bin目录下;
- 这里两个文件的路径可以自己定义;
- sql 查询语句自己定义,查询结果将放到es 中 sqm 索引下面;
2.2 启动
打开logstash bin目录,然后CMD;
执行命名即可
logstash -f xxx.conf
3.mongodb
3.1 配置mongodb
需要新建一个xxx_mongo.conf文件,在文件中设置数据来源数据库(mongodb)和接收数据目标(es)。
# 数据源
input {
mongodb {
uri => 'mongodb://xxx.17.xxx.160:27017/pfform'
placeholder_db_dir => '../db'
placeholder_db_name =>'issue.db'
collection => 'issue'
since_column => '_id'
tags => ["process"]
}
}
# 过滤器
filter
{
ruby {
# 设置时间的默认新增和修改时间,默认加8小时
code => "
event.set('createtime', event.get('createtime').time.localtime + 8*60*60)
event.set('updatetime', event.get('updatetime').time.localtime + 8*60*60)
"
}
# 删除时间错和版本信息
mutate {
remove_field =>["@version"]
remove_field =>["@timestamp"]
}
}
# 从把数据写入es;
output {
if "process" in [tags] {
elasticsearch {
hosts => ["xxx.17.xxx.158:9200"]
index => "process"
document_id => "%{id}"
}
}
}
- 此处需要指定一个路径和生成的db文件的文件名
placeholder_db_dir => ‘…/db’ //路径自己定义
placeholder_db_name =>‘issue.db’ // 指定文件名,以.db结尾即可 - xxx_mongo.conf 放到bin目录下;
然后,如上mysql,同样的命令启动logstash,然后就报错了。
因为,缺少mongodb插件
3.2 安装插件
在bin目录下,使用命令查看是否安装插件
logstash-plugin list
或者,在vendor/bundle/jruby/2.5.0/gems目录下可查看安装插件的目录;
在bin目录下,执行安装插件命名
logstash-plugin install logstash-input-mongodb
安装成功或提示 install sucess
3.3 启动
打开logstash bin目录,然后CMD;
执行命名即可
logstash -f xxx_mongo.conf
即可启动logstash ,将mongodb 中的数据抽取到es 的process索引下;
3.4 附加
如果有需要,也可以在logstash 中对抽取的数据进行进一步的处理;
这需要修改logstash中的一个ruby文件,文件位置
mysql
vendor/bundle/jruby/2.3.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/inputs/jdbc.rb
mongodb
vendor/bundle/jruby/2.3.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb
通常不要修改,下面是一个修改的mongodb.rb
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/timestamp"
require "stud/interval"
require "socket" # for Socket.gethostname
require "json"
require "mongo"
include Mongo
class LogStash::Inputs::MongoDB < LogStash::Inputs::Base
config_name "mongodb"
# If undefined, Logstash will complain, even if codec is unused.
default :codec, "plain"
# Example URI: mongodb://mydb.host:27017/mydbname?ssl=true
config :uri, :validate => :string, :required => true
# The directory that will contain the sqlite database file.
config :placeholder_db_dir, :validate => :string, :required => true
# The name of the sqlite databse file
config :placeholder_db_name, :validate => :string, :default => "logstash_sqlite.db"
# Any table to exclude by name
config :exclude_tables, :validate => :array, :default => []
config :batch_size, :avlidate => :number, :default => 30
config :since_table, :validate => :string, :default => "logstash_since"
# This allows you to select the column you would like compare the since info
config :since_column, :validate => :string, :default => "_id"
# This allows you to select the type of since info, like "id", "date"
config :since_type, :validate => :string, :default => "id"
# The collection to use. Is turned into a regex so 'events' will match 'events_20150227'
# Example collection: events_20150227 or events_
config :collection, :validate => :string, :required => true
# This allows you to select the method you would like to use to parse your data
config :parse_method, :validate => :string, :default => 'flatten'
# If not flattening you can dig to flatten select fields
config :dig_fields, :validate => :array, :default => []
# This is the second level of hash flattening
config :dig_dig_fields, :validate => :array, :default => []
# If true, store the @timestamp field in mongodb as an ISODate type instead
# of an ISO8601 string. For more information about this, see
# http://www.mongodb.org/display/DOCS/Dates
config :isodate, :validate => :boolean, :default => false
# Number of seconds to wait after failure before retrying
config :retry_delay, :validate => :number, :default => 3, :required => false
# If true, an "_id" field will be added to the document before insertion.
# The "_id" field will use the timestamp of the event and overwrite an existing
# "_id" field in the event.
config :generateId, :validate => :boolean, :default => false
# The message string to use in the event.
config :message, :validate => :string, :default => "Default message..."
# Set how frequently messages should be sent.
# The default, `1`, means send a message every second.
config :interval, :validate => :number, :default => 1
SINCE_TABLE = :since_table
# todo 方法8 在sqlite 中创建一张临时表
public
def init_placeholder_table(sqlitedb)
# 在sqlite 中创建一张临时表 since_table
begin
sqlitedb.create_table "#{SINCE_TABLE}" do
String :table
String :place
end
# 发生异常,临时表已经存在;
rescue
@logger.debug("since table already exists")
end
end
# todo 方法10 重新初始化:sqlite中 获取表、插入数据
public
def init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
# 获取sqlite 数据库表中指定主键的数据
@logger.debug("init placeholder for #{since_table}_#{mongo_collection_name}")
# 获取整张表的数据
since = sqlitedb[SINCE_TABLE]
# 获取数据库集合
mongo_collection = mongodb.collection(mongo_collection_name)
# 根据主键查询mongodb 文档
first_entry = mongo_collection.find({since_column => {:$regex => @@condition[mongo_collection_name]}}).sort(since_column => 1).limit(1).first
# 如果文档是id
if since_type == 'id'
# 将id转化为字符串
first_entry_id = first_entry[since_column].to_s
# 其他类型,将id转化为整形
else
first_entry_id = first_entry[since_column].to_i
end
# sqlite 数据库表 since_table 中添加数:主键table的值为since_table_mongo_collection_name
# 然后放入一条数据,place字段的值为(主键id)
since.insert(:table => "#{since_table}_#{mongo_collection_name}", :place => first_entry_id)
@logger.info("init placeholder for #{since_table}_#{mongo_collection_name}: #{first_entry}")
# 返回主键id;
return first_entry_id
end
# todo 方法9 xx
public
def get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
# 获取slite数据库表 since_table
since = sqlitedb[SINCE_TABLE]
# 查询sqlite 数据库表中的数据
x = since.where(:table => "#{since_table}_#{mongo_collection_name}")
# 获取表中的总数据量
c = x.count;
#y = x.order(:place).last
# 如果没有数据
if c == 0
# todo 方法10 在sqlite中 插入数据, 返回数据id
first_entry_id = init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
@logger.debug("FIRST ENTRY ID for #{mongo_collection_name} is #{first_entry_id}")
# 返回主键id,和1
return first_entry_id, 1
# 如果有数据
else
# 获取sqlite 数据库表中的数据,按place字段排序,取最后一条数据
y = x.order(:place).last
# 打印输出place属性
@logger.debug("placeholder already exists, it is #{y[:place]}")
return y[:place], 0
end
end
# todo 方法11
public
def update_placeholder(sqlitedb, since_table, mongo_collection_name, place)
# 打印 数据库表
@logger.debug("updating placeholder for #{since_table}_#{mongo_collection_name} to #{place}")
# 获取sqlite 数据库
since = sqlitedb[SINCE_TABLE]
# 更新sqlite数据库中保存的数据
since.where(:table => "#{since_table}_#{mongo_collection_name}").update(:place => place)
end
public
def get_all_tables(mongodb)
return @mongodb.collection_names
end
# todo 方法8 设置数据库名称到collection_names
public
def get_collection_names(mongodb, collection)
# 设置数据库名称到collection_names
collection_names = []
@mongodb.collection_names.each do |coll|
if /#{collection}/ =~ coll
collection_names.push(coll)
@logger.debug("Added #{coll} to the collection list as it matches our collection search")
end
end
return collection_names
end
# todo 方法3:获取应该刷新的数据,文档列表【?逻辑有两处问题】
public
def get_cursor_for_collection(mongodb, is_first_init, mongo_collection_name, last_id_object, batch_size)
# 如果初始化下标为1;
if is_first_init == 1
# 根据集合名称获取集合
collection = mongodb.collection(mongo_collection_name)
# 查询文档列表,查询条件
# 主键since_column, 大于上次查询的标记
# 主键since_column, 匹配开头
# 正序排序,然后截取指定数量的数据,需要取当条数据
# Need to make this sort by date in object id then get the first of the series
# db.events_20150320.find().limit(1).sort({ts:1})
@logger.debug("======》get_cursor_for_collection: get the first")
return collection.find({since_column => {:$gte => last_id_object}, since_column => {:$regex => @@condition[mongo_collection_name]}}).sort({since_column: 1}).limit(batch_size)
# 如果下标不为1
else
collection = mongodb.collection(mongo_collection_name)
# 直接查询,排序,截取数据;
# 不匹配主键的正则表达式;不需要取当条数据
# Need to make this sort by date in object id then get the first of the series
# db.events_20150320.find().limit(1).sort({ts:1})
@logger.debug("======》get_cursor_for_collection: get the next:" + last_id_object)
return collection.find({since_column => {:$gt => last_id_object}}).sort({since_column: 1}).limit(batch_size)
end
end
# todo 方法7 设置集合的抽数信息,用于标记下次抽数
# mongodb: mongodb数据库
# sqlitedb: sqlite数据库
# collection: 拼接查询条件:各个文档主键对应的前缀
public
def update_watched_collections(mongodb, collection, sqlitedb)
# 获取所有需要抽数的集合的名称
collections = get_collection_names(mongodb, collection)
collection_data = {}
# 遍历集合名称
collections.each do |my_collection|
# 如果 集合匹配前缀条件,掠过
if my_collection != collection then
next
end
# 匹配前缀条件,在sqlite 中创建一张临时表
init_placeholder_table(sqlitedb)
# sqlite数据库插入数据(mongodb 数据id),插入成功返回数据id和1、已经存在返回0
last_id, is_first_init = get_placeholder(sqlitedb, since_table, mongodb, my_collection)
# 如果collection_data中不存在集合
if !collection_data[my_collection]
# 在collection_data中放入集合的抽数信息;
collection_data[my_collection] = {:name => my_collection, :last_id => last_id, :is_first_init => is_first_init}
end
end
return collection_data
end
=begin
public
def update_watched_collections(mongodb, collection, sqlitedb)
collection_data = {}
init_placeholder_table(sqlitedb)
last_id, is_first_init = get_placeholder(sqlitedb, since_table, mongodb, collection)
if !collection_data[collection]
collection_data[collection] = { :name => collection, :last_id => last_id , :is_first_init => is_first_init}
end
return collection_data
end
=end
# todo 1.方法 注册
public
def register
# 导包
require "jdbc/sqlite3"
require "sequel"
# 导入数据库路径
placeholder_db_path = File.join(@placeholder_db_dir, @placeholder_db_name)
# 获取数据库连接
conn = Mongo::Client.new(@uri)
# 获取主机名称
@host = Socket.gethostname
@logger.info("Registering MongoDB input")
# 获取mongodb数据库对象
@mongodb = conn.database
# 获取logstash_sqlite数据库对象
@sqlitedb = Sequel.connect("jdbc:sqlite:#{placeholder_db_path}")
# todo 修改1 下面的常量值为修改部分
# mongodb文档名称全名
@@memuHash = {"issue" => "Quality Issue Management", "dfx" => "Quality DFx Review", "kdsa" => "KD SA", "Key_Part_Risk_Assessment" => "Key Part Risk Assessment", "limit_sample" => "Sample Application", "nud" => "NUD Process", "action" => "Action Tracking", "audit" => "Quality Audit", "cqp" => "Component Qualification"}
=begin
@@condition = {"issue" => "/^QIM/", "dfx" => "/^DFX/", "kdsa" => "/^KDSA/", "Key_Part_Risk_Assessment" => "/^KPRA/", "limit_sample" => "/^LS/", "nud" => "/^NUD/", "action" => "/^AT/", "audit" => "/^QA/", "cqp" => "/^CQP/"}
=end
# 拼接查询条件:各个文档主键对应的前缀
# ^DFX(DFx)
@@condition = {"issue" => "^QIM", "dfx" => "^DFX", "kdsa" => "^KDSA", "Key_Part_Risk_Assessment" => "^KPRA", "limit_sample" => "^LS", "nud" => "^NUD", "action" => "^AT", "audit" => "^Audit", "cqp" => "^CQP"}
# 设置集合的抽数信息,用于标记下次抽数
# Should check to see if there are new matching tables at a predefined interval or on some trigger
@collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)
end
class BSON::OrderedHash
def to_h
inject({}) { |acc, element| k, v = element; acc[k] = (
if v.class == BSON::OrderedHash then
v.to_h
else
v
end); acc }
end
def to_json
JSON.parse(self.to_h.to_json, :allow_nan => true)
end
end
def flatten(my_hash)
new_hash = {}
@logger.debug("Raw Hash: #{my_hash}")
if my_hash.respond_to? :each
my_hash.each do |k1, v1|
if v1.is_a?(Hash)
v1.each do |k2, v2|
if v2.is_a?(Hash)
# puts "Found a nested hash"
result = flatten(v2)
result.each do |k3, v3|
new_hash[k1.to_s + "_" + k2.to_s + "_" + k3.to_s] = v3
end
# puts "result: "+result.to_s+" k2: "+k2.to_s+" v2: "+v2.to_s
else
new_hash[k1.to_s + "_" + k2.to_s] = v2
end
end
else
# puts "Key: "+k1.to_s+" is not a hash"
new_hash[k1.to_s] = v1
end
end
else
@logger.debug("Flatten [ERROR]: hash did not respond to :each")
end
@logger.debug("Flattened Hash: #{new_hash}")
return new_hash
end
# todo 2 方法一获取到了需要到数据的文档,此方法,需要增量刷数据;
def run(queue)
# 定义操作间隔时间
sleep_min = 0.01
sleep_max = 5
sleeptime = sleep_min
# 打印集合列表Xxx
@logger.debug("Tailing MongoDB")
@logger.debug("Collection data is: #{@collection_data}")
# stop 为控制开关
while true && !stop?
begin
# 遍历集合列表,定义两个变量,下标和每个集合对象
@collection_data.each do |index, collection|
# 获取集合名称
collection_name = collection[:name]
@logger.debug("collection_data is: #{@collection_data}")
# 获取集合下标初始下标和最后下标
last_id = @collection_data[index][:last_id]
is_first_init = @collection_data[index][:is_first_init]
# 赋值最后下标到最后下标对象,
last_id_object = last_id
# todo 遍历文档主键,默认是id
if since_type == 'id'
# 赋值最后下标到最后下标对象
last_id_object = last_id
# 如果是时间,最后时间不为空,获取最后的下标对象
elsif since_type == 'time'
if last_id != ''
last_id_object = Time.at(last_id)
end
end
# 打印输出最后的对象id值获取时间值
@logger.debug("last_id is: " + last_id)
# todo 方法3:获取应该刷新的数据,
cursor = get_cursor_for_collection(@mongodb, is_first_init, collection_name, last_id_object, batch_size)
# 1 匹配主键正则,包含当条数据;设置 is_first_init 的目的是为了把主键为1的数据加载到,如果不设置会一直加载不到;
# 0 不匹配主键正则,不含当条数据;
# 设置下次取数时不匹配正则;
if is_first_init == 1
@collection_data[index][:is_first_init] = 0
end
# 遍历需要刷新的集合中获取的文档列表
# 定义每个文档对象
cursor.each do |doc|
# 新建一个logstash 进程,用于处理文档
#event = LogStash::Event.new("host" => @host)
event = LogStash::Event.new()
decorate(event)
#log_entry = doc.to_h.to_s
#event.set("log_entry",log_entry.force_encoding(Encoding::UTF_8))
# todo 修改2 下面的设置值为修改部分
# 获取文档的id字段值(t0_i:先转字符串), 设置文档的id字段值;
begin
event.set("id", doc[since_column].to_s)
# 发生异常时,也设置,然后遍历下一个对象;
rescue
event.set("id", doc[since_column].to_s)
next
end
# 设置集合名称
event.set("processname", collection_name)
# 设置文档的fulltext,同时打印
event.set("fulltext", doc['_full_text'].to_s)
@logger.debug("fulltext: " + doc['_full_text'].to_s)
# 获取文档的_app字段值,同时设置和打印
_app = doc['_app']
# 获取appid的值
event.set("appid", _app['appId'].to_s)
@logger.debug("appId: " + _app['appId'].to_s)
# 获取projectId的值
event.set("projectid", _app['projectId'].to_s)
@logger.debug("projectId: " + _app['projectId'].to_s)
# 获取appInstanceId的值
appinstanceid = _app['appInstanceId'].to_s
event.set("appinstanceid", appinstanceid)
@logger.debug("appInstanceId: " + _app['appInstanceId'].to_s)
# todo 查询任务的分配人
approver = find_instance_assigness(@mongodb, appinstanceid)
event.set("approver", approver)
# 设置文档的全名作为菜单的名称
event.set("menu", @@memuHash[collection_name])
# todo 方法4,获取文档各个字段的值,并设置到event中;
setvalue(doc, event, collection_name)
=begin
_files = doc['_files']
if !_files.empty?
_files.each do |file|
fileevent = LogStash::Event.new()
realname = file['realName'].to_s
fileevent.set("type","child_actions")
fileevent.set("fileid",file['id'])
@logger.debug("fileid: "+ file['id'].to_s)
fileevent.set("realname",realname)
fileevent.set("originname",file['originName'])
fileevent.set("processid",doc[since_column].to_s)
# filecontent = read_file_content(realname)
fileevent.set("filecontent","")
#fileevent.set("[relations][name]","file")
#fileevent.set("[relations][parent]",doc[since_column].to_s)
#queue << fileevent
end
end
=end
# 把 数据的事件信息加入到队列中
queue << event
# 获取文档的主键
since_id = doc[since_column]
@logger.debug("======》since_id: " + since_id.to_s)
# 如果主键是id,将id的值转化为字符串
if since_type == 'id'
since_id = doc[since_column].to_s
# 如果是时间类型,转化为int类型
elsif since_type == 'time'
since_id = doc[since_column].to_i
end
# 主键id,记录到抽数成功的主键id,便于下次抽数;
@collection_data[index][:last_id] = since_id
end
# todo 方法11
# Store the last-seen doc in the database
update_placeholder(@sqlitedb, since_table, collection_name, @collection_data[index][:last_id])
end
@logger.debug("Updating watch collections")
# 设置集合的抽数信息,用于标记下次抽数
@collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)
# 抽数完成后休眠
# nothing found in that iteration
# sleep a bit
@logger.debug("No new rows. Sleeping.", :time => sleeptime)
# 0.02s-5s
sleeptime = [sleeptime * 2, sleep_max].min
sleep(sleeptime)
# 发现异常,输出
rescue => e
@logger.warn('MongoDB Input threw an exception, restarting', :exception => e)
end
end
end
# def run
def close
# If needed, use this to tidy up on shutdown
@logger.debug("Shutting down...")
end
# todo 方法4 获取文档的值【?缺少三张表action/audit/cqp】
def setvalue(doc, event, collection)
# 处理issue表
if collection == "issue"
event.set("createby", "")
event.set("createtime", Time.new)
event.set("updateby", "")
event.set("updatetime", Time.new)
# 处理dfx 和Key_Part_Risk_Assessment 表
elsif collection == "dfx" || collection == "Key_Part_Risk_Assessment"
event.set("createby", doc['createBy'])
# todo 方法5 格式化日期
event.set("createtime", format_data(doc['createTime'].to_s))
event.set("updateby", doc['updateBy'])
event.set("updatetime", format_data(doc['updateTime'].to_s))
# 处理kdsa 表
elsif collection == "kdsa"
basic_info = doc['basic_info']
event.set("createby", basic_info['creator'])
event.set("createtime", format_data(basic_info['create_date'].to_s))
event.set("updateby", "")
event.set("updatetime", Time.new)
# 处理limit_sample表
elsif collection == "limit_sample"
info = doc['info']
event.set("createby", "")
event.set("createtime", form_data_other(info['create_date'].to_s))
event.set("updateby", "")
event.set("updatetime", Time.new)
# 处理nud表
elsif collection == "nud"
basic_info = doc['basic_info']
event.set("createby", basic_info['creator'])
event.set("createtime", format_data(basic_info['create_time'].to_s))
event.set("updateby", basic_info['update_by'])
event.set("updatetime", format_data(basic_info['update_time'].to_s))
end
end
# todo 方法5 格式化日期
def format_data(data)
# 截取月份,数据的前两位数
month = data[0..1]
month_int = 0
if month.start_with?('0')
month_int = month[1].to_i
elsif month_int = month.to_i
end
# 截取日期, 数据的3-4位数
day = data[3..4]
day_int = 0
if day.start_with?('0')
day_int = day[1].to_i
elsif day_int = day.to_i
end
# 截取年份,数据的6-9位数
year = data[6..9].to_i
# 截取小时,11-12 位数
h = data[11..12]
# 截取分钟,14-15 位数
m = data[14..15]
# 截取秒,数据的17-18位数
s = data[17..18]
# 返回格式化日期
return Time.mktime(year, month_int, day_int, h, m, s)
end
def form_data_other(data)
month = data[5..6]
month_int = 0
if month.start_with?('0')
month_int = month[1].to_i
elsif month_int = month.to_i
end
day = data[8..9]
day_int = 0
if day.start_with?('0')
day_int = day[1].to_i
elsif day_int = day.to_i
end
year = data[0..3].to_i
h = data[11..12]
m = data[14..15]
s = data[17..18]
return Time.mktime(year, month_int, day_int, h, m, s)
end
# todo 方法6 通过集合和appInstanceId 查询app assignee 任务发布人;
def find_instance_assigness(mongodb, appinstanceid)
# 获取集合
collection = mongodb.collection("app_task_all")
# 获取匹配条件的数据;
# Need to make this sort by date in object id then get the first of the series
# db.events_20150320.find().limit(1).sort({ts:1})
ncount = collection.find("app.appInstanceId" => appinstanceid.to_i).count
# 没有获取到时返回“”
if ncount == 0
return ""
end
# 否则 获取第一个对象
entry = collection.find("app.appInstanceId" => appinstanceid.to_i).first
assigness_array = []
# 获取open 对象的值
done = entry['done']
# 获取 done 对象的值
open = entry['open']
# 遍历open 对象
if open.length != 0
open.each do |item|
# 获取任务的所有发布人的id
assignees = item['assignees']
if !assignees.empty?
# 获取所有的任务发布人的id,放入assigness_array
assignees.each do |id|
assigness_array.push(id)
end
end
end
end
# 获取done 任务中,所有的任务发布人的id,放入assigness_array
if done.length != 0
done.each do |item|
assignees = item['assignees']
if !assignees.empty?
assignees.each do |id|
assigness_array.push(id)
end
end
end
end
# 如果获取到的assigness_array不为空,拼接“,” 返回
if !assigness_array.empty?
return assigness_array.join(",")
end
# 没有获取到,返回“”
return ""
end
end # class LogStash::Inputs::Example
上一篇: python爬虫进阶教程:抖音APP无水印视频批量下载
下一篇: 笔试 CVTE
推荐阅读
-
windows下mysql 5.7.20 安装配置方法图文教程
-
mysql 8.0.12 安装配置方法图文教程(windows10)
-
mysql 8.0.12安装配置方法图文教程(Windows版)
-
mysql 5.7.17 安装配置方法图文教程(windows)
-
Windows 8下MySQL Community Server 5.6安装配置方法图文教程
-
Windows8下mysql 5.6.15 安装配置方法图文教程
-
mysql 5.7.17 免安装版配置方法图文教程(windows10)
-
mysql 5.7.17 安装配置方法图文教程(windows10)
-
windows版本下mysql的安装启动和基础配置图文教程详解
-
Windows10下mysql 5.7.17 安装配置方法图文教程