Spatio-temporal Event Processing with StreamInsight, SQL Server Denali, and Bing Maps – Part 3

At the end of my last post, I’d managed to stream some data representing cases of the H5N1 virus from a KML file into the StreamInsight engine, and streamed the results out again to a console window. Not particularly exciting, I admit, but hopefully it will provide the foundation necessary to create some more interesting spatio-temporal analysis. So, the next step was to try to come up with a better query to analyse the data as it streamed through the engine.

Looking at the data I was using, I tried to come up with a typical way in which you’d want to query information about the outbreak of an infectious disease. Here’s what I settled on: let’s suppose that the disease is airborne and spread by droplets. In that case, you can define the geographic extent of the affected area by creating the convex hull around all the locations that have reported incidents (if you’re in a location and places to the west, east, south, and north of you have all had confirmed cases, I’d say you lie in an infected area even if there haven’t been any cases at your exact location). What’s more, let’s say that, after an outbreak has been confirmed at a location, it takes 30 days before that area can be declared “all clear” and free of the infection.

Bear in mind that I’m not an epidemiologist and I have no idea if these assumptions are anything like accurate, but they’ll work for me in this example. So, in order to define (and plot on a map) the areas of the world affected by the disease at any point in time, I’d want to determine all those points that lie within the convex hull formed from the locations of any incidents that had occurred in the preceding 30 days. This seemed like a reasonable case for a spatio-temporal query, so let’s just pass over the question of whether the assumptions are medically accurate…

Defining the Hopping Window

The basic query I’d set up last time just selected all the events from the input stream, as follows:

var query = from e in inputStream
            select e;

This time, rather than just select each individual event, I wanted to create a hopping window – a window that selects only those events that occur within a particular, sliding timeframe. Then I’d want to perform an aggregate of the payload fields of the events contained within that window.

I’d already decided that the extent of my window would cover a period of 30 days (to capture all those occurrences that were still contagious at any given point in time), and I wanted to examine the cases that fell within that window on a day-by-day basis – i.e. my window would “hop” forward by one day each time. There was no point defining a hop size of less than a day, since the granularity of my input data was only recorded at date level.

Both the window size and the hop size are passed as TimeSpan parameters to the HoppingWindow method acting on an input stream. So, I changed the first part of my query to look like this:

var query = from e in inputStream.HoppingWindow(
                        TimeSpan.FromDays(30), // Window size
                        TimeSpan.FromDays(1))  // Hop size
            ... // Do something with the events that fall in this window
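
To make the window semantics a bit more concrete, here’s a minimal sketch in plain C# and LINQ, with no StreamInsight involved. The names HoppingWindowSketch and Windows are hypothetical and purely illustrative. The sketch assumes that each window covers the half-open interval [end - size, end) and that windows are aligned to the date of the earliest event; the engine’s own alignment and output timestamps may well differ, so treat this as an approximation of the idea rather than a description of what StreamInsight does internally.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: emulates a hopping window over a list of event dates.
public static class HoppingWindowSketch
{
    // For each window end (advancing by `hop`), returns the event dates that
    // fall in [end - size, end). Windows containing no events are skipped.
    public static IEnumerable<KeyValuePair<DateTime, List<DateTime>>> Windows(
        IEnumerable<DateTime> eventDates, TimeSpan size, TimeSpan hop)
    {
        var dates = eventDates.OrderBy(d => d).ToList();
        if (dates.Count == 0) yield break;

        // Assumption: the first window ends one hop after the earliest event,
        // and the last window is the final one that still contains the latest event.
        var lastEnd = dates[dates.Count - 1] + size;
        for (var end = dates[0] + hop; end <= lastEnd; end += hop)
        {
            var start = end - size;
            var inWindow = dates.Where(d => d >= start && d < end).ToList();
            if (inWindow.Count > 0)
                yield return new KeyValuePair<DateTime, List<DateTime>>(end, inWindow);
        }
    }
}
```

Calling HoppingWindowSketch.Windows(dates, TimeSpan.FromDays(30), TimeSpan.FromDays(1)) yields one entry per day, each holding the cases from the preceding 30 days. With a one-day hop, a single case shows up in 30 consecutive windows, which is exactly the “still contagious for 30 days” behaviour I was after.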

Next, I needed to create the aggregate function to act upon those events in the window.

Creating a Custom StreamInsight Aggregate function

To create the convex hull of the set of points contained within a particular window, I needed to create an aggregate function that derives from the CepAggregate base class. There’s an MSDN article at http://msdn.microsoft.com/en-us/library/ee842720.aspx that gives an example, although it fails to mention where you’d actually find that class (it’s in Microsoft.ComplexEventProcessing.Extensibility, incidentally). In my case, I wanted a function that would act upon an input set of strings (remember that my payload contained the location of each outbreak as a WKT Point), and return a string (the WKT of a Polygon created from the convex hull of those points). Here’s the function I created:

  using System.Collections.Generic;
  using Microsoft.ComplexEventProcessing.Extensibility;
  using Microsoft.SqlServer.Types;

  public class ConvexHull : CepAggregate<string, string>
  {
    public override string GenerateOutput(IEnumerable<string> eventData)
    {
      // First, create a MultiPoint of all Points in the current window
      var gb = new SqlGeographyBuilder();
      gb.SetSrid(4326);
      gb.BeginGeography(OpenGisGeographyType.MultiPoint);
      foreach (var d in eventData)
      {
        // Create a geography instance from the WKT of each event
        SqlGeography point = SqlGeography.Parse(d);
        gb.BeginGeography(OpenGisGeographyType.Point);
        // SqlGeography exposes Lat/Long (not STX/STY), and the geography
        // builder expects latitude first, then longitude
        gb.BeginFigure((double)point.Lat, (double)point.Long);
        gb.EndFigure();
        gb.EndGeography();
      }
      gb.EndGeography();

      // Now, create the Convex Hull of that MultiPoint
      SqlGeography convexhull = gb.ConstructedGeography.STConvexHull();
      // Return the WKT of the Convex Hull
      return convexhull.ToString();
    }
  }

Notice that, as with the location data in the payload of each event, I’m using the WKT text format for all inputs and outputs of my spatial functions. This incurs a little extra cost in parsing the input into each function, but it makes the workflow that much easier to debug. I’m also using the STConvexHull() method of the SqlGeography datatype, which is newly introduced in the Microsoft.SqlServer.Types.dll library that ships with SQL Server Denali.
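
If you’ve not come across convex hulls before, the following sketch may help show what the aggregate is computing. It is not what SqlGeography does internally: it’s a plain planar hull (Andrew’s monotone chain algorithm) that treats each point as a flat (X, Y) pair, whereas the geography type works on the ellipsoid. The names PlanarHullSketch and Hull are hypothetical, and the code assumes a compiler recent enough to support C# value tuples.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration only: a flat-plane convex hull, not the geodesic
// hull that SqlGeography computes.
public static class PlanarHullSketch
{
    // Cross product of (a - o) and (b - o); positive when o -> a -> b turns
    // counter-clockwise.
    private static double Cross((double X, double Y) o, (double X, double Y) a, (double X, double Y) b)
    {
        return (a.X - o.X) * (b.Y - o.Y) - (a.Y - o.Y) * (b.X - o.X);
    }

    // Returns the hull vertices in counter-clockwise order, without repeating
    // the first vertex at the end.
    public static List<(double X, double Y)> Hull(IEnumerable<(double X, double Y)> points)
    {
        var pts = points.Distinct().OrderBy(p => p.X).ThenBy(p => p.Y).ToList();
        if (pts.Count < 3) return pts; // a point or a line segment is its own hull

        var hull = new List<(double X, double Y)>();

        // Build the lower chain, left to right
        foreach (var p in pts)
        {
            while (hull.Count >= 2 && Cross(hull[hull.Count - 2], hull[hull.Count - 1], p) <= 0)
                hull.RemoveAt(hull.Count - 1);
            hull.Add(p);
        }

        // Build the upper chain, right to left, without disturbing the lower chain
        int lowerCount = hull.Count + 1;
        for (int i = pts.Count - 2; i >= 0; i--)
        {
            var p = pts[i];
            while (hull.Count >= lowerCount && Cross(hull[hull.Count - 2], hull[hull.Count - 1], p) <= 0)
                hull.RemoveAt(hull.Count - 1);
            hull.Add(p);
        }

        hull.RemoveAt(hull.Count - 1); // the last point repeats the first
        return hull;
    }
}
```

Feed it the four corners of a square plus a point in the middle, and only the four corners come back; the interior point is “inside the infected area” without being part of its boundary.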

Before you can actually call this function from the StreamInsight query, you need to define a LINQ extension method that wraps the aggregate so that it can be called from LINQ. Here’s my LINQ extension wrapper:

  public static class UDAExtensionMethods
  {
    [CepUserDefinedAggregate(typeof(ConvexHull))]
    public static string CHull<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, string>> map)
    {
      throw CepUtility.DoNotCall();
    }
  }

The query can now call the CHull aggregate method (remember to use the name of the LINQ wrapper, not the UDA itself). For each window, it calculates the convex hull around all the points whose locations are given by the WKT field in the payload of the events in that window. My modified query now looked like this:

var query = from x in inputStream.HoppingWindow(
                        TimeSpan.FromDays(30),
                        TimeSpan.FromDays(1))
            select x.CHull(p => p.WKT);

Now, re-running my project gave the output on the console window as shown below – for each day covered by the extent of the data (still shown in mm/dd/yyyy format because of the OutputTracer output adaptor I’m using – grrrr), the results show the Polygon formed from the convex hull of any points that occurred in the preceding 30 days.

[Image: console window output listing, for each day, the WKT of the convex hull Polygon]

In the next post, I’ll look at doing something nicer with this output than simply sending a WKT string to a console window, by plotting the resulting polygons on Bing Maps instead.
