(6) - Quartz in Practice: A Real Project
This post describes how Quartz was used in a real project, covering the project background, the overall framework, and Quartz cluster deployment, with the focus on how to apply Quartz in practice.
1. Background
The project needed to call data download interfaces on a schedule and store the returned data in databases and caches such as MongoDB, Redis, and Elasticsearch. The concrete requirements were:
a. The tasks that call the interfaces are all read from MongoDB;
b. The number of tasks grows as the business grows;
c. Each task may run on a different schedule, and that schedule is configurable in MongoDB;
d. Task execution must be updated dynamically: if a task's schedule changes, the running job has to be adjusted in near real time;
e. The fields stored in MongoDB, Redis, Elasticsearch, etc. are also configured in MongoDB;
f. Task execution must be timely, reliable, and easy to scale.
After evaluating the options against these requirements, the task scheduling framework Quartz turned out to fit them well.
2. Framework
Based on these requirements, combined with Quartz, the overall flow is roughly the following (a minimal sketch of the underlying Quartz pattern follows the list):
1) First, load the tasks from MongoDB;
2) Initialize the task configuration into Quartz;
3) Use Quartz Jobs to call the download interfaces on schedule;
4) Store the downloaded data in the configured databases;
5) A dedicated detection job periodically scans MongoDB and, if a task's configuration has changed, updates the running job dynamically;
6) For high availability and scalability, Quartz's native clustering is used directly.
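Before walking through the project code, the sketch below shows the bare Quartz pattern the whole flow is built on: obtain a Scheduler, bind a Job class to a cron Trigger, and start the scheduler. The class names and the cron expression here are placeholders for illustration only, not the project's actual ones.
import org.quartz.CronScheduleBuilder;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class QuartzQuickstart {
    // Placeholder job; in the project this role is played by ScheduleJob (section 3.4).
    public static class DemoJob implements Job {
        @Override
        public void execute(JobExecutionContext context) {
            System.out.println("job fired at " + context.getFireTime());
        }
    }

    public static void main(String[] args) throws SchedulerException {
        // The scheduler is configured through quartz.properties (see section 4)
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        JobDetail job = JobBuilder.newJob(DemoJob.class)
                .withIdentity("demoJob", "demoGroup")
                .build();
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("demoTrigger", "demoGroup")
                .withSchedule(CronScheduleBuilder.cronSchedule("0/30 * * * * ?")) // every 30 seconds
                .build();
        scheduler.scheduleJob(job, trigger);
        scheduler.start();
    }
}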
3. Core Code
The core code below covers the steps of the flow above; for confidentiality, project-specific details are redacted.
3.1 Main task flow
public class SchedulerRunner {
static Logger logger = LoggerFactory.getLogger(SchedulerRunner.class);
public static void main(String[] args) {
// Load the log4j configuration
PropertyConfigurator.configure("./conf/log4j.properties");
// Point Quartz at its properties file
System.setProperty("org.quartz.properties", "./conf/quartz.properties");
// Parse the tasks and start scheduling
run();
}
public static void run(){
// Load the task configuration
List<TaskInfo> taskInfos = GenerateTaskInfo.generateTaskInfoFromMongo();
if(taskInfos.size() == 0){
logger.info("there are no tasks in mongo");
return;
}
// Filter out offline tasks
taskInfos = GenerateTaskInfo.filterTask(taskInfos);
if(taskInfos.size() == 0){
logger.info("all tasks are offline, no need to run");
return;
}
Scheduler scheduler = null;
try {
scheduler = StdSchedulerFactory.getDefaultScheduler();
} catch (SchedulerException e) {
e.printStackTrace();
}
if(scheduler == null){
logger.error("create scheduler failed");
return;
}
if(isSchedulerClear()){
clearSchedulerJob(scheduler);
}
// Add the tasks to the scheduler
for(TaskInfo task : taskInfos){
SchedulerFactory.addJob2Scheduler(task, scheduler);
}
// Add the dynamic-update job
SchedulerFactory.addDynamicUpdateJob2Scheduler(scheduler);
// Start the scheduler
try {
scheduler.start();
} catch (SchedulerException e) {
logger.error("start scheduler error!");
}
}
public static void clearSchedulerJob(Scheduler scheduler){
try {
scheduler.clear();
} catch (SchedulerException e) {
logger.error("clear scheduler error!");
}
}
/**
* Read from the configuration file the flag that decides whether to clear existing scheduler data at startup
* @return
*/
private static boolean isSchedulerClear(){
Configuration conf = OcpConfHelper.getInstance().getOcpConf();
return conf.getBooleanValue("cleanSchedulerFlag", "true");
}
}
3.2 The task wrapper object
public class TaskInfo {
protected String categoryId; // category (business) id
protected String categoryName; // category (business) name
protected String sourceId; // source id
protected String sourceName; // source name
protected int sourceStatus; // source status (online/offline)
protected String pipelineConf; // source pipeline configuration
protected List<String> dbStoreTypes; // storage types for this category
protected String esConfInfo; // ES storage configuration
protected String dbConfInfo; // DB storage configuration
protected String cronInfo; // cron schedule info
protected int sourceType; // real-time or offline update
protected List<String> indexBuildEles; // fields used to build the ES index
protected List<String> idBuildEles; // fields used to build the id
protected String indexType; // full or incremental
protected String categoryLevel1; // level-1 category
protected String zhName; // Chinese name
protected Map<String,String> outputType; // output field names and their types
protected String providerName;
protected String functionName; // category_function name
public String getProviderName() {
return providerName;
}
public void setProviderName(String providerName) {
this.providerName = providerName;
}
public String getCategoryId() {
return categoryId;
}
public void setCategoryId(String categoryId) {
this.categoryId = categoryId;
}
public String getCategoryName() {
return categoryName;
}
public void setCategoryName(String categoryName) {
this.categoryName = categoryName;
}
public String getSourceId() {
return sourceId;
}
public void setSourceId(String sourceId) {
this.sourceId = sourceId;
}
public String getSourceName() {
return sourceName;
}
public void setSourceName(String sourceName) {
this.sourceName = sourceName;
}
public int getSourceStatus() {
return sourceStatus;
}
public void setSourceStatus(int sourceStatus) {
this.sourceStatus = sourceStatus;
}
public String getPipelineConf() {
return pipelineConf;
}
public void setPipelineConf(String pipelineConf) {
this.pipelineConf = pipelineConf;
}
public String getEsConfInfo() {
return esConfInfo;
}
public void setEsConfInfo(String esConfInfo) {
this.esConfInfo = esConfInfo;
}
public String getDbConfInfo() {
return dbConfInfo;
}
public void setDbConfInfo(String dbConfInfo) {
this.dbConfInfo = dbConfInfo;
}
public String getCronInfo() {
return cronInfo;
}
public void setCronInfo(String cronInfo) {
this.cronInfo = cronInfo;
}
public int getSourceType() {
return sourceType;
}
public void setSourceType(int sourceType) {
this.sourceType = sourceType;
}
public List<String> getIdBuildEles() {
return idBuildEles;
}
public void setIdBuildEles(List<String> idBuildEles) {
this.idBuildEles = idBuildEles;
}
public List<String> getIndexBuildEles() {
return indexBuildEles;
}
public void setIndexBuildEles(List<String> indexBuildEles) {
this.indexBuildEles = indexBuildEles;
}
public String getIndexType() {
return indexType;
}
public void setIndexType(String indexType) {
this.indexType = indexType;
}
public String getCategoryLevel1() {
return categoryLevel1;
}
public void setCategoryLevel1(String categoryLevel1) {
this.categoryLevel1 = categoryLevel1;
}
public String getZhName() {
return zhName;
}
public void setZhName(String zhName) {
this.zhName = zhName;
}
public TaskInfo(){}
public List<String> getDbStoreTypes() {
return dbStoreTypes;
}
public void setDbStoreTypes(List<String> dbStoreTypes) {
this.dbStoreTypes = dbStoreTypes;
}
public Map<String, String> getOutputType() {
return outputType;
}
public void setOutputType(Map<String, String> outputType) {
this.outputType = outputType;
}
public String getFunctionName() {
return functionName;
}
public void setFunctionName(String functionName) {
this.functionName = functionName;
}
/**
* Whether the two tasks share the same cron info
* @param taskInfo
* @return
*/
public boolean hasSameCronInfo(TaskInfo taskInfo){
if(taskInfo == null) return false;
return this.getCronInfo().equalsIgnoreCase(taskInfo.getCronInfo());
}
@Override
public String toString() {
return "TaskInfo{" +
"categoryId='" + categoryId + '\'' +
", categoryName='" + categoryName + '\'' +
", sourceId='" + sourceId + '\'' +
", sourceName='" + sourceName + '\'' +
", sourceStatus=" + sourceStatus +
", pipelineConf='" + pipelineConf + '\'' +
", dbStoreTypes=" + dbStoreTypes +
", esConfInfo='" + esConfInfo + '\'' +
", dbConfInfo='" + dbConfInfo + '\'' +
", cronInfo='" + cronInfo + '\'' +
", sourceType=" + sourceType +
", indexBuildEles=" + indexBuildEles +
", idBuildEles=" + idBuildEles +
", indexType='" + indexType + '\'' +
", categoryLevel1='" + categoryLevel1 + '\'' +
", zhName='" + zhName + '\'' +
", outputType='" + outputType + '\'' +
", providerName='" + providerName + '\'' +
", functionName='" + functionName + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TaskInfo taskInfo = (TaskInfo) o;
if (sourceStatus != taskInfo.sourceStatus) return false;
if (sourceType != taskInfo.sourceType) return false;
if (categoryName != null ? !categoryName.equals(taskInfo.categoryName) : taskInfo.categoryName != null)
return false;
if (sourceName != null ? !sourceName.equals(taskInfo.sourceName) : taskInfo.sourceName != null) return false;
if (providerName != null ? !providerName.equals(taskInfo.providerName) : taskInfo.providerName != null) return false;
if (pipelineConf != null ? !pipelineConf.equals(taskInfo.pipelineConf) : taskInfo.pipelineConf != null)
return false;
if (dbStoreTypes != null ? !dbStoreTypes.equals(taskInfo.dbStoreTypes) : taskInfo.dbStoreTypes != null)
return false;
if (esConfInfo != null ? !esConfInfo.equals(taskInfo.esConfInfo) : taskInfo.esConfInfo != null) return false;
if (dbConfInfo != null ? !dbConfInfo.equals(taskInfo.dbConfInfo) : taskInfo.dbConfInfo != null) return false;
if (cronInfo != null ? !cronInfo.equals(taskInfo.cronInfo) : taskInfo.cronInfo != null) return false;
if (indexBuildEles != null ? !indexBuildEles.equals(taskInfo.indexBuildEles) : taskInfo.indexBuildEles != null)
return false;
if (idBuildEles != null ? !idBuildEles.equals(taskInfo.idBuildEles) : taskInfo.idBuildEles != null)
return false;
if (indexType != null ? !indexType.equals(taskInfo.indexType) : taskInfo.indexType != null) return false;
if (categoryLevel1 != null ? !categoryLevel1.equals(taskInfo.categoryLevel1) : taskInfo.categoryLevel1 != null)
return false;
if (outputType != null ? !outputType.equals(taskInfo.outputType) : taskInfo.outputType != null)
return false;
if (functionName != null ? !functionName.equals(taskInfo.functionName) : taskInfo.functionName != null)
return false;
return zhName != null ? zhName.equals(taskInfo.zhName) : taskInfo.zhName == null;
}
@Override
public int hashCode() {
int result = categoryName != null ? categoryName.hashCode() : 0;
result = 31 * result + (sourceName != null ? sourceName.hashCode() : 0);
result = 31 * result + (providerName != null ? providerName.hashCode() : 0);
result = 31 * result + sourceStatus;
result = 31 * result + (pipelineConf != null ? pipelineConf.hashCode() : 0);
result = 31 * result + (dbStoreTypes != null ? dbStoreTypes.hashCode() : 0);
result = 31 * result + (esConfInfo != null ? esConfInfo.hashCode() : 0);
result = 31 * result + (dbConfInfo != null ? dbConfInfo.hashCode() : 0);
result = 31 * result + (cronInfo != null ? cronInfo.hashCode() : 0);
result = 31 * result + sourceType;
result = 31 * result + (indexBuildEles != null ? indexBuildEles.hashCode() : 0);
result = 31 * result + (idBuildEles != null ? idBuildEles.hashCode() : 0);
result = 31 * result + (indexType != null ? indexType.hashCode() : 0);
result = 31 * result + (categoryLevel1 != null ? categoryLevel1.hashCode() : 0);
result = 31 * result + (zhName != null ? zhName.hashCode() : 0);
result = 31 * result + (outputType != null ? outputType.hashCode() : 0);
result = 31 * result + (functionName != null ? functionName.hashCode() : 0);
return result;
}
}
3.3 Building and initializing the tasks
/**
* Builds the information for the scheduled tasks
* Created by songwang4 on 2017/6/7.
*/
public class GenerateTaskInfo {
static Logger logger = LoggerFactory.getLogger(GenerateTaskInfo.class);
static DBCollection sourceColl = MongoUtil.createOcpSourceDB();
static DBCollection categoryColl = MongoUtil.createOcpCategoryDB();
/**
* Read the task information from the database (MongoDB)
*
* @return
*/
public static List<TaskInfo> generateTaskInfoFromMongo() {
// Wrap the task information
List<TaskInfo> tasks = Lists.newArrayList();
TaskInfo task = null;
DBCursor sourceCur = sourceColl.find();
DBObject sourceObj = null;
DBObject categoryObj = null;
while (sourceCur.hasNext()) {
sourceObj = sourceCur.next();
task = new TaskInfo();
String sourceName = sourceObj.get("sourceName").toString();
String categoryName = sourceObj.get("category").toString();
// Look up the category record by category name
categoryObj = categoryColl.findOne(new BasicDBObject("catName", categoryName));
if (categoryObj == null) {
logger.error("no category found through source: " + sourceName);
continue;
}
task.setCategoryId(categoryObj.get("_id").toString()); // category id
task.setCategoryName(categoryName); // category name
List<String> dbStoreTypes = Lists.newArrayList();
if (categoryObj.containsField("storeType")) {
try {
JSONArray storeTypeArr = JSON.parseArray(categoryObj.get("storeType").toString());
for (int i = 0; i < storeTypeArr.size(); i++) {
dbStoreTypes.add(storeTypeArr.getString(i));
}
} catch (Exception e) {
// a malformed storeType entry is simply ignored
}
}
task.setDbStoreTypes(dbStoreTypes); // storage types
task.setCategoryLevel1(categoryObj.get("parent").toString()); // level-1 category
task.setZhName(sourceObj.get("zhName").toString());
task.setDbConfInfo(categoryObj.containsField("db") ? categoryObj.get("db").toString() : categoryName); // DB configuration
task.setEsConfInfo(categoryObj.containsField("es") ? categoryObj.get("es").toString() : categoryName); // ES configuration
task.setIndexBuildEles(extractBuilderEles(categoryObj, "isIndex", "itemName")); // fields for building the ES index
task.setIdBuildEles(extractBuilderEles(categoryObj, "isGK", "itemName")); // fields for building the id
task.setSourceId(sourceObj.get("_id").toString()); // source id
task.setSourceName(sourceName); // source name
int status = StatusType.OFFLINE;
if (sourceObj.containsField("status")) {
String statusType = sourceObj.get("status").toString();
if (statusType.equals(StatusType.STR_ONLINE)) {
status = StatusType.ONLINE;
}
}
task.setSourceStatus(status); // source online/offline status
int sourceType = SourceType.REAL_TIME_PROCESS;
if (sourceObj.containsField("type")) {
String strStatusType = sourceObj.get("type").toString();
if (strStatusType.equals(SourceType.STR_OFF_LINE_PROCESS)) {
sourceType = SourceType.OFF_LINE_PROCESS;
}
}
task.setSourceType(sourceType); // offline or real-time processing
task.setIndexType(sourceObj.containsField("indexType") ?
sourceObj.get("indexType").toString() : ""); // full or incremental flag
// Cron schedule configuration
task.setCronInfo(sourceObj.containsField("timerInfo") ?
sourceObj.get("timerInfo").toString() : "");
if (task.getCronInfo().trim().length() == 0) {
task.setCronInfo(generateCronInfo(sourceObj));
}
task.setPipelineConf(sourceObj.containsField("mappingWorkflow") ?
sourceObj.get("mappingWorkflow").toString() : ""); // pipeline configuration
tasks.add(task);
}
sourceCur.close();
return tasks;
}
/**
* Extract the fields used to build the id or the ES index
*
* @param categoryObj
* @param queryField
* @param returnField
* @return
*/
public static List<String> extractBuilderEles(DBObject categoryObj, String queryField, String returnField) {
List<String> builderEles = Lists.newArrayList();
JSONArray dataItemArr = null;
try {
dataItemArr = JSON.parseArray(categoryObj.get("dataItems").toString());
} catch (JSONException e) {
// no dataItems field or malformed JSON; fall through with an empty list
}
if (dataItemArr != null && dataItemArr.size() > 0) {
JSONObject dataItemJson = null;
for (int i = 0; i < dataItemArr.size(); i++) {
dataItemJson = dataItemArr.getJSONObject(i);
if (dataItemJson.containsKey(queryField) && dataItemJson.getBoolean(queryField)) {
builderEles.add(dataItemJson.getString(returnField).trim());
}
}
}
return builderEles;
}
/**
* Build the cron expression from the source record
*
* @param sourceObj
* @return
*/
public static String generateCronInfo(DBObject sourceObj) {
String updateTimeType = "";
String updateTimeCycle = "";
if (sourceObj.containsField("updateType")) {
updateTimeType = sourceObj.get("updateType").toString();
}
if (sourceObj.containsField("updateCycle")) {
updateTimeCycle = sourceObj.get("updateCycle").toString();
}
if (updateTimeType.trim().length() == 0 || updateTimeCycle.trim().length() == 0) {
return "";
}
StringBuilder sb = new StringBuilder();
Date date = null;
if (updateTimeType.equalsIgnoreCase("YEAR")) {
date = TimeUtil.parseDate(updateTimeCycle, "MM-dd HH:mm");
if (date == null) {
try {
sb.append(TimeUtil.extractFixedTimeByDay(Integer.parseInt(updateTimeCycle), 0, 0));
} catch (NumberFormatException e) {
}
} else {
sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.MONTH) + 1).append(" ? *");
}
}
if (updateTimeType.equalsIgnoreCase("MONTH")) {
date = TimeUtil.parseDate(updateTimeCycle, "dd HH:mm");
if (date == null) return "";
sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" * ?");
}
if (updateTimeType.equalsIgnoreCase("DAY")) {
date = TimeUtil.parseDate(updateTimeCycle, "HH:mm");
if (date == null) return "";
sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" * * ?");
}
if (updateTimeType.equalsIgnoreCase("WEEK")) {
String weekDay = "1";
if (sourceObj.containsField("weekDay")) {
weekDay = sourceObj.get("weekDay").toString();
}
date = TimeUtil.parseDate(updateTimeCycle, "HH:mm");
if (date == null) return "";
sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ? * ")
.append(TimeUtil.extractFixedTime(weekDay));
}
if (updateTimeType.equalsIgnoreCase("HOUR")) {
try {
int hour = Integer.parseInt(updateTimeCycle);
sb.append(TimeUtil.extractFixedTimeByHour(hour, 0));
} catch (NumberFormatException e) {
}
}
if (updateTimeType.equalsIgnoreCase("MINUTE")) {
try {
int minute = Integer.parseInt(updateTimeCycle);
sb.append(TimeUtil.extractFixedTimeByMinute(minute));
} catch (NumberFormatException e) {
}
}
if (updateTimeType.equalsIgnoreCase("SECOND")) {
sb.append("*/").append(updateTimeCycle).append(" * * * * ?");
}
return sb.toString();
}
/**
* Filter out offline tasks
*
* @param tasks
* @return
*/
public static List<TaskInfo> filterTask(List<TaskInfo> tasks) {
List<TaskInfo> taskInfos = Lists.newArrayList();
for (TaskInfo taskInfo : tasks) {
// skip sources that are offline or that are real-time (not offline-processed)
if (taskInfo.getSourceStatus() == StatusType.OFFLINE
|| taskInfo.getSourceType() != SourceType.OFF_LINE_PROCESS) {
continue;
}
taskInfos.add(taskInfo);
}
return taskInfos;
}
/**
* Group the tasks by category
*
* @param oriTasks
* @return
*/
public static Map<String, List<TaskInfo>> groupTaskByCategory(List<TaskInfo> oriTasks) {
Map<String, List<TaskInfo>> categoryTasks = Maps.newHashMap();
for (TaskInfo oriTask : oriTasks) {
if (!categoryTasks.containsKey(oriTask.getCategoryId())) {
List<TaskInfo> taskInfos = Lists.newArrayList();
taskInfos.add(oriTask);
categoryTasks.put(oriTask.getCategoryId(), taskInfos);
} else {
boolean hasSameSourceId = false;
for (TaskInfo taskInfo : categoryTasks.get(oriTask.getCategoryId())) {
if (taskInfo.getSourceId().equals(oriTask.getSourceId())) {
hasSameSourceId = true;
break;
}
}
if (!hasSameSourceId) {
categoryTasks.get(oriTask.getCategoryId()).add(oriTask);
}
}
}
return categoryTasks;
}
}
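To make the cron generation concrete, here are a few expressions generateCronInfo produces. The SECOND case follows directly from the code; the DAY and MONTH examples assume TimeUtil.parseDate and extractFixedTime behave as their names suggest; the YEAR/WEEK/HOUR/MINUTE cases depend entirely on TimeUtil, which is not shown here.
updateType = DAY, updateCycle = "02:30" -> "0 30 2 * * ?" (every day at 02:30)
updateType = MONTH, updateCycle = "15 04:00" -> "0 0 4 15 * ?" (the 15th of every month at 04:00)
updateType = SECOND, updateCycle = "30" -> "*/30 * * * * ?" (every 30 seconds)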
3.4 The job that calls the download interface
/**
* Offline storage job.
* Note: if the previous run of a job has not finished when its next fire time arrives,
* the next run waits until the previous one completes before starting.
*/
@DisallowConcurrentExecution
public class ScheduleJob implements Job {
static Logger logger = LoggerFactory.getLogger(ScheduleJob.class);
public ScheduleJob() {
}
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
JobDetail jobDetail = jobExecutionContext.getJobDetail();
JSONObject json = new JSONObject();
json.put("jobName", jobDetail.getKey().getName());
json.put("jobGroup", jobDetail.getKey().getGroup());
json.put("triggerName", jobExecutionContext.getTrigger().getKey().getName());
json.put("triggerGroup", jobExecutionContext.getTrigger().getKey().getGroup());
logger.info("job is running: " + json.toString());
JobDataMap dataMap = jobDetail.getJobDataMap();
JSONObject confJson = null;
try {
confJson = JSONObject.parseObject(dataMap.getString(SchedulerFactory.CONF_INFO));
} catch (JSONException e) {
}
if (confJson == null) {
logger.error("conf is empty: " + json.toString());
return;
}
// Deserialize the task info and get the storage types
TaskInfo taskInfo = new Gson().fromJson(confJson.toString(), TaskInfo.class);
if (!isNeedtoRun(taskInfo)) {
logger.info("no need to run: " + json.toString());
return;
}
List<IDataOut> dataOuts = Lists.newArrayList();
for (String dbStoreType : taskInfo.getDbStoreTypes()) {
switch (dbStoreType) {
case StoreType.STR_MONGO_STORE:
dataOuts.add(new DataOut2Mongo(taskInfo.getFunctionName(), taskInfo.getSourceName(), taskInfo.getIdBuildEles(), taskInfo.getOutputType(),taskInfo.getProviderName()));
break;
case StoreType.STR_ES_STORE:
dataOuts.add(new DataOut2ES(taskInfo.getCategoryName(),taskInfo.getFunctionName(), taskInfo.getSourceName(), taskInfo.getIdBuildEles(), taskInfo.getIndexBuildEles(), taskInfo.getOutputType(),taskInfo.getProviderName()));
break;
case StoreType.STR_REDIS_STORE:
dataOuts.add(new DataOut2Redis(taskInfo.getSourceName(), taskInfo.getIdBuildEles(), taskInfo.getOutputType(),taskInfo.getProviderName()));
break;
}
}
// Create the crawl-log object for the data pull; it is stored once before the pull and once after
CrawlerLog crawlerLog = createCrawlerLog(taskInfo);
if (dataOuts.size() > 0) {
PipeExecuter.executeSave(taskInfo.getPipelineConf(), dataOuts, crawlerLog);
}
}
/**
* Decide whether the job needs to run
*
* @param taskInfo
* @return
*/
public static boolean isNeedtoRun(TaskInfo taskInfo) {
// real-time or offline
if (taskInfo.getSourceType() == SourceType.REAL_TIME_PROCESS) {
logger.warn("the job is real-time process, no need to run");
return false;
}
// job online/offline status
if (taskInfo.getSourceStatus() == StatusType.OFFLINE) {
logger.warn("the job status is offline, no need to run");
return false;
}
// pipeline configuration
if (Strings.isNullOrEmpty(taskInfo.getPipelineConf()) || taskInfo.getPipelineConf().trim().length() == 0) {
logger.warn("no pipeline configure info, no need to run");
return false;
}
// job storage types
if (taskInfo.getDbStoreTypes().size() == 0) {
logger.warn("the job store type is 0, no need to store");
return false;
}
return true;
}
/**
* Create the crawl log of the data pull so that the admin system can inspect it
*
* @param taskInfo
* @return
*/
public CrawlerLog createCrawlerLog(TaskInfo taskInfo) {
CrawlerLog crawlerLog = new CrawlerLog();
crawlerLog.setIndexType(taskInfo.getIndexType()); // incremental or full
crawlerLog.setCategoryLv1(taskInfo.getCategoryLevel1());
String sourceName = taskInfo.getSourceName();
String sourceZhName = taskInfo.getZhName();
String sourceArr[] = sourceName.split("_");
String sourceZhArr[] = sourceZhName.split("_");
crawlerLog.setCategoryLv2((sourceArr != null && sourceArr.length > 0) ? sourceArr[0] : "");
crawlerLog.setFunctionName((sourceArr != null && sourceArr.length > 1) ? sourceArr[1] : "");
crawlerLog.setProviderName((sourceArr != null && sourceArr.length > 2) ? sourceArr[2] : "");
crawlerLog.setFunctionZhName((sourceZhArr != null && sourceZhArr.length > 1) ? sourceZhArr[1] : "");
crawlerLog.setProviderZhName((sourceZhArr != null && sourceZhArr.length > 2) ? sourceZhArr[2] : "");
crawlerLog.setId();
return crawlerLog;
}
}
3.5 Scheduler factory
The factory builds the Trigger for each task and creates the corresponding Job.
public class SchedulerFactory {
static Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);
public static final String CONF_INFO = "conf_info";
public static final String DYNAMIC_UPDATE_JOB_NAME = "dynamicUpdateJob";
public static final String DYNAMIC_UPDATE_GROUP_NAME = "dynamicUpdateGroup";
public static final String DYNAMIC_UPDATE_CRONINFO = "*/30 * * * * ?";
/**
* Add a task to the scheduler
* @param taskInfo
* @param scheduler
*/
public static void addJob2Scheduler(TaskInfo taskInfo, Scheduler scheduler) {
try {
JobDetail jobDetail = generateJobDetail(taskInfo);
if(jobDetail == null){
logger.error("create job failed!");
return;
}
Trigger trigger = generateTrigger(taskInfo);
if(trigger == null){
logger.error("create trigger failed!");
return;
}
// Register the job with its trigger
scheduler.scheduleJob(jobDetail,trigger);
} catch (SchedulerException e) {
logger.error("create scheduler error, error message: "+e.toString());
}
}
public static void addDynamicUpdateJob2Scheduler(Scheduler scheduler) {
try {
JobDetail jobDetail = generateDynamicUpdateJobDetail(DYNAMIC_UPDATE_JOB_NAME, DYNAMIC_UPDATE_GROUP_NAME);
if(jobDetail == null){
logger.error("create job failed!");
return;
}
Trigger trigger = generateTrigger(DYNAMIC_UPDATE_JOB_NAME, DYNAMIC_UPDATE_GROUP_NAME, DYNAMIC_UPDATE_CRONINFO);
if(trigger == null){
logger.error("create trigger failed!");
return;
}
// Register the job with its trigger
scheduler.scheduleJob(jobDetail,trigger);
} catch (SchedulerException e) {
logger.error("create scheduler error, error message: "+e.toString());
}
}
/**
* Generate the corresponding JobDetail from the source info
* @param taskInfo
* @return
*/
public static JobDetail generateJobDetail(TaskInfo taskInfo) {
String jobName = taskInfo.getSourceName();
if(jobName.trim().length() == 0){
logger.error("job name is empty, please check!");
return null;
}
String jobGroup = taskInfo.getCategoryName();
if(jobGroup.trim().length() == 0){
logger.error("job group is empty, please check!");
return null;
}
return JobBuilder.newJob(ScheduleJob.class)
.withIdentity(PrefixType.JOB_PREFIX+jobName, PrefixType.JOB_PREFIX+jobGroup)
.requestRecovery()
.usingJobData(CONF_INFO, new Gson().toJson(taskInfo)).build();
}
public static JobDetail generateDynamicUpdateJobDetail(String jobName, String jobGroup) {
if(jobName.trim().length() == 0){
logger.error("job name is empty, please check!");
return null;
}
if(jobGroup.trim().length() == 0){
logger.error("job group is empty, please check!");
return null;
}
return JobBuilder.newJob(DynamicUpdateJob.class)
.withIdentity(PrefixType.JOB_PREFIX+jobName, PrefixType.JOB_PREFIX+jobGroup)
.requestRecovery()
.build();
}
/**
* Generate the corresponding Trigger from the source info
* @param taskInfo
* @return
*/
public static Trigger generateTrigger(TaskInfo taskInfo) {
String sourceTriggerName = taskInfo.getSourceName();
if(sourceTriggerName.trim().length() == 0){
logger.error("trigger name is empty, please check!");
return null;
}
String sourceTriggerGroup = taskInfo.getCategoryName();
if(sourceTriggerGroup.trim().length() == 0){
logger.error("trigger group is empty, please check!");
return null;
}
String cronInfo = taskInfo.getCronInfo();
if(cronInfo.trim().length() == 0){
logger.error("cron timer info is empty, please check!");
return null;
}
return TriggerBuilder.newTrigger().withIdentity(PrefixType.TRIGGER_PREFIX+sourceTriggerName,
PrefixType.TRIGGER_PREFIX+sourceTriggerGroup)
.withSchedule(CronScheduleBuilder.cronSchedule(cronInfo))
.build();
}
public static Trigger generateTrigger(String sourceTriggerName, String sourceTriggerGroup, String cronInfo) {
if(sourceTriggerName.trim().length() == 0){
logger.error("trigger name is empty, please check!");
return null;
}
if(sourceTriggerGroup.trim().length() == 0){
logger.error("trigger group is empty, please check!");
return null;
}
if(cronInfo.trim().length() == 0){
logger.error("cron timer info is empty, please check!");
return null;
}
return TriggerBuilder.newTrigger().withIdentity(PrefixType.TRIGGER_PREFIX+sourceTriggerName,
PrefixType.TRIGGER_PREFIX+sourceTriggerGroup)
.withSchedule(CronScheduleBuilder.cronSchedule(cronInfo))
.build();
}
}
3.6 The job that dynamically detects task updates
@DisallowConcurrentExecution
public class DynamicUpdateJob implements Job{
private static Logger logger = LoggerFactory.getLogger(DynamicUpdateJob.class);
public DynamicUpdateJob(){}
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
JobDetail jobDetail = jobExecutionContext.getJobDetail();
JSONObject json = new JSONObject();
json.put("jobName", jobDetail.getKey().getName());
json.put("jobGroup", jobDetail.getKey().getGroup());
json.put("triggerName", jobExecutionContext.getTrigger().getKey().getName());
json.put("triggerGroup", jobExecutionContext.getTrigger().getKey().getGroup());
logger.info("job is running: "+json.toString());
// Get the current scheduler
Scheduler scheduler = jobExecutionContext.getScheduler();
// Load the tasks currently defined in the configuration store (MongoDB)
List<TaskInfo> confTaskInfos = GenerateTaskInfo.generateTaskInfoFromMongo();
// Get all job keys currently registered in the scheduler
List<JobKey> schedulerJobKeys = acquireJobKeysWithinScheduler(scheduler);
// 1. If a job exists in the scheduler but not in the configuration, take it offline
for(JobKey schedulerJobKey : schedulerJobKeys ){
boolean hasSameJobKeyInConfTask = false;
for(TaskInfo confTaskInfo : confTaskInfos){
if(generateJobKey(confTaskInfo).equals(schedulerJobKey)){
hasSameJobKeyInConfTask = true;
break;
}
}
if(!hasSameJobKeyInConfTask){
try {
scheduler.deleteJob(schedulerJobKey);
logger.info("delete offline job: "+schedulerJobKey.toString());
} catch (SchedulerException e) {
logger.error("delete offline job error: "+json.toString());
}
}
}
// 2. Compare the configured tasks with the jobs in the scheduler
for(TaskInfo confTaskInfo : confTaskInfos){
JobKey confJobKey = generateJobKey(confTaskInfo);
boolean hasSameJob = false;
for(JobKey schedulerJobKey : schedulerJobKeys ){
if(confJobKey.equals(schedulerJobKey)){
hasSameJob = true;
break;
}
}
if(hasSameJob){ // a job with the same key already exists
logger.info("has same jobKey: "+confJobKey);
JobDetail schedulerJobDetail = null;
try {
schedulerJobDetail = scheduler.getJobDetail(confJobKey);
} catch (SchedulerException e) {
logger.error("get job detail from scheduler error: "+confJobKey);
}
if(schedulerJobDetail == null) continue;
// 1) Does the job need to be taken offline?
if(!ScheduleJob.isNeedtoRun(confTaskInfo)){
try {
logger.info("has same jobKey and offline the job "+confJobKey);
scheduler.deleteJob(confJobKey);
} catch (SchedulerException e) {
logger.error("delete offline job error: "+confJobKey);
}
}else{
// 2) Does the job need to be updated?
TaskInfo schedulerTaskInfo = parseTaskInfoFromJobDataMap(schedulerJobDetail);
logger.info("confTaskInfo: " + confTaskInfo);
logger.info("schedulerTaskInfo: " + schedulerTaskInfo);
if(!confTaskInfo.equals(schedulerTaskInfo)){
try {
logger.info("has same jobKey and update the job "+confJobKey);
scheduler.deleteJob(confJobKey);
SchedulerFactory.addJob2Scheduler(confTaskInfo, scheduler);
} catch (SchedulerException e) {
logger.error("update scheduler info error: "+confJobKey);
}
}else{
logger.info("the job info is same "+confJobKey);
}
}
}else{ // no job with this key yet, so create a new one
// 1) Does it satisfy the conditions to go online?
if(!ScheduleJob.isNeedtoRun(confTaskInfo)){
logger.info("the status is offline, no need to create new job: "+confJobKey);
continue;
}
logger.info("no same jobKey and create job "+confJobKey);
// 2) Bring it online
SchedulerFactory.addJob2Scheduler(confTaskInfo, scheduler);
}
}
}
protected List<JobKey> acquireJobKeysWithinScheduler(Scheduler scheduler){
List<JobKey> jobKeys = Lists.newArrayList();
try {
for(String groupName : scheduler.getJobGroupNames()){
if(groupName.equals(PrefixType.JOB_PREFIX+SchedulerFactory.DYNAMIC_UPDATE_GROUP_NAME)){
continue;
}
for(JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))){
jobKeys.add(jobKey);
}
}
} catch (SchedulerException e) {
// ignore and return whatever job keys were collected so far
}
return jobKeys;
}
protected TaskInfo parseTaskInfoFromJobDataMap(JobDetail jobDetail){
try {
String confInfo = jobDetail.getJobDataMap().getString(SchedulerFactory.CONF_INFO);
return new Gson().fromJson(confInfo, TaskInfo.class);
} catch (Exception e) {
logger.error("parse task info from JobDataMap error!");
return null;
}
}
protected JobKey generateJobKey(TaskInfo taskInfo){
return generateJobKey(taskInfo.getSourceName(), taskInfo.getCategoryName());
}
protected JobKey generateJobKey(String jobName, String jobGroup){
return JobKey.jobKey(PrefixType.JOB_PREFIX+jobName,PrefixType.JOB_PREFIX+jobGroup);
}
}
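A side note on the update strategy: DynamicUpdateJob replaces a changed job by deleting it and adding it back through SchedulerFactory, which also refreshes the JobDataMap. When only the cron expression has changed, Quartz also allows swapping just the trigger in place via Scheduler.rescheduleJob. Below is a minimal sketch of that alternative (not the approach used above); it reuses the project's PrefixType and TaskInfo classes, and newCron stands for the updated expression.
// Sketch only: replace the trigger of an existing job without touching its JobDetail/JobDataMap.
void rescheduleCron(Scheduler scheduler, TaskInfo task, String newCron) throws SchedulerException {
    TriggerKey triggerKey = TriggerKey.triggerKey(
            PrefixType.TRIGGER_PREFIX + task.getSourceName(),
            PrefixType.TRIGGER_PREFIX + task.getCategoryName());
    Trigger newTrigger = TriggerBuilder.newTrigger()
            .withIdentity(triggerKey)
            .withSchedule(CronScheduleBuilder.cronSchedule(newCron))
            .build();
    // rescheduleJob removes the old trigger and attaches the new one to the same job
    scheduler.rescheduleJob(triggerKey, newTrigger);
}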
3.7 Elasticsearch storage
/**
* Created by songwang4 on 2017/6/7.
*/
public class DataOut2ES implements IDataOut, IDataClose {
static Logger logger = LoggerFactory.getLogger(DataOut2ES.class);
static TransportClient client;
String indexName; // defaults to ocp
String typeName;
String sourceName;
List<String> indexBuildEles;
List<String> idBuilderEles;
Map<String,String> outputType;
String providerName;
public DataOut2ES(String indexName,String type){
this.indexName = indexName;
this.typeName = type;
init();
}
public DataOut2ES(String indexName,String type, List<String> indexBuildEles){
this(indexName,type);
this.indexBuildEles = indexBuildEles;
}
public DataOut2ES(String indexName,String type, List<String> idBuilderEles, List<String> indexBuildEles){
this(indexName,type, indexBuildEles);
this.idBuilderEles = idBuilderEles;
}
public DataOut2ES(String indexName,String type, String sourceName, List<String> idBuilderEles, List<String> indexBuildEles){
this(indexName,type, idBuilderEles, indexBuildEles);
this.sourceName = sourceName;
}
public DataOut2ES(String indexName,String type, String sourceName, List<String> idBuilderEles, List<String> indexBuildEles, Map<String,String> outputType,String providerName){
this(indexName,type,sourceName, idBuilderEles, indexBuildEles);
this.outputType = outputType;
this.providerName = providerName;
}
public static void init() {
if(client == null){
Configuration conf = OcpConfHelper.getInstance().getOcpConf();
String esClusterName = conf.getStringValue("ocp_es_cluster_name", "");
String esIp = conf.getStringValue("ocp_es_ip", "");
int esPort = conf.getIntValue("ocp_es_port", "");
Settings settings = Settings.builder()
.put("cluster.name",esClusterName)
.put("client.transport.sniff", true)
.put("client.transport.ping_timeout", "120s")
.put("client.transport.nodes_sampler_interval","30s").build();
try {
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esIp),esPort));
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
}
/**
* Bulk write
* @param datas
*/
public void save(List<JSONObject> datas) {
// insert the data in bulk
BulkRequestBuilder bulkRequest = client.prepareBulk();
for(JSONObject data : datas){
// convert fields according to the configured output types
// data = OutputTypeTransform.transform(data,outputType);
String id64 = IdBuilder.generateId(this.sourceName, data, this.idBuilderEles);
if(id64.trim().length() == 0) continue;
JSONObject indexJson = new JSONObject();
for(String indexBuildEle : this.indexBuildEles){
if(data.containsKey(indexBuildEle)){
indexJson.put(indexBuildEle, data.get(indexBuildEle));
}
}
if(indexJson.keySet().isEmpty()){
logger.info("no json fields, so no need to save this record");
continue; // skip records that have none of the index fields
}
bulkRequest.add(client.prepareIndex(indexName, typeName, id64).setSource(indexJson.toString()));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if(bulkResponse.hasFailures()){
logger.error("insert data 2 es error "+indexName);
System.out.println(bulkResponse.buildFailureMessage());
}
}
public void saveWithoutIndexBuilds(List<JSONObject> datas) {
// insert the data in bulk
BulkRequestBuilder bulkRequest = client.prepareBulk();
for(JSONObject data : datas){
bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(data.toString()));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if(bulkResponse.hasFailures()){
logger.error("insert data 2 es error "+indexName);
System.out.println(bulkResponse.buildFailureMessage());
}
}
public void saveWithoutIndexBuilds2(List<JSONObject> datas) {
// insert the data in bulk
BulkRequestBuilder bulkRequest = client.prepareBulk();
for(JSONObject data : datas){
String _id = data.getString("_id");
JSONObject source = data.getJSONObject("_source");
bulkRequest.add(client.prepareIndex(indexName, typeName,_id).setSource(source.toString()));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if(bulkResponse.hasFailures()){
logger.error("insert data 2 es error "+indexName);
System.out.println(bulkResponse.buildFailureMessage());
}
}
/**
* Check whether the index exists
* @param indexName
* @return
*/
public boolean isExistsIndex(String indexName){
IndicesExistsResponse response = client.admin().indices()
.exists(new IndicesExistsRequest().indices(new String[]{indexName})).actionGet();
return response.isExists();
}
/**
* Create the index
* @param indexName
* @return
*/
public boolean createIndex(String indexName){
try {
CreateIndexResponse indexResponse = this.client
.admin()
.indices()
.prepareCreate(indexName)
.get();
return indexResponse.isAcknowledged();
} catch (ElasticsearchException e) {
e.printStackTrace();
}
return false;
}
@Override
public void save(Object data) {
if(this.indexBuildEles.size() == 0){
logger.error("index fields are empty in es, no index need to save, info: " + data.toString());
return;
}
// insert documents one at a time
JSONObject json = null;
try {
json = (JSONObject)data;
} catch (Exception e) {
logger.error("trans data to json error in es :" + data.toString());
return;
}
if(json == null){
logger.error("trans data to Json error in es, info " + data.toString());
return;
}
// json = OutputTypeTransform.transform(json,outputType);
// build the document id
String id64 = IdBuilder.generateId(this.sourceName, json, this.idBuilderEles);
if(id64.trim().length() == 0){
logger.error("generate 64 bit id is null,please check: " + data.toString());
return;
}
JSONObject indexJson = new JSONObject();
for(String indexBuildEle : this.indexBuildEles){
if(json.containsKey(indexBuildEle)){
indexJson.put(indexBuildEle, json.get(indexBuildEle));
}
}
if(indexJson.keySet().isEmpty()){
logger.info("no json fields, so no need to save");
return;
}
logger.info("index info: "+indexJson);
IndexResponse response = client.prepareIndex(this.indexName, this.typeName, id64).setSource(indexJson.toString()).get();
if(response.status() != RestStatus.CREATED && response.status() != RestStatus.OK){
logger.error("index error in es, status is "+response.status().getStatus()+"info: " + data.toString());
return;
}
}
@Override
public void close() {
}
}
The code above covers the overall Quartz-related flow. Detail-level code such as the configuration classes, the database initialization and loading classes, and some helper classes is not shown, but it should give a reasonable picture of how Quartz is used at the core. If you have questions, feel free to leave a comment.
4. Cluster mode
Note: the code above assumes Quartz's cluster mode. The cluster-related settings in the quartz.properties file loaded by the main flow are listed below for reference.
#============================================================================
# Configure Main Scheduler Properties
#============================================================================
org.quartz.scheduler.instanceName: OcpScheduler
org.quartz.scheduler.instanceId: OcpInstance
org.quartz.scheduler.skipUpdateCheck: true
#============================================================================
# Configure ThreadPool
#============================================================================
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 50
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
#============================================================================
# Configure JobStore
#============================================================================
org.quartz.jobStore.misfireThreshold: 120000
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties: false
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.dataSource: ocpQzDs
org.quartz.jobStore.isClustered: true
org.quartz.jobStore.clusterCheckinInterval = 60000
#============================================================================
# Configure Datasources
#============================================================================
org.quartz.dataSource.ocpQzDs.driver: com.mysql.jdbc.Driver
org.quartz.dataSource.ocpQzDs.URL: jdbc:mysql://192.168.1.1:3306/test?useUnicode=true&characterEncoding=utf-8
org.quartz.dataSource.ocpQzDs.user: test
org.quartz.dataSource.ocpQzDs.password: test
org.quartz.dataSource.ocpQzDs.maxConnections: 30
#============================================================================
# Configure Plugins
#============================================================================
org.quartz.plugin.shutdownHook.class: org.quartz.plugins.management.ShutdownHookPlugin
org.quartz.plugin.shutdownHook.cleanShutdown: true
#org.quartz.plugin.triggHistory.class: org.quartz.plugins.history.LoggingJobHistoryPlugin
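Two practical notes on this configuration (general Quartz behavior, not project-specific). First, the JDBC JobStore requires the QRTZ_-prefixed tables to already exist in the MySQL database; Quartz ships DDL scripts for creating them (for MySQL, tables_mysql.sql in the distribution's docs/dbTables directory). Second, when several nodes run against this configuration as a cluster, each node must have a unique org.quartz.scheduler.instanceId; setting it to AUTO lets Quartz generate one per node, whereas a fixed value such as OcpInstance is only safe for a single instance.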