调用链跨线程传递ThreadLocal对象
在全链路跟踪框架中,Trace信息的传递功能是基于ThreadLocal的。但实际业务中可能会使用异步调用,这样就会丢失Trace信息,破坏了链路的完整性。
在同一线程中trace信息的传递流程使用代码模拟如下:
ThreadLocal<String> traceContext = new ThreadLocal<>();
String traceId = Tracer.startServer();
traceContext.set(traceId) //生成trace信息 传入threadlocal
...
Tracer.startClient(traceContext.get()); //从threadlocal获取trace信息
Tracer.endClient();
...
Tracer.endServer();
那么显然如果是异步线程的话,下一个Span拿不到上一个Span的trace信息,就会造成调用链跟踪断了。那么怎么才能在异步的情况下传递ThreadLocal对象呢。
InheritableThreadLocal
如果仅仅是父子之间传递ThreadLocal对象的话,JDK自身就有实现InheritableThreadLocal。
Thread内部为InheritableThreadLocal开辟了一个单独的ThreadLocalMap。在父线程创建一个子线程的时候,会检查这个ThreadLocalMap是否为空,不为空则会浅拷贝给子线程的ThreadLocalMap。
Thread的init相关逻辑如下:
if (parent.inheritableThreadLocals != null)
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
赋值拷贝代码如下:
private ThreadLocalMap(ThreadLocalMap parentMap) {
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];
for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}
需要注意的是,拷贝为浅拷贝。父子线程的 ThreadLocalMap 内的 key 都指向同一个 InheritableThreadLocal 对象,Value 也指向同一个 Value。子线程的Value更改可以覆盖父线程的Value。
这样一来InheritableThreadLocal让我们可以在父线程创建子线程的时候将ThreadLocal中的值传递给子线程。但在大部分场景下,业务应用都会使用线程池。而在这种复用线程的池化场景中,线程池中的线程和主线程却都不是父子线程的关系,并不能直接使用InheritableThreadLocal。
Transmittable ThreadLocal
Transmittable ThreadLocal是阿里开源的库,继承了InheritableThreadLocal,优化了在使用线程池等会池化复用线程的情况下传递ThreadLocal的使用。
简单来说,有个专门的TtlRunnable和TtlCallable包装类,用于读取原Thread的ThreadLocal对象及值并存于Runnable/Callable中,在执行run或者call方法的时候再将存于Runnable/Callable中的ThreadLocal对象和值读取出来,存入调用run或者call的线程中。
以TtlRunnable为例,构造函数如下:
private final AtomicReference<Object> capturedRef;
private final Runnable runnable;
private final boolean releaseTtlValueReferenceAfterRun;
private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
//从父类capture复制到本类
this.capturedRef = new AtomicReference<>(capture());
this.runnable = runnable; //提交的runnable对象
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
capture函数的复制过程如下:
@Nonnull
public static Object capture() {
Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
captured.put(threadLocal, threadLocal.copyValue());
}
return captured;
}
其中holder记录了当前 Thread 绑定了哪些 TransmittableThreadLocal 对象。captured保存了父线程ThreadLocal的值。
接着任务提交到线程池,线程开始运行时,取出保存在captured中的父线程ThreadLocal值并重新set。即将父线程值传递到了任务执行时。
@Override
public void run() {
Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
Object backup = replay(captured);
try {
runnable.run();
} finally {
restore(backup);
}
}
这样TransmittableThreadLocal就解决了在线程池场景下的ThreadLocal对象传递。整个流程图如下:
调用链跨线程传递trace信息
有了TransmittableThreadLocal作为基础,调用链跨线程传递trace信息也不再困难,只需将trace信息均存于TransmittableThreadLocal中,使用异步线程池时使用Ttl相关类修饰即可。模拟代码如下:
public void testAsync() {
ExecutorService ttlExecutorService = TtlExecutors.getTtlExecutorService(executorService);
String traceId = Tracer.startServer(); //父线程的traceId
ThreadLocal<String> traceContext = new TransmittableThreadLocal<>();
traceContext.set(traceId); //存入TransmittableThreadLocal
ttlExecutorService.submit(new Runnable() {
@Override
public void run() {
//runnable执行中获取当前线程的traceId与父线程的traceId一致
String childTraceId = traceContext.get();
Assert.assertEquals(childTraceId, traceId);
Tracer.startClient(traceId);
Tracer.endClient();
}
});
Tracer.endServer();
}
使用Java Agent植入修饰代码
以上所有使用需要业务代码去改动自己的线程池类,runnable或者callable类。而使用Java Agent实现线程池的传递是透明的,可以做到应用代码无侵入。
Java Agent(Instrumentation)是JDK1.5引入的技术,基于JVM TI机制,使得开发者可以构建一个独立于应用程序的代理(Agent),用来监测和协助运行在 JVM 上的程序,以及替换和修改某些类的定义。开发者可以在一个普通 Java 程序运行时,通过 – javaagent 参数指定一个特定的 jar 文件(包含 Instrumentation 代理)来启动相应的代理程序,植入自己扩展的修饰代码以实现功能。
在TransmittableThreadLocal中,相关Agent的源码分析如下:
//需要通过agent插入Executor类中的某个方法
private static void updateMethodOfExecutorClass(final CtClass clazz, final CtMethod method) throws NotFoundException, CannotCompileException {
if (method.getDeclaringClass() != clazz) {
return;
}
//插入的方法需要Public并且非静态
final int modifiers = method.getModifiers();
if (!Modifier.isPublic(modifiers) || Modifier.isStatic(modifiers)) {
return;
}
//获取该方法的参数类型存入parameterTypes数组中
CtClass[] parameterTypes = method.getParameterTypes();
StringBuilder insertCode = new StringBuilder();
//根据参数类型顺序,进行代码格式化插入
for (int i = 0; i < parameterTypes.length; i++) {
CtClass paraType = parameterTypes[i];
//区分Runnable/Callable
if (RUNNABLE_CLASS_NAME.equals(paraType.getName())) {
String code = String.format("$%d = %s.get($%d, false, true);", i + 1, TTL_RUNNABLE_CLASS_NAME, i + 1);
logger.info("insert code before method " + method + " of class " + method.getDeclaringClass().getName() + ": " + code);
insertCode.append(code);
} else if (CALLABLE_CLASS_NAME.equals(paraType.getName())) {
String code = String.format("$%d = %s.get($%d, false, true);", i + 1, TTL_CALLABLE_CLASS_NAME, i + 1);
logger.info("insert code before method " + method + " of class " + method.getDeclaringClass().getName() + ": " + code);
insertCode.append(code);
}
}
//调用insertBefore()完成代码插入
if (insertCode.length() > 0) {
method.insertBefore(insertCode.toString());
}
}
将封装好的TransmittableThreadLocal Jar包放在类目录下的某个文件夹下,例如agent,那么只需在启动参数加入:-javaagent:agent/transmittable-thread-local-xxx.jar
即可完成修饰代码的植入。
上一篇: SpringMVC返回类型
下一篇: Kafka最佳实践