Giới thiệu về RSocket

1. Giới thiệu

Trong hướng dẫn này, chúng ta sẽ xem xét đầu tiên về RSocket và cách nó cho phép giao tiếp máy khách-máy chủ.

2. RSocket là gì?

RSocket là một giao thức truyền thông điểm-điểm, nhị phân được thiết kế để sử dụng trong các ứng dụng phân tán. Theo nghĩa đó, nó cung cấp một giải pháp thay thế cho các giao thức khác như HTTP.

So sánh đầy đủ giữa RSocket và các giao thức khác nằm ngoài phạm vi của bài viết này. Thay vào đó, chúng tôi sẽ tập trung vào một tính năng chính của RSocket: các mô hình tương tác của nó.

RSocket cung cấp bốn mô hình tương tác. Với ý nghĩ đó, chúng tôi sẽ khám phá từng cái với một ví dụ.

3. Sự phụ thuộc của Maven

RSocket chỉ cần hai phụ thuộc trực tiếp cho các ví dụ của chúng tôi:

 io.rsocket rsocket-core 0.11.13   io.rsocket rsocket-transport-netty 0.11.13 

Các phụ thuộc rsocket-core và rsocket-transport-netty có sẵn trên Maven Central.

Một lưu ý quan trọng là thư viện RSocket sử dụng thường xuyên các luồng phản ứng . Các lớp FluxMono được sử dụng trong suốt bài viết này vì vậy hiểu cơ bản về chúng sẽ rất hữu ích.

4. Thiết lập máy chủ

Đầu tiên, hãy tạo lớp Máy chủ :

public class Server { private final Disposable server; public Server() { this.server = RSocketFactory.receive() .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl())) .transport(TcpServerTransport.create("localhost", TCP_PORT)) .start() .subscribe(); } public void dispose() { this.server.dispose(); } private class RSocketImpl extends AbstractRSocket {} }

Ở đây chúng tôi sử dụng RSocketFactory để thiết lập và lắng nghe một ổ cắm TCP. Chúng tôi chuyển RSocketImpl tùy chỉnh của mình để xử lý các yêu cầu từ khách hàng. Chúng tôi sẽ thêm các phương thức vào RSocketImpl khi chúng tôi tiếp tục.

Tiếp theo, để khởi động máy chủ, chúng ta chỉ cần khởi tạo nó:

Server server = new Server();

Một phiên bản máy chủ duy nhất có thể xử lý nhiều kết nối . Do đó, chỉ một phiên bản máy chủ sẽ hỗ trợ tất cả các ví dụ của chúng tôi.

Khi chúng ta hoàn tất, phương pháp xử lý sẽ dừng máy chủ và giải phóng cổng TCP.

4. Mô hình tương tác

4.1. Yêu cầu / Phản hồi

RSocket cung cấp một mô hình yêu cầu / phản hồi - mỗi yêu cầu nhận được một phản hồi duy nhất.

Đối với mô hình này, chúng tôi sẽ tạo một dịch vụ đơn giản trả lại thông báo cho khách hàng.

Hãy bắt đầu bằng cách thêm một phương thức vào phần mở rộng của AbstractRSocket, RSocketImpl :

@Override public Mono requestResponse(Payload payload) { try { return Mono.just(payload); // reflect the payload back to the sender } catch (Exception x) { return Mono.error(x); } }

Các requestResponse phương thức trả về một kết quả duy nhất cho mỗi yêu cầu , như chúng ta có thể nhìn thấy bằng các Mono loại phản ứng.

Payload là lớp chứa nội dung tin nhắn và siêu dữ liệu . Nó được sử dụng bởi tất cả các mô hình tương tác. Nội dung của tải trọng là nhị phân, nhưng có các phương pháp tiện lợi hỗ trợnội dung dựa trên chuỗi .

Tiếp theo, chúng ta có thể tạo lớp khách hàng của mình:

public class ReqResClient { private final RSocket socket; public ReqResClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); } public String callBlocking(String string) { return socket .requestResponse(DefaultPayload.create(string)) .map(Payload::getDataUtf8) .block(); } public void dispose() { this.socket.dispose(); } }

Máy khách sử dụng phương thức RSocketFactory.connect () để bắt đầu kết nối ổ cắm với máy chủ. Chúng tôi sử dụng phương thức requestResponse trên socket để gửi tải trọng đến máy chủ .

Tải trọng của chúng tôi chứa Chuỗi được chuyển vào máy khách. Khi Monophản hồi đến, chúng ta có thể sử dụng phương thức getDataUtf8 () để truy cập nội dung Chuỗi của phản hồi.

Cuối cùng, chúng tôi có thể chạy kiểm tra tích hợp để xem yêu cầu / phản hồi đang hoạt động. Chúng tôi sẽ gửi một Chuỗi đến máy chủ và xác minh rằng cùng một Chuỗi được trả về:

@Test public void whenSendingAString_thenRevceiveTheSameString() { ReqResClient client = new ReqResClient(); String string = "Hello RSocket"; assertEquals(string, client.callBlocking(string)); client.dispose(); }

4.2. Lửa và Quên

Với mô hình fire-and-forget, máy khách sẽ không nhận được phản hồi từ máy chủ .

Trong ví dụ này, máy khách sẽ gửi các phép đo mô phỏng đến máy chủ trong khoảng thời gian 50ms. Máy chủ sẽ xuất bản các phép đo.

Hãy thêm một trình xử lý fire-and-forget vào máy chủ của chúng tôi trong lớp RSocketImpl :

@Override public Mono fireAndForget(Payload payload) { try { dataPublisher.publish(payload); // forward the payload return Mono.empty(); } catch (Exception x) { return Mono.error(x); } }

Trình xử lý này trông rất giống với trình xử lý yêu cầu / phản hồi. Tuy nhiên, fireAndForget trả về Mono thay vì Mono .

Các dataPublisher là một thể hiện của org.reactivestreams.Publisher . Do đó, nó làm cho tải trọng có sẵn cho người đăng ký. Chúng tôi sẽ sử dụng điều đó trong ví dụ yêu cầu / luồng.

Tiếp theo, chúng tôi sẽ tạo ứng dụng khách có thể quên:

public class FireNForgetClient { private final RSocket socket; private final List data; public FireNForgetClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); } /** Send binary velocity (float) every 50ms */ public void sendData() { data = Collections.unmodifiableList(generateData()); Flux.interval(Duration.ofMillis(50)) .take(data.size()) .map(this::createFloatPayload) .flatMap(socket::fireAndForget) .blockLast(); } // ... }

Việc thiết lập ổ cắm giống hệt như trước đây.

Phương thức sendData () sử dụng một luồng Flux để gửi nhiều tin nhắn. Đối với mỗi thông báo, chúng tôi gọi socket :: fireAndForget .

Chúng ta cần đăng ký phản hồi Mono cho mỗi tin nhắn . Nếu chúng ta quên đăng ký thì socket :: fireAndForget sẽ không thực thi.

The flatMap operator makes sure the Void responses are passed to the subscriber, while the blockLast operator acts as the subscriber.

We're going to wait until the next section to run the fire-and-forget test. At that point, we'll create a request/stream client to receive the data that was pushed by the fire-and-forget client.

4.3. Request/Stream

In the request/stream model, a single request may receive multiple responses. To see this in action we can build upon the fire-and-forget example. To do that, let's request a stream to retrieve the measurements we sent in the previous section.

As before, let's start by adding a new listener to the RSocketImpl on the server:

@Override public Flux requestStream(Payload payload) { return Flux.from(dataPublisher); }

The requestStream handler returns a Flux stream. As we recall from the previous section, the fireAndForget handler published incoming data to the dataPublisher. Now, we'll create a Flux stream using that same dataPublisher as the event source. By doing this the measurement data will flow asynchronously from our fire-and-forget client to our request/stream client.

Let's create the request/stream client next:

public class ReqStreamClient { private final RSocket socket; public ReqStreamClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); } public Flux getDataStream() { return socket .requestStream(DefaultPayload.create(DATA_STREAM_NAME)) .map(Payload::getData) .map(buf -> buf.getFloat()) .onErrorReturn(null); } public void dispose() { this.socket.dispose(); } }

We connect to the server in the same way as our previous clients.

In getDataStream()we use socket.requestStream() to receive a Flux stream from the server. From that stream, we extract the Float values from the binary data. Finally, the stream is returned to the caller, allowing the caller to subscribe to it and process the results.

Now let's test. We'll verify the round trip from fire-and-forget to request/stream.

We can assert that each value is received in the same order as it was sent. Then, we can assert that we receive the same number of values that were sent:

@Test public void whenSendingStream_thenReceiveTheSameStream() { FireNForgetClient fnfClient = new FireNForgetClient(); ReqStreamClient streamClient = new ReqStreamClient(); List data = fnfClient.getData(); List dataReceived = new ArrayList(); Disposable subscription = streamClient.getDataStream() .index() .subscribe( tuple -> { assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2()); dataReceived.add(tuple.getT2()); }, err -> LOG.error(err.getMessage()) ); fnfClient.sendData(); // ... dispose client & subscription assertEquals("Wrong data count received", data.size(), dataReceived.size()); }

4.4. Channel

The channel model provides bidirectional communication. In this model, message streams flow asynchronously in both directions.

Let's create a simple game simulation to test this. In this game, each side of the channel will become a player. As the game runs, these players will send messages to the other side at random time intervals. The opposite side will react to the messages.

Firstly, we'll create the handler on the server. Like before, we add to the RSocketImpl:

@Override public Flux requestChannel(Publisher payloads) { Flux.from(payloads) .subscribe(gameController::processPayload); return Flux.from(gameController); }

The requestChannel handler has Payload streams for both input and output. The Publisher input parameter is a stream of payloads received from the client. As they arrive, these payloads are passed to the gameController::processPayload function.

In response, we return a different Flux stream back to the client. This stream is created from our gameController, which is also a Publisher.

Here is a summary of the GameController class:

public class GameController implements Publisher { @Override public void subscribe(Subscriber subscriber) { // send Payload messages to the subscriber at random intervals } public void processPayload(Payload payload) { // react to messages from the other player } }

When the GameController receives a subscriber it begins sending messages to that subscriber.

Next, let's create the client:

public class ChannelClient { private final RSocket socket; private final GameController gameController; public ChannelClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); this.gameController = new GameController("Client Player"); } public void playGame() { socket.requestChannel(Flux.from(gameController)) .doOnNext(gameController::processPayload) .blockLast(); } public void dispose() { this.socket.dispose(); } }

As we have seen in our previous examples, the client connects to the server in the same way as the other clients.

The client creates its own instance of the GameController.

We use socket.requestChannel() to send our Payload stream to the server. The server responds with a Payload stream of its own.

Khi tải trọng nhận được từ máy chủ, chúng tôi chuyển chúng đến trình xử lý gameController :: processPayload .

Trong mô phỏng trò chơi của chúng tôi, máy khách và máy chủ là hình ảnh phản chiếu của nhau. Tức là mỗi bên đang gửi một luồng Tải trọng và nhận một luồng Tải trọng từ đầu bên kia .

Các luồng chạy độc lập, không đồng bộ hóa.

Cuối cùng, hãy chạy mô phỏng trong một bài kiểm tra:

@Test public void whenRunningChannelGame_thenLogTheResults() { ChannelClient client = new ChannelClient(); client.playGame(); client.dispose(); }

5. Kết luận

Trong bài viết giới thiệu này, chúng tôi đã khám phá các mô hình tương tác do RSocket cung cấp. Bạn có thể tìm thấy mã nguồn đầy đủ của các ví dụ trong kho lưu trữ Github của chúng tôi.

Hãy nhớ xem trang web RSocket để thảo luận sâu hơn. Đặc biệt, các tài liệu Câu hỏi thường gặp và Động lực cung cấp một nền tảng tốt.