Spatio-temporal Event Processing with StreamInsight, SQL Server Denali, and Bing Maps – Part 4

Over the last few posts, I’ve been trying to use SQL Server StreamInsight to analyse a set of data recording outbreaks of H5N1 avian flu. The data set is spatio-temporal – it contains both the location of each outbreak and the date it occurred.

  • In part 1, I introduced the basics of StreamInsight and installed and tested the StreamInsight engine.
  • In part 2, I created a new input adaptor to stream data from a KML file and tested it by simply passing all the event data to a console window.
  • In part 3, I modified the query to use a custom aggregate function to form the convex hull around the location of all events that lay within a hopping window of 30 days, advancing a day at a time.

Having created the input adaptor to load data into StreamInsight, and the query to specify how that data should be analysed, the next step is to address what should happen with the output. Currently, I’m just sending the output to a console window using the OutputTracer class from the CodePlex StreamInsight samples package, but what I really wanted to do was create an animated map that showed the development of the outbreak over time…

Structuring the Output

Before doing anything more with the results, I decided I needed to structure the output a bit more, so I created a new class defining a summary of the area of the world affected by H5N1 at the end of each day:

public class AvianFluDailySummary
{
  public DateTime AsAtDate { get; set; }
  public string PolygonWKT { get; set; }
}
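
For reference, a populated summary ends up looking something like this (the coordinates below are purely illustrative – they’re not taken from the real outbreak data):

var example = new AvianFluDailySummary
{
  AsAtDate = new DateTime(2006, 3, 15),
  // Well-Known Text polygon: a closed ring of "longitude latitude" coordinate pairs
  PolygonWKT = "POLYGON ((100.5 13.7, 105.8 21.0, 106.7 10.8, 100.5 13.7))"
};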

I then needed to adjust my LINQ query to populate this class with a summary of the events lying in each hopping window (the 30-day window that hops forward a day at a time). I was already calculating PolygonWKT – the convex hull of the events occurring in the window – in a custom UDA, so now I just needed to populate the AsAtDate: the timestamp at the end of each window for which that data was valid. I had thought that this would be exposed quite simply as a property of each hopping window, but that doesn’t appear to be the case (at least, I couldn’t find it). What’s more, I couldn’t just use a UDA to calculate the MAX() start time of any event falling into a window, since that isn’t guaranteed to represent the true end of the window – if no outbreak starts on the final day of a 30-day window, the latest event start time falls somewhere before the window’s end.

So, the only way I could find to access the time at the end of a hopping window from within the query was to create a different kind of UDA – a time-sensitive UDA. From http://technet.microsoft.com/en-us/library/ee842720.aspx (emphasis added):

  • Time-insensitive UDAs and UDOs do not expect to be passed whole events including their time stamps. Instead, they only consider a set of one or more payload fields from the events in the defined window. Also, the current window start and end time are not passed to them.
  • Time-sensitive UDAs and UDOs are passed a set of events for each window including their time stamps and the window start and end times.

So, if I created a new time-sensitive UDA I could use it to return the timestamp at the end of each hopping window. Note that I wouldn’t actually be doing any aggregation of the events in the window, so I doubt this is the most efficient way to do it, but it seems to work for now. Here’s my CepTimeSensitiveAggregate to return the timestamp at the end of a given hopping window:

// Time-sensitive UDA: ignores the event payloads and simply returns
// the end time of the window over which it is evaluated
public class WindowEnd<TInput> : CepTimeSensitiveAggregate<TInput, DateTime>
{
  public override DateTime GenerateOutput(IEnumerable<IntervalEvent<TInput>> events,
                      WindowDescriptor windowDescriptor)
  {
    return windowDescriptor.EndTime.UtcDateTime;
  }
}

And, as with my ConvexHull aggregate created earlier, I also needed to register an associated wrapper method to enable the WindowEnd aggregate to be called from within the LINQ query:

// Wrapper method that lets the WindowEnd aggregate be called from a LINQ query;
// StreamInsight translates calls to WinEnd() into the aggregate, so the method
// body itself is never executed
[CepUserDefinedAggregate(typeof(WindowEnd<>))]
public static DateTime WinEnd<InputT>(this CepWindow<InputT> window)
{
  throw CepUtility.DoNotCall();
}

Then I changed my query to make use of the WinEnd() function to populate the “AsAtDate” for each daily summary, like this:

var query = from x in inputStream.HoppingWindow(TimeSpan.FromDays(30), TimeSpan.FromDays(1))
            select new AvianFluDailySummary
            {
              AsAtDate = x.WinEnd(),
              PolygonWKT = x.CHull(p => p.WKT)
            };

Sending the Output to a WCF Service and Drawing a Map

Now that I’d got some structure to my output, I could send it to a different output adaptor. To get the results onto Bing Maps, I decided the easiest way would be to get StreamInsight to publish them to a web service. The StreamInsight Samples CodePlex project includes a sample output adaptor for sending results to a WCF service, which I shamelessly bastardised. First, I created a listener interface that defined a ServiceContract for the fields in my daily summary:

[ServiceContract]
public interface IFeedbackListenerService
{
    [OperationContract]
    void ReceiveEvents(
      DateTime asAtDate,
      string convexPoly
    );
}
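
For context, the output adaptor’s job is then to call ReceiveEvents for each event it dequeues. The adaptor plumbing itself comes from the samples project, but the client-side call boils down to standard WCF code along these lines (this needs a reference to System.ServiceModel; the endpoint address is just an example, and dailySummary stands in for whichever payload the adaptor has just dequeued):

// Minimal sketch of pushing one dequeued summary to the listener service
var factory = new ChannelFactory<IFeedbackListenerService>(
  new BasicHttpBinding(),
  new EndpointAddress("http://localhost:8080/FeedbackListenerService"));

IFeedbackListenerService proxy = factory.CreateChannel();
proxy.ReceiveEvents(dailySummary.AsAtDate, dailySummary.PolygonWKT);

((IClientChannel)proxy).Close();
factory.Close();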

Then I created a new listener that would act upon events received through this interface. The listener would call a specified delegate method and pass it a set of parameters defined in the UpdateMapEventArgs class, in order to update a map with a polygon showing the area affected as at the specified time:

// Define delegate method to be called to update the map
public delegate void UpdateMapDelegate(object sender, UpdateMapEventArgs e);

// Define arguments to be passed to delegate method
public class UpdateMapEventArgs 
{ 
  public string PolygonWKT { get; set; }
  public DateTime AsAtDate { get; set; }

  public UpdateMapEventArgs(DateTime asAt, string polyWKT) 
  {
    AsAtDate = asAt;
    PolygonWKT = polyWKT;
  } 
}

// Define listener service
public class FeedbackListenerService : IFeedbackListenerService
{
  public static UpdateMapDelegate MapUpdater { get; set; }
  
  public void ReceiveEvents(DateTime asAtDate, string convexPoly)
  {
    try
    {
      UpdateMapDelegate handler = MapUpdater;
      if (handler != null)
      {
        handler(this, new UpdateMapEventArgs(asAtDate, convexPoly));
      }
    }
    catch {
      // Do something here later
    }
  }
}
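
With the contract and listener defined, the StreamInsight side just needs the daily summary query from earlier bound to the WCF output adaptor and started. Since the adaptor itself is only a lightly-modified copy of the sample, I’ll just sketch the binding here – treat WcfOutputAdapterFactory and WcfOutputConfig as placeholders for whatever factory and configuration types your own adaptor uses:

// Bind the daily summary query to the WCF output adaptor and start it.
// WcfOutputAdapterFactory / WcfOutputConfig are placeholder names for the
// factory and config types in the adaptor project.
var outputConfig = new WcfOutputConfig
{
  ServiceAddress = "http://localhost:8080/FeedbackListenerService"
};

var fluQuery = query.ToQuery(
  application,                      // the StreamInsight Application object created in the earlier parts
  "AvianFluDailySummary",           // query name
  "Convex hull of H5N1 outbreaks over a 30-day hopping window",
  typeof(WcfOutputAdapterFactory),  // output adaptor factory
  outputConfig,                     // adaptor configuration, including the endpoint address
  EventShape.Interval,
  StreamEventOrder.FullyOrdered);

fluQuery.Start();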

To display the map, I used v7 of the Bing Maps AJAX control hosted within a WebBrowser control, with a function to plot a polygon from supplied WKT as follows:

// Add an (initially empty) polygon to the map
var Polygon = new Microsoft.Maps.Polygon();
map.entities.push(Polygon);

// Redraw the polygon
function updatePolygon(WKT) {

  // Split the WKT into an array of coordinate pairs
  var coordArray = WKT.split(", ");
  
  // Populate a location array from the set of coordinates
  locationArray = new Array();
  for (var i = 0; i < coordArray.length; i++) {
    var coordpair = coordArray[i].toString();
    var coords = coordpair.split(" ");
    locationArray.push(new Microsoft.Maps.Location(coords[1], coords[0]));
  }

  // Update the polygon
  Polygon.setLocations(locationArray);
        
}

Then I just needed to invoke this script from the UpdateMap method, which is the delegate method provided as a callback to my listener service:

public void UpdateMap(object sender, UpdateMapEventArgs args)
{
  Invoke((MethodInvoker) delegate
  {
    // Update a simple label to show what date we're looking at
    label1.Text = args.AsAtDate.ToString();

    // Call the javascript in the Bing Maps webbrowser to update the map
    this.webBrowser1.Document.InvokeScript("updatePolygon", new object[] { args.PolygonWKT });
  });
}
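
One wiring detail worth noting: Document.InvokeScript only works once the page hosting the Bing Maps control has finished loading, so the WebBrowser control needs to be pointed at that page first (and ideally you should wait for its DocumentCompleted event before opening the service host). Something like this in the form’s constructor, where BingMap.html is just an example name for the page containing the script above:

// Load the page that hosts the Bing Maps AJAX control and the updatePolygon() function
// ("BingMap.html" is just an example file name)
webBrowser1.Navigate(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "BingMap.html"));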

Finally, start the listening service:

FeedbackListenerService.MapUpdater = UpdateMap;
host = new ServiceHost(typeof(FeedbackListenerService));
host.Open();
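
The ServiceHost above picks up its endpoint from the application’s config file (or from WCF 4’s default endpoints); if you’d rather keep everything in code, the endpoint can be added explicitly instead – the address below is just an example, and needs to match whatever address the output adaptor is posting to:

// Alternative to app.config: define the service endpoint in code.
// The address must match the one the output adaptor calls.
host = new ServiceHost(typeof(FeedbackListenerService));
host.AddServiceEndpoint(
  typeof(IFeedbackListenerService),
  new BasicHttpBinding(),
  "http://localhost:8080/FeedbackListenerService");
host.Open();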

With the new output adaptor in place and the listening service started, I kicked off my StreamInsight query to start streaming summaries of the events contained in each window (remember that each window contains events from the preceding 30 days, and hops forward by a day each time). These summaries were published to the WCF service, where they were picked up by the listener and sent as arguments to update the Bing Maps control. Here are some screenshots of the application in action:

[Screenshots of the application in action]

And, because the whole point of this was to create an animated map timeline, here’s a video:

There’s still quite a lot of tidying up I could do to my code (this was, after all, my first attempt and I took quite a lot of shortcuts), and I don’t think this application will exactly wow anyone in terms of making advanced use of StreamInsight’s functions, but hopefully it demonstrates an alternative approach to analysing spatial data that also has a time-based element.

Perhaps if I find time I’ll revisit this topic again in the future and pick up where I left off. But, for now, enjoy!


2 Responses to Spatio-temporal Event Processing with StreamInsight, SQL Server Denali, and Bing Maps – Part 4

  1. Karlo Bartels says:

    Great series! This guy mapped people moving around Europe by recording location data from their iPhone: http://flowingdata.com/2011/07/18/iphone-fireflies-across-the-europe-sky/ Amazing animated timeline also! Maybe you could pull this off with a heat map in SQL Server Reporting Services?

    • alastaira says:

      That’s a beautiful animation – thanks for sharing. The data is so much higher-resolution than the set I was using – I might have to try and get hold of a copy and see what I can do with it!
