欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Programming with JMeter-- ThreadGroup

程序员文章站 2022-04-30 08:46:16
...

       项目搞一段落,总算有时间回来继续。

      上一篇写到JMeterEngine会驱动JMeter ThreadGroup启动Test Threads 执行测试,其本身也是一个Runnable,这里把测试驱动(JUnit或者其他类似main之类的)看作主线程(main thread)的话, JMeterEngine作为第一层子线程(First Child Thread),所以如果项目代码基于很多模块(比如基于OSGI),需要进行ClassLoader调整的话可以重写JMeterEngine的runTest方法,例如.:

                @Override
		public void runTest() throws JMeterEngineException
		{
			try {
				tcClassloader = Thread.currentThread().getContextClassLoader();

				Future<?> f = DefaultExecutorService.getInstance().submit(this);
				f.get();
			}
			catch (Exception err) {
				stopTest();
				throw new JMeterEngineException(err);
			}
			finally {
				
			}
		}

                @Override
		public void run()
		{
			ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
			Thread.currentThread().setContextClassLoader(tcClassloader);
			try {
				super.run();
			}
			finally {
				Thread.currentThread().setContextClassLoader(oldCl);
			}
		}

 

       JMeterEngine从JMeter的数据结构HashTree中取得ThreadGroup之后,通过ThreadGroup的API: start / stopThread / waitThreadsStopped / tellThreadsToStop /  numberOfActiveThreads / verifyThreadsStopped 进行测试线程的开始 / 结束,并在相应的阶段通知一些监听器,而真正创建和启动Test Threads的地方是ThreadGroup。看ThreadGroup#start(int groupCount, ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine)方法,有两种启动线程的方式:

1) 如果是dealyedStartup,即HashTree中配置了ThreadGroup.delayedStart为true,ThreadGroup会交给内部类ThreadStart来创建/启动测试线程。ThreadStart作为一个单独的线程,会计算每一个测试线程的delay time,duration time,end time,根据这些time值创建 / 开始 / 结束 测试线程。

       我们知道JMeter中对线程Stop Condition 提供一些默认的选择:LoopController, IfController, RunTime等。当我们需要 ”执行测试 一段时间停止“ 这样的场景时,往往需要选择org.apache.jmeter.control.RunTime对每一个线程的循环进行控制。而如果在线程启动阶段endTime已经过时的话,就没有必要再启动测试线程了。

 

2) 若不是dealyedStartup,就创建并启动所有测试线程。

      

       ThreadGroup是一个很重要的扩展点,一放面实际项目往往有很多资源需要整合,例如项目中有自己实现统一的线程池,有特定的服务器等等,另一方面对于两种线程启动模式略显复杂,尤其是当出现class load出问题,我们需要控制线程上下文classloader的时候,所以实际项目中重写ThreadGroup往往合并一种模式就够了。以本人个例,需要整合weblogic统一的WorkManager进行线程调度,区分 Remote和Local 的测试线程,大概可以这样参考(略去部分业务相关代码)

 

 ThreadGroup#start:

        @Override
	public void start(int groupCount, ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine)
	{
		running = true;
		int numThreads = getNumThreads();
		int rampUp = getRampUp();
		double perThreadDelay = rampUp * 1000.0 / getNumThreads();
		log.info("Starting thread group number " + groupCount + " threads " + numThreads);

		final JMeterContext context = JMeterContextService.getContext();
		long now = System.currentTimeMillis();
		for (int i = 0; running && i < numThreads; i++) {
			final CustomJMeterThread jmThread = makeThread(groupCount, notifier, threadGroupTree, engine, i, context);
			jmThread.setInitialDelay((int) (i * perThreadDelay));
			scheduleThread(jmThread, now);
                        //ITestResourceHolder:自定义接口用来存放一些共用的测试变量,重写JMeterEngine时顺便实现了该接口,故可在此拿到很多需要的东西
			final ITestResourceHolder resourceHolder = (ITestResourceHolder) engine;
			Runnable runnable = null;
			switch ((TestType) resourceHolder.getResourceMap().get(TestType.class.getName())) {
				case LOCAL:
					runnable = new DaemonizableNamedRunnable() {
						@Override
						public String getName()
						{
							return jmThread.getThreadName();
						}
						@Override
						public boolean isDaemon()
						{
							return true;
						}
						@Override
						public void release()
						{}
						@Override
						public void run()
						{
							jmThread.run();
						}
					};
					break;
				case REMOTE:
					final Subject subj = Security.getCurrentSubject();
					runnable = new DaemonizableNamedRunnable() {
						@Override
						public String getName()
						{
							return jmThread.getThreadName();
						}
						@Override
						public boolean isDaemon()
						{
							return true;
						}
						@Override
						public void release()
						{}

						@Override
						public void run()
						{
							Security.runAs(subj, new PrivilegedAction<Void>() {

								@Override
								public Void run()
								{
									jmThread.run();
									return null;
								}
							});
						}
					};
					break;
			}
			Future<?> f = DefaultExecutorService.getInstance().submit(runnable);
			allThreads.put(jmThread, f);
		}
		log.info("Started thread group number " + groupCount);
	}

 

      start已经重写,stop之类相关的也要相应重写:

     

/* (non-Javadoc)
	* @see org.apache.jmeter.threads.ThreadGroup#stop()
	*/
	@Override
	public void stop()
	{
		running = false;
		for (JMeterThread item : allThreads.keySet()) {
			item.stop();
		}
	}

	/* (non-Javadoc)
	 * @see org.apache.jmeter.threads.ThreadGroup#tellThreadsToStop()
	 */
	@Override
	public void tellThreadsToStop()
	{
		running = false;
		for (Entry<JMeterThread, Future<?>> entry : allThreads.entrySet()) {
			JMeterThread item = entry.getKey();
			item.stop(); // set stop flag
			item.interrupt(); // interrupt sampler if possible
			Future<?> f = entry.getValue();
			if (f != null) {
				f.cancel(true);
				allThreads.remove(item);
			}
		}
	}

/* (non-Javadoc)
	 * @see org.apache.jmeter.threads.ThreadGroup#waitThreadsStopped()
	 */
	@Override
	public void waitThreadsStopped()
	{
		for (Future<?> f : allThreads.values()) {
			try {
				if (getDuration() > 0) {
					f.get(getDuration(), TimeUnit.SECONDS);
				}
				else {
					f.get();
				}
			}
			catch (InterruptedException e) {
				return;
			}
			catch (ExecutionException e) {
				log.error("Exception occurred when try to retrive future value from JMeterThread ", e);
			}
			catch (TimeoutException e) {
				tellThreadsToStop();
				return;
			}
		}
		allThreads.clear();
	}