CyclicBarrier trong Java

1. Giới thiệu

CyclicBarriers là các cấu trúc đồng bộ hóa đã được giới thiệu với Java 5 như một phần của gói java.util.concurrent .

Trong bài viết này, chúng ta sẽ khám phá cách triển khai này trong một kịch bản đồng thời.

2. Java Concurrency - Synchronizers

Các java.util.concurrent gói chứa một số lớp học mà giúp đỡ quản lý một tập hợp các chủ đề mà cộng tác với nhau. Một số trong số này bao gồm:

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Người trao đổi
  • Semaphore
  • SynchronousQueue

Các lớp này cung cấp chức năng ngoài hộp cho các mẫu tương tác chung giữa các luồng.

Nếu chúng ta có một tập hợp các luồng giao tiếp với nhau và giống với một trong các mẫu phổ biến, chúng ta có thể chỉ cần sử dụng lại các lớp thư viện thích hợp (còn được gọi là Bộ đồng bộ hóa ) thay vì cố gắng đưa ra một lược đồ tùy chỉnh bằng cách sử dụng một bộ khóa và điều kiện các đối tượng và từ khóa được đồng bộ hóa .

Hãy tập trung vào CyclicBarrier trong tương lai.

3. CyclicBarrier

Một CyclicBarrier là một đồng bộ cho phép một loạt các chủ đề để chờ đợi nhau để đạt được một điểm thực hiện phổ biến, cũng được gọi là một rào cản .

CyclicBarriers được sử dụng trong các chương trình mà chúng ta có một số lượng cố định các luồng phải đợi nhau đạt đến một điểm chung trước khi tiếp tục thực thi.

Rào cản được gọi là tuần hoàn vì nó có thể được sử dụng lại sau khi các luồng chờ được giải phóng.

4. Cách sử dụng

Hàm tạo cho CyclicBarrier rất đơn giản. Nó cần một số nguyên duy nhất biểu thị số luồng cần gọi phương thức await () trên cá thể rào cản để biểu thị việc đạt đến điểm thực thi chung:

public CyclicBarrier(int parties)

Các luồng cần đồng bộ hóa việc thực thi của chúng cũng được gọi là các bên và việc gọi phương thức await () là cách chúng ta có thể đăng ký rằng một luồng nhất định đã đạt đến điểm rào cản.

Lời gọi này là đồng bộ và luồng gọi phương thức này sẽ tạm ngừng thực thi cho đến khi một số luồng cụ thể đã gọi cùng một phương thức trên rào cản. Tình huống này trong đó số luồng cần thiết được gọi là await () , được gọi là vấp rào cản .

Theo tùy chọn, chúng ta có thể truyền đối số thứ hai cho hàm tạo, đó là một cá thể Runnable . Điều này có logic sẽ được chạy bởi luồng cuối cùng vượt qua rào cản:

public CyclicBarrier(int parties, Runnable barrierAction)

5. Thực hiện

Để xem CyclicBarrier hoạt động, hãy xem xét tình huống sau:

Có một hoạt động mà một số luồng cố định thực hiện và lưu trữ các kết quả tương ứng trong một danh sách. Khi tất cả các luồng kết thúc thực hiện hành động của chúng, một trong số chúng (thường là luồng cuối cùng vượt qua rào cản) bắt đầu xử lý dữ liệu được tìm nạp bởi từng luồng này.

Hãy triển khai lớp chính nơi tất cả các hành động xảy ra:

public class CyclicBarrierDemo { private CyclicBarrier cyclicBarrier; private List
    
      partialResults = Collections.synchronizedList(new ArrayList()); private Random random = new Random(); private int NUM_PARTIAL_RESULTS; private int NUM_WORKERS; // ... }
    

Lớp này khá dễ hiểu - NUM_WORKERS là số luồng sẽ thực thi và NUM_PARTIAL_RESULTS là số kết quả mà mỗi luồng công nhân sẽ tạo ra.

Cuối cùng, chúng tôi có partialResults đó là một danh sách đó sẽ lưu trữ các kết quả của mỗi một trong các đề người lao động. Xin lưu ý rằng danh sách này là SynchronizedList vì nhiều luồng sẽ ghi vào nó cùng một lúc và phương thức add () không an toàn cho luồng trên ArrayList thuần túy .

Bây giờ hãy triển khai logic của từng luồng worker:

public class CyclicBarrierDemo { // ... class NumberCruncherThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); List partialResult = new ArrayList(); // Crunch some numbers and store the partial result for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) { Integer num = random.nextInt(10); System.out.println(thisThreadName + ": Crunching some numbers! Final result - " + num); partialResult.add(num); } partialResults.add(partialResult); try { System.out.println(thisThreadName + " waiting for others to reach barrier."); cyclicBarrier.await(); } catch (InterruptedException e) { // ... } catch (BrokenBarrierException e) { // ... } } } }

Bây giờ chúng ta sẽ triển khai logic chạy khi rào cản đã bị vấp.

Để đơn giản hóa mọi thứ, hãy thêm tất cả các số vào danh sách kết quả từng phần:

public class CyclicBarrierDemo { // ... class AggregatorThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); System.out.println( thisThreadName + ": Computing sum of " + NUM_WORKERS + " workers, having " + NUM_PARTIAL_RESULTS + " results each."); int sum = 0; for (List threadResult : partialResults) { System.out.print("Adding "); for (Integer partialResult : threadResult) { System.out.print(partialResult+" "); sum += partialResult; } System.out.println(); } System.out.println(thisThreadName + ": Final result = " + sum); } } }

Bước cuối cùng sẽ là xây dựng CyclicBarrier và bắt đầu mọi thứ bằng phương thức main () :

public class CyclicBarrierDemo { // Previous code public void runSimulation(int numWorkers, int numberOfPartialResults) { NUM_PARTIAL_RESULTS = numberOfPartialResults; NUM_WORKERS = numWorkers; cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread()); System.out.println("Spawning " + NUM_WORKERS + " worker threads to compute " + NUM_PARTIAL_RESULTS + " partial results each"); for (int i = 0; i < NUM_WORKERS; i++) { Thread worker = new Thread(new NumberCruncherThread()); worker.setName("Thread " + i); worker.start(); } } public static void main(String[] args) { CyclicBarrierDemo demo = new CyclicBarrierDemo(); demo.runSimulation(5, 3); } } 

Trong đoạn mã trên, chúng tôi đã khởi tạo rào cản tuần hoàn với 5 luồng mà mỗi luồng tạo ra 3 số nguyên như một phần của tính toán của chúng và lưu trữ giống nhau trong danh sách kết quả.

Khi rào cản bị vấp, luồng cuối cùng vấp phải rào cản sẽ thực thi logic được chỉ định trong AggregatorThread, cụ thể là - thêm tất cả các số được tạo bởi các luồng.

6. Kết quả

Đây là kết quả đầu ra từ một lần thực thi chương trình trên - mỗi lần thực thi có thể tạo ra các kết quả khác nhau vì các luồng có thể được tạo theo một thứ tự khác nhau:

Spawning 5 worker threads to compute 3 partial results each Thread 0: Crunching some numbers! Final result - 6 Thread 0: Crunching some numbers! Final result - 2 Thread 0: Crunching some numbers! Final result - 2 Thread 0 waiting for others to reach barrier. Thread 1: Crunching some numbers! Final result - 2 Thread 1: Crunching some numbers! Final result - 0 Thread 1: Crunching some numbers! Final result - 5 Thread 1 waiting for others to reach barrier. Thread 3: Crunching some numbers! Final result - 6 Thread 3: Crunching some numbers! Final result - 4 Thread 3: Crunching some numbers! Final result - 0 Thread 3 waiting for others to reach barrier. Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 0 Thread 2 waiting for others to reach barrier. Thread 4: Crunching some numbers! Final result - 9 Thread 4: Crunching some numbers! Final result - 3 Thread 4: Crunching some numbers! Final result - 5 Thread 4 waiting for others to reach barrier. Thread 4: Computing final sum of 5 workers, having 3 results each. Adding 6 2 2 Adding 2 0 5 Adding 6 4 0 Adding 1 1 0 Adding 9 3 5 Thread 4: Final result = 46 

Như đầu ra ở trên cho thấy, Luồng 4 là luồng vượt qua rào cản và cũng thực hiện logic tổng hợp cuối cùng. Nó cũng không cần thiết rằng các luồng thực sự phải chạy theo thứ tự mà chúng được bắt đầu như ví dụ trên cho thấy.

7. Kết luận

Trong bài viết này, chúng ta đã biết CyclicBarrier là gì và nó hữu ích trong những trường hợp nào.

Chúng tôi cũng thực hiện một kịch bản trong đó chúng tôi cần một số luồng cố định để đạt được điểm thực thi cố định, trước khi tiếp tục với logic chương trình khác.

Như mọi khi, bạn có thể tìm thấy mã cho hướng dẫn trên GitHub.