Testing RabbitMQ Concurrency in MassTransit

11 Oct 2017

We have a service which consumes messages from a RabbitMQ queue - for each message, it makes a few HTTP calls, collates the results, does a little processing, and then pushes the results to a third-party API. One of the main benefits of having this behind a queue is our usage pattern - the queue usually only has a few messages in it per second, but periodically it will get a million or so messages within 30 minutes (so from ~5 messages/second to ~560 messages/second).

Processing this spike of messages takes ages, and while this service runs on only a t2.medium machine (2 CPUs, 4GB memory), it uses just 5-10% CPU while processing the messages, which is clearly pretty inefficient.

We use MassTransit when interacting with RabbitMQ as it provides us with a lot of useful features, but by default it sets the number of messages to be processed in parallel to Environment.ProcessorCount * 2. For this project that means 4 messages at a time, and as the process is IO bound, it stands to reason that we could increase that concurrency a bit. Or a lot.
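That default is easy to check on any given box (a quick standalone sketch; on the 2-CPU machine described above it works out to 4):

```csharp
using System;

class DefaultConcurrency
{
    // MassTransit's default concurrent message limit:
    // twice the logical processor count.
    public static int Default() => Environment.ProcessorCount * 2;

    static void Main() =>
        Console.WriteLine($"Default concurrent message limit: {Default()}");
}
```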

The existing MassTransit setup looks pretty similar to this:

_bus = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
    var host = rabbit.Host(new Uri("rabbitmq://localhost"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });

    rabbit.ReceiveEndpoint(host, "SpikyQueue", endpoint =>
    {
        endpoint.Consumer(() => new TestConsumer());
    });
});

The Test (Driven Development)

As we like testing things, I wrote a test to validate the degree of concurrency we have. We use a real instance of RabbitMQ (started with Docker as part of the build), and have a test message and consumer. Because RabbitMQ delivers messages so quickly, we make the consumer take a little time before returning:

class TestMessage
{
    public int Value { get; set; }
}

class TestConsumer : IConsumer<TestMessage>
{
    public async Task Consume(ConsumeContext<TestMessage> context)
    {
        await Task.Delay(600);
    }
}

The final piece of our puzzle is an IConsumeObserver, which will count the number of messages processed in parallel, as well as the total number of messages processed. We will use the total number of messages to know when our test can stop running, and the parallel number to prove if our concurrency changes worked.

What this observer is doing is the following, but as we are in a multithreaded environment, we need to use the Interlocked class, and do a bit more work to make sure we don’t lose values:

PreConsume:
    currentPendingDeliveryCount++
    maxPendingDeliveryCount = Math.Max(maxPendingDeliveryCount, currentPendingDeliveryCount)
PostConsume:
    currentPendingDeliveryCount--

The actual ConsumeCountObserver code is as follows:

class ConsumeCountObserver : IConsumeObserver
{
    int _deliveryCount;
    int _currentPendingDeliveryCount;
    int _maxPendingDeliveryCount;

    readonly int _messageCount;
    readonly TaskCompletionSource<bool> _complete;

    public ConsumeCountObserver(int messageCount)
    {
        _messageCount = messageCount;
        _complete = new TaskCompletionSource<bool>();
    }

    public int MaxDeliveryCount => _maxPendingDeliveryCount;
    public async Task Wait() => await _complete.Task;

    Task IConsumeObserver.ConsumeFault<T>(ConsumeContext<T> context, Exception exception) => Task.CompletedTask;

    Task IConsumeObserver.PreConsume<T>(ConsumeContext<T> context)
    {
        Interlocked.Increment(ref _deliveryCount);

        var current = Interlocked.Increment(ref _currentPendingDeliveryCount);

        int max;
        while (current > (max = Volatile.Read(ref _maxPendingDeliveryCount)))
            Interlocked.CompareExchange(ref _maxPendingDeliveryCount, current, max);

        return Task.CompletedTask;
    }

    Task IConsumeObserver.PostConsume<T>(ConsumeContext<T> context)
    {
        Interlocked.Decrement(ref _currentPendingDeliveryCount);

        if (_deliveryCount == _messageCount)
            _complete.TrySetResult(true);

        return Task.CompletedTask;
    }
}
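As a sanity check, the compare-exchange loop in PreConsume (the standard lock-free way to track a running maximum) can be exercised on its own, with plain BCL types and no MassTransit at all:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class MaxTrackingDemo
{
    static int _current;
    static int _max;

    public static (int Current, int Max) Run(int iterations)
    {
        Parallel.For(0, iterations, _ =>
        {
            // Same shape as the observer: enter, record a new maximum
            // if we beat the one seen so far, then leave.
            var current = Interlocked.Increment(ref _current);

            int max;
            while (current > (max = Volatile.Read(ref _max)))
                Interlocked.CompareExchange(ref _max, current, max);

            Interlocked.Decrement(ref _current);
        });

        return (_current, _max);
    }

    static void Main()
    {
        var (current, max) = Run(10_000);

        // current always returns to 0; max is at least 1 and never
        // exceeds the number of threads the scheduler actually ran.
        Console.WriteLine($"current={current}, max={max}");
    }
}
```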

Finally, we can put the actual test together: we publish some messages, connect the observer, and start processing. When the observer indicates we have finished, we assert that the MaxDeliveryCount was the same as the ConcurrencyLimit:

[Test]
public async Task WhenTestingSomething()
{
    for (var i = 0; i < MessageCount; i++)
        await _bus.Publish(new TestMessage { Value = i });

    var observer = new ConsumeCountObserver(MessageCount);
    _bus.ConnectConsumeObserver(observer);

    await _bus.StartAsync();
    await observer.Wait();
    await _bus.StopAsync();

    observer.MaxDeliveryCount.ShouldBe(ConcurrencyLimit);
}

The Problem

The problem we had was actually increasing the concurrency: There are two things you can change, .UseConcurrencyLimit(32) and .PrefetchCount = 32, but doing this doesn’t work:

_bus = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
    var host = rabbit.Host(new Uri("rabbitmq://localhost"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });

    rabbit.ReceiveEndpoint(host, "SpikyQueue", endpoint =>
    {
        endpoint.UseConcurrencyLimit(ConcurrencyLimit);
        endpoint.PrefetchCount = (ushort) ConcurrencyLimit;

        endpoint.Consumer(() => new TestConsumer());
    });
});

Or well…it does work, if the ConcurrencyLimit is less than the default. After a lot of trial and error, it turns out there are not two things you can change, but four:

  • rabbit.UseConcurrencyLimit(val)
  • rabbit.PrefetchCount = val
  • endpoint.UseConcurrencyLimit(val)
  • endpoint.PrefetchCount = val

This makes sense (kind of): you can set limits on the factory, and then each endpoint can use any value less than or equal to the factory's limits. My process of trial and error to work out which settings were needed:

  1. Set them all to 32
  2. Run test
    • if it passes, remove one setting, go to 2.
    • if it fails, add last setting back, remove a different setting, go to 2.

After iterating this set of steps for a while, it turns out that for my use case I need to set rabbit.UseConcurrencyLimit(val) and endpoint.PrefetchCount = val:

_bus = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
    var host = rabbit.Host(new Uri("rabbitmq://localhost"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });

    rabbit.UseConcurrencyLimit(ConcurrencyLimit);
    rabbit.ReceiveEndpoint(host, "SpikyQueue", endpoint =>
    {
        endpoint.PrefetchCount = (ushort) ConcurrencyLimit;
        endpoint.Consumer(() => new TestConsumer());
    });
});

Interestingly, no matter where you set the PrefetchCount value, it doesn’t show up in the RabbitMQ web dashboard.

Hopefully this helps someone else struggling to get higher concurrency out of MassTransit.

code, masstransit, rabbitmq, testing

---

Composite Decorators with StructureMap

04 Oct 2017

While I was developing my Crispin project, I ended up needing to create a bunch of implementations of a single interface, and then use all those implementations at once (for metrics logging).

The interface looks like so:

public interface IStatisticsWriter
{
    Task WriteCount(string format, params object[] parameters);
}

And we have a few implementations already:

  • LoggingStatisticsWriter - writes to an ILogger instance
  • StatsdStatisticsWriter - pushes metrics to StatsD
  • InternalStatisticsWriter - aggregates metrics for exposing via Crispin’s api

To make all of these be used together, I created a fourth implementation, called CompositeStatisticsWriter (a name I made up, but which apparently matches the Gang of Four definition of a composite!).

public class CompositeStatisticsWriter : IStatisticsWriter
{
    private readonly IStatisticsWriter[] _writers;

    public CompositeStatisticsWriter(IEnumerable<IStatisticsWriter> writers)
    {
        _writers = writers.ToArray();
    }

    public async Task WriteCount(string format, params object[] parameters)
    {
        await Task.WhenAll(_writers
            .Select(writer => writer.WriteCount(format, parameters))
            .ToArray());
    }
}
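Independent of the container wiring, the composite's fan-out behaviour is easy to check in isolation. Here is a self-contained sketch, using a hypothetical RecordingWriter in place of the real writers:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public interface IStatisticsWriter
{
    Task WriteCount(string format, params object[] parameters);
}

// Trivial in-memory writer, used only for this sketch.
public class RecordingWriter : IStatisticsWriter
{
    public List<string> Written { get; } = new List<string>();

    public Task WriteCount(string format, params object[] parameters)
    {
        Written.Add(string.Format(format, parameters));
        return Task.CompletedTask;
    }
}

public class CompositeStatisticsWriter : IStatisticsWriter
{
    private readonly IStatisticsWriter[] _writers;

    public CompositeStatisticsWriter(IEnumerable<IStatisticsWriter> writers)
        => _writers = writers.ToArray();

    public async Task WriteCount(string format, params object[] parameters)
        => await Task.WhenAll(_writers
            .Select(writer => writer.WriteCount(format, parameters)));
}

public static class Demo
{
    public static async Task Main()
    {
        var first = new RecordingWriter();
        var second = new RecordingWriter();
        var composite = new CompositeStatisticsWriter(
            new IStatisticsWriter[] { first, second });

        await composite.WriteCount("toggles.{0}.updated", "abc");

        // Both writers receive every metric the composite is given.
        Console.WriteLine(first.Written.Single());   // toggles.abc.updated
        Console.WriteLine(second.Written.Single());  // toggles.abc.updated
    }
}
```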

The problem with doing this is that StructureMap throws an error about a bi-directional dependency:

StructureMap.Building.StructureMapBuildException : Bi-directional dependency relationship detected!
Check the StructureMap stacktrace below:
1.) Instance of Crispin.Infrastructure.Statistics.IStatisticsWriter (Crispin.Infrastructure.Statistics.CompositeStatisticsWriter)
2.) All registered children for IEnumerable<IStatisticsWriter>
3.) Instance of IEnumerable<IStatisticsWriter>
4.) new CompositeStatisticsWriter(*Default of IEnumerable<IStatisticsWriter>*)
5.) Crispin.Infrastructure.Statistics.CompositeStatisticsWriter
6.) Instance of Crispin.Infrastructure.Statistics.IStatisticsWriter (Crispin.Infrastructure.Statistics.CompositeStatisticsWriter)
7.) Container.GetInstance<Crispin.Infrastructure.Statistics.IStatisticsWriter>()

After attempting to solve this myself in a few different ways (you can even watch the stream of my attempts), I asked in the StructureMap gitter chat room, and received this answer:

This has come up a couple times, and yeah, you’ll either need a custom convention or a policy that adds the other ITest’s to the instance for CompositeTest as inline dependencies so it doesn’t try to make Composite a dependency of itself – Jeremy D. Miller

Finally, Babu Annamalai provided a simple implementation when I got stuck (again).

The result is the creation of a custom convention for registering the composite, which provides all the implementations I want it to wrap:

public class CompositeDecorator<TComposite, TDependents> : IRegistrationConvention
    where TComposite : TDependents
{
    public void ScanTypes(TypeSet types, Registry registry)
    {
        var dependents = types
            .FindTypes(TypeClassification.Concretes)
            .Where(t => t.CanBeCastTo<TDependents>() && t.HasConstructors())
            .Where(t => t != typeof(TComposite))
            .ToList();

        registry
            .For<TDependents>()
            .Use<TComposite>()
            .EnumerableOf<TDependents>()
            .Contains(x => dependents.ForEach(t => x.Type(t)));
    }
}

To use this the StructureMap configuration changes from this:

public CrispinRestRegistry()
{
    Scan(a =>
    {
        a.AssemblyContainingType<Toggle>();
        a.WithDefaultConventions();
        a.AddAllTypesOf<IStatisticsWriter>();
    });

    var store = BuildStorage();

    For<IStorage>().Use(store);
    For<IStatisticsWriter>().Use<CompositeStatisticsWriter>();
}

To this version:

public CrispinRestRegistry()
{
    Scan(a =>
    {
        a.AssemblyContainingType<Toggle>();
        a.WithDefaultConventions();
        a.Convention<CompositeDecorator<CompositeStatisticsWriter, IStatisticsWriter>>();
    });

    var store = BuildStorage();
    For<IStorage>().Use(store);
}

And now everything works successfully, and I have a Pull Request open on StructureMap’s repo with an update to the documentation about this.

Hopefully this helps someone else too!

code, structuremap, di, ioc

---

Integration Testing with Dotnet Core, Docker and RabbitMQ

02 Oct 2017

When building libraries, not only is it a good idea to have a large suite of Unit Tests, but also a suite of Integration Tests.

For one of my libraries (RabbitHarness) I have a set of tests which check it behaves as expected against a real instance of RabbitMQ. Ideally these tests will always be run, but sometimes RabbitMQ just isn’t available such as when running on AppVeyor builds, or if I haven’t started my local RabbitMQ Docker container.

Skipping tests if RabbitMQ is not available

First off, I prevent the tests from running if RabbitMQ is not available by using a custom XUnit FactAttribute:

public class RequiresRabbitFactAttribute : FactAttribute
{
	private static readonly Lazy<bool> IsAvailable = new Lazy<bool>(() =>
	{
		var factory = new ConnectionFactory { HostName = "localhost", RequestedConnectionTimeout = 1000 };

		try
		{
			using (var connection = factory.CreateConnection())
				return connection.IsOpen;
		}
		catch (Exception)
		{
			return false;
		}
	});

	public override string Skip
	{
		get { return IsAvailable.Value ? null : "RabbitMQ is not available"; }
		set { /* nothing */ }
	}
}

This attribute will try connecting to a RabbitMQ instance on localhost once for all tests per run, and cause any test with this attribute to be skipped if RabbitMQ is not available.
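The same probe-once-per-run idea works without the RabbitMQ client library at all. Here is a sketch using a raw TCP connect (the real attribute uses ConnectionFactory, which also validates the AMQP handshake; the BrokerProbe name is mine):

```csharp
using System;
using System.Net.Sockets;

static class BrokerProbe
{
    // Lazy<T> ensures the (potentially slow) probe runs at most once
    // per test run, however many tests consult it.
    public static readonly Lazy<bool> IsAvailable = new Lazy<bool>(() =>
    {
        try
        {
            using (var client = new TcpClient())
            {
                // Bounded wait so an unreachable host fails fast.
                var connect = client.ConnectAsync("localhost", 5672);
                return connect.Wait(TimeSpan.FromSeconds(1)) && client.Connected;
            }
        }
        catch (Exception)
        {
            return false;
        }
    });
}

class Program
{
    static void Main() =>
        Console.WriteLine($"RabbitMQ reachable: {BrokerProbe.IsAvailable.Value}");
}
```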

Build Script & Docker

I decided the build script should start a RabbitMQ container, and use that for the tests, but I didn’t want to re-use my standard RabbitMQ instance which I use for all kinds of things, and may well be broken at any given time.

As my build script is just a bash script, I can check if the docker command is available, and then start a container if it is (relying on the assumption that if docker is available, I can start a container).

if [ -x "$(command -v docker)" ]; then
  CONTAINER=$(docker run -d --rm -p 5672:5672 rabbitmq:3.6.11-alpine)
  echo "Started RabbitMQ container: $CONTAINER"
fi

If docker is available, we start a new container. I use rabbitmq:3.6.11-alpine as it is a tiny image with no frills. I start it with the -d and --rm flags, which run the container in detached mode (i.e. the docker run command returns instantly) and delete the container when it is stopped, taking care of clean up for us! I only bother binding the main data connection port (5672), as that is all we are going to be using. Finally, the container’s ID, which is returned by the docker run command, is stored in the CONTAINER variable.

I recommend putting this step as the very first part of your build script, as it gives the container time to start up RabbitMQ and be ready for connections while the rest of your build is running. Otherwise I found I needed to put a sleep 5 command in afterwards to pause the script for a short time.
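If you would rather not rely on the build providing enough padding, a small wait loop can poll the port instead of sleeping blindly. A bash-only sketch (it uses bash's /dev/tcp; the ATTEMPTS variable is my own addition):

```shell
#!/usr/bin/env bash
# Poll the AMQP port until RabbitMQ accepts connections, up to a limit.
ATTEMPTS="${ATTEMPTS:-30}"
for i in $(seq 1 "$ATTEMPTS"); do
  # The subshell opens (and implicitly closes) a TCP connection.
  if (exec 3<>"/dev/tcp/localhost/5672") 2>/dev/null; then
    echo "RabbitMQ ready after ${i} attempt(s)"
    break
  fi
  sleep 1
done
echo "continuing build"
```

Either way the script carries on: if RabbitMQ never comes up, the RequiresRabbitFact attribute skips the integration tests anyway.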

The script then continues on with the normal build process:

dotnet restore "$NAME.sln"
dotnet build "$NAME.sln" --configuration $MODE

find . -iname "*.Tests.csproj" -type f -exec dotnet test "{}" --configuration $MODE \;
dotnet pack ./src/$NAME --configuration $MODE --output ../../.build

Once this is all done, I have another check that docker exists, and stop the container we started earlier, by using the container ID in CONTAINER:

if [ -x "$(command -v docker)" ]; then
  docker stop $CONTAINER
fi

And that’s it! You can see the full build script for RabbitHarness here.

The only problem with this script is that if you try to start a RabbitMQ container while you already have one running, the docker run command will fail. The build should still succeed, though: the already-running instance of RabbitMQ will work for the tests, and the docker stop command will just output that it can’t find a container with a blank ID.

I think I will be using this technique more to help provide isolation for builds - the Microsoft/mssql-server-linux containers might be very useful for some of our work codebases (which do work against the Linux instances of MSSQL, even if they weren’t designed to!)

code, dotnetcore, rabbitmq, docker, testing

---

Implementing Custom Aspnet Core ModelBinders

22 Sep 2017

This post is a summary of a stream I did last night where I implemented all of this. If you want to watch me grumble my way through it, it’s available on YouTube here.

In my Crispin project, I wanted the ability to support loading Toggles by both name and ID, for all operations. As I use a mediator to send messages from my controllers to the handlers in the domain, this meant I had to either:

  • create separate request types for loading by name and loading by ID
  • have both an ID and a Name property on each request type

I didn’t like the sound of either of these: both involve more typing than I want to do, and the second variant has the added downside of causing a lot of if statements in the handlers, as you have to work out which property is set before loading. Not to mention the duplication of the toggle-loading logic in every handler.

The solution I came up with was to use some inheritance, a static factory, some method hiding, and a custom IModelBinder.

ToggleLocator

I started off by having an abstract base class called ToggleLocator. To start with, it just has two static methods for creating an instance of ToggleLocator:

public abstract class ToggleLocator
{
	public static ToggleLocator Create(Guid toggleID) => new ToggleLocatorByID(toggleID);
	public static ToggleLocator Create(string toggleName) => new ToggleLocatorByName(toggleName);
}

As this is going to be used in both Query handlers and Command handlers, I need to be able to load the Toggle (the EventSourced AggregateRoot) and the ToggleView (the projected current state of the AggregateRoot). So we add two abstract methods to ToggleLocator:

internal abstract ToggleView LocateView(IStorageSession session);
internal abstract Toggle LocateAggregate(IStorageSession session);

Note that not only are these two methods abstract, they are also internal - we don’t want anything outside the domain to know how a toggle is loaded. I considered using a privately implemented interface to do this method hiding, but didn’t see the point, as I can accomplish the same with the internal methods.

We can now write two implementations of ToggleLocator. First up is ToggleLocatorByID, which is very straightforward to implement; we use the ID to load the AggregateRoot directly, and the AllToggles view can be queried by ID to fetch the view version too.

public class ToggleLocatorByID : ToggleLocator
{
	private readonly ToggleID _toggleID;

	public ToggleLocatorByID(ToggleID toggleID)
	{
		_toggleID = toggleID;
	}

	internal override ToggleView LocateView(IStorageSession session) => session
		.LoadProjection<AllToggles>()
		.Toggles
		.SingleOrDefault(view => view.ID == _toggleID);

	internal override Toggle LocateAggregate(IStorageSession session) => session
		.LoadAggregate<Toggle>(_toggleID);
}

The more interesting class to implement is ToggleLocatorByName, as it needs to load an AggregateRoot by name - something which is not directly supported. To do this we fetch the ToggleView first, and then use its ID property to load the Toggle:

public class ToggleLocatorByName : ToggleLocator
{
	private readonly string _toggleName;

	public ToggleLocatorByName(string toggleName)
	{
		_toggleName = toggleName;
	}

	internal override ToggleView LocateView(IStorageSession session) => session
		.LoadProjection<AllToggles>()
		.Toggles
		.SingleOrDefault(t => t.Name.Equals(_toggleName, StringComparison.OrdinalIgnoreCase));

	internal override Toggle LocateAggregate(IStorageSession session)
	{
		var view = LocateView(session);

		return view != null
			? session.LoadAggregate<Toggle>(view.ID)
			: null;
	}
}

All this means that the handlers have no conditionals for loading - they just call the relevant Locate method:

private Task<UpdateToggleTagsResponse> ModifyTags(ToggleLocator locator, Action<Toggle> modify)
{
	using (var session = _storage.BeginSession())
	{
		var toggle = locator.LocateAggregate(session);
		//or
		var view  = locator.LocateView(session);
		//...
	}
}

And in the controllers, we have separate action methods for each route:

[Route("name/{toggleName}/tags/{tagName}")]
[HttpPut]
public async Task<IActionResult> PutTag(string toggleName, string tagName)
{
	var request = new AddToggleTagRequest(ToggleLocator.Create(toggleName), tagName);
	var response = await _mediator.Send(request);

	return new JsonResult(response.Tags);
}

[Route("id/{toggleID}/tags/{tagName}")]
[HttpPut]
public async Task<IActionResult> PutTag(Guid toggleID, string tagName)
{
	var request = new AddToggleTagRequest(ToggleLocator.Create(ToggleID.Parse(toggleID)), tagName);
	var response = await _mediator.Send(request);

	return new JsonResult(response.Tags);
}

But that is still more duplication than I would like, so lets see if we can resolve this with a custom IModelBinder.

Custom IModelBinder for ToggleLocator

To make a custom model binder, we need to implement two interfaces: IModelBinderProvider and IModelBinder. I am not sure why IModelBinderProvider exists as a separate interface, to be perfectly honest, but you need it, and as it does nothing particularly interesting, I decided to implement both interfaces in the one class, and just return this from IModelBinderProvider.GetBinder:

public class ToggleLocatorBinder : IModelBinderProvider
{
	public IModelBinder GetBinder(ModelBinderProviderContext context)
	{
		if (context.Metadata.ModelType == typeof(ToggleLocator))
			return this;

		return null;
	}
}

We can then implement the second interface, IModelBinder. Here we check (again) that the parameter is a ToggleLocator, then fetch the value which came from the route (or the querystring, thanks to the .ValueProvider property).

All I need to do here is try to parse the value as a Guid. If it parses successfully, we create a ToggleLocatorByID instance; otherwise, we create a ToggleLocatorByName instance.

public class ToggleLocatorBinder : IModelBinderProvider, IModelBinder
{
	public Task BindModelAsync(ModelBindingContext bindingContext)
	{
		if (bindingContext.ModelType != typeof(ToggleLocator))
			return Task.CompletedTask;

		var value = bindingContext.ValueProvider.GetValue(bindingContext.FieldName);
		var guid = Guid.Empty;

		var locator = Guid.TryParse(value.FirstValue, out guid)
			? ToggleLocator.Create(ToggleID.Parse(guid))
			: ToggleLocator.Create(value.FirstValue);

		bindingContext.Result = ModelBindingResult.Success(locator);

		return Task.CompletedTask;
	}
}

We add this into our MVC registration code at the beginning of the ModelBinderProviders collection, as MVC uses the first binder which supports the target type, and there is a binder somewhere in the collection which will handle anything that inherits from object…

services.AddMvc(options =>
{
	options.ModelBinderProviders.Insert(0, new ToggleLocatorBinder());
});

Now we can reduce our action methods down to one which handles both routes:

[Route("id/{id}/tags/{tagName}")]
[Route("name/{id}/tags/{tagName}")]
[HttpPut]
public async Task<IActionResult> PutTag(ToggleLocator id, string tagName)
{
	var request = new AddToggleTagRequest(id, tagName);
	var response = await _mediator.Send(request);

	return new JsonResult(response.Tags);
}

Much better, no duplication, and no (obvious) if statements!

design, code, aspnetcore

---

Testing Containers or Test Behaviour, Not Implementation

17 Sep 2017

The trouble with testing containers is that usually the test ends up very tightly coupled to the implementation.

Let’s see an example. If we start off with an interface and implementation of a “cache”, which in this case is just going to store a single string value.

public interface ICache
{
    string Value { get; set; }
}

public class Cache : ICache
{
    public string Value { get; set; }
}

We then set up our container (StructureMap in this case) to return the same instance of the cache whenever an ICache is requested:

var container = new Container(_ =>
{
    _.For<ICache>().Use<Cache>().Singleton();
});

The following test is fairly typical of how this behaviour gets verified - it just compares that the same instance was returned by the container:

var first = container.GetInstance<ICache>();
var second = container.GetInstance<ICache>();

first.ShouldBe(second);

But this is a very brittle test, as it assumes that ICache will always be the singleton itself. In the future, we might add in a decorator, or make the cache a totally different style of implementation which isn’t singleton based.

For example, if we were to include a decorator class, which just logs reads and writes to the console:

public class LoggingCache : ICache
{
    private readonly Cache _backingCache;

    public LoggingCache(Cache backingCache)
    {
        _backingCache = backingCache;
    }

    public string Value
    {
        get
        {
            Console.WriteLine("Value fetched");
            return _backingCache.Value;
        }
        set
        {
            Console.Write($"Value changed from {_backingCache.Value} to {value}");
            _backingCache.Value = value;
        }
    }
}

Which will change our container registration:

var container = new Container(_ => {
    _.ForSingletonOf<Cache>();
    _.For<ICache>().Use<LoggingCache>();
});

The test will now fail, or need changing to match the new implementation. This shows two things:

  • Tests are tightly coupled to the implementation
  • Tests are testing the implementation, not the intent.

Testing intent, not implementation

Instead of checking whether we get the same class instances back from the container, it makes far more sense to check that the classes behave as expected. For my “super stupid cache” example this could take the following form:

var first = container.GetInstance<ICache>();
var second = container.GetInstance<ICache>();

first.Value = "testing";
second.Value.ShouldBe("testing");

Not only does this test validate the behaviour of the classes, but it is far less brittle - we can change what the container returns for ICache entirely, as long as it behaves the same.

But what do you think? How do you go about testing behaviour?

design, code, structuremap, testing

---