Tag Archives: Reactive Extensions

Reactive Extensions

While testing little bit with the reactive extensions, I felt the same thing which we used to do with many lines of code can be done using the reactive extensions so easily just by subscribing. The basic principle used is “Observer Pattern”: some objects observe a certain event or we can say as some subscribed objects get notified when a certain event or action occurs. With the linq in built this was fascinating to work with.

As enumerator uses pull the observer uses push.

The entire library revolves around two interfaces IObserve and IObservable. It’s like any object that observes should implement IObserve interface and the objects that are going to emit should implement IObservable interface. Earlier IObserve and IObservable were implemented in different assembly, now they are in System.Core, so it’s global and very useful.

The general syntax will be like

var query = from n in Enumerable.Range(1, 5) select n;
foreach(var n in query)
     Console.WriteLine(n);

//call the method which writes "I'm Done" to the console

The same can be written in Rx

var query = from n in Enumerable.Range(1, 5) select n;
var observableQuery = query.ToObservable();
observableQuery.Subscribe(Console.WriteLine, () => {Console.WriteLine("I'm Done");});

If we look at the code here, we are converting an Range [1, 2, 3, 4, 5] into Observable. When the system access each item , that item is passed to the Console.WriteLine which is observer her and the next parameter is a callback.

There is lot more than this..
We can make the operation to be performed on one thread and dispatch using the other.. For time consuming operations this will be good

If we have a WPF application and in that we have a button “Send” and textbox results

	private string stringWait(string str)
		{
			Thread.Sleep(250);
			return str;
		}
		protected void btnSend_Click(Object sender, EventArgs e)
		{
			var query = (from n in Enumerable.Range(1, 25) select stringWait(n.ToString())).ToObservable(Scheduler.ThreadPool);
			query.ObserveOnDispatcher().Subscribe(
				(string x) => {	Results.AppendText(String.Format("{0}\n", x));}
			);
			
		}

In the above code the query is observed by the dispatcher whereas the the operation is performed in the thread pool. If we use the same thread to dispatch that will raise and clr exception. To resolve this concurrency issue we can use ObserveOnDispatcher method, which will write the data.

There are many methods in the Observable which are useful

var source = Observable.Generate(0, i=>i<5, i=>i+1, i=>i*2, i=>TimeSpan.FromSeconds(i));

The above code will generate the sequence with the timespan difference.

			IDisposable subscription = source.Subscribe(
				(int x) => {Console.WriteLine("Received {0} from source and the thread {1}", x, Thread.CurrentThread.ManagedThreadId);Thread.Sleep(1000);},
			                                            (Exception ex)=> {Console.WriteLine("Received error: {0}", ex.Message);},
			                                            () => {Console.WriteLine("End of subscription");});

The subscribe method creates a subscription which implements IDisposable, which will be disposed when the subscription is completed.

the “Subscribe” method will emit data for 3 occurences “onNext”, “onError”, “onCompleted”.
Subscribe(onNext, onError, onCompleted)

Any object can be subscribed to any particular event too.. just like an event handler to notify the objects of the event. The same thing can be handled with the Rx, rx won’t replace the existing event handler system but we can use it in multiple scenarios

With the current version you can subscibe to an event like the following

var lbl = new Label{Text="Hello There", AutoSize=true};
			var txt = new TextBox {Width=200};
			txt.TextChanged += (sender, e) => {lbl.Text = "text changed" + txt.Text;};
			var frm = new Form{
				Controls = {
					lbl,
					txt
				}
					};
			var events = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(h => h.Invoke, eh => frm.MouseMove += eh, eh => frm.MouseMove -= eh);
			using(events.Subscribe( evt => {
			                 	lbl.Text += Environment.NewLine + evt.EventArgs.Location.ToString();
			                 	Console.WriteLine("Current Location: {0} at {1}", evt.EventArgs.Location.ToString(), DateTime.Now.ToShortTimeString());
			                        }))
				
			{
			
			Application.Run(frm);
			}