Java多线程 JUC 随笔
线程的创建
- 继承Thread类。
- 实现Runnable接口。
- 实现Callable接口。有返回值,并且可以捕获异常。
因为Thread只能接受Runnable作为参数,然后启动线程。
所以Callable作为一个有返回值可以捕获异常的接口,就需要适配Thread了。
所以就有了一个FutureTask,一个将来的任务,来适配Callable接口。
使用FutureTask来创建Callable任务,然后get获取执行结果。
FutureTask实现了Runnable、Future。可以通过Thread来启动线程,也可以获取返回结果等。
FutureTask只会执行一次,里面也有一些状态。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
线程的五种状态
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
NEW,
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE,
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED,
/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
WAITING,
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,
/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED;
}
线程的常用方法
- start:开启线程方法,Thread不会开启线程,而是通过本地方法开启新的线程。
- setDaemon:设置守护线程,主线程会等待用户线程执行完成,而不会等待守护线程。比如GC。
- setPriority:设置线程优先级,0-10以此增高,默认5。优先级高的线程有限获取cpu调度。
- join:在t1线程中调用t2.join(),那么t1会等待t2执行完成,相当于让t2插个队。
- sleep:让当前线程睡眠一段时间,不会释放锁。
- yield:让出cpu的执行权。相当于礼让。
- interrupt:中断线程,调用本地方法,只是设置了一个中断位,并没有实际终止线程。
- isInterrupted:判断线程是否中断,如果中断那么可以结束线程。调用本地方法来获取中断标志位,并不会清除中断状态。
- static interrupted:判断当前线程是否中断,也是调用本地方法来获取中断标志位,同时会清除中断状态。即中断后,调用一次返回true,调用第二次返回false。
- isAlive:判断线程是否存活。
线程并发
当多个线程处理同一个资源时,可能会出现并发问题。因为线程会在再线程内部复制一份数据进行操作。线程之间的数据不会实时变化。就会导致数据不一致问题。
并发处理:加锁
-
synchronized:互斥锁。通过锁对象或者类字节码来锁定代码块、方法。确保只有一个线程可以进入。
jdk1.6之后对其进行了优化,自旋锁,自适应自旋锁,锁消除,锁粗化,偏向锁,轻量级锁,重量级锁等。该锁是非公平锁。
该锁无法获取锁的状态。
原理:每个Object都有一个对象头,头里面有一个内存区域用来存放线程ID的。当一个线程来了,发现这个锁没用这,就把自己的线程ID放进去了,当第二个线程来了,发现锁已经被占用了,所以就只能等待了。等待第一个线程运行完,退出监视器,第二个才能进入。如此循环。。。。 -
ReentrantLock:使用JUC包下的Lock对象。进行加锁、释放锁操作。默认也是非公平锁。也可以设置公平锁,先来的线程先获取锁执行。
Lock可以获取到锁的状态,可以适应更加复杂的锁场景。 -
ReentrantReadWriteLock:读写锁
可以做到更细粒度的读写控制。
只能有一个线程写。
可以有多个线程读。
当读的时候不允许写。当写的时候不允许读。
读写锁可以解决CopyOnWirteArrayList的读数据不一致问题,但是会影响性能。
线程池
想一下去银行办理业务的场景。跟线程池完全吻合了。。。。
Executors:工具方法,用来创建线程池。
ExecutorService:用来管理线程池。
三大方法、七大参数、四种拒绝策略
cpu密集型:最大线程就是cpu核数
io密集型:读写任务
- 三大方法
Executors.newFixedThreadPool(10);
Executors.newSingleThreadExecutor();
Executors.newScheduledThreadPool(10);
Executors.newCachedThreadPool();
- 七大参数
public ThreadPoolExecutor(int corePoolSize,// 核心线程数:正常银行开放柜台
int maximumPoolSize,// 最大线程数:银行所有柜台,当core满了,并且候客区满了才会进行扩容。
long keepAliveTime,// 线程存活时间:不忙时多长时间关闭柜台
TimeUnit unit,// 存活单位:时间单位
BlockingQueue<Runnable> workQueue,// 使用哪种队列:候客区
ThreadFactory threadFactory,// 线程工厂:老板
RejectedExecutionHandler handler) {// 拒绝策略:当最大柜台数、候客区都满了时如何处理剩下的客户。
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- 拒绝处理策略:RejectedExecutionHandler
生产者消费者
- synchronized:通过如下方法进行线程之间的通信。
- wait:需要在同步代码块,或者同步方法中使用。使当前线程进入等待状态,释放锁。
- notifyAll:同样需要在同步代码块,或者同步方法中使用。用来唤醒等待的线程。
- Lock:通过Condition进行通信。
- await:进入等待状态。
- signalAll:唤醒等待线程。
通过Condition可以精确的控制唤醒那个线程。
List
-
ArrayList:
ArrayList源码分析
线程不安全集合。
内部维护一个数组,通过System.copyArray()来操控数组。
private static final Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA = {};
/**
* Constructs an empty list with an initial capacity of ten.
*/
public ArrayList() {
this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA;
}
内部实现两个迭代器,一个是实现Collection的不支持set、add方法,一个是List的迭代器,支持set、add方法。
public ListIterator<E> listIterator() {
return new ListItr(0);
}
public Iterator<E> iterator() {
return new Itr();
}
public void set(E e) {
if (lastRet < 0)
throw new IllegalStateException();
checkForComodification();
try {
ArrayList.this.set(lastRet, e);
} catch (IndexOutOfBoundsException ex) {
throw new ConcurrentModificationException();
}
}
public void add(E e) {
checkForComodification();
try {
int i = cursor;
ArrayList.this.add(i, e);
cursor = i + 1;
lastRet = -1;
expectedModCount = modCount;
} catch (IndexOutOfBoundsException ex) {
throw new ConcurrentModificationException();
}
}
扩容算法:oldSize + oldSize>>1,即扩容之前大小的1.5倍。
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return <tt>true</tt> (as specified by {@link Collection#add})
*/
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
private static int calculateCapacity(Object[] elementData, int minCapacity) {
if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
return Math.max(DEFAULT_CAPACITY, minCapacity);
}
return minCapacity;
}
private void ensureCapacityInternal(int minCapacity) {
ensureExplicitCapacity(calculateCapacity(elementData, minCapacity));
}
private void ensureExplicitCapacity(int minCapacity) {
modCount++;
// overflow-conscious code
if (minCapacity - elementData.length > 0)
grow(minCapacity);
}
/**
* Increases the capacity to ensure that it can hold at least the
* number of elements specified by the minimum capacity argument.
*
* @param minCapacity the desired minimum capacity
*/
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + (oldCapacity >> 1);// 1.5倍。
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// minCapacity is usually close to size, so this is a win:
elementData = Arrays.copyOf(elementData, newCapacity);
}
并发异常:在迭代时,会维护一个迭代时大小值以及期望值,如果不匹配,则抛出并发异常。所以只能通过迭代器来进行删除、添加操作。当然多线程并发还是会有问题。
final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
-
Vector:
线程安全集合。涉及到并发的操作都添加了synchronized
关键词修饰。
/**
* Appends the specified element to the end of this Vector.
*
* @param e element to be appended to this Vector
* @return {@code true} (as specified by {@link Collection#add})
* @since 1.2
*/
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + ((capacityIncrement > 0) ?
capacityIncrement : oldCapacity);// 2倍扩容
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
elementData = Arrays.copyOf(elementData, newCapacity);
}
-
Collections.synchronizedList(List list)
内部创建了一个SynchronizedList
对象,这个对象的内部维护了一个普通的List
对象。区别就是在添加、删除等操作的方法上添加了synchronized
关键词修饰,保证了同步。
SynchronizedList(List<E> list, Object mutex) {
super(list, mutex);
this.list = list;
}
public boolean equals(Object o) {
if (this == o)
return true;
synchronized (mutex) {return list.equals(o);}
}
public int hashCode() {
synchronized (mutex) {return list.hashCode();}
}
public E get(int index) {
synchronized (mutex) {return list.get(index);}
}
public E set(int index, E element) {
synchronized (mutex) {return list.set(index, element);}
}
public void add(int index, E element) {
synchronized (mutex) {list.add(index, element);}
}
public E remove(int index) {
synchronized (mutex) {return list.remove(index);}
}
-
CopyOnWriteArrayList
CopyOnWriteArrayList文章 3y
JUC包下的集合类。
内部通过Lock
同步锁来保证线程安全。
通过CopyOnWrite机制来处理并发问题。当需要写时就复制出一份新的数据,对新的数据进行操作,并不影响旧的数据,这是对旧的数据读也就没有问题了。这样会造成内存空间的浪费。并且只能保证最终数据一致性,不能保证实时一致性。
JDK8是通过Lock,JDK14又改回了synchronized。
为什么会改呢,因为当时CopyOnWriteArrayList是jdk1.5版本,那会synchronized还没有优化,为了提高性能就是用了Lock,但是呢,后来jdk1.6之后对synchronized进行了优化,毕竟是亲儿子,所以在jdk11之后的版本就又改回了synchronized。
Set
-
HashSet
内部通过HashMap实现,因为HashMap的键不重复,所以set也是不能重复的。
/**
* Constructs a new, empty set; the backing <tt>HashMap</tt> instance has
* default initial capacity (16) and load factor (0.75).
*/
public HashSet() {
map = new HashMap<>();
}
/**
* Adds the specified element to this set if it is not already present.
* More formally, adds the specified element <tt>e</tt> to this set if
* this set contains no element <tt>e2</tt> such that
* <tt>(e==null ? e2==null : e.equals(e2))</tt>.
* If this set already contains the element, the call leaves the set
* unchanged and returns <tt>false</tt>.
*
* @param e element to be added to this set
* @return <tt>true</tt> if this set did not already contain the specified
* element
*/
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
-
SynchronizedSet
通过Collections创建的同步Set,内部有一个普通的Set,然后在读写方法上添加了synchronized,用来解决同步问题。
/**
* @serial include
*/
static class SynchronizedSet<E>
extends SynchronizedCollection<E>
implements Set<E> {
private static final long serialVersionUID = 487447009682186044L;
SynchronizedSet(Set<E> s) {
super(s);
}
SynchronizedSet(Set<E> s, Object mutex) {
super(s, mutex);
}
public boolean equals(Object o) {
if (this == o)
return true;
synchronized (mutex) {return c.equals(o);}
}
public int hashCode() {
synchronized (mutex) {return c.hashCode();}
}
}
-
CopyOnWriteArraySet
内部使用CopyOnWriteArrayList实现。为了保证不重复,在添加时先校验是否存在该元素。
/**
* Creates an empty set.
*/
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}
/**
* Appends the element, if not present.
*
* @param e element to be added to this list, if absent
* @return {@code true} if the element was added
*/
public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
}
Queue
队列,顾名思义就是排队,根据不同的特性有有不同的实现类:比如阻塞队列,非阻塞队列,单端取队列,双端取队列,固定队列,可扩容队列。
底层呢又根据不同的实现又进行了划分:使用Array数组实现队列数据结构,使用链表实现队列数据结构。
队列内部的方法无非也是增删改查。但是根据不同的特性有不同的实现方式,比如阻塞添加、非阻塞添加、阻塞指定时间添加、抛出异常添加等。。。。
集合总结
- 了解内部的数据结构,实现原理。
- 解决同步并发问题。
- synchronized 关键字解决。直接锁方法,但是锁的不彻底没解决并发读问题。
- 利用Collections提供的同步集合,同上。
- juc的解决方案,Lock或者synchronized加锁。并且使用CopyOnWrite机制解决读问题。在写的时候复制一份,读的还是旧的,写完后用新的覆盖旧的。用空间解决读并发提高性能。
缺点:浪费内存空间;读数据不会实时一致,但是最终数据一致。
// TODO
CountDownLatch
内部for循环判断是否都执行完了,count=0了。
package com.csdn.thread;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
public static void main(String[] args) {
int size = 10;
CountDownLatch latch = new CountDownLatch(size);
for (int i = 0; i < size; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "执行完成。");
latch.countDown();
}, String.valueOf(i)).start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有任务执行完成。");
}
}
1执行完成。
4执行完成。
5执行完成。
3执行完成。
2执行完成。
0执行完成。
6执行完成。
7执行完成。
8执行完成。
9执行完成。
所有任务执行完成。
CyclicBarrier
内部原理for循环判断是否够了。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cb = new CyclicBarrier(7, () -> {
System.out.println("收集完成7个龙珠,召唤神龙。");
});
for (int i = 0; i < 7; i++) {
int index = i;
new Thread(() -> {
System.out.println("收集第" + index + "个龙珠。");
try {
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
收集第0个龙珠。
收集第3个龙珠。
收集第2个龙珠。
收集第5个龙珠。
收集第1个龙珠。
收集第4个龙珠。
收集第6个龙珠。
收集完成7个龙珠,召唤神龙。
Semaphore
package com.csdn.thread;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3, false);// 创建信号量,非公平
for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢到车位。");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "离开车位。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
}
Thread-0抢到车位。
Thread-2抢到车位。
Thread-1抢到车位。
Thread-1离开车位。
Thread-2离开车位。
Thread-0离开车位。
Thread-4抢到车位。
Thread-3抢到车位。
Thread-5抢到车位。
Thread-5离开车位。
Thread-4离开车位。
Thread-3离开车位。
函数式接口
- Consumer
@FunctionalInterface
public interface Consumer<T> {
/**
* Performs this operation on the given argument.
*
* @param t the input argument
*/
void accept(T t);
/**
* Returns a composed {@code Consumer} that performs, in sequence, this
* operation followed by the {@code after} operation. If performing either
* operation throws an exception, it is relayed to the caller of the
* composed operation. If performing this operation throws an exception,
* the {@code after} operation will not be performed.
*
* @param after the operation to perform after this operation
* @return a composed {@code Consumer} that performs in sequence this
* operation followed by the {@code after} operation
* @throws NullPointerException if {@code after} is null
*/
default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
}
- Function
@FunctionalInterface
public interface Function<T, R> {
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t);
}
- Predicate
@FunctionalInterface
public interface Predicate<T> {
/**
* Evaluates this predicate on the given argument.
*
* @param t the input argument
* @return {@code true} if the input argument matches the predicate,
* otherwise {@code false}
*/
boolean test(T t);
}
- Supplier
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
Stream!!!!!思想参考数据库查询
集合用来存储数据,流用来操作数据。比如数据的筛选过滤,计算,查找,排序,分组等操作。
JDK8对所有的数据类型都提供了流支持。
ForkJoin!!!!!
大数据量。
CompletableFuture
类似于js中发送http请求,然后等待结果,或者处理异常的功能。
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("下载图片....");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "aa";
});
future.whenComplete((result, exception) -> {
System.out.println("result => " + result);
System.out.println("exception => " + exception);
}).exceptionally(e -> {
return null;
}).thenAccept(result -> {
System.out.println("then => " + result);
}).get();
}
java.util.concurrent.atomic
原子类型操作,底层调用本地方法,与操作系统挂钩。CAS原理。CompareAndSwap替换时比较。
CAS:
使用版本号解决ABA问题。
工具使用
本文地址:https://blog.csdn.net/weixin_44105483/article/details/109599075
上一篇: Spring:依赖注入(DI)学习笔记
下一篇: 汤隔夜怎么保存?好喝的汤有哪些?