IoT Day Norway talk on Elixir and the Internet of Things – 2014

October 7, 2015

So that I can find it more quickly later: my IoT talk from NDC’s IoT Day in Norway is available. This version incorporates into the presentation itself some of the conversation Joe Armstrong started at my Strange Loop talk.


Categories: Uncategorized

OS X Yosemite and Max Open Files

October 20, 2014


My colleagues at Basho have a better solution to this, which also works with launchd. Use theirs instead of mine!



A quick note on ulimits and OS X Yosemite. If you need to increase your ulimit for max open files, some of the old tricks (especially launchctl.conf) no longer work. You’ll need to create a new file, /etc/sysctl.conf, if it doesn’t already exist, and add the following lines (the values here are illustrative; pick limits appropriate for your workload):
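
	# illustrative values, tune for your own workload
	kern.maxfiles=65536
	kern.maxfilesperproc=65536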


`kern.maxfiles` determines the maximum number of files available to be open across the entire system. `kern.maxfilesperproc`, not surprisingly, sets the limit for a single process.
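
You can check the current values at any time with sysctl:

	sysctl kern.maxfiles kern.maxfilesperproc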

Save the file, reboot for good measure, and you should now be able to set a per-process max open files limit greater than the default of 10240. Unfortunately, /etc/launchd.conf appears to be ignored now due to 'security concerns,' so you'll have to add a line to your .bashrc or .zshrc (or whatever your shell's rc file is) to raise your ulimit if you don't want to do it every time you start whatever program needs it:

	ulimit -n 32768

<br />As I'm working on Riak, which requires many more open files than the defaults. Per the Riak Docs, I've set mine to:


This should support an 8-node cluster’s worth of open files, which is way more than I’ll ever actually run on this machine. Mind you, if you’re attempting to run a very large cluster, or a cluster with many, many keys, you may need to tune this further. In other words, YMMV.

Categories: Mac OS X, Riak

Strangeloop Elixir and the IoT talk video available

October 20, 2014

I love Strangeloop’s videos, as they capture the screen separately from the speaker and then merge the two together nicely. My talk on Elixir and the Internet of Things from Strangeloop 2014 is available:

Categories: Uncategorized

Video from my Dayton Elixir talk on IoT available.

August 19, 2014

For those who would like to hear my talk on Elixir and the Internet of Things, it’s available on YouTube:

Categories: Uncategorized

A Gentle Introduction to OTP

April 24, 2014

Supervisors, Workers, and Trees – An Overview

Open Telecom Platform, or OTP, is a group of Erlang libraries that help implement distributed, fault-tolerant systems. Because of the patterns encouraged by the OTP libraries, these systems tend to be built from lots of small, independent processes that are easy to reason about and manage. And while the name implies that it’s only useful for telecom applications, many kinds of applications can be built using OTP.

At its simplest, an OTP application, at runtime, is nothing but a tree of Erlang processes. There are two fundamental kinds of OTP processes (implemented as Erlang behaviors). The first is the Supervisor, whose job is to start up its children and manage their lifecycle. The other is the Worker (of which there are several sub-types). Workers, as the name implies, do the work of the system.

I ended my third installment on Elixir and the Internet of Things with a picture of the Supervisor Tree we’ve built for our SSL RabbitMQ endpoint host. As a reminder, it looks like this:

Owsla Supervisor Tree

For those of you who are familiar with OTP, this diagram probably already makes sense. If you’re not, follow along and I’ll try to make some sense of this slightly different version of a typical “circles, boxes, and lines” diagram.


Supervisors

Supervisors do nothing but start, monitor, and (potentially) restart their children. While this sounds like a small thing, supervisors are, in many ways, what give OTP applications their reputation for stability and resilience in the face of errors. I’ll go into more detail about supervisors, supervision strategies, and restart settings in a later post, but for now understand that each supervisor can be configured to start up different kinds of children and to react differently when those children die.
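
To give you a feel for the shape of one, here’s a minimal supervisor sketch in the pre-1.0 Elixir style this post uses (MySupervisor and MyWorker are placeholder names, not part of any real project):

	defmodule MySupervisor do
		use Supervisor.Behaviour

		def start_link do
			:supervisor.start_link(__MODULE__, [])
		end

		def init([]) do
			# one child; :one_for_one restarts a dead child
			# without touching its siblings
			children = [ worker(MyWorker, []) ]
			supervise(children, strategy: :one_for_one)
		end
	end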


Workers

As their name implies, OTP workers perform the actual work of the application. There are three generic worker behaviors provided by the OTP libraries: gen_server, gen_fsm, and gen_event.

gen_server (Generic Server)

The most generic of the three is the gen_server behavior. This behavior implements a typical Erlang event loop and provides callbacks for a generic server. However, it doesn’t inherently implement any sort of networking server (as its name might imply) beyond what any Erlang process provides; it’s really a host for fairly generic business/communications logic. You can, however, use it to create things like our TcpEndpoint from the Owsla project, where the generic gen_server happens to “own” a TCP connection and receives and reacts to events from that connection. Rather than repeat myself, I’ll refer you to the second part of my series on Elixir and the Internet of Things, where I break down our TcpEndpoint worker in great detail.
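
To make that concrete, here’s a minimal gen_server sketch in the same pre-1.0 Elixir style (a toy counter; Counter and its function names are purely illustrative):

	defmodule Counter do
		use GenServer.Behaviour

		def start_link do
			:gen_server.start_link({:local, :counter}, __MODULE__, 0, [])
		end

		# asynchronous: fire-and-forget
		def increment do
			:gen_server.cast(:counter, :increment)
		end

		# synchronous: blocks until the server replies
		def value do
			:gen_server.call(:counter, :value)
		end

		def init(count) do
			{ :ok, count }
		end

		def handle_cast(:increment, count) do
			{ :noreply, count + 1 }
		end

		def handle_call(:value, _from, count) do
			{ :reply, count, count }
		end
	end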

gen_fsm (the Finite State Machine)

Does anyone remember what Finite State Machines (FSMs) are? You know, those little circle->arrow->circle diagrams that turn into regular expressions, or something like that? At least, that’s what I remember from my CS class in college. Seriously though, FSMs are useful for situations where you have logic that maps to a series of states and transitions. A small example from the Wikipedia article on Finite State Machines will give you the idea of what I’m talking about. This one models a simple coin-operated turnstyle:

Coin Operated Turnstyle FSM

These kinds of workers are often used to model real-world devices (like the above-mentioned turnstyle). Here’s roughly what the turnstyle might look like as a gen_fsm:


	-module(turnstyle).
	-behaviour(gen_fsm).

	-export([start_link/0, coin/0, push/0]).
	-export([init/1, locked/2, unlocked/2]).

	start_link() ->
	    gen_fsm:start_link({local, turnstyle}, ?MODULE, [], []).

	%% Public API: insert a coin, or push the arm
	coin() ->
	    gen_fsm:send_event(turnstyle, coin).

	push() ->
	    gen_fsm:send_event(turnstyle, push).

	%% gen_fsm callbacks
	init([]) ->
	    {ok, locked, []}.

	%% A coin unlocks the turnstyle; the unlocked state
	%% times out and re-locks after 30 seconds.
	locked(coin, StateData) ->
	    {next_state, unlocked, StateData, 30000};
	locked(push, StateData) ->
	    {next_state, locked, StateData}.

	unlocked(timeout, StateData) ->
	    {next_state, locked, StateData};
	unlocked(coin, StateData) ->
	    {next_state, unlocked, StateData, 30000};
	unlocked(push, StateData) ->
	    {next_state, locked, StateData}.
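
You could then drive it from iex like this (illustrative; assumes the Erlang module above has been compiled and loaded):

	# from iex, after compiling turnstyle.erl
	:turnstyle.start_link()
	:turnstyle.coin()   # locked -> unlocked
	:turnstyle.push()   # unlocked -> locked again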

You may have noticed that the turnstyle module is straight-up Erlang code. This is because, as of Elixir 0.12.5, the GenFSM.Behavior module was deprecated, and it was removed in 0.13. The reason given is that there are easier and more idiomatic ways in Erlang to implement a finite state machine, and that those solutions (like plain_fsm) probably provide a better basis for a Finite State Machine implementation in Elixir. Also, please pardon what I am sure are the many typos in my Erlang code (I compiled it in my head and it seemed to work fine).

gen_event (Event Handlers)

Event Handlers, as implemented by the gen_event behavior, work in partnership with an event manager to separate the responsibility of raising events from handling them in different ways. An event manager may have zero, one, or several event handlers registered, and each will respond to an event sent to its event manager. Let’s implement the console logger from the Erlang docs in Elixir:

	defmodule ConsoleLogger do
		use GenEvent.Behaviour

		def init(_) do
			{ :ok, [] }
		end

		def handle_event(error, state) do
			IO.puts "***ERROR*** #{error}"
			{:ok, state}
		end
	end

Now, you can start up a generic “event manager”, register our handler with it, and send it events:

	# start up our event manager
	:gen_event.start({:local, :error_manager})
	# add our handler
	:gen_event.add_handler(:error_manager, ConsoleLogger, [])
	# Now, send an event
	:gen_event.notify(:error_manager, :error)

which would result in an IEx session something like this:

	iex(2)> :gen_event.start({:local, :error_manager})
	{:ok, #PID<0.59.0>}
	iex(3)> :gen_event.add_handler(:error_manager, ConsoleLogger, [])
	:ok
	iex(4)> :gen_event.notify(:error_manager, :error)
	***ERROR*** error
	:ok

Note that you can register additional event handlers, each of which will be notified whenever an event is sent to the event manager. This is great when you want to separate out different concerns (logging to the console, to log files, and to database tables, for example) and simply register the event handler(s) you need at the time.


This only scratches the surface of OTP, but hopefully it will give you a starting point for understanding how some of the moving parts fit together. In a future post, I’ll go into more detail on building OTP applications, supervisor trees and strategies, why our system kept imploding every time we disconnected more than 5 TCP/IP connections in a short period of time, and how you can avoid having the same thing happen to you.

Categories: Elixir, Erlang

Elixir and the Internet of Things (Part 3) – Rabbits, Rabbits Everywhere.

March 6, 2014

(Cross-posted from, somewhat belatedly)

In honor of Erlang Factory 2014 (where I’ll be soaking up much Elixir and Erlang knowledge today and tomorrow), I’d like to present the 3rd and final installment in my series on using Elixir when dealing with the Internet of Things.

In part 1 of this series, I discussed the stampede of connections issue we faced when working with Nexia Home on their home automation system, and what (Ruby) solutions we tried before landing on Elixir. Additionally, I explained how we backed into implementing an acceptor pool in order to handle 1000 new SSL connections/second from home automation bridges and other devices.

In part 2, I discussed the implementation of our TcpEndpoint process, and how it routes incoming, line-based TCP/IP protocols to a RabbitMQ queue for processing by our back-end systems, but I left the implementation of our RabbitProducer and RabbitConsumer for later.

So it appears that “later” is now.

Let’s start by taking a look at the RabbitProducer. We’re using the ExRabbit Elixir RabbitMQ client (forked from). The producer is a single Erlang process that simply pushes data to RabbitMQ. So far, this has been fast enough to handle many thousands of messages per second, so we haven’t done any additional tweaking of this code.

    defmodule Owsla.RabbitProducer do
      use GenServer.Behaviour

      defrecord RabbitProducerState, amqp: nil, channel: nil, config: nil

      def start_link(config) do
        :gen_server.start_link({ :local, :producer }, __MODULE__, config, [])
      end

      def init(config) do
        amqp = Exrabbit.Utils.connect(host: config.rabbit_host,
                                      port: config.rabbit_port,
                                      username: config.rabbit_user,
                                      password: config.rabbit_pass)

        # open a channel on the connection and declare the outbound queue
        channel = Exrabbit.Utils.channel(amqp)
        Exrabbit.Utils.declare_queue(channel, config.from_device, auto_delete: false, durable: true)

        { :ok, amqp, channel: channel, config: config) }
      end

      def send(connection_id, message, from_device_seq) do
        :gen_server.cast(:producer, {:send, connection_id, message, from_device_seq})
      end

      def handle_cast({:send, connection_id, message, from_device_seq}, state) do
        props = [headers: [{"connection_id", :longstr, connection_id},
                           {state.config.from_device_seq, :long, from_device_seq}]]
        Exrabbit.Utils.publish(, "", state.config.from_device, message, props)
        {:noreply, state}
      end

      def terminate(reason, _state) do
        IO.puts "Rabbit producer terminating. Reason was:"
        IO.inspect reason
        :ok
      end
    end
In init, we create a RabbitMQ channel, declare our queue, and return a new record that stores our AMQP connection and channel, along with some configuration information. A call to the send helper function sends a message to the locally registered producer process with the relevant data. Note that the TcpEndpoint maintains an outbound and an inbound message sequence, and the from_device_seq parameter to send is that sequence number.

The single handle_cast receives the send message, adds two message headers for the connection ID and the previously-mentioned sequence number, and then publishes the message to a queue named in our configuration object.

There’s nothing really magical going on here, and the RabbitConsumer isn’t much different. However, we are leveraging another part of ExRabbit for the RabbitConsumer: the Exrabbit.Subscriber module.

    defmodule Owsla.RabbitConsumer do
      use Exrabbit.Subscriber

      def start_link(config) do
        # subscriber options; the queue name comes from our config
        :gen_server.start_link(__MODULE__,
                          [host: config.rabbit_host,
                           port: config.rabbit_port,
                           username: config.rabbit_user,
                           password: config.rabbit_pass,
                           queue: config.to_device],
                          [])
      end

      def handle_message(msg, state) do
        case parse_message_with_props(msg) do
          nil -> nil
          {tag, payload, props} ->
            conid = get_header(props.headers, "connection_id")
            to_device_seq = get_header(props.headers, "to_device_seq")
            Owsla.TcpEndpoint.send(conid, to_device_seq, payload)
            ack state[:channel], tag
        end
      end

      # walk the AMQP headers (a list of {key, type, value} tuples),
      # returning the value for the given key, or nil
      def get_header(nil, _key), do: nil
      def get_header(:undefined, _key), do: nil
      def get_header([], _key), do: nil
      def get_header(headers, key) do
        [head | rest] = headers
        case head do
          {^key, _, value} -> value
          {_, _, _} -> get_header(rest, key)
        end
      end
    end
So, we start up our RabbitConsumer with the same kinds of configuration data we used for the RabbitProducer in the start_link function. The Exrabbit.Subscriber module does most of the hard work of subscribing to the queue we provide in the start_link call; all we have to worry about is the handle_message function, which is called for us whenever we receive a message. Unfortunately, the headers property of the message properties is a list of tuples of the form {key, type, value}, and not something more convenient like a HashDict, so we have some utility functions to walk the list and find the headers we need (namely, the connection ID and the sequence number from the incoming message). Then we just call the TcpEndpoint.send function.

If you remember from part 1, whenever we create a new TcpEndpoint process we register its PID in the global registry, which allows us to easily find the connection regardless of which node in our cluster it’s on. Note that we start up 8 (configurable) RabbitConsumers, and do some message-ordering magic in the TcpEndpoint to make sure we’re delivering messages to the devices in the correct order.

And that’s pretty much it, round-trip from device -> backend -> device. For those who have been following along, our supervisor tree ends up looking like this:

Owsla Supervisor Tree

Categories: Elixir, Performance

Elixir and the Internet of Things (Part 2) – What’s in a Process?

January 30, 2014

In my previous installment, I discussed how our application is required to accept up to 1000 new TCP/IP connections per second and handle upwards of 40,000 simultaneous TCP/IP connections, the kinds of things we tried to get that working under Ruby/JRuby, and how we eventually decided to try Elixir and Erlang to solve the problem. If you haven’t read part 1, I’d suggest reading it now. Go ahead… I’ll wait…

So, now that we’ve accepted 40,000 connections talking over SSL, what are we going to do with them? That was our next challenge.

TL;DR – (Erlang) Processes FTW

So each time we called Owsla.TcpSupervisor.start_endpoint(ssl_socket) in part 1, we created a new Erlang process for that SSL connection. That process has two responsibilities. First, it handles reading data from the socket (which is generally text-based and line-oriented) and putting it, along with some routing information, onto a RabbitMQ queue. Second, it can receive messages from any Erlang process in our cluster and send them back to the device.

40,000 Processes — is this guy nuts?

No, I’m not nuts – I’m using Erlang. The Erlang programming language and runtime have their own version of a “process” which is much lighter weight than an OS thread or process, which makes creating new processes very inexpensive and allows the VM to run tens (if not hundreds) of thousands of processes very efficiently. Native threads on most modern OSes reserve up to several megabytes of memory for their initial stack, while Erlang processes take almost no memory to start up and grow their stacks dynamically as necessary.
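
You can see this for yourself in iex (purely illustrative; the processes here do trivial work and exit immediately):

	# spawn 100,000 short-lived processes; this completes in
	# seconds and uses surprisingly little memory
	Enum.each 1..100_000, fn n ->
		spawn(fn -> n * n end)
	end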

The VM also has its own scheduler, separate from the OS scheduler, which helps keep expensive OS-level context switching at bay. Interestingly, Erlang’s scheduler is preemptive, not cooperative like many other “non-OS-thread” concurrency solutions, which require the programmer to yield time back to the runtime and so allow tight, long-running loops to hog CPU time without end.

The TcpEndpoint

So, what does this Endpoint look like anyway? Let’s walk through it and dissect some things.

defmodule Owsla.TcpEndpoint do
  use GenServer.Behaviour

We start by defining our module and ‘importing’ Elixir’s GenServer.Behaviour, which tells OTP that this module is a gen_server.

  defrecord EndpointState, socket: nil, id: nil, sequence_no: 0, closed: false

Next, we define a record type called EndpointState that describes the current state of our endpoint. Note that for very simple GenServers, it’s probably fine to use a simple tuple for state, but at some point it becomes easier to manage if you create a record. Our state includes the SSL socket, a unique identifier, a sequence number for messages being sent to our back-end systems, and a flag to tell if, somehow, our process is still up but the SSL socket has been closed.

  def start_link(socket, id) do
    :gen_server.start_link({:global, id}, __MODULE__, {socket, id}, [])
  end

  def init({socket, id}) do
    { :ok, socket, id: id) }
  end

Here, we’ve got our boilerplate “Start up my gen_server” code.

  def send(connection_id, message) do
    :gen_server.cast {:global, connection_id}, {:send, message}
  end

The send function is a simple wrapper around a call to :gen_server.cast as a convenience to consumers. It takes a connection ID (which is a UUID in our case) and a message to send back down the pipe. Next, we have our gen_server handling code:

  def handle_cast( {:start}, state) do
    # we now own the socket; arm it for a single async message
    :ssl.setopts(state.socket, [active: :once])
    {:noreply, state}
  end

Defining handle_cast functions that pattern match certain messages is how you implement your behavior in an Erlang gen_server (part of the OTP library). This one (handling the :start message) is invoked by the TcpAcceptor way back in part 1, once it has transferred ownership of the SSL socket to this process. It tells the process that it can start doing things with the socket without crashing for accessing another process’s socket.
Note that we set the SSL socket to [active: :once] rather than [active: true]. The difference is important: [active: true] can allow your process’s mailbox to be flooded with TCP/IP messages if whatever is on the other end can send data to you very quickly. For more information, take a look at the “Buckets of Sockets” section of Learn You Some Erlang for Great Good. Then read all the rest of it for good measure.

  def handle_cast( {:send, message}, state) do
    unless state.closed do
      :ssl.send(state.socket, message)
    end
    {:noreply, state}
  end

This handle_cast matches our :send from before. It simply checks to see if our connection has been closed and, if not, sends the message down the SSL socket.

  def handle_info({:ssl, _, data}, state) do
    Owsla.RabbitProducer.send(, data, state.sequence_no)
    # re-arm the socket for the next async message
    :ssl.setopts(state.socket, [active: :once])
    {:noreply, state.sequence_no(state.sequence_no + 1)}
  end

  def handle_info({:ssl_closed, _}, state) do
    IO.puts "Client closed socket - stopping connection #{}"
    {:stop, :normal, state.closed(true)}
  end

  def handle_info({:ssl_error, _, reason}, state) do
    IO.puts "Error on socket.recv. Error was: #{reason}"
    IO.puts "Closing connection #{:uuid.to_string(}"
    {:stop, :error, state}
  end

The three handle_info functions above are all about handling asynchronous messages from the SSL socket. Because we have to be able to both send and receive data on this socket, we can’t just put a blocking :ssl.recv() call in a tight loop, so we need to receive our data asynchronously. We did that by setting the socket to active, as discussed above.

The first, {:ssl, _, data}, is called every time we get a line of data (remember, our data is newline-separated; gen_tcp offers other options for framing your data if you need, for example, to parse buffers where the first N bytes of a packet contain the length of the rest of the packet). It simply forwards the data to a RabbitMQ queue via the Owsla.RabbitProducer module/process, which is a wrapper around our fork of ExRabbit, which in turn uses the native Erlang RabbitMQ client.

The other two handle_info functions deal with the SSL socket either closing “naturally” or having some kind of error. In either case, we want to stop the current process, so rather than return a tuple with :noreply as the first item, we return a tuple with :stop. This signals to OTP that our process has completed and should be terminated. In one case, we return :normal as the second item in our tuple, to let our supervisor know that we shut down normally, and in the other we return :error so that OTP knows our process failed for some unexpected reason.

A quick note on Elixir defrecords: note that the first two handle_info functions transform their current state in some way. The {:ssl, _, data} handler returns:

  {:noreply, state.sequence_no(state.sequence_no + 1)}

The state.sequence_no(state.sequence_no + 1) part takes our existing state and creates a copy (as records are immutable) with all of the data the same except for the sequence_no field, which is incremented by 1. The same pattern is used to set the closed field in the :ssl_closed handler. state.sequence_no(state.sequence_no + 1) is a much easier way to express, id:, sequence_no: state.sequence_no + 1).
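
For example (illustrative values, using the EndpointState record defined earlier, from inside the module where it’s visible):

	state = "abc-123")
	state.sequence_no                                    # => 0
	state2 = state.sequence_no(state.sequence_no + 1)
	state2.sequence_no                                   # => 1
	state.sequence_no                                    # still 0; records are immutable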

Finally, we want to shut down our SSL connection whenever our process ends, so we define a terminate function to close it down before our process is removed by Erlang:

  def terminate(_reason, state) do
    IO.puts "Closing connection #{}"
    :ssl.close(state.socket)
  end

And that’s pretty much that. Note that I’ve walked you through a slightly older and simpler version of our TCP endpoint; if you handled all of your business logic inside the Endpoint process, this would probably be enough, and you could probably ignore the sequence_no gymnastics we’re going through. However, the astute reader may have noticed that outbound messages are sent down the SSL connection in order of receipt in the process’s mailbox. In our case, those messages are being picked up by a pool of processes reading from a RabbitMQ queue, which means they can arrive out of order. Our current implementation handles this more gracefully, dealing with out-of-order messages and timeouts for when we somehow miss a message from RabbitMQ, but the commensurate increase in complexity would make it much harder to describe in a blog post.

The final part of the solution is the RabbitConsumer, which picks up messages from our back-end system (currently still written in Ruby) and sends them back to the appropriate endpoint to be sent down the SSL connection, assuming it’s still around. But we can talk about that one another time.

Categories: Elixir, Performance
