Semaphores bằng Java

1. Khái quát chung

Trong hướng dẫn nhanh này, chúng ta sẽ khám phá những kiến ​​thức cơ bản về semaphores và mutexes trong Java.

2. Semaphore

Chúng ta sẽ bắt đầu với java.util.concurrent.Semaphore. Chúng ta có thể sử dụng semaphores để giới hạn số lượng các luồng đồng thời truy cập một tài nguyên cụ thể.

Trong ví dụ sau, chúng tôi sẽ triển khai một hàng đợi đăng nhập đơn giản để giới hạn số lượng người dùng trong hệ thống:

class LoginQueueUsingSemaphore { private Semaphore semaphore; public LoginQueueUsingSemaphore(int slotLimit) { semaphore = new Semaphore(slotLimit); } boolean tryLogin() { return semaphore.tryAcquire(); } void logout() { semaphore.release(); } int availableSlots() { return semaphore.availablePermits(); } }

Lưu ý cách chúng tôi sử dụng các phương pháp sau:

  • tryAcquire () - trả về true nếu có giấy phép ngay lập tức và có được nó nếu không thì trả về false, nhưng get () lấy lại giấy phép và chặn cho đến khi có giấy phép
  • release () - phát hành giấy phép
  • sẵn cóPermits () - trả lại số lượng giấy phép hiện tại có sẵn

Để kiểm tra hàng đợi đăng nhập của chúng tôi, trước tiên chúng tôi sẽ cố gắng đạt đến giới hạn và kiểm tra xem lần đăng nhập tiếp theo có bị chặn hay không:

@Test public void givenLoginQueue_whenReachLimit_thenBlocked() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); assertFalse(loginQueue.tryLogin()); }

Tiếp theo, chúng tôi sẽ xem liệu có vị trí nào sau khi đăng xuất không:

@Test public void givenLoginQueue_whenLogout_thenSlotsAvailable() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); loginQueue.logout(); assertTrue(loginQueue.availableSlots() > 0); assertTrue(loginQueue.tryLogin()); }

3. Semaphore hẹn giờ

Tiếp theo, chúng ta sẽ thảo luận về Apache Commons TimedSemaphore. TimedSemaphore cho phép một số giấy phép như một Semaphore đơn giản nhưng trong một khoảng thời gian nhất định, sau khoảng thời gian này, thời gian đặt lại và tất cả các giấy phép được giải phóng.

Chúng ta có thể sử dụng TimedSemaphore để xây dựng một hàng đợi trễ đơn giản như sau:

class DelayQueueUsingTimedSemaphore { private TimedSemaphore semaphore; DelayQueueUsingTimedSemaphore(long period, int slotLimit) { semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit); } boolean tryAdd() { return semaphore.tryAcquire(); } int availableSlots() { return semaphore.getAvailablePermits(); } }

Khi chúng tôi sử dụng hàng đợi trễ với khoảng thời gian là một giây và sau khi sử dụng tất cả các vị trí trong vòng một giây, không có vị trí nào sẽ khả dụng:

public void givenDelayQueue_whenReachLimit_thenBlocked() { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); assertFalse(delayQueue.tryAdd()); }

Nhưng sau khi ngủ một thời gian, semaphore sẽ đặt lại và giải phóng các giấy phép :

@Test public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); Thread.sleep(1000); assertTrue(delayQueue.availableSlots() > 0); assertTrue(delayQueue.tryAdd()); }

4. Semaphore so với Mutex

Mutex hoạt động tương tự như semaphore nhị phân, chúng ta có thể sử dụng nó để thực hiện loại trừ lẫn nhau.

Trong ví dụ sau, chúng ta sẽ sử dụng một semaphore nhị phân đơn giản để tạo bộ đếm:

class CounterUsingMutex { private Semaphore mutex; private int count; CounterUsingMutex() { mutex = new Semaphore(1); count = 0; } void increase() throws InterruptedException { mutex.acquire(); this.count = this.count + 1; Thread.sleep(1000); mutex.release(); } int getCount() { return this.count; } boolean hasQueuedThreads() { return mutex.hasQueuedThreads(); } }

Khi nhiều chuỗi cố gắng truy cập bộ đếm cùng một lúc, chúng sẽ đơn giản bị chặn trong một hàng đợi :

@Test public void whenMutexAndMultipleThreads_thenBlocked() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); }

Khi chúng ta đợi, tất cả các luồng sẽ truy cập vào bộ đếm và không còn luồng nào trong hàng đợi:

@Test public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); Thread.sleep(5000); assertFalse(counter.hasQueuedThreads()); assertEquals(count, counter.getCount()); }

5. Kết luận

Trong bài viết này, chúng ta đã khám phá những kiến ​​thức cơ bản về semaphores trong Java.

Như mọi khi, mã nguồn đầy đủ có sẵn trên GitHub.