欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

多线程使用注意

程序员文章站 2022-05-27 14:56:01
命名 来源:https://www.cnblogs.com/guozp/p/10344446.html 我们在创建线程池的时候,一定要给线程池名字,如下这种写法,线程是默认直接生成的: 那从这里我们就知道,我们通过submit传递进去的Runnale,最后在FutureTask的run方法里面调用的 ......

命名

来源:https://www.cnblogs.com/guozp/p/10344446.html

  • 我们在创建线程池的时候,一定要给线程池名字,如下这种写法,线程是默认直接生成的:

    public static void main(string[] args) {
            executorservice executorservice = executors.newfixedthreadpool(3);
            for (int i = 0; i < 10; i++) {
                final int finali = i;
                executorservice.execute(new runnable() {
                    @override
                    public void run() {
                        system.out.println(thread.currentthread().getname() + ":" + finali);
                    }
                });
            }       
         }  

    最后的输出:

    pool-1-thread-3:2
    pool-1-thread-2:1
    pool-1-thread-3:4
    pool-1-thread-1:3
    pool-1-thread-3:6
    pool-1-thread-2:5
    pool-1-thread-3:8
    pool-1-thread-1:7
    pool-1-thread-2:9
  • executors中有默认的线程工厂的实现:

    static class defaultthreadfactory implements threadfactory {
            private static final atomicinteger poolnumber = new atomicinteger(1);
            private final threadgroup group;
            private final atomicinteger threadnumber = new atomicinteger(1);
            private final string nameprefix;
    
            defaultthreadfactory() {
                securitymanager s = system.getsecuritymanager();
                group = (s != null) ? s.getthreadgroup() :
                                      thread.currentthread().getthreadgroup();
                nameprefix = "pool-" +
                              poolnumber.getandincrement() +
                             "-thread-";
            }
    
            public thread newthread(runnable r) {
                thread t = new thread(group, r,
                                      nameprefix + threadnumber.getandincrement(),
                                      0);
                if (t.isdaemon())
                    t.setdaemon(false);
                if (t.getpriority() != thread.norm_priority)
                    t.setpriority(thread.norm_priority);
                return t;
            }
        }
  • 我们可以改造一下

     public class namedthreadfactory implements threadfactory {
        private final atomicinteger threadnumber;
        private final string name;
        private final boolean isdaemon;
    
        public namedthreadfactory(string name) {
            this(name, false);
        }
    
        public namedthreadfactory(string name, boolean daemon) {
            this.threadnumber = new atomicinteger(1);
            this.isdaemon = daemon;
            this.name = name + "-thread-pool-";
        }
    
        public thread newthread(runnable r) {
            thread t = new thread(r, this.name + this.threadnumber.getandincrement());
            t.setdaemon(this.isdaemon);
            if (t.getpriority() != thread.norm_priority){
                t.setpriority(thread.norm_priority);
            }
            return t;
        }
    }

    那我们看下改造之后的输出结果:

    有名字的线程池-thread-pool-1:0
    有名字的线程池-thread-pool-3:2
    有名字的线程池-thread-pool-1:3
    有名字的线程池-thread-pool-2:1
    有名字的线程池-thread-pool-1:5
    有名字的线程池-thread-pool-1:7
    有名字的线程池-thread-pool-1:8
    有名字的线程池-thread-pool-3:4
    有名字的线程池-thread-pool-1:9
    有名字的线程池-thread-pool-2:6

    这样的话,当我们应用线上出现问题,需要通过jstack查看线程堆栈的时候,就可以知道是哪些线程出现的问题,否则看到的都是统一的命名方式,看到都是清一色的线程,增加排查问题的难度

thread异常处理

java中线程执行的任务接口java.lang.runnable 要求不抛出checked异常,

public interface runnable {

    public abstract void run();
}

那么如果 run() 方法中抛出了runtimeexception,将会怎么处理了?

线程出现未捕获异常后,jvm将调用thread中的dispatchuncaughtexception方法把异常传递给线程的未捕获异常处理器

private void dispatchuncaughtexception(throwable e) {
    getuncaughtexceptionhandler().uncaughtexception(this, e);
}

public uncaughtexceptionhandler getuncaughtexceptionhandler() {
    return uncaughtexceptionhandler != null ?
        uncaughtexceptionhandler : group;
}

thread中存在两个uncaughtexceptionhandler。一个是静态的defaultuncaughtexceptionhandler,另一个是非静态uncaughtexceptionhandler。

// null unless explicitly set
private volatile uncaughtexceptionhandler uncaughtexceptionhandler;

// null unless explicitly set
private static volatile uncaughtexceptionhandler defaultuncaughtexceptionhandler;
  • defaultuncaughtexceptionhandler:设置一个静态的默认的uncaughtexceptionhandler。来自所有线程中的exception在抛出并且未捕获的情况下,都会从此路过。进程fork的时候设置的就是这个静态的defaultuncaughtexceptionhandler,管辖范围为整个进程
  • uncaughtexceptionhandler:为单个线程设置一个属于线程自己的uncaughtexceptionhandler,辖范围比较小。

如果没有设置uncaughtexceptionhandler,将使用线程所在的线程组来处理这个未捕获异常。线程组threadgroup实现了uncaughtexceptionhandler,所以可以用来处理未捕获异常。threadgroup类定义:

private threadgroup group;

class threadgroup implements thread.uncaughtexceptionhandler{}

threadgroup实现的uncaughtexception如下:

public void uncaughtexception(thread t, throwable e) {
    if (parent != null) {
        parent.uncaughtexception(t, e);
    } else {
        thread.uncaughtexceptionhandler ueh =
            thread.getdefaultuncaughtexceptionhandler();
        if (ueh != null) {
            ueh.uncaughtexception(t, e);
        } else if (!(e instanceof threaddeath)) {
            system.err.print("exception in thread \""
                             + t.getname() + "\" ");
            e.printstacktrace(system.err);
        }
    }
}

默认情况下,线程组处理未捕获异常的逻辑是,首先将异常消息通知给父线程组,然后尝试利用一个默认的defaultuncaughtexceptionhandler来处理异常,如果没有默认的异常处理器则将错误信息输出到system.err。也就是jvm提供给我们设置每个线程的具体的未捕获异常处理器,也提供了设置默认异常处理器的方法,通常java.lang.thread对象运行设置一个默认的异常处理方法:

public static void setdefaultuncaughtexceptionhandler(uncaughtexceptionhandler eh) {
        securitymanager sm = system.getsecuritymanager();
        if (sm != null) {
            sm.checkpermission(
                new runtimepermission("setdefaultuncaughtexceptionhandler")
                    );
        }
    
         defaultuncaughtexceptionhandler = eh;
     }
                

而这个默认的静态全局的异常捕获方法是直接输出异常堆栈。
当然,我们可以覆盖此默认实现,只需要实现java.lang.thread.uncaughtexceptionhandler接口即可

public interface uncaughtexceptionhandler {

    void uncaughtexception(thread t, throwable e);
}

submit异常吞并

  • 我们平时都是通过submit来提交一个callable,那如果提交的是runnable呢,为方便起见我们核心的代码都放在一起了
public future<?> submit(runnable task) {
    if (task == null) throw new nullpointerexception();
    runnablefuture<void> ftask = newtaskfor(task, null);
    execute(ftask);
    return ftask;
}

protected <t> runnablefuture<t> newtaskfor(runnable runnable, t value) {
    return new futuretask<t>(runnable, value);
}

public futuretask(runnable runnable, v result) {
    this.callable = executors.callable(runnable, result);
    this.state = new;       // ensure visibility of callable
}
public static <t> callable<t> callable(runnable task, t result) {
    if (task == null)
        throw new nullpointerexception();
    return new runnableadapter<t>(task, result);
}
//最终futuretask中的callable指向的是一个runnableadapter,而runnableadapter的call方法也是调用了我们传进来的task的run方法,返回的是null
static final class runnableadapter<t> implements callable<t> {
    final runnable task;
    final t result;
    runnableadapter(runnable task, t result) {
        this.task = task;
        this.result = result;
    }
    public t call() {
        task.run();
        return result;
    }
}
  • 那从这里我们就知道,我们通过submit传递进去的runnale,最后在futuretask的run方法里面调用的callable.call()实质上还是我们传递进去的runnable的run方法,在源码futuretask的run方法的时候发现,futuretask中执行任务如果出现异常,是不会抛出来的,必须通过get方法才可以获取到,当然也可以重写afterexecute()这个回调方法,在这个里面来调用get获取异常信息,
    还是要重点强调下,我们在通过submit执行任务的时候,一定要调用get()方法
  • 这里我们重写afterexecute()方法,来获取submit(runnable task)的执行异常:
protected void afterexecute(runnable r, throwable t) {
        super.afterexecute(r, t);
        //执行的callable,对应的t一定是null
        if (t == null && r instanceof future) {
            try {
                future future = (future) r;
                if (future.isdone()){
                    // 判断任务是否执行完成
                    future.get();
                }
            } catch (cancellationexception ce) {
                t = ce;
            } catch (executionexception ee) {
                t = ee.getcause();
            } catch (interruptedexception ie) {
                thread.currentthread().interrupt(); 
            }
        }
    }

countdownlatch 丢失事件

  • 我们在处理一批任务的时候,往往会把任务进行partition,然后再交给每个线程去处理,那主线程需要等待所有的线程处理完,来统计本次处理的时间,以及其他统计的数据,差不多和下面这段代码类似:
public void execute3(){
    list<integer> data = new arraylist<integer>(100);
    for (int i = 0; i < 100; i++) {
        data.add(i + 10);
    }

    list<list<integer>> partition = lists.partition(data, 20);
    final countdownlatch countdownlatch = new countdownlatch(partition.size());
    for (final list<integer> datatohandle : partition) {
        threadpoolexecutor.execute(new runnable() {
            @override
            public void run() {
                try{
                    for (integer i : datatohandle) {
                        dosomething(i);
                    }
                }catch (exception e){
                   logger.error(e.getmessage(), e);
                }finally {  
                    countdownlatch.countdown();
                }
            }
        });
    }

    try {
        countdownlatch.await();
    } catch (interruptedexception e) {
        logger.error(e.getmessage(), e);
    }

    logger.info("任务执行结束...");
}
  • 之前这么写代码没有出现过问题,直到最近出现问题才发现这么写会导致主线程无法结束的问题。我们看下,虽然在每个任务的finally中进行处理
  • countdownlatch.countdown();但是有一点忽视了,我们在异常那块其实有提到过,如果线程池满了,抛出rejectexecuteexception的话,那这次任务的countdownlatch就会被忽视,当然我们这是在主线程里执行,直接会抛出异常导致主线程结束,但是如果和上面提到的在单独的子线程里面去执行这个线程池,那这样的话由于主线程无法捕获到子线程的异常,就会出现主线程无法结束的问题,所以我们在子线程中执行线程池一定要避免这点 即如果在子线程中执行,需要改为下面这样:
 public void execute3(){
    list<integer> data = new arraylist<integer>(100);
    for (int i = 0; i < 100; i++) {
        data.add(i + 10);
    }

    final list<list<integer>> partition = lists.partition(data, 20);
    final countdownlatch countdownlatch = new countdownlatch(partition.size());
    new thread(new runnable() {
        @override
        public void run() {
            for (final list<integer> datatohandle : partition) {
                try {
                    threadpoolexecutor.execute(new runnable() {
                        @override
                        public void run() {
                            try{
                                for (integer i : datatohandle) {
                                    dosomething(i);
                                }
                            }catch (exception e){
                                logger.error(e.getmessage(), e);
                            }finally {

                                countdownlatch.countdown();
                            }
                        }
                    });
                } catch (rejectedexecutionexception e) {
                    logger.error(e.getmessage(), e);
                    //处理完异常之后需要补充countdownlatch事件
                    countdownlatch.countdown();
                }
            }

        }
    }).start();

    try {
        countdownlatch.await();
    } catch (interruptedexception e) {
        logger.error(e.getmessage(), e);
    }

    logger.info("任务执行结束...");
}

来源:https://www.cnblogs.com/guozp/p/10344446.html