Skip to content

Commit

Permalink
Add skip-last, concat-reduce, flatten
Browse files Browse the repository at this point in the history
  • Loading branch information
Alotor authored and niwinz committed Jul 5, 2021
1 parent 0d685ca commit cb65c3c
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion src/beicon/core.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 2,7 @@
(:refer-clojure :exclude [true? map filter reduce merge repeat first
last mapcat repeatedly zip dedupe drop
take take-while map-indexed concat empty
delay range throw do trampoline subs])
delay range throw do trampoline subs flatten])
(:require [beicon.impl.rxjs]
[beicon.impl.rxjs-operators]
[cljs.core :as c]))
Expand Down Expand Up @@ -416,6 416,11 @@
[pob ob]
(pipe ob (.skipUntil ^js rxop pob)))

(defn skip-last
"Skip a specified number of values before the completion of an observable."
[n ob]
(.pipe ob (.skipLast ^js rxop (int n))))

(defn take
"Bypasses a specified number of elements in an observable sequence and
then returns the remaining elements."
Expand Down Expand Up @@ -717,6 722,25 @@
[ms]
(.interval rx ms))

(defn flatten
"Just like clojure collections flatten but for rx streams. Given a stream
off collections will emit every value separately"
[ob]
(pipe ob (.concatMap ^js rxop identity)))

(defn concat-reduce
"Like reduce but accepts a function that returns a stream. Will use as
value for the next step in the reduce the last valued emited by the stream
in the function."
[f seed ob]
(let [current-acc (atom seed)]
(->> (concat
(of seed)
(->> ob
(mapcat #(f @current-acc %))
(tap #(reset! current-acc %))))
(last))))

;; --- Schedulers

(defn scheduler
Expand Down

0 comments on commit cb65c3c

Please sign in to comment.