Sticky Session Controller for Kubernetes
How to implement a sticky session controller for Kubernetes with .NET 6 and YARP.
In this post I want to show an implementation of a sticky session controller for Kubernetes in .NET 6.
Some ingress controllers for Kubernetes, such as Traefik, support sticky sessions. The solution
Traefik provides is based on cookies. But what if a client cannot handle cookies? Or what if a more
complex strategy is needed to find the right pod instance?
With the help of two NuGet packages
- KubernetesClient
- Yarp.ReverseProxy
it is easy to implement your own controller.
First, the controller needs to inspect the Kubernetes cluster and find all target pods.
With the help of the KubernetesClient, the controller can open a watch to
be notified of all pod events.
We have to filter these events to find the pods we are interested in: here, the pods that carry
the annotation sticky-session-controller with the value true.
Once we know these pods, we can store their IPs in memory.
using System.Collections;
using k8s;
using k8s.Models;

public class PodIpCollectorService : BackgroundService
{
    // Thread-safe list of the IPs of all annotated pods
    public static readonly ArrayList IpAdresses = ArrayList.Synchronized(new());
    private readonly IConfiguration _configuration;

    public PodIpCollectorService(IConfiguration configuration)
    {
        _configuration = configuration;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Use the in-cluster configuration inside the cluster, otherwise the local kubeconfig
        var inCluster = _configuration.GetValue("IN_CLUSTER", false);
        var config = inCluster
            ? KubernetesClientConfiguration.InClusterConfig()
            : KubernetesClientConfiguration.BuildConfigFromConfigFile();
        IKubernetes client = new Kubernetes(config);
        var kubernetesNamespace = _configuration.GetValue("K8S_NAMESPACE_TO_WATCH", "default");

        // Open a watch that reports every pod event in the namespace
        var podlistResp = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync(
            kubernetesNamespace, watch: true, cancellationToken: stoppingToken);
        await foreach (var (type, item) in podlistResp.WatchAsync<V1Pod, V1PodList>().WithCancellation(stoppingToken))
        {
            // Only pods annotated with sticky-session-controller=true are targets
            var annotations = item.Metadata?.Annotations;
            if (annotations == null
                || !annotations.TryGetValue("sticky-session-controller", out var value)
                || !"true".Equals(value, StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            // A freshly created pod has no IP yet; the IP arrives with a later Modified event
            var podIp = item.Status?.PodIP;
            if (string.IsNullOrEmpty(podIp))
            {
                continue;
            }

            switch (type)
            {
                case WatchEventType.Added:
                case WatchEventType.Modified:
                    // Modified events repeat, so avoid duplicate entries
                    if (!IpAdresses.Contains(podIp))
                    {
                        IpAdresses.Add(podIp);
                    }
                    break;
                case WatchEventType.Deleted:
                case WatchEventType.Error:
                    IpAdresses.Remove(podIp);
                    break;
            }
        }
    }
}
Let’s assume that the client identifies itself with a unique id in a request header and that requests with that id must be sent
to the same pod as long as the pod still exists.
To forward the request, we use Yarp.ReverseProxy.
The controller therefore only has to perform the following steps:
- When a request arrives, read the id from the header.
- Check whether the dictionary contains an IP for the id.
- If there is no entry for the id, pick an IP from the IP list and create a new entry.
- If an entry exists, take the IP from the dictionary.
- Send the request to the pod.
- Listen to pod creation and deletion events to add new IPs to the list or to remove an IP from the list and the dictionary.
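The last step also removes a deleted pod's IP from the dictionary. A minimal sketch of such a cleanup could look like this, assuming the map is kept in a ConcurrentDictionary. The class StickyMap and its members are hypothetical names used only for illustration; they are not part of KubernetesClient or Yarp.ReverseProxy.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: maps client ids to pod IPs and drops stale entries.
public sealed class StickyMap
{
    private readonly ConcurrentDictionary<string, string> _idToIp = new();

    // Number of clients that currently have a pod assigned.
    public int Count => _idToIp.Count;

    // Returns the IP already assigned to the client, or assigns the one
    // produced by pickIp if the client is not known yet.
    public string GetOrAssign(string clientId, Func<string> pickIp) =>
        _idToIp.GetOrAdd(clientId, _ => pickIp());

    // Removes every client entry that points at the given pod IP
    // and returns how many entries were removed.
    public int RemovePod(string podIp)
    {
        var removed = 0;
        foreach (var pair in _idToIp)
        {
            // TryRemove(pair) only succeeds if the entry still maps to this IP.
            if (pair.Value == podIp && _idToIp.TryRemove(pair))
            {
                removed++;
            }
        }
        return removed;
    }
}
```

The watch loop could call RemovePod when it sees a Deleted event, so that clients of the removed pod get a fresh assignment on their next request.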
The request handler receives the current HttpContext as a parameter, which gives it access to the headers and the path of the current request.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net;
using System.Net.Http;
using StickySessionHelperExample;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Yarp.ReverseProxy.Forwarder;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddHttpForwarder();
builder.Services.AddHostedService<PodIpCollectorService>();

// Shared client that forwards the requests to the pods
var httpClient = new HttpMessageInvoker(new SocketsHttpHandler
{
    UseProxy = false,
    AllowAutoRedirect = false,
    AutomaticDecompression = DecompressionMethods.None,
    UseCookies = false,
    ActivityHeadersPropagator = new ReverseProxyPropagator(DistributedContextPropagator.Current)
});
var transformer = HttpTransformer.Default;
var requestConfig = new ForwarderRequestConfig { ActivityTimeout = TimeSpan.FromSeconds(100) };
// Requests are handled concurrently, so the map has to be thread safe
ConcurrentDictionary<string, string?> idToIpMap = new();

var app = builder.Build();
app.UseRouting();
app.UseEndpoints(endpoints =>
    endpoints.Map("/{**catch-all}", handler: async (HttpContext httpContext, IHttpForwarder forwarder) =>
    {
        var clientId = httpContext.Request.Headers["client-id"].ToString();
        if (idToIpMap.TryGetValue(clientId, out var ipAddress)
            && !PodIpCollectorService.IpAdresses.Contains(ipAddress))
        {
            // The pod no longer exists, so forget its IP
            ipAddress = null;
        }
        if (ipAddress == null)
        {
            // No usable entry: pick a random pod from a snapshot of the list
            var ips = PodIpCollectorService.IpAdresses.ToArray();
            if (ips.Length == 0)
            {
                // No target pod is known yet
                httpContext.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
                return;
            }
            ipAddress = ips[Random.Shared.Next(ips.Length)] as string;
            idToIpMap[clientId] = ipAddress;
        }
        var error = await forwarder.SendAsync(httpContext, $"http://{ipAddress}/",
            httpClient, requestConfig, transformer);
        // Check if the operation was successful
        if (error != ForwarderError.None)
        {
            var errorFeature = httpContext.GetForwarderErrorFeature();
            var exception = errorFeature?.Exception;
            // Error handling
        }
    })
);
app.Run();
This simple implementation only works if there is a single controller instance. If there is more than one controller
instance, the id-to-IP map must be stored in a persistence layer such as Redis, or the map must be synchronized
between all controller instances.
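One way to prepare for several controller instances is to hide the map behind a small interface, so that the in-memory version can later be swapped for a shared one. The following is only a sketch: ISessionStore and InMemorySessionStore are hypothetical names, and no Redis client code is shown because the post does not prescribe a particular client library.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical abstraction over the id-to-IP map.
public interface ISessionStore
{
    // Returns the pod IP stored for the client, or null if there is none.
    Task<string?> GetAsync(string clientId);

    // Stores or overwrites the pod IP for the client.
    Task SetAsync(string clientId, string podIp);

    // Deletes the entry for the client, if any.
    Task RemoveAsync(string clientId);
}

// In-memory implementation, sufficient for a single controller instance.
public sealed class InMemorySessionStore : ISessionStore
{
    private readonly ConcurrentDictionary<string, string> _map = new();

    public Task<string?> GetAsync(string clientId) =>
        Task.FromResult<string?>(_map.TryGetValue(clientId, out var ip) ? ip : null);

    public Task SetAsync(string clientId, string podIp)
    {
        _map[clientId] = podIp;
        return Task.CompletedTask;
    }

    public Task RemoveAsync(string clientId)
    {
        _map.TryRemove(clientId, out _);
        return Task.CompletedTask;
    }
}
```

The methods are asynchronous because a shared store would involve network calls; the in-memory version simply returns completed tasks.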
Failover handling for a client that is assigned a new pod is not implemented either. What happens when the
target pod changes depends on the client and on the target pod.
Maybe a special HTTP status must be sent to the client, or the client and the target pod have their own
protocol to set up a new session, …
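Whatever the reaction is, the controller first has to notice that a client was moved to another pod. A minimal sketch of that check follows; the enum PodAssignment and the class FailoverPolicy are hypothetical names, and how the result is signalled to the client is deliberately left open.

```csharp
using System;

// Hypothetical outcome of resolving the pod for a client.
public enum PodAssignment
{
    Existing,   // the client stays on the pod it used before
    New,        // the client is seen for the first time
    Reassigned  // the previous pod is gone and the client was moved
}

public static class FailoverPolicy
{
    // previousIp: the IP stored for the client before the lookup, or null if none.
    // currentIp: the IP the request will be forwarded to.
    public static PodAssignment Classify(string? previousIp, string currentIp)
    {
        if (previousIp is null)
        {
            return PodAssignment.New;
        }
        return string.Equals(previousIp, currentIp, StringComparison.Ordinal)
            ? PodAssignment.Existing
            : PodAssignment.Reassigned;
    }
}
```

In the request handler, the IP read from the dictionary before the existence check would be passed as previousIp. On Reassigned the controller could then apply whatever the client and the pod have agreed on.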