StreamInsight – Entity Framework Type Point Output Adaptor
Introduction
This post takes a common scenario for SQL Server 2008 R2 – StreamInsight applications: low-volume events, such as alarm data, are correlated into meaningful results and stored in a SQL Server database for later analysis and reporting. Here we create an output adaptor for a complex event processing (CEP) application that delivers the payload events into a SQL Server database. There are already output adaptors for StreamInsight that do this, but none of them use Entity Framework. I have found that ADO.NET Entity Framework can simplify both the creation of event types and the storage of results.
The output adaptor presented in this post is a typed point output adaptor, a subclass of the TypedPointOutputAdapter base class. It can easily be converted to an interval adaptor if desired. This write-up does not delve into the details of adaptor development; rather, it shows an implementation using Entity Framework that developers can adapt for their own purposes. StreamInsight ships with numerous adaptor samples and documentation; please read those for more information.
Output Adaptor – Particulars
First, let’s discuss a couple of particulars of building output adaptors that affected the way Entity Framework was used. We are creating a typed adaptor, which operates as a sink for a fixed payload format, so the fields and their types are known in advance. This is a good thing, because we can use the Entity Framework Data Model Designer to create these payload types (entity types) and have it generate the CLR types automatically from the model. Each entity type in the model represents one payload format, with a property for each payload field. We therefore took a model-first approach with Entity Framework and created our event types in the graphical model designer.
There are a few restrictions for creating our event types. Some of the more important factors that affected this implementation are enumerated below.
· The event payload, including the number of fields and their types, is known in advance.
· A .NET class or structure is used to represent the fixed payload.
· Only public fields and properties can be used for payload fields.
· Only scalar and elementary CLR types can be used for payload fields.
· Fields cannot be modified by using custom attributes.
· Properties cannot be marked virtual.
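Several of these restrictions can be approximated with plain reflection. The sketch below is only an illustration: PayloadRules, GoodPayload, VirtualPayload and NestedPayload are hypothetical names, nothing here is part of the StreamInsight API, and the IsSimple test is a rough stand-in for the supported type list in the StreamInsight documentation. It inspects public properties only.

```csharp
using System;
using System.Reflection;

// Hypothetical helper (not part of StreamInsight): approximates a few of the
// payload restrictions listed above using reflection.
public static class PayloadRules
{
    // Rough stand-in for "scalar and elementary CLR types"; an assumption,
    // not the authoritative StreamInsight list.
    private static bool IsSimple(Type t)
    {
        Type u = Nullable.GetUnderlyingType(t) ?? t;
        return u.IsPrimitive || u == typeof(string) || u == typeof(decimal)
            || u == typeof(DateTime) || u == typeof(Guid);
    }

    public static bool LooksLikeValidPayload(Type payload)
    {
        foreach (PropertyInfo p in payload.GetProperties(BindingFlags.Public | BindingFlags.Instance))
        {
            MethodInfo accessor = p.GetGetMethod() ?? p.GetSetMethod();

            // Overridable (virtual, non-sealed) properties are rejected.
            if (accessor.IsVirtual && !accessor.IsFinal) return false;

            // Properties decorated with custom attributes are rejected.
            if (p.GetCustomAttributes(false).Length > 0) return false;

            // Only simple property types are accepted.
            if (!IsSimple(p.PropertyType)) return false;
        }
        return true;
    }
}

public class GoodPayload
{
    public int Id { get; set; }
    public DateTime? StartTime { get; set; }
    public int Count { get; set; }
}

public class VirtualPayload
{
    public virtual int Count { get; set; }   // virtual property
}

public class NestedPayload
{
    public int Count { get; set; }
    public GoodPayload Inner { get; set; }   // not a scalar type
}

public static class Program
{
    public static void Main()
    {
        Console.WriteLine(PayloadRules.LooksLikeValidPayload(typeof(GoodPayload)));    // True
        Console.WriteLine(PayloadRules.LooksLikeValidPayload(typeof(VirtualPayload))); // False
        Console.WriteLine(PayloadRules.LooksLikeValidPayload(typeof(NestedPayload)));  // False
    }
}
```

A check like this could run in a unit test against the generated entity classes, so that a regenerated model that reintroduces virtual properties is caught before the query is bound.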
Implementation
The scenario used for this demonstration was taken from A Hitchhiker’s Guide to StreamInsight Queries.
Creating the ‘TollCountPoint’ Event Type – Use EF T4 Template
As mentioned above, the Entity Framework Data Model Designer was used to create the payload types. The caveat is that the default ADO.NET Entity Data Model generates entity types that violate some of the restrictions enumerated above, one of which is the ban on custom attributes. Fortunately, there is a T4 template available with Visual Studio 2010 called the ADO.NET C# POCO Entity Generator. Adding this template to the project automates the creation of event types that StreamInsight event sinks can consume, with one extra twist: the generated properties are marked virtual, which violates a restriction on event types. This can be remedied by modifying line 56 of the T4 template so that it no longer inserts the virtual keyword. In this example a DummyModel project was created, and its TollModel.tt file was modified to drop the PropertyVirtualModifier call, as shown below.
<#=PropertyVirtualModifier(Accessibility.ForProperty(edmProperty))#> <#=code.Escape(edmProperty.TypeUsage)#> <#=code.Escape(edmProperty)#>
Replace with
<#=Accessibility.ForProperty(edmProperty)#> <#=code.Escape(edmProperty.TypeUsage)#> <#=code.Escape(edmProperty)#>
Figure 1 TollModel
With point events we are interested in both the payload and the start time of the event. In this example we took the TollCount event definition from the Hitchhiker’s guide and extended it with an Id field and a StartTime field; the original payload property is the Count field. In the output adaptor we set the StartTime property to the PointEvent.StartTime property of the event. The Id field corresponds to an identity column placed on the SQL table; it can be removed from the model if desired.
Below you can see the CLR event type and the associated database schema.
Figure 2 TollCountPoint from Entity Framework Designer
    public partial class TollCountInterval
    {
        public int Id { get; set; }
        public Nullable<System.DateTime> StartTime { get; set; }
        public Nullable<System.DateTime> EndTime { get; set; }
        public int Count { get; set; }
    }
    CREATE TABLE TollCountPoint
    (
        Id        INT IDENTITY PRIMARY KEY NOT NULL,
        StartTime DATETIME NOT NULL,
        [Count]   INT NOT NULL
    )
EFPointOutput Adaptor
The EFPointOutput class is implemented as a typed point output adapter and thus inherits from the TypedPointOutputAdapter class. This post only illustrates the code and summarizes some important implementation details. Describing the nuances of building an output adaptor is beyond its scope; the MSDN article entitled Creating Input and Output Adapters does a great job of that. In fact, the EFPointOutput code was adapted from the Hitchhiker’s guide sample code, so the principles should be familiar to the reader.
ITypedOutputAdapterFactory Implementation
The DataBaseOutputAdapterFactory class mirrors sample code available from the StreamInsight documentation. One difference is the configuration data, DataOutputAdaptorConfig. The TraceOutput property enables tracing, and StartTimeFieldName is the name of the property on the payload class in which the event start time is saved. We use a compiled lambda expression to set this property after an insert event is dequeued.
    using System;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Adapters;

    // Configuration handed to the factory when the query is bound.
    public class DataOutputAdaptorConfig
    {
        public bool TraceOutput { get; set; }
        public string StartTimeFieldName { get; set; }
    }

    public class DataBaseOutputAdapterFactory : ITypedOutputAdapterFactory<DataOutputAdaptorConfig>
    {
        public OutputAdapterBase Create<TPayload>(DataOutputAdaptorConfig configInfo, EventShape eventShape)
        {
            // Pick the adapter that matches the requested event shape.
            switch (eventShape)
            {
                case EventShape.Point:
                    return new EFPointOutput<TPayload>(configInfo);
                case EventShape.Interval:
                    return new EFIntervalOutput<TPayload>(configInfo);
                default:
                    throw new ArgumentException("Unsupported event shape: " + eventShape, "eventShape");
            }
        }

        public void Dispose()
        {
        }
    }
The C# code for the EFPointOutput adaptor is shown below; the fundamentals for the implementation have been taken from StreamInsight sample documentation.
Below you will find an enumeration of some specifics regarding the customizations that make this adaptor unique.
- ‘getStartTime’ is a compiled lambda expression for the get accessor of the TPayload property named by StartTimeFieldName (StartTime in this example).
- ‘setStartTime’ is a compiled lambda expression for the set accessor of the same property. The adaptor uses it to record the event start time, which is then stored in the SQL database.
- ‘MH.CreateContext’ is a helper function which returns the ObjectContext for the entity data model. This method can be replaced by a direct instantiation of the context for the model.
    using System;
    using System.Data.Objects;   // ObjectContext (Entity Framework 4)
    using System.Diagnostics;
    using System.Reflection;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Adapters;

    public class EFPointOutput<TPayload> : TypedPointOutputAdapter<TPayload>
    {
        private readonly bool _traceOutput;
        private readonly Func<TPayload, object> getStartTime;
        private readonly Action<TPayload, object> setStartTime;

        public EFPointOutput(DataOutputAdaptorConfig configInfo)
        {
            // Compile the property accessors once; they are reused for every event.
            PropertyInfo pi = typeof(TPayload).GetProperty(configInfo.StartTimeFieldName);
            getStartTime = pi.GetProperty<TPayload>();
            setStartTime = pi.SetProperty<TPayload>();

            _traceOutput = configInfo.TraceOutput;
            Trace.Listeners.Add(new ConsoleTraceListener());
        }

        public override void Start()
        {
            ConsumeEvents();
        }

        public override void Resume()
        {
            ConsumeEvents();
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                Trace.Flush();
            }
            base.Dispose(disposing);
        }

        private void ConsumeEvents()
        {
            PointEvent<TPayload> currentEvent = default(PointEvent<TPayload>);
            try
            {
                using (ObjectContext dc = MH.CreateContext())
                {
                    while (true)
                    {
                        // The engine asked the adapter to stop.
                        if (AdapterState.Stopping == AdapterState)
                        {
                            Trace.Flush();
                            Stopped();
                            return;
                        }

                        // Nothing queued: signal readiness and wait for Resume().
                        if (DequeueOperationResult.Empty == Dequeue(out currentEvent))
                        {
                            Ready();
                            return;
                        }

                        if (currentEvent.EventKind == EventKind.Insert)
                        {
                            // StartTime is a DateTimeOffset; UtcDateTime keeps the time of day as well as the date.
                            setStartTime(currentEvent.Payload, currentEvent.StartTime.UtcDateTime);

                            // Assumes the entity set is named after the payload type.
                            dc.AddObject(typeof(TPayload).Name, currentEvent.Payload);
                            dc.SaveChanges();

                            Trace.WriteLineIf(_traceOutput,
                                string.Format("Insert Event.... {0}", getStartTime(currentEvent.Payload)));
                        }
                        else
                        {
                            Trace.WriteLineIf(_traceOutput,
                                string.Format("...CTI: {0}", currentEvent.StartTime));
                        }

                        ReleaseEvent(ref currentEvent);
                    }
                }
            }
            catch (AdapterException e)
            {
                Trace.WriteLineIf(_traceOutput,
                    "ConsumeEvent - " + e.Message + Environment.NewLine + e.StackTrace);
            }
        }
    }
Shown below are two extension methods on PropertyInfo which get and set a property of a generic type T. Each method compiles a lambda expression once, so subsequent calls avoid the per-call cost of using reflection to get or set properties of a .NET object.
    using System;
    using System.Linq.Expressions;
    using System.Reflection;

    // Extension methods must live in a static class; the class name is arbitrary.
    public static class PropertyInfoExtensions
    {
        // Builds (T prm) => (object)prm.Property
        public static Func<T, object> GetProperty<T>(this PropertyInfo propInfo)
        {
            var prm = Expression.Parameter(typeof(T), "prm");
            var prop = Expression.Property(prm, propInfo);
            var con = Expression.Convert(prop, typeof(object));
            return Expression.Lambda<Func<T, object>>(con, prm).Compile();
        }

        // Builds (T i, object a) => i.Property = (PropertyType)a
        public static Action<T, object> SetProperty<T>(this PropertyInfo propInfo)
        {
            var prm = Expression.Parameter(typeof(T), "i");
            var prop = Expression.Parameter(typeof(object), "a");
            var call = Expression.Call(
                prm,
                propInfo.GetSetMethod(),
                Expression.Convert(prop, propInfo.PropertyType));
            return Expression.Lambda<Action<T, object>>(call, prm, prop).Compile();
        }
    }
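To see accessors of this kind at work outside StreamInsight, here is a minimal, self-contained sketch. It restates the same technique as ordinary static methods so that it runs on its own; PropertyAccessors, CompileGetter, CompileSetter and SamplePayload are hypothetical names chosen for the illustration, not part of the adaptor above.

```csharp
using System;
using System.Linq.Expressions;
using System.Reflection;

// Hypothetical stand-alone version of the compiled-accessor technique.
public static class PropertyAccessors
{
    // Builds and compiles (T target) => (object)target.Property
    public static Func<T, object> CompileGetter<T>(PropertyInfo propInfo)
    {
        ParameterExpression target = Expression.Parameter(typeof(T), "target");
        Expression body = Expression.Convert(Expression.Property(target, propInfo), typeof(object));
        return Expression.Lambda<Func<T, object>>(body, target).Compile();
    }

    // Builds and compiles (T target, object value) => target.Property = (PropertyType)value
    public static Action<T, object> CompileSetter<T>(PropertyInfo propInfo)
    {
        ParameterExpression target = Expression.Parameter(typeof(T), "target");
        ParameterExpression value = Expression.Parameter(typeof(object), "value");
        Expression body = Expression.Call(
            target,
            propInfo.GetSetMethod(),
            Expression.Convert(value, propInfo.PropertyType));
        return Expression.Lambda<Action<T, object>>(body, target, value).Compile();
    }
}

// Hypothetical payload with the same shape of properties as the event type in this post.
public class SamplePayload
{
    public DateTime? StartTime { get; set; }
    public int Count { get; set; }
}

public static class Program
{
    public static void Main()
    {
        // Look the property up by name once, as the adaptor does with StartTimeFieldName.
        PropertyInfo pi = typeof(SamplePayload).GetProperty("StartTime");
        Func<SamplePayload, object> get = PropertyAccessors.CompileGetter<SamplePayload>(pi);
        Action<SamplePayload, object> set = PropertyAccessors.CompileSetter<SamplePayload>(pi);

        var payload = new SamplePayload { Count = 3 };
        var stamp = new DateTime(2011, 1, 1, 12, 0, 0, DateTimeKind.Utc);

        set(payload, stamp);                            // boxed DateTime is converted to DateTime?
        Console.WriteLine(payload.StartTime == stamp);  // True
        Console.WriteLine(get(payload).Equals(stamp));  // True
    }
}
```

The delegates are compiled once and then invoked like any other delegate, which is why the adaptor builds them in its constructor rather than inside the event loop.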
Client
A small snippet of the client code is shown below; it mimics what can be seen in the HelloTollTutorial StreamInsight sample code. In this example we bind the query to a typed point output adapter with a payload of TollCountPoint.
    var tQ = from w in tStream.TumblingWindow(
                 TimeSpan.FromMinutes(3),
                 HoppingWindowOutputPolicy.ClipToWindowEnd)
             select new TollCountPoint
             {
                 Id = 0,
                 StartTime = DateTime.Now,   // placeholder; the adaptor overwrites it with the event start time
                 Count = (int)w.Count()
             };

    Query tQuery1 = tQ.ToQuery(
        tApp,
        "HelloTollTutorial",
        "Hello Toll query",
        typeof(DataBaseOutputAdapterFactory),
        new DataOutputAdaptorConfig
        {
            TraceOutput = true,
            StartTimeFieldName = "StartTime",   // must match the property name on TollCountPoint
        },
        EventShape.Point,
        StreamEventOrder.FullyOrdered);
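For readers new to tumbling windows, the counting performed by the query can be imitated with plain LINQ to Objects. This is only an analogy and does not use StreamInsight: TumblingWindowSketch and CountPerWindow are hypothetical names, and the sketch assumes windows aligned to multiples of the window size counted from DateTime tick zero.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical LINQ-to-Objects imitation of a tumbling-window count.
public static class TumblingWindowSketch
{
    // Assigns each timestamp to a fixed, non-overlapping window and counts the
    // timestamps per window. Each result key is the start of a window.
    public static List<KeyValuePair<DateTime, int>> CountPerWindow(
        IEnumerable<DateTime> timestamps, TimeSpan window)
    {
        return timestamps
            .GroupBy(t => new DateTime(t.Ticks - (t.Ticks % window.Ticks), t.Kind))
            .OrderBy(g => g.Key)
            .Select(g => new KeyValuePair<DateTime, int>(g.Key, g.Count()))
            .ToList();
    }
}

public static class Program
{
    public static void Main()
    {
        var noon = new DateTime(2011, 1, 1, 12, 0, 0, DateTimeKind.Utc);
        var arrivals = new[]
        {
            noon.AddSeconds(10),    // falls in the 12:00 window
            noon.AddSeconds(60),    // 12:00 window
            noon.AddSeconds(179),   // 12:00 window
            noon.AddSeconds(180),   // 12:03 window
            noon.AddSeconds(270)    // 12:03 window
        };

        foreach (var pair in TumblingWindowSketch.CountPerWindow(arrivals, TimeSpan.FromMinutes(3)))
        {
            Console.WriteLine("{0:HH:mm} -> {1}", pair.Key, pair.Value);
        }
    }
}
```

With the sample data, the first window (starting at 12:00) holds three arrivals and the second (starting at 12:03) holds two; each such count corresponds to one row the adaptor would insert.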
In Closing
In this post we looked at using Entity Framework both to streamline the writing of a typed point output adaptor that inserts event data into SQL Server and as a mechanism for creating valid StreamInsight event types. This adaptor is aimed at low-volume event streams; without very careful I/O tuning and the use of techniques such as BULK INSERT or SQL Server Service Broker, performing high-speed data inserts into SQL Server is generally impractical. The code in this post can easily be retrofitted to create a typed interval output adaptor. There may be future updates to this post following team review and comments.
Reviewers: Curt Peterson, Mark Simms