A Gentle Introduction to OTP

April 24, 2014

Supervisors, Workers, and Trees – An Overview

(cross-posted from neo.com, somewhat belatedly)
The Open Telecom Platform, or OTP, is a group of Erlang libraries that help implement distributed, fault-tolerant systems. Because of the patterns the OTP libraries encourage, these systems tend to be built of lots of small, independent processes that are easy to reason about and manage. And while the name implies that it's only useful for telecom applications, many types of applications can be built using OTP.

At its simplest, an OTP application, at runtime, is nothing but a tree of Erlang processes. There are two fundamental kinds of OTP processes (implemented as Erlang behaviors). The first is the Supervisor, whose job is to start up its children and manage their lifecycle. The other is the Worker (of which there are several sub-types). Workers, as the name implies, do the work of the system.

I ended my third installment on Elixir and the Internet of Things with a picture of the Supervisor Tree we’ve built for our SSL RabbitMQ endpoint host. As a reminder, it looks like this:

Owsla Supervisor Tree

For those of you who are familiar with OTP, this diagram probably already makes sense. If you’re not, follow along and I’ll try to make some sense of this slightly different version of a typical “circles, boxes, and lines” diagram.

Supervisors

Supervisors do nothing but start, monitor, and (potentially) restart their children. While this sounds like a small thing, supervisors are, in many ways, what give OTP applications their reputation for stability and resilience in the face of errors. I'll go into more detail about supervisors, supervision strategies, and restart settings in a later post, but for now understand that each supervisor can be configured to start up different kinds of children and to react differently when those children die.
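
To give you a small taste ahead of that post, here's a minimal, hypothetical supervisor written in the same Supervisor.Behaviour style as the real listener supervisor you'll see later in this series. It starts a single worker (MyWorker is a made-up module name) and restarts it whenever it dies:

	defmodule MySupervisor do
	  use Supervisor.Behaviour

	  def start_link do
	    :supervisor.start_link(__MODULE__, [])
	  end

	  def init([]) do
	    # MyWorker stands in for any gen_server-style worker module
	    children = [ worker(MyWorker, [], restart: :permanent) ]
	    # :one_for_one - when a child dies, restart only that child
	    supervise(children, strategy: :one_for_one)
	  end
	end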

Workers

As their name implies, OTP workers perform the actual work of the application. The OTP libraries provide three generic worker behaviors (behaviours, in Erlang parlance): gen_server, gen_fsm, and gen_event.

gen_server (Generic Server)

The most generic of the three is the gen_server behavior. This behavior implements a typical Erlang event loop and provides callbacks for a generic server. However, it doesn't inherently implement any sort of networking server (as its name may imply) beyond what any Erlang process provides. It's really a host for fairly generic business/communications logic. You may, however, use this to create things like our TcpEndpoint from the Owsla project, where the generic gen_server happens to "own" a TCP connection and receives and reacts to events from that connection. Rather than repeat myself, I'll refer you to the 2nd part of my series on Elixir and the Internet of Things, where I break down our TcpEndpoint worker in great detail.
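
If you've never written one, the shape of a gen_server is easiest to see in a toy example. Here's a minimal, hypothetical counter in Elixir, using the same GenServer.Behaviour style as the Owsla code in that series:

	defmodule Counter do
	  use GenServer.Behaviour

	  def start_link(initial_count) do
	    :gen_server.start_link({:local, :counter}, __MODULE__, initial_count, [])
	  end

	  # Convenience wrappers around the raw :gen_server calls
	  def increment, do: :gen_server.cast(:counter, :increment)
	  def value, do: :gen_server.call(:counter, :value)

	  def init(initial_count) do
	    {:ok, initial_count}
	  end

	  # Asynchronous message: no reply, just a new state
	  def handle_cast(:increment, count) do
	    {:noreply, count + 1}
	  end

	  # Synchronous message: reply to the caller with the current count
	  def handle_call(:value, _from, count) do
	    {:reply, count, count}
	  end
	end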

gen_fsm (the Finite State Machine)

Does anyone remember what Finite State Machines (FSMs) are? You know, those little circle->arrow->circle diagrams that turn into regular expressions, or something like that? At least, that's what I remember from my CS class in college. Seriously though, FSMs are useful for situations where you have logic that maps to a series of states and transitions. A small example from the Wikipedia article on Finite State Machines will give you an idea of what I'm talking about. This one models a simple coin-operated turnstile:

Coin-Operated Turnstile FSM

These kinds of workers are often used to model real-world devices (like the above-mentioned turnstile). Here's what our turnstile might look like as an Erlang gen_fsm:

	-module(turnstile).
	-behaviour(gen_fsm).

	-export([start_link/0]).
	-export([coin/0]).
	-export([push/0]).
	-export([init/1, locked/2, unlocked/2]).

	start_link() ->
	    gen_fsm:start_link({local, turnstile}, turnstile, [], []).

	%% Public API - clients just call coin() or push()
	coin() ->
	    gen_fsm:send_event(turnstile, coin).

	push() ->
	    gen_fsm:send_event(turnstile, push).

	init([]) ->
	    {ok, locked, []}.

	%% In the locked state, a coin unlocks the turnstile; pushing does nothing.
	locked(coin, State) ->
	    do_unlock(),
	    %% re-lock automatically if nobody pushes through within 30 seconds
	    {next_state, unlocked, State, 30000};
	locked(push, State) ->
	    {next_state, locked, State}.

	%% In the unlocked state, a push rotates the arm and re-locks.
	%% Another coin just keeps it unlocked; the timeout re-locks it
	%% if the coin is never followed by a push.
	unlocked(push, State) ->
	    do_rotate(),
	    {next_state, locked, State};
	unlocked(coin, State) ->
	    {next_state, unlocked, State, 30000};
	unlocked(timeout, State) ->
	    do_lock(),
	    {next_state, locked, State}.

	do_unlock() -> io:format("unlocked~n").
	do_lock() -> io:format("locked~n").
	do_rotate() -> io:format("rotated~n").

You may have noticed that this is straight-up Erlang code. This is because, as of Elixir 0.12.5, the GenFSM.Behaviour module was deprecated, and it was removed in 0.13. The reason given is that there are easier and more idiomatic ways in Erlang to implement a finite state machine, and that solutions like plain_fsm probably provide a better basis for a Finite State Machine implementation in Elixir. Also, please pardon what I am sure are the many typos in my Erlang code (I compiled it in my head and it seemed to work fine).

gen_event (Event Handlers)

Event Handlers, as implemented by the gen_event behavior, work in partnership with an event manager to separate the responsibility of raising events from that of handling them in different ways. An event manager may have zero, one, or several event handlers registered, and each will respond to every event sent to its event manager. Let's implement the console logger from the Erlang docs in Elixir:

	defmodule ConsoleLogger do
		use GenEvent.Behaviour
		
		def init(_) do
			{ :ok, [] }
		end
		
		def handle_event(error, state) do
			IO.puts "***ERROR*** #{error}"
			{:ok, state}
		end
	end

Now, you can start up a generic “event manager”, register our handler with it, and send it events:

	# start up our event manager
	:gen_event.start({:local, :error_manager})
	# add our handler
	:gen_event.add_handler(:error_manager, ConsoleLogger, [])
	# Now, send an event
	:gen_event.notify(:error_manager, :error)

which would result in an IEX session something like this:

	iex(2)> :gen_event.start({:local, :error_manager})
	{:ok, #PID<0.59.0>}
	iex(3)> :gen_event.add_handler(:error_manager, ConsoleLogger, [])
	:ok
	iex(4)> :gen_event.notify(:error_manager, :error)
	:ok
	***ERROR*** error
	iex(5)>

Note that you can register additional event handlers, each of which will be notified whenever an event is sent to the event manager. This is great when you want to separate out different concerns (logging to the console, log files, database tables, and airbrake.io, for example) and simply register the event handler(s) you need at the time.
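
For example, assuming we had also written a (hypothetical) FileLogger handler module, wiring it up is just one more add_handler call, and a single notify then fans out to both handlers:

	# hypothetical second handler that appends errors to a log file
	:gen_event.add_handler(:error_manager, FileLogger, "/var/log/errors.log")
	# one notify is now delivered to both ConsoleLogger and FileLogger
	:gen_event.notify(:error_manager, :disk_full)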

Conclusion

This only scratches the surface of OTP, but hopefully it will give you a starting point for understanding how some of the moving parts fit together. In a future post, I’ll go into more detail on building OTP applications, supervisor trees and strategies, why our system kept imploding every time we disconnected more than 5 TCP/IP connections in a short period of time, and how you can avoid having the same thing happen to you.

Categories: Elixir, Erlang

Elixir and the Internet of Things (Part 3) – Rabbits, Rabbits Everywhere.

The Stampede

(Cross-posted from neo.com, somewhat belatedly)

In honor of Erlang Factory 2014 (where I'll be soaking up Elixir and Erlang knowledge today and tomorrow), I'd like to present the 3rd and final installment in my series on using Elixir when dealing with the Internet of Things.

In part 1 of this series, I discussed the stampede of connections issue we faced when working with Nexia Home on their home automation system, and what (Ruby) solutions we tried before landing on Elixir. Additionally, I explained how we backed into implementing an acceptor pool in order to handle 1000 new SSL connections/second from home automation bridges and other devices.

In part 2, I discussed the implementation of our TcpEndpoint process, and how it routes incoming data from line-based TCP/IP protocols to a RabbitMQ queue for processing by our back-end systems, but I left the implementation of our RabbitProducer and RabbitConsumer for later.

So it appears we’re at now now.

Let's start by taking a look at the RabbitProducer. We're using a fork of the ExRabbit Elixir RabbitMQ client. The producer is a single Erlang process that simply pushes data to RabbitMQ. So far, this seems to be fast enough to handle many thousands of messages per second, so we haven't done any additional tweaking of this code.

     defmodule Owsla.RabbitProducer do
      use GenServer.Behaviour

      defrecord RabbitProducerState, amqp: nil, channel: nil, config: nil

      def start_link(config) do
        :gen_server.start_link({ :local, :producer }, __MODULE__, config, [])
      end

      def init(config) do
        amqp = Exrabbit.Utils.connect(host: config.rabbit_host,
                                      port: config.rabbit_port,
                                      username: config.rabbit_user,
                                      password: config.rabbit_pass)

        channel = Exrabbit.Utils.channel amqp
        Exrabbit.Utils.declare_queue(channel, config.from_device, auto_delete: false, durable: true)

        {:ok, RabbitProducerState.new(amqp: amqp, channel: channel, config: config) }
      end

      def send(connection_id, message, from_device_seq) do
        :gen_server.cast(:producer, {:send, connection_id, message, from_device_seq})
      end

      def handle_cast({:send, connection_id, message, from_device_seq}, state ) do
        props = [headers: [{"connection_id", :longstr, connection_id}, {state.config.from_device_seq, :long, from_device_seq}]]
        Exrabbit.Utils.publish(state.channel, "", state.config.from_device, message, props)
        {:noreply, state }
      end

      def terminate(reason, state) do
        IO.puts "Rabbit producer terminating. Reason was:"
        IO.inspect reason
        Exrabbit.Utils.channel_close(state.channel)
        Exrabbit.Utils.disconnect(state.amqp)
        :ok
      end
    end

In init, we connect to RabbitMQ, create a channel, declare our queue, and return a new record that stores our AMQP connection and channel, along with some configuration information. A call to the send helper function sends a message to the locally registered producer process with the relevant data. Note that the TcpEndpoint maintains outbound and inbound message sequences, and the from_device_seq parameter to send is the outbound sequence number.

The single handle_cast receives the send message, adds two message headers for the connection ID and the previously-mentioned sequence number, and then publishes the message to a queue named in our configuration object.
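
In fact, this is exactly how the TcpEndpoint from part 2 publishes each line it reads from the socket:

      # inside TcpEndpoint's handle_info({:ssl, _, data}, state) - see part 2
      Owsla.RabbitProducer.send(state.id, data, state.sequence_no)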

There's nothing really magical going on here, and the RabbitConsumer isn't much different. However, for the RabbitConsumer we are leveraging another part of ExRabbit: the Exrabbit.Subscriber module.

    defmodule Owsla.RabbitConsumer do
      use Exrabbit.Subscriber

      def start_link(config) do
        :gen_server.start_link(Owsla.RabbitConsumer,
                          [host: config.rabbit_host,
                           port: config.rabbit_port,
                           username: config.rabbit_user,
                           password: config.rabbit_pass,
                           queue: config.to_device],
                          [])
      end

      def handle_message(msg, state) do
        case parse_message_with_props(msg) do
          nil -> nil
          {tag, payload, props} ->
            conid = get_header(props.headers, "connection_id")
            to_device_seq = get_header(props.headers, "to_device_seq")
            Owsla.TcpEndpoint.send(conid, to_device_seq, payload)
            ack state[:channel], tag
        end
      end

      def get_header(nil, _key), do: nil
      def get_header(:undefined, _key), do: nil
      def get_header([], _key), do: nil
      def get_header(headers, key) do
        [head | rest] = headers
        case head do
          {^key, _, value} -> value
          {_, _, _} -> get_header(rest, key)
        end
      end
    end

So, we start up our RabbitConsumer in the start_link function with the same kinds of configuration data we used for the RabbitProducer. The Exrabbit.Subscriber module does most of the hard work of subscribing to the queue we provided in the start_link call, and all we have to worry about is the handle_message callback, which is called for us whenever we receive a message. Unfortunately, the headers property of the message properties is a list of tuples in the form {key, type, value}, and not something useful like a HashDict, so we have some utility functions to walk the list and find the headers we need (namely, the connection ID and the sequence number from the incoming message). Then we just call the TcpEndpoint.send function.

If you remember from part 1, whenever we create a new TcpEndpoint process we register its PID in the global registry, which allows us to easily find the connection regardless of which node in our cluster it's on. Note that we start up 8 (configurable) RabbitConsumers, and do some message-ordering magic in the TcpEndpoint to make sure we're delivering messages to the devices in the correct order.
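
That "global registry" is just Erlang's built-in :global module. Because each endpoint registers itself as {:global, id} in its start_link (you can see this in part 2), any process on any node can find or message it by connection ID alone, something like this:

      # find the endpoint's PID, whichever node in the cluster it lives on
      pid = :global.whereis_name(connection_id)
      # or skip the lookup and address it by registered name directly
      :gen_server.cast({:global, connection_id}, {:send, message})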

And that’s pretty much it, round-trip from device -> backend -> device. For those who have been following along, our supervisor tree ends up looking like this:

Owsla Supervisor Tree

Categories: Elixir, Performance

Elixir and the Internet of Things (Part 2) – What’s in a Process?

January 30, 2014

The Stampede

In my previous installment I discussed how our application is required to accept up to 1000 new TCP/IP connections per second, handle upwards of 40,000 simultaneous TCP/IP connections, and the kinds of things we tried to get that working under Ruby/JRuby, and how we eventually decided to try Elixir and Erlang to solve the problem. If you haven’t read part 1, I’d suggest reading it now. Go ahead… I’ll wait…

So, now that we’ve accepted 40,000 connections talking over SSL, what are we going to do with them? That was our next challenge.

TL;DR – (Erlang) Processes FTW

So each time we called Owsla.TcpSupervisor.start_endpoint(ssl_socket) in part 1, we created a new Erlang process for that SSL connection. That process has two responsibilities. First, it handles reading data from the socket (which is generally text-based and line-oriented) and putting it, along with some routing information, onto a RabbitMQ queue. Second, it can receive messages from any Erlang process in our cluster and send them back to the device.

40,000 Processes — is this guy nuts?

Squirrel with Nuts

No, I’m not nuts – I’m using Erlang. The Erlang programming language and runtime have their own version of a “process” which is much lighter weight than an OS Thread or process, which makes creating new processes very inexpensive and allows the VM to run 10s (if not 100s) of thousands of processes in a very efficient manner. Native threads on most modern OSes take up to several megabytes of memory for their initial stack, while Erlang processes take almost no memory to start up, and dynamically allocate stack as necessary.
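
Don't take my word for it; here's a throwaway snippet (a toy example, not from our codebase) you can paste into iex to spawn ten thousand do-nothing processes. To push into the hundreds of thousands, you'll also want to raise the VM's process limit (for example, iex --erl "+P 1000000"):

  # each process just parks in a receive, waiting for a message that never comes
  pids = Enum.map(1..10000, fn _ -> spawn(fn -> receive do _ -> :ok end end) end)
  IO.puts "spawned #{length(pids)} processes"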

The VM also has its own scheduler, separate from the OS scheduler, which helps keep expensive OS-level context switching at bay. Notably, Erlang's scheduler is preemptive, unlike many other "non-OS-thread" concurrency solutions, which are cooperative: they require the programmer to yield time back to the runtime, and so allow a tight, long-running loop to hog CPU time without end.

The TcpEndpoint

So, what does this Endpoint look like anyway? Let’s walk through it and dissect some things.

defmodule Owsla.TcpEndpoint do
  use GenServer.Behaviour

We start by defining our module and 'importing' Elixir's GenServer.Behaviour, which tells OTP that this module is a gen_server.

  defrecord EndpointState, socket: nil, id: nil, sequence_no: 0, closed: false

Next, we define a record type called EndpointState that describes the current state of our endpoint. Note that for very simple GenServers, it’s probably fine to use a simple tuple for state, but at some point it becomes easier to manage if you create a record. Our state includes the SSL socket, a unique identifier, a sequence number for messages being sent to our back-end systems, and a flag to tell if, somehow, our process is still up but the SSL socket has been closed.

  def start_link(socket, id) do
    :gen_server.start_link({:global, id}, __MODULE__, {socket, id}, [])
  end

  def init({socket, id}) do
    { :ok, EndpointState.new(socket: socket, id: id) }
  end

Here, we’ve got our boilerplate “Start up my gen_server” code.

  def send(connection_id, message) do
    :gen_server.cast {:global, connection_id}, {:send, message}
  end

The send function is a simple wrapper around a call to :gen_server.cast as a convenience to consumers. It takes a connection ID (which is a UUID in our case) and a message to send back down the pipe. Next, we have our gen_server handling code:

  def handle_cast( {:start}, state) do
    :ssl.setopts(state.socket, [active: :once])
    {:noreply, state }
  end

Defining handle_cast functions that pattern match on certain messages is how you implement your behavior in an Erlang gen_server (part of the OTP library). This one (handling the :start message) is invoked by the TcpAcceptor way back in part 1, once it has transferred ownership of the SSL socket to this process. It tells the process that it can start doing things with the socket without crashing for accessing another process's socket.
Note that we set the SSL socket to [active: :once] rather than [active: true]. The difference is important, as [active: true] can allow your process's mailbox to be filled with TCP/IP messages if whatever is on the other end can send data to you very quickly. For more information, take a look at the Buckets of Sockets section of Learn You Some Erlang for Great Good. Then, read all the rest of it for good measure.

  def handle_cast( {:send, message}, state) do
    unless state.closed do
      :ssl.send(state.socket,message)
    end
    {:noreply, state}
  end

This handle_cast matches our :send from before. It simply checks to see if our connection has been closed and, if not, sends the message down the SSL socket.

  def handle_info({:ssl, _, data}, state) do
    Owsla.RabbitProducer.send(state.id, data, state.sequence_no)
    :ssl.setopts(state.socket, [active: :once])
    {:noreply, state.sequence_no(state.sequence_no + 1)}
  end

  def handle_info({:ssl_closed, _}, state) do
    IO.puts "Client closed socket - stopping connection #{state.id}"
    {:stop, :normal, state.closed(true)}
  end

  def handle_info({:ssl_error, _, reason}, state) do
    IO.puts "Error on socket.recv. Error was: #{reason}"
    IO.puts "Closing connection #{:uuid.to_string(state.id)}"
    {:stop, :error, state}
  end

The three handle_info functions above are all about handling asynchronous messages from the SSL socket. Because we have to be able to both send and receive data on this socket, we can’t just put a blocking :ssl.recv() call in a tight loop, so we need to receive our data asynchronously. We did that by setting the socket to active, as discussed above.

The first, {:ssl, _, data}, is called every time we get a line of data (remember, our data is newline-separated; gen_tcp offers other packet options for framing your data if you need, for example, packets whose first N bytes contain the length of the rest of the packet). It simply forwards the data to a RabbitMQ queue via the Owsla.RabbitProducer module/process, which is a wrapper around our fork of ExRabbit, which uses the native Erlang RabbitMQ client.

The other two handle_info functions deal with the SSL socket either closing “naturally” or having some kind of error. In either case, we want to stop the current process, so rather than return a tuple with :ok as the first item, we return a tuple with :stop. This signals to OTP that our process has completed and should be terminated. In one case, we return :normal as the second item in our tuple, to let our supervisor know that we shut down normally, and in the other we return :error so that OTP knows our process failed for some unexpected reason.

A quick note on Elixir defrecords – note that the first two handle_info functions transform their current state in some way – the {:ssl, _, data} handler returns:

  {:noreply, state.sequence_no(state.sequence_no + 1)}

the state.sequence_no(state.sequence_no + 1) part takes our existing state and creates a copy (as records are immutable) with all of the data the same except for the sequence_no field, which is incremented by 1. The same pattern is used to set the closed field in the :ssl_closed handler. state.sequence_no(state.sequence_no + 1) is a much easier way to express EndpointState.new(socket: state.socket, id: state.id, sequence_no: state.sequence_no + 1).
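
You can watch this copy-on-update behavior in iex (made-up values here, and the exact inspect output may vary by Elixir version, but the idea holds):

  iex> state = EndpointState.new(id: "abc")
  EndpointState[socket: nil, id: "abc", sequence_no: 0, closed: false]
  iex> state.sequence_no(state.sequence_no + 1)
  EndpointState[socket: nil, id: "abc", sequence_no: 1, closed: false]
  iex> state.sequence_no
  0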

Finally, we want to shut down our SSL connection whenever our process ends, so we define a terminate function to close it down before our process is removed by Erlang:

  def terminate(_reason, state) do
    IO.puts "Closing connection #{state.id}"
    :ssl.close(state.socket)
    :ok
  end

And that’s pretty much that. Note that I’ve walked you through a slightly older and simpler version of our TCP endpoint and, if you handled all of your business logic inside of the Endpoint process, this would probably be enough. Further, you would probably be able to ignore the sequence_no gymnastics we’re going through. However, the astute reader may have noticed that outbound messages are sent down the SSL connection in order of receipt in the process’ mailbox. In our case, those messages are being picked up by a pool of processes reading from a RabbitMQ queue, which means that messages could arrive out-of-order. Our current implementation handles this more gracefully, dealing with out-of-order messages and timeouts for when we somehow miss a message from RabbitMQ. However, the commensurate increase in complexity would make it much harder to describe in a blog post.

The final part of the solution is the RabbitConsumer, which picks up messages from our back-end system (currently still written in Ruby) and sends them back to the appropriate endpoint to be sent down the SSL connection, assuming it’s still around. But we can talk about that one another time.

Categories: Elixir, Performance

Elixir and the Internet of Things – Handling a Stampede

January 13, 2014

The Internet of Things is upon us, and being able to efficiently communicate with those things is going to become more and more difficult as more things get connected. I’ve been working as a consultant for Nexia Home Intelligence for the last year, and they were kind enough to let me blog a bit about what it is we’re doing to help them handle the large number of connected devices involved in running a hosted home automation solution.

TL;DR – Elixir/Erlang are Awesome

After trying out several Ruby-based solutions to the issues we faced handling the crush of 50k internet-connected devices, including EventMachine and Celluloid, running on MRI and JRuby, we spiked a solution in Elixir, which runs on the Erlang virtual machine. Leveraging the Erlang VM and out-of-the-box OTP frameworks, along with a few more advanced tricks, we've built a system that can easily handle our target workload.

Connections, Connections, Connections

There are lots of “things” in the world, and with IPv6 slowly gaining traction, many more of them will have an IP address (or at least internet connectivity of some kind). In many cases, the communication with these devices may be mostly one-way, where the device simply reports information back to a central server somewhere. In those cases, a simple HTTP-based API may well be able to handle the communications needs of your devices, and normal scaling suggestions for web servers will probably serve you just fine. However, there are certain classes of devices (locks, cameras, and thermostats, for example), that require bidirectional, near-real-time communication, and it is this kind of device that we’re dealing with.

In the Beginning…

Our original implementation leveraged Ruby's EventMachine, which is an evented I/O system for Ruby (similar to Node.js, if that helps). Devices would connect directly to the Ruby TCP/IP server via SSL, and communicate with our back-end Rails-based system via Resque. Outbound data from the Rails app was sent to RabbitMQ, with a header identifying the specific device to which the message was to be directed. The device servers all subscribed to the RabbitMQ exchange, and whichever one held the connection to the device would send out the results. This system held up well under steady-state load in the 3,000-6,000 connection range. However, there were some issues…

The Stampede

Elephant Stampede

There are several kinds of problems you’ll run into when you try to build a system that can handle 10s to 100s of thousands of simultaneous TCP/IP connections. The first one is “how do I handle all of these things connecting in rapid succession.” This was, in fact, the original impetus for investigating splitting the TCP/IP handling side of things from the “Business Logic” side. And, because all problems in Computer Science can be solved with an additional level of indirection, we decided to add an additional layer in front of the business logic to simply terminate SSL connections and route raw data to a pool of workers, which would handle the business logic and communication with the back-end. Given we are familiar with EventMachine, it seemed to be a no-brainer to simply split the original implementation in two, with some additional logic to handle things like message ordering now that data flowing to/from the device was transiting via a second hop over RabbitMQ instead of the more direct path from before. So that’s what we did.

Houston, we have a problem…


Apollo13 – view of the crippled Service Module after separation. NASA image AS13-59-8500

We quickly found that the new system was unable to keep up with the load of even 5,000 devices, much less our goal of something closer to 50,000 devices on a single machine. Things were not looking good. Using ruby-prof, we found (and fixed) several performance issues in Ruby's amqp gem, which yielded significant improvements. However, at that point we were CPU-bound with no obvious single bottleneck left to go after. It appeared that the single-threaded nature of EventMachine, along with some remaining amqp-gem-related performance issues (even when load-balancing across many instances on a multi-core machine), was going to sink this implementation.

A Side-Track Through Celluloid.io

Given our core application was a Ruby on Rails application, we really wanted to stay on Ruby for this solution, so we spent some time spiking a solution on Celluloid.io to see if its actor-based, multi-threaded model (running on JRuby as our underlying Ruby implementation) would help resolve our issues. Running on JRuby, and using the march-hare gem, which is a lightweight wrapper around the Java-based AMQP client, we hoped to be able to hit our targets. And, while I must admit that we abandoned this effort without a significant amount of profiling time, it only got us to about 15k connections before it fell over, mostly due to huge memory consumption (on the order of 30 GB). My understanding (learned much later) is that this may indicate we did something with Celluloid that we shouldn't have, but we didn't have time to continue down this route and try to fix the memory leak.

Elixir to the Rescue

Finally, we felt it was necessary to try something a bit more drastic. Having been introduced to Elixir a few months earlier, and understanding that the Erlang runtime was designed in large part for this kind of problem, we approached the team at Nexia with the suggestion that we take a few weeks and spike a solution using Elixir. We chose Elixir over Erlang because its syntax is much closer to the Ruby already familiar to the developers of the existing Rails application, which should make transitioning this work to their core team easier. So, we started to learn more about Elixir, Erlang, and the OTP framework that promised to help us build a scalable, robust system that could, conceivably, provide 99.9999999% uptime (ok, maybe we're not that good, but Erlang can be).

A quick note – Elixir is a great language on top of the amazing Erlang runtime, and the OTP framework and libraries really provide most of the features we’re leveraging for this application, so you can mostly replace Elixir with Erlang in this post, and I’ll try to call out Elixir-specific stuff if there is any.

Welcoming the Herd With Open Arms…

or, how do you accept 1000 connections per second on a single machine, with one OS process? Most of the TCP/IP (or SSL, in our case) server examples you find on the internet do something like this:

  1. Create a listen socket
  2. Block on that socket until a client connects (accept)
  3. Hand off the newly-accepted socket to some other process
  4. Go to step 2

In Elixir, this would look something like this tail-recursive call that implements our loop above for an SSL-based connection:

defp do_listen(listen_socket) do
  {:ok, socket} = :ssl.transport_accept(listen_socket)
  :ok = :ssl.ssl_accept(socket)
  endpoint = TcpSupervisor.start_endpoint(socket)
  :ssl.controlling_process(socket, endpoint)
  :gen_server.cast endpoint, {:start}
  do_listen(listen_socket)
end

Notice that you've now single-threaded your application's ability to accept new connections, which will eventually cause the operating system to simply refuse new connections on your listening port if you can't keep up with accepting them in a timely manner. There are some things you can tweak to buy more time (especially increasing the listen backlog for your service to allow more pending connections), but eventually you're going to have to do something about that single listener. In Elixir, the answer is to spin up multiple "acceptor" processes, each of which blocks on the same listen socket (yes, you can do this!). When a new connection arrives, it will wake the next available waiting process, and that process will handle that connection. This pattern has allowed us to get to 1000 connections/second on a single server quite easily (and we haven't really found the upper limit). The code is obviously a bit more complex. First, we have a supervisor that owns the listen socket, and its children will be the acceptor processes:

defmodule Owsla.TcpListenerSupervisor do
  use Supervisor.Behaviour
 
  def start_link(port, acceptor_count, backlog) do
    :supervisor.start_link({ :local, :listener_sup}, __MODULE__, [port, acceptor_count, backlog])
  end
 
  def init([port, acceptor_count, backlog]) do
    :ssl.start()
    {:ok, listen_socket} = create_listen_socket(port, backlog)
    spawn(fn ->
      Enum.each(1..acceptor_count, fn (_) -> start_listener() end)
    end)
    tree = [ worker(Owsla.TcpAcceptor, [listen_socket], restart: :permanent) ]
    supervise(tree, strategy: :simple_one_for_one)
  end
 
  def create_listen_socket(port, backlog) do
     tcp_options = [
      :binary,
      {:packet, :line}, 
      {:reuseaddr, true}, 
      {:active, false},
      {:backlog, backlog}
      ]
    :gen_tcp.listen(port, tcp_options)
  end
 
  def start_listener() do
    :supervisor.start_child(:listener_sup, [])
  end 
end

Next, we have the acceptors themselves:

defmodule Owsla.TcpAcceptor do
  use GenServer.Behaviour
 
  @ssl_options  [{:certfile, "deviceserver.crt"}, {:keyfile, "deviceserver.key"},
      {:ciphers, [{:dhe_rsa,:aes_256_cbc,:sha256},
           {:dhe_dss,:aes_256_cbc,:sha256},
           {:rsa,:aes_256_cbc,:sha256},
           {:dhe_rsa,:aes_128_cbc,:sha256},
           {:dhe_dss,:aes_128_cbc,:sha256},
           {:rsa,:aes_128_cbc,:sha256},
           {:dhe_rsa,:aes_256_cbc,:sha},
           {:dhe_dss,:aes_256_cbc,:sha},
           {:rsa,:aes_256_cbc,:sha},
           {:dhe_rsa,:'3des_ede_cbc',:sha},
           {:dhe_dss,:'3des_ede_cbc',:sha},
           {:rsa,:'3des_ede_cbc',:sha},
           {:dhe_rsa,:aes_128_cbc,:sha},
           {:dhe_dss,:aes_128_cbc,:sha},
           {:rsa,:aes_128_cbc,:sha},
           {:rsa,:rc4_128,:sha},
           {:rsa,:rc4_128,:md5},
           {:dhe_rsa,:des_cbc,:sha},
           {:rsa,:des_cbc,:sha} 
          ]}]
 
  def start_link(listen_socket) do
    :gen_server.start_link(__MODULE__, listen_socket, [])
  end
 
  def init(listen_socket) do
    # Setting the process priority to high /seems/ to improve performance of
    # incoming connection rate, but it also /seems/ to slow down processing
    # of messages. For now, we punt and leave the priority at the default setting.
    #Process.flag(:priority, :high)
    :gen_server.cast self, {:listen}
    {:ok, listen_socket }
  end
 
  def handle_cast( {:listen}, listen_socket) do
    do_listen(listen_socket)
  end
   
  defp do_listen(listen_socket) do
    case :gen_tcp.accept(listen_socket) do
      {:ok, socket} ->
        case :ssl.ssl_accept(socket, @ssl_options) do
          {:ok, ssl_socket} ->
            endpoint = Owsla.TcpSupervisor.start_endpoint(ssl_socket)
            :ssl.controlling_process(ssl_socket, endpoint)
            :gen_server.cast endpoint, {:start}
            do_listen(listen_socket)
          {:error, :closed} ->
            do_listen(listen_socket)
        end
      {:error, :closed} -> do_listen(listen_socket)
      {:error, _} -> { :stop, :error, [] }
    end
  end
end

The four lines inside the {:ok, ssl_socket} branch above do the following:

  1. Start up a new process to handle the individual TCP/IP connection (the Owsla.TcpSupervisor.start_endpoint call)
  2. Transfer control of the SSL connection to that process (this is an Erlang thing)
  3. Start the endpoint listening for messages on its connection
  4. And then, just like before, make a tail-recursive call to listen again

However, now we have 1000 of these running at a time, with a TCP/IP backlog of 2000 connections, and we have no issue handling 1000 connections/second. Note that we also haven’t tweaked those numbers at all – this was our first guess, and it “just worked” so we left it alone (but configurable). It’s possible these are non-optimal, YMMV, IANAL, etc.
From there, the individual endpoint processes forward messages across RabbitMQ to our back-end systems. There were some additional challenges there, which I’ll talk about in another post.

Now, For Something Completely Different…

October 15, 2012


As many of you probably know, I've been in search of a new job for a while now, and I'm happy to report that I've accepted an offer from New Context and will be joining their incredible Cincinnati team very soon. For some, this may come as a bit of a shock, as it's pretty far afield from my decade-plus of .NET/Microsoft-focused development. Which is certainly true.

However, I felt that it was the right time for me to push myself as an individual and grow some in my career, and, after spending a day with the folks at New Context pairing and just getting to know them better, I knew that this was the place for me.

At the start, I’ll be working on polishing my Ruby on Rails skills, but there’s also some iOS development in my future I’m sure.

Don't think that I'll suddenly stop showing up to the Cincinnati .NET Users' Group or anything – I intend to keep participating. But I'll also be a more regular attendee at the Cincinnati Ruby Brigade and Cincinnati FP.

Categories: Uncategorized

Finding the Seams – Applying the Single Responsibility Principle to Get Code Under Test

September 7, 2012

Last night, I attended Cincy Clean Coders and ended up having a fairly long conversation with another attendee about testing legacy code. In her particular case, she had a component that needed to be put under test, but she was having trouble figuring out how to get it done. As is often the case with legacy code, what we found was that the component in question had too many (read >1) responsibilities and, by identifying those responsibilities and splitting them out into appropriate classes, we could in fact test the code in question.

Here’s an incredibly simplified version of the class in question (which, in real life, created many different message types, but in the end always sent an XML document):

using System;
using System.Xml.Linq;
using System.Collections.Generic;

namespace MessageManager
{
	public class MessageCreator
	{
		private static Queue<XDocument> messageQueue = new Queue<XDocument>();

		public static XDocument GetNextMessage ()
		{
			return messageQueue.Dequeue ();
		}

		public MessageCreator ()
		{
		}

		public void CreateAndSendMessage(int id)
		{
			XDocument message = this.CreateMessage(id);
			this.SendMessage(message);
		}

		private XDocument CreateMessage(int id)
		{
			var data = this.GetDataFromDatabase(id);
			data.Element ("Document").Add (
				new XElement("AdditionalData", "Some Other Data")
				);
			return data;
		}

		private XDocument GetDataFromDatabase (int id)
		{
			// In the real world, this actually calls a stored procedure which returns XML
			var doc = new XDocument(
				new XElement("Document",
			             new XAttribute("Id", id)
			             )
				);
			return doc;
		}

		private void SendMessage(XDocument message)
		{
			// Send the message across the wire to somewhere
			// This message mutates state in another system, which is bad for unit testing the MessageCreator
			// for now, just add the message to an in-memory queue
			MessageCreator.messageQueue.Enqueue (message);
		}
	}
}

As you can see, there are three related but different responsibilities in this code. First, we load some data from the database (in XML format). Next, we add some additional data to the base message, and then we send the message to the external system. Unfortunately, sending that data alters the state of the external system, and any other developer hitting that system will run into problems with their local systems being out of sync.
So, the first goal is to break apart the sending of messages from the creation of messages, which ends up being a fairly simple task. However, as with any changes to legacy code, if we can create an integration test to prove we’re not breaking existing functionality while refactoring, we should do that. Therefore, we start with an end-to-end integration test that clarifies how the system works today and ensures that the appropriate message is sent:

[TestFixture()]
public class MessageManagerIntegrationTests
{
	[Test()]
	public void TestMessageSending ()
	{
		var creator = new MessageCreator();
		var otherData = "Some Other Data";
		var id = 1;
		creator.CreateAndSendMessage(id);
		var message = MessageCreator.GetNextMessage ();
		var root = message.Root;
		Assert.That (root.Attribute("Id").Value, Is.EqualTo (id.ToString()));
		Assert.That (root.Element ("AdditionalData").Value, Is.EqualTo(otherData));
	}
}

With our first integration test in place, we can attempt to break apart some of the functionality of the MessageCreator for better testability. Our next step is to move the sending of messages to a separate class, and implement an interface that we can later use to mock the message sending.

public interface IMessageSender
{
	void SendMessage(XDocument message);
}

And the implementation:

public class MessageSender: IMessageSender
{
	private static Queue<XDocument> messageQueue = new Queue<XDocument>();

	/// <summary>
	/// Gets the next message on the queue. Note that in reality, this class sends messages to an external system,
	/// and validating the message send is more difficult than merely checking a queue.
	/// </summary>
	/// <returns>The next message.</returns>
	public static XDocument GetNextMessage ()
	{
		return messageQueue.Dequeue ();
	}

	public MessageSender ()
	{
	}

	public void SendMessage(XDocument message)
	{
		// Send the message across the wire to somewhere
		// This message mutates state in another system, which is bad for unit testing the MessageCreator
		// for now, just add the message to an in-memory queue
		messageQueue.Enqueue (message);
	}
}

Now, we can use a mock implementation of IMessageSender in our unit tests without actually sending data to the external system and getting a developer’s local database out of sync with the other system. Using Moq as our mock framework, this might look something like:

[TestFixture()]
public class MessageManagerTests_CreateAndSendMessage
{

	int id = 1;
	string otherData = "Some other data";
	XDocument result;

	[SetUp()]
	public void SetUp ()
	{
		var sender = new Mock<IMessageSender>(MockBehavior.Strict);
		sender.Setup (s => s.SendMessage(It.IsAny<XDocument>()))
			.Callback<XDocument>(d => result = d);
		var creator = new MessageCreator(sender.Object);
		creator.CreateAndSendMessage(id, otherData);
	}

	[Test]
	public void should_add_correct_id ()
	{
		Assert.That (result.Root.Attribute ("Id").Value, Is.EqualTo (this.id.ToString ()));
	}

	[Test]
	public void should_set_other_data ()
	{
		Assert.That (result.Root.Element ("AdditionalData").Value, Is.EqualTo (this.otherData));
	}
}

If you'd like to see the complete example, including all the commits along the way, you can check out the code at https://github.com/JeetKunDoug/LegacySeamsExample. In that repository, I've applied the same techniques to the data access portion of the MessageCreator class, so that our unit tests can now run without access to a database at all.

Working with legacy code can be a daunting task, but with some simple, mechanical refactorings, and the support of unit/integration tests, you can get your code under test.

Understand Your Application’s Performance and Save

August 24, 2012

You know the feeling. You’ve finally deployed your system into production, and everything seems to be running smoothly. However, your end-users start to complain about the speed of your application, and within a few days things have slowed to a crawl. You think you know what the problem is, remembering this hack-ish bit of code you thought might be a performance problem, so you jump in and fix that piece of code. You deploy your fixes, and nothing changes. Then, you try the next thing on your list. And the next…

Understanding the actual run-time behavior of your application can save you time and money. However, gaining that understanding can be difficult. One of the most effective ways to better understand your system’s behavior is to add instrumentation to collect actionable data on what operations are taking the longest.

In this post, I'm going to present an example of how I leveraged the decorator pattern, some dependency injection, and Windows performance counters to get a better understanding of the behavior of a Windows Azure-based application, and how that knowledge helped us focus on the real performance issues in our application and reduce our monthly computing costs on Windows Azure by more than half.

The Situation

This application is a queue-based system that receives commands on a Windows Azure queue, retrieves the appropriate data (from Azure Table storage or external APIs), does some aggregation/processing, and then writes the resulting data back to table storage.

In the beginning, we deployed our system to Azure using several Medium instances, and started watching our queue lengths to see how things were going. It turned out that, most of the time, 4 Medium instances would handle the load just fine, but occasionally (and more frequently than we expected), the queues would get backed up to the point that we'd have to add up to 8 additional instances to keep up with the load. Eventually, we ended up constantly running 12 instances just to make sure things didn't get backed up, which cost quite a bit of money.

With no instrumentation in the application, we were flying blind, with no good way to figure out what was taking so much time. However, we had over 50 different commands the system could process, and adding code to write performance data for each of them individually was not feasible.

Adding Instrumentation the Lazy Way

The framework we were using to handle our queue processing used Autofac to create the command handler instances, and Autofac supports wrapping the implementation of any class in one or more decorators that implement the same interface as the class being wrapped.

Thankfully, every command handling class in the system implemented an interface similar to:

public interface IHandle<TMessage> {
    void Handle(TMessage message);
}

So, as long as I could write something that generically implemented IHandle<TCommand> and could publish data to the correct place (in this case, performance counters), I was all set. I had previously used Enterprise Library's Policy Injection Application Block to do some Aspect Oriented Programming, and knew that it had an instrumentation policy that would do about what I wanted, so I leveraged their source and simplified it somewhat for my needs. I've attached a slightly modified version of our decorator here, if you're interested in the nitty-gritty details, or would like to use it (sadly, WordPress won't let you upload a .cs file, so I had to call it .doc – you'll have to rename it to .cs).

The Results

What we found was very interesting, very useful, and not at all what we expected. We knew that there were several classes of messages, with very different workloads. Some were heavily CPU/memory limited, as they were aggregating tens of thousands of rows of data. Others, calling out to third-party APIs, were I/O bound and completely dependent on the speed at which those services could return data to us. And we assumed those heavy, CPU/Memory bound messages were clearly going to take the longest to run.

And we were wrong. Looking at the data (which, unfortunately, I don't have easy access to any more), we found that the I/O-bound calls to third-party APIs were sometimes taking upwards of 5 minutes to complete, making our entire system grind to a halt, as each of our instances was (intentionally) single-threaded and, therefore, could only process one message at a time. At about the same time we gathered this insight, Microsoft reduced the cost of extra-small instances to $.02/hour, which meant we could run 6 extra-small instances for the cost of a single small instance, and 12 for the cost of a medium instance. Knowing that the messages that were keeping us blocked were I/O bound and not CPU/memory intensive, we knew what to do:

  1. Split the single command queue into a command queue and an “i/o bound” command queue
  2. Add another role type that only handled this new queue
  3. Update our message routing to route to the appropriate queue
  4. Redeploy with our heavy-lifting aggregation instances running on Medium VMs and the new I/O-bound workers on Extra-small VMs.
  5. Update our autoscaling rules appropriately, so that each worker role was scaled according to the length of the proper queue.
  6. Save money

Decorators to the Rescue

As we added more clients, we started to see a general slow-down in some of our command processing, but it appeared that only certain messages of a given type were the culprits. We therefore used the same technique to add a "Long Running Messages Recorder" that would log the actual messages that took so long, so we could do additional analysis and see if there were other reasons for these issues. We found that sometimes the delay was due to simple intermittent issues with Azure storage, but there was a class of messages that consistently took longer to run. These messages, which we could now review in detail, were accessing data across datacenters, sometimes across continents. We therefore built a "geo-worker" role that could be deployed in these other datacenters, and updated our routing code to route messages tagged with a specific header to a different geo-worker queue, again greatly increasing the efficiency of our queue processing.

In Summary

Although our particular application was running on Windows Azure, these techniques are applicable to any system, in-house or hosted in the cloud. Understanding the behavior of your system through the collection of real-life performance data can save you big-time. Also, using the decorator pattern happened to work well for our situation, as each handler method had the exact same "generic" signature. However, you can use tools like Unity's Interception (and the EntLib Policy Injection Application Block, which has a more full-featured policy for doing this built in), aspect-oriented programming with PostSharp (who have an excellent post on adding performance counters here), or just add the necessary code by hand if none of these options makes sense. However you gather the data, you'll know that you're spending your time fixing the actual performance issues in your application, instead of what you think those problems are.

P.S. – Think you could use someone like me to help you solve your performance issues? I’m available.
