五月天青色头像情侣网名,国产亚洲av片在线观看18女人,黑人巨茎大战俄罗斯美女,扒下她的小内裤打屁股

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

(轉(zhuǎn)載)外行人都能看懂的WebFlux,錯過了血虧

2020-09-17 16:05 作者:觴翊の澤  | 我要投稿


原創(chuàng) Java3y Java3y 2019-11-16

前言

只有光頭才能變強(qiáng)。

文本已收錄至我的GitHub倉庫,歡迎Star:

https://github.com/ZhongFuCheng3y/3y

本文知識點(diǎn)架構(gòu):


如果有關(guān)注我公眾號文章的同學(xué)就會發(fā)現(xiàn),最近我不定時轉(zhuǎn)發(fā)了一些比較好的WebFlux的文章,因?yàn)槲易罱趯W(xué)。

我之前也說過,學(xué)習(xí)一項(xiàng)技術(shù)之前,先要了解為什么要學(xué)這項(xiàng)技術(shù)。其實(shí)這次學(xué)習(xí)WebFlux也沒有多大的原生動力,主要是在我們組內(nèi)會輪流做一次技術(shù)分享,而我又不知道分享什么比較好…

之前在初學(xué)大數(shù)據(jù)相關(guān)的知識,但是這一塊的時間線會拉得比較長,感覺趕不及小組內(nèi)分享(而組內(nèi)的同學(xué)又大部分都懂大數(shù)據(jù),就只有我一個菜雞,淚目)。所以,想的是:“要不我學(xué)點(diǎn)新東西搞搞?”。于是就花了點(diǎn)時間學(xué)WebFlux啦~

這篇文章主要講解什么是WebFlux,帶領(lǐng)大家入個門,希望對大家有所幫助(至少看完這篇文章,知道WebFlux是干嘛用的)

一、什么是WebFlux?

我們從Spring的官網(wǎng)拉下一點(diǎn)點(diǎn)就可以看到介紹WebFlux的地方了

從官網(wǎng)的簡介中我們能得出什么樣的信息?

  • 我們程序員往往根據(jù)不同的應(yīng)用場景選擇不同的技術(shù),有的場景適合用于同步阻塞的,有的場景適合用于異步非阻塞的。而Spring5提供了一整套響應(yīng)式(非阻塞)的技術(shù)棧供我們使用(包括Web控制器、權(quán)限控制、數(shù)據(jù)訪問層等等)。

  • 而左側(cè)的圖則是技術(shù)棧的對比啦;

  • 響應(yīng)式一般用Netty或者Servlet 3.1的容器(因?yàn)橹С之惒椒亲枞?,而Servlet技術(shù)棧用的是Servlet容器

  • 在Web端,響應(yīng)式用的是WebFlux,Servlet用的是SpringMVC

  • …..

總結(jié)起來,WebFlux只是響應(yīng)式編程中的一部分(在Web控制端),所以一般我們用它與SpringMVC來對比。

二、如何理解響應(yīng)式編程?

在上面提到了響應(yīng)式編程(Reactive Programming),而WebFlux只是響應(yīng)式編程的其中一個技術(shù)棧而已,所以我們先來探討一下什么是響應(yīng)式編程

從維基百科里邊我們得到的定義:

reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change

響應(yīng)式編程(reactive programming)是一種基于數(shù)據(jù)流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程范式

在維基百科上也舉了個小例子:

意思大概如下:

  • 在命令式編程(我們的日常編程模式)下,式子a=b+c,這就意味著a的值是由bc計(jì)算出來的。如果b或者c后續(xù)有變化,不會影響a的值

  • 在響應(yīng)式編程下,式子a:=b+c,這就意味著a的值是由bc計(jì)算出來的。但如果b或者c的值后續(xù)有變化,會影響a的值

我認(rèn)為上面的例子已經(jīng)可以幫助我們理解變化傳遞(propagation of change)

那數(shù)據(jù)流(data stream)和聲明式(declarative)怎么理解呢?那可以提一提我們的Stream流了。之前寫過Lambda表達(dá)式和Stream流的文章,大家可以先去看看:

  • 最近學(xué)到的Lambda表達(dá)式基礎(chǔ)知識

  • 手把手帶你體驗(yàn)Stream流

Lambda的語法是這樣的(Stream流的使用會涉及到很多Lambda表達(dá)式的東西,所以一般先學(xué)Lambda再學(xué)Stream流):

Stream流的使用分為三個步驟(創(chuàng)建Stream流、執(zhí)行中間操作、執(zhí)行最終操作):

執(zhí)行中間操作實(shí)際上就是給我們提供了很多的API去操作Stream流中的數(shù)據(jù)(求和/去重/過濾)等等

說了這么多,怎么理解數(shù)據(jù)流和聲明式呢?其實(shí)是這樣的:

  • 本來數(shù)據(jù)是我們自行處理的,后來我們把要處理的數(shù)據(jù)抽象出來(變成了數(shù)據(jù)流),然后通過API去處理數(shù)據(jù)流中的數(shù)據(jù)(是聲明式的)

比如下面的代碼;將數(shù)組中的數(shù)據(jù)變成數(shù)據(jù)流,通過顯式聲明調(diào)用.sum()來處理數(shù)據(jù)流中的數(shù)據(jù),得到最終的結(jié)果:

public?static?void?main(String[]?args)?{
????int[]?nums?=?{?1,?2,?3?};
????int?sum2?=?IntStream.of(nums).parallel().sum();
????System.out.println("結(jié)果為:"?+?sum2);
}

如圖下所示:


上面講了響應(yīng)式編程是什么:

響應(yīng)式編程(reactive programming)是一種基于數(shù)據(jù)流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程范式

也講解了數(shù)據(jù)流/變化傳遞/聲明式是什么意思,但說到響應(yīng)式編程就離不開異步非阻塞。

從Spring官網(wǎng)介紹WebFlux的信息我們就可以發(fā)現(xiàn)asynchronous, nonblocking?這樣的字樣,因?yàn)?strong>響應(yīng)式編程它是異步的,也可以理解成變化傳遞它是異步執(zhí)行的。

如下圖,合計(jì)的金額會受其他的金額影響(更新的過程是異步的):


我們的JDK8 Stream流是同步的,它就不適合用于響應(yīng)式編程(但基礎(chǔ)的用法是需要懂的,因?yàn)轫憫?yīng)式流編程都是操作嘛)

而在JDK9 已經(jīng)支持響應(yīng)式流了,下面我們來看一下

三、JDK9 Reactive

響應(yīng)式流的規(guī)范早已經(jīng)被提出了:里面提到了:

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking?back pressure?? ----->http://www.reactive-streams.org/

翻譯再加點(diǎn)信息:

響應(yīng)式流(Reactive Streams)通過定義一組實(shí)體,接口和互操作方法,給出了實(shí)現(xiàn)異步非阻塞背壓的標(biāo)準(zhǔn)。第三方遵循這個標(biāo)準(zhǔn)來實(shí)現(xiàn)具體的解決方案,常見的有Reactor,RxJava,Akka Streams,Ratpack等。

規(guī)范里頭實(shí)際上就是定義了四個接口:

Java 平臺直到 JDK 9才提供了對于Reactive的完整支持,JDK9也定義了上述提到的四個接口,在java.util.concurrent包上

一個通用的流處理架構(gòu)一般會是這樣的(生產(chǎn)者產(chǎn)生數(shù)據(jù),對數(shù)據(jù)進(jìn)行中間處理,消費(fèi)者拿到數(shù)據(jù)消費(fèi)):

  • 數(shù)據(jù)來源,一般稱為生產(chǎn)者(Producer)

  • 數(shù)據(jù)的目的地,一般稱為消費(fèi)者(Consumer)

  • 在處理時,對數(shù)據(jù)執(zhí)行某些操作一個或多個處理階段。(Processor)

到這里我們再看回響應(yīng)式流的接口,我們應(yīng)該就能懂了:

  • Publisher(發(fā)布者)相當(dāng)于生產(chǎn)者(Producer)

  • Subscriber(訂閱者)相當(dāng)于消費(fèi)者(Consumer)

  • Processor就是在發(fā)布者與訂閱者之間處理數(shù)據(jù)用的

在響應(yīng)式流上提到了back pressure(背壓)這么一個概念,其實(shí)非常好理解。在響應(yīng)式流實(shí)現(xiàn)異步非阻塞是基于生產(chǎn)者和消費(fèi)者模式的,而生產(chǎn)者消費(fèi)者很容易出現(xiàn)的一個問題就是:生產(chǎn)者生產(chǎn)數(shù)據(jù)多了,就把消費(fèi)者給壓垮了

而背壓說白了就是:消費(fèi)者能告訴生產(chǎn)者自己需要多少量的數(shù)據(jù)。這里就是Subscription接口所做的事。

下面我們來看看JDK9接口的方法,或許就更加能理解上面所說的話了:

//?發(fā)布者(生產(chǎn)者)
public?interface?Publisher<T>?{
????public?void?subscribe(Subscriber<??super?T>?s);
}
//?訂閱者(消費(fèi)者)
public?interface?Subscriber<T>?{
????public?void?onSubscribe(Subscription?s);
????public?void?onNext(T?t);
????public?void?onError(Throwable?t);
????public?void?onComplete();
}
//?用于發(fā)布者與訂閱者之間的通信(實(shí)現(xiàn)背壓:訂閱者能夠告訴生產(chǎn)者需要多少數(shù)據(jù))
public?interface?Subscription?{
????public?void?request(long?n);
????public?void?cancel();
}
//?用于處理發(fā)布者?發(fā)布消息后,對消息進(jìn)行處理,再交由消費(fèi)者消費(fèi)
public?interface?Processor<T,R>?extends?Subscriber<T>,?Publisher<R>?{
}

3.1 看個例子

代碼中有大量的注釋,我就不多BB了,建議直接復(fù)制跑一下看看:

class?MyProcessor?extends?SubmissionPublisher<String>
????????implements?Processor<Integer,?String>?{

????private?Subscription?subscription;

????@Override
????public?void?onSubscribe(Subscription?subscription)?{
????????//?保存訂閱關(guān)系,?需要用它來給發(fā)布者響應(yīng)
????????this.subscription?=?subscription;

????????//?請求一個數(shù)據(jù)
????????this.subscription.request(1);
????}

????@Override
????public?void?onNext(Integer?item)?{
????????//?接受到一個數(shù)據(jù),?處理
????????System.out.println("處理器接受到數(shù)據(jù):?"?+?item);

????????//?過濾掉小于0的,?然后發(fā)布出去
????????if?(item?>?0)?{
????????????this.submit("轉(zhuǎn)換后的數(shù)據(jù):"?+?item);
????????}

????????//?處理完調(diào)用request再請求一個數(shù)據(jù)
????????this.subscription.request(1);

????????//?或者?已經(jīng)達(dá)到了目標(biāo),?調(diào)用cancel告訴發(fā)布者不再接受數(shù)據(jù)了
????????//?this.subscription.cancel();
????}

????@Override
????public?void?onError(Throwable?throwable)?{
????????//?出現(xiàn)了異常(例如處理數(shù)據(jù)的時候產(chǎn)生了異常)
????????throwable.printStackTrace();

????????//?我們可以告訴發(fā)布者,?后面不接受數(shù)據(jù)了
????????this.subscription.cancel();
????}

????@Override
????public?void?onComplete()?{
????????//?全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
????????System.out.println("處理器處理完了!");
????????//?關(guān)閉發(fā)布者
????????this.close();
????}

}

public?class?FlowDemo2?{

????public?static?void?main(String[]?args)?throws?Exception?{
????????//?1.?定義發(fā)布者,?發(fā)布的數(shù)據(jù)類型是?Integer
????????//?直接使用jdk自帶的SubmissionPublisher
????????SubmissionPublisher<Integer>?publiser?=?new?SubmissionPublisher<Integer>();

????????//?2.?定義處理器,?對數(shù)據(jù)進(jìn)行過濾,?并轉(zhuǎn)換為String類型
????????MyProcessor?processor?=?new?MyProcessor();

????????//?3.?發(fā)布者?和?處理器?建立訂閱關(guān)系
????????publiser.subscribe(processor);

????????//?4.?定義最終訂閱者,?消費(fèi)?String?類型數(shù)據(jù)
????????Subscriber<String>?subscriber?=?new?Subscriber<String>()?{

????????????private?Subscription?subscription;

????????????@Override
????????????public?void?onSubscribe(Subscription?subscription)?{
????????????????//?保存訂閱關(guān)系,?需要用它來給發(fā)布者響應(yīng)
????????????????this.subscription?=?subscription;

????????????????//?請求一個數(shù)據(jù)
????????????????this.subscription.request(1);
????????????}

????????????@Override
????????????public?void?onNext(String?item)?{
????????????????//?接受到一個數(shù)據(jù),?處理
????????????????System.out.println("接受到數(shù)據(jù):?"?+?item);

????????????????//?處理完調(diào)用request再請求一個數(shù)據(jù)
????????????????this.subscription.request(1);

????????????????//?或者?已經(jīng)達(dá)到了目標(biāo),?調(diào)用cancel告訴發(fā)布者不再接受數(shù)據(jù)了
????????????????//?this.subscription.cancel();
????????????}

????????????@Override
????????????public?void?onError(Throwable?throwable)?{
????????????????//?出現(xiàn)了異常(例如處理數(shù)據(jù)的時候產(chǎn)生了異常)
????????????????throwable.printStackTrace();

????????????????//?我們可以告訴發(fā)布者,?后面不接受數(shù)據(jù)了
????????????????this.subscription.cancel();
????????????}

????????????@Override
????????????public?void?onComplete()?{
????????????????//?全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
????????????????System.out.println("處理完了!");
????????????}

????????};

????????//?5.?處理器?和?最終訂閱者?建立訂閱關(guān)系
????????processor.subscribe(subscriber);

????????//?6.?生產(chǎn)數(shù)據(jù),?并發(fā)布
????????publiser.submit(-111);
????????publiser.submit(111);

????????//?7.?結(jié)束后?關(guān)閉發(fā)布者
????????//?正式環(huán)境?應(yīng)該放?finally?或者使用?try-resouce?確保關(guān)閉
????????publiser.close();

????????//?主線程延遲停止,?否則數(shù)據(jù)沒有消費(fèi)就退出
????????Thread.currentThread().join(1000);
????}

}

輸出的結(jié)果如下:

流程實(shí)際上非常簡單的:

參考資料:

  • https://yanbin.blog/java-9-talk-reactive-stream/#more-8877

  • https://blog.csdn.net/wudaoshihun/article/details/83070086

  • http://www.spring4all.com/article/6826

  • https://www.cnblogs.com/IcanFixIt/p/7245377.html

Java 8 的 Stream 主要關(guān)注在流的過濾,映射,合并,而 ?Reactive Stream 更進(jìn)一層,側(cè)重的是流的產(chǎn)生與消費(fèi),即流在生產(chǎn)與消費(fèi)者之間的協(xié)調(diào)

說白了就是:響應(yīng)式流是異步非阻塞+流量控制的(可以告訴生產(chǎn)者自己需要多少的量/取消訂閱關(guān)系)

展望響應(yīng)式編程的場景應(yīng)用:

比如一個日志監(jiān)控系統(tǒng),我們的前端頁面將不再需要通過“命令式”的輪詢的方式不斷向服務(wù)器請求數(shù)據(jù)然后進(jìn)行更新,而是在建立好通道之后,數(shù)據(jù)流從系統(tǒng)源源不斷流向頁面,從而展現(xiàn)實(shí)時的指標(biāo)變化曲線;

再比如一個社交平臺,朋友的動態(tài)、點(diǎn)贊和留言不是手動刷出來的,而是當(dāng)后臺數(shù)據(jù)變化的時候自動體現(xiàn)到界面上的。

四、入門WebFlux

扯了一大堆,終于回到WebFlux了。經(jīng)過上面的基礎(chǔ),我們現(xiàn)在已經(jīng)能夠得出一些結(jié)論的了:

  • WebFlux是Spring推出響應(yīng)式編程的一部分(web端)

  • 響應(yīng)式編程是異步非阻塞的(是一種基于數(shù)據(jù)流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程范式)

我們再回來看官網(wǎng)的圖:

?

4.1 簡單體驗(yàn)WebFlux

Spring官方為了讓我們更加快速/平滑到WebFlux上,之前SpringMVC那套都是支持的。也就是說:我們可以像使用SpringMVC一樣使用著WebFlux。

WebFlux使用的響應(yīng)式流并不是用JDK9平臺的,而是一個叫做Reactor響應(yīng)式流庫。所以,入門WebFlux其實(shí)更多是了解怎么使用Reactor的API,下面我們來看看~

Reactor是一個響應(yīng)式流,它也有對應(yīng)的發(fā)布者(Publisher?),Reactor的發(fā)布者用兩個類來表示:

  • Mono(返回0或1個元素)

  • Flux(返回0-n個元素)

而消費(fèi)者則是Spring框架幫我們去完成

下面我們來看一個簡單的例子(基于WebFlux環(huán)境構(gòu)建):

//?阻塞5秒鐘
private?String?createStr()?{
????try?{
????????TimeUnit.SECONDS.sleep(5);
????}?catch?(InterruptedException?e)?{
????}
????return?"some?string";
}

//?普通的SpringMVC方法
@GetMapping("/1")
private?String?get1()?{
????log.info("get1?start");
????String?result?=?createStr();
????log.info("get1?end.");
????return?result;
}

//?WebFlux(返回的是Mono)
@GetMapping("/2")
private?Mono<String>?get2()?{
????log.info("get2?start");
????Mono<String>?result?=?Mono.fromSupplier(()?->?createStr());
????log.info("get2?end.");
????return?result;
}

首先,值得說明的是,我們構(gòu)建WebFlux環(huán)境啟動時,應(yīng)用服務(wù)器默認(rèn)是Netty的:

基于Netty

我們分別來訪問一下SpringMVC的接口和WebFlux的接口,看一下有什么區(qū)別:

SpringMVC:

WebFlux:

?

從調(diào)用者(瀏覽器)的角度而言,是感知不到有什么變化的,因?yàn)槎际堑玫却?s才返回?cái)?shù)據(jù)。但是,從服務(wù)端的日志我們可以看出,WebFlux是直接返回Mono對象的(而不是像SpringMVC一直同步阻塞5s,線程才返回)。

這正是WebFlux的好處:能夠以固定的線程來處理高并發(fā)(充分發(fā)揮機(jī)器的性能)。

WebFlux還支持服務(wù)器推送(SSE - >Server Send Event),我們來看個例子:

/**
?????*?Flux?:?返回0-n個元素
?????*?注:需要指定MediaType
?????*?@return
?????*/
@GetMapping(value?=?"/3",?produces?=?MediaType.TEXT_EVENT_STREAM_VALUE)
private?Flux<String>?flux()?{
????Flux<String>?result?=?Flux
????????.fromStream(IntStream.range(1,?5).mapToObj(i?->?{
????????????try?{
????????????????TimeUnit.SECONDS.sleep(1);
????????????}?catch?(InterruptedException?e)?{
????????????}
????????????return?"flux?data--"?+?i;
????????}));
????return?result;
}

效果就是每秒會給瀏覽器推送數(shù)據(jù):

服務(wù)器推送

WebFlux我還沒寫完,這篇寫了WebFlux支持SpringMVC那套注解來開發(fā),下篇寫寫如何使用WebFlux另一種模式(Functional Endpoints)來開發(fā)以及一些常見的問題還需要補(bǔ)充一下~



(轉(zhuǎn)載)外行人都能看懂的WebFlux,錯過了血虧的評論 (共 條)

分享到微博請遵守國家法律
清水河县| 新泰市| 诏安县| 罗田县| 阳春市| 重庆市| 博罗县| 广东省| 岑巩县| 夏河县| 额敏县| 吴堡县| 旺苍县| 广昌县| 韶关市| 青冈县| 宜都市| 辽中县| 凤翔县| 资兴市| 克山县| 绍兴县| 沭阳县| 穆棱市| 黑龙江省| 潜山县| 池州市| 高淳县| 扶余县| 轮台县| 金寨县| 无棣县| 民丰县| 于都县| 安平县| 台山市| 门源| 东山县| 大兴区| 鄂伦春自治旗| 德清县|