Posts tagged ‘Stream Insight’

August 18, 2011

Spatio-temporal Event Processing with StreamInsight, SQL Server Denali, and Bing Maps – Part 5

Yesterday, fellow Bing Maps MVP Nicolas Boonaert (blog) kindly tweeted a link to my series of posts about analysing spatio-temporal data using StreamInsight and Bing Maps. In addition to his generous public comments, he sent me a private message teasing me about the graphical presentation of the data. I believe the term he used was “moving pixelised potato on a map”… ;)

Spurred on by Nicolas’ comments, I thought I’d take the opportunity to improve the visual impact of my StreamInsight output. I used my own HeatMap library to plot the outbreak as a heatmap rather than as a convex hull. Doing so required remarkably few changes to the code:

First, instead of creating a (Polygonal) convex hull around all the events in the StreamInsight window at a given point in time, I added a user-defined aggregate (UDA) that instead creates a (MultiPoint) union of all those points.

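using System.Collections.Generic;
using Microsoft.ComplexEventProcessing.Extensibility;
using Microsoft.SqlServer.Types;

public class MultiPointUnion : CepAggregate<string, string>
{
  public override string GenerateOutput(IEnumerable<string> eventData)
  {
    // Build a single MultiPoint from the WKT Point of every event in the window
    var gb = new SqlGeographyBuilder();
    gb.SetSrid(4326);
    gb.BeginGeography(OpenGisGeographyType.MultiPoint);
    foreach (var d in eventData)
    {
      SqlGeography point = SqlGeography.Parse(d);
      gb.BeginGeography(OpenGisGeographyType.Point);
      // Geography figures take latitude first, then longitude
      gb.BeginFigure((double)point.Lat, (double)point.Long);
      gb.EndFigure();
      gb.EndGeography();
    }
    gb.EndGeography();

    // ConstructedGeography is a property; return the WKT of the result
    return gb.ConstructedGeography.ToString();
  }
}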
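If you haven’t used SqlGeographyBuilder before, it may help to see the shape of the string this aggregate hands back. Below is a rough JavaScript sketch that simply stitches POINT WKT strings into MULTIPOINT WKT. Unlike the real UDA, it does no parsing or validation. The helper name multiPointUnion is hypothetical. The assumption that each input looks like SQL Server’s "POINT (long lat)" output is also mine, for illustration only.

```javascript
// Hypothetical helper, for illustration only: stitches POINT WKT strings such as
// "POINT (-1.5 52.3)" into one MULTIPOINT WKT string. Unlike the real UDA, it
// does no parsing or validation of the coordinates.
function multiPointUnion(pointWkts) {
  if (pointWkts.length === 0) {
    // WKT's representation of a MultiPoint with no members
    return "MULTIPOINT EMPTY";
  }
  const members = pointWkts.map(function (wkt) {
    // Keep the "long lat" text between the parentheses, e.g. "-1.5 52.3"
    const inner = wkt.slice(wkt.indexOf("(") + 1, wkt.lastIndexOf(")")).trim();
    return "(" + inner + ")";
  });
  return "MULTIPOINT (" + members.join(", ") + ")";
}

console.log(multiPointUnion(["POINT (-1.5 52.3)", "POINT (0.1 51.5)"]));
// MULTIPOINT ((-1.5 52.3), (0.1 51.5))
```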

Then, I amended the JavaScript that updated the map in the browser control. Instead of plotting a single polygon as a vector shape on the map, I imported the heatmap library and added a new HeatMapLayer as follows:

heatmapLayer = new HeatMapLayer(
  map,
  new Array(),
  { intensity: 0.4,
    radius: 25,
    colourgradient: {
      0.0: 'rgba(255, 255, 255, 0)',
      0.5: 'rgba(255, 255, 120, 100)',
      0.8: 'yellow',
      0.95: 'red',
      1.0: 'white'
    }
  }
);

(Colours were chosen somewhat arbitrarily – white hot spot right at the centre of each point, followed by red, and gradually diminishing yellow). Then, each time the WCF listener received a new event summary from StreamInsight, rather than plot it as a Microsoft.Maps.Polygon, it called a method that would split the WKT of the MultiPoint instance into an array of Microsoft.Maps.Locations, and supply these to the heatmap layer:

heatmapLayer.SetPoints(locationArray);
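The method that splits the MultiPoint WKT isn’t shown above, so here is a minimal sketch of how it might work. It assumes SQL Server’s WKT output, in which each point is wrapped in its own parentheses, for example "MULTIPOINT ((-1.5 52.3), (0.1 51.5))". The function name multiPointToLocations is hypothetical. It returns plain {latitude, longitude} objects so that it can run outside the browser:

```javascript
// Hypothetical helper: converts MultiPoint WKT in SQL Server's format, e.g.
// "MULTIPOINT ((-1.5 52.3), (0.1 51.5))", into {latitude, longitude} objects.
// WKT lists longitude (x) first and latitude (y) second.
function multiPointToLocations(wkt) {
  // Matches one parenthesised coordinate pair such as "(-1.5 52.3)"
  const pairPattern = /\(\s*(-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)\s+(-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)\s*\)/g;
  const locations = [];
  let match;
  while ((match = pairPattern.exec(wkt)) !== null) {
    locations.push({
      latitude: parseFloat(match[2]),
      longitude: parseFloat(match[1])
    });
  }
  return locations;
}

console.log(multiPointToLocations("MULTIPOINT ((-1.5 52.3), (0.1 51.5))"));
```

In the page, each object would then be wrapped as new Microsoft.Maps.Location(l.latitude, l.longitude) before being handed to the heatmap layer. Note that the pattern does not handle the alternative WKT form without inner parentheses, such as MULTIPOINT (1 2, 3 4).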

The result is shown in the video below, complete with my rather vague commentary (based on this evidence, I don’t think I’ll ever realise my childhood dreams of being a radio DJ… ). One thing to notice is that, while the convex hull approach I used before emphasises the overall geographic spread at any point in time, the heat map approach emphasises instead the intensity of events in certain areas.

August 16, 2011

Spatio-temporal Event Processing with StreamInsight, SQL Server Denali, and Bing Maps – Part 4

Over the last few posts, I’ve been trying to use SQL Server StreamInsight to analyse a set of data recording outbreaks of H5N1 avian flu. The data set is spatio-temporal – it contains both the location of each outbreak and the date it occurred.

  • In part 1, I introduced the basics of StreamInsight and installed and tested the StreamInsight engine.
  • In part 2, I created a new input adaptor to stream data from a KML file and tested it by simply passing all the event data to a console window.
  • In part 3, I modified the query to use a custom aggregate function to form the convex hull around the location of all events that lay within a hopping window of 30 days, advancing a day at a time.

Having created the input adaptor to load data into StreamInsight, and the query to specify how that data should be analysed, the next step is to address what should happen with the output. Currently, I’m just sending the output to a console window using the OutputTracer class from the CodePlex StreamInsight samples package. What I really wanted, though, was an animated map showing the development of the outbreak over time…

Structuring the Output

Before doing anything more with the results, I decided I needed to structure the output a bit more, so I created a new class defining a summary of the area of the world affected by H5N1 at the end of each day:

public class AvianFluDailySummary
{
  public DateTime AsAtDate { get; set; }
  public string PolygonWKT { get; set; }
}

I then needed to adjust my LINQ query to populate this class with a summary of the events lying in each hopping window (30 days long, advancing a day at a time). I was already calculating PolygonWKT, the convex hull of the events occurring in the window, in a custom UDA. So now I just needed to populate AsAtDate: the timestamp at the end of each window, for which that data was valid. I had expected this to be exposed quite simply as a property of each hopping window, but that doesn’t appear to be the case (at least, I couldn’t find it). What’s more, I couldn’t just use a UDA to calculate the MAX() start time of the events falling into a window. That value isn’t guaranteed to coincide with the true end of the window, since the latest event may have occurred days before the window closes.
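To see why the latest event in a window can’t stand in for the window’s end, here’s a small JavaScript illustration. It treats each outbreak as an instant that belongs to a window if it falls in [start, end). The function name summariseWindow and the dates are made up, and this is not how StreamInsight computes windows internally:

```javascript
const DAY_MS = 24 * 60 * 60 * 1000;

// Hypothetical illustration: a window's end is fixed by its start and size,
// whereas the latest event inside it depends only on the data.
function summariseWindow(windowStart, windowSizeDays, eventDates) {
  const windowEnd = new Date(windowStart.getTime() + windowSizeDays * DAY_MS);
  // Keep only the events that fall inside [windowStart, windowEnd)
  const inside = eventDates.filter(d => d >= windowStart && d < windowEnd);
  const latestEventStart = inside.length > 0
    ? new Date(Math.max(...inside.map(d => d.getTime())))
    : null;
  return { windowEnd, latestEventStart };
}

// A 30-day window starting 1 Jan 2011 with outbreaks on 3 Jan, 10 Jan and 5 Feb
const summary = summariseWindow(
  new Date(Date.UTC(2011, 0, 1)),
  30,
  [new Date(Date.UTC(2011, 0, 3)), new Date(Date.UTC(2011, 0, 10)), new Date(Date.UTC(2011, 1, 5))]
);
console.log(summary.windowEnd.toISOString());        // 2011-01-31T00:00:00.000Z
console.log(summary.latestEventStart.toISOString()); // 2011-01-10T00:00:00.000Z
```

The three weeks between the two results are exactly the gap a MAX()-based UDA would get wrong.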

So, the only way I could find to access the time at the end of a hopping window from within the query was to create a different kind of UDA: a time-sensitive UDA. From http://technet.microsoft.com/en-us/library/ee842720.aspx:

  • Time-insensitive UDAs and UDOs do not expect to be passed whole events including their time stamps. Instead, they only consider a set of one or more payload fields from the events in the defined window. Also, the current window start and end time are not passed to them.
  • Time-sensitive UDAs and UDOs are passed a set of events for each window including their time stamps and the window start and end times.

So, if I created a new time-sensitive UDA, I could use it to return the timestamp at the end of each hopping window. Note that I wouldn’t actually be aggregating the events in the window at all, so I doubt this is the most efficient way to do it, but it seems to work for now. Here’s my CepTimeSensitiveAggregate to return the timestamp at the end of a given hopping window:

public class WindowEnd<TInput> : CepTimeSensitiveAggregate<TInput, DateTime>
{
  // The events themselves are ignored; only the window descriptor is used
  public override DateTime GenerateOutput(IEnumerable<IntervalEvent<TInput>> events,
                                          WindowDescriptor windowDescriptor)
  {
    return windowDescriptor.EndTime.UtcDateTime;
  }
}

And, as with my ConvexHull aggregate created before, I also needed to register an associated wrapper to enable the WindowEnd aggregate to be called from within the LINQ query:

// Like CHull, this extension method must be declared in a static class (e.g. UDAExtensionMethods)
[CepUserDefinedAggregate(typeof(WindowEnd<>))]
public static DateTime WinEnd<InputT>(this CepWindow<InputT> window)
{
  throw CepUtility.DoNotCall();
}

Then I changed my query to make use of the WinEnd() function to populate the “AsAtDate” for each daily summary, like this:

var query = from x in inputStream.HoppingWindow(TimeSpan.FromDays(30), TimeSpan.FromDays(1))
            select new AvianFluDailySummary
            {
              AsAtDate = x.WinEnd(),
              PolygonWKT = x.CHull(p => p.WKT)
            };

Sending the Output to a WCF Service and Drawing a Map

Now that I had some structure to my output, I could send it to a different output adaptor. To get the results onto Bing Maps, I decided the easiest way would be to get StreamInsight to publish them to a web service. The StreamInsight Samples CodePlex project includes a model output adaptor that sends results to a WCF service, which I shamelessly bastardised. First, I created a listener interface that defined a ServiceContract for the fields in my daily summary:

[ServiceContract]
public interface IFeedbackListenerService
{
    [OperationContract]
    void ReceiveEvents(
      DateTime asAtDate,
      string convexPoly
    );
}

Then I created a new listener that would act upon events received through this interface. The listener calls a specified delegate method, passing it the parameters defined in the UpdateMapEventArgs class. This updates a map with a polygon showing the area affected as at the specified time:

// Define delegate method to be called to update the map
public delegate void UpdateMapDelegate(object sender, UpdateMapEventArgs e);

// Define arguments to be passed to delegate method
public class UpdateMapEventArgs 
{ 
  public string PolygonWKT { get; set; }
  public DateTime AsAtDate { get; set; }

  public UpdateMapEventArgs(DateTime asAt, string polyWKT) 
  {
    AsAtDate = asAt;
    PolygonWKT = polyWKT;
  } 
}

// Define listener service
public class FeedbackListenerService : IFeedbackListenerService
{
  public static UpdateMapDelegate MapUpdater { get; set; }
  
  public void ReceiveEvents(DateTime asAtDate, string polygonWKT)
  {
    try
    {
      UpdateMapDelegate handler = MapUpdater;
      if (handler != null)
      {
        handler(this, new UpdateMapEventArgs(asAtDate, polygonWKT));
      }
    }
    catch {
      // Do something here later
    }
  }
}

To display the map, I used v7 of the Bing Maps AJAX control hosted within a WebBrowser control, with a function to plot a polygon from supplied WKT as follows:

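// Add an (initially empty) polygon to the map
var Polygon = new Microsoft.Maps.Polygon([]);
map.entities.push(Polygon);

// Redraw the polygon from WKT of the form "POLYGON ((long lat, long lat, ...))"
function updatePolygon(WKT) {

  // Ignore anything that isn't a polygon (e.g. the hull of fewer than three points)
  var start = WKT.indexOf("((");
  var end = WKT.lastIndexOf("))");
  if (start < 0 || end < 0) {
    return;
  }

  // Strip the "POLYGON ((" prefix and "))" suffix, then split into coordinate pairs
  var coordArray = WKT.substring(start + 2, end).split(", ");

  // Populate a location array from the set of coordinates
  var locationArray = new Array();
  for (var i = 0; i < coordArray.length; i++) {
    var coords = coordArray[i].split(" ");
    // WKT lists longitude first; Microsoft.Maps.Location expects latitude first
    locationArray.push(new Microsoft.Maps.Location(parseFloat(coords[1]), parseFloat(coords[0])));
  }

  // Update the polygon
  Polygon.setLocations(locationArray);
}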

Then, I just needed to invoke this script from the UpdateMap method which would be the delegate method provided as a callback to my listener service:

public void UpdateMap(object sender, UpdateMapEventArgs args)
{
  Invoke((MethodInvoker) delegate
  {
    // Update a simple label to show what date we're looking at
    label1.Text = args.AsAtDate.ToString();

    // Call the javascript in the Bing Maps webbrowser to update the map
    this.webBrowser1.Document.InvokeScript("updatePolygon", new object[] { args.PolygonWKT });
  });
}

Finally, I started the listening service:

FeedbackListenerService.MapUpdater = UpdateMap;
host = new ServiceHost(typeof(FeedbackListenerService));
host.Open();

With the new output adaptor in place and the listening service started, I started my StreamInsight query. It streams a summary of the events contained in each window (remember that my window contains the events in the preceding 30 days, and hops forward by a day each time). These summaries are published to the WCF service. There the listener picks them up and passes them as arguments to update the Bing Maps control. Here are some screenshots of the application in action:

[Screenshots: the Bing Maps application in action]

And, because the whole point of this was to create an animated map timeline, here’s a video:

There’s still quite a lot of tidying up I could do to my code (this was, after all, my first attempt, and I took quite a lot of shortcuts). I also don’t think this application will exactly wow anyone in terms of making advanced use of StreamInsight’s functions. Hopefully, though, it demonstrates an alternative approach to analysing spatial data that also has a time-based element.

Perhaps if I find time I’ll revisit this topic again in the future and pick up where I left off. But, for now, enjoy!

August 13, 2011

Spatio-temporal Event Processing with StreamInsight, SQL Server Denali, and Bing Maps – Part 3

At the end of my last post, I’d managed to stream some data representing cases of the H5N1 virus from a KML file into the StreamInsight engine, and streamed the results out again to a console window. Not particularly exciting, I admit, but hopefully it will provide the foundation necessary to create some more interesting spatio-temporal analysis. So, the next step was to try to come up with a better query to analyse the data as it streamed through the engine.

Thinking about the data I was using, I tried to think of a typical way in which you’d want to query the information relating to the outbreak of an infectious disease. Here’s what I came up with – let’s suppose that the disease is airborne and spread by droplets. In which case, you can define the geographic extent of the area that has been affected by creating the convex hull around all the locations that have had reported incidents (if you’re in a location and places to the west, east, south, and north of you have all had confirmed cases, I’d say you lie in an infected area even if you haven’t directly had any cases at your location). What’s more, let’s say that, after an outbreak has been confirmed at a location, it takes 30 days before that area can be declared “all clear” and free of the infection.

Bear in mind that I’m not an epidemiologist and I have no idea if these assumptions are anything like accurate, but they’ll work for me in this example. So, in order to define (and plot on a map) the areas of the world affected by the disease at any point in time, I’d want to determine all those points that lay within the convex hull formed from the locations of any incidents that had occurred in the preceding 30 days. This seemed like a reasonable case for a spatio-temporal query, so let’s just pass over the medical accuracy or not of the assumptions…
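The “inside the hull” test behind that assumption is simple to express for a convex polygon: a location is inside if it lies on the same side of every edge. Here’s a minimal JavaScript sketch. It treats longitude and latitude as flat x/y coordinates, which SQL Server’s geography type does not do, so it is only an approximation. The function name insideConvexPolygon is hypothetical:

```javascript
// Hypothetical sketch: is `point` inside (or on the edge of) a convex polygon?
// Coordinates are treated as flat x/y values (x = longitude, y = latitude),
// which only approximates the geodesic maths of SQL Server's geography type.
function insideConvexPolygon(point, polygon) {
  if (polygon.length < 3) return false; // not a polygon
  let side = 0;
  for (let i = 0; i < polygon.length; i++) {
    const a = polygon[i];
    const b = polygon[(i + 1) % polygon.length];
    // The sign of this cross product says which side of edge a->b the point is on
    const cross = (b.x - a.x) * (point.y - a.y) - (b.y - a.y) * (point.x - a.x);
    if (cross !== 0) {
      if (side === 0) {
        side = Math.sign(cross);
      } else if (Math.sign(cross) !== side) {
        return false; // on opposite sides of two edges, so outside
      }
    }
  }
  return true;
}

// Outbreaks to the west, south, east and north of a location at (0, 0)
const hull = [{ x: -1, y: 0 }, { x: 0, y: -1 }, { x: 1, y: 0 }, { x: 0, y: 1 }];
console.log(insideConvexPolygon({ x: 0, y: 0 }, hull)); // true
console.log(insideConvexPolygon({ x: 2, y: 0 }, hull)); // false
```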

Defining the Hopping Window

The basic query I’d set up last time just selected all the events from the input stream, as follows:

var query = from e in inputStream
            select e;

This time, rather than just selecting each individual event, I wanted to create a hopping window. This is a window of fixed length that selects only those events occurring within a particular timeframe, and that moves forward (“hops”) by a fixed interval each time. Then I’d want to perform an aggregate of the payload fields of the events contained within that window.

I’d already decided that the extent of my window would cover a period of 30 days (to capture all those occurrences that were still contagious at any given point in time), and I wanted to examine the cases that fell within that window on a day-by-day basis – i.e. my window would “hop” forward by one day each time. There was no point defining a hop size of less than a day, since the granularity of my input data was only recorded at date level.
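To make the window semantics concrete, here’s a rough JavaScript sketch that enumerates 30-day windows hopping forward a day at a time over a set of outbreak dates, and counts the events in each. It illustrates the idea under simplifying assumptions; it is not how StreamInsight is implemented. Events are treated as instants, each window covers [start, start + size), and windows are aligned to midnight UTC on the first event’s day rather than using StreamInsight’s own alignment. The function name hoppingWindows is hypothetical:

```javascript
const DAY_MS = 24 * 60 * 60 * 1000;

// Hypothetical sketch of hopping-window semantics for instantaneous events.
// Each window covers [start, start + size); consecutive windows start `hop` apart.
// Assumption: windows are aligned to midnight UTC on the day of the earliest event.
function hoppingWindows(eventDates, sizeDays, hopDays) {
  const times = eventDates.map(d => d.getTime()).sort((a, b) => a - b);
  if (times.length === 0) return [];
  const size = sizeDays * DAY_MS;
  const hop = hopDays * DAY_MS;
  const firstDay = Math.floor(times[0] / DAY_MS) * DAY_MS;
  const last = times[times.length - 1];
  const windows = [];

  // Step back to the earliest aligned window that can still contain the first event
  for (let start = firstDay - (Math.ceil(size / hop) - 1) * hop; start <= last; start += hop) {
    const end = start + size;
    const count = times.filter(t => t >= start && t < end).length;
    // Only report windows that contain at least one event
    if (count > 0) {
      windows.push({ start: new Date(start), end: new Date(end), count: count });
    }
  }
  return windows;
}

// Two outbreaks, 1 Jan and 20 Jan 2011, with a 30-day window hopping by 1 day
const windows = hoppingWindows([new Date(Date.UTC(2011, 0, 1)), new Date(Date.UTC(2011, 0, 20))], 30, 1);
console.log(windows.length); // 49
```

Only the windows starting between 22 December and 1 January contain both outbreaks. Each outbreak therefore contributes to 30 consecutive daily summaries, which matches the “30 days until all clear” assumption.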

Both the window size and the hop size are defined as TimeSpan parameters to a HoppingWindow acting on an input stream. So, I changed the first part of my query to look like this:

var query = from e in inputStream.HoppingWindow(
                        TimeSpan.FromDays(30), // Window size
                        TimeSpan.FromDays(1))  // Hop size
            ... // Do something with the events that fall in this window

Next, I needed to create the aggregate function to act upon those events in the window.

Creating a Custom StreamInsight Aggregate function

To create the convex hull of the set of points contained within a particular window, I needed to create an aggregate function that derives from the CepAggregate base class. There’s an MSDN article at http://msdn.microsoft.com/en-us/library/ee842720.aspx that gives an example, although it fails to mention where you’d actually find that class (it’s in Microsoft.ComplexEventProcessing.Extensibility, incidentally). In my case, I wanted a function that would act upon an input set of strings (remember that my payload contained the location of each outbreak as a WKT Point), and return a string (the WKT of a Polygon created from the convex hull of those points). Here’s the function I created:

  public class ConvexHull : CepAggregate<string, string>
  {
    public override string GenerateOutput(IEnumerable<string> eventData)
    {
      // First, create a MultiPoint of all Points in the current window
      var gb = new SqlGeographyBuilder();
      gb.SetSrid(4326);
      gb.BeginGeography(OpenGisGeographyType.MultiPoint);
      foreach (var d in eventData)
      {
        // Create a geography instance from the WKT of each event
        SqlGeography point = SqlGeography.Parse(d);
        gb.BeginGeography(OpenGisGeographyType.Point);
        // Geography figures take latitude first, then longitude
        gb.BeginFigure((double)point.Lat, (double)point.Long);
        gb.EndFigure();
        gb.EndGeography();
      }
      gb.EndGeography();

      // Now, create the Convex Hull of that MultiPoint
      SqlGeography convexhull;
      convexhull = gb.ConstructedGeography.STConvexHull();
      // Return the WKT of the Convex Hull
      return convexhull.ToString();
    }
  }

Notice that, as with the location data in the payload of each event, I’m using the WKT format for all inputs and outputs of my spatial functions. This incurs a little extra cost in parsing the input to each function, but it makes the workflow much easier to debug. I’m also using the STConvexHull() method of the SqlGeography datatype, which is newly introduced in the Microsoft.SqlServer.Types.dll library that ships with SQL Server Denali.
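If you’re curious what the hull calculation itself involves, here’s a sketch of Andrew’s monotone chain algorithm in JavaScript. This is a swapped-in planar algorithm that treats longitude as x and latitude as y. STConvexHull() on the geography type works on the curved surface of the earth, so the two can differ, especially for large areas. The function names convexHull and hullToWkt are hypothetical:

```javascript
// Hypothetical sketch of Andrew's monotone chain convex hull on planar points
// (x = longitude, y = latitude). Returns the hull vertices counter-clockwise.
function convexHull(points) {
  const pts = points.slice().sort((p, q) => p.x - q.x || p.y - q.y);
  if (pts.length < 3) return pts;
  // cross > 0 means the turn o -> a -> b is counter-clockwise
  const cross = (o, a, b) => (a.x - o.x) * (b.y - o.y) - (a.y - o.y) * (b.x - o.x);

  // Build the lower chain left to right, discarding points that don't turn left
  const lower = [];
  for (const p of pts) {
    while (lower.length >= 2 && cross(lower[lower.length - 2], lower[lower.length - 1], p) <= 0) lower.pop();
    lower.push(p);
  }
  // Build the upper chain right to left in the same way
  const upper = [];
  for (let i = pts.length - 1; i >= 0; i--) {
    const p = pts[i];
    while (upper.length >= 2 && cross(upper[upper.length - 2], upper[upper.length - 1], p) <= 0) upper.pop();
    upper.push(p);
  }
  // The last point of each chain repeats the first point of the other
  lower.pop();
  upper.pop();
  return lower.concat(upper);
}

// Formats a hull of at least three vertices as WKT, closing the ring explicitly
function hullToWkt(hull) {
  const ring = hull.concat([hull[0]]).map(p => p.x + " " + p.y);
  return "POLYGON ((" + ring.join(", ") + "))";
}

const outbreaks = [
  { x: 0, y: 0 }, { x: 2, y: 0 }, { x: 2, y: 2 },
  { x: 0, y: 2 }, { x: 1, y: 1 }, { x: 1, y: 0 }
];
console.log(hullToWkt(convexHull(outbreaks)));
// POLYGON ((0 0, 2 0, 2 2, 0 2, 0 0))
```

The interior point (1 1) and the collinear edge point (1 0) are dropped. The ring comes out counter-clockwise, which is also the orientation the geography type expects for a polygon’s exterior ring.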

Before you can actually call this function from the StreamInsight query, you need to register a LINQ extension method that wraps the aggregate so that it can be called from LINQ. Here’s my LINQ extension wrapper:

  public static class UDAExtensionMethods
  {
    [CepUserDefinedAggregate(typeof(ConvexHull))]
    public static string CHull<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, string>> map)
    {
      throw CepUtility.DoNotCall();
    }
  }

To calculate the convex hull of the points in each window, I called the CHull aggregate, passing it the WKT field of each event payload. Remember to use the name of the LINQ wrapper, not of the UDA class itself. My modified query now looked like this:

var query = from x in inputStream.HoppingWindow(
                        TimeSpan.FromDays(30),
                        TimeSpan.FromDays(1))
            select x.CHull(p => p.WKT);

Now, re-running my project gave the output on the console window as shown below – for each day covered by the extent of the data (still shown in mm/dd/yyyy format because of the OutputTracer output adaptor I’m using - grrrr), the results show the Polygon formed from the convex hull of any points that occurred in the preceding 30 days.

[Screenshot: console output listing the convex hull Polygon WKT for each day]

In the next post, I’ll look at doing something nicer with this output than simply sending a WKT string to a console window, by plotting the resulting polygons on Bing Maps instead.
