subscribe method

@override
Disposable subscribe(
  Observer<T> observer
)
override

Subscribes with the provided observer.

Implementation

/// Subscribes [observer] to the wrapped events of this sequence.
///
/// A subscriber is created for [observer] through [createSubscriber]. Every
/// [WrappedEvent] found in `sequence.events` is then scheduled on `scheduler`
/// at the scheduler's current time plus `stepDuration` multiplied by the
/// event's index; entries of any other type are skipped.
///
/// Returns the created subscriber.
@override
Disposable subscribe(Observer<T> observer) {
  final target = createSubscriber(observer);
  final wrappedEvents = sequence.events.whereType<WrappedEvent<T>>();
  for (final wrapped in wrappedEvents) {
    // Each event is due `index` steps after the scheduler's current time.
    final start = scheduler.now;
    final offset = scheduler.stepDuration * wrapped.index;
    final dueTime = start.add(offset);
    scheduler.scheduleAbsolute(dueTime, () => wrapped.event.observe(target));
  }
  return target;
}