RxSwift provides a large set of operators built on reactive patterns. They are the core of the library and the place where its magic happens. Clarke's third law says:
Any sufficiently advanced technology is indistinguishable from magic.
In the previous article, we looked at the basic building block of RxSwift, the Observable. The other significant part is operators.
In this article, we create an app that pings web hosts and sends a notification when it finds one that is unavailable.
Declaring the PingData struct:

    import Foundation

    struct PingData {
        let host: URL
        let success: Bool
        let timeout: Double

        // Prints the result of a single ping
        func debugPrint() {
            print("Ping host: \(host), success \(success), timeout \(timeout)")
        }
    }
The main task of the application is to ping a host. We create an observable of type Observable<PingData>. It emits a value only once it has received the response.
For this task, we can use either Observable or Single. The latter is an observable that emits either a single element or an error. It is a handy fit for HTTP requests, since a request runs once and either succeeds or fails. Here we use Observable to show how it works with custom event processing.
    import Foundation
    import QuartzCore // CACurrentMediaTime
    import RxSwift

    // RxError.invalidUrl and RxError.errorHTTP are assumed to come from an
    // app-defined error enum; its declaration is not shown in this article.
    func pingResource(urlPath: String) -> Observable<PingData> {
        return Observable.create { observer in
            guard let url = URL(string: urlPath) else {
                observer.onError(RxError.invalidUrl)
                return Disposables.create()
            }
            let startTime = CACurrentMediaTime()
            let request = URLRequest(url: url)
            let task = URLSession.shared.dataTask(with: request) { data, response, error in
                let elapsedTime = CACurrentMediaTime() - startTime
                if let error = error {
                    if let httpResponse = response as? HTTPURLResponse {
                        observer.onError(RxError.errorHTTP(httpCode: httpResponse.statusCode))
                    } else {
                        observer.onError(error)
                    }
                    // The sequence has terminated with an error, so stop here
                    return
                }
                // Here we send the ping status if no error occurred
                if let httpResponse = response as? HTTPURLResponse {
                    let isPingOK = httpResponse.statusCode == 200 && data != nil
                    observer.onNext(PingData(host: url, success: isPingOK, timeout: elapsedTime))
                } else {
                    observer.onNext(PingData(host: url, success: false, timeout: elapsedTime))
                }
                // Complete the observable so that it can be disposed
                observer.onCompleted()
            }
            task.resume()
            // Cancel the request if the subscription is disposed early
            return Disposables.create { task.cancel() }
        }
    }
Inside the observable, we start a URLSessionDataTask. Observable.create is a handy way to wrap asynchronous operations. You could argue that Apple already provides Operation, but operations can only be chained through dependencies and do not offer the flexibility of RxSwift.
To find our way around reactive programming we can use RxMarbles. It is a website where we can see different operators in action and play with sequences of observables.
CombineLatest Marbles
Let's start with two of the most commonly used operators, combineLatest and zip.
combineLatest: when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function.
Once every source has emitted at least one value, combineLatest emits a result each time any of the observables emits. It means that whenever ping Google or ping Bing emits a value, the subscription block fires, however many times that happens.
    Observable.combineLatest(pingResource(urlPath: googleUrlPath), pingResource(urlPath: bingUrlPath))
        .subscribe(onNext: { (googlePingData, bingPingData) in
            googlePingData.debugPrint()
            bingPingData.debugPrint()
        }, onError: { (error) in
            print("Ping error \(error)")
        })
        .disposed(by: disposeBag)
The same code with Observable.zip behaves differently.
Zip: combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function.
In this case, zip waits until both ping Google and ping Bing have emitted, then emits the two values together as one result. Each value is used in exactly one pair, so the subscription does not fire again for a value that has already been combined, as it does with combineLatest.
Zip Marbles
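The difference between the two operators is easier to see with sources we control by hand. The following is a minimal sketch, assuming two PublishSubject instances stand in for the Google and Bing pings; the names google, bing, combined and zipped are illustrative and not part of the app.

```swift
import RxSwift

let disposeBag = DisposeBag()

// Hand-driven stand-ins for the two ping observables (illustrative only)
let google = PublishSubject<String>()
let bing = PublishSubject<String>()

var combined: [String] = []
var zipped: [String] = []

// Fires on every emission once both sources have produced a value
Observable.combineLatest(google, bing) { "\($0)+\($1)" }
    .subscribe(onNext: { combined.append($0) })
    .disposed(by: disposeBag)

// Fires once per pair: first with first, second with second
Observable.zip(google, bing) { "\($0)+\($1)" }
    .subscribe(onNext: { zipped.append($0) })
    .disposed(by: disposeBag)

google.onNext("G1") // nothing yet: bing has not emitted
bing.onNext("B1")   // both operators emit G1+B1
google.onNext("G2") // combineLatest emits G2+B1, zip waits for B2
google.onNext("G3") // combineLatest emits G3+B1
bing.onNext("B2")   // combineLatest emits G3+B2, zip emits G2+B2

print(combined) // ["G1+B1", "G2+B1", "G3+B1", "G3+B2"]
print(zipped)   // ["G1+B1", "G2+B2"]
```

With the same five inputs, combineLatest produces four results and zip produces two, which is why zip suits "wait for both answers" cases and combineLatest suits "always show the latest state" cases.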
Operators fall into several groups. Some of them are:
- Build Observables: Create, Just, Empty, Interval, Repeat
- Transform: Buffer, FlatMap, GroupBy, Map
- Filter: Debounce, Distinct, Filter, Take, TakeLast
- Combine: combineLatest, merge, zip
- Handle Errors: catch, retry
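As a small illustration of how operators from different groups work together, here is a sketch that builds an observable from an array, filters it, transforms it and takes the first two results. The Timing struct and the sample hosts are hypothetical values made up for this example.

```swift
import RxSwift

// Hypothetical ping timing used only for this illustration
struct Timing {
    let host: String
    let milliseconds: Int
}

let disposeBag = DisposeBag()
var slowHosts: [String] = []

let timings = [
    Timing(host: "a.example", milliseconds: 120),
    Timing(host: "b.example", milliseconds: 950),
    Timing(host: "c.example", milliseconds: 400),
    Timing(host: "d.example", milliseconds: 1300),
    Timing(host: "e.example", milliseconds: 2000)
]

Observable.from(timings)                          // Build: one event per array element
    .filter { $0.milliseconds > 500 }             // Filter: keep slow answers only
    .map { "\($0.host): \($0.milliseconds) ms" }  // Transform: turn each value into a log line
    .take(2)                                      // Filter: stop after two elements
    .subscribe(onNext: { slowHosts.append($0) })
    .disposed(by: disposeBag)

print(slowHosts) // ["b.example: 950 ms", "d.example: 1300 ms"]
```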
For the example task, we use a concatenated observable to ping both Google and Bing. concat subscribes to the observables one after another, so the next ping starts only after the previous one has completed:

    func pingObservables() -> Observable<PingData> {
        return Observable.concat(pingResource(urlPath: googleUrlPath), pingResource(urlPath: bingUrlPath))
    }
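To see the sequential behaviour of concat without touching the network, here is a minimal sketch that assumes two hand-driven PublishSubject sources. The names first, second and received are illustrative.

```swift
import RxSwift

let disposeBag = DisposeBag()

// Hand-driven stand-ins for the two pings (illustrative only)
let first = PublishSubject<String>()
let second = PublishSubject<String>()
var received: [String] = []

Observable<String>.concat(first.asObservable(), second.asObservable())
    .subscribe(onNext: { received.append($0) })
    .disposed(by: disposeBag)

first.onNext("google 1")
second.onNext("bing early") // dropped: concat has not subscribed to second yet
first.onCompleted()         // now concat moves on and subscribes to second
second.onNext("bing 1")

print(received) // ["google 1", "bing 1"]
```

The value sent to the second source before the first one completed never arrives, which shows that concat does not even subscribe to the next observable until the current one has finished.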
We need to ping the hosts every second, log the ping operations, and send an alarm message when a service is unavailable. Errors should also be caught along the way, and pinging should restart once an error has occurred.
The magic of RxSwift lies in the ability to chain operators. Let's see it in an example:
    // Run the timer on a background queue
    let scheduler = ConcurrentDispatchQueueScheduler(qos: .background)
    // Start a timer with a 1 second interval
    // (RxSwift 5 and later expect .seconds(1) instead of 1.0)
    Observable<Int>.interval(1.0, scheduler: scheduler)
        // Flatten each timer tick into the ping observable
        .flatMap { [weak self] _ -> Observable<PingData> in
            guard let self = self else { return .empty() }
            return self.pingObservables()
        }
        // Retry on unexpected errors
        .retryWhen({ [weak self] _ -> Observable<PingData> in
            guard let self = self else { return .empty() }
            return self.pingObservables()
        })
        // Log pings to hosts
        .do(onNext: { (pingData) in
            print("ping tick")
            pingData.debugPrint()
        })
        // Keep only the events where the host is unavailable
        .filter { !$0.success }
        .subscribe(onNext: { (pingData) in
            print("The host is unavailable")
            pingData.debugPrint()
            // Send alarm notifications for subscribers
        })
        .disposed(by: disposeBag)
Here we started the timer with Observable.interval, flattened each tick into the PingData operation with flatMap, retried on errors with retryWhen, filtered for failures, and subscribed to the remaining events. Chaining keeps the code readable and makes the whole flow read like a single use case.
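Retrying is easier to follow on a source that fails in a predictable way. The sketch below uses the simpler retry(_:) operator rather than retryWhen, and assumes a hypothetical flakyPing observable that fails on its first subscription and succeeds on the second. PingError, attempts and log are names made up for the example.

```swift
import RxSwift

// Hypothetical error type used only for this illustration
enum PingError: Error {
    case unreachable
}

let disposeBag = DisposeBag()
var attempts = 0
var log: [String] = []

// Hypothetical flaky source: errors on the first subscription, succeeds afterwards
let flakyPing = Observable<String>.create { observer in
    attempts += 1
    if attempts < 2 {
        observer.onError(PingError.unreachable)
    } else {
        observer.onNext("pong")
        observer.onCompleted()
    }
    return Disposables.create()
}

flakyPing
    .retry(2) // at most two subscriptions in total: the first try plus one retry
    .subscribe(onNext: { log.append($0) },
               onError: { log.append("failed: \($0)") })
    .disposed(by: disposeBag)

print(attempts) // 2
print(log)      // ["pong"]
```

The first error never reaches the subscriber: retry resubscribes to the source, and only the successful second attempt is delivered.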
Here are some tips for using RxSwift.
- Together with [weak self], a guard prevents retain-cycle leaks and exits the closure early once the owner is gone:
guard let self = self else { return .empty() }
- The most common errors are memory leaks, so prevent them by capturing self weakly with [weak self] or by adding a variable to the capture list. It is better to avoid unowned references to self, because an observable can outlive its owner.
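The effect of the weak capture can be checked with a small experiment. This is a sketch under stated assumptions: PingMonitor is a hypothetical owner class with its own DisposeBag, and deinitCalled is a flag added only to observe deallocation.

```swift
import RxSwift

// Flag used only to observe that the owner was deallocated
var deinitCalled = false

// Hypothetical owner that keeps its subscription in its own dispose bag
final class PingMonitor {
    private let disposeBag = DisposeBag()
    private(set) var count = 0

    init(ticks: Observable<Int>) {
        ticks
            .subscribe(onNext: { [weak self] _ in
                // A strong capture here would create a cycle:
                // monitor -> disposeBag -> subscription -> closure -> monitor
                guard let self = self else { return }
                self.count += 1
            })
            .disposed(by: disposeBag)
    }

    deinit {
        deinitCalled = true
    }
}

let ticks = PublishSubject<Int>()
var monitor: PingMonitor? = PingMonitor(ticks: ticks)

ticks.onNext(1)
let countBeforeRelease = monitor?.count // Optional(1)

monitor = nil   // deinit runs because the closure holds self only weakly
ticks.onNext(2) // ignored: the dispose bag has already disposed the subscription

print(deinitCalled) // true
```

If the closure captured self strongly, setting monitor to nil would not deallocate the object, and the subscription would keep receiving ticks.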