A Rust controller for Kubernetes
To teach myself Kubernetes in general and controllers in particular, I previously developed one in Java. This week, I decided to do the same in Rust by following the same steps.
The guiding principle is creating a Kubernetes controller that watches pods' lifecycle and injects a sidecar into them. When Kubernetes schedules the pod, the controller schedules the sidecar; when it deletes the former, it deletes the latter as well.
I understand this would be better handled by the out-of-the-box admission controller, but it's a good learning exercise. Because of that, I approached the development through several steps:
The original Java project involved quite a few more steps, but they are not relevant to this post.
This project is the first one on my Rust path in which I had to set up a project from scratch. All my previous work either used an existing project or copied a provided template.
With cargo, it's pretty straightforward:
cargo new rust-operator # 1
1. Create the rust-operator folder and scaffold a new skeleton project in it. Alternatively, one can use cargo init in an existing folder.
The initial structure is the following:
rust-operator/
├── Cargo.toml
└── src
    └── main.rs
With the following content:
Cargo.toml:
[package]
name = "rust-operator" # 1
version = "0.1.0"
authors = ["Nicolas Frankel <nicolas@frankel.ch>"] # 2
edition = "2018" # 3
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] # 4
src/main.rs:
fn main() {
    println!("Hello, world!");
}
At this point, you can:
Build the package
cargo b
And run it
target/debug/rust-operator
As expected, it outputs:
Hello, world!
We will start by replacing the out-of-the-box logging macro with a library with logging levels (debug, info, etc.).
After some research, I found the log4rs crate:
log4rs is a highly configurable logging framework modeled after Java's Logback and log4j libraries.
Given that I don't have enough perspective on the Rust library ecosystem, I chose log4rs because its design is similar to Log4J.
To use log4rs, you need to add two dependencies:
[dependencies]
log4rs = "1.0.0" # 1
log = "0.4.14" # 2
log is a lightweight logging facade that will use log4rs as its implementation.
We can now replace the logging macro in the source file:
use log::info;
use log4rs;
fn main() {
    log4rs::init_file("log4rs.yml", Default::default()).unwrap(); // 1
    info!("Hello, world!"); // 2
}
1. Initialize log4rs, expecting a log4rs.yml file
Now, running the binary might output a log or not, depending on the log4rs.yml file content. With my sample, it prints:
2021-07-05T16:26:16.041150+02:00 INFO rust_operator - Hello, world!
As for logging, I had to search for libraries. My criteria were simple: the library had to be not too low-level, so I don't have to deal with HTTP calls, but not too high-level either, so that there is still something left for me to do. In the end, I set my eyes on kube:
Crate for interacting with the Kubernetes API
This crate includes the tools for manipulating Kubernetes resources as well as keeping track of those resources as they change over time
We also need the Rust bindings generated from the Open API specification:
[dependencies]
kube = "0.52.0"
k8s-openapi = { version = "0.11.0", default-features = false,
features = ["v1_19"] } # 1
The entry point into the API is Client.
To get a Client, the easiest is to use Client::try_default(). As in the Java code:
Will use Config::infer to try in-cluster environment variables first, then fallback to the local kubeconfig.
Will fail if neither configuration could be loaded.
Astute readers might have noticed the <<async>> stereotype in the above diagram. Every Client call is indeed asynchronous. Rust supports asynchronicity natively with the Future trait, async/await syntax, and the futures crate. Let's try to get a Client:
fn main() {
    Client::try_default();
}
The snippet compiles but outputs a warning:
warning: unused implementer of `futures::Future` that must be used
--> src/main.rs:14:5
|
14 | Client::try_default();
| ^^^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(unused_must_use)]` on by default
= note: futures do nothing unless you `.await` or poll them
As the compiler noticed, a Future does nothing on its own. To use it, we have to await it.
fn main() {
    Client::try_default().await;
}
Now, the code fails with:
error[E0728]: `await` is only allowed inside `async` functions and blocks
--> src/main.rs:14:18
|
12 | fn main() {
| ---- this is not `async`
13 | log4rs::init_file("log4rs.yml", Default::default()).unwrap();
14 | let client = Client::try_default().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks
The lesson is that you can only call async functions from other async functions (and blocks), and main is not async. The easiest path is to bring in Tokio, a crate dedicated to async programming. Tokio offers a macro to make main asynchronous.
#[tokio::main]
async fn main() {
    Client::try_default().await;
}
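To get a feel for what such a runtime does for us, here is a minimal, std-only sketch. It is not how Tokio is implemented: block_on and connect are hypothetical names, and connect merely stands in for an async call such as Client::try_default(). The sketch shows that creating a future runs nothing, and that the body only executes once something polls it.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing: enough for futures that never really wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical, naive stand-in for a runtime's `block_on`:
/// poll the future in a loop until it reports `Ready`.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
        // A real runtime would park the thread until woken; this sketch just retries.
        std::thread::yield_now();
    }
}

/// Hypothetical async function standing in for an async client call.
async fn connect(ran: Arc<AtomicBool>) -> Result<&'static str, String> {
    ran.store(true, Ordering::SeqCst); // side effect to observe when the body runs
    Ok("client")
}

fn main() {
    let ran = Arc::new(AtomicBool::new(false));

    // Calling the async function only builds the future; the body has not run.
    let future = connect(ran.clone());
    println!("ran before polling: {}", ran.load(Ordering::SeqCst));

    // Polling drives the body to completion.
    let client = block_on(future);
    println!("ran after polling: {}", ran.load(Ordering::SeqCst));
    println!("{:?}", client);
}
```

A production runtime adds real wake-ups, I/O, timers, and scheduling on top of this idea, which is why pulling in Tokio remains the pragmatic choice.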
The last step is to unwrap the Client contained inside the Result.
#[tokio::main]
async fn main() {
    let client = Client::try_default().await.unwrap();
}
At this point, we have obtained a Client and can interact with the Kubernetes cluster, e.g., to list pods. The entry point is the generic Api<K> type, where K is the Kubernetes object we are interested in.
Let's start small and list pods for now.
Api::list() requires a ListParams parameter, which implements Default. Hence, the code is quite straightforward:
#[tokio::main]
async fn main() {
    let client = Client::try_default().await.unwrap();
    let api: Api<Pod> = Api::namespaced(client, "kube-system"); // 1
    api.list(&ListParams::default())
        .await
        .unwrap()
        .items
        .iter()
        .map(|pod| pod.name())
        .for_each(|name| info!("{}", name));
}
1. Pods are read from the kube-system namespace
The code works as expected!
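The ListParams::default() call above leans on Rust's Default trait. As a small std-only sketch of that idea, here is a hypothetical QueryParams struct; its fields are illustrative assumptions and do not mirror kube's actual ListParams definition.

```rust
/// Hypothetical parameter struct; the field names are illustrative
/// and are not kube's actual `ListParams` definition.
#[derive(Debug, Default, PartialEq)]
struct QueryParams {
    label_selector: Option<String>, // defaults to None
    limit: Option<u32>,             // defaults to None
    timeout_secs: u32,              // defaults to 0
}

fn main() {
    // Every field takes its type's default value.
    let everything = QueryParams::default();

    // Override one field and keep the defaults for the rest.
    let limited = QueryParams {
        limit: Some(10),
        ..Default::default()
    };

    println!("{:?}", everything);
    println!("{:?}", limited);
}
```

Deriving Default keeps call sites short when most parameters are optional, which is presumably why list() can be called with nothing more than the default parameters.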
The next step is to migrate from a pull approach to a push approach, i.e., to watch the pods. For this, we have to register a watch to be notified of every change. Aye, there's the rub. While the API is pretty simple, the code itself is not.
#[tokio::main]
async fn main() {
    let client = Client::try_default().await.unwrap();
    let api: Api<Pod> = Api::namespaced(client, "kube-system");
    let mut stream = api.watch(&ListParams::default(), "0") // 1
        .await?                                             // 2
        .boxed();                                           // 3
    while let Some(event) = stream
        .try_next()                                         // 4
        .await? {                                           // 5
        match event {
            _ => {}
        };
    }
}
1. Call watch(), which returns a Result
2. Unwrap the Stream or return an Error
3. Pin the Stream. At the time of this writing, my understanding of pinning is zero, so assume that it's needed and works.
4. Get the next item of the Stream, wrapped in a Result
5. Unwrap the WatchEvent or return an Error
Yet, compilation fails because of await?:
error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `Try`)
--> src/main.rs:12:22
|
8 | async fn main() {
| _________________-
9 | | let client = Client::try_default().await.unwrap();
10 | | let api: Api<Pod> = Api::namespaced(client, "kube-system");
11 | | let mut stream = api.watch(&ListParams::default(), "0").await?.boxed();
| | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cannot use the `?` operator in an async block that returns `()`
12 | | while let Some(event) = stream.try_next().await? {
13 | | }
14 | | }
| |_- this function should return `Result` or `Option` to accept `?`
|
= help: the trait `Try` is not implemented for `()`
= note: required by `from_error`
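The complaint is easier to read with the ? operator isolated from the async machinery. Here is a minimal std-only sketch: parse_port and parse_port_explicit are hypothetical helpers, not part of the controller. The first uses ?, the second spells out roughly the same logic with match. Both compile only because the enclosing function declares a Result return type.

```rust
use std::num::ParseIntError;

/// Hypothetical helper: parse a port, propagating any failure with `?`.
fn parse_port(raw: &str) -> Result<u16, ParseIntError> {
    // On `Err`, `?` returns from `parse_port` right away; on `Ok`, it yields the value.
    let port = raw.trim().parse::<u16>()?;
    Ok(port)
}

/// Roughly the same logic written out by hand with `match`.
fn parse_port_explicit(raw: &str) -> Result<u16, ParseIntError> {
    let port = match raw.trim().parse::<u16>() {
        Ok(value) => value,
        // `?` also converts the error into the function's error type via `From`.
        Err(e) => return Err(From::from(e)),
    };
    Ok(port)
}

fn main() {
    println!("{:?}", parse_port("8080"));
    println!("{:?}", parse_port_explicit("8080"));
    println!("{}", parse_port("not-a-port").is_err());
}
```

If parse_port returned () instead of a Result, the compiler would reject the ? with the same kind of error as above.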
Remember that Result either contains a regular value or a failure, in general, an Error. The ? operator is a shortcut on Result that either unwraps the regular value and carries on, or returns the failure from the enclosing function right away.
In the above snippet, the main() function does not define any return type. To fix the compilation problem, we need to add it:
async fn main() -> Result<(), Error> { # 1
let client = Client::try_default().await.unwrap();
let api: Api<Pod> = Api::namespaced(client, "kube-system");
let mut stream = api.watch(&ListParams::default(), "0").await?.boxed();
while let Some(event) = stream.try_next().await? {
match event {
WatchEvent::Added(pod) => info!("ADDED: {}", pod.name()),
WatchEvent::Modified(pod) => info!("UPDATED: {}", pod.name()),
WatchEvent::Deleted(pod) => info!("DELETED: {}", pod.name()),
WatchEvent::Error(e) => error!("ERROR: {} {} ({})", e.code, e.message, e.status),
_ => {}
};
}
Ok(()) # 2
}
1. Declare the Result to return
The rest of the controller code is pretty straightforward: every time a pod is added, if it's not a sidecar, add a sidecar to the pod, and make the latter its owner.
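That decision logic can be sketched without any Kubernetes dependency. Everything below is hypothetical: Pod, WatchEvent, and Action are simplified stand-ins rather than the kube types, the is_sidecar flag is an assumption (a real controller would have to detect sidecars some other way, for instance with a label), and the "-sidecar" naming is only illustrative.

```rust
/// Simplified, hypothetical stand-in for a pod.
#[derive(Debug)]
struct Pod {
    name: String,
    is_sidecar: bool, // assumption: how sidecars are recognized is up to the controller
}

/// Simplified, hypothetical stand-in for the watch events handled above.
#[derive(Debug)]
enum WatchEvent {
    Added(Pod),
    Modified(Pod),
    Deleted(Pod),
}

/// What the controller decides to do for a given event.
#[derive(Debug, PartialEq)]
enum Action {
    CreateSidecar { owner: String, name: String },
    Nothing,
}

fn decide(event: &WatchEvent) -> Action {
    match event {
        // Only a newly added, non-sidecar pod triggers the creation of a sidecar.
        WatchEvent::Added(pod) if !pod.is_sidecar => Action::CreateSidecar {
            owner: pod.name.clone(),
            name: format!("{}-sidecar", pod.name),
        },
        // Sidecars themselves, updates, and deletions need no action here:
        // with the pod set as owner, Kubernetes garbage collection removes the sidecar.
        _ => Action::Nothing,
    }
}

fn main() {
    let events = vec![
        WatchEvent::Added(Pod { name: "web".to_string(), is_sidecar: false }),
        WatchEvent::Added(Pod { name: "web-sidecar".to_string(), is_sidecar: true }),
        WatchEvent::Modified(Pod { name: "web".to_string(), is_sidecar: false }),
        WatchEvent::Deleted(Pod { name: "web".to_string(), is_sidecar: false }),
    ];
    for event in &events {
        println!("{:?} -> {:?}", event, decide(event));
    }
}
```

Checking is_sidecar matters: without it, the sidecar's own Added event would trigger yet another sidecar, and so on.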
To containerize the code, we use a multi-stage build:
FROM ekidd/rust-musl-builder:1.51.0 as build
WORKDIR /app
COPY src src
COPY Cargo.lock .
COPY Cargo.toml .
# 1
RUN cargo build --release
# 2
FROM scratch
WORKDIR /app
COPY --from=build /app/target/x86_64-unknown-linux-musl/release/rust-operator /app
# 3
COPY log4rs.yml .
CMD ["./rust-operator"]
2. Use scratch for the smallest size possible
IMHO, the final size is good enough:
REPOSITORY TAG IMAGE ID CREATED SIZE
rust-operator latest 5cac942d46a0 1 hour ago 18MB
In this post, we have described how to create a Kubernetes controller. In Rust, it means:
Besides readable code, the most significant benefit is all the Rust compiler's hints to generate safe code.
The complete source code for this post can be found on GitHub.
To go further:
Originally published at A Java Geek on July 11th, 2021