Spatio-temporal Event Processing with StreamInsight, SQL Server Denali, and Bing Maps – Part 2

In my last post, I outlined a plan for conducting temporal analysis of a set of spatial data by streaming it through StreamInsight. Following this plan in something like a logical order, the first step is to obtain some source data and create an input adaptor to load that information into the SI engine.

Looking around the internet, I decided that I would use a set of data produced by Declan Butler detailing reported cases of the Avian Flu virus, H5N1. This data set was originally used to accompany an article published in “Nature” magazine, and you can download the underlying data as a KML document from http://www.declanbutler.info/Flumaps1/Timeseries.kml.

Creating the Event Class

Looking at the contents of the dataset above, I tried to determine which pieces of information I wanted to attach to each event, for possible use in filtering or grouping as I streamed them through the StreamInsight query engine. The following shows the structure of the <Placemark> element used to denote each occurrence recorded in the TimeSeries.kml file:

[Image: the structure of a <Placemark> element from TimeSeries.kml, showing <name>, <description> (containing an HTML table), <TimeStamp>, and <Point><coordinates> child elements]
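
For reference, each entry looks something like this (an illustrative reconstruction based on the elements described below, not a verbatim extract from the file):

<Placemark>
  <name>Hong Kong</name>
  <TimeStamp>
    <when>2006-02-13</when>
  </TimeStamp>
  <description><![CDATA[<table>…affected, deaths, destroyed, vaccinated…</table>]]></description>
  <Point>
    <coordinates>114.16,22.28,0</coordinates>
  </Point>
</Placemark>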

Most importantly, this file does contain both spatial (the <coordinates> element) and temporal (the <TimeStamp> element) information for each occurrence. There’s also a lot of additional information – the number of individuals affected, number of deaths, number of animals destroyed, number vaccinated… – but it was going to be a bit awkward to extract, since it had been placed in an HTML table within the <description> element (cut off in the image above) rather than identified as separate KML elements (the KML schema, strangely, doesn’t specify a <chickens_slaughtered> element…). These additional fields could be very useful for my analysis, but they would require some string-parsing and manipulation to extract, and I didn’t want to make things too complicated, since this was meant to be an exercise in StreamInsight rather than in data-crunching.

So, for now I chose only to use the more accessible element values instead. Maybe if I get a basic test case working, I’ll come back and include some more fields later. Here’s the simple class I defined, which would contain the fields of information in the payload attached to each event:

public class AvianFluEventType
{
  public string Name { get; set; }     // placename where the incident occurred
  public string Affected { get; set; } // "Human" or "Animal" – the subject of the infection
  public string WKT { get; set; }      // Well-Known Text representation of the location
}
  • Name would be populated from the <name> element of each <Placemark> in the KML file, which is just the placename where the incident occurred.
  • Affected records the subject of the infection – human, or animal – which I intend to use as a filter in the query later. The cases in the KML file were grouped into different parent <Folder> elements for Human and Animal cases, so this was easy to extract as well.
  • The WKT field will store the Well-Known Text representation of a Point placed at the location specified by the <coordinates> element in the KML file. There are many ways I could have serialised this geographic information in the payload – I could have created separate numeric columns for latitude and longitude coordinate values, which would perhaps have been efficient but would have limited me to dealing only with Point instances. I could also have used a binary format, such as Well-Known Binary or the SqlGeography serialisation, which might have been more efficient to transmit and operate on, but would have been harder to debug. However, I settled on Well-Known Text as being familiar, flexible, and vaguely warm-feeling… If I could see a stream of “POINT…” or “POLYGON…” strings passing through StreamInsight, I’d feel a lot more comfortable that my process was working correctly than if I saw a load of 0xE021BB0000…

Notice that my AvianFluEventType class defines only those fields in the payload attached to each event (i.e. the non-temporal aspects of information) – it does not contain a field representing the time(s) at which the event occurred. This information, which is fundamental to the way that StreamInsight treats data, will be attached to the event separately in just a minute, and sourced from the <TimeStamp> element in the KML.

Creating the Input Adaptor

To actually stream the data through StreamInsight, I needed to create an input adaptor that would load the KML file and populate a stream of events based on the rules above. Since KML is just a dialect of XML, I started off by looking to see if there was an existing XML data adaptor. The StreamInsight Samples codeplex project contains input adaptors for Facebook, Twitter, CSV, SQL, and WCF, but not, as far as I could tell, for XML. So I was going to have to write my own adaptor.

To start with, I created a new adaptor class that inherited from the Microsoft.ComplexEventProcessing.Adapters.PointInputAdapter base class.

using Microsoft.ComplexEventProcessing.Adapters;

namespace XmlDataSource
{
  public class XmlFilePointInput : PointInputAdapter
  {
    // ... adaptor logic, discussed below ...

For the logic of the XmlFilePointInput class itself, I borrowed a large part of the code from the TextFilePointInput class in the SimpleTextFileReader example on codeplex. The main difference is that, instead of creating a stream of events from fields in a delimited text file, my adaptor creates a payload based on elements of an XML file. In the TextFilePointInput class, the payload fields are populated by splitting each line of a CSV file according to a specified delimiter (i.e., a comma), and then looping through each element to populate the fields of the chosen event, as follows:

private PointEvent CreateEventFromLine(string line)
{
  // Split the line into its constituent fields using the configured delimiter
  string[] split = line.Split(new char[] { this.delimiter }, StringSplitOptions.None);
  // ... (event creation elided) ...
  for (int ordinal = 0; ordinal < this.bindtimeEventType.FieldsByOrdinal.Count; ordinal++)
  {
    // Map the input field's position to the ordinal of the corresponding payload field
    int cepOrdinal = this.inputOrdinalToCepOrdinal[ordinal];
    CepEventTypeField evtField = this.bindtimeEventType.FieldsByOrdinal[cepOrdinal];
    // ...
    // Convert the string value to the CLR type of the payload field
    Type t = Nullable.GetUnderlyingType(evtField.Type.ClrType) ?? evtField.Type.ClrType;
    object value = Convert.ChangeType(split[ordinal + NumNonPayloadFields], t, this.cultureInfo);

    pointEvent.SetField(cepOrdinal, value);

And the StartTime for an event (which, if you’re using the Point event shape model as I am, is the only time value associated with each event) is set to the first value in the corresponding line of the CSV:

pointEvent.StartTime = DateTime.Parse(split[0], this.cultureInfo, DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal).ToUniversalTime();

My XML file reader works in almost exactly the same way, except that, instead of referencing the StartTime and the payload fields for each event by their ordinal position in a CSV file, I populate them from specified elements in an XML file. Originally, I had intended to make a reusable, generic XML input adaptor that would take a supplied XSLT transformation to map elements from the source XML file to the corresponding StreamInsight event values; however, I had some problems writing the XSLT in such a way that it would actually work with this particular KML file. As a temporary solution, I instead hardcoded an XQuery pattern for each field in the event type to target the appropriate KML element. Making a more generic XML input adaptor can be added to my “List of things to come back to at the end if I ever get this working”.
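
To give a flavour of what that hardcoded extraction looks like, here’s a simplified sketch using LINQ to XML rather than the adaptor code verbatim – note that the KML namespace URI below is an assumption that you’d need to check against the actual file:

using System.Linq;
using System.Xml.Linq;

XNamespace kml = "http://earth.google.com/kml/2.2"; // assumption – check the file's xmlns
XDocument doc = XDocument.Load("TimeSeries.kml");

foreach (XElement pm in doc.Descendants(kml + "Placemark"))
{
  string name = (string)pm.Element(kml + "name");                      // placename
  string when = (string)pm.Descendants(kml + "when").First();          // child of <TimeStamp>
  string coords = (string)pm.Descendants(kml + "coordinates").First(); // child of <Point>
  // ... populate the AvianFluEventType payload and the event StartTime from these ...
}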

Once I’d targeted the appropriate KML elements to populate the Name, Affected, and WKT fields of my event class, I also had to do a little bit of manipulation of the data. Firstly, I removed the <![CDATA[ ]]> wrappers around each of the name and type values. Then I concatenated the values of the <coordinates> element to construct the WKT representation of a point.
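
The concatenation itself is trivial – something like this, continuing from the sketch above and assuming the usual KML longitude,latitude[,altitude] ordering (which conveniently matches WKT’s x y order):

using System.Globalization;

// <coordinates> holds "lon,lat[,alt]"; WKT wants "POINT (x y)" with x = longitude
string[] parts = coords.Split(',');
string wkt = string.Format(CultureInfo.InvariantCulture,
  "POINT ({0} {1})", parts[0].Trim(), parts[1].Trim());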

I wondered whether I’d also have to manipulate the format of the <TimeStamp> element from its existing yyyy-mm-dd structure before it could be used to set the StartTime property of each of my events, or whether DateTime.Parse() would interpret it correctly. I decided to leave it alone for now, but in case of problems in the future, I made sure that my XmlFilePointInput class accepted a CultureInfo parameter that could be passed to DateTime.Parse() so that I could have some control over the way date/time formats were handled by the reader. (The SimpleTextFileReader adaptor does this already). I don’t want any of that mm/dd/yyyy nonsense going on… 😉
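
(For what it’s worth, a quick throwaway test suggests that the ISO-style format parses unambiguously whatever the culture – this is just a sanity check, not part of the adaptor:)

DateTime d = DateTime.Parse("2006-02-13",
  new CultureInfo("en-GB"),
  DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal);
// d is 13 February 2006 00:00 UTC – read as year-month-day, regardless of culture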

One final important point worth noting about the input adaptor: since I’m reading the values from the KML file in order to create a stream of events over time, the adaptor must read those points in ascending chronological order. The simplest way to achieve this, of course, is to ensure that the elements in the XML file are themselves stored in chronological order. If they’re not, you’ll have to apply a sort – perhaps using XSLT’s xsl:sort instruction – to order the elements before feeding the events to StreamInsight.
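
In C# terms – reusing the hypothetical LINQ-to-XML names from the earlier sketch – the same sort can be expressed as:

// Order the placemarks by their <when> timestamp before creating events from them
var ordered = doc.Descendants(kml + "Placemark")
  .OrderBy(pm => DateTime.Parse(
    (string)pm.Descendants(kml + "when").First(),
    CultureInfo.InvariantCulture,
    DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal));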

Testing Out the Input Adaptor

To test out my KML input adaptor, I created a console application (no need for any fancy GUIs here…), which began by starting up the StreamInsight server, creating a new application, and defining the configuration properties for the input data adaptor. It then used my new XmlFileReaderFactory to create a stream of point events, each containing my specified AvianFluEventType payload, as follows:

static void Main(string[] args)
{
  // Start up the SI Server
  Console.WriteLine("Starting StreamInsight Server...");
  using (Server server = Server.Create("StreamInsight12"))

    // Plenty of opportunity for things to go wrong. Best use a try block...
    try
    {
    
      // Create a new application on the server
      var myApp = server.CreateApplication("AvianFlu");

      // Configure the XML Input adaptor
      // TODO Make this a generic XML reader that accepts an XSLT transform
      // rather than hardcoded values to the TimeSeries.kml dataset as currently 
      var inputConfig = new XmlFileReaderConfig
      {
        InputFileName = @"TimeSeries.kml",
        CtiFrequency = 1,
        CultureName = "en-GB"
      };

      // Create input stream
      var inputStream = CepStream<AvianFluEventType>.Create(
        "input",
        typeof(XmlFileReaderFactory),
        inputConfig, 
        EventShape.Point
      );

In order to test whether the events were correctly flowing into StreamInsight, I probably should have used the StreamInsight Event Flow Debugger. However, as stated in my last post, I didn’t have much luck with my gung-ho approach of just starting this application up and expecting to connect to a running StreamInsight server. A cursory glance at the MSDN documentation suggests that this is because I need to expose a management endpoint from my StreamInsight server instance first… but that’s another thing to add to the “Let’s look at it later” list. For now, I’ll check whether the events are flowing into StreamInsight by just trying to stream them out again, to an output adaptor that prints to the console.
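
(For the record, the pattern described in the documentation for exposing that endpoint looks roughly like the following – I haven’t wired this up myself yet, so treat it as a sketch of what MSDN describes rather than tested code from this project:)

using System.ServiceModel;
using Microsoft.ComplexEventProcessing.ManagementService;

// Expose the embedded server's management service over WS-HTTP so that
// tools such as the Event Flow Debugger can connect to the instance
var host = new ServiceHost(server.CreateManagementService());
host.AddServiceEndpoint(
  typeof(IManagementService),
  new WSHttpBinding(SecurityMode.Message),
  "http://localhost/StreamInsight12");
host.Open();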

First, let’s define a LINQ query against the input stream. Well, I say a “query”, but it’s really just a passthrough, as follows:

var query = from e in inputStream
            select e;


Now we need to bind that query to an output adaptor that will output the results somewhere – in this case, to a console window. Fortunately, such an adaptor is included in the samples package, so I just compiled it and then included a reference to StreamInsight.Samples.Adapters.OutputTracer in my own project. Then, bind the query to the output adaptor as follows:

var outputToConsole = query.ToQuery(
  myApp,
  "SI Query",
  string.Empty,
  typeof(TracerFactory),
  tracerConfig,
  EventShape.Point,
  StreamEventOrder.FullyOrdered);

outputToConsole.Start();
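
Incidentally, the tracerConfig referenced above is just the default TracerConfig supplied with the sample; it’s created something like this (field names quoted from memory of the OutputTracer source, so double-check them there):

var tracerConfig = new TracerConfig
{
  TracerKind = TracerKind.Console, // write events to the console window
  TraceName = "Output",
  SingleLine = true,               // one line per event
  DisplayCtiEvents = false         // don't show the CTI punctuation events
};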

With all that in place, the only thing left was to hit F5, cross my fingers, and hope.

Well, what do you know? (or, as they say here in Norfolk, “who’d have thunk?”) – my console application window started filling up with events as they passed through the SI query engine into the output adaptor:

[Image: console output showing a stream of Point events, each with a timestamp and a WKT POINT payload]

I also felt that this vindicated my decision to use WKT as the geographic encoding in the event payload – just look at all those lovely POINTs flowing past… 😉

One thing that might not be obvious in the output above is that the date of each Point event is being displayed in mm/dd/yyyy format. Since I had specified the en-GB culture when parsing the TimeStamp for the StartTime attached to each event, and this wasn’t what I was getting, I mistakenly went burrowing around for a bug in my code. What I should have realised, of course, is that the input adaptor had read the input timestamps correctly into StreamInsight, but the output adaptor was formatting them differently (in this case, using CultureInfo.InvariantCulture). Sadly, the TracerConfig class used by the ConsoleWriter sample doesn’t allow you to supply a CultureInfo, so I’ll just live with this for now.

Stay tuned for the next episode, where I’ll try to write a spatial query that actually does something more useful than just sending all the results out in WKT as they are received, and perhaps I’ll get on to creating an output adaptor that sends the results to Bing Maps too…
