Magic of RxSwift. Rx Operators

RxSwift provides a large set of operators built on reactive patterns, and they are the core part where its magic happens. Clarke's third law says:

Any sufficiently advanced technology is indistinguishable from magic.

In the previous article, we looked at the basic building block of RxSwift, the Observable. The other significant part is the operators, where most of the magic happens.

In this article, we create an app that pings web hosts and sends out notifications when it finds one that is unavailable.

Declaring PingData struct:

struct PingData {
    let host: URL
    let success: Bool
    let timeout: Double
    
    func debugPrint() {
        print("Ping host: \(host), success \(success), timeout \(timeout)")
    }
}

The main task of the application is to ping a host. We create an Observable of type Observable<PingData>. It emits a value only once it receives the response.

For this task, we could use either Observable or Single. The latter is an Observable that emits either a single element or an error. It is a handy way to work with HTTP, since a request either succeeds or fails and it completes only once. We use an Observable here to show how it works with custom event processing.

// Requires RxSwift, Foundation, and QuartzCore (for CACurrentMediaTime)
func pingResource(urlPath: String) -> Observable<PingData> {
        return Observable.create { observer in
            // RxError.invalidUrl and RxError.errorHTTP are assumed to be app-defined cases (not shown here)
            guard let url = URL(string: urlPath) else {
                observer.onError(RxError.invalidUrl)
                return Disposables.create()
            }
            
            let startTime = CACurrentMediaTime()
            
            let request = URLRequest(url: url)
            let task = URLSession.shared.dataTask(with: request, completionHandler: { (data, response, error) in
                let endTime = CACurrentMediaTime()
                if let error = error {
                    if let httpResponse = response as? HTTPURLResponse {
                        observer.onError(RxError.errorHTTP(httpCode: httpResponse.statusCode))
                    } else {
                        observer.onError(error)
                    }
                    // An error terminates the sequence, so nothing more may be emitted
                    return
                }
                
                let elapsedTime = endTime - startTime
                
                // Here we send the ping status if no error occurred
                if let httpResponse = response as? HTTPURLResponse {
                    let isPingOK = httpResponse.statusCode == 200 && data != nil
                    observer.onNext(PingData(host: url, success: isPingOK, timeout: elapsedTime))
                } else {
                    observer.onNext(PingData(host: url, success: false, timeout: elapsedTime))
                }
                
                // Completing the observable. Then it is going to be disposed
                observer.onCompleted()
            })
            
            task.resume()
            
            // Cancel the request if the subscription is disposed before it finishes
            return Disposables.create { task.cancel() }
        }
    }

Inside the observable, we start a URLSessionDataTask. This is a handy way to wrap async operations. You could argue that Apple already provides Operation, but operations can be chained only through dependencies and do not offer the flexibility of RxSwift.

To navigate the world of reactive programming we need Rx Marbles. It is a website where we can see different operators in action and play with sequences of observables.
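The error cases used in pingResource (invalidUrl and errorHTTP) are not defined in this article, and they are not among the cases of the RxError type that ships with RxSwift, so they are presumably declared by the app. A minimal sketch of what such a declaration might look like, with the enum and the describe helper both being assumptions made for illustration:

```swift
import Foundation

// Hypothetical app-level error type; the article does not show its definition.
// It is named to match the call sites in pingResource. RxSwift also ships a type
// called RxError with different cases, so a distinct name such as PingError may be
// less confusing in a real project.
enum RxError: Error, Equatable {
    case invalidUrl
    case errorHTTP(httpCode: Int)
}

// Hypothetical helper that turns an error into a log-friendly message.
func describe(_ error: RxError) -> String {
    switch error {
    case .invalidUrl:
        return "Invalid URL"
    case .errorHTTP(let httpCode):
        return "HTTP error \(httpCode)"
    }
}

print(describe(.errorHTTP(httpCode: 503)))
```

Carrying the status code as an associated value lets the subscriber decide which failures are worth an alarm.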

CombineLatest Marbles

Let's start with two of the most commonly used operators, combineLatest and zip.

combineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function

combineLatest emits a result whenever either of the observables emits, once each of them has emitted at least one value. This means that every time ping Google or ping Bing emits a value, the subscription block fires again with the latest value from each.

Observable.combineLatest(pingResource(urlPath: googleUrlPath), pingResource(urlPath: bingUrlPath))
            .subscribe(onNext: { (googlePingData, bingPingData) in
                googlePingData.debugPrint()
                bingPingData.debugPrint()
            }, onError: { (error) in
                print("Ping error \(error)")
            })
            .disposed(by: disposeBag)
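The pairing rule is easier to see with values pushed by hand. The following is a small sketch that uses two PublishSubject instances in place of the network pings; the names numbers, letters, and combined are made up for the illustration:

```swift
import RxSwift

let disposeBag = DisposeBag()
let numbers = PublishSubject<String>()
let letters = PublishSubject<String>()
var combined: [String] = []

Observable.combineLatest(numbers, letters) { number, letter in number + letter }
    .subscribe(onNext: { combined.append($0) })
    .disposed(by: disposeBag)

numbers.onNext("1")   // nothing yet: letters has not emitted
letters.onNext("A")   // emits "1A"
numbers.onNext("2")   // emits "2A", reusing the latest letter
letters.onNext("B")   // emits "2B", reusing the latest number

print(combined)       // ["1A", "2A", "2B"]
```

Note that "A" and "2" each appear in two results, because combineLatest reuses the most recent value from the side that did not change.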

The same code with Observable.zip behaves differently.

Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

In this case, zip waits until both ping Google and ping Bing have emitted, then emits the paired result. Each emitted value is used in exactly one pair, so the subscription block does not fire again with a repeated value as it does with combineLatest.

Zip Marbles
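To contrast with combineLatest, here is a small sketch of zip driven by hand with two PublishSubject instances instead of the network pings; numbers, letters, and zipped are illustrative names only:

```swift
import RxSwift

let disposeBag = DisposeBag()
let numbers = PublishSubject<String>()
let letters = PublishSubject<String>()
var zipped: [String] = []

Observable.zip(numbers, letters) { number, letter in number + letter }
    .subscribe(onNext: { zipped.append($0) })
    .disposed(by: disposeBag)

numbers.onNext("1")   // waits for a partner
numbers.onNext("2")   // queued behind "1"
letters.onNext("A")   // emits "1A"
letters.onNext("B")   // emits "2B"
letters.onNext("C")   // waits: numbers has no third element

print(zipped)         // ["1A", "2B"]
```

Elements are paired strictly by position, so no value is ever reused.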

There are different categories of operators. Some of them are:

  • Build Observables: Create, Just, Empty, Interval, Repeat
  • Transform: Buffer, FlatMap, GroupBy, Map
  • Filter: Debounce, Distinct, Filter, Take, TakeLast
  • Combine: combineLatest, merge, zip
  • Handle Errors: catch, retry
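As a quick illustration of how operators from several of these categories fit together, here is a minimal sketch over a fixed list of integers; the values and the results variable are chosen only for the example:

```swift
import RxSwift

let disposeBag = DisposeBag()
var results: [Int] = []

Observable.of(1, 2, 3, 4, 5, 6)      // build a sequence from fixed values
    .filter { $0 % 2 == 0 }          // filter: keeps 2, 4, 6
    .map { $0 * 10 }                 // transform: 20, 40, 60
    .take(2)                         // filter: only the first two elements
    .subscribe(onNext: { results.append($0) })
    .disposed(by: disposeBag)

print(results)                       // [20, 40]
```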

For the example task, we use a concatenated observable to ping both Google and Bing. The observables fire sequentially, one after the other:

func pingObservables() -> Observable<PingData> {
    return Observable.concat(pingResource(urlPath: googleUrlPath), pingResource(urlPath: bingUrlPath))
}
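The sequential behavior of concat can be checked with synchronous stand-ins for the two pings. In this sketch the strings and the order log are invented for the illustration, and do(onSubscribe:) is used only to record when the second observable is actually subscribed:

```swift
import RxSwift

let disposeBag = DisposeBag()
var order: [String] = []

let first = Observable.of("google")
let second = Observable.of("bing")
    .do(onSubscribe: { order.append("subscribing to second") })

Observable.concat(first, second)
    .subscribe(onNext: { order.append($0) },
               onCompleted: { order.append("completed") })
    .disposed(by: disposeBag)

print(order)   // ["google", "subscribing to second", "bing", "completed"]
```

The second observable is not subscribed until the first one completes, which is why the Bing ping starts only after the Google ping has finished.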

We need to ping the hosts every second, log the ping operations, and send an alarm message when a service is unavailable. Errors should also be caught during execution, and pinging should restart once an error has occurred.

The magic of RxSwift lies in the ability to chain operators. Let's see it in an example:

	
	// Start a background queue
	let scheduler = ConcurrentDispatchQueueScheduler(qos: .background)

	// Start timer with 1 second interval (RxSwift 5 and later take .seconds(1) instead of 1.0)
	Observable<Int>.interval(1.0, scheduler: scheduler)
            // Flatten each timer tick into the Observable<PingData> of ping results
            .flatMap { [weak self] _ -> Observable<PingData> in
                guard let self = self else { return .empty() }
                return self.pingObservables()
            }
            // Retry on unexpected errors: every error emitted here triggers a resubscription
            .retryWhen { (errors: Observable<Error>) in errors }
            // Log ping to hosts
            .do(onNext: { (pingData) in
                print("ping tick")
                pingData.debugPrint()
            })
            // Keep only the events where the host is unavailable
            .filter { !$0.success }
            .subscribe(onNext: { (pingData) in
                print("The host is unavailable")
                pingData.debugPrint()
            
                // Send alarm notifications for subscribers
            })
            .disposed(by: disposeBag)

Here we start the timer with Observable.interval, flatten each tick into a PingData operation with flatMap, retry on error with retryWhen, filter for failures, and subscribe to the events we are waiting for. The chaining gives us clear readability and makes the whole flow read like a single use case.
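Retrying is hard to observe against a live network, so here is a small synchronous sketch. It uses the simpler bounded operator retry(_:) rather than retryWhen, and a made-up source (flaky, with the hypothetical FlakyError) that fails on its first two subscriptions:

```swift
import RxSwift

struct FlakyError: Error {}

let disposeBag = DisposeBag()
var attempts = 0
var log: [String] = []

// Hypothetical source that fails on its first two subscriptions
let flaky = Observable<String>.create { observer in
    attempts += 1
    if attempts < 3 {
        observer.onError(FlakyError())
    } else {
        observer.onNext("pong")
        observer.onCompleted()
    }
    return Disposables.create()
}

flaky
    .retry(3)   // at most three subscriptions in total
    .subscribe(onNext: { log.append($0) },
               onError: { _ in log.append("gave up") })
    .disposed(by: disposeBag)

print(attempts, log)   // 3 ["pong"]
```

Each retry is a fresh subscription to the source, which is why the closure passed to create runs three times.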

Here are some tips while using RxSwift.

  • Pair [weak self] with a guard so that the closure exits early once its owner is gone: guard let self = self else { return .empty() }
  • The most common errors are memory leaks, so prevent them by capturing self weakly with [weak self] or by adding a variable to the capture list. It is better to avoid unowned references to self because an observable can outlive its owner.
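The effect of the weak capture can be sketched with a source that outlives its subscriber. HostMonitor, failedHosts, and deinitCount below are hypothetical names used only for this illustration:

```swift
import RxSwift

var deinitCount = 0

final class HostMonitor {
    private let disposeBag = DisposeBag()
    private(set) var failures: [String] = []

    init(failedHosts: Observable<String>) {
        failedHosts
            .subscribe(onNext: { [weak self] host in
                // Exit early if the monitor has already been deallocated
                guard let self = self else { return }
                self.failures.append(host)
            })
            .disposed(by: disposeBag)
    }

    deinit { deinitCount += 1 }
}

let failedHosts = PublishSubject<String>()
var monitor: HostMonitor? = HostMonitor(failedHosts: failedHosts)

failedHosts.onNext("bing.com")
let seen = monitor?.failures ?? []

// The subject is still alive, but the monitor can be released
// because the closure does not hold a strong reference to it.
monitor = nil
failedHosts.onNext("google.com")

print(seen, deinitCount)   // ["bing.com"] 1
```

Releasing the monitor also releases its DisposeBag, which disposes the subscription, so the second event is never delivered.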
