OS X Yosemite and Max Open Files

October 20, 2014 2 comments

A quick note on ulimits and OS X Yosemite. If you need to increase your ulimit for max open files, some of the old tricks (notably /etc/launchd.conf) no longer work. You'll need to create a new file, /etc/sysctl.conf, if it doesn't already exist, and add the following lines:

kern.maxfiles=<big number>
kern.maxfilesperproc=<slightly smaller or equal big number>

kern.maxfiles determines the maximum number of files available to be open across the entire system. kern.maxfilesperproc, not surprisingly, sets the limit for a single process.

Save the file, reboot for good measure, and you should now be able to set a per-process max open files greater than the default of 10240. Unfortunately, /etc/launchd.conf appears to be ignored now due to 'security concerns,' so you'll have to add a line to your .bashrc or .zshrc (or whatever your shell's rc file is) to raise your ulimit if you don't want to do it by hand every time you start whatever program needs it:

ulimit -n 32768

I'm working on Riak, which requires many more open files than the defaults allow, so per the Riak docs I've set mine to:

kern.maxfiles=262144
kern.maxfilesperproc=32768

This should support an 8-node cluster's worth of open files, which is far more than I'll ever actually run on this machine. Mind you, if you're attempting to run a very large cluster, or a cluster with many, many keys, you may need to tune this further. In other words, YMMV.
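
If you want to double-check that everything took, something like the following (run in a fresh shell after the reboot) should echo the new values back:

# kernel-wide limits from /etc/sysctl.conf
sysctl kern.maxfiles kern.maxfilesperproc
# per-shell limit from your .bashrc/.zshrc
ulimit -n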

Categories: Mac OS X, Riak

Strangeloop Elixir and the IoT talk video available

October 20, 2014 Leave a comment

I love Strangeloop’s videos, as they capture the screen separately from the speaker and then merge the two together nicely. My talk on Elixir and the Internet of Things from Strangeloop 2014 is available:

Categories: Uncategorized

Video from my Dayton Elixir talk on IoT available.

August 19, 2014 Leave a comment

For those who would like to hear my talk on Elixir and the Internet of Things, it’s available on YouTube:

Categories: Uncategorized

A Gentle Introduction to OTP

April 24, 2014 Leave a comment

Supervisors, Workers, and Trees – An Overview

Open Telecom Platform, or OTP, is a group of Erlang libraries that help implement distributed, fault-tolerant systems. Because of the patterns that are encouraged by the OTP libraries, these systems tend to be built of lots of small, independent processes that are easy to reason about and manage. And while the name implies that they are only useful for telecom applications, many types of applications can be built using OTP.

At its simplest, an OTP application, at runtime, is nothing but a tree of Erlang processes. There are two fundamental kinds of OTP processes (implemented as Erlang behaviors). The first is the Supervisor, whose job is to start up its children and manage their lifecycle. The other is the Worker (of which there are several sub-types). Workers, as the name implies, do the work of the system.

I ended my third installment on Elixir and the Internet of Things with a picture of the Supervisor Tree we’ve built for our SSL RabbitMQ endpoint host. As a reminder, it looks like this:

Owsla Supervisor Tree

For those of you who are familiar with OTP, this diagram probably already makes sense. If you’re not, follow along and I’ll try to make some sense of this slightly different version of a typical “circles, boxes, and lines” diagram.

Supervisors

Supervisors do nothing but start, monitor, and (potentially) restart their children. While this sounds like a small thing, supervisors are, in many ways, what give OTP applications their reputation for stability and resilience in the face of errors. I'll go into more details about supervisors, supervision strategies, and restart settings in a later post, but for now understand that each supervisor can be configured to start up different kinds of children and react differently when those children die.
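
To make that a little more concrete, here is a minimal supervisor sketch in the pre-1.0 Elixir style used throughout these posts. MyApp.Supervisor and MyApp.Worker are made-up names for illustration, not part of any real project:

	# a minimal sketch - MyApp.Worker is assumed to be a gen_server-style module
	defmodule MyApp.Supervisor do
		use Supervisor.Behaviour

		def start_link() do
			:supervisor.start_link(__MODULE__, [])
		end

		def init([]) do
			# one child spec; :permanent means the worker is always restarted
			# whenever it terminates, for any reason
			tree = [ worker(MyApp.Worker, [], restart: :permanent) ]
			supervise(tree, strategy: :one_for_one)
		end
	end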

Workers

As their name implies, OTP workers perform the actual work of the application. The OTP libraries provide three generic worker behaviors (in Erlang parlance): gen_server, gen_fsm, and gen_event.

gen_server (Generic Server)

The most generic of the three is the gen_server behavior. This behavior implements a typical Erlang event loop and provides callbacks for a generic server. However, it doesn't inherently implement any sort of networking server (as its name might imply) beyond what any Erlang process provides. It's really a host for fairly generic business/communications logic. You may, however, use it to create things like our TcpEndpoint from the Owsla project, where the generic gen_server happens to "own" a TCP connection and receives and reacts to events from that connection. Rather than repeat myself, I'll refer you to the 2nd part of my series on Elixir and the Internet of Things, where I break down our TcpEndpoint worker in great detail.
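
Just to show the shape of one, here is roughly the smallest gen_server you can write in this vintage of Elixir. Counter is a made-up example and has nothing to do with the Owsla code:

	# a hypothetical counter, registered locally as :counter
	defmodule Counter do
		use GenServer.Behaviour

		def start_link(initial) do
			:gen_server.start_link({ :local, :counter }, __MODULE__, initial, [])
		end

		def init(initial) do
			{ :ok, initial }
		end

		# synchronous call: bump the count and reply with the new value
		def handle_call(:increment, _from, count) do
			{ :reply, count + 1, count + 1 }
		end
	end

	# usage: Counter.start_link(0), then :gen_server.call(:counter, :increment)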

gen_fsm (the Finite State Machine)

Does anyone remember what Finite State Machines (FSMs) are? You know, those little circle->arrow->circle diagrams that turn into regular expressions, or something like that? At least, that's what I remember from my CS class in college. Seriously though, FSMs are useful for situations where you have logic that maps to a series of states and transitions. A small example from the Wikipedia article on Finite State Machines will give you the idea of what I'm talking about. This one models a simple coin-operated turnstile:

Coin-Operated Turnstile FSM

These kinds of workers are often used to model real-world devices (like the above-mentioned turnstile).

	-module(turnstile).
	-behaviour(gen_fsm).

	-export([start_link/0]).
	-export([coin/0, push/0]).
	-export([init/1, locked/2, unlocked/2]).

	start_link() ->
	    gen_fsm:start_link({local, turnstile}, turnstile, {}, []).

	%% API: insert a coin, or push the arm
	coin() ->
	    gen_fsm:send_event(turnstile, coin).

	push() ->
	    gen_fsm:send_event(turnstile, push).

	init(_Args) ->
	    {ok, locked, {}}.

	%% locked: a coin unlocks the turnstile, a push does nothing
	locked(coin, StateData) ->
	    do_unlock(),
	    {next_state, unlocked, StateData, 30000};
	locked(push, StateData) ->
	    {next_state, locked, StateData}.

	%% unlocked: a push rotates the arm and locks again; if nobody pushes
	%% within 30 seconds, the timeout fires and we lock anyway
	unlocked(push, StateData) ->
	    do_rotate(),
	    {next_state, locked, StateData};
	unlocked(timeout, StateData) ->
	    do_lock(),
	    {next_state, locked, StateData}.

	%% stand-ins for whatever drives the real hardware
	do_unlock() -> io:format("unlocking~n").
	do_lock() -> io:format("locking~n").
	do_rotate() -> io:format("rotating arm~n").

You may have noticed that this is straight-up Erlang code. This is because, as of Elixir 0.12.5, the GenFSM.Behaviour module was deprecated, and it was removed in 0.13. The reason given is that there are easier and more idiomatic ways in Erlang to implement a finite state machine, and that solutions like plain_fsm probably provide a better basis for a finite state machine implementation in Elixir.

gen_event (Event Handlers)

Event Handlers, as implemented by the gen_event behavior, work in partnership with an event manager to separate the responsibility of raising events from the various ways of handling them. An event manager may have zero, one, or several event handlers registered, and each will respond to every event sent to its event manager. Let's implement the console logger from the Erlang docs in Elixir:

	defmodule ConsoleLogger do
		use GenEvent.Behaviour
		
		def init(_) do
			{ :ok, [] }
		end
		
		def handle_event(error, state) do
			IO.puts "***ERROR*** #{error}"
			{:ok, state}
		end
	end

Now we can start up a generic event manager, register our handler with it, and send it events:

	# start up our event manager
	:gen_event.start({:local, :error_manager})
	# add our handler
	:gen_event.add_handler(:error_manager, ConsoleLogger, [])
	# Now, send an event
	:gen_event.notify(:error_manager, :error)

which would result in an IEx session something like this:

	iex(2)> :gen_event.start({:local, :error_manager})
	{:ok, #PID<0.59.0>}
	iex(3)> :gen_event.add_handler(:error_manager, ConsoleLogger, [])
	:ok
	iex(4)> :gen_event.notify(:error_manager, :error)
	:ok
	***ERROR*** error
	iex(5)>

Note that you can register additional event handlers, each of which will be notified whenever an event is sent to the event manager. This is great when you want to separate out different concerns (logging to the console, log files, database tables, and airbrake.io, for example) and simply register the event handler(s) you need at the time.
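
As a sketch of how a second handler might look, here's a hypothetical FileLogger that appends errors to a file and is registered alongside the ConsoleLogger (the module and file name are made up for illustration):

	defmodule FileLogger do
		use GenEvent.Behaviour

		def init(path) do
			{ :ok, path }
		end

		def handle_event(error, path) do
			# append each error to the log file named when the handler was added
			File.write(path, "***ERROR*** #{error}\n", [:append])
			{ :ok, path }
		end
	end

	# both handlers now see every event sent to :error_manager
	:gen_event.add_handler(:error_manager, FileLogger, "errors.log")
	:gen_event.notify(:error_manager, :disk_full)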

Conclusion

This only scratches the surface of OTP, but hopefully it will give you a starting point for understanding how some of the moving parts fit together. In a future post, I’ll go into more detail on building OTP applications, supervisor trees and strategies, why our system kept imploding every time we disconnected more than 5 TCP/IP connections in a short period of time, and how you can avoid having the same thing happen to you.

Categories: Elixir, Erlang

Elixir and the Internet of Things (Part 3) – Rabbits, Rabbits Everywhere.

March 6, 2014 1 comment

(Cross-posted from neo.com, somewhat belatedly)

In honor of Erlang Factory 2014 (where I'll be soaking up much Elixir and Erlang knowledge today and tomorrow), I'd like to present the 3rd and final installment in my series on using Elixir when dealing with the Internet of Things.

In part 1 of this series, I discussed the stampede of connections issue we faced when working with Nexia Home on their home automation system, and what (Ruby) solutions we tried before landing on Elixir. Additionally, I explained how we backed into implementing an acceptor pool in order to handle 1000 new SSL connections/second from home automation bridges and other devices.

In part 2, I discussed the implementation of our TcpEndpoint process, and how it routes incoming, line-based TCP/IP protocols to a RabbitMQ queue for processing by our back-end systems, but I left the implementation of our RabbitProducer and RabbitConsumer for later.

So it appears we’re at now now.

Let's start by taking a look at the RabbitProducer. We're using the Exrabbit Elixir RabbitMQ client (our own fork). The producer is a single Erlang process that simply pushes data to RabbitMQ. So far, this has been fast enough to handle many thousands of messages per second, so we haven't done any additional tweaking of this code.

     defmodule Owsla.RabbitProducer do
      use GenServer.Behaviour

      defrecord RabbitProducerState, amqp: nil, channel: nil, config: nil

      def start_link(config) do
        :gen_server.start_link({ :local, :producer }, __MODULE__, config, [])
      end

      def init(config) do
        amqp = Exrabbit.Utils.connect(host: config.rabbit_host,
                                      port: config.rabbit_port,
                                      username: config.rabbit_user,
                                      password: config.rabbit_pass)

        channel = Exrabbit.Utils.channel amqp
        queue = Exrabbit.Utils.declare_queue(channel, config.from_device, auto_delete: false, durable: true)

        {:ok, RabbitProducerState.new(amqp: amqp, channel: channel, config: config) }
      end

      def send(connection_id, message, from_device_seq) do
        :gen_server.cast(:producer, {:send, connection_id, message, from_device_seq})
      end

      def handle_cast({:send, connection_id, message, from_device_seq}, state ) do
        props = [headers: [{"connection_id", :longstr, connection_id}, {state.config.from_device_seq, :long, from_device_seq}]]
        Exrabbit.Utils.publish(state.channel, "", state.config.from_device, message, props)
        {:noreply, state }
      end

      def terminate(reason, state) do
        IO.puts "Rabbit producer terminating. Reason was:"
        IO.inspect reason
        Exrabbit.Utils.channel_close(state.channel)
        Exrabbit.Utils.disconnect(state.amqp)
        :ok
      end
    end

In init, we create a RabbitMQ channel, declare our queue, and return a new record that stores our AMQP connection and channel, along with some configuration information. A call to the send helper function sends a message to the locally registered producer process with the relevant data. Note that the TcpEndpoint maintains an outbound and an inbound message sequence, and the from_device_seq parameter to send is that sequence number.
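
In fact, the TcpEndpoint from part 2 calls this helper directly from its handler for incoming SSL data:

    Owsla.RabbitProducer.send(state.id, data, state.sequence_no)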

The single handle_cast receives the send message, adds two message headers for the connection ID and the previously-mentioned sequence number, and then publishes the message to a queue named in our configuration object.

There's nothing really magical going on here, and the RabbitConsumer isn't much different. However, for the RabbitConsumer we are leveraging another part of Exrabbit: the Exrabbit.Subscriber module.

    defmodule Owsla.RabbitConsumer do
      use Exrabbit.Subscriber

      def start_link(config) do
        :gen_server.start_link(Owsla.RabbitConsumer,
                          [host: config.rabbit_host,
                           port: config.rabbit_port,
                           username: config.rabbit_user,
                           password: config.rabbit_pass,
                           queue: config.to_device],
                          [])
      end

      def handle_message(msg, state) do
        case parse_message_with_props(msg) do
          nil -> nil
          {tag, payload, props} ->
            conid = get_header(props.headers, "connection_id")
            to_device_seq = get_header(props.headers, "to_device_seq")
            Owsla.TcpEndpoint.send(conid, to_device_seq, payload)
            ack state[:channel], tag
        end
      end

      def get_header(nil, _key), do: nil
      def get_header(:undefined, _key), do: nil
      def get_header([], _key), do: nil
      def get_header(headers, key) do
        [head | rest] = headers
        case head do
          {^key, _, value} -> value
          {_, _, _} -> get_header(rest, key)
        end
      end
    end

So, we start up our RabbitConsumer in its start_link function with the same kinds of configuration data we used for the RabbitProducer. The Exrabbit.Subscriber module does most of the hard work of subscribing to the queue we provided in the start_link call, and all we have to worry about is the handle_message function, which is called for us whenever we receive a message. Unfortunately, the headers property of the message properties is a list of tuples of the form {key, type, value}, and not something useful like a HashDict, so we have some utility functions to walk the list and find the headers we need (namely, the connection ID and the sequence number from the incoming message). With those in hand, we just call the TcpEndpoint.send function.

If you remember from the earlier posts, whenever we create a new TcpEndpoint process we register its PID in the global registry, which allows us to easily find the connection regardless of which node in our cluster it's on. Note that we start up 8 (configurable) RabbitConsumers, and do some message-ordering magic in the TcpEndpoint to make sure we're delivering messages to the devices in the correct order.
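
The supervisor that starts that pool of consumers isn't shown here, but a sketch of what it might look like, using the same Supervisor.Behaviour style as the rest of the project (the module name and child ids are hypothetical), is:

    defmodule Owsla.RabbitConsumerSupervisor do
      use Supervisor.Behaviour

      def start_link(config, consumer_count) do
        :supervisor.start_link(__MODULE__, {config, consumer_count})
      end

      def init({config, consumer_count}) do
        # one worker spec per consumer - child ids have to be unique
        tree = Enum.map(1..consumer_count, fn (n) ->
          worker(Owsla.RabbitConsumer, [config], id: :"rabbit_consumer_#{n}")
        end)
        supervise(tree, strategy: :one_for_one)
      end
    end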

And that’s pretty much it, round-trip from device -> backend -> device. For those who have been following along, our supervisor tree ends up looking like this:

Owsla Supervisor Tree

Categories: Elixir, Performance

Elixir and the Internet of Things (Part 2) – What’s in a Process?

January 30, 2014 2 comments

In my previous installment I discussed how our application is required to accept up to 1000 new TCP/IP connections per second and handle upwards of 40,000 simultaneous TCP/IP connections, the kinds of things we tried to get that working under Ruby/JRuby, and how we eventually decided to try Elixir and Erlang to solve the problem. If you haven't read part 1, I'd suggest reading it now. Go ahead… I'll wait…

So, now that we’ve accepted 40,000 connections talking over SSL, what are we going to do with them? That was our next challenge.

TL;DR – (Erlang) Processes FTW

So each time we called Owsla.TcpSupervisor.start_endpoint(ssl_socket) in part 1, we created a new Erlang process for that SSL connection. That process has two responsibilities. First, it handles reading data from the socket (which is generally text-based and line-oriented) and putting it, along with some routing information, onto a RabbitMQ queue. Second, it can receive messages from any Erlang process in our cluster and send them back to the device.

40,000 Processes — is this guy nuts?

No, I'm not nuts – I'm using Erlang. The Erlang programming language and runtime have their own version of a "process," much lighter weight than an OS thread or process, which makes creating new processes very inexpensive and allows the VM to run 10s (if not 100s) of thousands of processes in a very efficient manner. Native threads on most modern OSes take up to several megabytes of memory for their initial stack, while Erlang processes take almost no memory to start up and dynamically grow their stacks as necessary.

The VM also has its own scheduler, separate from the OS scheduler, which helps keep expensive OS-level context switching at bay. Interestingly, Erlang's scheduler is preemptive, not cooperative like many other "non-OS-thread" concurrency solutions, which require the programmer to yield time back to the runtime and so allow tight, long-running loops to hog CPU time without end.

The TcpEndpoint

So, what does this Endpoint look like anyway? Let’s walk through it and dissect some things.

defmodule Owsla.TcpEndpoint do
  use GenServer.Behaviour

We start by defining our module and pulling in Elixir's GenServer.Behaviour with use, which tells OTP that this module is a gen_server.

  defrecord EndpointState, socket: nil, id: nil, sequence_no: 0, closed: false

Next, we define a record type called EndpointState that describes the current state of our endpoint. Note that for very simple GenServers, it’s probably fine to use a simple tuple for state, but at some point it becomes easier to manage if you create a record. Our state includes the SSL socket, a unique identifier, a sequence number for messages being sent to our back-end systems, and a flag to tell if, somehow, our process is still up but the SSL socket has been closed.

  def start_link(socket, id) do
    :gen_server.start_link({:global, id}, __MODULE__, {socket, id}, [])
  end

  def init({socket, id}) do
    { :ok, EndpointState.new(socket: socket, id: id) }
  end

Here, we’ve got our boilerplate “Start up my gen_server” code.

  def send(connection_id, message) do
    :gen_server.cast {:global, connection_id}, {:send, message}
  end

The send function is a simple wrapper around a call to :gen_server.cast as a convenience to consumers. It takes a connection ID (which is a UUID in our case) and a message to send back down the pipe. Next, we have our gen_server handling code:

  def handle_cast( {:start}, state) do
    :ssl.setopts(state.socket, [active: :once])
    {:noreply, state }
  end

Defining handle_cast functions that pattern match on certain messages is how you implement your behavior in an Erlang gen_server (part of the OTP library). This one (handling the :start message) is invoked by the TcpAcceptor way back in part 1, once it has transferred ownership of the SSL socket to this process. It tells the process that it can start doing things with the socket without crashing for accessing another process's socket.
Note that we set the SSL socket to [active: :once] rather than [active: true]. The difference is important, as [active: true] can allow your process's mailbox to be flooded with TCP/IP messages if whatever is on the other end can send data to you very quickly. For more information, take a look at the Buckets of Sockets section of Learn You Some Erlang for Great Good. Then, read all the rest of it for good measure.

  def handle_cast( {:send, message}, state) do
    unless state.closed do
      :ssl.send(state.socket,message)
    end
    {:noreply, state}
  end

This handle_cast matches our :send from before. It simply checks to see if our connection has been closed and, if not, sends the message down the SSL socket.

  def handle_info({:ssl, _, data}, state) do
    Owsla.RabbitProducer.send(state.id, data, state.sequence_no)
    :ssl.setopts(state.socket, [active: :once])
    {:noreply, state.sequence_no(state.sequence_no + 1)}
  end

  def handle_info({:ssl_closed, _}, state) do
    IO.puts "Client closed socket - stopping connection #{state.id}"
    {:stop, :normal, state.closed(true)}
  end

  def handle_info({:ssl_error, _, reason}, state) do
    IO.puts "Error on socket.recv. Error was: #{reason}"
    IO.puts "Closing connection #{:uuid.to_string(state.id)}"
    {:stop, :error, state}
  end

The three handle_info functions above are all about handling asynchronous messages from the SSL socket. Because we have to be able to both send and receive data on this socket, we can’t just put a blocking :ssl.recv() call in a tight loop, so we need to receive our data asynchronously. We did that by setting the socket to active, as discussed above.

The first, {:ssl, _, data}, is called every time we get a line of data (remember, our data is newline-separated; gen_tcp has other options for breaking apart your data if you need them, for example parsing buffers where the first N bytes of a packet contain the length of the rest of the packet). It simply forwards the data to a RabbitMQ queue via the Owsla.RabbitProducer module/process, which is a wrapper around our fork of ExRabbit, which in turn uses the native Erlang RabbitMQ client.

The other two handle_info functions deal with the SSL socket either closing "naturally" or hitting some kind of error. In either case, we want to stop the current process, so rather than return a tuple with :noreply as the first item, we return a tuple with :stop. This signals to OTP that our process has completed and should be terminated. In one case, we return :normal as the second item in our tuple, to let our supervisor know that we shut down normally, and in the other we return :error so that OTP knows our process failed for some unexpected reason.

A quick note on Elixir defrecords – note that the first two handle_info functions transform their current state in some way – the {:ssl, _, data} handler returns:

  {:noreply, state.sequence_no(state.sequence_no + 1)}

The state.sequence_no(state.sequence_no + 1) part takes our existing state and creates a copy (as records are immutable) with all of the data the same except for the sequence_no field, which is incremented by 1. The same pattern is used to set the closed field in the :ssl_closed handler. state.sequence_no(state.sequence_no + 1) is a much easier way to express EndpointState.new(socket: state.socket, id: state.id, sequence_no: state.sequence_no + 1).
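
Here's a tiny standalone illustration of that copy-on-update behavior. Point is a made-up record (old defrecord syntax) with nothing to do with the endpoint:

# Point is purely illustrative
defrecord Point, x: 0, y: 0

p  = Point.new(x: 1)    # Point[x: 1, y: 0]
p2 = p.x(42)            # a copy with x replaced: Point[x: 42, y: 0]
p.x                     # still 1 - the original record is unchanged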

Finally, we want to shut down our SSL connection whenever our process ends, so we define a terminate function to close it down before our process is removed by Erlang:

  def terminate(_reason, state) do
    IO.puts "Closing connection #{state.id}"
    :ssl.close(state.socket)
    :ok
  end

And that's pretty much that. Note that I've walked you through a slightly older and simpler version of our TCP endpoint; if you handled all of your business logic inside the endpoint process, this would probably be enough, and you could probably ignore the sequence_no gymnastics we're going through. However, the astute reader may have noticed that outbound messages are sent down the SSL connection in the order they arrive in the process's mailbox. In our case, those messages are being picked up by a pool of processes reading from a RabbitMQ queue, which means that messages could arrive out of order. Our current implementation handles this more gracefully, dealing with out-of-order messages and with timeouts for when we somehow miss a message from RabbitMQ, but the commensurate increase in complexity would make it much harder to describe in a blog post.

The final part of the solution is the RabbitConsumer, which picks up messages from our back-end system (currently still written in Ruby) and sends them back to the appropriate endpoint to be sent down the SSL connection, assuming it’s still around. But we can talk about that one another time.

Categories: Performance, Elixir

Elixir and the Internet of Things – Handling a Stampede

January 13, 2014 4 comments
Stampede, by t3rmin4t0r on Flickr (Creative Commons Attribution 2.0 Generic)

The Internet of Things is upon us, and being able to efficiently communicate with those things is going to become more and more difficult as more things get connected. I’ve been working as a consultant for Nexia Home Intelligence for the last year, and they were kind enough to let me blog a bit about what it is we’re doing to help them handle the large number of connected devices involved in running a hosted home automation solution.

TL;DR – Elixir/Erlang are Awesome

After trying out several Ruby-based solutions to the issues we faced handling the crush of 50k internet-connected devices, including EventMachine and Celluloid, running on MRI and jRuby, we spiked a solution on Elixir, which runs on the Erlang virtual machine. Leveraging the Erlang VM and out-of-the-box OTP frameworks, along with a few more advanced tricks, we’ve built a system that can easily handle our target workload.

Connections, Connections, Connections

There are lots of “things” in the world, and with IPv6 slowly gaining traction, many more of them will have an IP address (or at least internet connectivity of some kind). In many cases, the communication with these devices may be mostly one-way, where the device simply reports information back to a central server somewhere. In those cases, a simple HTTP-based API may well be able to handle the communications needs of your devices, and normal scaling suggestions for web servers will probably serve you just fine. However, there are certain classes of devices (locks, cameras, and thermostats, for example), that require bidirectional, near-real-time communication, and it is this kind of device that we’re dealing with.

In the Beginning…

Our original implementation leveraged Ruby's EventMachine, which is an evented I/O system for Ruby (similar to Node.js, if that helps). Devices would connect directly to the Ruby TCP/IP server via SSL, and communicate with our back-end Rails-based system via Resque. Outbound data from the Rails app was sent to RabbitMQ, with a header identifying the specific device to which the message was to be directed. The device servers all subscribed to the RabbitMQ exchange, and whichever one held the connection to the device would send out the results. This system held up well under steady-state load in the 3,000-6,000 connection range. However, there were some issues…

The Stampede

There are several kinds of problems you'll run into when you try to build a system that can handle 10s to 100s of thousands of simultaneous TCP/IP connections. The first one is "how do I handle all of these things connecting in rapid succession?" This was, in fact, the original impetus for investigating splitting the TCP/IP handling side of things from the "business logic" side. And, because all problems in Computer Science can be solved with an additional level of indirection, we decided to add an additional layer in front of the business logic to simply terminate SSL connections and route raw data to a pool of workers, which would handle the business logic and communication with the back-end. Given we were familiar with EventMachine, it seemed a no-brainer to simply split the original implementation in two, with some additional logic to handle things like message ordering now that data flowing to/from the device was transiting a second hop over RabbitMQ instead of the more direct path from before. So that's what we did.

Houston, we have a problem…

Apollo 13 – view of the crippled Service Module after separation. NASA image AS13-59-8500

We quickly found that the new system was unable to keep up with the load of even 5,000 devices, much less our goal of something closer to 50,000 devices on a single machine. Things were not looking good. Using ruby-prof, we found (and fixed) several performance-related issues with Ruby's amqp gem, which yielded some significant performance improvements. However, at that point we were CPU-bound, with no obvious single bottleneck left to go after to further improve performance. It appeared that the single-threaded nature of EventMachine, along with some additional amqp-gem-related performance issues (even when load-balancing across many instances on a multi-core machine), was going to sink this implementation.

A Side-Track Through Celluloid.io

Given that our core application was a Ruby on Rails application, we really wanted to stay on Ruby for this solution, so we spent some time spiking a solution on Celluloid.io to see if its actor-based, multi-threaded model (with JRuby as our underlying Ruby implementation) would help resolve our issues. Running on JRuby, and using the march-hare gem, which is a lightweight wrapper around the Java-based AMQP client, we hoped to be able to hit our targets. And, while I must admit that we abandoned this effort without a significant amount of profiling time, it only got us to about 15k connections before it fell over, mostly due to huge memory consumption (on the order of 30 GB). My understanding (learned much later) is that this may indicate we did something with Celluloid that we shouldn't have, but we didn't have time to continue down this route and track down the memory leak.

Elixir to the Rescue

Finally, we felt it was necessary to try something a bit more drastic. Having been introduced to Elixir a few months earlier, and understanding that the Erlang runtime was designed in large part for exactly this kind of problem, we approached the team at Nexia with the suggestion that we take a few weeks and spike a solution using Elixir. We chose Elixir over Erlang because its syntax is much closer to the Ruby the developers of the existing Rails application already know, which should make transitioning this work to their core team easier. So, we started to learn more about Elixir, Erlang, and the OTP framework that promised to help us build a scalable, robust system that could, conceivably, provide 99.9999999% uptime (ok, maybe we're not that good, but Erlang can be).

A quick note – Elixir is a great language on top of the amazing Erlang runtime, and the OTP framework and libraries really provide most of the features we’re leveraging for this application, so you can mostly replace Elixir with Erlang in this post, and I’ll try to call out Elixir-specific stuff if there is any.

Welcoming the Herd With Open Arms…

or: how do you accept 1000 connections per second on a single machine, with one OS process? Most of the TCP/IP (or SSL, in our case) server examples you find on the internet do something like this:

  1. Create a listen socket
  2. Block on that socket until a client connects (accept)
  3. Hand off the newly-accepted socket to some other process
  4. Go to step 2

In Elixir, this would look something like this tail-recursive call that implements our loop above for an SSL-based connection:

defp do_listen(listen_socket) do
  {:ok, socket} = :ssl.transport_accept(listen_socket)
  :ok = :ssl.ssl_accept(socket)
  endpoint = TcpSupervisor.start_endpoint(socket)
  :ssl.controlling_process(socket, endpoint)
  :gen_server.cast endpoint, {:start}
  do_listen(listen_socket)
end

Notice that you've now single-threaded your application's ability to accept new connections, which will eventually cause the operating system to simply refuse new connections on your listening port if you can't keep up with accepting them in a timely manner. There are some things you can tweak to buy more time (especially increasing the listen backlog for your service to allow more pending connections), but eventually you're going to have to do something about that single listener.

In Elixir, the answer is to spin up multiple "acceptor" processes, each of which blocks on the same listen socket (yes, you can do this!). When a new connection arrives, it wakes the next available waiting process, and that process handles the connection. This pattern has allowed us to get to 1000 connections/second on a single server quite easily (and we haven't really found the upper limit). The code is obviously a bit more complex. First, we have a supervisor that owns the listen socket, and its children will be the acceptor processes:

defmodule Owsla.TcpListenerSupervisor do
  use Supervisor.Behaviour
 
  def start_link(port, acceptor_count, backlog) do
    :supervisor.start_link({ :local, :listener_sup}, __MODULE__, [port, acceptor_count, backlog])
  end
 
  def init([port, acceptor_count, backlog]) do
    :ssl.start()
    {:ok, listen_socket} = create_listen_socket(port, backlog)
    spawn(fn ->
      Enum.each(1..acceptor_count,
        fn (_) -> start_listener() end
      )
    end)
    tree = [ worker(Owsla.TcpAcceptor, [listen_socket], restart: :permanent) ]
    supervise(tree, strategy: :simple_one_for_one)
  end
 
  def create_listen_socket(port, backlog) do
     tcp_options = [
      :binary,
      {:packet, :line}, 
      {:reuseaddr, true}, 
      {:active, false},
      {:backlog, backlog}
      ]
    :gen_tcp.listen(port, tcp_options)
  end
 
  def start_listener() do
    :supervisor.start_child(:listener_sup, [])
  end 
end

Next, we have the acceptors themselves:

defmodule Owsla.TcpAcceptor do
  use GenServer.Behaviour
 
  @ssl_options  [{:certfile, "deviceserver.crt"}, {:keyfile, "deviceserver.key"},
      {:ciphers, [{:dhe_rsa,:aes_256_cbc,:sha256},
           {:dhe_dss,:aes_256_cbc,:sha256},
           {:rsa,:aes_256_cbc,:sha256},
           {:dhe_rsa,:aes_128_cbc,:sha256},
           {:dhe_dss,:aes_128_cbc,:sha256},
           {:rsa,:aes_128_cbc,:sha256},
           {:dhe_rsa,:aes_256_cbc,:sha},
           {:dhe_dss,:aes_256_cbc,:sha},
           {:rsa,:aes_256_cbc,:sha},
           {:dhe_rsa,:'3des_ede_cbc',:sha},
           {:dhe_dss,:'3des_ede_cbc',:sha},
           {:rsa,:'3des_ede_cbc',:sha},
           {:dhe_rsa,:aes_128_cbc,:sha},
           {:dhe_dss,:aes_128_cbc,:sha},
           {:rsa,:aes_128_cbc,:sha},
           {:rsa,:rc4_128,:sha},
           {:rsa,:rc4_128,:md5},
           {:dhe_rsa,:des_cbc,:sha},
           {:rsa,:des_cbc,:sha} 
          ]}]
 
  def start_link(listen_socket) do
    :gen_server.start_link(__MODULE__, listen_socket, [])
  end
 
  def init(listen_socket) do
    # Setting the process priority to high /seems/ to improve performance of
    # incoming connection rate, but it also /seems/ to slow down processing
    # of messages. For now, we punt and leave the priority at the default setting.
    #Process.flag(:priority, :high)
    :gen_server.cast self, {:listen}
    {:ok, listen_socket }
  end
 
  def handle_cast( {:listen}, listen_socket) do
    do_listen(listen_socket)
  end
   
  defp do_listen(listen_socket) do
    case :gen_tcp.accept(listen_socket) do
      {:ok, socket} ->
        case :ssl.ssl_accept(socket, @ssl_options) do
          {:ok, ssl_socket} ->
            endpoint = Owsla.TcpSupervisor.start_endpoint(ssl_socket)
            :ssl.controlling_process(ssl_socket, endpoint)
            :gen_server.cast endpoint, {:start}
            do_listen(listen_socket)
          {:error, :closed} ->
            do_listen(listen_socket)
        end
      {:error, :closed} -> do_listen(listen_socket)
      {:error, _} -> { :stop, :error, [] }
    end
  end
end

The four lines in the success branch of do_listen above:

  1. Start up a new process to handle the individual TCP/IP connection (the Owsla.TcpSupervisor.start_endpoint call)
  2. Transfer control of the SSL socket to that new process (this is an Erlang thing)
  3. Tell the endpoint to start listening for messages on its connection
  4. And then, just like before, make a tail-recursive call to accept the next connection

However, now we have 1000 of these running at a time, with a TCP/IP backlog of 2000 connections, and we have no issue handling 1000 connections/second. Note that we also haven’t tweaked those numbers at all – this was our first guess, and it “just worked” so we left it alone (but configurable). It’s possible these are non-optimal, YMMV, IANAL, etc.
From there, the individual endpoint processes forward messages across RabbitMQ to our back-end systems. There were some additional challenges there, which I’ll talk about in another post.
