benym的知识笔记 benym的知识笔记
🦮首页
  • Java

    • Java-基础
    • Java-集合
    • Java-多线程与并发
    • Java-JVM
    • Java-IO
  • Python

    • Python-基础
    • Python-机器学习
  • Kafka
  • Redis
  • MySQL
  • 分布式事务
  • Spring

    • SpringIOC
    • SpringAOP
🦌设计模式
  • 剑指Offer
  • LeetCode
  • 排序算法
🐧实践
  • Rpamis

    • Utils
    • Exception
    • Security
  • 归档
  • 标签
  • 目录
🦉里程碑
🐷关于
GitHub (opens new window)

benym

惟其艰难,才更显勇毅🍂惟其笃行,才弥足珍贵
🦮首页
  • Java

    • Java-基础
    • Java-集合
    • Java-多线程与并发
    • Java-JVM
    • Java-IO
  • Python

    • Python-基础
    • Python-机器学习
  • Kafka
  • Redis
  • MySQL
  • 分布式事务
  • Spring

    • SpringIOC
    • SpringAOP
🦌设计模式
  • 剑指Offer
  • LeetCode
  • 排序算法
🐧实践
  • Rpamis

    • Utils
    • Exception
    • Security
  • 归档
  • 标签
  • 目录
🦉里程碑
🐷关于
GitHub (opens new window)
  • Java-基础

    • Java反射获取类对象的三种方式
    • 动态代理
  • Java-集合

    • ArrayList的扩容机制
  • Java-多线程与并发

    • Java多线程实现的几种方式
    • 多线程交替打印数字—多种实现
    • CountDownLatch使用方法
    • CyclicBarrier使用方法
    • Semaphore使用方法
      • CompletableFuture常用用法及踩坑
    • Java-JVM

      • 自定义类加载器
      • JMH-基准测试框架
    • Java-IO

      • 概览
    • Java
    • Java-多线程与并发
    benym
    2022-01-18
    目录

    Semaphore使用方法

    # Semaphore使用方法

    Semaphore可以翻译为信号量,Semaphore可以控制同时访问的线程个数,通过acquire()获取一个许可,如果没有许可就等待,release()方法则可以释放一个许可

    # 构造方法

        /**
         * Creates a {@code Semaphore} with the given number of
         * permits and nonfair fairness setting.
         *
         * @param permits the initial number of permits available.
         *        This value may be negative, in which case releases
         *        must occur before any acquires will be granted.
         */
        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
        /**
         * Creates a {@code Semaphore} with the given number of
         * permits and the given fairness setting.
         *
         * @param permits the initial number of permits available.
         *        This value may be negative, in which case releases
         *        must occur before any acquires will be granted.
         * @param fair {@code true} if this semaphore will guarantee
         *        first-in first-out granting of permits under contention,
         *        else {@code false}
         */
        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    • permits表示许可线程的数量
    • fair表示公平性,如果为true则线程为先进先出

    # 常用方法

    public void acquire() throws InterruptedException {  }     //获取一个许可
    public void acquire(int permits) throws InterruptedException { }    //获取permits个许可
    public void release() { }          //释放一个许可
    public void release(int permits) { }    //释放permits个许可
    
    1
    2
    3
    4
    1
    2
    3
    4

    acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可

    release()用来释放许可。注意,在释放许可之前,必须先获获得许可

    这4个方法都会被阻塞,如果想立即执行得到结果,可以使用以下方法:

    //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
    public boolean tryAcquire() { };    
    //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };  
    //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
    public boolean tryAcquire(int permits) { }; 
    //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; 
    
    1
    2
    3
    4
    5
    6
    7
    8
    1
    2
    3
    4
    5
    6
    7
    8

    另外还可以通过availablePermits()方法得到可用的许可数目。

    # 使用案例

    **案例一:**假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现

    public class Test {
    
        public static void main(String[] args) {
            // 工人数目
            int n = 8;
            // 机器数目
            Semaphore semaphore = new Semaphore(5);
            for (int i = 0; i < n; i++) {
                int finalI = i;
                new Thread(() -> {
                    try {
                        semaphore.acquire();
                        System.out.println("工人" + finalI + "占用一个机器在生产");
                        Thread.sleep(2000);
                        System.out.println("工人" + finalI + "释放出机器");
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23

    在获取许可之后,我们sleep一下当前线程,让他不要那么快进行释放,观察运行的结果

    运行结果

    工人0占用一个机器在生产
    工人3占用一个机器在生产
    工人2占用一个机器在生产
    工人1占用一个机器在生产
    工人4占用一个机器在生产
    工人3释放出机器
    工人4释放出机器
    工人0释放出机器
    工人1释放出机器
    工人2释放出机器
    工人6占用一个机器在生产
    工人5占用一个机器在生产
    工人7占用一个机器在生产
    工人6释放出机器
    工人7释放出机器
    工人5释放出机器
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16

    可以发现,当规定Semaphore的permits为5时,最多有5个线程获取许可,剩余的线程必须等待许可释放之后才能获取许可

    **案例二:**流量控制

    Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求, 要读取几万个文件的数据,因为都是 IO 密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有 10 个,这时我们必须控制只有 10 个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用 Semaphore 来做流量控制。

    public class ConnectionSemaphore {
    
        private final static int CONNECTION_SIZE = 10;
        // 两个信号量,分别表示可用连接和已用连接
        private final Semaphore userFulLink, useLessLink;
        // 存放数据库链接的容器,这里用Integer代替
        private static final LinkedList<Integer> connectionPool = new LinkedList<>();
    
        static {
            for (int i = 0; i < CONNECTION_SIZE; i++) {
                connectionPool.addLast(i);
            }
        }
    
        public ConnectionSemaphore(Semaphore userFulLink, Semaphore useLessLink) {
            this.userFulLink = userFulLink;
            this.useLessLink = useLessLink;
        }
    
        /*归还连接*/
        public void returnConnect(Integer connection) throws InterruptedException {
            if (connection != null) {
                System.out.println(
                        "当前有" + userFulLink.getQueueLength() + "个线程等待数据库连接," + "可用连接数:" + userFulLink
                                .availablePermits());
                useLessLink.acquire();
                synchronized (connectionPool) {
                    connectionPool.addLast(connection);
                }
                userFulLink.release();
            }
        }
    
        /*从池子拿连接*/
        public Integer takeConnect() throws InterruptedException {
            userFulLink.acquire();
            Integer connection;
            synchronized (connectionPool) {
                connection = connectionPool.removeFirst();
            }
            useLessLink.release();
            return connection;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    /**
     * 测试类
     */
    public class Test {
    
        private static ConnectionSemaphore connectionSemaphore = new ConnectionSemaphore();
    
        private static class testThread implements Runnable {
    
            @Override
            public void run() {
                // 模拟每个线程的不同持有时间
                Random randomTime = new Random();
                long start = System.currentTimeMillis();
                try {
                    Integer connect = connectionSemaphore.takeConnect();
                    System.out.println(Thread.currentThread().getName()
                            + "_获取数据库连接共耗时【" + (System.currentTimeMillis() - start) + "】ms.");
                    // 模拟业务,线程持有连接查询数据
                    Thread.sleep(100 + randomTime.nextInt(100));
                    System.out.println(Thread.currentThread().getName()+"_查询数据完成,释放连接");
                    connectionSemaphore.returnConnect(connect);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < 15; i++) {
                Thread thread = new Thread(new testThread());
                thread.start();
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36

    运行结果

    Thread-14_获取数据库连接共耗时【0】ms.
    Thread-13_获取数据库连接共耗时【0】ms.
    Thread-8_获取数据库连接共耗时【0】ms.
    Thread-0_获取数据库连接共耗时【1】ms.
    Thread-10_获取数据库连接共耗时【0】ms.
    Thread-11_获取数据库连接共耗时【0】ms.
    Thread-6_获取数据库连接共耗时【0】ms.
    Thread-12_获取数据库连接共耗时【0】ms.
    Thread-1_获取数据库连接共耗时【2】ms.
    Thread-9_获取数据库连接共耗时【0】ms.
    Thread-0_查询数据完成,释放连接
    Thread-14_查询数据完成,释放连接
    当前有5个线程等待数据库连接,可用连接数:0
    当前有5个线程等待数据库连接,可用连接数:0
    Thread-7_获取数据库连接共耗时【180】ms.
    Thread-9_查询数据完成,释放连接
    当前有3个线程等待数据库连接,可用连接数:0
    Thread-4_获取数据库连接共耗时【179】ms.
    Thread-5_获取数据库连接共耗时【181】ms.
    Thread-8_查询数据完成,释放连接
    当前有2个线程等待数据库连接,可用连接数:0
    Thread-3_获取数据库连接共耗时【185】ms.
    Thread-10_查询数据完成,释放连接
    当前有1个线程等待数据库连接,可用连接数:0
    Thread-2_获取数据库连接共耗时【190】ms.
    Thread-12_查询数据完成,释放连接
    Thread-1_查询数据完成,释放连接
    当前有0个线程等待数据库连接,可用连接数:0
    当前有0个线程等待数据库连接,可用连接数:0
    Thread-13_查询数据完成,释放连接
    当前有0个线程等待数据库连接,可用连接数:2
    Thread-6_查询数据完成,释放连接
    当前有0个线程等待数据库连接,可用连接数:3
    Thread-11_查询数据完成,释放连接
    当前有0个线程等待数据库连接,可用连接数:4
    Thread-3_查询数据完成,释放连接
    当前有0个线程等待数据库连接,可用连接数:5
    Thread-2_查询数据完成,释放连接
    当前有0个线程等待数据库连接,可用连接数:6
    Thread-5_查询数据完成,释放连接
    当前有0个线程等待数据库连接,可用连接数:7
    Thread-7_查询数据完成,释放连接
    当前有0个线程等待数据库连接,可用连接数:8
    Thread-4_查询数据完成,释放连接
    当前有0个线程等待数据库连接,可用连接数:9
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45

    从打印结果可以看出,一次只有 10 个线程执行acquire(),只有线程进行release()方法后才会有别的线程执行acquire()。看到这里或许会疑惑在takeConnect中获取许可的是可用连接userFulLink,而释放许可的是useLessLink,在Semaphore中的release方法原本的注释如下

        /**
         * Releases a permit, returning it to the semaphore.
         *
         * <p>Releases a permit, increasing the number of available permits by
         * one.  If any threads are trying to acquire a permit, then one is
         * selected and given the permit that was just released.  That thread
         * is (re)enabled for thread scheduling purposes.
         *
         * <p>There is no requirement that a thread that releases a permit must
         * have acquired that permit by calling {@link #acquire}.
         * Correct usage of a semaphore is established by programming convention
         * in the application.
         */
        public void release() {
            sync.releaseShared(1);
        }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16

    释放许可证,将其返回给信号量。释放许可证,将可用许可证的数量增加一。如果任何线程试图获得许可,则选择一个线程并给予刚刚释放的许可。该线程(重新)启用用于线程调度目的。不要求释放许可的线程必须通过调用{@link acquire} 获得该许可。信号量的正确使用是通过应用程序中的编程约定建立的。

    这句话的意思就是说,release方法仅仅只是把许可证数量加一,在release之前不需要对应的信号量去执行acquire,那么这段代码的含义就可以理解为减少可用连接的数量,增加已用连接的数量,因为当一个线程持有连接之后可用连接应该-1,而已用的连接数应该+1,当一个线程执行完毕业务之后应该将已用连接-1,可用连接+1。

    需要注意的是:Semaphore只是对资源并发访问的线程数进行监控,并不会保证线程安全。

    # 参考文章

    https://www.cnblogs.com/dolphin0520/p/3920397.html https://www.jianshu.com/p/0d53a643a60c

    编辑 (opens new window)
    #Java#JUC
    上次更新: 2023/05/13, 18:05:21
    CyclicBarrier使用方法
    CompletableFuture常用用法及踩坑

    ← CyclicBarrier使用方法 CompletableFuture常用用法及踩坑→

    最近更新
    01
    SpringCache基本配置类
    05-16
    02
    DSTransactional与Transactional事务混用死锁场景分析
    03-04
    03
    Rpamis-security-原理解析
    12-13
    更多文章>
    Theme by Vdoing | Copyright © 2018-2024 benym | MIT License
     |   |   | 
    渝ICP备18012574号 | 渝公网安备50010902502537号
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式