Java并发编程:如何实现线程间通信的N种场景和对应的实现方式


正常情况下,每个子线程完成各自的任务就可以结束了。不过有的时候,我们希望多个线程协同工作来完成某个任务,这时就涉及到了线程间通信了。

1. 文前说明

1.1 本文涉及到的知识点

  • thread.join()
  • object.wait()
  • object.notify()
  • CountDownLatch
  • CyclicBarrier
  • FutureTask
  • Callable

1.2 本文主要的切入点

  • 如何让两个线程依次执行?
  • 那如何让两个线程按照指定方式有序交叉运行呢?
  • 四个线程 A B C D,其中 D 要等到 A B C 全执行完毕后才执行,而且 A B C 是同步运行的
  • 三个运动员各自准备,等到三个人都准备好后,再一起跑
  • 子线程完成某件任务后,把得到的结果回传给主线程

2. 如何让两个线程依次执行

这里其实很简单,如一个主线程main和一个子线程son,现在在main线程开始执行的时候启动子线程son,此时主线程mian和子线程son是异步执行的,但是当main执行到后续逻辑的一定位置后,需要等待子线程执行完后才能执行后面的逻辑。因此这里就涉及到执行顺序的问题,代码示例如下:

public static void main(String[] args) throws InterruptedException {
     Thread thread = new Thread(new Runnable() {
        public void run() {
            System.out.println("son==>print son thread info");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    thread.start();
    thread.join();
    System.out.println("main==>print main thread info");
}

执行结果:

son==>print son thread info
main==>print main thread info

3. 如何让两个线程按照指定方式有序交叉运行

交叉执行换一种说法就是一个线程执行到一定的位置,然后进入等待状态,让另一个线程执行,等另一个线程执行到一定位置又进入等待状态,让之前的线程执行,直到两个线程都执行结束,往往这里就会用到锁。示例代码如下:

public static void main(String[] args) throws InterruptedException {
    final Object lock = new Object();
    Thread threadA = new Thread(new Runnable() {
        public void run() {
            try {
                synchronized (lock) {
                    lock.wait();
                    System.out.println("threadA==>print A1");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    Thread threadB = new Thread(new Runnable() {
        public void run() {
            synchronized (lock) {
                System.out.println("threadB==>print B1");
                System.out.println("threadB==>print B2");
                lock.notify();
            }

        }
    });
    threadA.start();
    Thread.sleep(100);//这里保证是A线程先执行,B线程后执行
    threadB.start();
}

执行结果:

threadB==>print B1
threadB==>print B2
threadA==>print A1

4. 四个线程A、B、C、D,其中D要等到ABC都执行完毕后才执行,且ABC是同步运行

涉及到一个线程等待多个线程执行到一定位置后再执行,就不能使用上面的简单锁,在JDK中提供了一个很好用的类,就是CountDownLatch,这个类有计数功能,当ABC三个线程执行到一定的位置,触发CountDownLatch中的计数减一方法,当CountDownLatch内的计数达到0后,就会触发当前等待的线程D,让D继续往后面执行。

用例子说明,就是ABCD四个人配合打一张副本图,D先到传送门处等待进入下一关,但是需要ABC拿到三把钥匙,将传送门上的三把锁打开,这样D才能顺利进入下一关。

示例代码如下:

//Main主线程就相当于D线程,其他另外的三个线程分别是ABC
public static void main(String[] args) throws InterruptedException {
    //初始化计数数量
    final CountDownLatch countDownLatch = new CountDownLatch(3);
    Thread threadA = new Thread(new Runnable() {
        public void run() {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("threadA==>print A1");
            countDownLatch.countDown();//计数减一
        }
    });
    Thread threadB = new Thread(new Runnable() {
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("threadB==>print B1");
            countDownLatch.countDown();//计数减一
        }
    });
    Thread threadC = new Thread(new Runnable() {
        public void run() {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("threadC==>print C1");
            countDownLatch.countDown();//计数减一
        }
    });
    threadA.start();
    threadB.start();
    threadC.start();
    countDownLatch.await();//阻塞,等待CountDownLatch计数减到0
    System.out.println("main==>print main");
}

执行结果:

threadA==>print A1
threadB==>print B1
threadC==>print C1
main==>print main

5. 三个运动员各自准备,等三个人都准备好,一起跑

这个和上一个的实现不同之处就是三个线程ABC都需要达到同一个位置后,才能各自的开始下一步的执行过程,没有单个线程等待其他线程执行的机制,因为需要每个线程都要获取到钥匙。同样在JDK中提供了一个很好用的类是CyclicBarrier(栅栏)。示例代码如下:

public static void main(String[] args) throws InterruptedException {
        //初始化计数数量
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        Thread threadA = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(100);
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("threadA==>print A1");
            }
        });
        Thread threadB = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(1000);
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("threadB==>print B1");
            }
        });
        Thread threadC = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(4000);
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("threadC==>print C1");
            }
        });
        threadA.start();
        threadB.start();
        threadC.start();
        System.out.println("main==>print main");
    }

首先输出的是main==>print main,然后会等4秒一次性数据三个线程中的结果,输出如下:

main==>print main
threadC==>print C1
threadA==>print A1
threadB==>print B1

6. 子线程完成某个任务后,把得到的结果回传给主线程

这个就很简单了,在说线程的时候,往往会涉及到两种,一种是没有返回值的,一般使用Runnable,一种是有返回值的Callable。这里明显是需要有返回值,因此肯定使用到Callable,但是仅使用Callable是不行的,需要使用FutureTask来将返回值封装起来,提供给主线程获取返回值。示例代码如下:

public static void main(String[] args) throws InterruptedException, ExecutionException {
    Callable<String> callable = new Callable<String>() {
        public String call() throws Exception {
            Thread.sleep(2000);
            return "callable return msg";
        }
    };
    FutureTask<String> futureTask = new FutureTask<String>(callable);//用FutureTask包装Callable
    new Thread(futureTask).start();//启动线程
    System.out.println("子线程返回结果:" + futureTask.get());
}

输出结果如下:

子线程返回结果:callable return msg

这里让子线程里面休眠2秒钟,是为了模拟FutureTask内的get方式是阻塞的,当子线程没有执行结束,会一直停留在get方法的位置,等待响应数据。

这里为什么需要用FutureTask来包装Callable,去看一下FutureTask的继承关系就可以了解了。

7. 参考文章


文章作者: 程序猿洞晓
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 程序猿洞晓 !
评论
 上一篇
Java输入和输出流关闭的顺序和关闭的姿势对比理解 Java输入和输出流关闭的顺序和关闭的姿势对比理解
Java的流操作在实际应用中使用的很多,但是流的关闭顺序到底有没有要求,关闭流的顺序和节点流与处理流有什么关系,输出流和输入流又有什么区别,然后就是关闭流可以通过哪几种方式。这篇文章将会和大家讨论一下。……。整个篇幅说了节点流和处理流的区别,然后根据输出流和输入流讨论流的关闭顺序问题,然后就是流不同的关闭姿势说明,最后比较流不同关闭姿势的优缺点。
2018-10-25
下一篇 
ConcurrentHashMap简单的实现思想理解 ConcurrentHashMap简单的实现思想理解
关于说ConcurrentHashMap的文章很多,本博客也有转载这样的文章,但是总体觉得都是过于偏重源码的说明。没有很明确的结构图来让我们从整体上理解源码的实现过程。毕竟人都是偏向于懒,博客中源码过于太多,理解起来困难,再加上这种源码分析的篇幅很长,因此能真正看完理解的确不多(个人理解)。所以这篇文章里面将不贴出源码,完全……
2018-10-18
  目录