DevTalk.net

ActiveMesa's software development blog. MathSharp, R2P, X2C and more!

Event Brokers in .Net

with 7 comments

Introduction

As soon as software systems become reasonably complex, it gets very hard to organize things. We’re lucky to have sorted out the problem of component creation (with Inversion of Control and Dependency Injection), but when it comes to reacting to events, the framework give us very few organizational hints. As a consequence, people naturally create various infrastructural solutions for organized event handling. This is what we’re going to discuss in this article.

Visual Studio 2010 Solution Icon Get Source Code Here

Problem statement

For the vast majority of developers, the task of handling events either doesn’t exist, or exists ‘in full force’ but in a very predictable key – for example when someone handles a WinForms button_Click event.

That’s all well and good, but as soon as as things get complex, things break down. Here’s an (albeit synthetic) example – let’s try modeling the interaction between a football coach and player:

public class FootballPlayer
{
  public string Name { get;set; }
  public void Score()
  {
    var e = Scored;
    if (e != null) e(this, new EventArgs());
  }
  public event EventHandler Scored;
}
public class FootballCoach
{
  public void Watch(FootballPlayer p)
  {
    p.Scored += (_, __) =>
    {
      Console.WriteLine("I'm happy that {0} scored", p.Name);
    };
  }
}

In the above example, subscription and notification works just fine. But what if we do this?

var p2 = c.Clone(); // deep copy :)
p.Score();

The issue here is that when you copy an object (either via MemberwiseClone() or via deep copy using, say, BinaryFormatter), all subscriptions will be lost.

Now, truth be told, subscriptions can be restored by hand… or, I don’t know, you can try to abuse Expression<T> and the like. But that’s really only part of the problem.

The next issue is that, one day, you’re going to wish that your objects subscribed to events automatically. For example, as soon as a football player walks out onto the field, the coach starts observing him. Same goes for unsubscribing, too. As a consequence, you might end up with the following:

class FootballCoach
{
  public FootballCoach(FootballPlayer[] players)
  {
    foreach (var p in players) {
      p.EntersField += new EventHandler(StartWatchingPlayer);
      p.LeavesField += new EventHandler(StopWatchingPlayer);
    }
  }
}

And so on – in each StartXxx you would have to subscribe, and in each EndXxx you would unsubscribe. But even this is not all.

Okay, so now imagine that your system contains many such objects. And they all send messages to one another. If we try to manage the subscriptions with the += operator (as we’re used to), we’ll get really crazy coupling as well as virtually untestable code (event testing is a pain in any case).

And finally, let us not forget the whole ‘configuration’ issue. Because sometimes it makes sense to receive events of a specific type regardless of who sent them. For example, the coach doesn’t really care who is swearing at him – a player, the other coach, or the referee. Also, beleive it or not, sometimes it’s beneficial to perform various manipulations related to event handling, such as batching events in blocks of 100 and handling them only on Friday 13th during a full moon. As you may have guessed, the simple event infrastructure is poorly suited to handle these sorts of demands.

Simple Event Broker

So given the requirements above, a sane developer would start writing something called an Event Arggregator or Event Broker – a separate component that can help us manage the madness of events. It all starts with a class similar to the following:

public class EventBroker
{
  private MultiDictionary<string, Delegate> subscriptions =
    new MultiDictionary<string, Delegate>(true);
  public void Publish<T>(string name, object sender, T args)
  {
    foreach (var h in subscriptions[name])
      h.DynamicInvoke(sender, args);
  }
  public void Subscribe<T>(string name, Delegate handler)
  {
    subscriptions.Add(name, handler);
  }
}

Thread safety notwithstanding, we got ourselves a broker that can manage subscriptions. Of course, you’re not getting any QoS (Quality of Service) here, and any aggregation/transformation logic related to events has to be done by hand, but we’re already improving on the original situation – for example, by using name as the classifier above, we got ourselves a situation in which one class can subscribe a handler for several different events.

So, with this implemented, our football player doesn’t publish events anymore:

public class FootballPlayer
{
  private readonly EventBroker broker;
  public string Name { get; set; }
  public FootballPlayer(EventBroker broker)
  {
    this.broker = broker;
  }
  public void Injured()
  {
    broker.Publish("LeavingField", this, new EventArgs());
  }
  public void SentOff()
  { // event args can be different for this one
    broker.Publish("LeavingField", this, new EventArgs());
  }
}

And the football coach now subscribes to events via the broker:

public class FootballCoach
{
  private readonly EventBroker broker;
  public FootballCoach(EventBroker broker)
  {
    this.broker = broker;
  }
  public void Watch(FootballPlayer player)
  {
    broker.Subscribe<EventArgs>("LeavingField",
                                new EventHandler(PlayerIsLeavingField));
  }
  public void PlayerIsLeavingField(object sender, EventArgs args)
  {
    Console.WriteLine("Where are you going, {0}?",
                      (sender as FootballPlayer).Name);
  }
}

In the above, we’re relying on polymorphism and the idea that all events, being good citizens, inherit from EventArgs. Strong typing is not critical here because you can always do a type case. And here’s how you could use it all in a client app:

var uc = new UnityContainer();
uc.RegisterType<EventBroker>(
  new ContainerControlledLifetimeManager());
var p = uc.Resolve<FootballPlayer>();
p.Name = "Arshavin";
var c = uc.Resolve<FootballCoach>();
c.Watch(p);
p.Injured();
p.SentOff();

One thing that you should note is that events got effectively replaced by messaging. If, like me, you work with frameworks such as NServiceBus, this might appear for you to be a very natural metamorphosis.

In the above example, the broker is registered as a singleton, so both the the coach and the player receive the same instance. What I’m hinting at is that in static scenarios, when you know in advance who subscribes to what, it makes more sense to describe subscriptions declaratively. We’ll get to that in a moment.

Issues

The solution presented here is not final for a number of reasons. First, it’s really not a good idea to have direct coupling between the component and the broker – this implies that you need to splice the broker through domain classes and use it explicitly. As we’ll see in the next section, this problem is easy to resolve.

The second problem, which I’ve already mentioned, is that event handling logic currently only uses one criterion – a string literal that acts as a ‘classifier’. To limit event processing logic thus is somewhat unwise, especially with the pressent of such a powerful instrument as LINQ.

Reactive Extensions Event Broker

Okay, so in the previous example I described an event broker implemented with plain C# by switching from events to message passing, leaving behind the event keyword and all the problems which typically accompany it. Now, let us try to exploit a library solution. We’re going to try and build an event broker using Reactive Extensions, which should allow us to apply complex selection logic to events, and would also simplify unsubscriptions via its IDisposable tie-in.

Subscription

To start with, we’ll begin by looking at a subscription not as some KeyValuePair, but rather an IDisposable, with the understanding that the subscription only exists while the subscriber exists, and that when the subscriber dies (e.g., gets disposed), it makes sense to unsubscribe from all events it was monitoring. Consenquently, we write the simplest possible implementation:

private class Subscription : IDisposable
{
  private readonly EventBroker broker;
  public IObserver<EventArgs> Subscriber { get; private set; }
  public Subscription(EventBroker broker, IObserver<EventArgs> subscriber)
  {
    this.broker = broker;
    this.Subscriber = subscriber;
  }
  public void Dispose()
  {
    broker.Unsubscribe(Subscriber);
  }
}

This is a private class that lives inside the broker. All it does is keep a reference to the subscriber. The subscription itself is stored in the broker (coming up!) and is served to the subscriber just in case the subscriber wants to die at a later stage.

The Broker

Our new Rx-driven broker implements IObservable<T>, which lets the subscriber perform various operations right on the broker before subscribing to anything. This may sound a bit surreal, but if you think of LINQ as a generator of some proxy between all of broker’s objects and the ones that we actually need, it starts to make more sense.

So, our broker has 2 fields – the list of subscribers (yes, a list, and not a HashSet) and a ReaderWriterLockSlim just in case the broker is used from several threads.

class EventBroker : IObservable<EventArgs>
{
  private readonly List<Subscription> subscribers = new List<Subscription>();
  private readonly ReaderWriterLockSlim myLock = new ReaderWriterLockSlim();
  ...
}

The broker has three methods. The first – Subscribe() – lets anyone subscribe and manipulate with LINQ a push-collection of events that come from this broker. Each subscription forces a check on the absence of existing subscriptions, which makes messy code even more messy:

public IDisposable Subscribe(IObserver<EventArgs> subscriber)
{
  Subscription sub = new Subscription(this, subscriber);
  myLock.EnterUpgradeableReadLock();
  try
  {
    if (!subscribers.Any(s => s.Subscriber == subscriber))
    {
      myLock.EnterWriteLock();
      try
      {
        subscribers.Add(sub);
      }
      finally {
        myLock.ExitWriteLock();
      }
    }
  } finally
  {
    myLock.ExitUpgradeableReadLock();
  }
  return sub;
}

As you can see, the return value as dictated by IObservable<T> has a type of IDisposable – that’s the reason why we had to create a separate Subscription class. The purpose of this is simple – if you need to unsubscribe from the event stream, you can just call Dispose().

Here’s what unsubscribing looks like:

public void Unsubscribe(IObserver<EventArgs> subscriber)
{
  myLock.EnterWriteLock();
  try
  {
    subscribers.RemoveAll(s => s.Subscriber == subscriber);
  }
  finally
  {
    myLock.ExitWriteLock();
  }
}

Nothing unusual here. You can call this method yourself, but it’s also called by Subscription during the moment of deletion. Whichever works for you.

Finally, the third method is the one to publish something.

public void Publish<T>(T args) where T : EventArgs
{
  myLock.EnterReadLock();
  try
  {
    var subs = subscribers.ToArray();
    foreach (var s in subs)
      s.Subscriber.OnNext(args);
  }
  finally { myLock.ExitReadLock(); }
}

There is a tiny hack in the above code. Can you see where? The issue above is that OnNext() can, theoretically, cause a destruction of a subscription, utterly invalidating the subscribers list. Which is why we make a copy.

Domain Objects

So, coming back to our ‘coach and player’ example. The player, unfortunately, continues to keep a reference to the broker. It uses EventBroker.Publish() where necessary to inform everyone that he has just scored a goal:

class FootballPlayer
{
  public string Name { get; set; }
  [Dependency]
  public EventBroker EventBroker { get; set; }
  public void Score()
  {
    Console.WriteLine("{0} scored!!!", Name);
    EventBroker.Publish(new GenericEventArgs(this, Name));
  }
}

As for the coach, he also has a dependency on the broker. In fact, he may as well take it as a constructor parameter (via DI, perhaps?) and subscribe to events then and there. Of course, since we are using Rx, we are at liberty to use LINQ to filter and manipulate the event stream.

class FootballCoach
{
  private readonly EventBroker broker;
  public FootballCoach(EventBroker broker)
  {
    broker.OfType<GenericEventArgs>().Subscribe(args =>
      Console.WriteLine("Well done, {0}!", args.Data));
  }
}

Just to be complete, here is the event argument type:

class GenericEventArgs : EventArgs
{
  public GenericEventArgs(object sender, string data)
  {
    Sender = sender;
    Data = data;
  }
  public object Sender { get; set; }
  public string Data { get; set; }
}

And, after all this is done, simple wireup with Unity finishes things off:

var uc = new UnityContainer();
uc.RegisterType<EventBroker>(new ContainerControlledLifetimeManager());
var p = uc.Resolve<FootballPlayer>();
p.Name = "Arshavin";
var c = uc.Resolve<FootballCoach>();
p.Score();

What is the Point?

Some of you might think that we haven’t really achieved anything – after all, just like with the simple broker, we have to propagate the broker through our entities. Even if this wasn’t a broker but rather some Subject<T> (an Rx interface for classes that both publish and subscribe), it’s still not very neat.

On the other hand, we got one very serious advantage – the ability to use Linq combinators for complex manipulations. For example, if the coach starts noticing a player only after he scores his third goal, you could describe it like so:

broker.OfType<GenericEventArgs>().Skip(2).Take(5).Subscribe(args =>
  Console.WriteLine("Well done, {0}!", args.Data));

“But wait,” you might say, “didn’t you promise to talk about declarative subscriptions?” Indeed I did. That’s what the next section is about.

Unity 2.0 Event Broker

So, moving on with our event broker discussion, let’s take a look at the use of an event broker in static scenarios (where all subscriptions are known at compile-time). We are going to abuse the Unity Application Block to implement declarative event brokerage via attributes and extensions to the Unity container.

The example presented here is originally taken from the corresponding example in the ObjectBuilder project, but have been adapted to support Unity 2.0.

The Broker

Let’s begin with the broker itself. Broker implmenetation imples two immortal constructs – the subscriber (or event sink) and someone who publishes the events (event source). In the Unity implementation of the event broker, these participants are divided into separate classes – somewhat differs from our previous example with a single Subscription class. Why? It’s simple – with Rx, we are attached to a particular interface, whereas what we’re trying to get right here is true loose coupling. All we know is that the publisher publishes an event (yes, an event, not a message), and as for the ‘receiving’ class, we only know the method that takes the predictable (object, EventArgs) pair that we’re all used to.

So, let’s take a look at the infrastructure that we need.

EventSource

This class encapsulates information pertaining to an event source – that’s why the constructor has an EventInfo parameter. Based on the information about an event, we dynamically create and add a subscription to that event. Also, by analogy with our previous example, this class is marked as IDisposable and its Dispose() method does… guess what! That’s right – it unsubscribes all handlers from this event.

internal class EventSource : IDisposable
{
  readonly string eventID;
  readonly EventInfo eventInfo;
  readonly MethodInfo handlerMethod;
  readonly EventBroker broker;
  readonly WeakReference source;
  public EventSource(EventBroker broker,
                      object source,
                      EventInfo eventInfo,
                      string eventID)
  {
    this.broker = broker;
    this.source = new WeakReference(source);
    this.eventInfo = eventInfo;
    this.eventID = eventID;
    handlerMethod = GetType().GetMethod("SourceHandler");
    Delegate @delegate = Delegate.CreateDelegate(eventInfo.EventHandlerType, this, handlerMethod);
    eventInfo.AddEventHandler(source, @delegate);
  }
  public object Source
  {
    get { return source.Target; }
  }
  public void Dispose()
  {
    object sourceObj = source.Target;
    if (sourceObj != null)
    {
      Delegate @delegate = Delegate.CreateDelegate(eventInfo.EventHandlerType, this, handlerMethod);
      eventInfo.RemoveEventHandler(sourceObj, @delegate);
    }
  }
  public void SourceHandler(object sender,
                            EventArgs e)
  {
    broker.Fire(eventID, sender, e);
  }
}
EventSink

This class represents an event ‘receiver’. It does two things – firstly, it ‘registers’ a relationship between a particular event and its handler – that’s why the constructor has a MethodInfo parameter. The constructor also creates an EventArgs type which is later passed to the handler.

The second thing the event sink is concerned with is the creation and invocation of the delegate that’s responsible for the handling of a particular event.

internal class EventSink
{
  readonly Type handlerEventArgsType;
  readonly MethodInfo methodInfo;
  readonly WeakReference sink;
  public EventSink(object sink,
                    MethodInfo methodInfo)
  {
    this.sink = new WeakReference(sink);
    this.methodInfo = methodInfo;
    ParameterInfo[] parameters = methodInfo.GetParameters();
    if (parameters.Length != 2 || !typeof(EventArgs).IsAssignableFrom(parameters[1].ParameterType))
      throw new ArgumentException("Method does not appear to be a valid event handler", "methodInfo");
    handlerEventArgsType = typeof(EventHandler<>).MakeGenericType(parameters[1].ParameterType);
  }
  public object Sink
  {
    get { return sink.Target; }
  }
  public Exception Invoke(object sender,
                          EventArgs e)
  {
    object sinkObject = sink.Target;
    try
    {
      if (sinkObject != null)
      {
        Delegate @delegate = Delegate.CreateDelegate(handlerEventArgsType, sinkObject, methodInfo);
        @delegate.DynamicInvoke(sender, e);
      }
      return null;
    }
    catch (TargetInvocationException ex)
    {
      return ex.InnerException;
    }
  }
}

Extending Unity

It won’t be a surprise to anyone that Unity can, in fact, be extended with new features. To create your own extension, you simply inherit from UnityContainerExtension and add the strategies that implement your extension. Here is the container extension for our broker:

public class BrokerExtension : UnityContainerExtension
{
  private readonly EventBroker broker = new EventBroker();
  protected override void Initialize()
  {
    Context.Container.RegisterInstance(broker,
      new ExternallyControlledLifetimeManager());
    Context.Strategies.AddNew<BrokerReflectionStrategy>(
      UnityBuildStage.PreCreation);
    Context.Strategies.Add(new BrokerWireupStrategy(broker),
                            UnityBuildStage.Initialization);
  }
}

In this example, the Context variable gives us access to the container itself (into which we add the broker itself) as well as the collection of strategies that will be applied. For each strategy we define the stage at which the build-up happens. There are two strategies for our broker. The first – BrokerReflectionStrategy uses reflection to find out about potential publishers and subscribers:

public class BrokerReflectionStrategy : BuilderStrategy
{
  public override void PreBuildUp(IBuilderContext context)
  {
    Type typeToBuild = context.BuildKey.Type;
    if (typeToBuild != null)
    {
      var policy = new EventBrokerPolicy();
      RegisterSinks(policy, typeToBuild);
      RegisterSources(policy, typeToBuild);
      if (!policy.IsEmpty)
        context.Policies.Set<IEventBrokerPolicy>(policy, context.BuildKey);
    }
    base.PreBuildUp(context);
  }
  static void RegisterSinks(EventBrokerPolicy policy,
                            Type type)
  {
    foreach (MethodInfo method in type.GetMethods())
      foreach (SubscribesToAttribute attr in method.GetCustomAttributes(typeof(SubscribesToAttribute), true))
        policy.AddSink(method, attr.Name);
  }
  static void RegisterSources(EventBrokerPolicy policy, Type type)
  {
    foreach (EventInfo @event in type.GetEvents())
      foreach (PublishesAttribute attr in @event.GetCustomAttributes(typeof(PublishesAttribute), true))
        policy.AddSource(@event, attr.Name);
  }
}

The second – BrokerWireupStrategy – is used to actually register subscriptions:

public class BrokerWireupStrategy : BuilderStrategy
{
  private readonly EventBroker broker;
  public BrokerWireupStrategy(EventBroker broker)
  {
    this.broker = broker;
  }
  public override void PreBuildUp(IBuilderContext context)
  {
    var policy = context.Policies.Get<IEventBrokerPolicy>(context.BuildKey);
    if (policy != null && broker != null)
    {
      foreach (KeyValuePair<string, MethodInfo> kvp in policy.Sinks)
        broker.RegisterSink(context.Existing, kvp.Value, kvp.Key);
      foreach (KeyValuePair<string, EventInfo> kvp in policy.Sources)
        broker.RegisterSource(context.Existing, kvp.Value, kvp.Key);
    }
    base.PreBuildUp(context);
  }
}

A strange thing is happening here – we’re using two strategies instead of one. Also, there’s a policy class called EventBrokerPolicy that’s used here – it allows us to pass data from one strategy eo another. One of the reasons behind this is that, in the context of strategies, there’s no direct access to the container itself. In other words, the EventBrokerPolicy class is simply a DTO.

public class EventBrokerPolicy : IEventBrokerPolicy
{
  readonly Dictionary<string, MethodInfo> sinks = new Dictionary<string, MethodInfo>();
  readonly Dictionary<string, EventInfo> sources = new Dictionary<string, EventInfo>();
  public bool IsEmpty
  {
    get { return sinks.Count == 0 && sources.Count == 0; }
  }
  public IEnumerable<KeyValuePair<string, MethodInfo>> Sinks
  {
    get { return sinks; }
  }
  public IEnumerable<KeyValuePair<string, EventInfo>> Sources
  {
    get { return sources; }
  }
  public void AddSink(MethodInfo method,
                      string eventID)
  {
    sinks.Add(eventID, method);
  }
  public void AddSource(EventInfo @event,
                        string eventID)
  {
    sources.Add(eventID, @event);
  }
}

Putting It All Together

So, we’re going to get Unity to find subscriptions via Publishes and SubscribesTo attributes. In our example, we once again used a string identifier to describe events – thus, we get a form of loose coupling in the sense that a competely arbitrary component that gets plugged into the system can easily subscribe to a particular event – so long as it knows its name.

Let’s take a look at actual usage. First thing you should notice is that our coach and player classes got a lot simpler:

public class FootballPlayer
{
  [Publishes("score")]
  public event EventHandler PlayerScored;
  public string Name { get; set; }
  public void Score()
  {
    var ps = PlayerScored;
    if (ps != null)
      ps(this, new EventArgs());
  }
}

Naturally, event arguments in the above example could be a more copmlex type – we have already seen how Unity determines this type and builds a delegate based on it. As for the coach, here’s the new definition:

public class FootballCoach
{
  [SubscribesTo("score")]
  public void PlayerScored(object sender, EventArgs args)
  {
    var p = sender as FootballPlayer;
    Console.Write("Well done, {0}!", p.Name);
  }
}

In other words, we got a purely declarative event handling infrastructure. That’s it! All that remains is to actually use it!

var uc = new UnityContainer();
uc.AddNewExtension<BrokerExtension>();
var p = uc.Resolve<FootballPlayer>();
p.Name = "Maradona";
var c = uc.Resolve<FootballCoach>();
p.Score();
p.Score();

Conclusion

In this article, I have presented three distinct approaches to implementing an event broker – a simple Dictionary-based broker, a broker based on Reactive Extensions, and a declarative broker infrastructure as a Unity 2.0 extension. There are many more possible broker implementations out there – the one you choose is up to you. ■

Written by Dmitri Nesteruk

September 20th, 2010 at 8:36 am

Posted in DotNet

  • http://www.domenicdenicola.com Domenic Denicola

    This is a really excellent post; thanks so much for writing it. I know this blog is pretty new, but you guys have some amazing content, so keep up the great work—I hope some day you’re super-famous, because content like this deserves it.

    I haven’t gotten to read the final section on a Unity 2.0 Event Broker, because I need to head to work in a few minutes, but here are some specific points:
    * I loved the casual use of DI via Unity even in your earliest examples. I’m still learning all of these newfangled best-practices technologies (DI, IoC, AOP, etc.) and this kind of small, but real example really drives home how useful they are.
    * In your first event broker, it wasn’t clear to me at all why you used List instead of HashSet, even though you mentioned that this was a conscious choice.
    * Similarly, using the parallel collections ConcurrentBag or perhaps a hacked version of ConcurrentSet (which doesn’t exist) using ConcurrentDictionary would allow you to avoid some boilerplate code in that same example.
    * Having GenericEventArgs duplicate the functionality of our usual (sender, args) tuple is somewhat counterintuitive, since it seems to break the “is a” inheritance relationship at least in a weak sense. It would have been nicer to bake that tuple pattern into the code, but I can see how that might have made things messy; it’s tricky. It looks like you took care of that in the final event broker though.
    * Love the “What Is the Point?” section—very succinctly makes me go “oooh cool!”

  • Daniel Marbach

    Hy
    Have a look at the eventbroker from bbv.common (apache 2). We provide topic based eventing with asynchronous and synchronous event invocation. Subscribers can be automatically invoked on ui thread if desired. The eventbroker is heavily extendable. Instead of using atribute based eventing it lets you use also coded eventing. i recently programmed an extension which allows interprocess eventing. The extension uses nservicebus, masstransit or rhinoesb internally. The functionality is hooked into your eventbroker instance so you don’t have to do anything special.

    Daniel

  • http://devtalk.net Dmitri Nesteruk

    @Domenic: you’re right about using concurrent collections. Still, it’s worth also looking at libraries like Retlang that work just fine via agent-like message passing and without explicit concurrency constructs.

    @Daniel: thanks for the pointer, I’ll take a look.

  • Daniel Marbach

    Hy dmitri
    I forgot to mention that the distributed stuff is not yet released in google code but I can give you the source if you are interested

  • Aappddeevv

    The Linq does not seem to work. Was EventBroker suppose to implement IEnumerable in the 2nd scenario above?

    • http://devtalk.net Dmitri Nesteruk

      Nope, IEnumerable not required…

    • http://devtalk.net Dmitri

      Hi, I suggest you check out the example posted on BitBucket – it works just fine.