欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并行程序学习

程序员文章站 2022-06-12 17:11:51
...

可见性:线程之间的变量是否可见,线程A修改共享变量后,线程B是否可见的问题。
原子性:32位机上进行对long这种64位数据类型的操作,会出现原子性问题。
有序性:指令重排:意义在于cpu执行程序指令时并不是一步执行完毕,分步执行时,会有等待上条指令的情况出现,这个停顿的时间,cpu用来进行优化,将不需要等待的操作进行提前执行,将停顿时间消除。

进程是线程的容器,是程序执行的最小实体。用多线程开发而不是多进程的原因是:线程间的切换和调度成本远小于进程。
线程的转态:
new
runnable
blocked
waiting 通过wait()方法等待的线程在等待notify()方法,而通过join()方法等待的线程则会等待目标线程的结束。一旦结束waiting进入runnable状态
time_waiting
terminated
wait() 与sleep()都可让线程等待,但是wait()可以释放目标对象的锁,但是sleep()不会释放任何资源。
wait()与notify() 是Object对象的方法,sleep()是Thread的方法。
join() 让调用该方法的线程结束后再继续执行。
yield() 让调用者让出CPU使用权。
daemon 守护线程

当锁对象处于可用的状态时,线程1与线程2同时请求锁,那么谁将获得锁,在大多数情况下,是随机的,synchronized锁关键字进行控制的是非公平的
公平锁与非公平锁 在大多数情况下,是非公平的锁
公平锁:最大的特点是不会产生饥饿现象,只要排队,最终会获得资源。但是公平锁的实现成本较高,性能相对低下。默认情况下,是非公平的。
Reentrantlock 可重入锁
Condition 与重入锁相关联使用,作用与Object.wait() Object.notify() 大致相同。 await() ,signal() 在使用前后需要获取锁,并释放锁。
ReadWriteLock 读写锁 使用读写锁分离机制可以有效提升系统的性能,将读写分离,使得在进行读取操作时达到并行,而进行写操作时仍旧是阻塞操作。
semaphore 信号量 允许多个线程同时访问 acquire() 获取一个许可 release() 释放一个许可,让其他等待许可的线程继续访问。
CountDownLatch 倒计数器 多线程控制工具类。让某个线程等待直到所有检查线程全部完工后,再执行。
CyclicBarrier 循环栅栏,可反复使用的计数器,功能和CountDownLatch 类似。

ScheduledExecutorService 定时方法:
schedule() 在给定时间,进行一次任务调度
scheduleAtFixedRate() 周期性执行调度
scheduleWithFixedDelay() 周期性执行调度,但是下次调度开始时间延迟进行
如果调度周期时间小于任务的实际执行时间,会不会发生调度重叠??? 不会,下次任务会在上次任务执行完成后,立即执行。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

corePoolSize: 线程池中的核心线程数量
maximumPoolSize: 线程池中的最大线程数量
keepAliveTime: 当线程数量超过corePoolSize时,多余的空闲线程的存活时间(超过时间销毁)。
unit: keepAliveTime的时间单位
workQueue: 任务队列,提交但尚未执行的任务
threadFactory: 线程工厂,用于创建线程,一般采用默认
handler: 拒绝策略,当任务太多来不及处理,如何拒绝任务

workQueue:
直接提交的队列 SynchronoousQueue 没有容量,每一个插入操作都要等待一个相应的删除操作。newCachedThreadPool 中采用此队列。
有界的任务队列 ArrayBlockQueue 当有新任务加入时,如果线程池的实际线程数小于核心线程数,那么会优先创建新线程,如果大于核心线程数,那么会将新任务添加到任务队列,如果队列已满,无法就加入,那么会在总线程数不大于最大线程数的前提下,创建新的线程,如果大于最大线程数,那么执行拒绝策略。
*的任务队列 LinkedBlockingQueue 与有界队列相比,不存在任务入队失败的情况,当有新任务加入时,如果线程池的线程数小于核心线程数,那么创建新的线程,达到核心线程数时,线程数量不再增加,如果大于核心线程数,那么就会进入队列等待,直到资源耗尽。
newFixedThreadPool
优先任务队列 ProorityBlockingQueue 是特殊的*任务队列,上边两种队列都是先进先出,这种会按照任务自身的优先级来执行。

handler: 拒绝策略
AbortPolicy : 该策略直接抛出异常,阻止系统正常运行
CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。这样不会真的丢弃任务,但是,任务提交线程的性能会急剧下降
DiscardOledestPolicy : 丢弃即将执行的一个请求,并尝试再次提交当前任务
DiscardPolicy : 默默丢弃无法处理的任务,不予任何处理

线程池中的线程从ThreadFactory中创建。
扩展线程池: 在默认的ThreadPoolExecutor 实现中,提供了空的 beforeExecute() 和 afterExecute() 实现。在实际应用中可以用这个进行线程的监控。
···
public static void main(String[] args) throws InterruptedException {

    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
             new DefaultThreadFactory("default")) {
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println("准备执行"+((MyThread)r).name);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println("执行完毕"+((MyThread)r).name);
        }

        @Override
        protected void terminated() {
            System.out.println("线程池退出");
        }
    };

    for (int i = 0; i < 5; i++) {
        MyThread myThread = new MyThread("Task-Name-" + i);
        // 注意: 这里必须是execute 提交 而不是submit
        executor.execute(myThread);
        Thread.sleep(10);
    }
    executor.shutdown();
}

···

Java 中 CAS 算法 其中的一个关键
AbstractQueuedSynchronizer
AQS 同步器
AQS的核心思想是基于volatile int state这样的volatile变量,配合Unsafe工具对其原子性的操作来实现对当前锁状态进行修改。同步器内部依赖一个FIFO的双向队列来完成资源获取线程的排队工作。:

import sun.misc.Unsafe;
    /**
     * Setup to support compareAndSet. We need to natively implement
     * this here: For the sake of permitting future enhancements, we
     * cannot explicitly subclass AtomicInteger, which would be
     * efficient and useful otherwise. So, as the lesser of evils, we
     * natively implement using hotspot intrinsics API. And while we
     * are at it, we do the same for other CASable fields (which could
     * otherwise be done with atomic field updaters).
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

ReentrantLock

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

伪共享问题:
为了提高CPU的速度,CPU有一个高速缓存Cache,在这个高速缓存中,读取数据的最小单元是缓存行,它是从主存复制到缓存中的最小单位,一般为32字节到128字节。
如果两个变量存放在一个缓存行中,在多线程访问中,可能会相互影响彼此的性能。假设变量x,y在同一缓存行中,运行在CPU1上的线程更新了x,那么CPU2上的缓存行就会失效。导致Cache无法命中。如果CPU经常不能命中缓存,那么系统的吞吐量就会急剧下降。

转载于:https://www.jianshu.com/p/478c0371a692