美文网首页
Semaphore 的作用、应用场景和实战

Semaphore 的作用、应用场景和实战

作者: wuchao226 | 来源:发表于2021-04-13 16:04 被阅读0次

CyclicBarrier 的作用、应用场景和实战
CountDownLatch 的作用、应用场景和实战

概述

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

过程如下图所示:

Semaphore 常用方法

构造方法:

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
  • permits 表示许可线程的数量
  • fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程

常用方法:

public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength() 
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)
protected Collection<Thread> getQueuedThreads()
  • acquire() 表示阻塞并获取许可
  • tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
  • release() 表示释放许可
  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermit(int reduction):减少 reduction 个许可证
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合

Semaphore 使用场景

可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求, 要读取几万个文件的数据,因为都是 IO 密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有 10 个,这时我们必须控制只有 10 个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用 Semaphore 来做流量控制。

代码示例:

/**
 * 类说明:演示Semaphore用法,一个数据库连接池的实现
 */
public class DBPoolSemaphore {

    private final static int POOL_SIZE = 10;
    //两个指示器,分别表示池子还有可用连接和已用连接
    private final Semaphore useful, useless;
    //存放数据库连接的容器
    private static LinkedList<Connection> pool = new LinkedList<Connection>();

    //初始化池
    static {
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.addLast(SqlConnectImpl.fetchConnection());
        }
    }

    public DBPoolSemaphore() {
        this.useful = new Semaphore(10);
        this.useless = new Semaphore(0);
    }

    /*归还连接*/
    public void returnConnect(Connection connection) throws InterruptedException {
        if (connection != null) {
            System.out.println("当前有" + useful.getQueueLength() + "个线程等待数据库连接!!"
                    + "可用连接数:" + useful.availablePermits());
            useless.acquire();
            synchronized (pool) {
                pool.addLast(connection);
            }
            useful.release();
        }
    }

    /*从池子拿连接*/
    public Connection takeConnect() throws InterruptedException {
        useful.acquire();
        Connection connection;
        synchronized (pool) {
            connection = pool.removeFirst();
        }
        useless.release();
        return connection;
    }
}
/**
 * 类说明:测试数据库连接池
 */
public class AppTest {

    private static DBPoolSemaphore dbPool = new DBPoolSemaphore();

    private static class BusiThread extends Thread {
        @Override
        public void run() {
            Random r = new Random();//让每个线程持有连接的时间不一样
            long start = System.currentTimeMillis();
            try {
                Connection connect = dbPool.takeConnect();
                System.out.println("Thread_" + Thread.currentThread().getId()
                        + "_获取数据库连接共耗时【" + (System.currentTimeMillis() - start) + "】ms.");
                SleepTools.ms(100 + r.nextInt(100));//模拟业务操作,线程持有连接查询数据
                System.out.println("查询数据完成,归还连接!");
                dbPool.returnConnect(connect);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            Thread thread = new BusiThread();
            thread.start();
        }
    }
}

打印结果:

Thread_12_获取数据库连接共耗时【0】ms.
Thread_14_获取数据库连接共耗时【0】ms.
Thread_17_获取数据库连接共耗时【0】ms.
Thread_15_获取数据库连接共耗时【0】ms.
Thread_18_获取数据库连接共耗时【0】ms.
Thread_20_获取数据库连接共耗时【0】ms.
Thread_13_获取数据库连接共耗时【0】ms.
Thread_16_获取数据库连接共耗时【0】ms.
Thread_11_获取数据库连接共耗时【0】ms.
Thread_19_获取数据库连接共耗时【0】ms.
查询数据完成,归还连接!
当前有5个线程等待数据库连接!!可用连接数:0
Thread_21_获取数据库连接共耗时【121】ms.
查询数据完成,归还连接!
当前有4个线程等待数据库连接!!可用连接数:0
Thread_22_获取数据库连接共耗时【133】ms.
查询数据完成,归还连接!
当前有3个线程等待数据库连接!!可用连接数:0
查询数据完成,归还连接!
当前有2个线程等待数据库连接!!可用连接数:0
Thread_23_获取数据库连接共耗时【145】ms.
Thread_24_获取数据库连接共耗时【144】ms.
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有1个线程等待数据库连接!!可用连接数:0
当前有1个线程等待数据库连接!!可用连接数:0
Thread_25_获取数据库连接共耗时【158】ms.
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:1
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:2
当前有0个线程等待数据库连接!!可用连接数:2
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:4
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:5
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:6
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:7
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:8
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:9

从打印结果可以看出,一次只有 10 个线程执行 acquire(),只有线程进行 release() 方法后才会有别的线程执行 acquire()。

注意: Semaphore 只是对资源并发访问的线程数进行监控,并不会保证线程安全。

相关文章

网友评论

      本文标题:Semaphore 的作用、应用场景和实战

      本文链接:https://www.haomeiwen.com/subject/sgcclltx.html