RSocket协议和基本使用
RSocket github地址:https://github.com/rsocket
RSocket官网地址:https://rsocket.io/
RSocket中文站点:http://rsocketbyexample.info/
RSocket是一种新兴的通信协议,它是一种二进制的异步的通信协议,主要由Facebook、Netifi和Pivotal等工程师联合开发。
基于 Reactive Streams 规范具有异步、背压支持、多路复用、基于消息通讯、可扩展等特性,同时提供了 Java、JavaScript、Python、C ++、Golang、Rust 各种语言 SDK 实现,方便应用快速接入 RSocket 协议。
关于Reactive Streams规范,可以参考:
它在通信系统分层中,位于OSI七层模型的第七层-应用层。所以从这个角度上来看,RSocket可以看成是HTTP协议的一个替代和补充。RSocket协议可以运行在TCP、WebSocket以及Aeron之上。
为什么会有RSocket协议呢,主要还是因为现在常见的HTTP REST API/RPC是基于request/response架构的,除了性能上不够好意外,还是可以基本满足需求的。但是对于Streaming、Event Driven、IoT 和双向通讯,这种架构模式就不能满足需求了。
RSocket有什么特点?
RSocket是一种协议,目前支持的编程语言包括:Java、JS、C++、go、.NET、Swift等等
RSocket的Github地址:https://github.com/rsocket
相较于HTTP这种只支持request/response的通讯方式,RSocket支持四种不同的通讯方式:
- Request/Response 模型:发出一个请求,就必须等待一个响应。
- Request/Stream 模型:发出单个请求,可以接受多个响应。(pub/sub就算其中的一种,参考Request/Stream - Pub/Sub)
- Fire-and-Forget 模型:单向请求,发出一个请求后,不接受响应(打点采集、日志传输、metrics上报等)。
- Channel(Bi-direction)模型:双向通信,可以发出多个请求,也可以接受多个请求(类似在线聊天)
以前的协议都是针对特定领域的问题的,另一个场景可能就完全无法适用了。RSocket在设计上就很好的支持了多种复杂场景下的通信问题。
RSocket-Java的实现
rsocket-java的github地址:https://github.com/rsocket/rsocket-java
Maven 实现库名称 | 底层实现 | 支持协议 |
---|---|---|
rsocket-transport-netty | Reactor Netty | TCP 和 WebSocket |
rsocket-transport-akka | Akka | TCP 和 WebSocket |
rsocket-transport-aeron | Aeron | UDP |
其中io.rsocket.RSocket
定义了RSocket协议的基本模型。https://github.com/rsocket/rsocket/blob/master/Protocol.md
public interface RSocket extends Availability, Closeable {
Mono<Payload> requestResponse(Payload payload);
Mono<Void> fireAndForget(Payload payload);
Flux<Payload> requestStream(Payload payload);
Flux<Payload> requestChannel(Publisher<Payload> payloads);
Mono<Void> metadataPush(Payload payload);
default double availability() {
return isDisposed() ? 0.0 : 1.0;
}
equestResponse
、fireAndForget
、requestStream
和requestChannel
就是前面我们提到的RSocket支持的四种通讯方式,metadataPush
主要是用来推送元数据(类似zooKeeper向client推送节点变更),进一步了解可以参考:metadataPush - 元信息推送
availability
主要用来表示当前可用性。0.0表示不可用,1.0表示完全可用。这个参数在load balance的情况下非常实用。
Payload
就是通信消息了,它主要由两个部分组成:metadata、data。
- data一般指应用本身需要传递的业务数据
- metadata可以采用网络基础的
官方关于data和metadata的解释:https://rsocket.io/docs/Protocol#data-and-metadata
目前,Spring framework 5.2版本内置了RSocket,Spring boot 2.2.0版本也支持了RSocket,但是目前商业应用产品还很少,如:Netifi Broker。阿里开源了一款基于RSocket协议的中间件产品:Alibaba RSocket Broker。
rsocket-java直接使用Demo
引入相关依赖:
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<version>1.1.0</version>
</dependency>
request-response模型demo
public static void main(String[] args) throws InterruptedException {
RSocket rSocket = new RSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
log.info("receive request from client,data:{},metadata:{},", payload.getDataUtf8(), payload.getMetadataUtf8());
return Mono.just(DefaultPayload.create("response >> " + payload.getDataUtf8()));
}
};
RSocketServer.create(SocketAcceptor.with(rSocket))
.bind(TcpServerTransport.create("localhost", 7000))
.block();
RSocket clientRSocket =
RSocketConnector.create()
// Enable Zero Copy
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(TcpClientTransport.create("localhost", 7000))
.block();
clientRSocket.requestResponse(DefaultPayload.create("hello"))
.map(Payload::getDataUtf8)
.doOnNext(log::info)
.block();
Thread.sleep(5000L);
rSocket.dispose();
clientRSocket.dispose();
}
request-stream模型demo
public static void main(String[] args) throws InterruptedException {
RSocket rSocket = new RSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) {
log.info("receive request from client,data:{},metadata:{},", payload.getDataUtf8(), payload.getMetadataUtf8());
List<String> response = Lists.newArrayList("Hello1", "Hello2", "Hello3");
return Flux.fromStream(response.stream().map(DefaultPayload::create));
}
};
RSocketServer.create(SocketAcceptor.with(rSocket))
.bind(TcpServerTransport.create("localhost", 7000))
.block();
RSocket clientRSocket =
RSocketConnector.create()
// Enable Zero Copy
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(TcpClientTransport.create("localhost", 7000))
.block();
clientRSocket.requestStream(DefaultPayload.create("hello"))
.map(Payload::getDataUtf8)
.doOnNext(log::info)
.blockLast();
Thread.sleep(5000L);
rSocket.dispose();
clientRSocket.dispose();
}
fire-and-forget模型demo
public static void main(String[] args) throws InterruptedException {
RSocket rSocket = new RSocket() {
@Override
public Mono<Void> fireAndForget(Payload payload) {
log.info("receive request from client,data:{},metadata:{},", payload.getDataUtf8(), payload.getMetadataUtf8());
return Mono.empty();
}
};
RSocketServer.create(SocketAcceptor.with(rSocket))
.bind(TcpServerTransport.create("localhost", 7000))
.block();
RSocket clientRSocket =
RSocketConnector.create()
// Enable Zero Copy
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(TcpClientTransport.create("localhost", 7000))
.block();
clientRSocket.fireAndForget(DefaultPayload.create("hello"))
.block();
Thread.sleep(5000L);
rSocket.dispose();
clientRSocket.dispose();
}
request-channel模型demo
public static void main(String[] args) throws InterruptedException {
RSocket rSocket = new RSocket() {
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads).flatMap(payload -> {
log.info("receive request from client,data:{},metadata:{},", payload.getDataUtf8(), payload.getMetadataUtf8());
return Flux.fromStream(
payload.getDataUtf8().codePoints().mapToObj(c -> String.valueOf((char) c))
.map(DefaultPayload::create));
});
}
};
RSocketServer.create(SocketAcceptor.with(rSocket))
.bind(TcpServerTransport.create("localhost", 7000))
.block();
RSocket clientRSocket =
RSocketConnector.create()
// Enable Zero Copy
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(TcpClientTransport.create("localhost", 7000))
.block();
clientRSocket.requestChannel(Flux.just("hello", "world", "123").map(DefaultPayload::create))
// clientRSocket.requestChannel(Flux.just(EmptyPayload.INSTANCE))
.map(Payload::getDataUtf8)
.doOnNext(log::info)
.doOnError(consumer -> {
log.error("some error occurred,{}", consumer);
})
.blockLast();
Thread.sleep(5000L);
rSocket.dispose();
clientRSocket.dispose();
}
rsocket-java整合Spring Boot使用Demo
参考资料:
- Spring RSocket Documents
- Getting Started With RSocket: Spring Boot Server
- Getting Started With RSocket: Spring Boot Channels
spring-boot已经提供了rsocket的starter,所以首先在client端和server端引入相关依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
Server端
然后在server端配置传输方式&端口,增加以下配置:
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp
新建请求处理类Controller,并定义相关处理方法:
@Slf4j
@Controller
public class ServerController {
@MessageMapping("/request-response")
public Mono<String> requestResponse(String request) {
log.info("receive request:{}", request);
return Mono.just("Hello " + request);
}
@MessageMapping("/fire-and-forget")
public Mono<Void> fireAndForget(String request) {
log.info("receive request:{}", request);
return Mono.empty();
}
@MessageMapping("/request-stream")
public Flux<String> requestStream(String request) {
log.info("receive request:{}", request);
return Flux.just("hello", request, "welcome");
}
@MessageMapping("/request-channel")
Flux<String> requestChannel(Flux<String> request) {
return request
.doOnNext(record -> log.info("record is {}.", record))
.map(record -> "response from server to " + record)
.log();
}
@MessageExceptionHandler
public Mono<String> handleException(Exception e) {
return Mono.just(e.getMessage());
}
}
Client端
和服务端一样,配置传输方式,因为此处为client,不需要定义对应端口
spring.rsocket.server.transport=tcp
##避免和server端端口冲突
server.port=8081
配置客户端,设置encoder和decoder、请求格式、重试策略、连接端口。
@Bean
public RSocketStrategies rSocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2JsonEncoder()))
.decoders(decoders -> decoders.add(new Jackson2JsonDecoder()))
.build();
}
@Bean
public RSocketRequester getRSocketRequester(RSocketRequester.Builder builder) {
return builder
.rsocketConnector(rSocketConnector -> rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
.dataMimeType(MediaType.APPLICATION_JSON)
.transport(TcpClientTransport.create(7000));
}
对于除request-channel
方式以外的请求,我们用http请求的方式触发:
@Slf4j
@RestController
public class ClientController {
@Autowired
private RSocketRequester rSocketRequester;
@GetMapping(value = "/requestResponse/{message}")
public Mono<String> requestResponse(@PathVariable("message") String message) {
log.info("send request:{}", message);
return rSocketRequester
.route("/request-response")
.data(message)
.retrieveMono(String.class);
}
@GetMapping(value = "/fireAndForget/{message}")
public Mono<Void> fireAndForget(@PathVariable("message") String message) {
log.info("send request:{}", message);
return rSocketRequester
.route("/fire-and-forget")
.data(message)
.send();
}
@GetMapping(value = "/requestStream/{message}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> requestStream(@PathVariable("message") String message) {
return rSocketRequester
.route("/request-stream")
.data(message)
.retrieveFlux(String.class);
}
@GetMapping(value = "/requestChannel")
public void requestChannel() {
Mono<String> setting1 = Mono.just("Hello1");
Mono<String> setting2 = Mono.just("Hello2").delayElement(Duration.ofSeconds(2));
Mono<String> setting3 = Mono.just("Hello3").delayElement(Duration.ofSeconds(5));
Flux<String> settings = Flux.concat(setting1, setting2, setting3)
.doOnNext(d -> log.info("Sending request of {}.", d));
this.rSocketRequester
.route("request-channel")
.data(settings)
.retrieveFlux(String.class)
.subscribe(data -> log.info("Received: {} \n(Type 's' to stop.)", data));
}
}
对于request-channel
模式的,我们用shell的方式:
@Slf4j
@ShellComponent
public class ClientShell {
@Autowired
private RSocketRequester rSocketRequester;
@ShellMethod("request channel")
public void channel() {
log.info("start request channel to server");
Mono<String> setting1 = Mono.just("Hello1");
Mono<String> setting2 = Mono.just("Hello2").delayElement(Duration.ofSeconds(5));
Mono<String> setting3 = Mono.just("Hello3").delayElement(Duration.ofSeconds(15));
Flux<String> settings = Flux.concat(setting1, setting2, setting3)
.doOnNext(d -> log.info("\nSending setting for a {}-second interval.\n", d));
this.rSocketRequester
.route("/request-channel")
.data(settings)
.retrieveFlux(String.class)
.subscribe(message -> log.info("Received: {}", message));
}
}