
August 8, 2011

Spatio-temporal Event Processing with StreamInsight, SQL Server Denali, and Bing Maps – Part 2

In my last post, I outlined a plan for conducting temporal analysis of a set of spatial data by streaming it through StreamInsight. Following this plan in something like a logical order, the first step is to obtain some source data and create an input adaptor to load that information into the SI engine.

Looking around the internet, I decided that I would use a set of data produced by Declan Butler detailing reported cases of the Avian Flu virus, H5N1. This data set was originally used to accompany an article published in “Nature” magazine, and you can download the underlying data as a KML document from http://www.declanbutler.info/Flumaps1/Timeseries.kml.

Creating the Event Class

Looking at the contents of the dataset above, I tried to determine which pieces of information I wanted to attach to each event for possible use in filtering or grouping as I streamed them through the StreamInsight query engine. The following shows the structure of the <Placemark> element used to denote each occurrence recorded in the TimeSeries.kml file:

[Image: structure of a <Placemark> element in TimeSeries.kml]

Most importantly, this file contains both spatial (the <coordinates> element) and temporal (the <TimeStamp> element) information for each occurrence. There’s also a lot of additional information – the number of individuals affected, number of deaths, number of animals destroyed, number vaccinated… – but it would be a bit awkward to extract, since it had been placed in an HTML table within the <description> element (cut off in the image above) rather than stored in separate KML elements (the KML schema, strangely, doesn’t specify a <chickens_slaughtered> element…). These additional fields could be very useful for my analysis, but extracting them would require some string parsing and manipulation, and I didn’t want to make things too complicated since this was meant to be an exercise in StreamInsight rather than in data-crunching.

So, for now I chose only to use the more accessible element values instead. Maybe if I get a basic test case working, I’ll come back and include some more fields later. Here’s the simple class I defined, which would contain the fields of information in the payload attached to each event:

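public class AvianFluEventType
{
  // Placename, taken from the <name> element of each <Placemark>
  public string Name { get; set; }
  // Subject of the infection (human or animal), taken from the parent <Folder>
  public string Affected { get; set; }
  // Well-Known Text representation of the location, e.g. a POINT
  public string WKT { get; set; }
}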
  • Name would be populated from the <name> element of each <Placemark> in the KML file, which is just the placename where the incident occurred.
  • Affected records the subject of the infection (human or animal), which I intend to use as a filter in the query later. The cases in the KML file were grouped into different parent <Folder> elements for human and animal cases, so this was easy to extract as well.
  • The WKT field will store the Well-Known Text representation of a Point placed at the location specified by the <coordinates> element in the KML file. There are many ways I could have serialised this geographic information in the payload. I could have created separate numeric columns for the latitude and longitude coordinate values, which would perhaps have been efficient but would have limited me to dealing only with Point instances. I could also have used a binary format, such as Well-Known Binary or the SqlGeography serialisation, which may have been more efficient to transmit and operate on, but would have been harder to debug. In the end, I settled on Well-Known Text as being familiar, flexible, and vaguely warm-feeling: if I could see a stream of “POINT…” or “POLYGON…” strings passing through StreamInsight, I’d feel a lot more comfortable that my process was working correctly than if I saw a load of 0xE021BB0000….

Notice that my AvianFluEventType class defines only those fields in the payload attached to each event (i.e. the non-temporal aspects of information); it does not contain a field representing the time(s) at which the event occurred. This information, which is fundamental to the way that StreamInsight treats data, will be attached to the event separately in just a minute, and sourced from the <TimeStamp> element in the KML.

Creating the Input Adaptor

To actually stream the data through StreamInsight, I needed to create an input adaptor that would load the KML file and populate a stream of events based on the rules above. Since KML is just a dialect of XML, I started off by looking to see if there was an existing XML data adaptor. The StreamInsight Samples CodePlex project contains input adaptors for Facebook, Twitter, CSV, SQL, and WCF but not, as far as I could tell, for XML. So I was going to have to write my own adaptor.

To start with, I created a new adaptor class that inherited from the Microsoft.ComplexEventProcessing.Adapters.PointInputAdapter base class.

using Microsoft.ComplexEventProcessing.Adapters;

namespace XmlDataSource
{
  public class XmlFilePointInput : PointInputAdapter
  {
    ...
  }
}

For the logic of the XmlFilePointInput class itself, I borrowed a large part of the code from the TextFilePointInput class in the SimpleTextFileReader example on CodePlex. The main difference is that, instead of creating a stream of events from fields in a delimited text file, my adaptor creates a payload based on elements of an XML file. In the TextFilePointInput class, the payload fields are populated by splitting each line of a CSV file according to a specified delimiter (e.g. a comma), and then looping through each element to populate the fields of the chosen event type, as follows:

private PointEvent CreateEventFromLine(string line)
{
  string[] split = line.Split(new char[] { this.delimiter }, StringSplitOptions.None);
  ...
  for (int ordinal = 0; ordinal < this.bindtimeEventType.FieldsByOrdinal.Count; ordinal++)
  {
    int cepOrdinal = this.inputOrdinalToCepOrdinal[ordinal];
    CepEventTypeField evtField = this.bindtimeEventType.FieldsByOrdinal[cepOrdinal];
    ...
    Type t = Nullable.GetUnderlyingType(evtField.Type.ClrType) ?? evtField.Type.ClrType;
    object value = Convert.ChangeType(split[ordinal + NumNonPayloadFields], t, this.cultureInfo);
                  
    pointEvent.SetField(cepOrdinal, value);
  }
  ...
}

And the StartTime for an event (which if you’re using the Point event shape model, as I am, is the only time value associated with each event) is set to the first value in the corresponding line of the CSV:

pointEvent.StartTime = DateTime.Parse(split[0], this.cultureInfo, DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal).ToUniversalTime();

My XML file reader works in almost exactly the same way, except that instead of referencing the StartTime and the payload fields for each event by their ordinal position in a CSV file, I populate them from specified elements in an XML file. Originally, I had intended to make a reusable, generic XML input adaptor that would take a supplied XSLT transformation to map elements from the source XML file to the corresponding StreamInsight event values; however, I had some problems writing the XSLT in such a way that it would actually work with this particular KML file. As a temporary solution, I instead hardcoded an XQuery pattern for each field in the event type to target the appropriate KML element. A more generic XML input adaptor can be added to my “List of things to come back to at the end if I ever get this working”.
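For illustration only, here is a minimal sketch of how the hardcoded per-field lookups could be expressed with LINQ to XML instead of XQuery patterns. This is a swapped-in technique, not the adaptor’s actual code: KmlPlacemarkReader and RawPlacemark are hypothetical names, and the namespace URI passed in is an assumption (KML 2.2 files normally declare http://www.opengis.net/kml/2.2, but older files may use a different one). It also assumes each <Placemark> carries its date as <TimeStamp><when>…</when></TimeStamp>.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

// Hypothetical helper (not the XmlFilePointInput code): pulls the raw values
// needed for AvianFluEventType out of every <Placemark> using LINQ to XML.
public static class KmlPlacemarkReader
{
    // Raw values for one <Placemark>, before any clean-up.
    public sealed class RawPlacemark
    {
        public string Name { get; set; }
        public string Folder { get; set; }
        public string Coordinates { get; set; }
        public string When { get; set; }
    }

    // kmlNamespace is an assumption about the file: pass whatever URI its root declares.
    public static List<RawPlacemark> Read(XDocument doc, string kmlNamespace)
    {
        XNamespace kml = kmlNamespace;
        return doc.Descendants(kml + "Placemark")
            .Select(p => new RawPlacemark
            {
                // The explicit string conversion returns null if the element is missing.
                Name = (string)p.Element(kml + "name"),
                // Nearest enclosing <Folder>'s <name>, which would become Affected.
                Folder = (string)p.Ancestors(kml + "Folder").FirstOrDefault()?.Element(kml + "name"),
                Coordinates = (string)p.Descendants(kml + "coordinates").FirstOrDefault(),
                When = (string)p.Element(kml + "TimeStamp")?.Element(kml + "when")
            })
            .ToList();
    }
}
```

Each RawPlacemark would still need the clean-up described next before it can become an event payload.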

Once I’d targeted the appropriate KML elements to populate the Name, Affected, and WKT fields of my event class, I also had to do a little bit of data manipulation. Firstly, I removed the <![CDATA[ … ]]> wrappers around each of the name and type values. Then I concatenated the values of the <coordinates> element to construct the WKT representation of a point.
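A minimal sketch of those two clean-up steps, assuming each <coordinates> value holds a single KML tuple in longitude,latitude[,altitude] order. KmlValueCleaner and its methods are hypothetical names. WKT also puts longitude first (POINT(x y)), so the values keep their order and only the separator changes. If the XML is read through LINQ to XML or XmlReader, CDATA sections already come back without their wrappers, so StripCData only matters when working on raw strings.

```csharp
using System;
using System.Globalization;

// Hypothetical helpers illustrating the clean-up step; not the adaptor's actual code.
public static class KmlValueCleaner
{
    // Removes a surrounding <![CDATA[ ... ]]> wrapper if the raw string still has one.
    public static string StripCData(string raw)
    {
        const string open = "<![CDATA[";
        const string close = "]]>";
        string s = raw.Trim();
        if (s.StartsWith(open, StringComparison.Ordinal) && s.EndsWith(close, StringComparison.Ordinal))
        {
            s = s.Substring(open.Length, s.Length - open.Length - close.Length);
        }
        return s.Trim();
    }

    // Converts a single KML "lon,lat[,alt]" tuple to a WKT point "POINT(lon lat)".
    // Any altitude value is dropped.
    public static string CoordinatesToWktPoint(string coordinates)
    {
        string[] parts = coordinates.Trim().Split(',');
        if (parts.Length < 2)
            throw new FormatException("Expected at least longitude,latitude: " + coordinates);

        // Invariant culture: KML always uses '.' as the decimal separator.
        double lon = double.Parse(parts[0], NumberStyles.Float, CultureInfo.InvariantCulture);
        double lat = double.Parse(parts[1], NumberStyles.Float, CultureInfo.InvariantCulture);

        return string.Format(CultureInfo.InvariantCulture, "POINT({0} {1})", lon, lat);
    }
}
```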

I wondered whether I’d also have to manipulate the format of the <TimeStamp> element from its existing yyyy-mm-dd structure before it could be used to set the StartTime property of each of my events, or whether DateTime.Parse() would interpret it correctly. I decided to leave it alone for now, but in case of problems in the future I made sure that my XmlFilePointInput class accepted a CultureInfo parameter that could be passed to DateTime.Parse(), giving me some control over the way date/time formats were handled by the reader (the SimpleTextFileReader adaptor does this already). I don’t want any of that mm/dd/yyyy nonsense going on… ;)
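If DateTime.Parse() ever did misread the dates, one option (sketched here as an assumption, not what the adaptor currently does) is to swap it for DateTime.ParseExact() with a fixed ISO pattern, so the result no longer depends on any culture. In .NET format strings the pattern is written yyyy-MM-dd, because lowercase mm means minutes. KmlTimeStampParser is a hypothetical name, and the sketch assumes every <when> value is a plain date with no time component.

```csharp
using System;
using System.Globalization;

// Hypothetical alternative to DateTime.Parse for KML <when> values.
public static class KmlTimeStampParser
{
    // Parses an ISO 8601 calendar date such as "2005-03-14" as midnight UTC.
    // The fixed pattern plus the invariant culture make the result culture-independent;
    // AssumeUniversal | AdjustToUniversal yields a DateTime with Kind == Utc.
    public static DateTime ParseWhen(string when)
    {
        return DateTime.ParseExact(
            when.Trim(),
            "yyyy-MM-dd",
            CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal);
    }
}
```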

One final important point worth noting about the input adaptor is that, since I’m reading the values from the KML file in order to create a stream of events over time, the adaptor must read those points in ascending chronological order. The simplest way to achieve this, of course, is to ensure that the elements in the XML file are themselves stored in chronological order. If they are not, you’ll have to sort them before feeding the events to StreamInsight, perhaps using XSLT’s xsl:sort instruction.
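A minimal sketch of sorting in code instead, swapping in LINQ’s OrderBy for the xsl:sort approach. It assumes the whole file fits comfortably in memory; TimedRecord and SortForEnqueue are hypothetical names, not StreamInsight types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: orders records by time before they are enqueued,
// as an in-code alternative to xsl:sort.
public static class ChronologicalOrder
{
    // Hypothetical record type: a payload plus the time read from <TimeStamp>.
    public sealed class TimedRecord<TPayload>
    {
        public DateTime StartTime { get; set; }
        public TPayload Payload { get; set; }
    }

    // Enumerable.OrderBy is a stable sort, so records with equal StartTime
    // keep the order in which they appeared in the file.
    public static List<TimedRecord<TPayload>> SortForEnqueue<TPayload>(
        IEnumerable<TimedRecord<TPayload>> records)
    {
        return records.OrderBy(r => r.StartTime).ToList();
    }
}
```

The stability matters here because several cases are often reported on the same date; keeping their file order makes repeated runs produce identical streams.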

Testing Out the Input Adaptor

To test out my KML input adaptor, I created a console application (no need for any fancy GUIs here…), which began by starting up the StreamInsight server, creating a new application, and defining the configuration properties for the input data adaptor. It then used my new XmlFileReaderFactory to create a stream of point events, each containing my specified AvianFluEventType payload, as follows:

static void Main(string[] args)
{
  // Start up the SI Server
  Console.WriteLine("Starting StreamInsight Server...");
  using (Server server = Server.Create("StreamInsight12"))

    // Plenty of opportunity for things to go wrong. Best use a try block...
    try
    {
    
      // Create a new application on the server
      var myApp = server.CreateApplication("AvianFlu");

      // Configure the XML Input adaptor
      // TODO Make this a generic XML reader that accepts an XSLT transform
      // rather than hardcoded values to the TimeSeries.kml dataset as currently 
      var inputConfig = new XmlFileReaderConfig
      {
        InputFileName = @"TimeSeries.kml",
        CtiFrequency = 1,
        CultureName = "en-GB"
      };

      // Create input stream
      var inputStream = CepStream<AvianFluEventType>.Create(
        "input",
        typeof(XmlFileReaderFactory),
        inputConfig, 
        EventShape.Point
      );

In order to test whether the events were correctly flowing into StreamInsight, I probably should have used the StreamInsight Event Flow Debugger. However, as stated in my last post, I didn’t have much luck with my gung-ho approach of just starting this application up and expecting to connect to a running StreamInsight server. A cursory glance at the MSDN documentation suggests that this is because I need to expose a management endpoint from my StreamInsight server instance first… but that’s another thing for the “Let’s look at it later” list. For now, I’ll check whether the events are flowing into StreamInsight by simply streaming them out again to an output adaptor that prints to the console.

First, let’s define a LINQ query against the input stream. Well, I say a “query”, but it’s really just a passthrough, as follows:

var query = from e in inputStream
            select e;
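Later on, the Affected field could turn this passthrough into a filter. The sketch below runs the same query-comprehension syntax against an in-memory list with LINQ to Objects, so it can be tried without a StreamInsight server; a where clause of the same shape should also work against inputStream, though that is untested here. FilterSketch is a hypothetical name, and the value "Human" is an assumption about how the parent <Folder> names are copied into Affected.

```csharp
using System.Collections.Generic;
using System.Linq;

// Same payload shape as the AvianFluEventType class defined earlier.
public class AvianFluEventType
{
    public string Name { get; set; }
    public string Affected { get; set; }
    public string WKT { get; set; }
}

// Hypothetical illustration of filtering on Affected with LINQ to Objects.
public static class FilterSketch
{
    public static List<AvianFluEventType> HumanCases(IEnumerable<AvianFluEventType> events)
    {
        // "Human" is an assumed value, taken from the parent <Folder> name.
        var query = from e in events
                    where e.Affected == "Human"
                    select e;
        return query.ToList();
    }
}
```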

 

Now we need to bind that query to an output adaptor that will send the results somewhere – in this case, to a console window. Fortunately, such an adaptor is included in the samples package, so I just compiled it and added a reference to StreamInsight.Samples.Adapters.OutputTracer in my own project. Then I bound the query to the output adaptor as follows:

var outputToConsole = query.ToQuery(
  myApp,
  "SI Query",
  string.Empty,
  typeof(TracerFactory),
  tracerConfig,
  EventShape.Point,
  StreamEventOrder.FullyOrdered);

outputToConsole.Start();

(I’m using the default tracerConfig as supplied in the sample.) With all that in place, the only thing left to do was hit F5, cross my fingers, and hope.

Well, what do you know? (or, as they say here in Norfolk, “who’d have thunk?”) – my console application window started filling up with events as they passed through the SI query engine into the output adaptor:

[Image: console window showing point events with WKT POINT payloads passing through the output adaptor]

I also felt that this vindicated my decision to use WKT as the geographic encoding in the event payload – just look at all those lovely POINTs flowing past…  ;)

One thing that might not be obvious in the output above is that the date of each Point event is being displayed in mm/dd/yyyy format. Since I had specified the en-GB culture when parsing the TimeStamp used for each event’s StartTime, and this wasn’t what I was getting, I mistakenly went burrowing around looking for a bug in my code. What I should have realised, of course, is that the input adaptor had read the input timestamps into StreamInsight correctly, but the output adaptor was formatting them differently (in this case, using CultureInfo.InvariantCulture). Sadly, the TracerConfig class used by the ConsoleWriter sample doesn’t allow you to supply a CultureInfo, so I’ll just live with this for now.
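To illustrate the point: a DateTime carries no display format of its own, so the culture only matters at the moment it is turned into text. These hypothetical helpers (EventTimeDisplay is not part of the samples) show what a custom output adaptor, or a modified copy of the tracer sample, might do instead of relying on the invariant culture’s MM/dd/yyyy short date pattern.

```csharp
using System;
using System.Globalization;

// Hypothetical display helpers for an output adaptor; not part of the StreamInsight samples.
public static class EventTimeDisplay
{
    // Uses the short date ("d") pattern of whichever culture is supplied.
    // For CultureInfo.InvariantCulture that pattern is MM/dd/yyyy.
    public static string ShortDate(DateTime value, CultureInfo culture)
    {
        return value.ToString("d", culture);
    }

    // A fixed day-first pattern, independent of any culture's own settings.
    public static string DayFirst(DateTime value)
    {
        return value.ToString("dd/MM/yyyy", CultureInfo.InvariantCulture);
    }
}
```

Passing CultureInfo.GetCultureInfo("en-GB") to ShortDate would normally produce day-first output, while DayFirst gives the same result on any machine.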

Stay tuned for the next episode, where I’ll try to write a spatial query that actually does something more useful than just sending all the results out in WKT as they are received, and perhaps I’ll get on to creating an output adaptor that sends the results to Bing Maps too…

July 31, 2011

Spatio-temporal Event Processing with StreamInsight, SQL Server Denali, and Bing Maps – Part 1

Following a recent conversation with Isaac Kunen from the StreamInsight team, I decided that I should find out more about Microsoft’s complex event processing engine. Technology moves so fast that there are probably a hundred topics that I wish I knew more about and, although I had a superficial idea of what StreamInsight does, I’d never really looked at it in much depth because I didn’t think I had any particular use for it. So this post captures my first baby steps with using StreamInsight – hopefully it should provide some guidance for anyone else who hasn’t tried it yet, and perhaps a few chuckles for those people who are already experts in the field.

What Is StreamInsight, anyway?

The first hurdle I had to overcome was not a technical one, but a mental one: what exactly is StreamInsight and what is it for? You can read all the release material from Microsoft, which I won’t bother repeating here, but these are the things I had to get my head around:

  • Firstly, although it’s branded and licensed as part of SQL Server, StreamInsight really has nothing to do with SQL Server, either in terms of architecture or purpose. You don’t need to have SQL Server installed on the machine on which StreamInsight runs, for example, and you won’t see a line of T-SQL code in StreamInsight.
  • StreamInsight is an entirely in-memory engine for processing streams of input (i.e. a time-ordered series of events) – it can write output to SQL Server, but it can also write to a Console app, WCF service etc. It doesn’t have to persist any data at all.
  • Although the examples tend to focus on scenarios in which you’d have live streams of event data coming from e.g. monitoring systems, stock trackers etc., you can also use StreamInsight to analyse any temporal data, by streaming it into the StreamInsight engine using an input adaptor from e.g. a text/CSV file or database. The timestamp used to order the events in a stream can be supplied from historical data and does not have to bear any relation to the actual time at which it is received by the engine.
  • You write queries using LINQ to analyse those events passing through the engine. There are different types of temporal query pattern – for example, analysing those events that occur in rolling windows of a fixed time length, or at a particular snapshot in time.
  • Every event carries an associated payload of data – user-defined fields of information attached to that event. These are based on fundamental .NET types – string, binary etc. Of particular interest to me is that this means you can have spatial information in an event payload, either as WKT, WKB, or just using the native SqlGeography/SqlGeometry serialisation format.
  • The payload information of those events falling within a specified window (or filtered using some other query template) can be aggregated, manipulated, or have other query logic applied. This information can then be routed through to one or more output adaptors, which can write the results to a file, database, or service.

StreamInsight and Spatial. An Example Application Plan.

So, how could I put this all together and make an example related to spatial? Well, here was my plan (explained using my limited knowledge of StreamInsight terminology):

  1. I would create an input adaptor that would load a set of spatial point data that also has an associated temporal value into the StreamInsight engine. The sort of thing I’ve got in mind is a dataset of the location and time at which cases of a particular virus outbreak (e.g. H5N1/Foot and Mouth etc.) were reported.
  2. Since I’m considering notifications to occur at a singular moment in time, I’d use the point event model. (If, instead, I wanted to consider the period of time for which a farm was declared “infected”, I might use an edge model instead)
  3. The payload attached to each event would contain a geography Point instance representing the location at which the case was reported (possibly serialised as Well-Known Binary, or maybe just WKT)
  4. In SI, I’d define a hopping window that would consider, say, all those events in the preceding 3 days leading up to any day.
  5. Then, in my query I’d create a User-Defined Aggregate that took the events in the current window and created a convex hull around them (using the geography STConvexHull() method introduced in the SQL Server Denali version of Microsoft.SqlServer.Types.dll).
  6. Finally, I’d create an output adaptor that would send (via a WCF service?) the resulting geography Polygon of the current event window to be visualised on Bing Maps. The map would refresh to show a time-based spatial analysis of the spread of the virus.
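To make steps 4 and 5 a little more concrete, here is a plain C# sketch of the idea, with no StreamInsight or SqlGeography involved: pick the events that fall in a trailing 3-day window, then wrap their locations in a convex hull. It swaps in Andrew’s monotone chain algorithm, which treats longitude and latitude as flat x/y coordinates; the geography convex hull method mentioned in step 5 works on the ellipsoid instead, so this is only a rough stand-in for small areas. WindowHullSketch, Pt, InWindow, and ConvexHull are all hypothetical names, and the window rule (after windowEnd minus the window size, up to and including windowEnd) is an assumption rather than StreamInsight’s exact hopping-window semantics.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of plan steps 4 and 5 (not StreamInsight or SqlGeography code):
// select the events in a trailing window, then take a planar convex hull of their points.
public static class WindowHullSketch
{
    // X = longitude, Y = latitude, treated as flat coordinates (an approximation).
    public struct Pt
    {
        public double X;
        public double Y;
        public Pt(double x, double y) { X = x; Y = y; }
    }

    // Assumed window rule: windowEnd - size < Day <= windowEnd.
    public static List<Pt> InWindow(IEnumerable<(DateTime Day, Pt Location)> events,
                                    DateTime windowEnd, TimeSpan size)
    {
        DateTime windowStart = windowEnd - size;
        return events.Where(e => e.Day > windowStart && e.Day <= windowEnd)
                     .Select(e => e.Location)
                     .ToList();
    }

    // Z component of (a - o) x (b - o): positive for a counter-clockwise turn o -> a -> b.
    private static double Cross(Pt o, Pt a, Pt b) =>
        (a.X - o.X) * (b.Y - o.Y) - (a.Y - o.Y) * (b.X - o.X);

    // Andrew's monotone chain. Returns hull vertices counter-clockwise,
    // without repeating the first vertex; collinear boundary points are dropped.
    public static List<Pt> ConvexHull(IEnumerable<Pt> points)
    {
        List<Pt> sorted = points.Distinct().OrderBy(q => q.X).ThenBy(q => q.Y).ToList();
        if (sorted.Count < 3) return sorted;

        var hull = new List<Pt>();

        // Lower hull: left to right.
        foreach (Pt p in sorted)
        {
            while (hull.Count >= 2 && Cross(hull[hull.Count - 2], hull[hull.Count - 1], p) <= 0)
                hull.RemoveAt(hull.Count - 1);
            hull.Add(p);
        }

        // Upper hull: right to left, never popping back into the lower hull.
        int minCount = hull.Count + 1;
        for (int i = sorted.Count - 2; i >= 0; i--)
        {
            Pt p = sorted[i];
            while (hull.Count >= minCount && Cross(hull[hull.Count - 2], hull[hull.Count - 1], p) <= 0)
                hull.RemoveAt(hull.Count - 1);
            hull.Add(p);
        }

        hull.RemoveAt(hull.Count - 1); // the last point added is the starting point again
        return hull;
    }
}
```

In the real plan, the hull for each window would be produced by the User-Defined Aggregate and handed to the output adaptor as a Polygon.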

That’s my plan, at least, and if you follow over the next few blog posts you’ll see how I get on in actually achieving it. I haven’t prepared these posts in advance, so there’s a very real chance that I’ll get halfway through and give up, crying. We’ll see.

For this post, I’ll start right at the beginning and just jot down a few notes about getting StreamInsight installed and configured.

Installing and Configuring StreamInsight

  • The latest version of StreamInsight (v1.2) can be downloaded from the StreamInsight Download Centre at http://www.microsoft.com/download/en/details.aspx?id=26720.
  • There are x86 and x64 versions available. Since StreamInsight is not a component of SQL Server, there is no need to choose a version that correlates with any other SQL Server tools you’ve already got installed. I’m running an x64 system so I downloaded the x64 StreamInsight package, even though I’ve got an x86 SQL Server installation on this machine.
  • The package comes in two flavours: one is for client only (i.e. allows you to connect to existing StreamInsight services) whereas the other is the full package. I went with the full shebang option (which, curiously, is exactly the same filesize as the client-only version).
  • The package is very small and installation is quick. What I found a bit odd is that, after installation, I was prompted to install version 3.5 SP2 of SQL Server Compact. Why should a product that is licensed as part of SQL Server 2008 R2 have a dependency on SQL Server Compact? What’s even more odd is that, if you install the x64 platform version of StreamInsight, you need to install both the x86 and x64 versions of SQL Server Compact. Fortunately, these are both included in the redist folder of StreamInsight, and only take a few minutes to set up, so it’s not too much of a big deal.
  • After installation, there’s not much to see. StreamInsight is a service and an SDK rather than an “application”. There is one executable added to the Start Menu, which is the Event Flow Debugger. Because I’m the kind of person who clicks on programs that are newly-installed, I tried firing this up – it had already pre-filled the instance name I supplied during installation, so I clicked OK:

[Image: the Event Flow Debugger connection dialog, with the instance name pre-filled]

Hmm. No Dice:

[Image: the error message returned by the Event Flow Debugger]

I wondered whether the StreamInsight service was actually running. After all, the installation was very short, and I don’t remember setting up anything relating to service accounts etc. unlike in a SQL Server installation, for example. Sure enough, StreamInsight was set to be a manual service, and wasn’t currently running, so I tried to manually start it:

[Images: attempting to start the StreamInsight service manually]

Hmmm. Let’s just leave that for a while and look at what else was added to the start menu. There’s a link to the MSDN documentation and a link to some codeplex samples and, well, that’s it, actually.

Documentation is boring, so I downloaded and browsed through the CodePlex samples instead. The one entitled “PatternDetector” looked interesting, so I loaded it up. I changed the server connection string where indicated to provide the name of my StreamInsight server specified during installation:

[Image: the server connection string in the PatternDetector sample]

Then I built the package, hit F5 and, what do you know, it worked! At least, I think it worked – here’s the output:

[Image: console output of the PatternDetector sample]

So that’s the end of my first experience with StreamInsight. In my next post, I’ll look at how to create an input adaptor to load events containing a spatial payload into the engine.
