麻豆小视频在线观看_中文黄色一级片_久久久成人精品_成片免费观看视频大全_午夜精品久久久久久久99热浪潮_成人一区二区三区四区

首頁 > 學院 > 開發(fā)設(shè)計 > 正文

FunDA(7)- Reactive Streams to fs2 Pull Streams

2019-11-11 04:50:09
字體:
供稿:網(wǎng)友

    Reactive-Stream不只是簡單的push-model-stream, 它還帶有“拖式”(pull-model)性質(zhì)。這是因為在Iteratee模式里雖然理論上由Enumerator負責主動推送數(shù)據(jù),實現(xiàn)了push-model功能。但實際上Iteratee也會根據(jù)自身情況,通過提供callback函數(shù)通知Enumerator可以開始推送數(shù)據(jù),這從某種程度上也算是一種pull-model。換句話講Reactive-Streams是通過push-pull-model來實現(xiàn)上下游Enumerator和Iteratee之間互動的。我們先看個簡單的Iteratee例子:

def showElements: Iteratee[Int,Unit] = Cont {  case Input.El(e) =>     PRintln(s"EL($e)")     showElements  case Input.Empty => showElements  case Input.EOF =>     println("EOF")     Done((),Input.EOF)}                                                 //> showElements: => play.api.libs.iteratee.Iteratee[Int,Unit]val enumNumbers = Enumerator(1,2,3,4,5)           //> enumNumbers  : play.api.libs.iteratee.Enumerator[Int] = play.api.libs.iteratee.Enumerator$$anon$19@47f6473enumNumbers |>> showElements                      //> EL(1)                                                  //| EL(2)                                                  //| EL(3)                                                  //| EL(4)                                                  //| EL(5)                                                  //| res0: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Unit]] = Success(Cont(<function1>))我們看到:enumNumbers |>> showElements立刻啟動了運算。但并沒有實際完成數(shù)據(jù)發(fā)送,因為showElements并沒有收到Input.EOF。首先,我們必須用Iteratee.run來完成運算:

val it = Iteratee.flatten(enum |>> consumeAll).run//> El(1)                                                  //| El(2)                                                  //| El(3)                                                  //| El(4)                                                  //| El(5)                                                  //| El(6)                                                  //| El(7)                                                  //| El(8)                                                  //| EOF                                                  //| it  : scala.concurrent.Future[Int] = Success(99)這個run函數(shù)是這樣定義的:

/**   * Extracts the computed result of the Iteratee pushing an Input.EOF if necessary   * Extracts the computed result of the Iteratee, pushing an Input.EOF first   * if the Iteratee is in the [[play.api.libs.iteratee.Cont]] state.   * In case of error, an exception may be thrown synchronously or may   * be used to complete the returned Promise; this indeterminate behavior   * is inherited from fold().   *   *  @return a [[scala.concurrent.Future]] of the eventually computed result   */  def run: Future[A] = fold({    case Step.Done(a, _) => Future.successful(a)    case Step.Cont(k) => k(Input.EOF).fold({      case Step.Done(a1, _) => Future.successful(a1)      case Step.Cont(_) => sys.error("diverging iteratee after Input.EOF")      case Step.Error(msg, e) => sys.error(msg)    })(dec)    case Step.Error(msg, e) => sys.error(msg)  })(dec)再一個問題是:enumNumbers |>> showElements是個封閉的運算,我們無法逐部分截取數(shù)據(jù)流,只能取得整個運算結(jié)果。也就是說如果我們希望把一個Enumerator產(chǎn)生的數(shù)據(jù)引導(dǎo)到fs2 Stream的話,只能在所有數(shù)據(jù)都讀入內(nèi)存后才能實現(xiàn)了。這樣就違背了使用Reactive-Streams的意愿。那我們應(yīng)該怎么辦?一個可行的方法是使用一個存儲數(shù)據(jù)結(jié)構(gòu),用兩個線程,一個線程里Iteratee把當前數(shù)據(jù)存入數(shù)據(jù)結(jié)構(gòu),另一個線程里fs2把數(shù)據(jù)取出來。fs2.async.mutable包提供了個Queue類型,我們可以用這個Queue結(jié)構(gòu)來作為Iteratee與fs2之間的管道:Iteratee從一頭把數(shù)據(jù)壓進去(enqueue),fs2從另一頭把數(shù)據(jù)取出來(dequeue)。

我們先設(shè)計enqueue部分,這部分是在Iteratee里進行的:

def enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {   case Input.EOF =>       q.enqueue1(None).unsafeRun       Done((),Input.EOF)   case Input.Empty => enqueueTofs2(q)   case Input.El(e) =>       q.enqueue1(Some(e)).unsafeRun       enqueueTofs2(q)}    //> enqueueTofs2: (q: fs2.async.mutable.Queue[fs2.Task,Option[Int]])play.api.libs.iteratee.Iteratee[Int,Unit]

先分析一下這個Iteratee:我們直接把enqueueTofs2放入Cont狀態(tài),也就是等待接受數(shù)據(jù)狀態(tài)。當收到數(shù)據(jù)時運行q.enqueue1把數(shù)據(jù)塞入q,然后不斷循環(huán)運行至收到Input.EOF。注意:q.enqueue1(Some(e)).unsafeRun是個同步運算,在未成功完成數(shù)據(jù)enqueue1的情況下會一直占用線程。所以,q另一端的dequeue部分必須是在另一個線程里運行,否則會造成整個程序的死鎖。fs2的Queue類型款式是:Queue[F,A],所以我們必須用Stream.eval來對這個Queue進行函數(shù)式的操作:

val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>    //run Enumerator-Iteratee and enqueue data in thread 1    //dequeue data and en-stream in thread 2 (current thread)  }因為Stream.eval運算結(jié)果是Stream[Task,Int],所以我們可以得出這個flatMap內(nèi)的函數(shù)款式 Queue[Task,Option[Int]] => Stream[Task,Int]。下面我們先考慮如何實現(xiàn)數(shù)據(jù)enqueue部分:這部分是通過Iteratee的運算過程產(chǎn)生的。我們提到過這部分必須在另一個線程里運行,所以可以用Task來選定另一線程如下:

    Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()現(xiàn)在這個Task就在后面另一個線程里自己去運算了。但它的運行進展則會依賴于另一個線程中dequeue數(shù)據(jù)的進展。我們先看看fs2提供的兩個函數(shù)款式:

/** Repeatedly calls `dequeue1` forever. */  def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat/**   * Halts the input stream at the first `None`.   *   * @example {{{   * scala> Stream[Pure, Option[Int]](Some(1), Some(2), None, Some(3), None).unNoneTerminate.toList   * res0: List[Int] = List(1, 2)   * }}}   */  def unNoneTerminate[F[_],I]: Pipe[F,Option[I],I] =    _ repeatPull { _.receive {      case (hd, tl) =>        val out = Chunk.indexedSeq(hd.toVector.takeWhile { _.isDefined }.collect { case Some(i) => i })        if (out.size == hd.size) Pull.output(out) as tl        else if (out.isEmpty) Pull.done        else Pull.output(out) >> Pull.done    }}

剛好,dequeue產(chǎn)生Stream[F,A]。而unNoneTerminate可以根據(jù)Stream(None)來終止運算。現(xiàn)在我們可以把這個Reactive-Streams到fs2-pull-streams轉(zhuǎn)換過程這樣來定義:

implicit val strat = Strategy.fromFixedDaemonPool(4)                                                  //> strat  : fs2.Strategy = Strategyval fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>  Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture  pipe.unNoneTerminate(q.dequeue)}   //> fs2Stream  : fs2.Stream[fs2.Task,Int] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>)現(xiàn)在這個stream應(yīng)該已經(jīng)變成fs2.Stream[Task,Int]了。我們可以用前面的log函數(shù)來試運行一下:

def log[A](prompt: String): Pipe[Task,A,A] =    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}                                                  //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]    fs2Stream.through(log("")).run.unsafeRun          //> > 1                                                  //| > 2                                                  //| > 3                                                  //| > 4                                                  //| > 5我們成功的把Iteratee的Reactive-Stream轉(zhuǎn)化成fs2的Pull-Model-Stream。

下面是這次討論的源代碼:

import play.api.libs.iteratee._import scala.concurrent._import scala.concurrent.duration._import scala.concurrent.ExecutionContext.Implicits.globalimport scala.collection.mutable._import fs2._object iteratees {def showElements: Iteratee[Int,Unit] = Cont {  case Input.El(e) =>     println(s"EL($e)")     showElements  case Input.Empty => showElements  case Input.EOF =>     println("EOF")     Done((),Input.EOF)}val enumNumbers = Enumerator(1,2,3,4,5)enumNumbers |>> showElementsIteratee.flatten(enumNumbers |>> showElements).rundef enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {   case Input.EOF =>       q.enqueue1(None).unsafeRun       Done((),Input.EOF)   case Input.Empty => enqueueTofs2(q)   case Input.El(e) =>       q.enqueue1(Some(e)).unsafeRun       enqueueTofs2(q)}implicit val strat = Strategy.fromFixedDaemonPool(4)val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>  Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture  pipe.unNoneTerminate(q.dequeue)}def log[A](prompt: String): Pipe[Task,A,A] =    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}    fs2Stream.through(log("")).run.unsafeRun }


發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 日本aaaa片毛片免费观蜜桃 | 国产精品欧美久久久久一区二区 | 懂色av懂色aⅴ精彩av | 亚洲天堂在线电影 | 欧美一级毛片美99毛片 | 久久亚洲精品久久国产一区二区 | 毛片小网站 | 日韩视| 亚洲精品 欧美 | 污污网站入口 | 久久免费视频一区 | 羞羞网站视频 | 成人毛片免费视频 | 日韩黄色精品视频 | 国产精品免费久久久久 | 蜜桃网站在线观看 | 在线日韩亚洲 | 久久久久久久久久美女 | 久久久久久久久日本理论电影 | 成人短视频在线观看免费 | 欧美一级做性受免费大片免费 | 精品国产呦系列在线看 | 女教师~淫辱の动漫在线 | 午夜视频在线免费观看 | 久久亚洲美女视频 | 鲁久久| 日本欧美一区二区三区在线观看 | 欧美一区二区三区中文字幕 | 午夜在线观看视频网站 | 亚洲性综合网 | 内地av在线| 国产超碰人人做人人爱 | 精品国产精品久久 | 久久久资源网 | 宅男噜噜噜66一区二区 | 成人aaaaa片毛片按摩 | 亚洲一区在线视频观看 | 国产精品久久久久久久久久10秀 | 成人福利免费在线观看 | 久久国产精品久久精品国产演员表 | 国产资源在线观看视频 |