欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MetaQ技术内幕——源码分析(四)

程序员文章站 2022-07-13 13:49:03
...

前面,我们已经把Broker存储最重要的一个类具体分析了一遍,接下来,我们分析一下其删除的策略。前面介绍过Messagestore采用的多文件存储的组织方式,而存储空间不可能无限大,得有一定的删除策略对其进行删除以腾出空间给新的消息。

MetaQ允许自定义删除策略,需要实现接口DeletePolicy,默认提供了两种删除策略:过期删除(DiscardDeletePolicy)和过期打包删除(ArchiveDeletePolicy)。DiscardDeletePolicy和ArchiveDeletePolicy都比较简单,DiscardDeletePolicy主要是对于超过一定时期的文件进行删除,ArchiveDeletePolicy则是先打包备份再删除。

自定义策略是如何被识别和使用的呢,MetaQ定义了DeletePolicyFactory,所有删除策略的实例都由DeletePolicyFactory提供,DeletePolicyFactory对外提供了注册机制,利用反射机制生成实例,每个自定义的删除策略都必须有一个无参构造,DeletePolicyFactory生成实例代码如下:

Java代码 MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ  MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ 
  1. public static DeletePolicy getDeletePolicy(String values) { 
  2.         String[] tmps = values.split(","); 
  3.         String name = tmps[0]; 
  4.         Class<? extends DeletePolicy> clazz = policyMap.get(name); 
  5.         if (clazz == null) { 
  6.             throw new UnknownDeletePolicyException(name); 
  7.         } 
  8.         try { 
  9.            //直接调用class的newInstance()方法,该方法必须要求有一个无参构造 
  10.             DeletePolicy deletePolicy = clazz.newInstance(); 
  11.             String[] initValues = null; 
  12.             if (tmps.length >= 2) { 
  13.                 initValues = new String[tmps.length - 1]; 
  14.                 System.arraycopy(tmps, 1, initValues, 0, tmps.length - 1); 
  15.             } 
  16.             deletePolicy.init(initValues); 
  17.             return deletePolicy; 
  18.         } 
  19.         catch (Exception e) { 
  20.             throw new MetamorphosisServerStartupException("New delete policy `" + name + "` failed", e); 
  21.         } 
  22.     } 
public static DeletePolicy getDeletePolicy(String values) {        String[] tmps = values.split(",");        String name = tmps[0];        Class<? extends DeletePolicy> clazz = policyMap.get(name);        if (clazz == null) {            throw new UnknownDeletePolicyException(name);        }        try {           //直接调用class的newInstance()方法,该方法必须要求有一个无参构造            DeletePolicy deletePolicy = clazz.newInstance();            String[] initValues = null;            if (tmps.length >= 2) {                initValues = new String[tmps.length - 1];                System.arraycopy(tmps, 1, initValues, 0, tmps.length - 1);            }            deletePolicy.init(initValues);            return deletePolicy;        }        catch (Exception e) {            throw new MetamorphosisServerStartupException("New delete policy `" + name + "` failed", e);        }    }

DeletePolicy和MessageStore如何结合在一起的呢?则是粘合剂MessageStoreManager,MessageStoreManager是存储模块的管家,负责与其他模块联系,也是MessageStore管理器,管理所有的MessageStore以及其删除策略,MessageStoreManager也是要好好分析的一个类。

Java代码 MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ  MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ 
  1. private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* partition */, MessageStore>> stores = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, MessageStore>>(); 
  2. //前面的存储组织方式介绍过一个主题对应多一个分区,每个分区对应一个MessageStore实例,分区号使用数值来表示,stores就是按照该方式组织管理的 
  3.     private final MetaConfig metaConfig; 
  4. //参数配置 
  5.     private ScheduledThreadPoolExecutor scheduledExecutorService;// = 
  6.     // Executors.newScheduledThreadPool(2); 
  7.   //调度服务,对不同的MessageStore实例flush,将数据提到到硬盘 
  8.     private final DeletePolicy deletePolicy; 
  9.   //删除策略选择器,这里采用的一个topic对应一种策略,而不是一个MessageStore对应一个策略实例,一个策略实例在同一个topic的不同MessageStore实例间是重用的 
  10.     private DeletePolicySelector deletePolicySelector; 
  11.    
  12.     public static final int HALF_DAY = 1000 * 60 * 60 * 12; 
  13.   //topic 集合 
  14.     private final Set<Pattern> topicsPatSet = new HashSet<Pattern>(); 
  15.  
  16.     private final ConcurrentHashMap<Integer, ScheduledFuture<?>> unflushIntervalMap = new ConcurrentHashMap<Integer, ScheduledFuture<?>>(); 
  17. //前面曾介绍过MessageStore的提交方式有两种:组提交和定时提交,unflushIntervalMap是存放 
  18. //定时提交的任务 
  19.     private Scheduler scheduler; 
  20. //定时调度器,用于定时调度删除任务 
  21.     public MessageStoreManager(final MetaConfig metaConfig, final DeletePolicy deletePolicy) { 
  22.         this.metaConfig   = metaConfig; 
  23.         this.deletePolicy = deletePolicy; 
  24. //生成策略选择器 
  25.         this.newDeletePolicySelector(); 
  26. //添加匿名监听器,监听topic列表变化,如果列表发生变化,则新增列表并重新生成选择器 
  27.         this.metaConfig.addPropertyChangeListener("topics", new PropertyChangeListener() { 
  28.             public void propertyChange(final PropertyChangeEvent evt) { 
  29.                 MessageStoreManager.this.makeTopicsPatSet(); 
  30.                 MessageStoreManager.this.newDeletePolicySelector(); 
  31.             } 
  32.         }); 
  33. //添加匿名监听,监听unflushInternal变化,如果发生变化 
  34.         this.metaConfig.addPropertyChangeListener("unflushInterval", new PropertyChangeListener() { 
  35.             public void propertyChange(final PropertyChangeEvent evt) { 
  36.                 MessageStoreManager.this.scheduleFlushTask(); 
  37.             } 
  38.         }); 
  39.         this.makeTopicsPatSet(); 
  40.       //初始化调度 
  41.         this.initScheduler(); 
  42.         // 定时flush,该方法作者有详细注释就不在解释了 
  43.         this.scheduleFlushTask(); 
  44.     } 
private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* partition */, MessageStore>> stores = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, MessageStore>>();//前面的存储组织方式介绍过一个主题对应多一个分区,每个分区对应一个MessageStore实例,分区号使用数值来表示,stores就是按照该方式组织管理的	private final MetaConfig metaConfig;//参数配置	private ScheduledThreadPoolExecutor scheduledExecutorService;// =	// Executors.newScheduledThreadPool(2);  //调度服务,对不同的MessageStore实例flush,将数据提到到硬盘	private final DeletePolicy deletePolicy;  //删除策略选择器,这里采用的一个topic对应一种策略,而不是一个MessageStore对应一个策略实例,一个策略实例在同一个topic的不同MessageStore实例间是重用的	private DeletePolicySelector deletePolicySelector;  	public static final int HALF_DAY = 1000 * 60 * 60 * 12;  //topic 集合	private final Set<Pattern> topicsPatSet = new HashSet<Pattern>();	private final ConcurrentHashMap<Integer, ScheduledFuture<?>> unflushIntervalMap = new ConcurrentHashMap<Integer, ScheduledFuture<?>>();//前面曾介绍过MessageStore的提交方式有两种:组提交和定时提交,unflushIntervalMap是存放//定时提交的任务	private Scheduler scheduler;//定时调度器,用于定时调度删除任务	public MessageStoreManager(final MetaConfig metaConfig, final DeletePolicy deletePolicy) {		this.metaConfig   = metaConfig;		this.deletePolicy = deletePolicy;//生成策略选择器		this.newDeletePolicySelector();//添加匿名监听器,监听topic列表变化,如果列表发生变化,则新增列表并重新生成选择器		this.metaConfig.addPropertyChangeListener("topics", new PropertyChangeListener() {			public void propertyChange(final PropertyChangeEvent evt) {				MessageStoreManager.this.makeTopicsPatSet();				MessageStoreManager.this.newDeletePolicySelector();			}		}); //添加匿名监听,监听unflushInternal变化,如果发生变化		this.metaConfig.addPropertyChangeListener("unflushInterval", new PropertyChangeListener() {			public void propertyChange(final PropertyChangeEvent evt) {				MessageStoreManager.this.scheduleFlushTask();			}		});		this.makeTopicsPatSet();      //初始化调度		this.initScheduler();		// 定时flush,该方法作者有详细注释就不在解释了		this.scheduleFlushTask();	}

MessageStoreManager实现接口Service,在启动是会调用init方法,关闭时调用dispose方法

Java代码 MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ  MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ 
  1. public void init() { 
  2.         // 加载已有数据并校验 
  3.         try { 
  4.             this.loadMessageStores(this.metaConfig); 
  5.         } catch (final IOException e) { 
  6.             log.error("load message stores failed", e); 
  7.             throw new MetamorphosisServerStartupException("Initilize message store manager failed", e); 
  8.         } catch (InterruptedException e) { 
  9.             Thread.currentThread().interrupt(); 
  10.         } 
  11.         this.startScheduleDeleteJobs(); 
  12.     } 
  13.  
  14. // 
  15. private Set<File> getDataDirSet(final MetaConfig metaConfig) throws IOException { 
  16.         final Set<String> paths = new HashSet<String>(); 
  17.         // public data path 
  18.       //公共数据目录 
  19.         paths.add(metaConfig.getDataPath()); 
  20.         // topic data path 
  21.       //私有数据目录 
  22.         for (final String topic : metaConfig.getTopics()) { 
  23.             final TopicConfig topicConfig = metaConfig.getTopicConfig(topic); 
  24.             if (topicConfig != null) { 
  25.                 paths.add(topicConfig.getDataPath()); 
  26.             } 
  27.         } 
  28.         final Set<File> fileSet = new HashSet<File>(); 
  29.         for (final String path : paths) { 
  30.             //验证数据目录是否存在 
  31.             fileSet.add(this.getDataDir(path)); 
  32.         } 
  33.         return fileSet; 
  34.     } 
  35.  
  36. private void loadMessageStores(final MetaConfig metaConfig) throws IOException, InterruptedException { 
  37. //加载数据目录列表,再加载每个目录下的数据 
  38.         for (final File dir : this.getDataDirSet(metaConfig)) { 
  39.             this.loadDataDir(metaConfig, dir); 
  40.         } 
  41.     } 
  42.  
  43.     private void loadDataDir(final MetaConfig metaConfig, final File dir) throws IOException, InterruptedException { 
  44.         log.warn("Begin to scan data path:" + dir.getAbsolutePath()); 
  45.         final long start = System.currentTimeMillis(); 
  46.         final File[] ls = dir.listFiles(); 
  47.         int nThreads = Runtime.getRuntime().availableProcessors() + 1; 
  48.         ExecutorService executor = Executors.newFixedThreadPool(nThreads); 
  49.         int count = 0; 
  50.       //将加载验证每个分区的数据包装成一个个任务 
  51.         List<Callable<MessageStore>> tasks = new ArrayList<Callable<MessageStore>>(); 
  52.         for (final File subDir : ls) { 
  53.             if (!subDir.isDirectory()) { 
  54.                 log.warn("Ignore not directory path:" + subDir.getAbsolutePath()); 
  55.             } else { 
  56.                 final String name = subDir.getName(); 
  57.                 final int index = name.lastIndexOf('-'); 
  58.                 if (index < 0) { 
  59.                     log.warn("Ignore invlaid directory:" + subDir.getAbsolutePath()); 
  60.                     continue; 
  61.                 } 
  62.                   //包装任务 
  63.                 tasks.add(new Callable<MessageStore>() { 
  64.                     //回调方法,方法将具体的加载验证分区数据 
  65. @Override 
  66.                     public MessageStore call() throws Exception { 
  67.                         log.warn("Loading data directory:" + subDir.getAbsolutePath() + "..."); 
  68.                         final String topic = name.substring(0, index); 
  69.                         final int partition = Integer.parseInt(name.substring(index + 1));                   //构造MessageStore实例的时候会自动加载验证数据,在初始化MessageStore实例的时候会给该MessageStore实例选择该topic的删除策略 
  70.                         final MessageStore messageStore = new MessageStore(topic, partition, metaConfig, 
  71.                                 MessageStoreManager.this.deletePolicySelector.select(topic, MessageStoreManager.this.deletePolicy)); 
  72.                         return messageStore; 
  73.                     } 
  74.                 }); 
  75.                 count++; 
  76.                 if (count % nThreads == 0 || count == ls.length) { 
  77. //如果配置了并行加载,则使用并行加载 
  78.                     if (metaConfig.isLoadMessageStoresInParallel()) { 
  79.                         this.loadStoresInParallel(executor, tasks); 
  80.                     } else { 
  81. //串行加载验证数据 
  82.                         this.loadStores(tasks); 
  83.                     } 
  84.                 } 
  85.             } 
  86.         } 
  87.         executor.shutdownNow(); 
  88.         log.warn("End to scan data path in " + (System.currentTimeMillis() - start) / 1000 + " secs"); 
  89.     } 
public void init() {		// 加载已有数据并校验		try {			this.loadMessageStores(this.metaConfig);		} catch (final IOException e) {			log.error("load message stores failed", e);			throw new MetamorphosisServerStartupException("Initilize message store manager failed", e);		} catch (InterruptedException e) {			Thread.currentThread().interrupt();		}		this.startScheduleDeleteJobs();	}//private Set<File> getDataDirSet(final MetaConfig metaConfig) throws IOException {		final Set<String> paths = new HashSet<String>();		// public data path      //公共数据目录		paths.add(metaConfig.getDataPath());		// topic data path      //私有数据目录		for (final String topic : metaConfig.getTopics()) {			final TopicConfig topicConfig = metaConfig.getTopicConfig(topic);			if (topicConfig != null) {				paths.add(topicConfig.getDataPath());			}		}		final Set<File> fileSet = new HashSet<File>();		for (final String path : paths) {			//验证数据目录是否存在			fileSet.add(this.getDataDir(path));		}		return fileSet;	}private void loadMessageStores(final MetaConfig metaConfig) throws IOException, InterruptedException {//加载数据目录列表,再加载每个目录下的数据		for (final File dir : this.getDataDirSet(metaConfig)) {			this.loadDataDir(metaConfig, dir);		}	}	private void loadDataDir(final MetaConfig metaConfig, final File dir) throws IOException, InterruptedException {		log.warn("Begin to scan data path:" + dir.getAbsolutePath());		final long start = System.currentTimeMillis();		final File[] ls = dir.listFiles();		int nThreads = Runtime.getRuntime().availableProcessors() + 1;		ExecutorService executor = Executors.newFixedThreadPool(nThreads);		int count = 0;      //将加载验证每个分区的数据包装成一个个任务		List<Callable<MessageStore>> tasks = new ArrayList<Callable<MessageStore>>();		for (final File subDir : ls) {			if (!subDir.isDirectory()) {				log.warn("Ignore not directory path:" + subDir.getAbsolutePath());			} else {				final String name = subDir.getName();				final int index = name.lastIndexOf('-');				if (index < 0) {					log.warn("Ignore invlaid directory:" + subDir.getAbsolutePath());					continue;				}	              //包装任务				tasks.add(new Callable<MessageStore>() {					//回调方法,方法将具体的加载验证分区数据@Override					public MessageStore call() throws Exception {						log.warn("Loading data directory:" + subDir.getAbsolutePath() + "...");						final String topic = name.substring(0, index);						final int partition = Integer.parseInt(name.substring(index + 1));                   //构造MessageStore实例的时候会自动加载验证数据,在初始化MessageStore实例的时候会给该MessageStore实例选择该topic的删除策略						final MessageStore messageStore = new MessageStore(topic, partition, metaConfig,								MessageStoreManager.this.deletePolicySelector.select(topic, MessageStoreManager.this.deletePolicy));						return messageStore;					}				});				count++;				if (count % nThreads == 0 || count == ls.length) {//如果配置了并行加载,则使用并行加载					if (metaConfig.isLoadMessageStoresInParallel()) {						this.loadStoresInParallel(executor, tasks);					} else {//串行加载验证数据						this.loadStores(tasks);					}				}			}		}		executor.shutdownNow();		log.warn("End to scan data path in " + (System.currentTimeMillis() - start) / 1000 + " secs");	}

在init方法中做的一件事情就是加载校验已有的数据,加载校验的方式有两种个,串行和并行。

Java代码 MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ  MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ 
  1. //串行加载验证数据,则在主线程上完成验证加载工作,其缺点是较慢,好处是不会打乱日志顺序 
  2. private void loadStores(List<Callable<MessageStore>> tasks) throws IOException, InterruptedException { 
  3.         for (Callable<MessageStore> task : tasks) { 
  4.             MessageStore messageStore; 
  5.             try { 
  6.                 messageStore = task.call(); 
  7.                 ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(messageStore.getTopic()); 
  8.                 if (map == null) { 
  9.                     map = new ConcurrentHashMap<Integer, MessageStore>(); 
  10.                     this.stores.put(messageStore.getTopic(), map); 
  11.                 } 
  12.                 map.put(messageStore.getPartition(), messageStore); 
  13.             } catch (IOException e) { 
  14.                 throw e; 
  15.             } catch (InterruptedException e) { 
  16.                 throw e; 
  17.             } catch (Exception e) { 
  18.                 throw new IllegalStateException(e); 
  19.             } 
  20.         } 
  21.         tasks.clear(); 
  22.     } 
  23.  
  24. //并行加载数据,当数据过多的时候,启动并行加载数据可以加快启动速度;但是会打乱启动的日志顺序,默认不启用。 
  25. private void loadStoresInParallel(ExecutorService executor, List<Callable<MessageStore>> tasks) throws InterruptedException { 
  26.         CompletionService<MessageStore> completionService = new ExecutorCompletionService<MessageStore>(executor); 
  27.         for (Callable<MessageStore> task : tasks) { 
  28.             completionService.submit(task); 
  29.         } 
  30.         for (int i = 0; i < tasks.size(); i++) { 
  31.             try { 
  32.                 //确保任务都已经运行完毕 
  33.                 MessageStore messageStore = completionService.take().get(); 
  34.  
  35.                 ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(messageStore.getTopic()); 
  36.                 if (map == null) { 
  37.                     map = new ConcurrentHashMap<Integer, MessageStore>(); 
  38.                     this.stores.put(messageStore.getTopic(), map); 
  39.                 } 
  40.                 map.put(messageStore.getPartition(), messageStore); 
  41.             } catch (ExecutionException e) { 
  42.                 throw ThreadUtils.launderThrowable(e); 
  43.             } 
  44.         } 
  45.         tasks.clear(); 
  46.     } 
//串行加载验证数据,则在主线程上完成验证加载工作,其缺点是较慢,好处是不会打乱日志顺序private void loadStores(List<Callable<MessageStore>> tasks) throws IOException, InterruptedException {		for (Callable<MessageStore> task : tasks) {			MessageStore messageStore;			try {				messageStore = task.call();				ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(messageStore.getTopic());				if (map == null) {					map = new ConcurrentHashMap<Integer, MessageStore>();					this.stores.put(messageStore.getTopic(), map);				}				map.put(messageStore.getPartition(), messageStore);			} catch (IOException e) {				throw e;			} catch (InterruptedException e) {				throw e;			} catch (Exception e) {				throw new IllegalStateException(e);			}		}		tasks.clear();	}//并行加载数据,当数据过多的时候,启动并行加载数据可以加快启动速度;但是会打乱启动的日志顺序,默认不启用。private void loadStoresInParallel(ExecutorService executor, List<Callable<MessageStore>> tasks) throws InterruptedException {		CompletionService<MessageStore> completionService = new ExecutorCompletionService<MessageStore>(executor);		for (Callable<MessageStore> task : tasks) {			completionService.submit(task);		}		for (int i = 0; i < tasks.size(); i++) {			try {                //确保任务都已经运行完毕				MessageStore messageStore = completionService.take().get();				ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(messageStore.getTopic());				if (map == null) {					map = new ConcurrentHashMap<Integer, MessageStore>();					this.stores.put(messageStore.getTopic(), map);				}				map.put(messageStore.getPartition(), messageStore);			} catch (ExecutionException e) {				throw ThreadUtils.launderThrowable(e);			}		}		tasks.clear();	}

MessageStoreManager关闭时调用dispose方法,确保资源都正确释放。

Java代码 MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ  MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ 
  1. public void dispose() { 
  2. //关闭调度器和调度池 
  3.         this.scheduledExecutorService.shutdown(); 
  4.         if (this.scheduler != null) { 
  5.             try { 
  6.                 this.scheduler.shutdown(true); 
  7.             } catch (final SchedulerException e) { 
  8.                 log.error("Shutdown quartz scheduler failed", e); 
  9.             } 
  10.         } 
  11. //确保每一个 MessageStore实例都正确关闭 
  12.         for (final ConcurrentHashMap<Integer/* partition */, MessageStore> subMap : MessageStoreManager.this.stores 
  13.                 .values()) { 
  14.             if (subMap != null) { 
  15.                 for (final MessageStore msgStore : subMap.values()) { 
  16.                     if (msgStore != null) { 
  17.                         try { 
  18.                             msgStore.close(); 
  19.                         } catch (final Throwable e) { 
  20.                             log.error("Try to run close  " + msgStore.getTopic() + "," + msgStore.getPartition() + " failed", e); 
  21.                         } 
  22.                     } 
  23.                 } 
  24.             } 
  25.         } 
  26. //清空stores列表 
  27.         this.stores.clear(); 
  28.     } 
public void dispose() { //关闭调度器和调度池		this.scheduledExecutorService.shutdown();		if (this.scheduler != null) {			try {				this.scheduler.shutdown(true);			} catch (final SchedulerException e) {				log.error("Shutdown quartz scheduler failed", e);			}		}//确保每一个 MessageStore实例都正确关闭		for (final ConcurrentHashMap<Integer/* partition */, MessageStore> subMap : MessageStoreManager.this.stores				.values()) {			if (subMap != null) {				for (final MessageStore msgStore : subMap.values()) {					if (msgStore != null) {						try {							msgStore.close();						} catch (final Throwable e) {							log.error("Try to run close  " + msgStore.getTopic() + "," + msgStore.getPartition() + " failed", e);						}					}				}			}		}//清空stores列表		this.stores.clear();	}

MessageStoreManager对外提供了获取的MessageStore的方法getMessageStore(final String topic, final int partition)和getOrCreateMessageStore(final String topic, final int partition) throws IOException。

getMessageStore()从stores列表查找对应的MessageStore,如果不存在则返回空;而getOrCreateMessage()则先检查对应的topic是否曾经配置,如果没有则抛出异常,如果有则判断stores是否已有MessageStore实例,如果没有,则生成MessageStore实例放入到stores列表并返回,如果有,则直接返回。

Java代码 MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ  MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ MetaQ技术内幕——源码分析(四)
            
    
    博客分类: MetaQ MetaQ 
  1. public MessageStore getMessageStore(final String topic, final int partition) { 
  2.         final ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic); 
  3.         if (map == null) { 
  4. //如果topic对应的MessageStore实例列表不存在,则直接返回null 
  5.             return null; 
  6.         } 
  7.         return map.get(partition); 
  8.     } 
  9.  
  10.     Collection<MessageStore> getMessageStoresByTopic(final String topic) { 
  11.         final ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic); 
  12.         if (map == null) { 
  13.             return Collections.emptyList(); 
  14.         } 
  15.         return map.values(); 
  16.     } 
  17.  
  18.     public MessageStore getOrCreateMessageStore(final String topic, final int partition) throws IOException { 
  19.         return this.getOrCreateMessageStoreInner(topic, partition, 0); 
  20.     } 
  21.  
  22.     public MessageStore getOrCreateMessageStore(final String topic, final int partition, final long offsetIfCreate) throws IOException { 
  23.         return this.getOrCreateMessageStoreInner(topic, partition, offsetIfCreate); 
  24.     } 
  25.  
  26.     private MessageStore getOrCreateMessageStoreInner(final String topic, final int partition, final long offsetIfCreate) throws IOException { 
  27.       //判断topic是否可用,即是否在topicsPatSet列表中 
  28.         if (!this.isLegalTopic(topic)) { 
  29.             throw new IllegalTopicException("The server do not accept topic " + topic); 
  30.         } 
  31. //判断分区号是否正确 
  32.         if (partition < 0 || partition >= this.getNumPartitions(topic)) { 
  33.             log.warn("Wrong partition " + partition + ",valid partitions (0," + (this.getNumPartitions(topic) - 1) + ")"); 
  34.             throw new WrongPartitionException("wrong partition " + partition); 
  35.         } 
  36.         ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic); 
  37. //如果topic对应的列表不存在,则生成列表,放进stores中 
  38.         if (map == null) { 
  39.             map = new ConcurrentHashMap<Integer, MessageStore>(); 
  40.             final ConcurrentHashMap<Integer/* partition */, MessageStore> oldMap = this.stores.putIfAbsent(topic, map); 
  41.             if (oldMap != null) { 
  42.                 map = oldMap; 
  43.             } 
  44.         } 
  45. //判断列表中是否有存在分区号位partition为的MessageStore实例,如果有,直接返回;如果没有,则生成实例并放进列表中 
  46.         MessageStore messageStore = map.get(partition); 
  47.         if (messageStore != null) { 
  48.             return messageStore; 
  49.         } else { 
  50.             // 对string加锁,特例 
  51.             synchronized (topic.intern()) { 
  52.                 messageStore = map.get(partition); 
  53.                 // double check 
  54.                 if (messageStore != null) { 
  55.                     return messageStore; 
  56.                 } 
  57.                 messageStore = new MessageStore(topic, partition, this.metaConfig, this.deletePolicySelector.select(topic, this.deletePolicy), offsetIfCreate); 
  58.                 log.info("Created a new message storage for topic=" + topic + ",partition=" + partition); 
  59.                 map.put(partition, messageStore); 
  60.             } 
  61.         } 
  62.         return messageStore; 
  63.     } 
  64.  
  65.     boolean isLegalTopic(final String topic) { 
  66.         for (final Pattern pat : this.topicsPatSet) { 
  67.             if (pat.matcher(topic).matches()) { 
  68.                 return true; 
  69.             } 
  70.         } 
  71.         return false; 
  72.     } 
public MessageStore getMessageStore(final String topic, final int partition) {		final ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic);		if (map == null) {//如果topic对应的MessageStore实例列表不存在,则直接返回null			return null;		}		return map.get(partition);	}	Collection<MessageStore> getMessageStoresByTopic(final String topic) {		final ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic);		if (map == null) {			return Collections.emptyList();		}		return map.values();	}	public MessageStore getOrCreateMessageStore(final String topic, final int partition) throws IOException {		return this.getOrCreateMessageStoreInner(topic, partition, 0);	}	public MessageStore getOrCreateMessageStore(final String topic, final int partition, final long offsetIfCreate) throws IOException {		return this.getOrCreateMessageStoreInner(topic, partition, offsetIfCreate);	}	private MessageStore getOrCreateMessageStoreInner(final String topic, final int partition, final long offsetIfCreate) throws IOException {      //判断topic是否可用,即是否在topicsPatSet列表中		if (!this.isLegalTopic(topic)) {			throw new IllegalTopicException("The server do not accept topic " + topic);		}//判断分区号是否正确		if (partition < 0 || partition >= this.getNumPartitions(topic)) {			log.warn("Wrong partition " + partition + ",valid partitions (0," + (this.getNumPartitions(topic) - 1) + ")");			throw new WrongPartitionException("wrong partition " + partition);		}		ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic);//如果topic对应的列表不存在,则生成列表,放进stores中		if (map == null) {			map = new ConcurrentHashMap<Integer, MessageStore>();			final ConcurrentHashMap<Integer/* partition */, MessageStore> oldMap = this.stores.putIfAbsent(topic, map);			if (oldMap != null) {				map = oldMap;			}		}//判断列表中是否有存在分区号位partition为的MessageStore实例,如果有,直接返回;如果没有,则生成实例并放进列表中		MessageStore messageStore = map.get(partition);		if (messageStore != null) {			return messageStore;		} else {			// 对string加锁,特例			synchronized (topic.intern()) {				messageStore = map.get(partition);				// double check				if (messageStore != null) {					return messageStore;				}				messageStore = new MessageStore(topic, partition, this.metaConfig, this.deletePolicySelector.select(topic, this.deletePolicy), offsetIfCreate);				log.info("Created a new message storage for topic=" + topic + ",partition=" + partition);				map.put(partition, messageStore);			}		}		return messageStore;	}	boolean isLegalTopic(final String topic) {		for (final Pattern pat : this.topicsPatSet) {			if (pat.matcher(topic).matches()) {				return true;			}		}		return false;	}

通过MessageStoreManager,我们把MessageStore和删除策略很好的组织在一起,并在MessageStoreManager提供定时提交的功能,提升了数据的可靠性;通过MessageStoreManager也为其他模块访问存储模块提供了接口。

我觉得MessageStoreManager设计不好的地方在于topicsPatSet,在topic列表发生变化的时候,没有先清空topicsPatSet,而是直接添加,而且没有对topic对应的MessageStore实例进行重新初始化,如果MessageStore实例已经存在,新删除策略配置不能生效。个人建议是一旦topic列表发生变化的时候,重新初始化整个存储模块,保证一致性。

至此, Broker的消息存储模块基本分析完毕。下一篇,进入Broker网络相关以及消息处理流程分析。

相关标签: MetaQ