In my last post I wrote about writing a TCP server using the Reactive Extensions library. It was not a Silverlight related post, but it simply introduces the server side component that I will use in this post to show you how to use the same library to consume the incoming data and present it in Silverlight. The interesting part is to understand how rx can simplify the programming model when dealing with asynchronicity both in Silverlight and in server side full .NET programming.
The server I presented was a very simple socket listener that waits for connections and then, when the channel has been established, it push to the client a continuous stream of data that represent the current state of the CPU and memory of the server. With this huge amount of informations incoming we can create a little application that is able to show a chart and some gauges, updated almost in realtime.
The application, developed following the MVVM pattern, is made of a simple view containing the controls used to present the informations. These controls are feeded by a couple of properties in the ViewModel that are updated with the incoming data. So the most of the work is done by the ViewModel that is responsible of connecting to the socket and read the information stream to update the properties. In a real world solution probably you will have some kind of layer between the ViewModel and the socket, but for the sake of the post we will keep it simple as much as we need to understand how it works.
Consuming a socket in Silverlight means using an instance of the Socket class that represents the connection and a SocketAsyncEventArgs that is used when you call the methods of the connection to make the requests and receive responses. So, as an example, when you have to establish the connection you have to create the instance of the Socket class and the call the ConnectAsync method providing the SocketAsyncEventArgs initialized with the address of the endpoint to connect to. When the connection has been established the SocketAsyncEventArgs class will raise a Completed event that notify about the result of the operation. Doing it with Reactive Extensions mean something like this:
1: protected void Connect()
2: {
3: this.Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
4:
5: SocketAsyncEventArgs arguments = new SocketAsyncEventArgs();
6: arguments.RemoteEndPoint = new DnsEndPoint(Application.Current.Host.Source.DnsSafeHost, 4530);
7:
8: var socketAsObservable = from args in Observable.FromEvent<SocketAsyncEventArgs>(
9: ev => arguments.Completed += ev,
10: ev => arguments.Completed -= ev)
11: select args.EventArgs;
12:
13: socketAsObservable
14: .Where(args => args.LastOperation == SocketAsyncOperation.Connect)
15: .Subscribe(
16: args =>
17: {
18: args.Dispose();
19: this.Receive();
20: });
21:
22: this.Socket.ConnectAsync(arguments);
23: }
In the line #3 it is created the Socket with the common parameter that is used in Silverlight for a TCP channel. Then an instance of the SocketAsyncEventArgs is initialized and its property RemoteEndPoint is provided with and instance of DnsEndPoint that represente the address of the server to connect to. The port is 4530 like we defined in the previour article.
At this point it is created a stream, called socketAsObservable, from the Completed event of the SocketAsyncEventArgs. the stream filter the LastOperation to be "Connect", as we expect and in the Subscribe method disposes the SocketAsyncEventArgs instance and starts to receive data. The SocketAsyncEventArgs instance can be used only once so we have to carefully dispose it to avoid memory leaks.
Once the connection has successfully established you have to put your socket in Receive. When incoming data is detected you will be notified and you can fetch it from the SocketAsyncEventArgs and put again the socket in Receive. Here is how it appear with Reactive Extensions:
1: protected void Receive()
2: {
3: SocketAsyncEventArgs arguments = new SocketAsyncEventArgs();
4: arguments.SetBuffer(new byte[1024], 0, 1024);
5:
6: var socketAsObservable = from args in Observable.FromEvent<SocketAsyncEventArgs>(
7: ev => arguments.Completed += ev,
8: ev => arguments.Completed -= ev)
9: select args.EventArgs;
10:
11: socketAsObservable
12: .Where(args => args.LastOperation == SocketAsyncOperation.Receive)
13: .Throttle(TimeSpan.FromMilliseconds(500))
14: .ObserveOnDispatcher()
15: .Subscribe(OnReceive);
16:
17: if (this.Socket.Connected)
18: this.Socket.ReceiveAsync(arguments);
19: }
Once again we create the instance of the SocketAsyncEventArgs. It is now initialized with a SetBuffer that allocates a buffer of 1 KByte that is used by the socket to compy the incoming data. Then the socketAsObservable is created using the FromEvent; This method is important because it is in charge of attaching and detaching the Completed event so we avoid to have unwanted delegates around. Again the socketAsObservable is filtered selecting only when LastOperation equals to Receive and we also apply a Throttling of about 500 milliseconds. It is made to discart updated when they are too fast. One update every half a second suffice to say that the UI is up to date with the server.
Fially we marshal the stream to the UI Thread using the ObserveOnDispatcher and the received events are forwarded to the OnReceive method that is responsible of parsing the received data and updating the UI.
1: protected void OnReceive(SocketAsyncEventArgs args)
2: {
3: string data = Encoding.UTF8.GetString(args.Buffer, 0, args.BytesTransferred);
4:
5: IEnumerable<Sample> samples = this.GetSamples(ref data);
6:
7: Array.Clear(args.Buffer, 0, 1024);
8:
9: if (data.Length > 0)
10: {
11: byte[] bytes = Encoding.UTF8.GetBytes(data);
12: Array.Copy(bytes, args.Buffer, bytes.Length);
13: args.SetBuffer(bytes.Length, 1024 - bytes.Length);
14: }
15: else
16: args.SetBuffer(0, 1024);
17:
18: if (this.Socket.Connected)
19: this.Socket.ReceiveAsync(args);
20:
21: this.Update(samples);
22: }
The method peek up the received data and then calls again the ReceiveAsync method of the connection to make again the channel ready to receive other informations. Then the Update method is called to update the properties binded to the view.
The code is simple and obviously it need some additional check, as an example it needs to verify the communication errors that here are swallowed. I hope it shows, once again, how reactive extension can help you to simplify the consumption of asynchronous streams.