Java多线程学习记录(一)

13

多线程实现方法

对于JDK21之前的JAVA来说(未实现协程),主要通过以下三种办法实现多线程

  1. 继承Thread类并重写run方法

  2. 实现Runable接口

  3. 实现Callable接口

写法分别为以下三种:

    class ThreadA extends Thread {
        @Override
        public void run(){
            //  这里写业务代码
        }
    }

    class ThreadB implements Runnable{

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                System.out.println("线程A正在执行"+i);
            }
        }
    }
    
    static class CallableTask implements Callable<String> {

        @Override
        public String call() throws Exception {
            return "";
        }
    }

继承Thread和实现Runnable区别不大,在应用的时候常常使用后者,只定义可运行的内容,编写代码更加灵活,下面是一个示例:

public class Main {
    static class ThreadA implements Runnable{
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                System.out.println("线程A正在执行"+i);
            }
        }
    }
    static class ThreadB implements Runnable{
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                System.out.println("线程B正在执行"+i);
            }
        }
    }

      public static void main(String[] args) throws IOException {
        new Thread(new ThreadA()).start();
        new Thread(new ThreadB()).start();
  }
}

新建Thread对象,然后把已实现的ThreadA传入Thread对象,最后执行start,就可以启动多个线程了,这段代码的结果打印出来可以发现他们在交替打印结果。

然后是Callable的用法,CallableTask就按上面的例子来声明,然后具体代码这么写:

    public static void main(String[] args) throws IOException {
        new Thread(
                ()-> {
                    try {
                        String result = new CallableTask().call();
                        System.out.printf(result);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
        ).start();

thread并不能直接用callable传入,而是在声明thread的过程中调用callbale方法,如果需要管理callable的执行过程,可以和FutureTask一起使用,下面是示例代码。从感性上看,futureTask可以看作面向线程执行过程中的一个调度切片,监控线程状态,并且可以返回计算结果(和直接写Callable的方法一样,FutureTask都可能会抛出异常,所以必须使用异常处理)

    public static void main(String[] args) throws Exception {
//        将要执行的任务
        CallableTask callableTask = new CallableTask();
        FutureTask<String> futureTask = new FutureTask<>(callableTask);
        new Thread(futureTask).start();

        String result = futureTask.get();
        System.out.printf(result);
    }
}

P.S:这里感觉还是Go的channel设计得更加直观。。。

同步锁

内部锁(synchronized

synchronized的用法比较多变,可以锁住类,也可以锁住部分代码段,下面的总结主要是从JavaGuide和ChatGPT抄出来的,首先看用法,再来写写例子:

  • 同步实例方法:通过将整个实例方法声明为synchronized,使得同一时间只允许一个线程执行该方法。

public synchronized void instanceMethod() {
    // 方法体
}
  • 同步静态方法:使用synchronized修饰静态方法,使得同一时间只能有一个线程执行该静态方法。此时锁是针对类的Class对象

public static synchronized void staticMethod() {
    // 方法体
}
  • 同步代码块:使用synchronized关键字声明一个代码块,可以指定锁住的对象,从而只对该代码块中的代码进行同步。

public void method() {
    synchronized (this) { // 锁定当前对象
        // 需要同步的代码
    }
}

对于1.2,我理解为,一个是锁定一个实例,一个是锁定当前类。对于同步实例方法来说,可以理解为声明一个门,就会生成对应的一把锁,如果声明了多个门,就会有多把锁,但是如果修饰静态方法,对于当前类来说,无论声明多少类,都只会有一把锁。

显性锁(ReentrantLock和信号量

这里直接写ReentrantLock的写法吧,其实和操作系统的PV操作一样,ReentrantLock实际上就是显式声明的“MUTEX”,这里直接接着上面的写,实现

public class Main {

    private static final ReentrantLock lock = new ReentrantLock();
    static class ThreadA implements Runnable{

        @Override
        public void run() {
            lock.lock();
            for (int i = 0; i < 100; i++) {
                System.out.println("线程A正在执行"+i);
            }
            lock.unlock();
        }
    }

    static class ThreadB implements Runnable{

        @Override
        public void run() {
            lock.lock();
            for (int i = 0; i < 100; i++) {
                System.out.println("线程B正在执行"+i);
            }
            lock.unlock();
        }
    }


    public static void main(String[] args) throws Exception {
        new Thread(new ThreadA()).start();
        new Thread(new ThreadB()).start();
    }
}

打印出来会发现,线程B会等待线程A的所有For执行完成后,才会开始执行,这样就可以较为灵活地控制线程之间的同步

同时Java也有操作系统中的信号量,可以使用 Semaphore lock = new Semaphore(10) 这种方法来声明,控制线程访问某个资源的数量

对于显性锁来说,还有较为乐观的锁,Java都有所实现,比如读写锁和CAS,这些放在稍微后面一些来讲。

线程池

线程池的概念

线程池的基本思想是将线程的创建和管理交给一个池化的管理器。线程池会预先创建一定数量的线程,并在任务需要执行时从池中获取空闲线程来执行任务,任务完成后线程会被返回池中等待下一个任务。使用Executors.newFixedThreadPool()可以声明一个固定大小的线程池,然后使用线程池的submit函数来提交任务,主要方法如下:

  1. 提交任务

    • <T> Future<T> submit(Callable<T> task):提交一个可调用任务并返回一个 Future 对象,以便将来获取结果。

    • Future<?> submit(Runnable task):提交一个Runnable任务并返回一个代表该任务的Future。

  2. 执行任务

    • void execute(Runnable command):执行给定的任务。

  3. 关闭服务

    • void shutdown():开始关闭ExecutorService,已经提交的任务会继续执行,但不接受新的任务。

    • List<Runnable> shutdownNow():尝试停止所有正在执行的任务,停掉等待的任务,并返回等待执行的任务列表。

  4. 任务管理

    • boolean isShutdown():如果ExecutorService已经被关闭,则返回true。

    • boolean isTerminated():如果所有任务已完成并且ExecutorService已经被关闭,则返回true。

    • boolean awaitTermination(long timeout, TimeUnit unit):在关闭后等待所有任务完成,直到超时或被中断。

  5. 获取状态信息

    • int getActiveCount():获取当前正在执行的任务数(通常是 ThreadPoolExecutor 的方法)。

    • int getPoolSize():获取当前线程池的大小(通常是 ThreadPoolExecutor 的方法)。

  6. 创建新的任务(这些方法通常在 Executors 工具类中实现):

    • static ExecutorService newFixedThreadPool(int nThreads):创建一个固定数量的线程池。

    • static ExecutorService newCachedThreadPool():创建一个可缓存的线程池,能够根据需要创建新线程。

    • static ExecutorService newSingleThreadExecutor():创建一个只包含一个线程的线程池。

应用

在学习以上几个方法之后,尝试写了一个多线程下载器,以下是具体实现代码。

package org.example;

import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class MThreadDownloader{
    private static final int THREAD_COUN = 5;

    private static final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUN);

    public void downloadFile(String url,String path) throws IOException {
//        获取文件长度
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
//        这里的意义是:只请求请求头,不请求请求体,和get方法类似
        connection.setRequestMethod("HEAD");
        int fileSize = connection.getContentLength();
        connection.disconnect();
//      计算切分之后的大小
        int partSize = fileSize / THREAD_COUN;
        RandomAccessFile accessFile = new RandomAccessFile(path,"rw");
        accessFile.setLength(fileSize);
//      开始分配任务并并发执行
        for (int i = 0; i < THREAD_COUN; i++) {
            final int threadId = i;
            executorService.submit(
                    ()-> {
                     try {
                         int start = threadId * partSize;
                         int end = (threadId +1 == THREAD_COUN)? fileSize: (threadId + 1)*partSize -1;

                         HttpURLConnection partConn = (HttpURLConnection) new URL(url).openConnection();
                         partConn.setRequestProperty("Range","bytes="+start+"-"+end);
                         InputStream inputStream = partConn.getInputStream();

                         synchronized (accessFile) {
                             accessFile.seek(start);
                             byte[] buffer = new byte[1048576];
                             int byteRead;
//                             一直读到缓冲区没有数据,就结束写入
                             while ((byteRead = inputStream.read(buffer))!= -1) {
                                 accessFile.write(buffer,0,byteRead);
                             }
                         }
                         inputStream.close();
                         partConn.disconnect();

                     } catch (IOException e){
                         e.printStackTrace();
                     }
                    }
            );

        }
        executorService.shutdown();
//        等待线程池执行完成并终结
        while (!executorService.isTerminated()) {

        }
//        关闭accessfile
        accessFile.close();
        System.out.println("下载成功!");
    }
}