The Problem with CPUs and Kubernetes

02 Jun 2021

Key Takeaway:

os.cpus() returns the number of cores on the Kubernetes host, not the number of cores assigned to a pod.

Investigating excessive memory usage

Recently, when I was looking through a cluster health dashboard for a Kubernetes cluster, I noticed that one of the deployed applications was using a considerable amount of RAM - way more than I thought could be reasonable. Each instance (pod) of the application used approximately 8 GB of RAM, which was definitely excessive for a reasonably simple NodeJS webserver. Combined with the application running 20-30 replicas or so, that put the total RAM usage somewhere between 160 GB and 240 GB.

One of the first things I noticed was that the deployment manifest in Kubernetes had the NODE_MAX_MEM environment variable specified and set to 250 MB:

environment:
  NODE_MAX_MEM: 250

Interesting. So how is a single container using more RAM than that?

The application used to be deployed to EC2 machines, and to fully utilise the multiple cores in those machines, it used the cluster library.

This library essentially forks the node process into n child processes; in this case, n was set to the length of os.cpus(), the array NodeJS returns with one entry per core on the machine.
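
The pattern looks roughly like this - a simplified sketch rather than the application’s actual startup code (the port and handler are made up for illustration):

import cluster from "cluster";
import http from "http";
import os from "os";

if (cluster.isMaster) {
  // fork one worker per core the OS reports - on Kubernetes this is the host's core count
  for (let i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }
} else {
  // each worker runs its own copy of the webserver
  http.createServer((req, res) => res.end("ok")).listen(8080);
}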

While this works for direct virtual machine usage, when the application was containerised and deployed to Kubernetes, it used about the same amount of RAM as before, so no one realised there was a problem.

os.cpus() and Kubernetes

The interesting thing about os.cpus() when called in a container in Kubernetes is that it reports the number of cores available on the host machine, not the amount of CPU assigned to the container (e.g. through resource requests and limits).
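
If you do need the pod’s actual CPU allocation from inside the container, it has to come from the cgroup filesystem rather than from os.cpus(). A rough sketch of how that could look (assuming cgroup v1 - the paths differ under cgroup v2, and none of this is part of the original application):

import { readFileSync } from "fs";
import os from "os";

// derive the pod's CPU limit from the CFS quota, falling back to the host's core count
function availableCpus(): number {
  try {
    const quota = parseInt(readFileSync("/sys/fs/cgroup/cpu/cpu.cfs_quota_us", "utf8"), 10);
    const period = parseInt(readFileSync("/sys/fs/cgroup/cpu/cpu.cfs_period_us", "utf8"), 10);

    // quota is -1 when the container has no CPU limit set
    if (quota > 0 && period > 0) {
      return Math.max(1, Math.floor(quota / period));
    }
  } catch {
    // not running under a cgroup CPU controller
  }

  return os.cpus().length;
}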

So every replica of the application spawns 32 child processes, as our EC2 hosts have that many cores - which also explains the memory usage, as 32 processes at around 250 MB each comes to roughly 8 GB. And given each pod had a limited CPU budget, was there any benefit to doing this?

So I did what seemed natural - I replaced the os.cpus() count with 1, deployed the application to production, and watched the performance metrics to see what happened.

And what do you know? No difference in request performance at all - and the memory usage dropped by 7.75 GB per pod.

This means that, overall, we have saved between 155 GB and 232.5 GB of RAM, with no performance difference!

Adding Observability to Vault

27 May 2021

One of the things I like to do when setting up a Vault cluster is to visualise all the operations Vault is performing, which helps me see changing usage patterns, whether there are lots of failed requests coming in, and which endpoints are receiving the most traffic.

While Vault has a lot of data available in Prometheus telemetry, the kind of information I am after is best taken from the Audit backend. Setting up an audit backend for Vault is reasonably easy - it supports three methods of communication: file, socket and syslog. For this application, I use a Unix socket and a small daemon running on the same machine as the Vault instance to send the data to a tracing system.

The Goal

Write a small application that receives audit events and writes traces (spans) to an observability tool. In this case, I am implementing both Honeycomb and Zipkin via OpenTelemetry.

The code is available on GitHub, and the most interesting parts are covered in the rest of this blog post.

Receiving and Processing Messages

ln, _ := net.Listen("unix", "/tmp/observe.sock")
conn, _ := ln.Accept()

// create the reader once, outside the loop, so buffered bytes aren't discarded between reads
reader := bufio.NewReader(conn)

for {
  message, _ := reader.ReadBytes('\n')

  // do something with the message
}

We only need to do minimal processing of the data for this application before sending it on to Honeycomb or Zipkin. As the messages contain nested objects, we need to flatten the object hierarchy for easier viewing in spans. So instead of this:

{
  "request": {
    "operation": "update",
    "namespace": { "id": "root" },
    "path": "sys/audit/socket",
    "data": {
      "local": false
    }
  }
}

We want to send this:

{
  "request.operation": "update",
  "request.namespace.id": "root",
  "request.path": "sys/audit/socket",
  "request.data.local": false
}

We also want to get a few strongly typed pieces of data out of the message, such as the type (request or response) and the request’s id, which is in both messages and can be used to group the spans.

To save us from deserialising the json twice, we can do the following:

  1. deserialize into a map[string]interface{}
  2. create a flattened version of the event using the flatten library
  3. turn the map into a typed struct using the mapstructure library

// 1 deserialize
event := map[string]interface{}{}
if err := json.Unmarshal(message, &event); err != nil {
  return err
}

// 2 flatten
flat, err := flatten.Flatten(event, "", flatten.DotStyle)
if err != nil {
  return err
}

// 3 type
typed := Event{}
if err := mapstructure.Decode(event, &typed); err != nil {
  return err
}
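
For reference, the Event struct itself isn’t shown here; a minimal sketch matching the fields used in the rest of this post might look like the following (the real definition lives in the repository - the field names and tags here are inferred from the audit log’s JSON, and decoding the timestamp string into a time.Time would also need a mapstructure decode hook, which is elided):

// a minimal, inferred version of the struct the audit events are decoded into
type Event struct {
  Type  string    `mapstructure:"type"`  // "request" or "response"
  Time  time.Time `mapstructure:"time"`  // needs a string-to-time decode hook
  Error string    `mapstructure:"error"`

  Request struct {
    ID string `mapstructure:"id"`
  } `mapstructure:"request"`

  // StartTime is not part of the audit event; it is filled in later from the request cache
  StartTime time.Time
}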

Now that we have our flattened version and our typed version of the message, we can forward it to our span processors. There are two implementations (three if you count stdout), so let’s look at them one at a time.

Honeycomb

To send the spans to Honeycomb, I am using their lower-level library libhoney-go, rather than the more usual beeline as I don’t need all the context propagation or automatic ID generation.

For the first version of this application, just sending the two events to Honeycomb linked together is enough; however, both spans will show 0ms durations. We’ll fix this problem for both Honeycomb and OpenTelemetry later.

To link our spans together properly, I use the .Request.ID property from the event as the trace.trace_id; it’s already a guid and is the same for both the request and response events. Then, for a request event, I make it the parent span by using the .Request.ID again, but this time as the trace.span_id. Finally, for the response event, I set the trace.parent_id to the .Request.ID, and generate a random value for the trace.span_id field.

Lastly, I loop through the flattened version of the event, adding each key-value pair as a field on the event, and then send it.

ev := libhoney.NewEvent()
ev.AddField("trace.trace_id", typed.Request.ID)

if typed.Type == "request" {
  ev.AddField("trace.span_id", typed.Request.ID)
} else {
  ev.AddField("trace.parent_id", typed.Request.ID)
  ev.AddField("trace.span_id", generateSpanID())
}

ev.AddField("service_name", "vault")
ev.AddField("name", typed.Type)

for key, val := range flat {
  ev.AddField(key, val)
}

ev.Send()

Zipkin / OpenTelemetry

The process for sending via OpenTelemetry is reasonably similar; we start a new span, copy the flattened structure into the span’s attributes and call End(), making the TracerProvider send the span to our configured backends (Zipkin in this case).

id, _ := uuid.Parse(typed.Request.ID)
ctx := context.WithValue(context.Background(), "request_id", id)

tr := otel.GetTracerProvider().Tracer("main")
ctx, span := tr.Start(ctx, typed.Type, trace.WithSpanKind(trace.SpanKindServer))

for key, value := range flat {
  span.SetAttributes(attribute.KeyValue{
    Key:   attribute.Key(key),
    Value: attribute.StringValue(fmt.Sprintf("%v", value)),
  })
}

if typed.Error != "" {
  span.SetStatus(codes.Error, typed.Error)
}

span.End()

The hard part was figuring out how to feed the .Request.ID into the Tracer as the TraceID, which was achieved by configuring OpenTelemetry with a custom ID generator that would use the request_id property of the current context:

type Generator struct{}

func (g *Generator) NewIDs(ctx context.Context) (trace.TraceID, trace.SpanID) {
  val := ctx.Value("request_id").(uuid.UUID)
  tid := trace.TraceID{}
  req, _ := val.MarshalText()
  copy(tid[:], req)

  sid := trace.SpanID{}
  rand.Read(sid[:])

  return tid, sid
}

I am sure more copying and allocation is happening in this method than necessary, but it is good enough for now. Configuring it for use by OpenTelemetry is straightforward; it just needs passing to the NewTracerProvider call, wrapped with sdktrace.WithIDGenerator():

exporter, _ := zipkin.NewRawExporter(
  "http://localhost:9411/api/v2/spans",
  zipkin.WithSDKOptions(sdktrace.WithSampler(sdktrace.AlwaysSample())),
)

processor := sdktrace.NewSimpleSpanProcessor(exporter)

tp := sdktrace.NewTracerProvider(
  sdktrace.WithSpanProcessor(processor),
  sdktrace.WithResource(resource.NewWithAttributes(
    semconv.ServiceNameKey.String("vault-observe"),
  )),
  sdktrace.WithIDGenerator(&Generator{}),
)

otel.SetTracerProvider(tp)

Testing

To verify that it works, I have a single docker-compose.yml file which sets up a Vault instance in dev mode, and a Zipkin instance. It mounts the current working directory into the Vault container as /sockets to share the socket file between the host and the container.

version: "3.9"

services:
  vault:
    image: vault:latest
    cap_add:
      - IPC_LOCK
    volumes:
      - "./:/sockets:rw"
    ports:
      - "8200:8200"
    environment:
      VAULT_DEV_ROOT_TOKEN_ID: "vault"
  zipkin:
    image: openzipkin/zipkin-slim
    ports:
      - "9411:9411"

Running the application along with the docker containers is now as follows:

go build
docker-compose up -d
./vault-observe --zipkin --socket-path observe.sock

In another terminal, you can now enable the new audit backend and send some requests so we can look at them in Zipkin:

export VAULT_ADDR="http://localhost:8200"
export VAULT_TOKEN="vault"

vault audit enable socket address=/sockets/observe.sock socket_type=unix

vault secrets enable -version=2 kv
vault kv put kv/test name=andy
vault kv get kv/test

Running in Production

There are a few things you should be aware of when running this in production:

  • This must not be your only audit backend: if any audit devices are enabled, Vault will fail any request that cannot be written to at least one of them.
  • There is the possibility of losing data if the vault-observe process stops.

Improvements

As I am using this for keeping an eye on request durations and patterns in behaviour, capturing the actual time it takes for Vault to handle a request would be pretty valuable. So instead of processing both events, I will keep just the timestamp from the request, and then when the response event comes in, look up the timestamp and calculate the duration.

As I don’t want an ever-expanding list of timestamps in memory, I use an automatically expiring cache to keep them for around 10 seconds, as no request to Vault should be that slow!

requests := cache.New(10*time.Second, 1*time.Minute)

for {
  err := processMessage(requests, conn, sender)
  if err != nil && err != io.EOF {
    fmt.Println(err)
  }
}

The processMessage function now handles the request and response messages separately. The request handler just inserts the event’s time property into the cache, and exits:

if typed.Type == "request" {
  requests.Set(typed.Request.ID, typed.Time, cache.DefaultExpiration)
  return nil
}

The response version pulls the time back out of the cache and stores it into the event itself - it’s then up to the sender if it wants to use the value or not.

if typed.Type == "response" {

  if x, found := requests.Get(typed.Request.ID); found {
    typed.StartTime = x.(time.Time)
    requests.Delete(typed.Request.ID)
  } else {
    return fmt.Errorf("No request found in the cache for %s", typed.Request.ID)
  }
}

In the Honeycomb sender, we can remove all the parenting logic; we only need to set the Timestamp and duration_ms fields to get the duration showing correctly:

duration := typed.Time.Sub(typed.StartTime).Milliseconds()

ev := libhoney.NewEvent()
ev.Timestamp = typed.StartTime
ev.AddField("duration_ms", duration)

ev.AddField("trace.trace_id", typed.Request.ID)
ev.AddField("trace.span_id", typed.Request.ID)

For the OpenTelemetry sender, we can add a trace.WithTimestamp() call to both the Start() and End() calls to use our events’ timestamps:

ctx := context.WithValue(context.Background(), "request_id", id)
tr := otel.GetTracerProvider().Tracer("main")
ctx, span := tr.Start(ctx, typed.Type, trace.WithSpanKind(trace.SpanKindServer), trace.WithTimestamp(typed.StartTime))

// ...

span.End(trace.WithTimestamp(typed.Time))

Getting NodeJS OpenTelemetry data into NewRelic

12 Mar 2021

I needed to get some OpenTelemetry data out of a NodeJS application and into NewRelic’s distributed tracing service, but found that there is no way to do it directly, and in this use case, adding a separate collector is more hassle than it’s worth.

Luckily, there is a NodeJS OpenTelemetry library which can report to Zipkin, and NewRelic can also ingest Zipkin-format data.

Using it is relatively straightforward:

import { trace } from "@opentelemetry/api";
import { BasicTracerProvider, BatchSpanProcessor } from "@opentelemetry/tracing";
import { ZipkinExporter } from "@opentelemetry/exporter-zipkin";

const exporter = new ZipkinExporter({
  url: "https://trace-api.newrelic.com/trace/v1",
  serviceName: "interesting-service",
  headers: {
    "Api-Key": process.env.NEWRELIC_APIKEY,
    "Data-Format": "zipkin",
    "Data-Format-Version": "2",
  },
});

const provider = new BasicTracerProvider();
provider.addSpanProcessor(new BatchSpanProcessor(exporter));
provider.register();

export const tracer = trace.getTracer("default");


const rootSpan = tracer.startSpan("main");

// do something fantastically interesting

rootSpan.end();
provider.shutdown();

This has the added benefit of being able to test with Zipkin locally, using the openzipkin/zipkin-slim docker container, by just removing the URL property from the ZipkinExporter:

docker run --rm -d -p 9411:9411 openzipkin/zipkin-slim

Child Spans

Figuring out how to create child spans was actually harder in the end, in part because the OpenTelemetry docs don’t quite match the actual function signatures.

In the end, I wrote this little helper function:

import { context, setSpan, Span } from "@opentelemetry/api";

function startSpan(parent: Span, name: string): Span {
  return tracer.startSpan(name, undefined, setSpan(context.active(), parent));
}

Which I can use like this:


async function DoInterestingThings(span: Span) {
  span = startSpan(span, "do-interesting-things");

  // interesting things happen here

  span.end();
}

Doing both of these means I can now see what my misbehaving cron jobs are actually doing, rather than trying to guess what their problems are.

Observability with Infrastructure as Code

01 Mar 2021

This article was originally published on the Pulumi blog.

When using the Pulumi Automation API to create applications which can provision infrastructure, it is very handy to be able to use observability techniques to ensure the application functions correctly and to help see where performance bottlenecks are.

One of the applications I work on creates a VPC and Bastion host and then stores the credentials in a Vault instance. The problem is that the “create infrastructure” part is an opaque blob, in that I can see it takes 129 seconds to create, but I can’t see what it’s doing, or why it takes that long.

honeycomb traces of one pulumi stack resource

So can I do better?

The Initial Application

In this example I use Honeycomb’s Go Beeline to capture all the data I care about - durations, errors, and any context which is “interesting”:

func main() {

	beeline.Init(beeline.Config{
		WriteKey: os.Getenv("HONEYCOMB_API_KEY"),
		Dataset:  "pulumi-demo",
	})
	defer beeline.Close()

	ctx, span := beeline.StartSpan(context.Background(), "basic-vpc")
	defer span.Send()

	name := auto.FullyQualifiedStackName(os.Getenv("PULUMI_USERNAME"), "basic-vpc", "dev")
	stack, err := auto.UpsertStackInlineSource(ctx, name, "basic-vpc", func(pc *pulumi.Context) error {

		azs, err := getAvailabilityZones(ctx)
		if err != nil {
			beeline.AddField(ctx, "err", err)
			return err
		}

		_, err = vpc.NewVpc(ctx, pc, "dev", &vpc.VpcArgs{
			Description:           "dev",
			BaseCidr:              "192.168.0.0/16",
			AvailabilityZoneNames: azs,
			S3Endpoint:            true,
			DynamoEndpoint:        true,
		})
		if err != nil {
			beeline.AddField(ctx, "err", err)
			return err
		}
		return nil
	})

	if err != nil {
		beeline.AddField(ctx, "err", err)
		os.Exit(1)
	}

	if err := stack.SetConfig(ctx, "aws:region", auto.ConfigValue{Value: os.Getenv("PULUMI_REGION")}); err != nil {
		beeline.AddField(ctx, "err", err)
		os.Exit(1)
	}

	ws := stack.Workspace()
	if err := ws.InstallPlugin(ctx, "aws", "v3.23.0"); err != nil {
		beeline.AddField(ctx, "err", err)
		os.Exit(1)
	}

	if _, err := stack.Refresh(ctx); err != nil {
		beeline.AddField(ctx, "err", err)
		os.Exit(1)
	}

	stream := optup.ProgressStreams(os.Stdout)
	if _, err := stack.Up(ctx, stream); err != nil {
		beeline.AddField(ctx, "err", err)
		os.Exit(1)
	}

	//vault code

}

Adding Infrastructure Observability

To get a handle on what is happening when stack.Up() runs, I have implemented a custom io.Writer, which gets passed into the optup.ProgressStreams constructor.

The custom progress stream’s Write method is called once for each line emitted, which allows us to start new spans when a resource starts being constructed, and send them when construction completes. Currently, this is achieved by parsing the console output text, but I gather in the future, it will be possible to get streamed json blobs which can be unmarshaled into go structs.

type pulumiBeeline struct {
	ctx      context.Context
	contexts map[string]func()
}

func NewPulumiBeeline(ctx context.Context) *pulumiBeeline {
	return &pulumiBeeline{
		ctx:      ctx,
		contexts: map[string]func(){},
	}
}

func (cw *pulumiBeeline) Write(p []byte) (n int, err error) {

	// todo: make more robust, support modifications, deletions etc.
	line := strings.TrimSpace(string(p))
	parts := strings.Split(line, " ")
	if len(parts) < 5 {
		return len(p), nil
	}

	//+  aws-vpc dev creating
	//+  <type> <name> <action>
	resourceType := parts[2]
	resourceName := parts[3]
	resourceAction := parts[4]

	if resourceAction == "creating" {
		c, s := beeline.StartSpan(cw.ctx, resourceName)
		beeline.AddField(c, "type", resourceType)
		// add other things here

		cw.contexts[resourceName] = s.Send
	}

	if resourceAction == "created" {
		cw.contexts[resourceName]()
	}

	return len(p), nil
}

Modifying the optup.ProgressStreams call is the only change needed to the original application:

stream := optup.ProgressStreams(os.Stdout, NewPulumiBeeline(ctx))
if _, err := stack.Up(ctx, stream); err != nil {
	beeline.AddField(ctx, "err", err)
	os.Exit(1)
}

Now when I run this program again, I can see a lot more information in my Honeycomb traces, which not only shows me that Pulumi is highly parallelised, but also gives me a better idea of where the time is taken when creating infrastructure; in this example, it’s the NAT Gateways:

honeycomb traces of all infrastructure resources in the pulumi stack

In the future, I want to expand this to cover far more details, such as including the reasons resources were created/modified/destroyed and including as much information as possible about what caused a resource to fail.

Wrapping Up

In the end, this turned out to be much easier to achieve than I had hoped. Being able to use Pulumi programmatically, rather than shelling out to the CLI via os/exec myself, was a huge productivity boost.

I am looking forward to all the new kinds of tooling I can build to solve my users’ problems, continuing to utilise Honeycomb for my observability and Pulumi for my infrastructure.

Forking Multi Container Docker Builds

03 Nov 2020

Following on from my last post on Isolated Multistage Docker Builds, I thought it would be useful to cover another advantage to splitting your dockerfiles: building different output containers from a common base.

The Problem

I have an application which, when built, needs to have all of its assets in one container, and a subset of those assets in a second container.

For example, a node webapp where you want the compiled/bundled static assets available in the backend container as a fallback, and also stored in an nginx container for serving. One of the reasons to do this is separation of concerns: I don’t want to put my backend code where it doesn’t need to be. There is also, in this case, the fact that the backend code and nginx need different base images, meaning deploying the same container twice won’t work.

So let’s see how we solve this!

Creating Separate Dockerfiles

The first dockerfile to write is the common base, which I name Dockerfile.builder. This is the same as in the previous post - we are assuming that the yarn ci:build step transpiles the TypeScript and generates the static assets for our application.

FROM node:15.0.1-alpine3.12 as builder
WORKDIR /app

COPY . ./
RUN yarn install --frozen-lockfile && yarn cache clean

RUN yarn ci:build

Next up is the server container, which will be in the Dockerfile.backend file, as I try to name the files based on their purpose rather than the technology used. As in the previous post, this installs the production dependencies for the application, and copies in the compiled output from the builder stage:

ARG builder_image
FROM ${builder_image} as builder

FROM node:15.0.1-alpine3.12 as output
WORKDIR /app

COPY package.json yarn.lock /app/
RUN yarn install --frozen-lockfile --production && yarn cache clean

COPY --from=builder /app/dist /app

Now let’s deal with the Dockerfile.frontend. This uses nginx:1.19.3-alpine as a base, and copies in the nginx.conf file from the host, and the static assets directory from the builder container:

ARG builder_image
FROM ${builder_image} as builder

FROM nginx:1.19.3-alpine as output

COPY ./nginx.conf /etc/nginx/nginx.conf
COPY --from=builder /app/dist/static /app

Building Containers

The reason we rely on the builder stage rather than the backend output stage is that we are now decoupled from layout/structural changes in that container, and we gain the ability to run the builds in parallel too (the & at the end of the lines), for a bit of a speed up on our build agents:

version="${GIT_COMMIT:0:7}"
builder_tag="builder:$version"

docker build --file Dockerfile.builder -t "$builder_tag" .

# run the builder container here to do tests, lint, static analysis etc.

docker build --file Dockerfile.backend --build-arg "builder_image=$builder_tag" -t "backend:$version" . &
docker build --file Dockerfile.frontend --build-arg "builder_image=$builder_tag" -t "frontend:$version" . &

wait

The result of this is three containers, all tagged with the short version of the current git commit:

  • builder:abc123e - contains all packages, compiled output
  • backend:abc123e - node based, contains the node backend and static assets
  • frontend:abc123e - nginx based, contains the static assets

I can now publish the builder internally (so it can be pulled before builds for caching and speed), and deploy the backend and frontend to their different locations.