Java高级程序设计

并发编程

并发

空间是并存事物的次序 (Space is the order of existence of states which are simultaneous)

—— Leibniz

The world is concurrent.

—— Joe Armstrong

Concurrency occurs when two or more execution flows are able to run simultaneously.

—— Edsger Wybe Dijkstra

进程 vs. 线程

  • 进程:是系统进行资源分配和调度的一个独立单位,也是一个具有独立功能的程序;
  • 线程:线程依托于进程而存在,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器、一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。

区别在于,进程属于资源分配的单位,而线程则是作业调度的单位;进程拥有自己的地址空间,而多个线程拥有自己的堆栈和局部变量,并共享所依托于进程的资源。

并发 vs 并行

JVM Threads







初级篇

Runnable

@FunctionalInterface
public interface Runnable

The Runnable interface should be implemented by any class whose instances are intended to be executed by a thread. The class must define a method of no arguments called run.

Since:
JDK1.0

https://docs.oracle.com/javase/8/docs/api/java/lang/Runnable.html

举例

class LiftOff implements Runnable {
    protected int countDown = 10; // Default
    private static int taskCount = 0;
    private final int id = taskCount++;
    public LiftOff(int countDown) { this.countDown = countDown; }
    public String status() {
        return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), ";
    }
    public void run() {
        while (countDown-- > 0) {
            System.out.print(status()); 
            Thread.yield(); //后面解释
        }
    }
}

Run it

public class MainThread {
    public static void main(String[] args) {
        LiftOff launch = new LiftOff(10);
        launch.run(); //is it a new thread?
    }
}
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!),

Runnable接口仅仅定义“任务”

Runnable r = () -> { task code };

Thread

public class Thread extends Object implements Runnable

A thread is a thread of execution in a program. The Java Virtual Machine allows an application to have multiple threads of execution running concurrently.

Since:
JDK1.0

https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html

举例

public class BasicThreads {
    public static void main(String[] args) {
        //把任务装进线程里
        Thread t = new Thread(new LiftOff(10));
        t.start();
        System.out.println("Waiting for LiftOff");
    }
}

Thread对象像是运载火箭,Runnable的实现对象就是一个荷载(payload)

或者

public class SimpleThread extends Thread {
    private int countDown = 5;  private static int threadCount = 0;
    public SimpleThread() {
        super(Integer.toString(++threadCount));  start();
    }
    public String toString() {
        return "#" + getName() + "(" + countDown + "), ";
    }
    public void run() {
        while (true) { System.out.print(this);  if (--countDown == 0) return; }
    }
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) { new SimpleThread(); }
    }
}

WARNING

  • The run() method should not be called directly by the application. The system calls it.

  • If the run() method is called explicitly by the application then the code is executed sequentially not concurrently.

多线程,走起

public class MoreBasicThreads {
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++)
            new Thread(new LiftOff(10)).start();
        System.out.println("Waiting for LiftOff");
    }
}

直接启动多个Thread

或者用ExecutorService启动

public class CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++)
            exec.execute(new LiftOff(10));
        exec.shutdown();
    }
}

根据需要创建新线程的线程池,如果现有线程没有可用的,则创建一个新线程并添加到池中,如果有被使用完但是还没销毁的线程,就复用该线程

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html

线程池?

  • 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。
  • 在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。
  • 所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。
  • 如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因。

或者另一种策略

public class FixedThreadPool {
    public static void main(String[] args) {
        // Constructor argument is number of threads:
        ExecutorService exec = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++)
            exec.execute(new LiftOff(10));
        exec.shutdown();
    }
}

创建一个固定线程数的线程池,在任何时候最多只有n个线程被创建。如果在所有线程都处于活动状态时,有其他任务提交,他们将等待队列中直到线程可用。如果任何线程由于执行过程中的故障而终止,将会有一个新线程取代这个线程执行后续任务。

线程池常见用法

  • 调用Executors类的静态方法newCachedThreadPool或者newFixedThreadPool
  • 调用submit提交RunnalbeCallable对象
  • 保存好返回的Future对象,以便得到结果或者取消任务
  • 当不想再提交任何任务时,调用shutdown

Callable

如果需要获得异步执行的任务结果怎么办?

@FunctionalInterface
public interface Callable<V>

A task that returns a result and may throw an exception. Implementors define a single method with no arguments called call.
The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.

Future

public interface Future<V>

A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel method.

Future<V>

保存异步计算的结果

    V get() //阻塞,直到计算完成
    V get(long timeout, TImeUnit unit)
    void cancel(boolean mayInterrupt)
    boolean isCancelled()
    boolean isDone()

当调用Futureget()方法以获得结果时,当前线程就开始阻塞,直接call()方法结束返回结果。

举例

class MyCallable implements Callable<String>{
    @Override
    public String call() throws Exception {
        System.out.println("做一些耗时的任务...");
        Thread.sleep(5000);
        return "OK";
    }
}
public class FutureSimpleDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(new MyCallable());
        System.out.println("do something...");
        System.out.println("得到异步任务返回结果:" + future.get());
        System.out.println("Completed!");
    }
}

Sleep 睡眠

Causes the currently executing thread to sleep (temporarily cease execution) for the specified number of milliseconds.

public class SleepingTask implements Runnable {
    private int countDown = 10;
    public void run() {
        try {
            while (countDown-- > 0) {
                TimeUnit.MILLISECONDS.sleep(100);       // Old-style: Thread.sleep(100);
            }
        } catch (InterruptedException e) { System.err.println("Interrupted");  }//可能被打断 
    }
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) { exec.execute(new SleepingTask()); }
        exec.shutdown();
    }
}

Yield 让位 

A hint to the scheduler that the current thread is willing to yield its current use of a processor. The scheduler is free to ignore this hint.

  • yieldsleep的主要区别:
    • yield会临时暂停当前线程,让有同样优先级的正在等待的线程有机会执行
    • 若没有正在等待的线程或者所有正在等待的线程的优先级都较低,则继续运行
    • 执行yield的线程何时继续运行由线程调度器来决定,不同厂商可能有不同行为
    • yield方法不保证当前的线程会暂停或者停止,但是可以保证当前线程在调用yield方法时会放弃CPU

Priority 优先级

public class SimplePriorities implements Runnable {
    private int countDown = 5;
    private volatile double d; // No optimization 后面再解释
    private int priority;
    public SimplePriorities(int priority) {
        this.priority = priority;
    }
    public void run() {
        Thread.currentThread().setPriority(priority);
        while (true) {
            // An expensive, interruptable operation:
            for (int i = 1; i < 100000; i++) {
                d += (Math.PI + Math.E) / (double) i;
                if (i % 1000 == 0)
                    Thread.yield();
            }
            System.out.println(this);
            if (--countDown == 0) return;
        }
    }
    public String toString() {
        return Thread.currentThread() + ": " + countDown;
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++)
            exec.execute(new SimplePriorities(Thread.MIN_PRIORITY));
        exec.execute(new SimplePriorities(Thread.MAX_PRIORITY));
        exec.shutdown();
    }
}

改变线程优先级这件事可以做,但尽量不要做

Daemon 线程

A daemon thread is a thread that does not prevent the JVM from exiting when the program finishes but the thread is still running. An example for a daemon thread is the garbage collection.

public class SimpleDaemons implements Runnable {
    public void run() {
        try {
            while (true) {
                TimeUnit.MILLISECONDS.sleep(100);
                System.out.println(Thread.currentThread() + " " + this);
            }
        } catch (InterruptedException e) {
            System.out.println("sleep() interrupted");
        }
    }
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {
            Thread daemon = new Thread(new SimpleDaemons());
            daemon.setDaemon(true); // Must call before start()
            daemon.start();
        }
        System.out.println("All daemons started");
        TimeUnit.MILLISECONDS.sleep(99);
    }
}

后台运行线程,当所有非后台线程结束时,应用退出,所有Daemon线程被杀!

小结

  • Java关于线程编程的抽象

    • Thread对象像是运载火箭,Runnable的实现对象就是一个荷载(payload)
    • Runnable/Callable --> Task
    • Thread --> let tasks go







中级篇

定义一个普通线程

class Sleeper extends Thread {
    private int duration;
    public Sleeper(String name, int sleepTime) {
        super(name);
        duration = sleepTime;
        start();
    }
    public void run() {
        try {
            sleep(duration);
        } catch (InterruptedException e) {
            System.out.println(getName() + " was interrupted.");
            return;
        }
        System.out.println(getName() + " has awakened");
    }
}

定一个等待线程

class Joiner extends Thread {
    private Sleeper sleeper;
    public Joiner(String name, Sleeper sleeper) {
        super(name);
        this.sleeper = sleeper;
        start();
    }
    public void run() {
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
        }
        System.out.println(getName() + " join completed");
    }
}

Join

public class Joining {
    public static void main(String[] args) {
        Sleeper sleepy = new Sleeper("Sleepy", 1500),
                grumpy = new Sleeper("Grumpy", 1500);
        Joiner  dopey = new Joiner("Dopey", sleepy),
                doc = new Joiner("Doc", grumpy);
        grumpy.interrupt();
    }
}
Grumpy was interrupted.
Doc join completed
Sleepy has awakened
Dopey join completed

Exceptions in Threads

class ExceptionThread implements Runnable {
    public void run() {
        throw new RuntimeException();
    }
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new ExceptionThread());
    }
}

Uncaught Exceptions

public class NaiveExceptionHandling {
    public static void main(String[] args) {
        try {
            ExecutorService exec = Executors.newCachedThreadPool();
            exec.execute(new ExceptionThread());
            exec.shutdown();
        } catch (RuntimeException ue) {
            System.out.println("Exception has been handled!");   // This statement will NOT execute!
        }
    }
}
Exception in thread "pool-1-thread-1" java.lang.RuntimeException
	at concurrency.ExceptionThread.run(ExceptionThread.java:7)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

UncaughtExceptionHandler

class ExceptionThread2 implements Runnable {
    public void run() {
        Thread t = Thread.currentThread();
        System.out.println("run() by " + t);
        System.out.println(  "eh = " + t.getUncaughtExceptionHandler());
        throw new RuntimeException();
    }
}

class MyUncaughtExceptionHandler implements
        Thread.UncaughtExceptionHandler {
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println("caught " + e);
    }
}

Then

class HandlerThreadFactory implements ThreadFactory {
    public Thread newThread(Runnable r) {
        System.out.println(this + " creating new Thread");
        Thread t = new Thread(r);
        System.out.println("created " + t);
        t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
        System.out.println( "eh = " + t.getUncaughtExceptionHandler());
        return t;
    }
}
public class CaptureUncaughtException {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
        exec.execute(new ExceptionThread2());
        exec.shutdown();
    }
}

或者简单一点

public class SettingDefaultHandler {
    public static void main(String[] args) {
        Thread.setDefaultUncaughtExceptionHandler(
                new MyUncaughtExceptionHandler());
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new ExceptionThread());
    }
}

资源共享问题

葫芦娃在二维空间中可以随意走,多个葫芦娃在空间上如果随意走,那就会撞头。

因为一个空间位置,是不能共享的。

示例:EvenGenerator

abstract class IntGenerator {
    private volatile boolean canceled = false;
    public abstract int next();
    public void cancel() { canceled = true; }
    public boolean isCanceled() { return canceled; }
}
public class EvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;
    public int next() {
        ++currentEvenValue; // Danger point here!
        ++currentEvenValue;
        return currentEvenValue;
    }
}

EvenChecker

public class EvenChecker implements Runnable {
    private IntGenerator generator;
    private final int id;
    public EvenChecker(IntGenerator g, int ident) {
        generator = g; id = ident;
    }
    public void run() {
        while (!generator.isCanceled()) {
            int val = generator.next();
            if (val % 2 != 0) {
                System.out.println(val + " not even!"); generator.cancel(); // Cancels all EvenCheckers
            }
        }
    }
    public static void test(IntGenerator gp, int count) {
        System.out.println("Press Control-C to exit");
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < count; i++) { exec.execute(new EvenChecker(gp, i));}
        exec.shutdown();
    }
    public static void main(String[] args) {
        EvenChecker.test(new EvenGenerator(), 10);
    }
}

问题

In a multi-threaded environment, a race condition occurs when two or more threads attempt to update mutable shared data at the same time. Java offers a mechanism to avoid race conditions by synchronizing thread access to shared data.

对资源加锁,使得对资源的访问顺序化,确保在某一时刻只有一个任务在使用共享资源(使其互斥)

Mutual Exclusion (Mutex)

Lock

public class MutexEvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;
    private Lock lock = new ReentrantLock();

    public int next() {
        //加锁
        lock.lock();  // block until condition holds
        try {
            ++currentEvenValue;
            Thread.yield();
            ++currentEvenValue;
            return currentEvenValue;
        } finally {
            lock.unlock();   //一定要用try-catch的finally去释放锁
        }
    }
    public static void main(String[] args) {
        EvenChecker.test(new MutexEvenGenerator(), 10);
    }
} 

ReentrantLock

public class ReentrantLock extends Object implements Lock, Serializable

A ReentrantLock is owned by the thread last successfully locking, but not yet unlocking it. A thread invoking lock will return, successfully acquiring the lock, when the lock is not owned by another thread. The method will return immediately if the current thread already owns the lock.

 if (lock.tryLock() ||
     lock.tryLock(timeout, unit)) {
   ...
 }

Synchronized

public class SynchronizedEvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;

    public synchronized int next() {
        ++currentEvenValue;
        Thread.yield(); 
        ++currentEvenValue;
        return currentEvenValue;
    }

    public static void main(String[] args) {
        EvenChecker.test(new SynchronizedEvenGenerator(), 10);
    }
}

Critical Sections 临界区

synchronized(syncObject){
    //balabala
}

加锁代码片段

示例

class PairManager1 extends PairManager {
    public synchronized void increment() { // Synchronize the entire method:
        p.incrementX();
        p.incrementY();
        store(getPair());
    }
}
class PairManager2 extends PairManager {
    public void increment() {
        Pair temp;
        synchronized (this) { // Use a critical section:
            p.incrementX();
            p.incrementY();
            temp = getPair();
        }
        store(temp);
    }
}

Lock on Object

Java的每个对象都关联一个管程(monitor)以实现多个线程执行该对象上同步(synchronized)方法调用时的互斥。一个线程执行这个对象上的同步方法时JVM检查该对象的管程:

  • 如果该对象管程未被占有,当前调用线程可获得所有权并被允许执行改方法;
  • 如果管程被另一个线程所有,则调用线程需要等待管程被释放。

当一个线程完成同步方法调用时,它释放管程所有权,等待该管程的线程被允许执行同步方法。

举个例子

class Counter{
    private int count = 0;
    public void Increment() {
        int n = count;
        count = n+1;
    }
}

如前所述,多个线程并发调用 counter.Increment();时存在竞争(race condition)。

可能的结果

Thread 1 Thread 2 Count
counter.Increment(); --- 0
n = count; // 0 --- 0
--- counter.Increment(); 0
--- n = count; // 0 0
--- count = n + 1; // 1 1
count = n + 1; // 1 --- 1

使之synchronized

class Counter{
    private int count = 0;
    public void synchronized Increment() {
        int n = count;
        count = n+1;
    }
}

正确的结果

Thread 1 Thread 2 Count
counter.Increment(); --- 0
(acquires the monitor) --- 0
n = count; // 0 --- 0
--- counter.Increment(); 0
--- (can't acquire monitor) 0
count = n + 1; // 1 --- (blocked) 1
(releases the monitor) --- (blocked) 1
---------------------- (acquires the monitor) 1
---------------------- n = count; // 1 1
--------------- count = n + 1; // 2 2
-------------------- (releases the monitor) 2

举例:生产者消费者

class Buffer {
    private char[] buffer;
    private int count = 0, in = 0, out = 0;
    Buffer(int size) {
        buffer = new char[size];
    }
    public synchronized void Put(char c) {
        while (count == buffer.length) ;
        System.out.println("Producing " + c + " ...");
        buffer[in] = c;
        in = (in + 1) % buffer.length;
        count++;
    }
    public synchronized char Get() {
        while (count == 0) ;
        char c = buffer[out];
        out = (out + 1) % buffer.length;
        count--;
        System.out.println("Consuming " + c + " ...");
        return c;
    }
}
class Producer extends Thread {
    private Buffer buffer;
    Producer(Buffer b) {
        buffer = b;
    }
    public void run() {
        for (int i = 0; i < 10; i++) {
            buffer.Put((char) ('A' + i % 26));
        }
    }
}
class Consumer extends Thread {
    private Buffer buffer;
    Consumer(Buffer b) {
        buffer = b;
    }
    public void run() {
        for (int i = 0; i < 10; i++) {
            buffer.Get();
        }
    }
}
public class PC {
    public static void main(String[] args) {
        Buffer b = new Buffer(4);
        Producer p = new Producer(b);
        Consumer c = new Consumer(b);

        p.start();
        c.start();
    }
}
Producing A ...
Producing B ...
Producing C ...
Producing D ...

Thread coordination

If buffer is full, wait until consumer gets.

public synchronized void Put(char c) {
    while(count == buffer.length) {
        try { wait(); }
        catch (InterruptedException e) { } 
        finally { } 
    } 
    System.out.println("Producing " + c + " ...");
    buffer[in] = c; 
    in = (in + 1) % buffer.length; 
    count++; 
    notify(); 
}

Thread coordination

If buffer is empty, wait until producer puts.

public synchronized char Get() {
    while (count == 0) {
        try { wait(); }
        catch (InterruptedException e) { } 
        finally { } 
    } 
    char c = buffer[out]; 
    out = (out + 1) % buffer.length;
    count--;
    System.out.println("Consuming " + c + " ..."); 
    notify(); 
    return c;
}

wait & notify()/notifyAll()

The wait() method suspends the calling thread and temporarily releases ownership of the monitor (so it allows other threads to acquire the monitor). The suspended thread that called wait() wakes up only when another thread calls notify() or notifyAll() on that object.

还是看个例子

class Car {
    private boolean waxOn = false;

    public synchronized void wax() {
        System.out.println("Wax On by " + Thread.currentThread().getName());
        waxOn = true;
        notifyAll();
    }
    public synchronized void buff() {
        System.out.println("Wax Off by " + Thread.currentThread().getName());
        waxOn = false;
        notifyAll();
    }

    public synchronized void waitForWaxing() throws InterruptedException {
        while (waxOn == false)
            wait();
    }
    public synchronized void waitForBuffing() throws InterruptedException {
        while (waxOn == true)
            wait();
    }
}
class WaxOn implements Runnable {
    private Car car;
    private String name;

    public WaxOn(Car c, String name) {
        this.car = c;
        this.name = name;
    }

    public void run() {
        Thread.currentThread().setName(name);
        try {
            while (!Thread.interrupted()) {
                car.waitForBuffing();
                TimeUnit.MILLISECONDS.sleep(200);
                car.wax();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax On task");
    }
}
class WaxOff implements Runnable {
    private Car car;
    private String name;

    public WaxOff(Car c, String name) {
        this.car = c;
        this.name = name;
    }

    public void run() {
        Thread.currentThread().setName(name);
        try {
            while (!Thread.interrupted()) {
                car.waitForWaxing();
                TimeUnit.MILLISECONDS.sleep(500);
                car.buff();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax Off task");
    }
}
public class WaxOMatic {
    public static void main(String[] args) throws Exception {
        Car car = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new WaxOff(car, "A-OFF"));
        exec.execute(new WaxOn(car, "B-ON"));
        exec.execute(new WaxOn(car, "C-ON"));

        TimeUnit.SECONDS.sleep(5); // Run for a while...
        exec.shutdownNow(); // Interrupt all tasks
    }
}

线程状态

wait和sleep的区别

  • 调用wait方法时,线程在等待的时候会释放掉它所获得的monitor,但是调用Thread.sleep()方法时,线程在等待的时候仍然会持有monitor或者锁,wait方法应在同步代码块中调用,但是sleep方法不需要
  • Thread.sleep()方法是一个静态方法,作用在当前线程上;但是wait方法是一个实例方法,并且只能在其他线程调用本实例的notify()方法时被唤醒

如果需要暂停线程一段特定的时间就使用sleep()方法,如果要实现线程间通信就使用wait()方法。

Thread local Storage

示例

class Accessor implements Runnable {
    private final int id;

    public Accessor(int idn) {
        id = idn;
    }

    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            ThreadLocalVariableHolder.increment();
            System.out.println(this);
            Thread.yield();
        }
    }

    public String toString() {
        return "#" + id + ": " + ThreadLocalVariableHolder.get();
    }
}
public class ThreadLocalVariableHolder {
    private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
        private Random rand = new Random(47);
        protected synchronized Integer initialValue() {
            return rand.nextInt(10000);
        }
    };

    public static void increment() {
        value.set(value.get() + 1);
    }

    public static int get() {
        return value.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++)
            exec.execute(new Accessor(i));
        TimeUnit.SECONDS.sleep(3);  // Run for a while
        exec.shutdownNow();         // All Accessors will quit
    }
}







高级篇

高级设施

  • CountDownLatch
  • CyclicBarrier
  • DelayQueue
  • PriorityBlockingQueue
  • ScheduledExector
  • Semaphore
  • Exchanger

CountDownLatch

public class CountDownLatch extends Object

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html

用法

A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately.

A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.

N个等1个

class Driver { // ...
   void main() throws InterruptedException {
     CountDownLatch startSignal = new CountDownLatch(1);
     CountDownLatch doneSignal = new CountDownLatch(N);

     for (int i = 0; i < N; ++i) // create and start threads
       new Thread(new Worker(startSignal, doneSignal)).start();

     doSomethingElse();            // don't let run yet
     startSignal.countDown();      // let all threads proceed
     doSomethingElse();
     doneSignal.await();           // wait for all to finish
   }
 }
 class Worker implements Runnable {
   private final CountDownLatch startSignal;
   private final CountDownLatch doneSignal;
   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     this.startSignal = startSignal;
     this.doneSignal = doneSignal;
   }
   public void run() {
     try {
       startSignal.await();
       doWork();
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }

1个等N个

class Driver2 { // ...
   void main() throws InterruptedException {
     CountDownLatch doneSignal = new CountDownLatch(N);
     Executor e = ...

     for (int i = 0; i < N; ++i) // create and start threads
       e.execute(new WorkerRunnable(doneSignal, i));

     doneSignal.await();           // wait for all to finish
   }
 }
 class WorkerRunnable implements Runnable {
   private final CountDownLatch doneSignal;
   private final int i;
   WorkerRunnable(CountDownLatch doneSignal, int i) {
     this.doneSignal = doneSignal;
     this.i = i;
   }
   public void run() {
     try {
       doWork(i);
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }

CyclicBarrier

 public class CyclicBarrier extends Object

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

ScheduledThreadPoolExecutor

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService

A ThreadPoolExecutor that can additionally schedule commands to run after a given delay, or to execute periodically. This class is preferable to Timer when multiple worker threads are needed, or when the additional flexibility or capabilities of ThreadPoolExecutor (which this class extends) are required.

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html

Semaphore

public class Semaphore extends Object implements Serializable

A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Semaphore.html

Semaphore vs. Mutex

Mutex acts similarly to a binary semaphore, we can use it to implement mutual exclusion.

Exchanger

public class Exchanger<V> extends Object

A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Exchanger.html

DelayQueue

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>

An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired.

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/DelayQueue.html

<small>https://docs.oracle.com/javase/tutorial/essential/concurrency/index.html</small>