SimpleChain

class SimpleChain<out R>(gen: suspend () -> R) : Chain<R> (source)

A simple chain of independent tokens. fork returns the same chain.

Constructors

Link copied to clipboard
constructor(gen: suspend () -> R)

Functions

Link copied to clipboard

Represent a chain as a sequence

Link copied to clipboard
inline fun <T, R> Flow<T>.async(dispatcher: CoroutineDispatcher = Dispatchers.Default, crossinline block: suspend CoroutineScope.(T) -> R): AsyncFlow<R>
Link copied to clipboard

Specialized flow chunker for real buffer

fun <T> Flow<T>.chunked(bufferSize: Int, bufferFactory: BufferFactory<T>): Flow<Buffer<T>>

Collect incoming flow into fixed size chunks

Link copied to clipboard
open suspend override fun collect(collector: FlowCollector<R>)
Link copied to clipboard
fun <T, R> Chain<T>.combine(mapper: suspend (Chain<T>) -> R): Chain<R>

Map the whole chain

Link copied to clipboard
fun <T, S, R> Chain<T>.combineWithState(state: S, stateFork: (S) -> S, mapper: suspend S.(Chain<T>) -> R): Chain<R>
Link copied to clipboard
fun <T> Flow<T>.cumulativeSum(group: GroupOps<T>): Flow<T>

Return a Flow of a cumulative sum of elements in the flow. The operation is intermediate and stateful.

Link copied to clipboard
inline suspend fun <T> Chain<T>.discard(number: Int): Chain<T>

Discard a fixed number of samples

Link copied to clipboard
fun <T> Chain<T>.filter(block: (T) -> Boolean): Chain<T>

block must be a pure function or at least not use external random variables, otherwise fork could be broken

Link copied to clipboard
open suspend override fun fork(): Chain<R>

Create a copy of the current chain state. Consuming the resulting chain does not affect the initial chain.

Link copied to clipboard
operator fun <R> Chain<R>.iterator(): Iterator<R>

Represent a chain as regular iterator (uses blocking calls)

Link copied to clipboard
fun <T, R> Chain<T>.map(func: suspend (T) -> R): Chain<R>

Map the chain result using suspended transformation. Initial chain result can no longer be safely consumed since mapped chain consumes tokens. Accepts regular transformation function.

Link copied to clipboard
inline fun <T, R> Flow<T>.mapParallel(dispatcher: CoroutineDispatcher = Dispatchers.Default, crossinline transform: suspend (T) -> R): Flow<R>
Link copied to clipboard
fun <T, S : Ring<T>, ScaleOperations<T>> Flow<T>.mean(space: S): Flow<T>

Return a Flow of mean values of elements in the flow. The operation is intermediate and stateful.

Link copied to clipboard
open suspend override fun next(): R

Generate next value, changing state if needed

Link copied to clipboard
inline suspend fun <T> Chain<T>.nextBuffer(size: Int): Buffer<T>
Link copied to clipboard
fun <T> Flow<Buffer<T>>.spread(): Flow<T>

Flat map a Flow of Buffer into continuous Flow of elements

Link copied to clipboard
fun <T> Flow<T>.windowed(window: Int, algebra: Group<T>): Flow<Buffer<T>>

Map a flow to a moving window buffer. The window step is one. To get different steps, one could use skip operation.

Link copied to clipboard
fun <T, U, R> Chain<T>.zip(other: Chain<U>, block: suspend (T, U) -> R): Chain<R>

Zip two chains together using given transformation