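A Reactive-Stream is not just a simple push-model stream; it also has pull-model characteristics. In the Iteratee pattern the Enumerator is, in theory, responsible for actively pushing data, which gives the push-model behavior. In practice, though, the Iteratee also decides, based on its own state, when it is ready: it hands the Enumerator a callback function that tells it data may now be pushed. To some extent this is a pull model. In other words, Reactive-Streams rely on a push-pull model for the interaction between the upstream Enumerator and the downstream Iteratee. Let's start with a simple Iteratee example: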
    def showElements: Iteratee[Int,Unit] = Cont {
      case Input.El(e) =>
        println(s"EL($e)")
        showElements
      case Input.Empty => showElements
      case Input.EOF =>
        println("EOF")
        Done((),Input.EOF)
    }                                         //> showElements: => play.api.libs.iteratee.Iteratee[Int,Unit]
    val enumNumbers = Enumerator(1,2,3,4,5)   //> enumNumbers : play.api.libs.iteratee.Enumerator[Int] = play.api.libs.iteratee.Enumerator$$anon$19@47f6473
    enumNumbers |>> showElements              //> EL(1)
                                              //| EL(2)
                                              //| EL(3)
                                              //| EL(4)
                                              //| EL(5)
                                              //| res0: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Unit]] = Success(Cont(<function1>))

We can see that enumNumbers |>> showElements started the computation immediately. It did not actually finish sending the data, however, because showElements never received Input.EOF (note that "EOF" was not printed and the result is still in the Cont state). So, first of all, we have to use Iteratee.run to complete the computation:

    val it = Iteratee.flatten(enum |>> consumeAll).run
                                              //> El(1)
                                              //| El(2)
                                              //| El(3)
                                              //| El(4)
                                              //| El(5)
                                              //| El(6)
                                              //| El(7)
                                              //| El(8)
                                              //| EOF
                                              //| it : scala.concurrent.Future[Int] = Success(99)

The run function is defined as follows:

    /**
     * Extracts the computed result of the Iteratee pushing an Input.EOF if necessary
     * Extracts the computed result of the Iteratee, pushing an Input.EOF first
     * if the Iteratee is in the [[play.api.libs.iteratee.Cont]] state.
     * In case of error, an exception may be thrown synchronously or may
     * be used to complete the returned Promise; this indeterminate behavior
     * is inherited from fold().
     *
     * @return a [[scala.concurrent.Future]] of the eventually computed result
     */
    def run: Future[A] = fold({
      case Step.Done(a, _) => Future.successful(a)
      case Step.Cont(k) => k(Input.EOF).fold({
        case Step.Done(a1, _) => Future.successful(a1)
        case Step.Cont(_) => sys.error("diverging iteratee after Input.EOF")
        case Step.Error(msg, e) => sys.error(msg)
      })(dec)
      case Step.Error(msg, e) => sys.error(msg)
    })(dec)

A second problem is that enumNumbers |>> showElements is a closed computation: we cannot intercept the data flow piece by piece, we can only obtain the result of the whole computation. That means that if we want to route the data produced by an Enumerator into an fs2 Stream, we can only do so after all of the data has been read into memory, which defeats the purpose of using Reactive-Streams in the first place. So what should we do? One workable approach is to use an intermediate storage structure and two threads: on one thread the Iteratee stores the current element into the structure, and on the other thread fs2 takes the data out. The fs2.async.mutable package provides a Queue type that we can use as the pipe between the Iteratee and fs2: the Iteratee pushes data in at one end (enqueue) and fs2 pulls data out at the other end (dequeue). We design the enqueue part first, which runs inside the Iteratee:
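    def enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {
      case Input.EOF =>
        q.enqueue1(None).unsafeRun
        Done((),Input.EOF)
      case Input.Empty => enqueueTofs2(q)
      case Input.El(e) =>
        q.enqueue1(Some(e)).unsafeRun
        enqueueTofs2(q)
    }    //> enqueueTofs2: (q: fs2.async.mutable.Queue[fs2.Task,Option[Int]])play.api.libs.iteratee.Iteratee[Int,Unit]

Let's analyze this Iteratee. We put enqueueTofs2 directly into the Cont state, that is, the state of waiting for input. When an element arrives, it runs q.enqueue1 to push the element into q, and it keeps looping this way until it receives Input.EOF, at which point it enqueues None as an end marker. Note that q.enqueue1(Some(e)).unsafeRun is a synchronous operation: it occupies the thread until enqueue1 has completed successfully. The dequeue side at the other end of q must therefore run on a different thread, otherwise the whole program deadlocks. The fs2 Queue type has the shape Queue[F,A], so we have to use Stream.eval to work with this Queue in a functional way: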
    val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>
      //run Enumerator-Iteratee and enqueue data in thread 1
      //dequeue data and en-stream in thread 2 (current thread)
    }

Stream.eval here yields a Stream[Task,Queue[Task,Option[Int]]], and the whole expression has to be a Stream[Task,Int], so the function passed to flatMap must have the shape Queue[Task,Option[Int]] => Stream[Task,Int]. Let's first consider the enqueue part, which is driven by running the Iteratee. As mentioned, this part has to run on another thread, so we can use Task to move it onto another thread like this:

    Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()

This Task now runs by itself on another thread in the background. Its progress, however, depends on how fast data is dequeued on the other thread. Let's look at the signatures of two functions that fs2 provides:

    /** Repeatedly calls `dequeue1` forever. */
    def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat

    /**
     * Halts the input stream at the first `None`.
     *
     * @example {{{
     * scala> Stream[Pure, Option[Int]](Some(1), Some(2), None, Some(3), None).unNoneTerminate.toList
     * res0: List[Int] = List(1, 2)
     * }}}
     */
    def unNoneTerminate[F[_],I]: Pipe[F,Option[I],I] =
      _ repeatPull { _.receive {
        case (hd, tl) =>
          val out = Chunk.indexedSeq(hd.toVector.takeWhile { _.isDefined }.collect { case Some(i) => i })
          if (out.size == hd.size) Pull.output(out) as tl
          else if (out.isEmpty) Pull.done
          else Pull.output(out) >> Pull.done
      }}

This fits nicely: dequeue produces a Stream[F,A], and unNoneTerminate ends the stream at the first None element. We can now define the conversion from Reactive-Streams to an fs2 pull stream like this:

    implicit val strat = Strategy.fromFixedDaemonPool(4)    //> strat : fs2.Strategy = Strategy
    val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>
      Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture
      pipe.unNoneTerminate(q.dequeue)
    }    //> fs2Stream : fs2.Stream[fs2.Task,Int] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>)

This stream should now be an fs2.Stream[Task,Int]. We can try it out with the log function used earlier:

    def log[A](prompt: String): Pipe[Task,A,A] =
      _.evalMap { row => Task.delay { println(s"$prompt> $row"); row } }
                                              //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]

    fs2Stream.through(log("")).run.unsafeRun
                                              //> > 1
                                              //| > 2
                                              //| > 3
                                              //| > 4
                                              //| > 5

We have successfully converted the Iteratee Reactive-Stream into an fs2 pull-model stream. Below is the source code for this discussion:
    import play.api.libs.iteratee._
    import scala.concurrent._
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.collection.mutable._
    import fs2._

    object iteratees {
      def showElements: Iteratee[Int,Unit] = Cont {
        case Input.El(e) =>
          println(s"EL($e)")
          showElements
        case Input.Empty => showElements
        case Input.EOF =>
          println("EOF")
          Done((),Input.EOF)
      }
      val enumNumbers = Enumerator(1,2,3,4,5)

      enumNumbers |>> showElements

      Iteratee.flatten(enumNumbers |>> showElements).run

      def enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {
        case Input.EOF =>
          q.enqueue1(None).unsafeRun
          Done((),Input.EOF)
        case Input.Empty => enqueueTofs2(q)
        case Input.El(e) =>
          q.enqueue1(Some(e)).unsafeRun
          enqueueTofs2(q)
      }

      implicit val strat = Strategy.fromFixedDaemonPool(4)
      val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>
        Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture
        pipe.unNoneTerminate(q.dequeue)
      }

      def log[A](prompt: String): Pipe[Task,A,A] =
        _.evalMap { row => Task.delay { println(s"$prompt> $row"); row } }

      fs2Stream.through(log("")).run.unsafeRun
    }
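The hand-off idea behind this conversion (a bounded queue between two threads, with None as the end marker) can also be tried without Play or fs2. The following is only a rough sketch using the standard library: java.util.concurrent.ArrayBlockingQueue stands in for the fs2 Queue, and the names QueueBridgeSketch, produce, consume and bridge are made up for this illustration and are not part of either library. It is not how fs2 implements its queue; it only mirrors the blocking behavior described above.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ListBuffer

// Illustrative stand-in only: ArrayBlockingQueue replaces the fs2 Queue,
// and all names in this object are hypothetical.
object QueueBridgeSketch {

  // Producer side, playing the role of the Iteratee. `put` blocks while the
  // queue is full, much like the synchronous enqueue1(...).unsafeRun call.
  def produce(q: ArrayBlockingQueue[Option[Int]], data: Seq[Int]): Unit = {
    data.foreach(e => q.put(Some(e)))
    q.put(None) // end marker, like enqueueing None on Input.EOF
  }

  // Consumer side, playing the role of q.dequeue plus unNoneTerminate.
  // `take` blocks while the queue is empty; the first None ends the loop.
  def consume(q: ArrayBlockingQueue[Option[Int]]): List[Int] = {
    val out = ListBuffer.empty[Int]
    var next = q.take()
    while (next.isDefined) {
      out += next.get
      next = q.take()
    }
    out.toList
  }

  // Runs the producer on a second thread and consumes on the calling thread.
  def bridge(data: Seq[Int], capacity: Int = 2): List[Int] = {
    val q = new ArrayBlockingQueue[Option[Int]](capacity)
    val producer = new Thread(new Runnable {
      def run(): Unit = produce(q, data)
    })
    producer.setDaemon(true)
    producer.start()
    val result = consume(q)
    producer.join()
    result
  }

  def main(args: Array[String]): Unit =
    println(bridge(1 to 5)) // prints List(1, 2, 3, 4, 5)
}
```

With a capacity of 2 and five elements, the producer has to wait for the consumer several times, which is the same dependency between the two threads noted above. If produce and consume were called one after the other on a single thread, the producer would block forever once the queue filled up, which corresponds to the deadlock warning in the discussion of enqueueTofs2.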