欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

调用链跨线程传递ThreadLocal对象

程序员文章站 2022-03-24 15:26:07
...

在全链路跟踪框架中,Trace信息的传递功能是基于ThreadLocal的。但实际业务中可能会使用异步调用,这样就会丢失Trace信息,破坏了链路的完整性。

在同一线程中trace信息的传递流程使用代码模拟如下:

ThreadLocal<String> traceContext = new ThreadLocal<>();

String traceId = Tracer.startServer();
traceContext.set(traceId) //生成trace信息 传入threadlocal
...
Tracer.startClient(traceContext.get()); //从threadlocal获取trace信息
Tracer.endClient();
...
Tracer.endServer();

那么显然如果是异步线程的话,下一个Span拿不到上一个Span的trace信息,就会造成调用链跟踪断了。那么怎么才能在异步的情况下传递ThreadLocal对象呢。

InheritableThreadLocal

如果仅仅是父子之间传递ThreadLocal对象的话,JDK自身就有实现InheritableThreadLocal。

Thread内部为InheritableThreadLocal开辟了一个单独的ThreadLocalMap。在父线程创建一个子线程的时候,会检查这个ThreadLocalMap是否为空,不为空则会浅拷贝给子线程的ThreadLocalMap。

Thread的init相关逻辑如下:

if (parent.inheritableThreadLocals != null)
    this.inheritableThreadLocals =        ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

赋值拷贝代码如下:

 private ThreadLocalMap(ThreadLocalMap parentMap) {
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            setThreshold(len);
            table = new Entry[len];

            for (int j = 0; j < len; j++) {
                Entry e = parentTable[j];
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        int h = key.threadLocalHashCode & (len - 1);
                        while (table[h] != null)
                            h = nextIndex(h, len);
                        table[h] = c;
                        size++;
                    }
                }
            }
        }

需要注意的是,拷贝为浅拷贝。父子线程的 ThreadLocalMap 内的 key 都指向同一个 InheritableThreadLocal 对象,Value 也指向同一个 Value。子线程的Value更改可以覆盖父线程的Value。

这样一来InheritableThreadLocal让我们可以在父线程创建子线程的时候将ThreadLocal中的值传递给子线程。但在大部分场景下,业务应用都会使用线程池。而在这种复用线程的池化场景中,线程池中的线程和主线程却都不是父子线程的关系,并不能直接使用InheritableThreadLocal。

Transmittable ThreadLocal

Transmittable ThreadLocal是阿里开源的库,继承了InheritableThreadLocal,优化了在使用线程池等会池化复用线程的情况下传递ThreadLocal的使用。

简单来说,有个专门的TtlRunnable和TtlCallable包装类,用于读取原Thread的ThreadLocal对象及值并存于Runnable/Callable中,在执行run或者call方法的时候再将存于Runnable/Callable中的ThreadLocal对象和值读取出来,存入调用run或者call的线程中。

以TtlRunnable为例,构造函数如下:

private final AtomicReference<Object> capturedRef;
private final Runnable runnable;
private final boolean releaseTtlValueReferenceAfterRun;

private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
    //从父类capture复制到本类
    this.capturedRef = new AtomicReference<>(capture());
    this.runnable = runnable; //提交的runnable对象
    this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}

capture函数的复制过程如下:

@Nonnull
        public static Object capture() {
            Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
            for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
                captured.put(threadLocal, threadLocal.copyValue());
            }
            return captured;
        }

其中holder记录了当前 Thread 绑定了哪些 TransmittableThreadLocal 对象。captured保存了父线程ThreadLocal的值。

接着任务提交到线程池,线程开始运行时,取出保存在captured中的父线程ThreadLocal值并重新set。即将父线程值传递到了任务执行时。

@Override
public void run() {
    Object captured = capturedRef.get();
    if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
        throw new IllegalStateException("TTL value reference is released after run!");
    }

    Object backup = replay(captured);
    try {
        runnable.run();
    } finally {
        restore(backup);
    }
}

这样TransmittableThreadLocal就解决了在线程池场景下的ThreadLocal对象传递。整个流程图如下:
调用链跨线程传递ThreadLocal对象

调用链跨线程传递trace信息

有了TransmittableThreadLocal作为基础,调用链跨线程传递trace信息也不再困难,只需将trace信息均存于TransmittableThreadLocal中,使用异步线程池时使用Ttl相关类修饰即可。模拟代码如下:

public void testAsync() {
    ExecutorService ttlExecutorService = TtlExecutors.getTtlExecutorService(executorService);
    String traceId = Tracer.startServer(); //父线程的traceId
    ThreadLocal<String> traceContext = new TransmittableThreadLocal<>(); 
    traceContext.set(traceId); //存入TransmittableThreadLocal
    ttlExecutorService.submit(new Runnable() {
        @Override
        public void run() {
            //runnable执行中获取当前线程的traceId与父线程的traceId一致
            String childTraceId = traceContext.get();
            Assert.assertEquals(childTraceId, traceId);
            Tracer.startClient(traceId);
            Tracer.endClient();
        }
    });
    Tracer.endServer();
}

使用Java Agent植入修饰代码

以上所有使用需要业务代码去改动自己的线程池类,runnable或者callable类。而使用Java Agent实现线程池的传递是透明的,可以做到应用代码无侵入。

Java Agent(Instrumentation)是JDK1.5引入的技术,基于JVM TI机制,使得开发者可以构建一个独立于应用程序的代理(Agent),用来监测和协助运行在 JVM 上的程序,以及替换和修改某些类的定义。开发者可以在一个普通 Java 程序运行时,通过 – javaagent 参数指定一个特定的 jar 文件(包含 Instrumentation 代理)来启动相应的代理程序,植入自己扩展的修饰代码以实现功能。

在TransmittableThreadLocal中,相关Agent的源码分析如下:

//需要通过agent插入Executor类中的某个方法
private static void updateMethodOfExecutorClass(final CtClass clazz, final CtMethod method) throws NotFoundException, CannotCompileException {
    if (method.getDeclaringClass() != clazz) {
        return;
    }
    //插入的方法需要Public并且非静态
    final int modifiers = method.getModifiers();
    if (!Modifier.isPublic(modifiers) || Modifier.isStatic(modifiers)) {
        return;
    } 
    //获取该方法的参数类型存入parameterTypes数组中
    CtClass[] parameterTypes = method.getParameterTypes();
    StringBuilder insertCode = new StringBuilder(); 

    //根据参数类型顺序,进行代码格式化插入
    for (int i = 0; i < parameterTypes.length; i++) {
        CtClass paraType = parameterTypes[i];
        
        //区分Runnable/Callable
        if (RUNNABLE_CLASS_NAME.equals(paraType.getName())) {
            String code = String.format("$%d = %s.get($%d, false, true);", i + 1, TTL_RUNNABLE_CLASS_NAME, i + 1);
            logger.info("insert code before method " + method + " of class " + method.getDeclaringClass().getName() + ": " + code);
            insertCode.append(code);
        } else if (CALLABLE_CLASS_NAME.equals(paraType.getName())) {
            String code = String.format("$%d = %s.get($%d, false, true);", i + 1, TTL_CALLABLE_CLASS_NAME, i + 1);
            logger.info("insert code before method " + method + " of class " + method.getDeclaringClass().getName() + ": " + code);
            insertCode.append(code);
        }
    }
    
    //调用insertBefore()完成代码插入
    if (insertCode.length() > 0) {
        method.insertBefore(insertCode.toString());
    }
}

将封装好的TransmittableThreadLocal Jar包放在类目录下的某个文件夹下,例如agent,那么只需在启动参数加入:-javaagent:agent/transmittable-thread-local-xxx.jar即可完成修饰代码的植入。

相关标签: 分布式系统