XAML Playground
about XAML and other Amenities

Consume a Socket using Reactive Extension

2011-06-29T09:43:23+01:00 by Andrea Boschin

In my last post I wrote about writing a TCP server using the Reactive Extensions library. It was not a Silverlight related post, but it simply introduces the server side component that I will use in this post to show you how to use the same library to consume the incoming data and present it in Silverlight. The interesting part is to understand how rx can simplify the programming model when dealing with asynchronicity both in Silverlight and in server side full .NET programming.

The server I presented was a very simple socket listener that waits for connections and then, when the channel has been established, it push to the client a continuous stream of data that represent the current state of the CPU and memory of the server. With this huge amount of informations incoming we can create a little application that is able to show a chart and some gauges, updated almost in realtime.

The application, developed following the MVVM pattern, is made of a simple view containing the controls used to present the informations. These controls are feeded by a couple of properties in the ViewModel that are updated with the incoming data. So the most of the work is done by the ViewModel that is responsible of connecting to the socket and read the information stream to update the properties. In a real world solution probably you will have some kind of layer between the ViewModel and the socket, but for the sake of the post we will keep it simple as much as we need to understand how it works.

Consuming a socket in Silverlight means using an instance of the Socket class that represents the connection and a SocketAsyncEventArgs that is used when you call the methods of the connection to make the requests and receive responses. So, as an example, when you have to establish the connection you have to create the instance of the Socket class and the call the ConnectAsync method providing the SocketAsyncEventArgs initialized with the address of the endpoint to connect to. When the connection has been established the SocketAsyncEventArgs class will raise a Completed event that notify about the result of the operation. Doing it with Reactive Extensions mean something like this:

   1: protected void Connect()
   2: {
   3:     this.Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
   4:  
   5:     SocketAsyncEventArgs arguments = new SocketAsyncEventArgs();
   6:     arguments.RemoteEndPoint = new DnsEndPoint(Application.Current.Host.Source.DnsSafeHost, 4530);
   7:  
   8:     var socketAsObservable = from args in Observable.FromEvent<SocketAsyncEventArgs>(
   9:                                  ev => arguments.Completed += ev,
  10:                                  ev => arguments.Completed -= ev)
  11:                              select args.EventArgs;
  12:  
  13:     socketAsObservable
  14:         .Where(args => args.LastOperation == SocketAsyncOperation.Connect)
  15:         .Subscribe(
  16:         args =>
  17:         {
  18:             args.Dispose();
  19:             this.Receive();
  20:         });
  21:  
  22:     this.Socket.ConnectAsync(arguments);
  23: }

In the line #3 it is created the Socket with the common parameter that is used in Silverlight for a TCP channel. Then an instance of the SocketAsyncEventArgs is initialized and its property RemoteEndPoint is provided with and instance of DnsEndPoint that represente the address of the server to connect to. The port is 4530 like we defined in the previour article.

At this point it is created a stream, called socketAsObservable, from the Completed event of the SocketAsyncEventArgs. the stream filter the LastOperation to be "Connect", as we expect and in the Subscribe method disposes the SocketAsyncEventArgs instance and starts to receive data. The SocketAsyncEventArgs instance can be used only once so we have to carefully dispose it to avoid memory leaks.

Once the connection has successfully established you have to put your socket in Receive. When incoming data is detected you will be notified and you can fetch it from the SocketAsyncEventArgs and put again the socket in Receive. Here is how it appear with Reactive Extensions:

   1: protected void Receive()
   2: {
   3:     SocketAsyncEventArgs arguments = new SocketAsyncEventArgs();
   4:     arguments.SetBuffer(new byte[1024], 0, 1024);
   5:  
   6:     var socketAsObservable = from args in Observable.FromEvent<SocketAsyncEventArgs>(
   7:                                  ev => arguments.Completed += ev,
   8:                                  ev => arguments.Completed -= ev)
   9:                              select args.EventArgs;
  10:  
  11:     socketAsObservable
  12:         .Where(args => args.LastOperation == SocketAsyncOperation.Receive)
  13:         .Throttle(TimeSpan.FromMilliseconds(500))
  14:         .ObserveOnDispatcher()
  15:         .Subscribe(OnReceive);
  16:  
  17:     if (this.Socket.Connected)
  18:         this.Socket.ReceiveAsync(arguments);
  19: }

Once again we create the instance of the SocketAsyncEventArgs. It is now initialized with a SetBuffer that allocates a buffer of 1 KByte that is used by the socket to compy the incoming data. Then the socketAsObservable is created using the FromEvent; This method is important because it is in charge of attaching and detaching the Completed event so we avoid to have unwanted delegates around. Again the socketAsObservable is filtered selecting only when LastOperation equals to Receive and we also apply a Throttling of about 500 milliseconds. It is made to discart updated when they are too fast. One update every half a second suffice to say that the UI is up to date with the server.

Fially we marshal the stream to the UI Thread using the ObserveOnDispatcher and the received events are forwarded to the OnReceive method that is responsible of parsing the received data and updating the UI.

   1: protected void OnReceive(SocketAsyncEventArgs args)
   2: {
   3:     string data = Encoding.UTF8.GetString(args.Buffer, 0, args.BytesTransferred);
   4:  
   5:     IEnumerable<Sample> samples = this.GetSamples(ref data);
   6:  
   7:     Array.Clear(args.Buffer, 0, 1024);
   8:  
   9:     if (data.Length > 0)
  10:     {
  11:         byte[] bytes = Encoding.UTF8.GetBytes(data);
  12:         Array.Copy(bytes, args.Buffer, bytes.Length);
  13:         args.SetBuffer(bytes.Length, 1024 - bytes.Length);
  14:     }
  15:     else
  16:         args.SetBuffer(0, 1024);
  17:  
  18:     if (this.Socket.Connected)
  19:         this.Socket.ReceiveAsync(args);
  20:  
  21:     this.Update(samples);
  22: }

The method peek up the received data and then calls again the ReceiveAsync method of the connection to make again the channel ready to receive other informations. Then the Update method is called to update the properties binded to the view.

The code is simple and obviously it need some additional check, as an example it needs to verify the communication errors that here are swallowed. I hope it shows, once again, how reactive extension can help you to simplify the consumption of asynchronous streams.

A TCP Server with Reactive Extensions

2011-06-17T14:17:05+01:00 by codeblock

I know, from the title of this post you may understand that the topic I’m about to cover is not really Silverlight related. Please be patient and it will become clear that there is a relation with Silverlight in the next part when I will complete my exposition.

After a long pause, today I return to the Reactive Extensions, I’ve temporarily shelved for lack of time, and I want to present the implementation of a little server that uses a socket and takes advantage of the Extensions. A socket server usually listens to a TCP port and it can be easily consumed by a Silverlight application. This is not a usual scenario because often it is hard to get rid of firewalls and security people in a company, so the use of HTTP is way better in most of the cases. But, sometimes, tipically when you need a reactive communication that let the application to be always up to date with events on the server, the use of a Socket is an effective soluzione

For such people that are not aware of how a socket server works, a brief explanation. A regular socket server usually opens a port on the address it is binded. The port is represented by a number between 1024 and 65535 (numbers lower than 1024 are reserved) and it is expressed after the ip or dns name separated by a colon.

The server initially reserves the port and starts to listen for incoming connections. Until the arrive of a connection the server is substantially idle but when the connection is detected it has to "accept" the request and initialize the channel that will be used by both the parts to exchange the informations.

The acceptance phase of the server must handle the new connection like it is a completely new channel, so the problem is that the server will have a main channel where it accept connections and a number of channels, one for each client that has requested the connection. Usually this means that the server must spawn a new thread for each connection it accepts and the thread will separately handle its channel leaving the main thread free of returning to the listening phase.

Using Reactive Extension properly makes these operations very easy and the code required to handle the listening and acceptance is really compact. In my sample I use a Console Application that starts the listening ad finally remain available until someone hit enter and definitely closes the application. In the following box the main function:

   1: static CancellationTokenSource cts = new CancellationTokenSource();
   2:  
   3: /// <summary>
   4: /// Mains the specified args.
   5: /// </summary>
   6: /// <param name="args">The args.</param>
   7: static void Main(string[] args)
   8: {
   9:     if (IPAddress.Loopback != null)
  10:     {
  11:         TcpListener listener = new TcpListener(IPAddress.Loopback, 4530);
  12:         listener.Start();
  13:  
  14:         Console.WriteLine("Service is listening on {0} port {1}.", IPAddress.Loopback, 4530);
  15:         Console.WriteLine("Press enter to stop");
  16:  
  17:         Func<IObservable<TcpClient>> accept =
  18:             Observable.FromAsyncPattern<TcpClient>(listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient);
  19:  
  20:         accept()
  21:             .Where(o => o.Connected)
  22:             .ObserveOn(Scheduler.NewThread)
  23:             .Subscribe(acceptedClient => OnAccept(acceptedClient, accept));
  24:  
  25:         Console.ReadLine();
  26:         cts.Cancel();
  27:     }
  28: }

In the first lines the server binds to the port 4530 of the localhost ip (127.0.0.1) and it starts to listen using the Start method. Then using the FromAsyncPattern method it is created an observable source that will notify once a new connection is detected. The observable will give the channel ready to communicate. So, the Subcribe method forward the channel to the OnAccept method that is responsible to feed the channel with informations retrieved by the computer (in this case if gives cpu usage and memory size in realtime). Obviously this is the operation we should spawn in a separate thread and the trick here is done by the ObserveOn(Scheduler.NewThread) that automatically initialize and start the thread using the body of the OnAccept method. Let me continue with the OnAccept method:

   1: static void OnAccept(TcpClient client, Func<IObservable<TcpClient>> accept)
   2: {
   3:     accept()
   4:         .Where(o => o.Connected)
   5:         .ObserveOn(Scheduler.NewThread)
   6:         .Subscribe(acceptedClient => OnAccept(acceptedClient, accept));
   7:  
   8:     Console.WriteLine("channel opened");
   9:  
  10:     while (!cts.Token.WaitHandle.WaitOne(100))
  11:     {
  12:         try
  13:         {
  14:             if (client.Connected)
  15:             {
  16:                 byte[] message = GetMessage();
  17:                 client.Client.Send(message);
  18:             }
  19:         }
  20:         catch (Exception ex)
  21:         {
  22:             Console.WriteLine(ex.Message);
  23:             break;
  24:         }
  25:     }
  26:  
  27:     Console.WriteLine("channel closed");
  28:     client.Close();
  29: }

As you can see the OnAccept method receives the connected TcpClient side by side with the instance of the observable and in the very first lines puts the main thread again on listen, just before starting its work. This operation closes the loop because the listener will continue to wait for new connections and the new thread will send the informations along the channel.

To gracefully handle the closure of the server, once you hit the enter key a CancellationTokenSource is used to forward to alle the threads the request of exit. The method GetMessage simply reads the informations of cpu and memory and translates them to a byte array that is sent on the channel.

   1: const ulong Mega = 1048576;
   2: static PerformanceCounter cpuCounter = new PerformanceCounter("Processor", "% Processor Time", "_Total");
   3:  
   4: static byte[] GetMessage()
   5: {
   6:     ulong totalPhisicalMemory = 0;
   7:     ulong availablePhisicalMemory = 0;
   8:     ulong totalVirtualMemory = 0;
   9:     ulong availableVirtualMemory = 0;
  10:     float cpu = cpuCounter.NextValue();
  11:  
  12:     NativeMethods.MEMORYSTATUSEX memStatus = new NativeMethods.MEMORYSTATUSEX();
  13:  
  14:     if (NativeMethods.GlobalMemoryStatusEx(memStatus))
  15:     {
  16:         totalPhisicalMemory = memStatus.ulTotalPhys / Mega;
  17:         availablePhisicalMemory = memStatus.ulAvailPhys / Mega;
  18:         totalVirtualMemory = memStatus.ulTotalVirtual / Mega;
  19:         availableVirtualMemory = memStatus.ulAvailVirtual / Mega;
  20:     }
  21:  
  22:     return Encoding.UTF8.GetBytes(
  23:         string.Format(
  24:             CultureInfo.InvariantCulture,
  25:             "{0:0.0}:{1}:{2}:{3}:{4}{5}",
  26:             cpu,
  27:             totalPhisicalMemory,
  28:             availablePhisicalMemory,
  29:             totalVirtualMemory,
  30:             availableVirtualMemory,
  31:             Environment.NewLine));
  32: }

To have a try with the server please runs the code and then open a command prompt and type the following

telnet 127.0.0.1 4530
 

You will see a continuous stream of informations displayed on the console and you can also try to run multiple command prompt with the same instruction and all the connections will be connected. This example shows the powerfulness of the reactive extensions. In the next post I will show you how to use the library to consume this socket.

The Reactive Snake for Windows Phone 7

2011-05-04T22:57:43+01:00 by codeblock

CaptureContinuing my experiments with Reactive Extension I put together a small example that show an alternative use of the library to create a little game. The game is the widely known Snake that many of us have played on his own Nokia when it is the sole game in the ancient models. Please take note that the game I wrote is not complete since it is first of all an example of the use of a technology and how it can simplify the development of a slightly complex logic like the one that is behind this game.

The core of the game is based on two separate uses of the Reactive Extensions. From one side I have a main loop that is based on a frequent timeout that in a complete version of the game could be used to change the game speed. In this version it is set to 75 milliseconds. Every time the timeout is elapsed I move forward the Snake of one position saved in two variables XDirection and YDirection. This variable can assume the values of 1 or -1 if the snake is moving along the direction and 0 if the snake is moving along the opposite direction.

Using this variables the GenerateWithTime method of the RX generate a stream of points that are the positions where the Snake have passed during its move. The length of the snake is determined by the TrailWhile method that evaluate the length using a third variable that is incremented every time the Snake eats a gem.

The other use of the RX is to handle the Manipulation events that are used to detect if the user touches the screen basing on this touches the XDirection and YDirection are updated to reflect the touch. As an example if when the snake is moving along the y axis like the screenshot the touch on the right side of the screen make change the direction to right and the touch on the left to left.

Here is the code of the main game loop:

Observable.GenerateWithTime(
    new Point(0, 0),
    p => this.IsRunning,
    p => p,
    p => TimeSpan.FromMilliseconds(75),
    p => this.NewPoint(p))
    .ObserveOnDispatcher()
    .TrailWhile(o => o > this.Length)
    .Subscribe(this.EvaluateAndDrawSnake);
 
private Point NewPoint(Point point)
{
    double x = point.X + XDirection;
    double y = point.Y + YDirection;
 
    if (x < 0) x = 22;
    if (x > 22) x = 0;
    if (y < 0) y = 36;
    if (y > 36) y = 0;
 
    return new Point(x, y);
}

The method EvaluateAndDraw is the part of the code that check when the snake eats itself to end the game and then redraw the snake on the screen. Here is the management of the screen:

Observable.FromEvent<ManipulationStartedEventArgs>(
    ev => this.ManipulationStarted += ev,
    ev => this.ManipulationStarted -= ev)
    .Select(
    o =>
    {
        return new
        { 
            X = Math.Sign(o.EventArgs.ManipulationOrigin.X - 240), 
            Y = Math.Sign(o.EventArgs.ManipulationOrigin.Y - 400) 
        };
    })
    .Subscribe(
    o =>
    {
        if (this.XDirection != 0 && this.YDirection == 0)
        {
            this.XDirection = 0;
            this.YDirection = o.Y != 0 ? o.Y : this.YDirection;
        }
        else if (this.XDirection == 0 && this.YDirection != 0)
        {
            this.XDirection = o.X != 0 ? o.X : this.XDirection;
            this.YDirection = 0;
        }
    });

The following link lets you download the complete source of the example. My highest score is 510. Have a good game.

Download: SLPG.ReactiveSnake.zip (140kb)

Taking advantage of combining different streams using Reactive Extension's:

2011-04-04T16:18:45+01:00 by Andrea Boschin

One interesting feature of Reactive Extensions is the combining of Observables. There are a number of extension methods made to combine two or more streams; Amb, Concat, SelectMany, Merge, Zip, CombineLatest, all these methods are made to take, multiple and non necessarily omogeneous, streams and combine them in a unique resulting stream based on different rules. Here a brief resume of the rules applied from every method

Amb returns the stream that start providing values by first
Concat chain two streams one to the end of the other and create a single output
SelectMany Returns every value from the second stream for each value from the first
Merge returns the two streams merged basing of when each source returns its value
Zip returns values from one stream paired with values from another, only when a couple is available.
CombineLatest Combine two streams returning always the latest from both, for each occurrence of a value.

 

Understanding the meaning of each method is really difficult if you do not make some simple experiment, but this is out of the scope of this post. Now I would like to show the meaning of "combining two streams" with a real example that show how a little change in the previous mouse trail example can give a great difference in terms of functions.

In my last example I used the TrailWithCount and TrailWithTime methods to catch MouseMove events to draw a polyline on the plugin that il the trail of the mouse pointer. In the example you have seen the trail shortens while the mouse was moving but when the mouse stops also the trail stops. Now I want to change the example to let the trail shorten to zero length when the mouse stops moving.

To achieve this result I need to have a timer that continues to record the current position for the mouse when it stops. For this purpose I can use the static method Obsevable.Interval(timespan) that is able to generate a stream of events at the given interval of time. Here is the two lines I have to add to the previous example.

   1: Observable.FromEvent<MouseEventHandler, MouseEventArgs>(
   2:     ev => new MouseEventHandler(ev),
   3:     ev => this.LayoutRoot.MouseMove += ev,
   4:     ev => this.LayoutRoot.MouseMove -= ev)
   5:     .CombineLatest(Observable.Interval(TimeSpan.FromMilliseconds(25)), (a, b) => a)
   6:     .ObserveOnDispatcher()
   7:     .Select(o => o.EventArgs.GetPosition(this.LayoutRoot))
   8:     .TrailWithCount(100)
   9:     .Subscribe(DrawLine);

the CombineLatest method gets events from the mouse and from the time interval (this regularly every 25 milliseconds) and using the selector lambda it takes from the combined stream only the mouse events. Every time an interval expire the combined value will contain an integer (the number of intervals from the start) and the latest mouse event. When the mouse stops moving the stream from the MouseMove also stops to provide values but the CombineLatest method will replicate the latest value taken for every interval elapsed. So the trail will be feeded with the latest position and the visual effect will be the shorten of the trail until it reaches the current position of the mouse pointer.

The ObserveOnDispatched method is required because the Observable.Interval method implies the use of a separate thread so when we get the values we are in this thread and we need to marshall back to the UI thread. This method does the trick. Here is the result:

Get Microsoft Silverlight

TrailWithcount and TrailWithTime: the regular way but with performance impact

2011-04-01T14:19:47+01:00 by Andrea Boschin

Today @saldoukhov pointed me to an alternative (and possibly regular) way to implement TrailWithCount, different from the version I introduced yesterday in my latest post. This tecnique invole the use of a bunch of methods from the Reactive Extensions and, at first sight may result more simple and straightforward. Also if @saldoukhov pointed to the sole count version it is also possibile to write a TrailWithTime this way. Here is the new implementation of these methods:

   1: // BE AWARE THAT THESE METHODS HAVE HUGE PERFORMANCE IMPACT
   2:  
   3: public static IObservable<IEnumerable<T>> TrailWithCount<T>(this IObservable<T> observable, int count)
   4: {
   5:     return observable.Scan(
   6:         Enumerable.Empty<T>(),
   7:         (a, b) => a.StartWith(b).Take(count));
   8: }
   9:  
  10: public static IObservable<IEnumerable<T>> TrailWithTime<T>(this IObservable<T> observable, TimeSpan timeSpan)
  11: {
  12:     return observable
  13:         .Select(o => new Timestamped<T>(o, DateTime.Now))
  14:         .Scan(
  15:             Enumerable.Empty<Timestamped<T>>(), (a, b) =>
  16:                 a.StartWith(b).TakeWhile(o => (DateTime.Now - o.Timestamp).TotalMilliseconds <= timeSpan.TotalMilliseconds))
  17:         .Select(k => k.Select(o3 => o3.Value));
  18: }

Both the methods work the same way with the sole difference that the TrailWithTime wraps the Point in a Timestamped instance and then unwraps it just before returning to the caller. Here the Scan method is an aggregation function that forward every occurrence from the stream and populate a resulting aggregation. So the StartWith method let the aggregation grow and then the Take method extract the "counted" instances we really need.

The problem here is that if you try to run this method on a very large number of items (e.g. 1000) you will see a huge performance impact that leave the start of the trail far from the mouse pointer. The effect is so far more evident if you try to use the TrailWithTime with a long timeout (e.g. 5/10 sec).

As far as I understood these methods have an huge payload in terms of iterations and of garbage they produce. Every time you get an event on the stream a new instance of the array is created and it is crawled to create the result. The TrailWithTime has the worst performances because it produces a great number of Timestamped<> instances. I'm not aware of the inner working of the Scan, StartWith and Take methods but the resulting effect is really clear

If you try to run my previous version with a similar interval you will see a very tiny performance impact. Internally I use a queue and the sole payload are the collection of the Dequeued items (but they would have been collected the same if there is not the trail) and the iteration along the resulting collection to create a PointCollection. This is an interesting demonstration of how the use of LINQ may affect drammatically the performances of an application and I suggest you to double check when you use them.

By the way, thanks to @saldoukhov for his suggestion that I had not considered when I wrote for the first time my Trail methods.

Writing TrailWithCount and TrailWithTime methods with Reactive Extensions (and incidentally drawing a mouse trail)

2011-04-01T00:05:38+01:00 by codeblock

I have to confess, the more I play with reactive extensions the more I enjoy the result I get and appreciate their power. With this post I would like to show how it is possible to  take advantage of these extensions to easily obtain something that normally requires a complex code. The example is really simple and probably not so useful by itself but there are lot of uses I can imagine for the trail functions I will show in a few.

Using the Reactive Extensions make very simple to collect and manipulate the events coming from a source, like you are querying a database. The library itself define a number of methods that operate this way. The buffering functions for instance take a stream of events and collect them in groups based on count or timeout. BufferWithCount gives chunks of counted events and BufferWithTime collect the events generated in a specific timeout.  For example, given this sequence:

1,2,3,4,5,6,7,8,9

TrailWithCount(3) returns

1,2,3 - 4,5,6 - 7,8,9

In the following code I show an extension method that is able to collect the events creating a Trail based on count. I called it TrailWithCount; Given the previous collection the result will be

1,2,3 - 2,3,4 - 3,4,5 - 4,5,6 - etc...

   1: public static IObservable<IEnumerable<T>> TrailWithCount<T>(this IObservable<T> observable, int count)
   2: {
   3:     Queue<T> queue = new Queue<T>();
   4:  
   5:     return observable.Select(
   6:         o =>
   7:         {
   8:             queue.Enqueue(o);
   9:  
  10:             while (queue.Count > count) queue.Dequeue();
  11:             return queue.ToArray();
  12:         });
  13: }

This method take in input an IObservable<T> and returns an IObservable<IEnumerable<T>>. This is very close to the BufferWithCount method that returns an IObservable<IList<T>>. I preferred the use of IEnumerable<T> instead of IList<T> because I do not need the Insert and Remove methods of this interface. In the first row of the method I create a Queue<T>. It is important to remember that this collection will be created once the first time the method is called and thanks to the usage of lambda expression it will be alive and available to the body of the Select method for each event we get from the stream.

So, every time we get an event, in the Select method I add it to the Queue and then I remove the ones that exceed the count. Finally I select the items in the queue a result of the lambda expression so they are forwarded to the next extension method in the chain.

The same way, but taking advantage od the Timestamped<T> class I can operate to collect the trail events by time. The Timestamped<T> class is useful to add a timestamp (the moment when the event was received) to the item I add to the queue. So instead of removing items from the queue when they exceed a count I compare the timestamp with the current time and I remove the ones that are outside the defined timeout. Here is the method:

   1: public static IObservable<IEnumerable<T>> TrailWithTime<T>(this IObservable<T> observable, TimeSpan timeSpan)
   2: {
   3:     Queue<Timestamped<T>> queue = new Queue<Timestamped<T>>();
   4:  
   5:     return observable.Select(
   6:         o =>
   7:         {
   8:             DateTime now = DateTime.Now;
   9:  
  10:             queue.Enqueue(new Timestamped<T>(o, now));
  11:  
  12:             while (now - queue.Peek().Timestamp > timeSpan)
  13:                 queue.Dequeue();
  14:  
  15:             return queue.Select(v => v.Value).ToArray();
  16:         });
  17: }

Ok, now that we got the Trail method it is time to use them. To visualize the resulting trail I will catch the MouseMove events and collect the resulting points to draw the trail on the plugin area. So first of all I attach the MouseMove using the Observable.FromEvent then I transform the output events to a stream of Point calling GetPosition for every EventArgs. Finally I call the TrailWithCount (or TrailWithTime) method just before subscribing to the stream.

   1: Observable.FromEvent<MouseEventHandler, MouseEventArgs>(
   2:     ev => new MouseEventHandler(ev),
   3:     ev => this.LayoutRoot.MouseMove += ev,
   4:     ev => this.LayoutRoot.MouseMove -= ev)
   5:     .Select(o => o.EventArgs.GetPosition(this.LayoutRoot))
   6:     .TrailWithTime(TimeSpan.FromSeconds(3))
   7:     .Subscribe(DrawLine);

The Subscribe method will call a Draw method that simply assign the points of the trail to a Polyline. This will show a trail that grow to the required length then is shortened at the end when the mouse moves. Here you can try by yourself moving the mouse on the following instance of the plugin:

Get Microsoft Silverlight

In the attached code you can download you will find the methods I discussed above and an additional TrailWithTimeAndCount method that have an hybrid behavior that combines both the other methods. To make the download working you have to download the Reactive Extensions from Nuget and add a reference to them in the project.

Download: http://www.silverlightplayground.org/assets/sources/SLPG.Trailing.Silverlight.zip (17 KB)

Handling errors when calling network with Observable.FromEvent

2011-03-30T16:11:10+01:00 by Andrea Boschin

In my I've show a new way to write methods to call network resources like feeds, WCF methods and so on. I've proposed two solution that take advantage of Reactive Extension to call operations that follow the Begin-End pattern and also operation that use an event driven pattern. Today I would like to make another step forward to make the event driven side of the problem more reliable. Most of the times when you call the network you deal with this pattern instead of the Begin-End. Calling WCF and WebClient follow this pattern and we need to accurately handle the errors when we get the result from the calls.

The problem is that when you call a network resource this way you eventually get exceptions as a value of a property of the result instead of having them thrown as you may expect. This is obviously understandable due to the asynchronous nature of the calls. Also if the runtime will throw exceptions you would not be able to catch them because they are generated in another thread so the sole way to get them is that the runtime catch every exception and forward it using the Error property of the result. So here is how you commony handle the errors in network calls:

   1: public void client_DownloadStringCompleted(object sender, DownloadStringCompletedEventArgs e)
   2: {
   3:     if (e.Error != null)
   4:     {
   5:         // there is an exception
   6:         MessageBox.Show(e.Error.Message);
   7:     }
   8:     else
   9:     {
  10:         // no error occured: do what you want with the result
  11:     }
  12: }

Using my Reactive Extensions way to the asynchronicity this issue may be a big problem because, since the Subscribe method is able to catch exceptions along the execution of the event, it does not understand about the Error resulting from an asynchronous network call. In my last example I showed this code:

   1: IObservable<SyndicationFeed> result = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
   2:     ev => new DownloadStringCompletedEventHandler(ev),
   3:     ev => client.DownloadStringCompleted += ev,
   4:     ev => client.DownloadStringCompleted -= ev)
   5:     .Select(o => o.EventArgs.Result)
   6:     .ConvertToFeed();

Unfortunately it works perfectly when the network call returns a valid response but it is unable to catch an exception if something goes wrong. To solve this problem I wrote an extension method that can collect possible exceptions and rethrow them. The trick is made using a base class named "AsyncCompletedEventArgs" that is used by every EventArgs returned by a network call. Having this common base class is key to write a single method to handle a lot of cases. Here is the code from the ThrowIfError() method:

   1: public static IObservable<IEvent<T>> ThrowIfError<T>(this IObservable<IEvent<T>> observable)
   2:     where T : AsyncCompletedEventArgs
   3: {
   4:     return observable.SelectMany(
   5:         o =>
   6:         {
   7:             if (o.EventArgs.Error != null)
   8:                 throw o.EventArgs.Error;
   9:  
  10:             return Observable.Return(o);
  11:         });
  12: }

The method is able to intercept and IObservable<IEvent<T>> that is the interface resulting from Observable.FromEvent() method. Once it receive the interface it examine the content and if it detects an error simple throw it. This make the runtime catch the exception and forward it to the OnError method of the Observer<T> that is the second parameter of a Subscribe method. If there is not any error the method simply returns the observable itself so the chain of method can be continued seamless. I use the SelectMany method to flatten the results from IObservable<IObservable<IEvent<T>>> (it's not a joke) to IObservable<IEvent<T>>. So now we can use the method everywhere we have to place a network call:

   1: public static IObservable<SyndicationFeed> DownloadFeedWebClient(Uri uri)
   2: {
   3:     WebClient client = new WebClient();
   4:  
   5:     IObservable<SyndicationFeed> result = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
   6:         ev => new DownloadStringCompletedEventHandler(ev),
   7:         ev => client.DownloadStringCompleted += ev,
   8:         ev => client.DownloadStringCompleted -= ev)
   9:         .ThrowIfError()
  10:         .Select(o => o.EventArgs.Result)
  11:         .ConvertToFeed();
  12:  
  13:     client.DownloadStringAsync(uri);
  14:  
  15:     return result;
  16: }

This does not change anything to the way you can call the DownloadFeedWebClient method because it returns always an IObservable<T> so you can simply Subscribe to it and specify and handler for the OnError part of the Observer<T>. To view an example please visit my .

Using a new pattern to call the network with Reactive Extensions

2011-03-30T01:17:20+01:00 by codeblock

Recently I’m trying the Reactive Extensions, an interesting set of extensions that aim to improve the handling of asynchronous programming with an innovative implementation of the observer pattern. These extensions provide some useful methods that solve in an elegant way some recurring problems that everyone have found for sure when calling the network. So, after lot of attempts I found a pattern I will start to use implementing the Data Access facade in my Silverlight applications.

Before discovering the Reactive Extensions (starting from here I will refer to them using the abbreviation RX), I was habit to use a widely adopted pattern that exposes two additional parameters to every method of the Data Access class. For instance, a method made to download a feed from the network may have the following signature and body:

   1: public static void DownloadFeed(Uri uri, Action<SyndicationFeed> success, Action<Exception> fail)
   2: {
   3:     try
   4:     {
   5:         HttpWebRequest request = HttpWebRequest.CreateHttp(uri);
   6:  
   7:         request.BeginGetResponse(
   8:             asyncResult =>
   9:             {
  10:                 try
  11:                 {
  12:                     WebResponse response = request.EndGetResponse(asyncResult);
  13:  
  14:                     using (StreamReader sReader = new StreamReader(response.GetResponseStream()))
  15:                     {
  16:                         using (StringReader reader = new StringReader(sReader.ReadToEnd()))
  17:                         {
  18:                             using (XmlReader xReader = XmlReader.Create(reader))
  19:                             {
  20:                                 Deployment.Current.Dispatcher.BeginInvoke(
  21:                                     () => success(SyndicationFeed.Load(xReader)));
  22:                             }
  23:                         }
  24:                     }
  25:                 }
  26:                 catch (Exception ex)
  27:                 {
  28:                     fail(ex);
  29:                 }
  30:             }, null);
  31:     }
  32:     catch (Exception ex)
  33:     {
  34:         fail(ex);
  35:     }
  36: }

Using the HttpWebRequest made the code more complicated but the WebClient does not makes significant changes to the way the network call works. This pattern is interesting because using some lambda expression the resulting code remain compact. The drawback is that every method has two more parameters so the code is hardest to be understood.

The RX framework contains two methods that are useful to handle a network connection; Observable.FromAsyncPattern is really useful if you want to use an HttpWebRequest because it can automatically handle the Begin and End parts of the asynchronous pattern. On the other side, Observable.FromEvent can attach an event and gracefully detach when it is not reqired anymore; it is the case of the WebClient class tha expose a DownloadStringCompleted event. So we start handling the async pattern of an HttpWebRequest:

   1: public static IObservable<SyndicationFeed> DownloadFeedWebRequest(Uri uri)
   2: {
   3:     HttpWebRequest request = HttpWebRequest.CreateHttp(uri);
   4:  
   5:     return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)
   6:         .Invoke()
   7:         .ReadToEnd()
   8:         .ConvertToFeed()
   9:         .ObserveOnDispatcher();
  10: }

These few lines does the trick: The Observable.FromAsyncPattern method encapsulates BeginGetResponse and EndGetResponse. The resulting object is a synchronous representation of the async pattern. The Invoke method call the service then two methods I wrote read the resulting stream and convert the string to a syndicationfeed. Finally, with the ObserveOnDispatcher method I marshal the thread to the user interface using the dispatcher.

The return value is an IObservable<SyndicationFeed> instance that enable the caller to listen for the completion of the operation. The way the method is called is very similar to my previous patter but the method now has only the needed parameters:

   1: DataSource.DownloadFeedWebRequest(new Uri("http://api.twitter.com/1/statuses/public_timeline.atom"))
   2:     .Subscribe(
   3:         r =>
   4:         {
   5:             feed.ItemsSource = r.Items;
   6:         }, HandleExceptions);
The Subscribe method I use here exposes two callbacks, one provides the result of the method and the other is called when an exception occur. If you prefer to use the WebClient you can use the Observable.FromEvent method the complete the same operation.

   1: public static IObservable<SyndicationFeed> DownloadFeedWebClient(Uri uri)
   2: {
   3:     WebClient client = new WebClient();
   4:  
   5:     IObservable<SyndicationFeed> result = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
   6:         ev => new DownloadStringCompletedEventHandler(ev),
   7:         ev => client.DownloadStringCompleted += ev,
   8:         ev => client.DownloadStringCompleted -= ev)
   9:         .Select(o => o.EventArgs.Result)
  10:         .ConvertToFeed();
  11:  
  12:     client.DownloadStringAsync(uri);
  13:  
  14:     return result;
  15: }

In the method we have to specify the attach and detach actions. These action will be called by the RX when the Subscribe method is complete; Here we do not need to call the ReadToEnd method because the WebClient returns directly a string and also we do not have to marshal to the UI Thread because the WebClient already does the trick. The select method convert the resulting EventArgs to the value expected from the ConvertToFeed.

I’m very intersted in your opinion on this example. My feel is that the code is more reusable, and compact, but if someone have a suggestion to improve the method pleare