Using SQL Server for reference data in a StreamInsight query
After a bit of prodding, I’m finally getting around to finishing the post I’d mentioned *cough* about a year ago, on using a SQL Server table as a source for reference data in a StreamInsight query. The scenario we’d like to unlock is:
- Fast moving “activity” data coming in hot (typically from a network socket)
- Slower moving (but not static) “reference” data stored in a SQL Server database, used to annotate and/or filter the activity stream. Changes in the database should be pulled in and raised as update events in the reference stream.
To follow along, here’s the finished project with SQL files.
This blog will cover implementing such a system from end to end:
- Defining the SQL data source, and the requisite stipulations for efficient querying
- Creating the SQL server adapter
- Configuring the sample data adapter
- Defining the activity and reference streams
- Defining the query (part 1 – with bugs)
- Defining the query (part 2 – once more with feeling!)
Defining the SQL data source
For this post we’ll use a simple user table (and a stored procedure, to demonstrate that mode as well) containing names, email addresses, etc., keyed by UserId.
CREATE TABLE UserInformation(
    [UserId] int NOT NULL,
    [FirstName] nvarchar(32) NOT NULL,
    [LastName] nvarchar(64) NOT NULL,
    [DisplayName] nvarchar(128) NOT NULL,
    [Email] nvarchar(256),
    [LastUpdated] rowversion,
    CONSTRAINT [PK_UserInformation_UserId] PRIMARY KEY CLUSTERED ( [UserId] ASC )
)

INSERT INTO UserInformation (UserId, FirstName, LastName, DisplayName, Email)
    VALUES (1, 'Fred', 'Jones', 'Fred Jones', 'fred@email.com')
INSERT INTO UserInformation (UserId, FirstName, LastName, DisplayName, Email)
    VALUES (2, 'Bob', 'Foobar', 'Robert Foobar', 'bobrob@email.com')
Note the big stipulation I’m going to put on any reference data source: it must provide a rowversion column. SQL Server assigns a new, increasing rowversion value every time a row is inserted or updated, so we can use it to identify and retrieve only the changed rows instead of pulling down the entire data set every time we check for new values. Not terribly impactful for 2 or 3 rows of data, but it will make a big difference at 20,000. We’ll start with two rows of sample data (and add more later).
Creating the SQL Server Adapter
We need to create a StreamInsight input adapter that implements the following features:
- Periodically polls a SQL data source (either a table or stored procedure) that returns any rows that have been changed since the last request. This table or stored procedure is constrained to have a rowversion column that provides the “here’s the new stuff” information.
- Enqueues these rows as CEP events.
Note: we’re only going to implement a PointInputAdapter. We’ll handle the process of converting the stream of point events into a continuous signal in the query rather than in an EdgeInputAdapter. This way we let StreamInsight handle the state maintenance, not our adapter code.
When developing any StreamInsight adapter we follow three steps:
- Create the configuration class. This will be the runtime parameters that we will be able to pass the adapter. In here we’ll include things like the SQL connection string, table name, rowversion field name, etc.
- Create the factory. We will have a simple pass-through factory, as we won’t have shared state management (i.e. no shared SQL connection between multiple adapters; the ADO.NET driver can handle connection pooling on our behalf) nor will we declaratively manage CTIs. Since we receive data only when we request it, we’ll manually advance the time stream (i.e. enqueue a CTI) after receiving a batch of data from SQL Server.
- Create the adapter instance. In this case we’ll separate the StreamInsight adapter aspects from the SQL interaction by creating two classes: SqlPointInputAdapter and SqlPoller. This keeps the concerns of the two cleanly separated (and would make it very straightforward to add support for a SqlIntervalInputAdapter if we had a data source with intervals in the database).
Creating the configuration class – SqlInputConfig
Our configuration class will expose the following properties:
| Property | Type | Description |
| --- | --- | --- |
| PollingInterval | TimeSpan | Polling interval; how often to run the SQL command and check for new events |
| ConnectionString | string | Connection string to the data source |
| ConnectionStringName | string | If set, look for a connection string in the app.config with this name |
| Mode | SqlInputMode | SQL input mode: what type of command will be executed to retrieve data (StoredProcedure or Table). Stored procedures must accept a parameter, and tables must have a column, named after the TimestampField. |
| TimestampField | string | The name of the field which stores the “last updated” value in the target table or stored procedure. This field needs to be of type rowversion. |
| Command | string | The name of the table or stored procedure |
This gives us a simple configuration class (with a ToString() override for easier debugging), along with the SqlInputMode enumeration:
/// <summary>
/// Configuration object for the SQL poller adapter
/// </summary>
public class SqlInputConfig
{
    /// <summary>
    /// Polling interval; how often to run the SQL command and check for new events
    /// </summary>
    public TimeSpan PollingInterval { get; set; }

    /// <summary>
    /// Connection string to the data source
    /// </summary>
    public string ConnectionString { get; set; }

    /// <summary>
    /// If set, the name of a connection string in the app.config to use
    /// </summary>
    public string ConnectionStringName { get; set; }

    /// <summary>
    /// SQL input mode - what type of command will be executed to retrieve data.
    /// Stored procedures must accept a parameter, and tables must have a column,
    /// named after the TimestampField option (rowversion data type)
    /// </summary>
    public SqlInputMode Mode { get; set; }

    /// <summary>
    /// The name of the field which stores the "last updated" time in the target
    /// table or stored procedure. This field needs to be of type rowversion
    /// </summary>
    public string TimestampField { get; set; }

    /// <summary>
    /// The name of the table or stored procedure
    /// </summary>
    public string Command { get; set; }

    public override string ToString()
    {
        StringBuilder sb = new StringBuilder();
        sb.AppendFormat("\tPolling Interval: {0}\r\n", PollingInterval);
        sb.AppendFormat("\tConnection String: {0}\r\n", ConnectionString);
        sb.AppendFormat("\tConnection String Name: {0}\r\n", ConnectionStringName);
        sb.AppendFormat("\tMode: {0}\r\n", Mode);
        sb.AppendFormat("\tTimestamp Field: {0}\r\n", TimestampField);
        sb.AppendFormat("\tSQL Command: {0}\r\n", Command);
        return sb.ToString();
    }
}

/// <summary>
/// The valid SQL input mode types
/// </summary>
public enum SqlInputMode
{
    /// <summary>
    /// Invoke a stored procedure to retrieve values
    /// </summary>
    StoredProcedure,

    /// <summary>
    /// SELECT from a table to retrieve values
    /// </summary>
    Table,
}
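Nothing in the listings in this post actually reads ConnectionStringName; the poller always uses ConnectionString. A minimal sketch of how that resolution could work, assuming a named entry should win when both are set: the ConnectionStringResolver class below is hypothetical (not part of the finished project), and it takes the lookup as a delegate so it stays testable without an app.config. In the adapter you might pass something like `name => ConfigurationManager.ConnectionStrings[name]?.ConnectionString` from System.Configuration.

```csharp
using System;

// Hypothetical helper (not part of the original project): decides which
// connection string the adapter should use.
public static class ConnectionStringResolver
{
    // If connectionStringName is set, resolve it through the supplied lookup
    // delegate (e.g. a wrapper around ConfigurationManager.ConnectionStrings);
    // otherwise fall back to the literal connection string.
    public static string Resolve(string connectionString,
                                 string connectionStringName,
                                 Func<string, string> lookup)
    {
        if (!String.IsNullOrEmpty(connectionStringName))
        {
            string named = lookup(connectionStringName);
            if (String.IsNullOrEmpty(named))
                throw new ArgumentException(
                    "No connection string named '" + connectionStringName + "' was found");
            return named;
        }

        if (String.IsNullOrEmpty(connectionString))
            throw new ArgumentException(
                "Either ConnectionString or ConnectionStringName must be set");

        return connectionString;
    }
}
```

The poller could call this once in its constructor and use the result wherever it currently reads config.ConnectionString.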
Create the factory
In this case we’ll be using an untyped factory (which uses field descriptions rather than strong .NET types). This has the advantage in a remote deployment model of not having to deploy an assembly containing the .NET type to the StreamInsight service host. In our demo case (where we use an embedded server) it has the advantage of not introducing an additional step (SQL row –> CEP object, rather than SQL row –> .NET object via reflection –> CEP object).
Note that throughout this sample I’ll be using a variant of the logging module that I described a while back in this post. It’s been updated to use System.Diagnostics and NLog (the latter imported via the awesome NuGet package management system!).
/// <summary>
/// Implementation of an adapter factory for polling changing reference
/// information from a SQL Server database
/// </summary>
public class SqlInputAdapterFactory : IInputAdapterFactory<SqlInputConfig>
{
    // All of our logging information will use this as the category name
    internal static readonly string APP_NAME = "Adapters.SqlInput";

    /// <summary>
    /// Called by the StreamInsight engine when creating an adapter. Creates the
    /// appropriate adapter object. Note that this adapter self-manages CTI events.
    /// </summary>
    /// <param name="configInfo">The SqlInputConfig class specified when the stream source is created</param>
    /// <param name="eventShape">The event shape; only Point events will be supported by this adapter</param>
    /// <param name="cepEventType">Description of the event type (fields, types, etc)</param>
    /// <returns>StreamInsight input adapter instance</returns>
    public InputAdapterBase Create(SqlInputConfig configInfo, EventShape eventShape, CepEventType cepEventType)
    {
        if (eventShape != EventShape.Point)
            throw new ArgumentException("Only Point events are supported for SQL Input polling reference adapter");

        InputAdapterBase ret = default(InputAdapterBase);
        switch (eventShape)
        {
            case EventShape.Point:
                ret = new SqlPointInputAdapter(TraceFactory.GetTracer(APP_NAME), cepEventType, configInfo);
                break;
        }
        return ret;
    }

    /// <summary>
    /// This factory maintains no shared resources, nothing to dispose
    /// </summary>
    public void Dispose()
    {
    }
}
This is a very simple adapter factory implementation, simply creating a SqlPointInputAdapter given the cepEventType and configInfo objects. Since we’re going to manually advance application time after each database call, we don’t need to implement the IDeclareAdvanceTimeProperties interface (the untyped counterpart of ITypedDeclareAdvanceTimeProperties).
Adapter Part 1 – StreamInsight
As mentioned earlier, we’re going to implement the adapter functionality in two classes: one for enqueueing CEP events, and the other for pulling data from SQL. The former is provided by the SqlPointInputAdapter class, with the SqlPoller handling the nasty database stuff. In order for the SqlPointInputAdapter to receive push notifications from the SqlPoller, we’ll use an intermediary interface and payload class (allowing the poller to be shared between Point and Interval adapters if we so choose).
/// <summary>
/// Interface used to provide a callback between the SqlPoller and the
/// adapter instances
/// </summary>
internal interface IUntypedEventReceiver
{
    /// <summary>
    /// Given an untyped event payload, enqueue the contents as a CEP event
    /// </summary>
    /// <returns>Whether or not the event was successfully enqueued</returns>
    bool HandleEvent(UntypedEventPayload evt);

    /// <summary>
    /// Given a timestamp, advance stream time (enqueue a CTI)
    /// </summary>
    /// <param name="cti">Application time to advance the stream to</param>
    void FlushEvents(DateTimeOffset cti);
}

/// <summary>
/// Wrapper class to store key/value payloads. Note: each key must match a
/// field name in the CepEventType; the adapter looks up that field's
/// ordinal to set the value
/// </summary>
internal class UntypedEventPayload
{
    /// <summary>
    /// Start time (receive time) of the event
    /// </summary>
    public DateTimeOffset StartTime { get; set; }

    /// <summary>
    /// Array of key value pairs (the payload)
    /// </summary>
    public KeyValuePair<string, object>[] Data { get; set; }
}
We’ll start off with the basics: deriving from the PointInputAdapter abstract base class, overriding the Start and Resume methods, adding a Shutdown helper that disposes the poller, and providing a constructor that matches the signature we used in the factory. The heavy lifting SQL-wise is taken care of by the SqlPoller class.
/// <summary>
/// Implementation of an untyped input adapter for StreamInsight, designed to
/// retrieve reference (slow changing) information from a SQL server database
/// </summary>
internal sealed class SqlPointInputAdapter : PointInputAdapter, IUntypedEventReceiver
{
    #region "Private Variables"
    /// <summary>
    /// Tracing object; passed in from the factory
    /// </summary>
    private TraceLog trace;

    /// <summary>
    /// Configuration object; passed in from the factory
    /// </summary>
    private SqlInputConfig config;

    /// <summary>
    /// The CEP event type definition (this is the type that StreamInsight expects;
    /// the adapter will handle mapping between the SQL type and this type)
    /// </summary>
    private CepEventType eventType;

    /// <summary>
    /// The helper class responsible for managing interaction with SQL server
    /// </summary>
    private SqlPoller poller;
    #endregion

    /// <summary>
    /// Create a new SQL input adapter. Note that this class is largely a wrapper around
    /// SqlPoller (where all of the heavy lifting happens). Note: this class self-manages
    /// CTI events.
    /// </summary>
    /// <param name="trace">Initialized tracing object</param>
    /// <param name="eventType">CEP event type definition</param>
    /// <param name="config">Adapter configuration</param>
    public SqlPointInputAdapter(TraceLog trace, CepEventType eventType, SqlInputConfig config)
    {
        this.trace = trace;
        this.config = config;
        this.eventType = eventType;

        // Create the SQL poller object (this handles interaction with
        // SQL server)
        this.poller = new SqlPoller(trace, eventType, config, this);
    }

    /// <summary>
    /// Start the adapter (initialize the poller)
    /// </summary>
    public override void Start()
    {
        this.poller.Start();
    }

    /// <summary>
    /// Resume the poller (if it has been paused)
    /// </summary>
    public override void Resume()
    {
        this.poller.Resume();
    }

    /// <summary>
    /// Dispose the poller
    /// </summary>
    public void Shutdown()
    {
        trace.LogInfo("Adapter is stopping - invoking shutdown");
        this.poller.Dispose();
        trace.LogInfo("Shutdown complete");
    }

    // IUntypedEventReceiver implementation (HandleEvent and FlushEvents),
    // shown in full below
}
This is the basic boilerplate; the real work happens in the implementations of the IUntypedEventReceiver interface methods, HandleEvent and FlushEvents.
Let’s start with HandleEvent, which goes through this sequence:
- Ensure that the adapter isn’t stopping. If it is, call the Shutdown method (shown above, which disposes the poller object), signal the StreamInsight engine that shutdown is complete by calling Stopped, and don’t enqueue the event.
- Ensure that the adapter isn’t paused. If the adapter is not in the running state, pause the poller and exit the function. Note that this has the effect of throwing away the event! The poller accounts for this by not advancing its “last received” rowversion, so the same set of rows is retrieved again once polling resumes.
- Allocate a new untyped CEP event via CreateInsertEvent.
- Assign the timestamp (the current time) and fill in the payload. Each payload key is looked up by name in the CEP event type to find its field ordinal, so the poller must supply keys that match the CEP field names.
- Attempt to enqueue the event into the StreamInsight engine. If the engine’s queue is full, release the event, call Ready so the engine will call Resume when it can accept more events, and report the failure back to the poller.
Which is implemented by:
/// <summary>
/// This is the callback for the SqlPoller to pass back new events
/// data from the data source. The adapter is responsible for
/// mapping the data into a CEP event and enqueueing
/// </summary>
/// <returns>Whether or not the event was successfully enqueued</returns>
public bool HandleEvent(UntypedEventPayload evtPayload)
{
    // If the adapter is stopping, shut down the poller, tell the engine
    // we are done, and don't enqueue anything else
    if (AdapterState.Stopping == AdapterState)
    {
        Shutdown();
        Stopped();
        return false;
    }

    // If the adapter is in a paused state ensure that we are not raising
    // more events
    if (AdapterState.Running != AdapterState)
    {
        this.poller.Pause();
        return false;
    }

    // Create a new untyped point event to hold the event payload
    PointEvent evt = CreateInsertEvent();
    if (evt == null)
    {
        trace.LogError("Could not allocate point event!");
        return false;
    }

    // Set the event timestamp to 'now'
    evt.StartTime = DateTimeOffset.UtcNow;

    // Fill in the event payload, mapping each key by name to its CEP ordinal.
    // SQL NULLs arrive as DBNull.Value; pass null to the CEP event instead
    foreach (var kv in evtPayload.Data)
    {
        int ordinal = this.eventType.Fields[kv.Key].Ordinal;
        evt.SetField(ordinal, kv.Value is DBNull ? null : kv.Value);
    }

    // If debug logging is enabled for this component, trace the contents
    if (trace.ShouldLogDebug())
    {
        trace.LogDebug("Raising SQL POINT input event - {0}", evtPayload.ToString());
    }

    // If the event cannot be enqueued, release the memory and signal that
    // the adapter is ready to process more events (via Ready())
    if (EnqueueOperationResult.Full == Enqueue(ref evt))
    {
        ReleaseEvent(ref evt);
        Ready();
        return false;
    }
    return true;
}
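As listed, UntypedEventPayload doesn’t override ToString(), so the DEBUG trace in HandleEvent prints only the type name. One possible fix, sketched with a hypothetical PayloadFormatter helper (not part of the original project), renders the key/value pairs and shows SQL NULLs (which ADO.NET returns as DBNull.Value) explicitly:

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: renders a key/value payload as "Key=Value, ..." so a
// ToString() override on UntypedEventPayload can produce useful debug output.
public static class PayloadFormatter
{
    public static string Format(KeyValuePair<string, object>[] data)
    {
        if (data == null || data.Length == 0)
            return "(empty)";

        // null and DBNull values (e.g. a NULL Email column) are shown as NULL
        return string.Join(", ", data.Select(kv =>
            kv.Key + "=" + (kv.Value == null || kv.Value is System.DBNull ? "NULL" : kv.Value.ToString())));
    }
}
```

An override such as `public override string ToString() { return PayloadFormatter.Format(Data); }` on UntypedEventPayload would then make the debug output readable.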
Finally we implement the FlushEvents method to allow the poller to advance application time when all of the rows returned from SQL server have been enqueued. This goes through the same adapter state checks as above and then enqueues a CTI event.
/// <summary>
/// Called by the SQL poller after a set of events has been received
/// </summary>
/// <param name="cti">Application time to advance the stream to</param>
public void FlushEvents(DateTimeOffset cti)
{
    // If the adapter is stopping, shut down and don't enqueue the CTI
    if (AdapterState.Stopping == AdapterState)
    {
        Shutdown();
        Stopped();
        return;
    }

    if (AdapterState.Running != AdapterState)
    {
        this.poller.Pause();
        return;
    }

    // Enqueue a CTI to advance time in the reference stream
    this.EnqueueCtiEvent(cti);
}
There’s the implementation of the StreamInsight piece of the adapter – now to interact with SQL server and pull out some rows.
Adapter Part 2 – SQL Server
Now for the fun (as in easy) part: polling SQL Server. This will be a simple timer-based polling class that periodically executes a differential query asking “do you have any changed rows?” (using a rowversion column to detect changes). Our class will have the following features:
- Execute SQL code on a polling timer, which can be paused and resumed (to match the adapter state)
- Ask for changed rows using a rowversion value
- Enqueue events into the adapter using the HandleEvent and FlushEvents methods we implemented above. The “last rowversion” value only advances after every row has been enqueued successfully, so if an enqueue fails the same rows are re-queried on the next poll.
- Convert SQL rows into a form that can be passed to the adapter and turned into the appropriate untyped CEP event.
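The “advance only on success” rule is easy to get subtly wrong, so here is a database-free sketch of the watermark bookkeeping. RowVersionWatermark and its delegate-based ProcessBatch method are hypothetical names for illustration; tryEnqueue stands in for IUntypedEventReceiver.HandleEvent, and plain ulong values stand in for rowversions already converted to integers. Note that the running maximum starts from the current watermark rather than zero, so a poll that returns no rows leaves the watermark where it was.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, database-free model of the poller's "last rowversion" logic:
// the watermark only advances once every row in the batch was handed off.
public sealed class RowVersionWatermark
{
    public ulong LastUpdated { get; private set; }

    // Process one polling batch. 'rowVersions' are the rowversion values of the
    // returned rows; 'tryEnqueue' stands in for IUntypedEventReceiver.HandleEvent.
    // Returns true if the whole batch was enqueued.
    public bool ProcessBatch(IEnumerable<ulong> rowVersions, Func<ulong, bool> tryEnqueue)
    {
        // Start from the current watermark so an empty batch does not reset it to 0
        ulong max = LastUpdated;
        foreach (ulong rv in rowVersions)
        {
            if (!tryEnqueue(rv))
                return false;   // leave LastUpdated alone; rows are re-read next poll
            if (rv > max)
                max = rv;
        }
        LastUpdated = max;
        return true;
    }
}
```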
We’ll start off with a basic class shell, implementing IDisposable.
/// <summary>
/// Handles periodically polling of a SQL data source looking for
/// updated records
/// </summary>
internal sealed class SqlPoller : IDisposable
{
}
Now for some variables and a constructor:
#region "Private Variables"
/// <summary>
/// Callback interface to the adapter instance to which we will
/// publish new records
/// </summary>
private IUntypedEventReceiver recv;

/// <summary>
/// Adapter configuration
/// </summary>
private SqlInputConfig config;

/// <summary>
/// Tracing object
/// </summary>
private TraceLog trace;

/// <summary>
/// CEP event definition
/// </summary>
private CepEventType eventType;

/// <summary>
/// The SQL command used to retrieve new information
/// </summary>
private SqlCommand sqlCmd;

/// <summary>
/// Timer used to periodically poll the data source
/// </summary>
private System.Timers.Timer pollTimer;

/// <summary>
/// Dictionary storing the ordinal mapping between the SQL
/// rows and the CEP event type
/// </summary>
private Dictionary<int, string> sqlToCepMap;

/// <summary>
/// The rowversion value expressed as an unsigned 64-bit integer
/// used to ask for "rowversions greater than this value"
/// </summary>
private System.UInt64 lastUpdated = 0;

/// <summary>
/// The ordinal of the rowversion column
/// </summary>
private int timestampOrdinal = -1;

/// <summary>
/// Synchronization object for the polling callback
/// </summary>
private object _lockObj = new object();
#endregion

/// <summary>
/// Create a new instance of the SQL poller, given a trace object,
/// CEP event type definition, adapter configuration and adapter
/// instance callback
/// </summary>
public SqlPoller(TraceLog trace, CepEventType eventType, SqlInputConfig config, IUntypedEventReceiver recv)
{
    this.eventType = eventType;
    this.trace = trace;
    this.config = config;
    this.recv = recv;

    trace.LogInfo("Creating SQL Server polling adapter component against \r\n\t{0}", config.ToString());

    // Create the requisite SQL command
    if (config.Mode == SqlInputMode.StoredProcedure)
        CreateSprocCmd();
    else if (config.Mode == SqlInputMode.Table)
        CreateTableCmd();

    // Create the type mapping
    InitializeTypeMapping();
}
The setup here is fairly straightforward: we capture the working variables, create an appropriately shaped SqlCommand (either a stored procedure call or a SELECT statement against a table), then generate the type mapping (ordinal mapping) between the SQL rows and the CEP event type description.
/// <summary>
/// Create the SELECT command for direct table interaction
/// </summary>
private void CreateTableCmd()
{
    sqlCmd = new SqlCommand();
    sqlCmd.CommandText = String.Format("SELECT * FROM {0} WHERE {1} > @{1}",
        config.Command, config.TimestampField);
    sqlCmd.CommandType = System.Data.CommandType.Text;
    sqlCmd.Parameters.Add("@" + config.TimestampField, System.Data.SqlDbType.Timestamp);
}

/// <summary>
/// Create the SQL command for invoking a stored procedure
/// </summary>
private void CreateSprocCmd()
{
    sqlCmd = new SqlCommand();
    sqlCmd.CommandText = config.Command;
    sqlCmd.CommandType = System.Data.CommandType.StoredProcedure;
    sqlCmd.Parameters.Add("@" + config.TimestampField, System.Data.SqlDbType.Timestamp);
}
This isn’t the best SQL in the world (SELECT * is evil, and we’re not making any provision for additional stored procedure parameters; this is why this is a “starting point” adapter). Our final initialization step is to create the mapping dictionary between the SQL ordinal and the CEP field name (since we’re doing a SELECT * in this case, we’ll get the fields in their defined order).
/// <summary>
/// Initialize the SQL type to CEP type mapping dictionary (ordinal
/// mapping) by invoking the lookup command in schema-only mode
/// </summary>
private void InitializeTypeMapping()
{
    trace.LogDebug("Initializing type mapping..");
    sqlToCepMap = new Dictionary<int, string>();

    using (var conn = new SqlConnection(config.ConnectionString))
    {
        sqlCmd.Connection = conn;
        sqlCmd.Connection.Open();

        // Supply a placeholder rowversion value: a parameter whose Value is
        // null is not sent to the server at all
        sqlCmd.Parameters[0].Value = new byte[8];

        // Retrieve only the command schema (no data)
        using (var rdr = sqlCmd.ExecuteReader(CommandBehavior.SchemaOnly))
        {
            // Iterate through the list of fields, creating a mapping from
            // name -> ordinal
            var sqlOrdinals = new Dictionary<string, int>();
            for (int i = 0; i < rdr.FieldCount; i++)
            {
                var name = rdr.GetName(i);
                sqlOrdinals.Add(name, i);
                if (name == config.TimestampField)
                    timestampOrdinal = i;
            }

            // Iterate through the CEP event type fields and create the mapping
            // [sql ordinal] -> [cep field name]
            foreach (var x in eventType.Fields)
            {
                if (!sqlOrdinals.ContainsKey(x.Key))
                {
                    trace.LogWarn("Warning: did not find CEP field {0} (ord {1}) in SQL column list\r\n\tlist of SQL fields = {2}\r\n\tlist of CEP fields = {3}",
                        x.Key, x.Value.Ordinal,
                        String.Join(", ", sqlOrdinals.Keys.ToArray()),
                        String.Join(", ", eventType.Fields.Keys.ToArray()));
                }
                else
                {
                    sqlToCepMap.Add(sqlOrdinals[x.Key], x.Key);
                }
            }
        }
    }
    trace.LogDebug("Type mapping initialized with {0} fields", sqlToCepMap.Count);
}
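Before moving on, one hedged improvement to CreateTableCmd above: it formats config.Command and config.TimestampField directly into the SQL text. These come from configuration rather than user input, but a cheap safeguard is to quote them as identifiers. A minimal sketch, assuming single-part names (a schema-qualified name such as dbo.UserInformation would need each part quoted separately): the hypothetical SqlIdentifier helper mirrors what T-SQL’s QUOTENAME does with its default delimiters, wrapping the name in square brackets and doubling any closing bracket.

```csharp
using System;

// Hypothetical helper: quotes a single-part SQL Server identifier the way
// T-SQL's QUOTENAME() does by default (wrap in [ ] and double any ]).
public static class SqlIdentifier
{
    public static string Quote(string name)
    {
        if (String.IsNullOrEmpty(name))
            throw new ArgumentException("Identifier must not be empty", "name");
        // QUOTENAME only accepts sysname input (up to 128 characters)
        if (name.Length > 128)
            throw new ArgumentException("Identifier is longer than 128 characters", "name");
        return "[" + name.Replace("]", "]]") + "]";
    }

    // Same differential SELECT as CreateTableCmd, but with quoted names.
    // The parameter name is left as-is and must be a valid identifier on its own.
    public static string BuildDeltaSelect(string table, string timestampField)
    {
        return String.Format("SELECT * FROM {0} WHERE {1} > @{2}",
            Quote(table), Quote(timestampField), timestampField);
    }
}
```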
Two pieces left: the lifecycle management (start, pause, resume and dispose) and the actual polling function. State management is straightforward, using the Enabled property on our System.Timers.Timer.
#region "Lifecycle Management"
/// <summary>
/// Start the poller (activate the timer)
/// </summary>
public void Start()
{
    // Polling timer for retrieving SQL data
    pollTimer = new System.Timers.Timer();
    pollTimer.AutoReset = true;
    pollTimer.Elapsed += new System.Timers.ElapsedEventHandler(DoWork);
    pollTimer.Interval = config.PollingInterval.TotalMilliseconds;
    pollTimer.Enabled = true;
}

/// <summary>
/// Stop and release the polling timer (it is never created if Start wasn't called)
/// </summary>
public void Dispose()
{
    if (pollTimer != null)
        pollTimer.Dispose();
}

internal void Resume()
{
    pollTimer.Enabled = true;
}

internal void Pause()
{
    pollTimer.Enabled = false;
}
#endregion
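One timer subtlety behind the guard in DoWork: when SynchronizingObject is null, System.Timers.Timer raises Elapsed on a ThreadPool thread, and if a handler runs longer than Interval the event can be raised again on another thread while the first is still running. The sketch below isolates the fail-fast Monitor.TryEnter pattern DoWork uses; NonReentrantCallback is a hypothetical name, not a class from the project. Keep in mind that Monitor locks are re-entrant on the same thread, so this only skips overlapping calls from other threads (which is the timer case).

```csharp
using System;
using System.Threading;

// Hypothetical sketch: a fail-fast, non-reentrant wrapper around a polling
// callback, using the same Monitor.TryEnter pattern as SqlPoller.DoWork.
public sealed class NonReentrantCallback
{
    private readonly object lockObj = new object();
    private readonly Action work;

    public NonReentrantCallback(Action work)
    {
        this.work = work;
    }

    // Runs the work and returns true, or returns false immediately (doing
    // nothing) if another thread is still inside a previous invocation.
    public bool TryRun()
    {
        if (!Monitor.TryEnter(lockObj))
            return false;
        try
        {
            work();
            return true;
        }
        finally
        {
            Monitor.Exit(lockObj);
        }
    }
}
```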
Now for the big scary function. There’s a lot going on in here, so we’ll break it down group by group.
/// <summary>
/// Polling timer callback method. Polls the SQL data source to look for
/// new / updated records
/// </summary>
private void DoWork(object sender, ElapsedEventArgs args)
{
    // Use a fail-fast mutex to avoid simultaneous invocations of the
    // timer callback method. This generally only happens when debugging
    // this function or a communication / timeout error.
    if (Monitor.TryEnter(_lockObj))
    {
        try
        {
            using (var conn = new SqlConnection(config.ConnectionString))
            {
                // Assign the new connection to the command and open
                sqlCmd.Connection = conn;
                sqlCmd.Connection.Open();

                // Convert the last seen rowversion (stored as an unsigned 64-bit
                // integer) back into the 8-byte big-endian form SQL Server uses
                var lastUpdatedBytes = BitConverter.GetBytes(lastUpdated);
                if (BitConverter.IsLittleEndian)
                    Array.Reverse(lastUpdatedBytes);
                sqlCmd.Parameters[0].Value = lastUpdatedBytes;

                // Execute the sql command to get the next set of data
                trace.LogDebug("Retrieving SQL data from {0} cmd {1} last Updated = {2}",
                    conn.DataSource, sqlCmd.CommandText, lastUpdated);

                // Start from the current watermark so that an empty result set
                // does not reset lastUpdated to zero (which would re-read every row)
                System.UInt64 maxTimestamp = lastUpdated;

                using (var rdr = sqlCmd.ExecuteReader())
                {
                    while (rdr.Read())
                    {
                        // Create an untyped event payload to wrap the SQL row
                        UntypedEventPayload evtPayload = new UntypedEventPayload();
                        List<KeyValuePair<string, object>> kvps = new List<KeyValuePair<string, object>>();
                        foreach (var i in sqlToCepMap)
                        {
                            var name = i.Value;
                            var obj = rdr.GetValue(i.Key);
                            kvps.Add(new KeyValuePair<string, object>(name, obj));
                        }
                        evtPayload.Data = kvps.ToArray();
                        evtPayload.StartTime = DateTimeOffset.UtcNow;

                        // Enqueue the event; on failure leave lastUpdated untouched
                        // so the same rows are retrieved on the next poll
                        if (!recv.HandleEvent(evtPayload))
                        {
                            trace.LogDebug("Error: could not enqueue payload event");
                            return;
                        }

                        // Check for updated timestamp
                        if (timestampOrdinal > -1)
                        {
                            byte[] tsArray = (byte[])rdr.GetValue(timestampOrdinal);
                            if (BitConverter.IsLittleEndian)
                                Array.Reverse(tsArray);

                            // Convert the timestamp (8-byte array) into an unsigned 64-bit
                            // integer for easy comparison
                            System.UInt64 ts = BitConverter.ToUInt64(tsArray, 0);
                            if (ts > maxTimestamp)
                                maxTimestamp = ts;
                        }
                    }
                }

                // After all events have been enqueued, advance stream
                // time by enqueueing a CTI
                recv.FlushEvents(DateTimeOffset.UtcNow);

                // Advance the last updated time (all events have been enqueued)
                lastUpdated = maxTimestamp;
            }
        }
        catch (Exception ex0)
        {
            trace.LogException(ex0, "Error in SQL retrieve");
        }
        finally
        {
            Monitor.Exit(_lockObj);
        }
    }
}
The sequence of activity in this function is:
- Monitor. The entire method is guarded by a fail-fast Monitor (if another thread already holds the lock, exit immediately). As noted in the comments, this is primarily to ease debugging: while you are stepping through DoWork you won’t need to worry about the timer firing again and a second thread running concurrently. It can also come into play if a poll takes longer than the timer period (highly unlikely for appropriately sized reference data tables, but possible).
- SQL connection. Standard boilerplate for pulling a SQL connection from the connection pool and assigning it to the SQL command (which is only constructed once).
- Preparing the lastUpdated (rowversion) parameter. A SQL rowversion value is an 8-byte array. Rather than storing and comparing byte arrays, we convert them to and from an unsigned 64-bit integer. Note the endian check: the rowversion bytes are most significant first, while BitConverter follows the machine’s byte order.
- Preparing the DataReader. We next execute the SQL command and create a DataReader to pull back the rows, logging the SQL statement at the DEBUG level as an aid to run-time diagnostics.
- Creating an UntypedEventPayload from the data row. Using the type map that was initialized earlier, we create and populate an UntypedEventPayload object.
- Enqueueing the UntypedEventPayload. If the enqueue attempt fails, exit the timer callback without advancing the lastUpdated rowversion value.
- Checking for a new maximum timestamp value. Track the biggest rowversion seen in this batch; it replaces the lastUpdated value only if all events are successfully enqueued.
- Advancing time. Finally we advance time in this stream by enqueueing a CTI, then move lastUpdated forward.
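The rowversion conversion is the fiddliest part of DoWork, so here it is pulled out into a standalone sketch. RowVersion is a hypothetical helper; it assumes, as DoWork does, that the 8 bytes SQL Server returns are most significant byte first. Unlike the inline version, ToUInt64 copies the array before reversing it so the caller’s buffer is left untouched.

```csharp
using System;

// Hypothetical helper mirroring the conversion in SqlPoller.DoWork:
// SQL Server returns rowversion as 8 bytes, most significant byte first.
public static class RowVersion
{
    public static ulong ToUInt64(byte[] rowVersion)
    {
        if (rowVersion == null || rowVersion.Length != 8)
            throw new ArgumentException("rowversion values are exactly 8 bytes", "rowVersion");
        // Copy before reversing so the caller's array is not mutated
        byte[] copy = (byte[])rowVersion.Clone();
        if (BitConverter.IsLittleEndian)
            Array.Reverse(copy);
        return BitConverter.ToUInt64(copy, 0);
    }

    public static byte[] FromUInt64(ulong value)
    {
        byte[] bytes = BitConverter.GetBytes(value);
        if (BitConverter.IsLittleEndian)
            Array.Reverse(bytes);   // back to big-endian order for the SQL parameter
        return bytes;
    }
}
```

For example, the bytes `00 00 00 00 00 00 07 D1` map to 2001, and FromUInt64(2001) produces the same 8 bytes for the query parameter.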
Configuring the sample data adapter
Before we can join in our reference stream, we need some data to join against. We’ll use a slightly modified version of the codeplex sample DataGenerator adapter to do this. The codeplex sample uses a hard coded event type, whereas for our purposes it would be better to have more control over the sample data generated.
Without going into too much detail (if you want the details, post a comment!), in order to generate a stream of sample data we need to implement two classes with the proper interfaces and annotations:
- Sample data type, annotated with a custom EventGeneratorAttribute which points to a factory class.
- Factory class, which creates random sample data objects.
Here’s our sample data type:
/// <summary>
/// Sample activity event. ActivityEventFactory can create
/// random ActivityEvent objects
/// </summary>
[EventGenerator(typeof(ActivityEventFactory))]
public class ActivityEvent
{
    public int UserId { get; set; }
    public string Activity { get; set; }
    public DateTime Timestamp { get; set; }
    public string Status { get; set; }
}
And our factory class, which will allocate new activity events with random values chosen from the hard-coded lists:
/// <summary>
/// Factory for creating random activity events
/// </summary>
public class ActivityEventFactory : IRandomEventFactory<ActivityEvent>
{
    private Random rand = new Random();

    /// <summary>
    /// The range of user IDs
    /// </summary>
    public int[] userIds = new int[] { 1, 2, 3, 4 };

    /// <summary>
    /// Range of activity names
    /// </summary>
    public string[] activityNames = new string[] { "Login", "Logout", "BuyStuff", "SellStuff" };

    /// <summary>
    /// Range of status values
    /// </summary>
    public string[] statusNames = new string[] { "Success", "Fail", "FileNotFound" };

    // Random.Next(maxValue) excludes maxValue, so pass the array length
    // (not Length - 1) to make every element selectable
    public int GetRandomUser()
    {
        return userIds[rand.Next(userIds.Length)];
    }

    public string GetRandomActivity()
    {
        return activityNames[rand.Next(activityNames.Length)];
    }

    public string GetRandomStatus()
    {
        return statusNames[rand.Next(statusNames.Length)];
    }

    public ActivityEvent GetRandomEvent()
    {
        return new ActivityEvent()
        {
            UserId = GetRandomUser(),
            Activity = GetRandomActivity(),
            Timestamp = DateTime.UtcNow,
            Status = GetRandomStatus()
        };
    }
}
The key restriction for our next test is that we will only generate random user ID values of 1, 2, 3 and 4, so when we create more SQL user records we won’t get any activity records for other user IDs.
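Since the factory picks from several arrays the same way, a small generic helper keeps the exclusive upper bound of Random.Next(maxValue) in one place. This is a hypothetical refactoring sketch rather than part of the DataGenerator sample:

```csharp
using System;

// Hypothetical generic helper for the sample factory. Random.Next(maxValue)
// treats maxValue as an exclusive upper bound, so passing items.Length
// gives every element (index 0 .. Length - 1) a chance of being chosen.
public static class RandomPicker
{
    public static T Pick<T>(T[] items, Random rand)
    {
        if (items == null || items.Length == 0)
            throw new ArgumentException("items must not be empty", "items");
        return items[rand.Next(items.Length)];
    }
}
```

GetRandomUser would then become `return RandomPicker.Pick(userIds, rand);`, and likewise for the activity and status lists.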
Defining the activity and reference streams
Now for the good part – actually using the SQL adapter we wrote in the previous sections. We’ll start with a simple embedded host StreamInsight console application using NLog as the tracing implementation.
Creating a Console Application
We’ll start with a basic shell, which configures NLog to use the default configuration (as defined in app.config) and creates a StreamInsight host.
using System;
using Microsoft.ComplexEventProcessing;

class Program
{
    static void Main(string[] args)
    {
        // Set up NLog
        var factory = new TraceWrapperNLogFactory();
        TraceFactory.RegisterTraceImplementation(factory);

        using (Server cepServer = Server.Create("Default"))
        {
            try
            {
                // Host the queries in the "samples" application
                var cepApp = cepServer.CreateApplication("samples");
            }
            catch (Exception ex0)
            {
                Console.WriteLine("Error: " + ex0.ToString());
            }
        }
    }
}
Note that you’ll need to change the StreamInsight instance name if you’re not using “Default”.
In our app.config we’ll add the following entry so that NLog reads an external configuration file, which lets us change the logging configuration at runtime without restarting the application (nlog.config sets autoReload="true"). This will come in handy when we start working with the queries in the next section. We’ll also define a connection string for our SQL data source.
<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="nlog" type="NLog.Config.ConfigSectionHandler, NLog"/>
  </configSections>
  <connectionStrings>
    <add name="TestDatabase" connectionString="Data Source=(local);Initial Catalog=RefDatabase;Integrated Security=True"/>
  </connectionStrings>
  <nlog>
    <!-- Include the debugging version of the logging configuration -->
    <include file="${basedir}/nlog.config"/>
  </nlog>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/>
  </startup>
</configuration>
Our nlog.config file is fairly detailed – we want file and coloured console output with individual log levels for each of our defined components. In this case we have three adapter components – Adapters.SqlInput, Adapters.RandomData, and Adapters.OutputTracer.
<?xml version="1.0" encoding="utf-8" ?>
<nlog autoReload="true"
      xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <!-- Output targets. As the debugging configuration, we will output to console and file. -->
  <targets>
    <!-- Log all events to console -->
    <target name="console" xsi:type="ColoredConsole"
            layout="${processtime}| ${logger}:${level} - ${message}" />
    <!-- Log all events to rolling file -->
    <target name="file" xsi:type="File"
            lineEnding="Default" autoFlush="true" keepFileOpen="false"
            concurrentWrites="false" createDirs="true"
            fileName="${basedir}/sampleLog_${shortdate}.log" >
      <layout xsi:type="CSVLayout">
        <column name="time" layout="${longdate}" />
        <column name="level" layout="${level}"/>
        <column name="logger" layout="${logger}"/>
        <column name="message" layout="${message}" />
      </layout>
    </target>
  </targets>
  <!-- Rules; routing and matching -->
  <rules>
    <!-- Individual component log configuration -->
    <logger name="Adapters.OutputTracer" minlevel="Info" writeTo="file,console" />
    <logger name="Adapters.SqlInput" minlevel="Debug" writeTo="file,console" />
    <logger name="Adapters.RandomData" minlevel="Info" writeTo="file,console" />
  </rules>
</nlog>
Defining the Activity Stream
Let’s start by defining our activity stream. We need to define an input configuration (of type GeneratorConfig), choose a stream name, and then define the stream.
// Host the queries in the "samples" application
var cepApp = cepServer.CreateApplication("samples");

// Create a data stream of random activity events
var inputCfg = new GeneratorConfig()
{
    CtiFrequency = 1,
    EventInterval = 1000,
    EventIntervalVariance = 250
};
var activityStreamName = "activityStream";
var activityStream = CepStream<ActivityEvent>.Create(cepApp, activityStreamName,
    typeof(GeneratorFactory), inputCfg, EventShape.Point);
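Here’s one of the magical incantations that makes this whole thing work: the Advance Time Import settings, which say “time advances at the pace of this stream”. A stream defined with these settings takes its CTIs (current time increments) from the named stream, in this case the activity stream. For more details on what this is all about, see this blog post.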
// We want time to advance as new activity updates arrive. In order to do this
// we need to link the reference stream's CTIs to the activity stream via a
// time import setting
var timeImportSettings = new AdvanceTimeSettings(
    null,
    new AdvanceTimeImportSettings(activityStreamName),
    AdvanceTimePolicy.Adjust);
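Why does the time import matter? A two-input operator such as a join can only advance its output time (issue CTIs) up to the smaller of its inputs’ latest CTI timestamps. If the reference stream only moved its application time forward when it polls the database (every 30 seconds in this sample), the join would be held back waiting on it. The following is a deliberately simplified sketch with times as plain long tick values; CtiSketch and its methods are hypothetical illustrations, not StreamInsight APIs, and modelling the import as "take the larger of the two CTIs" is an assumption made for the sketch.

```csharp
using System;

public static class CtiSketch
{
    // A join can only advance its output time up to the smaller of its
    // inputs' latest CTI timestamps.
    public static long JoinOutputCti(long activityCti, long referenceCti) =>
        Math.Min(activityCti, referenceCti);

    // Hypothetical simplification of time import: the reference stream's CTI
    // is pulled forward to the imported (activity) CTI and never moves back.
    public static long ImportedCti(long ownCti, long importedCti) =>
        Math.Max(ownCti, importedCti);
}
```

With the activity stream at tick 31 and a reference stream that has not produced a CTI of its own yet (tick 0), the join is stuck at 0; with the imported CTI it can advance to 31.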
There we go: the activity stream is ready. Next step, let’s get the reference stream defined.
Defining the Reference Stream
Defining the reference stream follows the same process: define the configuration for the reference stream, then the stream itself. To start with we’ll use table mode (i.e. SELECT * FROM UserInformation WHERE LastUpdated > @LastUpdated) with a polling interval of 30 seconds, and use the LastUpdated rowversion column we defined at the beginning of the post as the timestamp field.
// Create a reference stream of database updates
var connString = ConfigurationManager.ConnectionStrings["TestDatabase"];
var referenceCfg = new SqlInputConfig()
{
    ConnectionString = connString.ConnectionString,
    Mode = SqlInputMode.Table,
    Command = "UserInformation",
    PollingInterval = TimeSpan.FromSeconds(30),
    TimestampField = "LastUpdated",
};

// Use the time import settings to link CTI values from the activity
// stream
var referenceStream = CepStream<UserEvent>.Create(
    "referenceStream",
    typeof(SqlInputAdapterFactory),
    referenceCfg,
    EventShape.Point,
    timeImportSettings);
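Table mode relies on the rowversion column: each poll asks only for rows whose rowversion is greater than the largest value already seen. The following is a minimal in-memory sketch of that bookkeeping. It assumes rowversion values arrive as the 8-byte arrays ADO.NET returns for rowversion columns. Because SQL Server compares rowversion as binary, reading the bytes big-endian as an unsigned integer preserves their order. RowVersionTracker, SelectChanged and Commit are hypothetical names, not the sample adapter’s API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class RowVersionTracker
{
    // Highest rowversion seen so far; 0 means "nothing seen yet".
    public ulong LastSeen { get; private set; }

    // rowversion is binary(8) compared byte by byte, so read it big-endian.
    public static ulong ToUInt64(byte[] rowVersion)
    {
        if (rowVersion == null || rowVersion.Length != 8)
            throw new ArgumentException("rowversion must be 8 bytes", nameof(rowVersion));
        ulong value = 0;
        foreach (byte b in rowVersion)
        {
            value = (value << 8) | b;
        }
        return value;
    }

    // In-memory equivalent of WHERE LastUpdated > @LastUpdated.
    public IReadOnlyList<T> SelectChanged<T>(IEnumerable<T> rows, Func<T, byte[]> rowVersionOf)
    {
        ulong last = LastSeen;
        return rows.Where(r => ToUInt64(rowVersionOf(r)) > last).ToList();
    }

    // Call only after the changed rows have been enqueued successfully.
    public void Commit<T>(IEnumerable<T> enqueued, Func<T, byte[]> rowVersionOf)
    {
        foreach (var r in enqueued)
        {
            LastSeen = Math.Max(LastSeen, ToUInt64(rowVersionOf(r)));
        }
    }
}
```

Separating SelectChanged from Commit means a failed enqueue leaves LastSeen untouched, so the next poll returns the same rows again.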
Note that the timeImportSettings definition from the previous section is passed to the reference stream definition. The reference stream, however, is a sequence of point events, and to join with the activity stream its events need to overlap the activity events in time. To get there, we extend the point events into a signal: AlterEventDuration stretches each event’s lifetime to TimeSpan.MaxValue, and ClipEventDuration then cuts each event off when the next event with the same UserId arrives.
// Convert the reference point stream into a reference signal
var referenceSignal = referenceStream
.AlterEventDuration(e => TimeSpan.MaxValue)
.ClipEventDuration(referenceStream, (e1, e2) => (e1.UserId == e2.UserId));
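To see what these two operators do to the event lifetimes, here is an in-memory sketch. It assumes event times are plain long tick values and uses long.MaxValue to stand in for "forever". Each reference point event is stretched to forever, then clipped at the start of the next event with the same UserId. RefInterval and SignalSketch are hypothetical names. The code models the semantics only; it is not how StreamInsight implements them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of a reference event lifetime [Start, End).
public sealed class RefInterval
{
    public RefInterval(int userId, string displayName, long start, long end)
    {
        UserId = userId;
        DisplayName = displayName;
        Start = start;
        End = end;
    }

    public int UserId { get; }
    public string DisplayName { get; }
    public long Start { get; }
    public long End { get; }
}

public static class SignalSketch
{
    // Models AlterEventDuration(e => TimeSpan.MaxValue) followed by
    // ClipEventDuration on UserId: each point event lasts until the next
    // event for the same user starts (long.MaxValue stands in for "forever").
    public static List<RefInterval> ToSignal(IEnumerable<(long Time, int UserId, string DisplayName)> points)
    {
        var signal = new List<RefInterval>();
        foreach (var perUser in points.GroupBy(p => p.UserId))
        {
            var ordered = perUser.OrderBy(p => p.Time).ToList();
            for (int i = 0; i < ordered.Count; i++)
            {
                long end = i + 1 < ordered.Count ? ordered[i + 1].Time : long.MaxValue;
                signal.Add(new RefInterval(ordered[i].UserId, ordered[i].DisplayName, ordered[i].Time, end));
            }
        }
        return signal;
    }
}
```

For example, an update to user 2 at tick 143 ends the earlier "Robert Foobar" interval at 143, while user 1’s only event stays open-ended.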
Fun with errors – Advance time import stream does not exist
To illustrate a common issue when working with the advance time import settings, let’s do something “normal” and tie the reference stream to a trace output adapter to see some values flowing through.
var outputCfg = new TracerConfig()
{
    DisplayCtiEvents = false,
    SingleLine = true,
    TracerKind = TracerKind.Console,
    TraceName = "OUTPUT"
};
var q = referenceStream.ToQuery(cepApp, "enrichedData", "", typeof(TracerFactory),
    outputCfg, EventShape.Point, StreamEventOrder.FullyOrdered);
Let’s go ahead and run this and see the really fun error that crops up!
Error: Microsoft.ComplexEventProcessing.ManagementException: Advance time import stream 'activityStream' does not exist. --->
Microsoft.ComplexEventProcessing.Compiler.CompilerException: Advance time import stream 'activityStream' does not exist.
What do you mean activity stream does not exist? I just defined it!!! Ok, calm breathing. Here’s what’s going on with this error – while the activity stream is defined, it’s not active yet. How about we go and define that as a live query and start it?
Error: Microsoft.ComplexEventProcessing.ManagementException: Advance time import stream 'activityStream' does not exist. --->
Microsoft.ComplexEventProcessing.Compiler.CompilerException: Advance time import stream 'activityStream' does not exist.
Same error!! But now the stream exists, so why does this happen? What’s going on under the hood is that the activity stream has not been PHYSICALLY JOINED with the reference stream (i.e. the activity events are not physically meshed in with the reference stream in the same query), so the engine doesn’t “see” the activity stream when it compiles the reference stream’s query.
In the next section we’ll join the two streams to create a query that runs happily without triggering this error.
Defining the query (part 1 – with bugs)
With the streams in place, let’s go ahead and create a query that pulls the streams together and dumps out some joined events.
// Join the two streams to enrich activity data with user details
var enrichedStream =
    from a in activityStream
    join r in referenceStream on a.UserId equals r.UserId
    select new
    {
        UserId = a.UserId,
        Activity = a.Activity,
        Status = a.Status,
        Name = r.DisplayName
    };

// Pump these to the console for visibility
var outputCfg = new TracerConfig()
{
    DisplayCtiEvents = false,
    SingleLine = true,
    TracerKind = TracerKind.Console,
    TraceName = "OUTPUT"
};
var q = enrichedStream.ToQuery(cepApp, "enrichedData", "", typeof(TracerFactory),
    outputCfg, EventShape.Point, StreamEventOrder.FullyOrdered);
q.Start();
There’s an insidious bug in this code. It’s kind of sneaky (hint: it’s on line 4). Rather than come out and state the error (which you’ve probably noticed by now), I’m going to walk through the process of debugging this one. We run the query looking for output events tagged with OUTPUT: we wait for the SQL data to get pulled in, and then wait a little bit more.
00:00:30.0391| Adapters.SqlInput:Debug - Retrieving SQL data from (local) cmd SELECT * FROM UserInformation WHERE LastUpdated > @LastUpdated last Updated = 0
00:00:30.0406| Adapters.SqlInput:Debug - Raising SQL POINT input event - Microsoft.WindowsAzureCat.BlogSamples.Adapters.UntypedEventPayload
00:00:31.0143| Adapters.RandomData:Debug - Generated Data INSERT - Event: UserId:3, Activity:Logout, Timestamp:8/30/2011 5:59:19 PM, Status:Success,
00:00:32.0089| Adapters.RandomData:Debug - Generated Data INSERT - Event: UserId:3, Activity:Login, Timestamp:8/30/2011 5:59:19 PM, Status:Fail,
Why is there no output?? Let’s pull in the event flow debugger to figure out what we did wrong. Start up an event flow debugger session and capture a trace (see this blog post for more details). Here’s a screenshot of a trace I captured when I made this mistake while developing the sample.
Look at the startTime and endTime of the reference event: this is a Point event, not an Interval or Edge event! A point event lasts a single tick, so it can only overlap activity events with exactly the same timestamp. When we joined the streams we joined the reference stream, not the reference signal. Let’s try this again:
// Join the two streams to enrich activity data with user details
var enrichedStream =
    from a in activityStream
    join r in referenceSignal on a.UserId equals r.UserId
    select new
    {
        UserId = a.UserId,
        Activity = a.Activity,
        Status = a.Status,
        Name = r.DisplayName
    };
There we go!
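Why does swapping referenceStream for referenceSignal fix things? A temporal join only pairs events whose lifetimes overlap, and a point event at time t lives for a single tick, [t, t + 1). The sketch below uses plain long tick values and hypothetical helper names. It shows an activity event at tick 31 missing a reference point event at tick 30. The same activity event does overlap the reference data once that data is a signal starting at tick 30.

```csharp
public static class JoinSketch
{
    // Lifetimes are half-open [Start, End) in ticks; a point event at t
    // lives for exactly one tick.
    public static (long Start, long End) Point(long t) => (t, t + 1);

    // A temporal join only pairs events whose lifetimes overlap.
    public static bool Overlaps((long Start, long End) a, (long Start, long End) b) =>
        a.Start < b.End && b.Start < a.End;
}
```

Only an activity event stamped with exactly the same tick as the reference point would match, which is why the first version of the query produced no output.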
Defining the Query (Part 2 – Once more with feeling!)
Now that we have our query correctly defined, let’s play with the NLog settings and SQL database to see this in action. We’ll start with:
- Two records in the SQL database (user ID 1 & 2).
- The following rules configuration in nlog.config
<rules>
  <!-- Individual component log configuration -->
  <logger name="Adapters.OutputTracer" minlevel="Info" writeTo="file,console" />
  <logger name="Adapters.SqlInput" minlevel="Debug" writeTo="file,console" />
  <logger name="Adapters.RandomData" minlevel="Info" writeTo="file,console" />
</rules>
Now let’s go ahead and run the query. Your output will look slightly different (it is, after all, a random data generator). There won’t be any output for at least 30 seconds (until the reference adapter first polls the database).
00:00:00.0197| Adapters.SqlInput:Debug - Initializing type mapping..
00:00:00.0334| Adapters.SqlInput:Debug - Type mapping initialized with 5 fields
Press <enter> to close application
00:00:30.0362| Adapters.SqlInput:Debug - Retrieving SQL data from (local) cmd SELECT * FROM UserInformation WHERE LastUpdated > @LastUpdated last Updated = 0
00:00:30.0362| Adapters.SqlInput:Debug - Raising SQL POINT input event - Microsoft.WindowsAzureCat.BlogSamples.Adapters.UntypedEventPayload
00:00:30.0377| Adapters.SqlInput:Debug - Raising SQL POINT input event - Microsoft.WindowsAzureCat.BlogSamples.Adapters.UntypedEventPayload
OUTPUT: Point at 06:26:35.584: Logout Fred Jones Success 1
OUTPUT: Point at 06:26:36.831: Logout Fred Jones Success 1
OUTPUT: Point at 06:26:38.003: Login Fred Jones Fail 1
OUTPUT: Point at 06:26:41.089: BuyStuff Fred Jones Fail 1
OUTPUT: Point at 06:26:43.854: Login Fred Jones Fail 1
OUTPUT: Point at 06:26:44.881: Login Fred Jones Success 1
OUTPUT: Point at 06:26:47.114: Login Fred Jones Success 1
OUTPUT: Point at 06:26:48.296: BuyStuff Robert Foobar Success 2
OUTPUT: Point at 06:26:49.232: Logout Robert Foobar Fail 2
OUTPUT: Point at 06:26:50.381: Login Robert Foobar Success 2
OUTPUT: Point at 06:26:51.559: Logout Fred Jones Success 1
OUTPUT: Point at 06:26:52.773: BuyStuff Fred Jones Success 1
Note that we really should have a ToString() override on UntypedEventPayload; without one, the “Raising SQL POINT input event” log lines only show the type name.
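As an illustration of what such an override could look like, here is a minimal sketch. It assumes the payload keeps its column values as ordered name/value pairs. UntypedPayloadSketch is hypothetical, and the real UntypedEventPayload in the sample project may store its fields differently.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for UntypedEventPayload: assumes column values are
// kept as ordered name/value pairs.
public sealed class UntypedPayloadSketch
{
    private readonly List<KeyValuePair<string, object>> _fields =
        new List<KeyValuePair<string, object>>();

    public UntypedPayloadSketch Add(string name, object value)
    {
        _fields.Add(new KeyValuePair<string, object>(name, value));
        return this;
    }

    // Renders "Name=Value" pairs so log lines show the data, not just the type name.
    public override string ToString() =>
        string.Join(", ", _fields.Select(f => f.Key + "=" + (f.Value ?? "null")));
}
```

With this override, a payload built with Add("UserId", 2).Add("DisplayName", "Bob Notsofoobar") would log as UserId=2, DisplayName=Bob Notsofoobar.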
Now let’s go ahead and update one of the rows in the database:
UPDATE UserInformation SET DisplayName = 'Bob Notsofoobar' WHERE UserId = 2
And within one polling interval we see the change reflected:
00:02:23.0110| Adapters.SqlInput:Debug - Retrieving SQL data from (local) cmd SELECT * FROM UserInformation WHERE LastUpdated > @LastUpdated last Updated = 2007
00:02:23.0114| Adapters.SqlInput:Debug - Raising SQL POINT input event - Microsoft.WindowsAzureCat.BlogSamples.Adapters.UntypedEventPayload
OUTPUT: Point at 06:28:27.913: Login Bob Notsofoobar Fail 2
OUTPUT: Point at 06:28:28.932: BuyStuff Bob Notsofoobar Success 2
Rock on! We now have changes in SQL dynamically propagating into the StreamInsight reference stream. If you want to see the raw input, modify nlog.config to set the minlevel of the Adapters.RandomData logger to Debug.
Conclusion
So, wrapping it all up. Some key points to consider:
- Reference data often has no inherent temporal property (e.g. a timestamp). Since StreamInsight only works with data that has some temporal property, we promote the implicit temporal context: reference data is valid from the time we first see it until it changes.
- Reference data streams can be used to enrich and filter hot activity streams.
- With the enclosed sample it’s relatively easy to pull in reference streams from SQL Server, provided you can apply some basic restrictions (i.e. the rowversion column).
Other posts and useful information referenced in this post:
- StreamInsight: Where did that query result come from (using the Event Flow Debugger in LINQPad)?
- StreamInsight: Synchronizing slow-moving reference streams with fast-moving data streams (time import)
- StreamInsight and reference data (lists, databases, etc)
In case you missed it at the beginning of the post, here are the sample files and projects.
3 Comments
finally
{
Monitor.Exit(_lockObj);
}
This code resets the maxTimestamp variable, so it polls the same data over and over again. I think it could be changed to use lock () {}.
The use of Monitor and falling through without updating the timestamp is entirely deliberate to implement this behavior:
1. If the polling and enqueue operation fails (timeout, queue pushback, etc.), the adapter will not advance the time stream. This way you don’t miss updates: the adapter polls the same data over and over again until it successfully enqueues it. The timestamp only advances if new data shows up and is successfully enqueued.
2. The Monitor.TryEnter is there, as per the comments, to avoid multiple timer delegates from entering the method (it’s really only there to make debugging easier as otherwise you have to disable and re-enable the timer to be able to step through lines of code in the delegate without additional timer triggers stacking up).
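The two behaviors described in this reply can be sketched together. First, Monitor.TryEnter skips a timer callback if another one is still inside the polling method. Second, the high-water mark only advances once data has been enqueued successfully. The following is a minimal sketch of that pattern. It assumes the fetch and enqueue steps are passed in as delegates. GuardedPoller and its members are hypothetical and are not the sample adapter’s code.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of the guard described above, not the sample adapter's code.
public sealed class GuardedPoller
{
    private readonly object _lockObj = new object();

    // High-water mark (e.g. the largest rowversion seen); only advances after
    // a successful enqueue.
    public long LastSeenVersion { get; private set; }

    // Returns false if another callback was already polling and this one skipped.
    public bool TryPoll(Func<long, long?> fetchMaxVersionAfter, Func<bool> tryEnqueue)
    {
        // TryEnter does not block: an overlapping timer callback on another
        // thread skips this tick instead of queuing up behind the lock.
        if (!Monitor.TryEnter(_lockObj))
        {
            return false;
        }
        try
        {
            long? newMax = fetchMaxVersionAfter(LastSeenVersion);
            // Advance only if new data arrived AND it was enqueued; otherwise
            // the next poll asks for the same rows again.
            if (newMax.HasValue && tryEnqueue())
            {
                LastSeenVersion = newMax.Value;
            }
            return true;
        }
        finally
        {
            Monitor.Exit(_lockObj);
        }
    }
}
```

One design note: Monitor is re-entrant on the thread that already holds it, so this guard only skips overlapping callbacks running on other threads. That is the situation with timer callbacks, which run on thread pool threads.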