A Rust controller for Kubernetes

To teach myself Kubernetes in general and controllers in particular, I previously developed one in Java. This week, I decided to do the same in Rust, following the same steps.

The guiding principle

The guiding principle is to create a Kubernetes controller that watches the lifecycle of pods and injects a sidecar into them. When Kubernetes schedules a pod, the controller schedules the sidecar; when it deletes the former, it deletes the latter as well.

I understand this would be better handled by a mutating admission webhook, but it's a good learning exercise. Because of that, I approached the development in several steps:

  1. Print the list of pods to the console. Once the code has executed, the process exits; Kubernetes sees the pod as failed, kills it, and schedules a new one. Rinse and repeat.
  2. Make the program loop so that it doesn't exit
  3. Use a dedicated listener instead of a loop to be notified of lifecycle events
  4. Replace the logging with the actual scheduling of the sidecar

The original Java project used quite a few more steps, but they are not relevant to this post.

Setting up the project

This project is the first one on my Rust path that I had to set up from scratch. All my previous work either started from an existing project or copied a provided template.

With cargo, it's pretty straightforward:

cargo new rust-operator     # 1
  1. Create the rust-operator folder and scaffold a new skeleton project in it. Alternatively, one can use cargo init instead in an existing folder.

The initial structure is the following:

rust-operator/
├── Cargo.toml
└── src
    └── main.rs

With the following content:

[package]
name = "rust-operator"                                # 1
version = "0.1.0"
authors = ["Nicolas Frankel <[email protected]>"]    # 2
edition = "2018"                                      # 3

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]                                        # 4
  1. Package name
  2. Author taken from the global Git configuration
  3. Latest Rust edition
  4. Start with an empty dependency list

The generated src/main.rs contains the classic Hello World:

fn main() {
    println!("Hello, world!");
}

At this point, you can:

  1. Build the package

    cargo b
    
  2. And run it

    target/debug/rust-operator
    

As expected, it outputs:

Hello, world!
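
Note that cargo run combines both steps: it builds the package if needed, then runs the resulting binary:

cargo run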

Logging

We will start by replacing the out-of-the-box println! macro with a logging library that offers levels (debug, info, etc.).

After some research, I found the log4rs crate:

log4rs is a highly configurable logging framework modeled after Java's Logback and log4j libraries.

Given that I don't yet have enough perspective on the Rust library ecosystem, I chose log4rs because its design is similar to Log4J's.

To use log4rs, you need to add two dependencies:

[dependencies]
log4rs = "1.0.0"        # 1
log = "0.4.14"          # 2
  1. Add log4rs
  2. log is a lightweight logging facade that will use log4rs as its implementation

We can now replace the logging macro in the source file:

use log::info;
use log4rs;

fn main() {
    log4rs::init_file("log4rs.yml", Default::default()).unwrap();      // 1
    info!("Hello, world!");                                            // 2
}
  1. Initialize log4rs, expecting a log4rs.yml file
  2. Proper log

Now, running the binary might output a log or not, depending on the log4rs.yml file content. With my sample, it prints:

2021-07-05T16:26:16.041150+02:00 INFO rust_operator - Hello, world!
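
For reference, a minimal log4rs.yml that yields this kind of output could look like the following. This is an illustrative sketch, not necessarily the exact file from the repository:

appenders:
  stdout:
    kind: console
    encoder:
      pattern: "{d} {l} {t} - {m}{n}"
root:
  level: info
  appenders:
    - stdout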

Calling Kubernetes API from Rust

As with logging, I had to search for libraries. My criteria were simple: not so low-level that I would have to deal with raw HTTP calls, but not so high-level that there would be nothing left to do. In the end, I set my eyes on kube:

Crate for interacting with the Kubernetes API

This crate includes the tools for manipulating Kubernetes resources as well as keeping track of those resources as they change over time

We also need the Rust bindings generated from the OpenAPI specification:

[dependencies]
kube = "0.52.0"
k8s-openapi = { version = "0.11.0", default-features = false,
                                    features = ["v1_19"] }     # 1
  1. The library uses features, one feature for each Kubernetes API version. Set the one relevant for your cluster version.

The entry point into the API is the Client type.

The easiest way to get a Client is Client::try_default(). As with the Java client, it infers the configuration from the environment:

Will use Config::infer to try in-cluster environment variables first, then fallback to the local kubeconfig.

Will fail if neither configuration could be loaded.

Asynchronous calls

Every Client call is asynchronous. Rust supports asynchronicity natively with the Future trait, the async/await syntax, and the futures crate. Let's try to get a Client:

fn main() {
    Client::try_default();
}

The snippet compiles but outputs a warning:

warning: unused implementer of `futures::Future` that must be used
  --> src/main.rs:14:5
   |
14 |     Client::try_default();
   |     ^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: `#[warn(unused_must_use)]` on by default
   = note: futures do nothing unless you `.await` or poll them

As the compiler noticed, a Future does nothing on its own. To use it, we have to await it.

fn main() {
    Client::try_default().await;
}

Now, the code fails with:

error[E0728]: `await` is only allowed inside `async` functions and blocks
  --> src/main.rs:14:18
   |
12 | fn main() {
   |    ---- this is not `async`
13 |     log4rs::init_file("log4rs.yml", Default::default()).unwrap();
14 |     let client = Client::try_default().await;
   |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks

The lesson is that you can only call async functions from other async functions (and blocks), and main is not async. The easiest path is to bring in Tokio, a crate dedicated to asynchronous programming. Tokio offers a macro that makes main asynchronous.

#[tokio::main]
async fn main() {
    Client::try_default().await;
}

The last step is to unwrap the Client contained inside the Result.

#[tokio::main]
async fn main() {
    let client = Client::try_default().await.unwrap();
}

Listing pods

At this point, we have obtained a Client and can interact with the Kubernetes cluster, e.g., to list pods. The entry point is the generic Api<K> type, where K is the Kubernetes object we are interested in.

Let's start small and list pods for now.
Api::list() requires a ListParams parameter, which implements Default.
Hence, the code is quite straightforward:

#[tokio::main]
async fn main() {
    let client = Client::try_default().await.unwrap();
    let api: Api<Pod> = Api::namespaced(client, "kube-system");   // 1
    api.list(&ListParams::default())
        .await
        .unwrap()
        .items
        .iter()
        .map(|pod| pod.name())
        .for_each(|name| info!("{}", name));
}
  1. All Pod objects in the kube-system namespace

The code works as expected!
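
As a side note, Api::all can replace Api::namespaced to target pods across all namespaces, provided the account running the code has cluster-wide permissions:

let api: Api<Pod> = Api::all(client);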

Watching pods

The next step is to migrate from a pull approach to a push approach, i.e., to watch the pods. For this, we have to register a watch to be notified of every change. Aye, there's the rub. While the API is pretty simple, the code itself is not.

#[tokio::main]
async fn main() {
    let client = Client::try_default().await.unwrap();
    let api: Api<Pod> = Api::namespaced(client, "kube-system");
    let mut stream = api.watch(&ListParams::default(), "0")       // 1
      .await?                                                     // 2
      .boxed();                                                   // 3
    while let Some(event) = stream
      .try_next()                                                 // 4
      .await? {                                                   // 5
        match event {
            _ => {}
        };
    }
}
  1. Call watch(), which returns a Result
  2. Await it to either get the underlying Stream or return an Error
  3. Box the Stream. At the time of this writing, my understanding of pinning is zero, so assume that it's needed and works.
  4. Try to get the next item in the Stream, wrapped in a Result
  5. Either get the underlying WatchEvent or return an Error

Yet, compilation fails because of await?:

error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `Try`)
  --> src/main.rs:12:22
   |
8  |   async fn main() {
   |  _________________-
 9 | |     let client = Client::try_default().await.unwrap();
10 | |     let api: Api<Pod> = Api::namespaced(client, "kube-system");
11 | |     let mut stream = api.watch(&ListParams::default(), "0").await?.boxed();
   | |                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cannot use the `?` operator in an async block that returns `()`
12 | |     while let Some(event) = stream.try_next().await? {
13 | |     }
14 | | }
   | |_- this function should return `Result` or `Option` to accept `?`
   |
   = help: the trait `Try` is not implemented for `()`
   = note: required by `from_error`

Remember that a Result contains either a regular value or a failure, generally an Error. The ? operator is a shortcut on Result that either:

  • unwraps the regular value so you can proceed further
  • returns the failure from the current function
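
As a minimal illustration, outside any Kubernetes context:

use std::num::ParseIntError;

// `?` either unwraps the Ok value or returns the Err to the caller
fn parse_and_double(input: &str) -> Result<i32, ParseIntError> {
    let n: i32 = input.parse()?;    // early return on parse failure
    Ok(n * 2)
}

// parse_and_double("21") evaluates to Ok(42); parse_and_double("oops") to an Err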

In the watch snippet above, the main() function does not define any return type. To fix the compilation problem, we need to add one:

#[tokio::main]
async fn main() -> Result<(), Error> {                                        // 1
    let client = Client::try_default().await.unwrap();
    let api: Api<Pod> = Api::namespaced(client, "kube-system");
    let mut stream = api.watch(&ListParams::default(), "0").await?.boxed();
    while let Some(event) = stream.try_next().await? {
        match event {
            WatchEvent::Added(pod) => info!("ADDED: {}", pod.name()),
            WatchEvent::Modified(pod) => info!("UPDATED: {}", pod.name()),
            WatchEvent::Deleted(pod) => info!("DELETED: {}", pod.name()),
            WatchEvent::Error(e) => error!("ERROR: {} {} ({})", e.code, e.message, e.status),
            _ => {}
        };
    }
    Ok(())                                                                    // 2
}
  1. Declare that main returns a Result with an empty Ok variant
  2. Return the Ok variant; necessary to compile successfully

The rest of the controller code is pretty straightforward: every time a pod is added, if it's not itself a sidecar, add a sidecar and set the original pod as its owner.
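
For illustration, here is a minimal sketch of what that logic could look like. The helper name, the sidecar label, and the busybox image are my own assumptions, not the actual code from the repository; it also assumes serde_json is added as a dependency:

use k8s_openapi::api::core::v1::Pod;
use kube::api::{Api, PostParams};

// Hypothetical helper: create a sidecar pod owned by the original pod
async fn add_sidecar(api: &Api<Pod>, owner: &Pod) -> Result<(), kube::Error> {
    let owner_name = owner.metadata.name.clone().unwrap_or_default();
    let sidecar: Pod = serde_json::from_value(serde_json::json!({
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "name": format!("{}-sidecar", owner_name),
            "labels": { "sidecar": "true" },    // marker to avoid injecting a sidecar into a sidecar
            "ownerReferences": [{               // ownership makes Kubernetes cascade the deletion
                "apiVersion": "v1",
                "kind": "Pod",
                "name": owner_name,
                "uid": owner.metadata.uid.clone().unwrap_or_default(),
            }]
        },
        "spec": {
            "containers": [{
                "name": "sidecar",
                "image": "busybox",
                "command": ["sh", "-c", "trap : TERM INT; sleep infinity & wait"]
            }]
        }
    })).expect("valid pod definition");
    api.create(&PostParams::default(), &sidecar).await?;
    Ok(())
}

Such a helper would be called from the WatchEvent::Added branch, guarded by a check on the sidecar label.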

Containerizing the controller

To containerize the code, we use a multi-stage build:

FROM ekidd/rust-musl-builder:1.51.0 as build

WORKDIR /app

COPY src src
COPY Cargo.lock .
COPY Cargo.toml .

RUN cargo build --release                       # 1

FROM scratch                                    # 2

WORKDIR /app

COPY --from=build /app/target/x86_64-unknown-linux-musl/release/rust-operator /app
COPY log4rs.yml .                               # 3

CMD ["./rust-operator"]
  1. Build the binary as a release
  2. Start from scratch for the smallest size possible
  3. Don't forget the logging configuration file
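
Building the image is then a single command; the tag name is my own choice:

docker build -t rust-operator:latest .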

IMHO, the final size is good enough:

REPOSITORY           TAG           IMAGE ID             CREATED             SIZE
rust-operator        latest        5cac942d46a0         1 hour ago          18MB

Conclusion

In this post, we have described how to create a Kubernetes controller. In Rust, it means:

  1. Setting up a project
  2. Adding the few needed dependencies
  3. Coding
  4. Configuring a multi-stage build
  5. Enjoying the result!

Besides readable code, the most significant benefit is all the hints the Rust compiler provides to help write safe code.

The complete source code for this post can be found on GitHub.

To go further:

Originally published at A Java Geek on July 11th, 2021
