欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Programming with JMeter-- JMeterThread

程序员文章站 2022-03-02 11:45:36
...

到上一篇ThreadGroup为止,基本上在项目中可以完成对JMeterEngine / ThreadGroup 的整合,现在要来分析下JMeterThread,以便更好的集成到项目中。

        一个 JMeterThread 好比一个vu(虚拟用户),他的创建和启动已经在ThreadGroup这一篇中完整的分析,我们是将JMeterThread委托给一个叫做DaemonizableNamedRunnable(这是因为我需要整合 weblogic workmanager 统一管理线程池)的实例来运行的, 附DaemonizableNamedRunnable代码

//remote case
runnable = new DaemonizableNamedRunnable() {

						@Override
						public String getName()
						{
							return jmThread.getThreadName();
						}

						@Override
						public boolean isDaemon()
						{
							return true;
						}

						@Override
						public void release()
						{
						}

						@Override
						public void run()
						{
							Security.runAs(subj, new PrivilegedAction<Void>() {

								@Override
								public Void run()
								{
									jmThread.run();
									return null;
								}
							});
						}
					};

//Local Case
runnable = new DaemonizableNamedRunnable() {

						@Override
						public String getName()
						{
							return jmThread.getThreadName();
						}

						@Override
						public boolean isDaemon()
						{
							return true;
						}

						@Override
						public void release()
						{
						}

						@Override
						public void run()
						{
							jmThread.run();
						}
					};

 

       也就是说外围的一些工作都在ThreadGroup里完成了, 现在看看其内部的结构,首先看下大概的类图(不全,JMeterEngine没有画进去):

Programming with JMeter-- JMeterThread
            
    
    博客分类: JMeter JMeterThreadThreadGroup  结合初始化创建代码:

         

//ThreadGroup创建JMeterThread
private CustomJMeterThread makeThread(int groupCount, ListenerNotifier notifier, ListedHashTree threadGroupTree,
			StandardJMeterEngine engine, int i, JMeterContext context)
	{ // N.B. Context needs to be fetched in the correct thread
		boolean onErrorStopTest = getOnErrorStopTest();
		boolean onErrorStopTestNow = getOnErrorStopTestNow();
		boolean onErrorStopThread = getOnErrorStopThread();
		boolean onErrorStartNextLoop = getOnErrorStartNextLoop();
		String groupName = getName();
		final CustomJMeterThread jmeterThread = new CustomJMeterThread(cloneTree(threadGroupTree), this, notifier);
		jmeterThread.setThreadNum(i);
		jmeterThread.setThreadGroup(this);//持有线程所属的 ThreadGroup 实例
		jmeterThread.setInitialContext(context);
		final String threadName = groupName + " " + groupCount + "-" + (i + 1);
		jmeterThread.setThreadName(threadName);
		jmeterThread.setEngine(engine); //持有一个 当前运行的 JMeterEngine 实例,这个以后会有用
		jmeterThread.setOnErrorStopTest(onErrorStopTest);
		jmeterThread.setOnErrorStopTestNow(onErrorStopTestNow);
		jmeterThread.setOnErrorStopThread(onErrorStopThread);
		jmeterThread.setOnErrorStartNextLoop(onErrorStartNextLoop);
		return jmeterThread;
	}

//JMeterThread 构造方法
public JMeterThread(HashTree test, JMeterThreadMonitor monitor, ListenerNotifier note) {
        this.monitor = monitor;
        threadVars = new JMeterVariables();
        testTree = test;
        compiler = new TestCompiler(testTree);
        controller = (Controller) testTree.getArray()[0];
        SearchByClass<TestIterationListener> threadListenerSearcher = new SearchByClass<TestIterationListener>(TestIterationListener.class); // TL - IS
        test.traverse(threadListenerSearcher);
        testIterationStartListeners = threadListenerSearcher.getSearchResults();
        notifier = note;
        running = true;
    }

//JMeterThread保存线程上下文变量
public void setInitialContext(JMeterContext context) {
        threadVars.putAll(context.getVariables());
    }
 
            非常清晰了,JMeterThread的创建和初始化基本上针对以下几点:
1) onError 设置,方便在出错时响应
2) 设置线程所在的ThreadGroup 和 JMeterEngine
3) 保存好上下文到 线程 Local 的 JMeterVariables 
4) 保存好测试数据结构 HashTree, 解析出controller, listener. 之前我们提到过JMeter 提供了注入 LoopController / Runtime 来控制线程的循环,和注入ResultCollector之类的Listener来收集测试结果。
         初始化完毕,我们再来看运行过程,即run()方法:
 @Override
    public void run() {
        // threadContext is not thread-safe, so keep within thread
        JMeterContext threadContext = JMeterContextService.getContext();
        LoopIterationListener iterationListener=null;

        try {
            iterationListener = initRun(threadContext);
            while (running) {
                Sampler sam = controller.next();
                while (running && sam != null) {
                	process_sampler(sam, null, threadContext);
                	threadContext.cleanAfterSample();
                	if(onErrorStartNextLoop || threadContext.isRestartNextLoop()) {
                	    if(threadContext.isRestartNextLoop()) {
                            triggerEndOfLoopOnParentControllers(sam, threadContext);
                            sam = null;
                            threadContext.getVariables().put(LAST_SAMPLE_OK, TRUE);
                            threadContext.setRestartNextLoop(false);
                	    } else {
                    		boolean lastSampleFailed = !TRUE.equals(threadContext.getVariables().get(LAST_SAMPLE_OK));
                    		if(lastSampleFailed) {
    	                		if(log.isDebugEnabled()) {
    	                    		log.debug("StartNextLoop option is on, Last sample failed, starting next loop");
    	                    	}
    	                    	triggerEndOfLoopOnParentControllers(sam, threadContext);
    	                        sam = null;
    	                        threadContext.getVariables().put(LAST_SAMPLE_OK, TRUE);
                    		} else {
                    			sam = controller.next();
                    		}
                	    }
                	} 
                	else {
                		sam = controller.next();
                	}
                }
                if (controller.isDone()) {
                    running = false;
                }
            }
        }
        // Might be found by contoller.next()
        catch (JMeterStopTestException e) {
            log.info("Stopping Test: " + e.toString());
            stopTest();
        }
        catch (JMeterStopTestNowException e) {
            log.info("Stopping Test Now: " + e.toString());
            stopTestNow();
        } catch (JMeterStopThreadException e) {
            log.info("Stop Thread seen: " + e.toString());
        } catch (Exception e) {
            log.error("Test failed!", e);
        } catch (ThreadDeath e) {
            throw e; // Must not ignore this one
        } catch (Error e) {// Make sure errors are output to the log file
            log.error("Test failed!", e);
        } finally {
            currentSampler = null; // prevent any further interrupts
            try {
                interruptLock.lock();  // make sure current interrupt is finished, prevent another starting yet
                threadContext.clear();
                log.info("Thread finished: " + threadName);
                threadFinished(iterationListener);
                monitor.threadFinished(this); // Tell the monitor we are done
                JMeterContextService.removeContext(); // Remove the ThreadLocal entry
            }
            finally {
                interruptLock.unlock(); // Allow any pending interrupt to complete (OK because currentSampler == null)
            }
        }
    }
 
          代码结构很清晰: 

 

1) initRun(代码省略):  
         初始化Thread Context (主要是初始化好当前 Thread / threadgroup / engine 实例, 线程上下文变量, 线程数,线程循环控制器,监听器),
         执行 线程正式运行前的delay(若有thread rampup 策略的话)工作
         Thread计数,numberOfActiveThreads / numberOfThreadsStarted / numberOfThreads
2)循环运行,直到 running 为 false(一般manually 停止测试就必须会设置该flag为false) OR  controller.next()为null,也就是说配置的循环控制器执行完毕
3)finally 清理线程:这里有行比较重要的代码 “monitor.threadFinished(this);” ,这个monitor是什么呢? 往下看。(Q1)
        中间循环运行比较复杂的是process_sampler(sam, null, threadContext); 这个方法会真正触发客户端的测试代码的地方,代码很长就略去了,核心的逻辑大概如下:
1)分析Sampler是否是TransactionSampler, 若是解析Transaction 范围的一组Sampler Pack 进行递归process
2)Sampler.sample(Entry e)方法,产生一个SampleResult
3)把result包装成一个SampleEvent 并通知所有的监听器
         所有的 SampleListener #sampleOccur 方法被触发, ResultCollector 就会有相应的措施来保存测试结果。
接口 Sampler(主要有JavaSampler / TransationSampler / BeanShellSampler 等), SampleListener (最终要的是ResultCollector,另外还有ResultAction,Summariser等)代码:
public interface Sampler extends Serializable, TestElement {
    /**
     * Obtains statistics about the given Entry, and packages the information
     * into a SampleResult.
     */
    SampleResult sample(Entry e);
}

public interface SampleListener {
    /**
     * A sample has started and stopped.
     */
    void sampleOccurred(SampleEvent e);

    /**
     * A sample has started.
     */
    void sampleStarted(SampleEvent e);

    /**
     * A sample has stopped.
     */
    void sampleStopped(SampleEvent e);
}
 
        留下问题Q1:  monitor.threadFinished(this);” ,这个monitor是什么呢?
           这个monitor其实就当前Thread 所在的 ThreadGroup,这里相当与对ThreadGroup的一个回调,如果有些资源清理必须在同一个线程中做的话就会很有用,比如一个case,我的测试代码中需要一个JNDI Context 进行lookup,那么这个JNDI Context 的close工作应该是在创建他的 同一个线程中去做,那么线程结束的时候去清理是最好不过了,所以可以重写ThreadGroup的threadFinished(JMeterThread)方法:
@Override
	public void threadFinished(JMeterThread thread)
	{
		super.threadFinished(thread);
		Context ctx = (Context) (thread instanceof ITestResourceHolder ? ((ITestResourceHolder) thread).getResourceMap().get(
				JMeterTestResource.JNDI_CONTEXT) : null);
		if (ctx != null) {
			try {
				log.debug("Thread " + Thread.currentThread().getName() + " closing InitialContext " + ctx);
				ctx.close();
			}
			catch (NamingException e) {
				log.error("Exception occurred when try to close the InitialContext ", e);
			}
			finally {
				((ITestResourceHolder) thread).getResourceMap().clear();
			}
		}
	}
    
            和JMeterEngine一样,我在扩展JMeterThread时顺便实现了ITestResourceHolder接口用来维护额外的线程变量(比如JNDIContext).
  • Programming with JMeter-- JMeterThread
            
    
    博客分类: JMeter JMeterThreadThreadGroup 
  • 大小: 54.9 KB