欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java多线程 JUC 随笔

程序员文章站 2022-04-15 19:03:49
参考视频线程的创建继承Thread类。实现Runnable接口。实现Callable接口。有返回值,并且可以捕获异常。因为Thread只能接受Runnable作为参数,然后启动线程。所以Callable作为一个有返回值可以捕获异常的接口,就需要适配Thread了。所以就有了一个FutureTask,一个将来的任务,来适配Callable接口。使用FutureTask来创建Callable任务,然后get获取执行结果。FutureTask实现了Runnable、Future。可以通过T...

参考视频

线程的创建

  1. 继承Thread类。
  2. 实现Runnable接口。
  3. 实现Callable接口。有返回值,并且可以捕获异常。
    因为Thread只能接受Runnable作为参数,然后启动线程。
    Java多线程 JUC 随笔
    所以Callable作为一个有返回值可以捕获异常的接口,就需要适配Thread了。
    所以就有了一个FutureTask,一个将来的任务,来适配Callable接口。
    Java多线程 JUC 随笔Java多线程 JUC 随笔
    使用FutureTask来创建Callable任务,然后get获取执行结果。
    FutureTask实现了Runnable、Future。可以通过Thread来启动线程,也可以获取返回结果等。
    FutureTask只会执行一次,里面也有一些状态。
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

线程的五种状态

public enum State {
        /**
         * Thread state for a thread which has not yet started.
         */
        NEW,

        /**
         * Thread state for a runnable thread.  A thread in the runnable
         * state is executing in the Java virtual machine but it may
         * be waiting for other resources from the operating system
         * such as processor.
         */
        RUNNABLE,

        /**
         * Thread state for a thread blocked waiting for a monitor lock.
         * A thread in the blocked state is waiting for a monitor lock
         * to enter a synchronized block/method or
         * reenter a synchronized block/method after calling
         * {@link Object#wait() Object.wait}.
         */
        BLOCKED,

        /**
         * Thread state for a waiting thread.
         * A thread is in the waiting state due to calling one of the
         * following methods:
         * <ul>
         *   <li>{@link Object#wait() Object.wait} with no timeout</li>
         *   <li>{@link #join() Thread.join} with no timeout</li>
         *   <li>{@link LockSupport#park() LockSupport.park}</li>
         * </ul>
         *
         * <p>A thread in the waiting state is waiting for another thread to
         * perform a particular action.
         *
         * For example, a thread that has called <tt>Object.wait()</tt>
         * on an object is waiting for another thread to call
         * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
         * that object. A thread that has called <tt>Thread.join()</tt>
         * is waiting for a specified thread to terminate.
         */
        WAITING,

        /**
         * Thread state for a waiting thread with a specified waiting time.
         * A thread is in the timed waiting state due to calling one of
         * the following methods with a specified positive waiting time:
         * <ul>
         *   <li>{@link #sleep Thread.sleep}</li>
         *   <li>{@link Object#wait(long) Object.wait} with timeout</li>
         *   <li>{@link #join(long) Thread.join} with timeout</li>
         *   <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
         *   <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
         * </ul>
         */
        TIMED_WAITING,

        /**
         * Thread state for a terminated thread.
         * The thread has completed execution.
         */
        TERMINATED;
    }

线程的常用方法

线程常用方法源码分析

  • start:开启线程方法,Thread不会开启线程,而是通过本地方法开启新的线程。
  • setDaemon:设置守护线程,主线程会等待用户线程执行完成,而不会等待守护线程。比如GC。
  • setPriority:设置线程优先级,0-10以此增高,默认5。优先级高的线程有限获取cpu调度。
  • join:在t1线程中调用t2.join(),那么t1会等待t2执行完成,相当于让t2插个队。
  • sleep:让当前线程睡眠一段时间,不会释放锁。
  • yield:让出cpu的执行权。相当于礼让。
  • interrupt:中断线程,调用本地方法,只是设置了一个中断位,并没有实际终止线程
  • isInterrupted:判断线程是否中断,如果中断那么可以结束线程。调用本地方法来获取中断标志位,并不会清除中断状态。
  • static interrupted:判断当前线程是否中断,也是调用本地方法来获取中断标志位,同时会清除中断状态。即中断后,调用一次返回true,调用第二次返回false。
  • isAlive:判断线程是否存活。

线程并发

当多个线程处理同一个资源时,可能会出现并发问题。因为线程会在再线程内部复制一份数据进行操作。线程之间的数据不会实时变化。就会导致数据不一致问题。

并发处理:加锁

锁详解

  • synchronized:互斥锁。通过锁对象或者类字节码来锁定代码块、方法。确保只有一个线程可以进入。
    jdk1.6之后对其进行了优化,自旋锁,自适应自旋锁,锁消除,锁粗化,偏向锁,轻量级锁,重量级锁等。该锁是非公平锁。
    该锁无法获取锁的状态。
    原理:每个Object都有一个对象头,头里面有一个内存区域用来存放线程ID的。当一个线程来了,发现这个锁没用这,就把自己的线程ID放进去了,当第二个线程来了,发现锁已经被占用了,所以就只能等待了。等待第一个线程运行完,退出监视器,第二个才能进入。如此循环。。。。
  • ReentrantLock:使用JUC包下的Lock对象。进行加锁、释放锁操作。默认也是非公平锁。也可以设置公平锁,先来的线程先获取锁执行。
    Lock可以获取到锁的状态,可以适应更加复杂的锁场景。
  • ReentrantReadWriteLock:读写锁
    可以做到更细粒度的读写控制。
    只能有一个线程写。
    可以有多个线程读。
    当读的时候不允许写。当写的时候不允许读。
    读写锁可以解决CopyOnWirteArrayList的读数据不一致问题,但是会影响性能。
    Java多线程 JUC 随笔

线程池

想一下去银行办理业务的场景。跟线程池完全吻合了。。。。
Executors:工具方法,用来创建线程池。
ExecutorService:用来管理线程池。
三大方法、七大参数、四种拒绝策略
cpu密集型:最大线程就是cpu核数
io密集型:读写任务

  • 三大方法
		Executors.newFixedThreadPool(10);
		Executors.newSingleThreadExecutor();
		Executors.newScheduledThreadPool(10);
		Executors.newCachedThreadPool();
  • 七大参数
    public ThreadPoolExecutor(int corePoolSize,// 核心线程数:正常银行开放柜台
                              int maximumPoolSize,// 最大线程数:银行所有柜台,当core满了,并且候客区满了才会进行扩容。
                              long keepAliveTime,// 线程存活时间:不忙时多长时间关闭柜台
                              TimeUnit unit,// 存活单位:时间单位
                              BlockingQueue<Runnable> workQueue,// 使用哪种队列:候客区
                              ThreadFactory threadFactory,// 线程工厂:老板
                              RejectedExecutionHandler handler) {// 拒绝策略:当最大柜台数、候客区都满了时如何处理剩下的客户。
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • 拒绝处理策略:RejectedExecutionHandler
    Java多线程 JUC 随笔

生产者消费者

  1. synchronized:通过如下方法进行线程之间的通信。
  • wait:需要在同步代码块,或者同步方法中使用。使当前线程进入等待状态,释放锁。
  • notifyAll:同样需要在同步代码块,或者同步方法中使用。用来唤醒等待的线程。
  1. Lock:通过Condition进行通信。
  • await:进入等待状态。
  • signalAll:唤醒等待线程。
    通过Condition可以精确的控制唤醒那个线程。

List

  • ArrayList
    ArrayList源码分析
    线程不安全集合。
    内部维护一个数组,通过System.copyArray()来操控数组。
	private static final Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA = {};
    /**
     * Constructs an empty list with an initial capacity of ten.
     */
    public ArrayList() {
        this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA;
    }

内部实现两个迭代器,一个是实现Collection的不支持set、add方法,一个是List的迭代器,支持set、add方法。

    public ListIterator<E> listIterator() {
        return new ListItr(0);
    }
        public Iterator<E> iterator() {
        return new Itr();
    }
            public void set(E e) {
            if (lastRet < 0)
                throw new IllegalStateException();
            checkForComodification();

            try {
                ArrayList.this.set(lastRet, e);
            } catch (IndexOutOfBoundsException ex) {
                throw new ConcurrentModificationException();
            }
        }

        public void add(E e) {
            checkForComodification();

            try {
                int i = cursor;
                ArrayList.this.add(i, e);
                cursor = i + 1;
                lastRet = -1;
                expectedModCount = modCount;
            } catch (IndexOutOfBoundsException ex) {
                throw new ConcurrentModificationException();
            }
        }

扩容算法:oldSize + oldSize>>1,即扩容之前大小的1.5倍。

    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        ensureCapacityInternal(size + 1);  // Increments modCount!!
        elementData[size++] = e;
        return true;
    }
        private static int calculateCapacity(Object[] elementData, int minCapacity) {
        if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
            return Math.max(DEFAULT_CAPACITY, minCapacity);
        }
        return minCapacity;
    }

    private void ensureCapacityInternal(int minCapacity) {
        ensureExplicitCapacity(calculateCapacity(elementData, minCapacity));
    }

    private void ensureExplicitCapacity(int minCapacity) {
        modCount++;

        // overflow-conscious code
        if (minCapacity - elementData.length > 0)
            grow(minCapacity);
    }
        /**
     * Increases the capacity to ensure that it can hold at least the
     * number of elements specified by the minimum capacity argument.
     *
     * @param minCapacity the desired minimum capacity
     */
    private void grow(int minCapacity) {
        // overflow-conscious code
        int oldCapacity = elementData.length;
        int newCapacity = oldCapacity + (oldCapacity >> 1);// 1.5倍。
        if (newCapacity - minCapacity < 0)
            newCapacity = minCapacity;
        if (newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        // minCapacity is usually close to size, so this is a win:
        elementData = Arrays.copyOf(elementData, newCapacity);
    }

并发异常:在迭代时,会维护一个迭代时大小值以及期望值,如果不匹配,则抛出并发异常。所以只能通过迭代器来进行删除、添加操作。当然多线程并发还是会有问题。

        final void checkForComodification() {
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
        }
  • Vector
    线程安全集合。涉及到并发的操作都添加了synchronized关键词修饰。
    /**
     * Appends the specified element to the end of this Vector.
     *
     * @param e element to be appended to this Vector
     * @return {@code true} (as specified by {@link Collection#add})
     * @since 1.2
     */
    public synchronized boolean add(E e) {
        modCount++;
        ensureCapacityHelper(elementCount + 1);
        elementData[elementCount++] = e;
        return true;
    }
    private void grow(int minCapacity) {
        // overflow-conscious code
        int oldCapacity = elementData.length;
        int newCapacity = oldCapacity + ((capacityIncrement > 0) ?
                                         capacityIncrement : oldCapacity);// 2倍扩容
        if (newCapacity - minCapacity < 0)
            newCapacity = minCapacity;
        if (newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        elementData = Arrays.copyOf(elementData, newCapacity);
    }
  • Collections.synchronizedList(List list)
    内部创建了一个SynchronizedList对象,这个对象的内部维护了一个普通的List对象。区别就是在添加、删除等操作的方法上添加了synchronized关键词修饰,保证了同步。
SynchronizedList(List<E> list, Object mutex) {
            super(list, mutex);
            this.list = list;
        }

        public boolean equals(Object o) {
            if (this == o)
                return true;
            synchronized (mutex) {return list.equals(o);}
        }
        public int hashCode() {
            synchronized (mutex) {return list.hashCode();}
        }

        public E get(int index) {
            synchronized (mutex) {return list.get(index);}
        }
        public E set(int index, E element) {
            synchronized (mutex) {return list.set(index, element);}
        }
        public void add(int index, E element) {
            synchronized (mutex) {list.add(index, element);}
        }
        public E remove(int index) {
            synchronized (mutex) {return list.remove(index);}
        }
  • CopyOnWriteArrayList
    CopyOnWriteArrayList文章 3y
    JUC包下的集合类。
    内部通过Lock同步锁来保证线程安全。
    通过CopyOnWrite机制来处理并发问题。当需要写时就复制出一份新的数据,对新的数据进行操作,并不影响旧的数据,这是对旧的数据读也就没有问题了。这样会造成内存空间的浪费。并且只能保证最终数据一致性,不能保证实时一致性。
    JDK8是通过Lock,JDK14又改回了synchronized。
    为什么会改呢,因为当时CopyOnWriteArrayList是jdk1.5版本,那会synchronized还没有优化,为了提高性能就是用了Lock,但是呢,后来jdk1.6之后对synchronized进行了优化,毕竟是亲儿子,所以在jdk11之后的版本就又改回了synchronized。

Set

  • HashSet
    内部通过HashMap实现,因为HashMap的键不重复,所以set也是不能重复的。
    /**
     * Constructs a new, empty set; the backing <tt>HashMap</tt> instance has
     * default initial capacity (16) and load factor (0.75).
     */
    public HashSet() {
        map = new HashMap<>();
    }
    /**
     * Adds the specified element to this set if it is not already present.
     * More formally, adds the specified element <tt>e</tt> to this set if
     * this set contains no element <tt>e2</tt> such that
     * <tt>(e==null&nbsp;?&nbsp;e2==null&nbsp;:&nbsp;e.equals(e2))</tt>.
     * If this set already contains the element, the call leaves the set
     * unchanged and returns <tt>false</tt>.
     *
     * @param e element to be added to this set
     * @return <tt>true</tt> if this set did not already contain the specified
     * element
     */
    public boolean add(E e) {
        return map.put(e, PRESENT)==null;
    }
  • SynchronizedSet
    通过Collections创建的同步Set,内部有一个普通的Set,然后在读写方法上添加了synchronized,用来解决同步问题。
	/**
     * @serial include
     */
    static class SynchronizedSet<E>
          extends SynchronizedCollection<E>
          implements Set<E> {
        private static final long serialVersionUID = 487447009682186044L;

        SynchronizedSet(Set<E> s) {
            super(s);
        }
        SynchronizedSet(Set<E> s, Object mutex) {
            super(s, mutex);
        }

        public boolean equals(Object o) {
            if (this == o)
                return true;
            synchronized (mutex) {return c.equals(o);}
        }
        public int hashCode() {
            synchronized (mutex) {return c.hashCode();}
        }
    }
  • CopyOnWriteArraySet
    内部使用CopyOnWriteArrayList实现。为了保证不重复,在添加时先校验是否存在该元素。
    /**
     * Creates an empty set.
     */
    public CopyOnWriteArraySet() {
        al = new CopyOnWriteArrayList<E>();
    }
    /**
     * Appends the element, if not present.
     *
     * @param e element to be added to this list, if absent
     * @return {@code true} if the element was added
     */
    public boolean addIfAbsent(E e) {
        Object[] snapshot = getArray();
        return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
            addIfAbsent(e, snapshot);
    }

Queue

队列,顾名思义就是排队,根据不同的特性有有不同的实现类:比如阻塞队列,非阻塞队列,单端取队列,双端取队列,固定队列,可扩容队列。
底层呢又根据不同的实现又进行了划分:使用Array数组实现队列数据结构,使用链表实现队列数据结构。
Java多线程 JUC 随笔
队列内部的方法无非也是增删改查。但是根据不同的特性有不同的实现方式,比如阻塞添加、非阻塞添加、阻塞指定时间添加、抛出异常添加等。。。。

集合总结

  • 了解内部的数据结构,实现原理。
  • 解决同步并发问题。
    • synchronized 关键字解决。直接锁方法,但是锁的不彻底没解决并发读问题。
    • 利用Collections提供的同步集合,同上。
    • juc的解决方案,Lock或者synchronized加锁。并且使用CopyOnWrite机制解决读问题。在写的时候复制一份,读的还是旧的,写完后用新的覆盖旧的。用空间解决读并发提高性能。
      缺点:浪费内存空间;读数据不会实时一致,但是最终数据一致。
      // TODO

CountDownLatch

内部for循环判断是否都执行完了,count=0了。

package com.csdn.thread;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {

	public static void main(String[] args) {
		int size = 10;
		CountDownLatch latch = new CountDownLatch(size);
		for (int i = 0; i < size; i++) {
			new Thread(() -> {
				System.out.println(Thread.currentThread().getName() + "执行完成。");
				latch.countDown();
			}, String.valueOf(i)).start();
		}
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("所有任务执行完成。");
	}

}
1执行完成。
4执行完成。
5执行完成。
3执行完成。
2执行完成。
0执行完成。
6执行完成。
7执行完成。
8执行完成。
9执行完成。
所有任务执行完成。

CyclicBarrier

内部原理for循环判断是否够了。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

	public static void main(String[] args) {
		CyclicBarrier cb = new CyclicBarrier(7, () -> {
			System.out.println("收集完成7个龙珠,召唤神龙。");
		});

		for (int i = 0; i < 7; i++) {
			int index = i;
			new Thread(() -> {
				System.out.println("收集第" + index + "个龙珠。");

				try {
					cb.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
			}).start();
		}
	}
}
收集第0个龙珠。
收集第3个龙珠。
收集第2个龙珠。
收集第5个龙珠。
收集第1个龙珠。
收集第4个龙珠。
收集第6个龙珠。
收集完成7个龙珠,召唤神龙。

Semaphore

package com.csdn.thread;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {

	public static void main(String[] args) {
		Semaphore semaphore = new Semaphore(3, false);// 创建信号量,非公平

		for (int i = 0; i < 6; i++) {
			new Thread(() -> {
				try {
					semaphore.acquire();
					System.out.println(Thread.currentThread().getName() + "抢到车位。");
					TimeUnit.SECONDS.sleep(2);
					System.out.println(Thread.currentThread().getName() + "离开车位。");
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					semaphore.release();
				}
			}).start();
		}
	}
}
Thread-0抢到车位。
Thread-2抢到车位。
Thread-1抢到车位。
Thread-1离开车位。
Thread-2离开车位。
Thread-0离开车位。
Thread-4抢到车位。
Thread-3抢到车位。
Thread-5抢到车位。
Thread-5离开车位。
Thread-4离开车位。
Thread-3离开车位。

函数式接口

  • Consumer
@FunctionalInterface
public interface Consumer<T> {

    /**
     * Performs this operation on the given argument.
     *
     * @param t the input argument
     */
    void accept(T t);

    /**
     * Returns a composed {@code Consumer} that performs, in sequence, this
     * operation followed by the {@code after} operation. If performing either
     * operation throws an exception, it is relayed to the caller of the
     * composed operation.  If performing this operation throws an exception,
     * the {@code after} operation will not be performed.
     *
     * @param after the operation to perform after this operation
     * @return a composed {@code Consumer} that performs in sequence this
     * operation followed by the {@code after} operation
     * @throws NullPointerException if {@code after} is null
     */
    default Consumer<T> andThen(Consumer<? super T> after) {
        Objects.requireNonNull(after);
        return (T t) -> { accept(t); after.accept(t); };
    }
}
  • Function
@FunctionalInterface
public interface Function<T, R> {

    /**
     * Applies this function to the given argument.
     *
     * @param t the function argument
     * @return the function result
     */
    R apply(T t);
}
  • Predicate
@FunctionalInterface
public interface Predicate<T> {

    /**
     * Evaluates this predicate on the given argument.
     *
     * @param t the input argument
     * @return {@code true} if the input argument matches the predicate,
     * otherwise {@code false}
     */
    boolean test(T t);
}
  • Supplier
@FunctionalInterface
public interface Supplier<T> {

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}

Stream!!!!!思想参考数据库查询

集合用来存储数据,流用来操作数据。比如数据的筛选过滤,计算,查找,排序,分组等操作。
JDK8对所有的数据类型都提供了流支持。
Java多线程 JUC 随笔

ForkJoin!!!!!

大数据量。

CompletableFuture

类似于js中发送http请求,然后等待结果,或者处理异常的功能。
Java多线程 JUC 随笔

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
			System.out.println("下载图片....");
			try {
				TimeUnit.SECONDS.sleep(3);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			return "aa";
		});

		future.whenComplete((result, exception) -> {
			System.out.println("result => " + result);
			System.out.println("exception => " + exception);
		}).exceptionally(e -> {
			return null;
		}).thenAccept(result -> {
			System.out.println("then => " + result);
		}).get();
	}

java.util.concurrent.atomic

原子类型操作,底层调用本地方法,与操作系统挂钩。CAS原理。CompareAndSwap替换时比较。
CAS:
Java多线程 JUC 随笔
使用版本号解决ABA问题。

工具使用

Java多线程 JUC 随笔
Java多线程 JUC 随笔

本文地址:https://blog.csdn.net/weixin_44105483/article/details/109599075