blog

Spring Boot 2.X実践編 - WebFulxリアクティブプログラミング

Spring 5の最も重要なアップデートは、Reactiveプログラミングのサポートです。 Reactiveプログラミングはノンブロッキングなので、ビジネス処理の完了を待つ必要がなく、サーバーリソース...

Nov 30, 2020 · 7 min. read
シェア

ソースコードのリポジトリ

Spring 5の最も重要なアップデートは、リアクティブプログラミングのサポートです。リアクティブプログラミングはノンブロッキングであり、ビジネスプロセスの完了を待ってブロックする必要がないことを意味します。低レイテンシ、高スループットのプロジェクトに最適です。ノンブロッキング、非同期、弾力性があり、イベント駆動型のエンタープライズクラスのサービスを構築するために使用できます。

Spring WebFluxはSpring 5の新しいフレームワークで、機能的なReactive Webの開発のためのフレームワークです、より多くのタスクを完了するために、より少ないリソースを使用することができます、システム効率を向上させる、より多くの同時接続を処理することができます、大幅にシステムの処理能力を向上させます。

ここでは、SpringBootとWebFluxを統合してシンプルな機能開発を実現する方法を簡単に紹介します。

リアクティブ・プログラミングは、 データフローと 変更配信に 基づく 宣言型プログラミング・パラダイムです。

Spring WebFluxの紹介

WebFluxは、Servlet 3.1+をサポートするコンテナや、非同期処理をサポートするコンテナ上で動作する必要があります。

WebFlux 関数モジュール

WebFluxは3つの主要なモジュールで構成されています:

ルータ関数

// 関数型プログラミングでAPIインターフェースを作成し、APIインターフェースのパス、リクエストメソッド、ビジネスハンドラを指定する。
@Bean
public RouterFunction<ServerResponse> routerHandlerConfig() {
 return RouterFunctions.route(GET("/helloWebflux"),
 routerHandler::helloWebflux);
}

, Spring WebFlux

WebFlux は、 レスポンシブ・プログラミングを サポートするために、上流と下流のコンポーネントを調整するコアコンポーネントです。 WebFlux は、データストリームを Mono または Flux フォーマットにカプセル化し、統一的な処理を行います。 Mono と Flux は イベントパブリッシャーであり、イベントが発生すると対応するメソッドをコールバックしてクライアントに通知します。

  • Mono 単一アイテムの処理
  • Flux 複数項目の処理

リアクティブ・ストリーム

WebFluxはデフォルトでReactorと統合されています。

はじめに

他のSpringフレームワークと同様に、WebFluxはSpringBootのためのすぐに使えるスターターを提供します。

新しいプロジェクトを作成し、 Spring Reactive Web 依存モジュールの導入に注意し、以下の依存関係を導入します:

dependencies {
 implementation 'org.springframework.boot:spring-boot-starter-webflux'
 testImplementation('org.springframework.boot:spring-boot-starter-test') {
 exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
 }
 testImplementation 'io.projectreactor:reactor-test'
}

または;;。

 <dependencies>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-webflux</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 <exclusions>
 <exclusion>
 <groupId>org.junit.vintage</groupId>
 <artifactId>junit-vintage-engine</artifactId>
 </exclusion>
 </exclusions>
 </dependency>
 <dependency>
 <groupId>io.projectreactor</groupId>
 <artifactId>reactor-test</artifactId>
 <scope>test</scope>
 </dependency>
 </dependencies>

MyMessages.java

public class MyMessages {
 private String status;
 private String message;
 // ゲッター・セッター・コンストラクタを省略する

プロジェクトを実行すると、WebFlux がデフォルトで Netty コンテナを使用していることがわかります:

netty.NettyWebServer : Netty started on port(s): 8080

ルータ関数

@Component
public class RouterHandler {
 public Mono<ServerResponse> helloWebflux(ServerRequest request) {
 // リクエストからデータを取得する
 // リターンコードを200 okに設定する
 return ServerResponse.ok()
 // 戻り値の形式をUTF8 JSONに設定する
 .contentType(MediaType.APPLICATION_JSON_UTF8)
 // 返されるボディの内容を設定する
 .body(Mono.just(new MyMessages("OK", "From WebFlux ! From " + request.path())),
 MyMessages.class);
 }
}

Routerを使ってHandlerをapiにバインドします:

@Configuration
public class RouterConfig {
 private final RouterHandler routerHandler;
 @Autowired
 public RouterConfig(RouterHandler routerHandler) {
 this.routerHandler = routerHandler;
 }
 @Bean
 public RouterFunction<ServerResponse> routerHandlerConfig() {
 // ルーターのパスを /helloWebflux に、ハンドラーのパスを helloWebflux() に設定する。
 return RouterFunctions.route(GET("/helloWebflux"),
 routerHandler::helloWebflux);
 // .filter() インターセプターを追加し、andRoute()でさらにルートを追加する。
 }
}

プロジェクトを実行するには、ブラウザを使って次のサイトにアクセスします:

{"status":"OK","message":"From WebFlux ! From /helloWebflux"}

インターセプター RouterFilter.java を追加します。

@Component
class RouterFilter implements HandlerFilterFunction<ServerResponse, ServerResponse> {
 @Override
 public Mono<ServerResponse> filter(ServerRequest serverRequest,
 HandlerFunction<ServerResponse> handlerFunction) {
 // TODO return nextによる判定処理の遮断.handle(request);
 // 以下の例は
 return ServerResponse.status(UNAUTHORIZED).body(Mono.just("インターセプトされる"), String.class);
 }
}

RouterConfig.javaの修正

@Configuration
public class RouterConfig {
 @Resource RouterFilter filter; //  
 public RouterFunction<ServerResponse> routerHandlerConfig() {
 // フィルタを追加して
 return RouterFunctions.route(GET("/helloWebflux"),
 routerHandler::helloWebflux).filter(filter);
 }
}

プロジェクトを実行し、ブラウザで"http://localhost:8080"/helloWebfluxに アクセスすると、 401ステータスが "Intercepted "という文字列で返されていることがわかります 。

注釈付きAPI

@RestController
public class WebFluxController {
 @Resource
 WebFluxService service;
 @GetMapping(value = "/hello")
 public Mono<String> hello() {
 return Mono.just("Hello WebFlux By Controller");
 }

Spring MVCのアプローチと大差はありませんが、MonoとFluxの統一されたデータフローにデータフローをカプセル化しているだけです。 アクセスして出力を見てください。

データベースのクエリー操作をシミュレートし、新しい WebFluxService.java を作成します。

@Service
public class WebFluxService {
 public Flux<MyMessages> list() {
 MyMessages[] myMessages = new MyMessages[2];
 // TODO MySQLやその他のSQLデータベースは、現時点ではReactiveをサポートしていない。,
 // データを操作する方法はSpring Data JPAの部分だが、結果はMonoとFluxでカプセル化されている。
 myMessages[0] = new MyMessages("ok", "Message 1");
 myMessages[1] = new MyMessages("ok", "Message 2");
 // Fluxで複数のデータをカプセル化する
 return Flux.fromArray(myMessages);
 }
}

WebFluxController に以下の関数を追加します:

@Resource
WebFluxService service;
@GetMapping("/getList")
public Flux<MyMessages> getList() {
 return service.list();
}

プロジェクトを再起動し、 アクセスすると、以下の出力が表示されます:

[{"status":"ok","message":"Message 1"},{"status":"ok","message":"Message 2"}]

Router Functionを使っても、Spring MVCのアノテーションを使っても、効果は同じなので、私はアノテーションを使う方が好きです。しかし、実際の開発は、WebFluxのリアクティブプログラミングやアプリケーションを開発する伝統的な方法の合理的な選択は、実際の状況に基づいてする必要があり、WebFluxを使用して、リアクティブまたはアノテーションを使用する必要があります。

サーバー側のメッセージプッシュ

以前のアプリケーションでは、クライアント側のページのコンテンツを継続的に更新する必要があり、クライアント側からのリクエストを常に行い、非同期または同期のメソッドを使用してクライアント側のデータを更新する必要がありました。HTML 5 では、SSE と Websocket という 2 つの接続方法が導入されました。

SSE(サーバーサイド・プッシュ)とは、クライアントがリクエストを開始した後に維持されるコネクションのことで、サーバー側はそのコネクションを使用して継続的にクライアントにデータを送信します。WebSocketとは異なり、SSEは一方向通信です。

ここでは、WebFlux を使用して SSE を開発し、サーバーサイドからクライアントに継続的にデータをプッシュする方法を簡単に紹介します:

WebFluxController.java New

@GetMapping("/randomNumbers")
public Flux<ServerSentEvent<Integer>> randomNumbers() {
 // インターバル 1s
 return Flux.interval(Duration.ofSeconds(1))
 // ThreadLocalRandom.current().nextInt() 乱数を生成する
 .map(seq -> Tuples.of(seq, ThreadLocalRandom.current().nextInt()))
 .map(data -> ServerSentEvent.<Integer>builder()
 .event("ランダムメッセージ")
 .id(Long.toString(data.getT1()))
 .data(data.getT2())
 .build()
 );
}

ご覧ください。

id:0
event:ランダムなメッセージを送信する
data:1339192084
id:1
event:ランダムなメッセージを送信する
data:-672769364

短い

  • Router Function RESTfulなAPIの開発
  • MVCによる注釈付きRESTful API
  • SSE 簡単な例

Read next

ある日の大きなリート(最小経路和) 難易度:中 - Day20200723

非負整数を含むm×nの格子が与えられたとき、左上隅から右下隅まで、その経路上の数の和を最小にする経路を求めなさい。 一度に下か右に一歩しか進めません. | 3 | ... | 3 | ... | | 3 | ... | グリッド[i][j]

Nov 30, 2020 · 4 min read