A Rust controller for Kubernetes
To teach myself Kubernetes in general and controllers in particular, I previously developed one in Java. This week, I decided to do the same in Rust, following the same steps as before.
The guiding principle is creating a Kubernetes controller that watches pods' lifecycle and injects a sidecar into them. When Kubernetes schedules the pod, the controller schedules the sidecar; when it deletes the former, it deletes the latter as well.
I understand this would be better handled by the out-of-the-box admission controller, but it's a good learning exercise. Because of that, I approached the development through several steps:
- Print the list of pods to the console. After the code is executed, the process stops. Kubernetes sees the pod as failing; it kills it and schedules a new one. Rinse and repeat.
- Make the program a loop so that it doesn't exit
- Use a dedicated listener instead of a loop to be notified of lifecycle events
- Replace the logging with the actual scheduling of the sidecar
The original Java project involved quite a few more steps, but they are not relevant for this post.
This project is the first one on my Rust path in which I had to set up a project from scratch. All my previous work either used an existing project or copied a provided template.
With `cargo`, it's pretty straightforward:
cargo new rust-operator # 1
- Create the `rust-operator` folder and scaffold a new skeleton project in it. Alternatively, one can use `cargo init` in an existing folder.
The initial structure is the following:
rust-operator/
├── Cargo.toml
└── src
└── main.rs
With the following content:
[package]
name = "rust-operator" # 1
version = "0.1.0"
authors = ["Nicolas Frankel <[email protected]>"] # 2
edition = "2018" # 3
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] # 4
- Package name
- Author taken from the global Git configuration
- Latest Rust edition
- Start with an empty dependency list
fn main() {
println!("Hello, world!");
}
At this point, you can:
- Build the package:
  cargo b
- And run it:
  target/debug/rust-operator
As expected, it outputs:
Hello, world!
We will start by replacing the out-of-the-box `println!` macro with a logging library that supports levels (debug, info, etc.).
After some research, I found the log4rs crate:
log4rs is a highly configurable logging framework modeled after Java's Logback and log4j libraries.
Given that I don't yet have enough perspective on the Rust library ecosystem, I chose `log4rs` because its design is similar to Log4J.
To use `log4rs`, you need to add two dependencies:
[dependencies]
log4rs = "1.0.0" # 1
log = "0.4.14" # 2
- Add `log4rs`
- `log` is a lightweight logging facade that will use `log4rs` as its implementation
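To make the facade idea concrete, here's a minimal std-only sketch of the pattern. It is not how the `log` crate is implemented: `Level`, `Backend`, `Facade`, `MemoryBackend`, and `demo_facade` are names made up for illustration. The point is only that the calling code talks to the facade, while the implementation behind it can be swapped.

```rust
use std::sync::Mutex;

/// Log levels, ordered from most to least severe (derived ordering).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Level {
    Error,
    Warn,
    Info,
    Debug,
}

/// What a logging implementation must provide (the role log4rs plays).
trait Backend {
    fn write(&self, level: Level, message: &str);
}

/// The facade: application code calls `log` and never names the backend type.
struct Facade<B: Backend> {
    max_level: Level,
    backend: B,
}

impl<B: Backend> Facade<B> {
    fn log(&self, level: Level, message: &str) {
        // Drop messages that are more verbose than the configured level.
        if level <= self.max_level {
            self.backend.write(level, message);
        }
    }
}

/// Toy backend that keeps formatted lines in memory instead of printing them.
struct MemoryBackend {
    lines: Mutex<Vec<String>>,
}

impl Backend for MemoryBackend {
    fn write(&self, level: Level, message: &str) {
        self.lines
            .lock()
            .unwrap()
            .push(format!("{:?} - {}", level, message));
    }
}

/// Logs one info and one debug message with the level set to info.
fn demo_facade() -> Vec<String> {
    let facade = Facade {
        max_level: Level::Info,
        backend: MemoryBackend {
            lines: Mutex::new(Vec::new()),
        },
    };
    facade.log(Level::Info, "Hello, world!");
    facade.log(Level::Debug, "filtered out");
    let lines = facade.backend.lines.lock().unwrap().clone();
    lines
}
```

Calling `demo_facade()` keeps the info message and drops the debug one, which is the same kind of filtering a logging configuration file controls.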
We can now replace the logging macro in the source file:
use log::info;
use log4rs;
fn main() {
log4rs::init_file("log4rs.yml", Default::default()).unwrap(); // 1
info!("Hello, world!"); // 2
}
- Initialize `log4rs`, expecting a `log4rs.yml` file
- Proper log
Now, running the binary might output a log or not, depending on the content of the `log4rs.yml` file. With my sample, it prints:
2021-07-05T16:26:16.041150+02:00 INFO rust_operator - Hello, world!
As with logging, I had to search for libraries. My criteria were simple: the library should not be too low-level, so that I don't have to deal with HTTP calls, but not too high-level either, so that there is still something left for me to do. In the end, I set my eyes on kube:
Crate for interacting with the Kubernetes API
This crate includes the tools for manipulating Kubernetes resources as well as keeping track of those resources as they change over time
We also need the Rust bindings generated from the Open API specification:
[dependencies]
kube = "0.52.0"
k8s-openapi = { version = "0.11.0", default-features = false, features = ["v1_19"] } # 1
- The library uses features, one feature for each Kubernetes API version. Set the one relevant for your cluster version.
The entry point into the API is `Client`:
To get a `Client`, the easiest is to use `Client::try_default()`. As in the Java code:
Will use `Config::infer` to try in-cluster environment variables first, then fallback to the local kubeconfig. Will fail if neither configuration could be loaded.
Astute readers might have noticed the <<async>> stereotype in the above diagram. Every `Client` call is indeed asynchronous. Rust supports asynchronicity natively with the `Future` trait, the `async`/`await` syntax, and the `futures` crate. Let's try to get a `Client`:
fn main() {
Client::try_default();
}
The snippet compiles but outputs a warning:
warning: unused implementer of `futures::Future` that must be used
--> src/main.rs:14:5
|
14 | Client::try_default();
| ^^^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(unused_must_use)]` on by default
= note: futures do nothing unless you `.await` or poll them
As the compiler noticed, a `Future` does nothing on its own. To use it, we have to `await` it.
fn main() {
Client::try_default().await;
}
Now, the code fails with:
error[E0728]: `await` is only allowed inside `async` functions and blocks
--> src/main.rs:14:18
|
12 | fn main() {
| ---- this is not `async`
13 | log4rs::init_file("log4rs.yml", Default::default()).unwrap();
14 | let client = Client::try_default().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks
The lesson is that you can only `await` inside `async` functions and blocks, and `main` is not `async`. The easiest path is to bring in Tokio, a crate dedicated to async programming. Tokio offers a macro to make `main` asynchronous.
#[tokio::main]
async fn main() {
Client::try_default().await;
}
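If you wonder what a runtime such as Tokio brings to the table, the following std-only sketch may help. It's a toy, not Tokio's implementation: `block_on`, `ThreadWaker`, `connect`, and `demo` are hypothetical names, and `connect` merely stands in for an async call such as `Client::try_default()`. It illustrates two things: creating a future runs nothing, and something has to poll it until it completes.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the blocked thread when the future can make progress again.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor: polls a single future on the current thread until it is ready.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until the waker unparks this thread, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical stand-in for an async call; it records that its body ran.
async fn connect(ran: Arc<AtomicBool>) -> Result<&'static str, String> {
    ran.store(true, Ordering::SeqCst);
    Ok("connected")
}

/// Returns (body ran before polling, body ran after polling, result).
fn demo() -> (bool, bool, Result<&'static str, String>) {
    let ran = Arc::new(AtomicBool::new(false));
    let future = connect(Arc::clone(&ran)); // Only creates the future
    let before = ran.load(Ordering::SeqCst);
    let result = block_on(future); // Drives the future to completion
    let after = ran.load(Ordering::SeqCst);
    (before, after, result)
}
```

A real runtime does far more (scheduling many tasks, I/O, timers), but the division of labor is the same: the macro wraps the async body, and the runtime blocks on it.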
The last step is to unwrap the `Client` contained inside the `Result`.
#[tokio::main]
async fn main() {
let client = Client::try_default().await.unwrap();
}
At this point, we have obtained a `Client` and can interact with the Kubernetes cluster, e.g., to list pods. The entry point is the generic `Api<K>` type, where `K` is the Kubernetes object we are interested in.
Let's start small and list pods for now. `Api::list()` requires a `ListParams` parameter, which implements `Default`. Hence, the code is quite straightforward:
#[tokio::main]
async fn main() {
    let client = Client::try_default().await.unwrap();
    let api: Api<Pod> = Api::namespaced(client, "kube-system"); // 1
    api.list(&ListParams::default())
        .await
        .unwrap()
        .items
        .iter()
        .map(|pod| pod.name())
        .for_each(|name| info!("{}", name));
}
- All pod objects in the `kube-system` namespace
The code works as expected!
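Listing is a pull approach: to notice changes, the program would have to list again and again and compare the results itself. Here's a rough sketch of that comparison, using plain pod names instead of real `Pod` objects. `diff_pod_names` and `names` are hypothetical helpers, not part of kube.

```rust
use std::collections::BTreeSet;

/// Builds a set of pod names from string literals (helper for the example).
fn names(list: &[&str]) -> BTreeSet<String> {
    list.iter().map(|name| name.to_string()).collect()
}

/// Compares two successive listings and returns (added, removed) pod names.
fn diff_pod_names(
    previous: &BTreeSet<String>,
    current: &BTreeSet<String>,
) -> (Vec<String>, Vec<String>) {
    // Present now but not before: the pod was added in between.
    let added = current.difference(previous).cloned().collect();
    // Present before but not now: the pod was deleted in between.
    let removed = previous.difference(current).cloned().collect();
    (added, removed)
}
```

Besides the extra code, polling only sees the state at each listing, so a pod created and deleted between two listings goes unnoticed.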
The next step is to migrate from a pull approach to a push approach, i.e., to watch the pods. For this, we have to register a watch to be notified of every change. Aye, there's the rub. While the API is pretty simple, the code itself is not.
#[tokio::main]
async fn main() {
    let client = Client::try_default().await.unwrap();
    let api: Api<Pod> = Api::namespaced(client, "kube-system");
    let mut stream = api.watch(&ListParams::default(), "0") // 1
        .await?                                             // 2
        .boxed();                                           // 3
    while let Some(event) = stream
        .try_next()                                         // 4
        .await? {                                           // 5
        match event {
            _ => {}
        };
    }
}
- Call `watch()`, which returns a `Result`
- Await to either get the underlying `Stream` or return an `Error`
- Box the `Stream`. At the time of this writing, my understanding of pinning is zero, so assume that it's needed and works.
- Try to get the next item in the `Stream`, wrapped in a `Result`
- Either get the underlying `WatchEvent` or return an `Error`
Yet, compilation fails because of `await?`:
error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `Try`)
--> src/main.rs:12:22
|
8 | async fn main() {
| _________________-
9 | | let client = Client::try_default().await.unwrap();
10 | | let api: Api<Pod> = Api::namespaced(client, "kube-system");
11 | | let mut stream = api.watch(&ListParams::default(), "0").await?.boxed();
| | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cannot use the `?` operator in an async block that returns `()`
12 | | while let Some(event) = stream.try_next().await? {
13 | | }
14 | | }
| |_- this function should return `Result` or `Option` to accept `?`
|
= help: the trait `Try` is not implemented for `()`
= note: required by `from_error`
Remember that `Result` either contains a regular value or a failure, in general, an `Error`. The `?` operator is a shortcut on `Result` that either:
- unwraps the regular value so you can proceed further
- returns the failure from the current function
In the above snippet, the `main()` function does not define any return type. To fix the compilation problem, we need to add it:
// Import paths assumed for kube 0.52; they may differ in other versions
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use kube::api::{Api, ListParams, ResourceExt, WatchEvent};
use kube::{Client, Error};
use log::{error, info};

#[tokio::main]
async fn main() -> Result<(), Error> { // 1
    let client = Client::try_default().await.unwrap();
    let api: Api<Pod> = Api::namespaced(client, "kube-system");
    let mut stream = api.watch(&ListParams::default(), "0").await?.boxed();
    while let Some(event) = stream.try_next().await? {
        match event {
            WatchEvent::Added(pod) => info!("ADDED: {}", pod.name()),
            WatchEvent::Modified(pod) => info!("UPDATED: {}", pod.name()),
            WatchEvent::Deleted(pod) => info!("DELETED: {}", pod.name()),
            WatchEvent::Error(e) => error!("ERROR: {} {} ({})", e.code, e.message, e.status),
            _ => {}
        };
    }
    Ok(()) // 2
}
- Define an empty `Result` to return
- Necessary to compile successfully
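The `?` shortcut described earlier can be tried without any Kubernetes dependency. In this minimal sketch, the two functions are hypothetical examples of mine: the first uses `?`, the second spells out roughly what `?` does by hand.

```rust
use std::num::ParseIntError;

/// Parses a number and doubles it, using `?` to bail out on failure.
fn double_with_question_mark(input: &str) -> Result<i32, ParseIntError> {
    let n = input.trim().parse::<i32>()?;
    Ok(n * 2)
}

/// Same behavior, with the early return written explicitly.
fn double_with_match(input: &str) -> Result<i32, ParseIntError> {
    let n = match input.trim().parse::<i32>() {
        // Unwrap the regular value so we can proceed further.
        Ok(value) => value,
        // Return the failure from the current function.
        // (`?` additionally converts the error with `From::from`,
        // which is a no-op here because the error types are identical.)
        Err(e) => return Err(e),
    };
    Ok(n * 2)
}
```

Both functions only compile because their return type is a `Result`; remove it, and the compiler raises the same E0277 error as above.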
The rest of the controller code is pretty straightforward: every time a pod is added, if it's not a sidecar, add a sidecar to the pod, and make the latter its owner.
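The decision itself can be sketched with plain types. Everything below is hypothetical and simplified: `PodInfo`, `PodEvent`, `SidecarSpec`, and `plan_sidecar` are illustration names, the `is_sidecar` flag and the `-sidecar` suffix are assumptions of mine, and the real code works with kube's `Pod` and `WatchEvent` types instead.

```rust
/// Simplified view of a pod (hypothetical, not the k8s-openapi type).
#[derive(Debug, Clone, PartialEq)]
struct PodInfo {
    name: String,
    uid: String,
    is_sidecar: bool,
}

/// Simplified lifecycle events, mirroring the watch events handled above.
#[derive(Debug, PartialEq)]
enum PodEvent {
    Added(PodInfo),
    Modified(PodInfo),
    Deleted(PodInfo),
}

/// What the controller would have to create: a sidecar owned by the pod.
#[derive(Debug, PartialEq)]
struct SidecarSpec {
    name: String,
    owner_name: String,
    owner_uid: String,
}

/// Decides whether an event requires scheduling a sidecar.
fn plan_sidecar(event: &PodEvent) -> Option<SidecarSpec> {
    match event {
        // Only regular pods get a sidecar; otherwise every sidecar
        // would trigger the creation of yet another sidecar.
        PodEvent::Added(pod) if !pod.is_sidecar => Some(SidecarSpec {
            name: format!("{}-sidecar", pod.name),
            owner_name: pod.name.clone(),
            owner_uid: pod.uid.clone(),
        }),
        // Nothing to do on deletion: since the pod owns the sidecar,
        // Kubernetes garbage-collects the sidecar along with it.
        _ => None,
    }
}
```

Keeping the decision in a pure function like this makes it testable without a cluster; the part that talks to the API then only has to turn a `SidecarSpec` into an actual pod with an owner reference.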
To containerize the code, we use a multi-stage build:
FROM ekidd/rust-musl-builder:1.51.0 as build
WORKDIR /app
COPY src src
COPY Cargo.lock .
COPY Cargo.toml .
RUN cargo build --release # 1
FROM scratch # 2
WORKDIR /app
COPY --from=build /app/target/x86_64-unknown-linux-musl/release/rust-operator /app
COPY log4rs.yml . # 3
CMD ["./rust-operator"]
- Build the binary as a release
- Start from `scratch` for the smallest size possible
- Don't forget the logging configuration file
IMHO, the final size is good enough:
REPOSITORY TAG IMAGE ID CREATED SIZE
rust-operator latest 5cac942d46a0 1 hour ago 18MB
In this post, we have described how to create a Kubernetes controller. In Rust, it means:
- Setting up a project
- Adding the few needed dependencies
- Coding
- Configuring a multi-stage build
- Enjoy!
Besides readable code, the most significant benefit is the Rust compiler's many hints, which guide you toward safe code.
The complete source code for this post can be found on GitHub.
Originally published at A Java Geek on July 11th, 2021