Getting started with Avro and Python 3

At Authentise we do streaming. We stream instructions down to 3D printers and we get streaming telemetry data back. In the beginning we used to do streaming with C structs: just serialize the binary representation over the wire, unpack on the other side, off you go. This is a bad idea for several reasons. One, it's hard to debug with something like a wire capture. You must work at the application level to figure out what's going on. Two, it can be extremely hard to update your protocol without updating both sides at once, which gets harder the larger your install base gets. You want your install base to get bigger, right? Three, as your structures become more complex the system breaks down. You can't directly serialize any pointers, as they only have meaning on the host system, so you have to start dereferencing and creating symbolic links.
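To make the second problem concrete, here is a small sketch that uses Python's struct module as a stand-in for raw C structs. The two layouts (V1_FORMAT and V2_FORMAT) and the decode_v1 helper are invented for illustration; they are not our actual protocol.

```python
import struct

# Hypothetical telemetry layouts, invented for this example.
V1_FORMAT = "!Hf"    # sensor id (uint16), temperature (float32)
V2_FORMAT = "!HfB"   # same fields, plus a status byte added in a later release

def decode_v1(payload):
    """Decode a payload the way a receiver still running v1 would."""
    return struct.unpack(V1_FORMAT, payload)

old_message = struct.pack(V1_FORMAT, 7, 21.5)
new_message = struct.pack(V2_FORMAT, 7, 21.5, 1)

# A v1 receiver understands a v1 sender.
print(decode_v1(old_message))

try:
    decode_v1(new_message)
except struct.error as exc:
    # The extra byte makes the buffer the wrong size for the old layout.
    print("v1 receiver cannot read v2 message:", exc)
```

Adding a single field is enough to break every receiver that has not been upgraded, which is the "both sides at once" problem in miniature.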

We eventually decided to move away from C structs and switched instead to a custom protocol that used UTF-8 text over the wire for each of the message metadata types. Each side knew the other side's command set and was coded to ignore commands that it didn't understand, to deal with version mismatch. We had to get a lot more careful about how we encoded binary data, but this was quite doable. This lasted for a while. Eventually we moved on to something better.

Now we use Avro. Avro is really cool. It's in the realm of ProtoBufs or Thrift (which are both cool) but it doesn't require a pre-compile step where you do code generation. We're a Python shop, so we really prefer not to have any code generation. It just gums up our groove.

You can get started with Avro by pip installing it

pip install avro-python3

If you're using Python 2 just install Python 3 already. It's been years. It's time to join us in the future.

Avro has some built-in functions for creating an HTTP server. We're not going to use those. We want raw sockets. That's primarily because in a streaming case we don't use HTTP. Also, it would take me longer to read up on how to use it. Let's start with a basic client/server system:

Server

import socket

def handle_client(connection, address):
    data = connection.recv(1024)
    print("Received", data)

def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('127.0.0.1', 12345))
    sock.listen(10)

    while True:
        conn, addr = sock.accept()
        handle_client(conn, addr)
        conn.close()

if __name__ == '__main__':
    main()

Client

import socket

def send_message(connection):
    connection.send(b"Hello World")  # sockets carry bytes, not str, in Python 3

def main():
    connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    connection.connect(('127.0.0.1', 12345))
    send_message(connection)

if __name__ == '__main__':
    main()

This client/server pair is very simple. The server just listens on port 12345 for a connection. When it gets one it receives up to 1024 bytes of data and prints out whatever it got. The client connects on port 12345 to the localhost and sends 'Hello World'. It's a great starter script for working with sockets.
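If you want to poke at the same exchange without juggling two terminals, here is a minimal sketch that runs both ends in one script. It swaps the listening socket for socket.socketpair(), which is my substitution for experimenting and not what the scripts above do.

```python
import socket

# socket.socketpair() returns two already-connected sockets, so both ends of
# the conversation can live in one script without binding a port.
client_end, server_end = socket.socketpair()

# Sockets carry bytes, not str, so the text is encoded before sending.
client_end.sendall("Hello World".encode("utf-8"))
client_end.close()

received = server_end.recv(1024)
server_end.close()

print("Received", received.decode("utf-8"))
```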

Alright, so, what's our goal? Well, we want to end up with the ability to send a simple Avro buffer across the wire that we can deserialize on the other end. The Avro data file format we'll be using includes the schema alongside the data, so the two sides don't have to agree on the exact schema ahead of time (though they must share an understanding of a protocol to communicate at all).

Let's create a simple Avro schema that we can use to define a message:

{
 "namespace"    : "example.avro",
 "type"         : "record",
 "name"         : "User",
 "fields"       : [
     {"name": "name"            , "type": "string"},
     {"name": "favorite_number" , "type": ["int", "null"]},
     {"name": "favorite_color"  , "type": ["string", "null"]}
 ]
}

This defines a schema for a 'User'. In Avro parlance a record is a complex type containing fields with subtypes. Sort of like a JSON object or a C struct. The User record has three fields: name, favorite_number and favorite_color. These fields are typed. name is a string, while favorite_number and favorite_color are unions (written as JSON arrays), so each can hold either a value (an int and a string, respectively) or null.
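To get a feel for what those unions allow, here is a rough hand-written check in plain Python. It is only an illustration of the idea, not how the avro package validates records; the names PRIMITIVE_CHECKS, matches and looks_like_user are made up, and it ignores details such as Avro's int being limited to 32 bits.

```python
# Map the Avro primitive names used in the User schema to Python checks.
# bool is a subclass of int in Python, so it is excluded explicitly.
PRIMITIVE_CHECKS = {
    "string": lambda v: isinstance(v, str),
    "int": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "null": lambda v: v is None,
}

USER_FIELDS = [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]},
]

def matches(field_type, value):
    """Return True if value fits a primitive type or any branch of a union."""
    if isinstance(field_type, list):  # a JSON array in the schema is a union
        return any(matches(branch, value) for branch in field_type)
    return PRIMITIVE_CHECKS[field_type](value)

def looks_like_user(record):
    """Check every schema field against the matching key in record."""
    return all(matches(f["type"], record.get(f["name"])) for f in USER_FIELDS)

# A null favorite_color is fine because "null" is one branch of the union.
print(looks_like_user({"name": "Eli", "favorite_number": 42, "favorite_color": None}))
# A string where an int or null is expected does not fit any branch.
print(looks_like_user({"name": "Eli", "favorite_number": "42", "favorite_color": "black"}))
```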

In this example we're going to have the client send the record to the server and the server will print it out. This follows the basic structure we already have, with the client sending data and the server printing it out.

In order to send an Avro message we need to set up the structures to read in the schema and to use the schema to write a message. They look roughly like this:

import io
import avro.datafile
import avro.io

SCHEMA = ...
def send_message(connection, message):
    buf = io.BytesIO()
    writer = avro.datafile.DataFileWriter(buf, avro.io.DatumWriter(), SCHEMA)
    writer.append(message)
    writer.flush()
    buf.seek(0)
    message = buf.read()
    ...

For now let's ignore what SCHEMA is. I'll get back to it. Assume it looks kind of like the schema I introduced above.

What we're doing here is creating an in-memory bytes buffer (io.BytesIO). We hand that to a data file writer (which expects a file-like object) along with a DatumWriter and our schema. This tells the DataFileWriter how to write messages to the buffer. After we've added the message we flush the writer, which ensures that it has sync'd all of its data into the buffer. We then roll the buffer back to the beginning and read from it, which gives us our binary data.

There may be a better way to write this where I can get Avro's Python bindings to build its own buffers internally. Sadly, I couldn't find anything so I left it this way, which totally works and should be sufficiently performant. It's just a bit ugly.
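One small thing that does tidy up the buffer handling is BytesIO's own getvalue() method. The sketch below uses plain writes as a stand-in for what the Avro writer does to the buffer, so it shows only the io side of things.

```python
import io

buf = io.BytesIO()
buf.write(b"first chunk, ")
buf.write(b"second chunk")

# tell() reports the current position, which after writing is the total length.
length = buf.tell()

# Option 1: rewind and read, as in send_message.
buf.seek(0)
via_read = buf.read()

# Option 2: getvalue() returns the whole contents regardless of position.
via_getvalue = buf.getvalue()

print(length, via_read == via_getvalue)
```

Either way you end up with the same bytes. getvalue() just saves the seek.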

Now, I said I'd get back to the schema. Avro expects the schema to be JSON. You could write your schema as JSON files and have Avro parse them directly. That's not what I'm going to do. I'm going to create the JSON schema inline from Python structures. This has the benefit of not requiring extra files or dumping a string blob in my source files. Depending on how large your schema grows you may want to consider different options.

import json
import avro.ipc
import avro.schema

SCHEMA = avro.schema.Parse(json.dumps({
 "namespace"    : "example.avro",
 "type"         : "record",
 "name"         : "User",
 "fields"       : [
     {"name": "name"            , "type": "string"},
     {"name": "favorite_number" , "type": ["int", "null"]},
     {"name": "favorite_color"  , "type": ["string", "null"]}
 ]
}))

Let's put it all together into a new client script so you can see how it all works.

import io
import json
import socket
import avro.datafile
import avro.schema
import avro.io
import avro.ipc

SCHEMA = avro.schema.Parse(json.dumps({
 "namespace"    : "example.avro",
 "type"         : "record",
 "name"         : "User",
 "fields"       : [
     {"name": "name"            , "type": "string"},
     {"name": "favorite_number" , "type": ["int", "null"]},
     {"name": "favorite_color"  , "type": ["string", "null"]}
 ]
}))

def send_message(connection, message):
    buf = io.BytesIO()
    writer = avro.datafile.DataFileWriter(buf, avro.io.DatumWriter(), SCHEMA)
    writer.append(message)
    writer.flush()
    buf.seek(0)
    data = buf.read()
    connection.send(data)

def main():
    connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    connection.connect(('127.0.0.1', 12345))
    send_message(connection, {'name': 'Eli', 'favorite_number': 42, 'favorite_color': 'black'})

if __name__ == '__main__':
    main()

Alright, so, let's run this! We start our server and then our client. Server says:

('Received', 'Obj\x01\x04\x16avro.schema\xba\x03{"fields": [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, {"type": ["string", "null"], "name": "favorite_color"}], "namespace": "example.avro", "type": "record", "name": "User"}\x14avro.codec\x08null\x00-\x0c\xbaB\x12\xff\xe2\x10go\xa9G\xefk\xf6\x16\x02\x1a\x06Eli\x00T\x00\nblack-\x0c\xbaB\x12\xff\xe2\x10go\xa9G\xefk\xf6\x16')

Neat. You can see in the beginning some binary data that indicates some information about the schema, then the schema itself and finally the record that we pushed across. More or less. Squint at it.
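If squinting is not enough, the record near the tail of that dump can be decoded by hand. The sketch below is a hand-rolled reader for just this schema and is not part of the avro package. It leans on the binary encoding described in the Avro specification: ints and lengths are zig-zag variable-length integers, a string is a length followed by UTF-8 bytes, and a union is a branch index followed by the value. The RECORD bytes are copied from the output above, and the function names are mine.

```python
# The 13 bytes of record data from the server output above.
RECORD = b"\x06Eli\x00T\x00\nblack"

def read_long(data, pos):
    """Decode one zig-zag varint starting at pos; return (value, new_pos)."""
    shift = 0
    raw = 0
    while True:
        byte = data[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear marks the last byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos

def read_string(data, pos):
    """Decode a length-prefixed UTF-8 string; return (text, new_pos)."""
    length, pos = read_long(data, pos)
    return data[pos:pos + length].decode("utf-8"), pos + length

def read_nullable(data, pos, read_value):
    """Read a two-branch union: branch 0 holds a value, branch 1 is null."""
    branch, pos = read_long(data, pos)
    if branch == 1:
        return None, pos
    return read_value(data, pos)

def read_user(data):
    """Decode the three User fields in schema order."""
    name, pos = read_string(data, 0)
    number, pos = read_nullable(data, pos, read_long)
    color, pos = read_nullable(data, pos, read_string)
    return {"name": name, "favorite_number": number, "favorite_color": color}

print(read_user(RECORD))
```

The 'T' in the dump is the byte 0x54, which is 84, and zig-zag decoding turns 84 into 42.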

Alright, so we're halfway there. We can send across an Avro message. But how do we get the server to interpret it? All we need to do is essentially the reverse of the same process. We'll jump straight to the whole script as the changes are pretty minor:

import avro.datafile
import avro.io
import io
import socket

def handle_client(connection, address):
    data = connection.recv(1024)
    message_buf = io.BytesIO(data)
    reader = avro.datafile.DataFileReader(message_buf, avro.io.DatumReader())
    for thing in reader:
        print(thing)
    reader.close()

def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('127.0.0.1', 12345))
    sock.listen(10)

    while True:
        conn, addr = sock.accept()
        handle_client(conn, addr)
        conn.close()

if __name__ == '__main__':
    main()

So now instead of receiving up to 1024 bytes and immediately printing them (and assuming they'll be UTF-8 that our console can display) we are instead putting them into a buffer. We then hand that buffer to Avro to read. We get back an iterable that contains a sequence of messages. We just print them out and clean up and we're done.

Does it work? Sure!

Server output

{'favorite_color': 'black', 'favorite_number': 42, 'name': 'Eli'}

Great, life is good. Let's send a bunch of messages! We'll just add this to our client...

import random

def main():
    ...
    names = ["Nick", "Scott", "Josh", "Anusha", "Eli"]
    colors = ["red", "green", "blue", "black", "fuscia"]
    for _ in range(20):
        send_message(connection, {
            'name': random.choice(names),
            'favorite_number': random.randint(0, 100),
            'favorite_color': random.choice(colors)
        })

This should send 20 random messages to our server about members of our team and their preferences. Running it on the client we find:

Traceback (most recent call last):
  File "client.py", line 46, in <module>
    main()
  File "client.py", line 42, in main
    'favorite_color': random.choice(colors)
  File "client.py", line 31, in send_message
    connection.send(data)
BrokenPipeError: [Errno 32] Broken pipe

Oh. Looks like the server is shutting us down as soon as it gets a single message from us. Well, more accurately, as soon as it receives any data (up to 1024 bytes) it interprets it, prints it and closes the connection. We need to tell the server to keep the connection open:

...
def main():
    ...
    while True:
        conn, addr = sock.accept()
        while True:
            handle_client(conn, addr)
        conn.close()

This says 'keep handling the client forever', by which we mean 'read off a message and then display the output', over and over. Alright, we'll run that. It should just keep looping and reading messages forever. When the client shuts its side of the connection it will...probably...um...throw an exception.

ERROR:root:Invalid UTF-8 input bytes: b'\x01\x04\x16avro.schema\xba\x03{"fields": [{"type": "string", "name"'
Traceback (most recent call last):
  File "server.py", line 28, in <module>
    main()
  File "server.py", line 24, in main
    handle_client(conn, addr)
  File "server.py", line 11, in handle_client
    for thing in reader:
  File "/Users/eliribble/src/streamus/ve/lib/python3.4/site-packages/avro/datafile.py", line 522, in __next__
    datum = self.datum_reader.read(self.datum_decoder)
  File "/Users/eliribble/src/streamus/ve/lib/python3.4/site-packages/avro/io.py", line 480, in read
    return self.read_data(self.writer_schema, self.reader_schema, decoder)
  File "/Users/eliribble/src/streamus/ve/lib/python3.4/site-packages/avro/io.py", line 525, in read_data
    return self.read_record(writer_schema, reader_schema, decoder)
  File "/Users/eliribble/src/streamus/ve/lib/python3.4/site-packages/avro/io.py", line 725, in read_record
    field_val = self.read_data(field.type, readers_field.type, decoder)
  File "/Users/eliribble/src/streamus/ve/lib/python3.4/site-packages/avro/io.py", line 503, in read_data
    return decoder.read_utf8()
  File "/Users/eliribble/src/streamus/ve/lib/python3.4/site-packages/avro/io.py", line 253, in read_utf8
    raise exn
  File "/Users/eliribble/src/streamus/ve/lib/python3.4/site-packages/avro/io.py", line 250, in read_utf8
    return input_bytes.decode('utf-8')
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xba in position 14: invalid start byte

Well, that's no good. Something must be wrong. Our error is coming from well inside Avro's io module. So what's the problem? We can get a clue if we print out the number of bytes we receive before interpreting the bytes:

def handle_client(connection, address):
    data = connection.recv(1024)
    message_buf = io.BytesIO(data)
    print(len(data))
    ...

On my machine this shows values around 920, but this will depend a great deal on many factors in your system. The socket.recv method only receives up to a certain number of bytes, and the length of the data it returns tells you exactly how many you received. TCP gives us a stream of bytes with no message boundaries, so a single read can hold part of a message or several messages run together. If we hand Avro anything other than one whole message it won't know what to do with it and will fail. Avro's job starts at a well-formed data buffer and ends at a well-understood message. It doesn't concern itself with how the data gets transported from one side to another. That's your job.
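You can watch recv hand back pieces without involving Avro at all. This is a small sketch using socket.socketpair() (my choice for keeping it to one script) with a deliberately tiny read size.

```python
import socket

sender, receiver = socket.socketpair()
message = b"0123456789"
sender.sendall(message)
sender.close()

# Ask for at most 4 bytes at a time; each call may return fewer than asked.
chunks = []
while True:
    chunk = receiver.recv(4)
    if not chunk:  # empty bytes means the sender closed its end
        break
    chunks.append(chunk)
receiver.close()

print([len(c) for c in chunks])
print(b"".join(chunks) == message)
```

No single read contains the whole message, but the pieces add up to it once they are joined in order.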

So how do we ensure we have only whole messages before we send them to Avro? The solution is surprisingly simple. We just prefix each message with its size. We know the size will always be a 32-bit integer (4 bytes) so that should always be the first thing we read. Then we read until we get that number of bytes. So, say our message is 37 bytes long. We write 37 to the socket. The other side reads 37 from the socket. Then it reads until it gets 37 bytes. Then it hands the bytes it read to Avro for interpretation.
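Before wiring this into sockets, the size prefix can be sketched on its own with the struct module. The helper names frame and unframe are made up for this illustration, and unframe assumes it is handed at least one complete frame.

```python
import struct

HEADER = struct.Struct("!L")  # network byte order, unsigned 32-bit integer

def frame(payload):
    """Prefix payload with its length as a 4-byte big-endian integer."""
    return HEADER.pack(len(payload)) + payload

def unframe(buffer):
    """Split one complete frame off the front of buffer.

    Returns (payload, remaining_bytes). Assumes buffer holds a whole frame.
    """
    (size,) = HEADER.unpack_from(buffer, 0)
    start = HEADER.size
    return buffer[start:start + size], buffer[start + size:]

payload = b"x" * 37
framed = frame(payload)
print(HEADER.size, framed[:4])

# Two frames back to back come apart cleanly at the right boundary.
first, rest = unframe(framed + frame(b"next"))
print(len(first), unframe(rest))
```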

It looks like this. Client first:

Client

...
import struct

def send_message(connection, message):
    buf = io.BytesIO()
    writer = avro.datafile.DataFileWriter(buf, avro.io.DatumWriter(), SCHEMA)
    writer.append(message)
    writer.flush()
    data_length = buf.tell()
    buf.seek(0)
    data = buf.read()
    bytes_written = connection.send(struct.pack("!L", data_length))
    print("Wrote bytes", bytes_written)
    connection.send(data)

This is mostly the same. We have only added the line to get the data_length value from our IO buffer before we seek back to the beginning of the buffer. Then we use the excellent struct library built in to Python to write out a 32-bit value for our message size.
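One caveat worth knowing: socket.send is allowed to send fewer bytes than you give it, which is why it returns a count. For messages this small it rarely matters, but a variation that uses sendall avoids the question. This is a sketch, with send_framed as a made-up name and socket.socketpair() standing in for the real connection.

```python
import socket
import struct

def send_framed(connection, data):
    """Send a 4-byte length header followed by data, retrying partial sends."""
    header = struct.pack("!L", len(data))
    # sendall() keeps going until every byte has been handed to the OS,
    # whereas send() may stop early and return a smaller count.
    connection.sendall(header + data)

left, right = socket.socketpair()
send_framed(left, b"avro bytes would go here")
left.close()

# Collect everything the other end sees on the wire.
wire = b""
while True:
    chunk = right.recv(1024)
    if not chunk:
        break
    wire += chunk
right.close()
print(wire)
```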

On the server we have to make things a bit more complicated. First we need a function for reliably reading a block of data. A block should have a well-understood size so we can keep gathering data until we have it:

Server

class Disconnect(Exception):
    pass

def read_block(connection, message_size):
    bytes_read = 0
    block = b''
    while bytes_read < message_size:
        data = connection.recv(message_size - bytes_read)
        if len(data) == 0:
            raise Disconnect()
        block += data
        bytes_read += len(data)
        print("Read {} bytes".format(len(data)))
    print("Read {} byte block".format(len(block)))
    return block

We tell the function our block size then keep reading until we either have it or we get an empty read. That means the client disconnected. I've added some print statements so it's easy to see what's going on. Now that we have a way to read these blocks, let's use them:

def handle_client(connection, address):
    try:
        while True:
            size_block = read_block(connection, 4)
            message_size, = struct.unpack("!L", size_block)
            message_block = read_block(connection, message_size)

            message_buf = io.BytesIO(message_block)
            reader = avro.datafile.DataFileReader(message_buf, avro.io.DatumReader())
            for thing in reader:
                print(thing)
            reader.close()
    except Disconnect as e:
        print("Disconnected")

I've updated the handle_client code so that it just runs until the client disconnects. This makes our calling code cleaner and more understandable. We start by reading in a size block which we know will be 4 bytes in size. We use the excellent struct module to unpack the data into an integer. Then we use that to read the next block. That block must be our Avro message. The rest of the code is the same, stuff the data into a buffer and give it to Avro to parse. So what does our output look like?

Read 4 bytes
Read 4 byte block
Read 309 bytes
Read 309 byte block
{'favorite_color': 'fuscia', 'favorite_number': 70, 'name': 'Anusha'}
Read 4 bytes
Read 4 byte block
Read 305 bytes
Read 305 byte block
{'favorite_color': 'green', 'favorite_number': 30, 'name': 'Nick'}
Read 4 bytes
Read 4 byte block
Read 306 bytes
Read 306 byte block

If you were doing this over an actual network you'd likely see some chunking as the data is broken into packets and we receive our message a few hundred bytes at a time. I'll leave that as an exercise for the reader.
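One way to start on that exercise without a real network is to fake the chunking. In the sketch below ChunkyConnection is a made-up test double that hands out at most 100 bytes per recv call, and read_block is the same loop as the server's with the print statements removed.

```python
class Disconnect(Exception):
    pass

class ChunkyConnection:
    """Test double that hands out at most chunk_size bytes per recv() call."""

    def __init__(self, data, chunk_size):
        self._data = data
        self._chunk_size = chunk_size

    def recv(self, max_bytes):
        count = min(max_bytes, self._chunk_size)
        chunk, self._data = self._data[:count], self._data[count:]
        return chunk  # empty bytes once the data runs out, like a closed socket

def read_block(connection, message_size):
    """Keep calling recv() until message_size bytes have been collected."""
    block = b""
    while len(block) < message_size:
        data = connection.recv(message_size - len(block))
        if not data:
            raise Disconnect()
        block += data
    return block

# A 309-byte message arrives in pieces but comes back whole.
conn = ChunkyConnection(b"A" * 309, chunk_size=100)
block = read_block(conn, 309)
print(len(block))

# A connection that ends early raises Disconnect instead of returning junk.
try:
    read_block(ChunkyConnection(b"abc", chunk_size=2), 5)
except Disconnect:
    print("Disconnected")
```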

That's it! You've got a basic client/server in Python using Avro to serialize messages without a shared schema. We use this at Authentise to stream data to 3D printers. It's efficient, it makes changes easy to support and it's surprisingly lightweight. If we wanted to write clients in things that aren't Python we could easily do that using one of the many other language bindings that Avro supports.

Happy Hacking!

Final Client

import io
import json
import random
import socket
import struct
import avro.datafile
import avro.schema
import avro.io
import avro.ipc

SCHEMA = avro.schema.Parse(json.dumps({
 "namespace"    : "example.avro",
 "type"         : "record",
 "name"         : "User",
 "fields"       : [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}))

def send_message(connection, message):
    buf = io.BytesIO()
    writer = avro.datafile.DataFileWriter(buf, avro.io.DatumWriter(), SCHEMA)
    writer.append(message)
    writer.flush()
    data_length = buf.tell()
    buf.seek(0)
    data = buf.read()
    bytes_written = connection.send(struct.pack("!L", data_length))
    print("Wrote bytes", bytes_written)
    bytes_written = connection.send(data)
    print("Wrote bytes", bytes_written)

def main():
    connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    connection.connect(('127.0.0.1', 12345))
    names = ["Nick", "Scott", "Josh", "Anusha", "Eli"]
    colors = ["red", "green", "blue", "black", "fuscia"]
    for _ in range(20):
        send_message(connection, {
            'name': random.choice(names),
            'favorite_number': random.randint(0, 100),
            'favorite_color': random.choice(colors)
        })

if __name__ == '__main__':
    main()

Final Server

import avro.datafile
import avro.io
import io
import socket
import struct

class Disconnect(Exception):
    pass

def read_block(connection, message_size):
    bytes_read = 0
    block = b''
    while bytes_read < message_size:
        data = connection.recv(message_size - bytes_read)
        if len(data) == 0:
            raise Disconnect()
        block += data
        bytes_read += len(data)
        print("Read {} bytes".format(len(data)))
    print("Read {} byte block".format(len(block)))
    return block

def handle_client(connection, address):
    try:
        while True:
            size_block = read_block(connection, 4)
            message_size, = struct.unpack("!L", size_block)
            message_block = read_block(connection, message_size)

            message_buf = io.BytesIO(message_block)
            reader = avro.datafile.DataFileReader(message_buf, avro.io.DatumReader())
            for thing in reader:
                print(thing)
            reader.close()
    except Disconnect as e:
        print("Disconnected")

def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('127.0.0.1', 12345))
    sock.listen(10)

    while True:
        conn, addr = sock.accept()
        handle_client(conn, addr)
        conn.close()

if __name__ == '__main__':
    main()