Skip to content

Instantly share code, notes, and snippets.

@SaveTheRbtz
Last active July 12, 2018 12:01
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save SaveTheRbtz/a0fd725f5a59170a1e80e7964be8d0cb to your computer and use it in GitHub Desktop.
Consistent source hashing scheduler for IPVS (SH-derivative, GPL)
/*
* IPVS: Source Consistent Hashing scheduling module
*
* Authors: Jonathan Lee <jonlee@dropbox.com>
*
* Changes:
*
*/
#define KMSG_COMPONENT "IPVS"
#define pr_fmt(fmt) KMSG_COMPONENT ": " fmt
#include <linux/ip.h>
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/skbuff.h>
#include <linux/sort.h>
#include <net/ip_vs.h>
#include <net/tcp.h>
#include <linux/udp.h>
#include <linux/sctp.h>
/*
* IPVS SCH bucket
*/
/* One point on the consistent-hash ring: a hash position plus the real
 * server assigned to it.  A dest with weight w owns w such points. */
struct ip_vs_sch_bucket {
	u32 hashkey;			/* position of this bucket on the ring */
	struct ip_vs_dest *dest;	/* real server */
};
/*
* IPVS SCH consistent hash table
*/
struct ip_vs_sch_table {
	struct rcu_head rcu_head;		/* deferred free via call_rcu() */
	u32 length;				/* number of entries in buckets[] */
	struct ip_vs_sch_bucket *buckets;	/* ring, sorted by hashkey */
};
/* Per-service scheduler state; stored in svc->sched_data. */
struct ip_vs_sch_state {
	struct rcu_head rcu_head;		/* for kfree_rcu() of this state */
	struct ip_vs_sch_table __rcu *table;	/* current ring, RCU-swapped on dest change */
};
/*
* Simplied murmur hash 3 implementation based off of
* https://github.com/dropbox/godropbox/blob/master/hash2/consistent_hash.go
*/
/*
 * MurmurHash3 (x86_32) specialized to a single 4-byte input with a
 * fixed seed of 12345 and length 4.
 */
static inline u32 murmur_32(u32 val)
{
	u32 block, state;

	/* mix the one input block */
	block = val * 0xcc9e2d51;
	block = (block << 15) | (block >> 17);	/* rotl32(block, 15) */
	block *= 0x1b873593;

	state = 12345 ^ block;			/* seed */
	state = (state << 13) | (state >> 19);	/* rotl32(state, 13) */
	state = state * 5 + 0xe6546b64;

	/* finalization: fold in the length (4 bytes), then fmix32 */
	state ^= 4;
	state ^= state >> 16;
	state *= 0x85ebca6b;
	state ^= state >> 13;
	state *= 0xc2b2ae35;
	state ^= state >> 16;
	return state;
}
/* Helper function to determine if server is unavailable */
static inline bool is_unavailable(struct ip_vs_dest *dest)
{
return atomic_read(&dest->weight) <= 0 ||
dest->flags & IP_VS_DEST_F_OVERLOAD;
}
/*
* clockwise_index - find the clockwise index of a key in an array
* by doing a modified binary search. Used for consistent hashing.
*
* Based off of linux/lib/bsearch.
*/
static inline u32
clockwise_index(const void *key, const void *base, size_t num, size_t size,
		int (*cmp)(const void *key, const void *elt))
{
	u32 lo = 0, hi = num;

	while (lo < hi) {
		u32 mid = lo + (hi - lo) / 2;
		int rel = cmp(key, base + mid * size);

		if (rel == 0)
			return mid;		/* exact hit */
		if (rel < 0)
			hi = mid;
		else
			lo = mid + 1;
	}
	/* No exact match: lo is the insertion point.  Wrap past the last
	 * element back to index 0 -- the ring is circular. */
	return num != 0 ? lo % num : 0;
}
/*
* Branchless bucket comparing.
*
* Should not have overflow or underflow issues.
*/
static inline int ip_vs_sch_cmp(const void *a, const void *b)
{
u32 a_h = ((struct ip_vs_sch_bucket *)a)->hashkey;
u32 b_h = ((struct ip_vs_sch_bucket *)b)->hashkey;
return (a_h > b_h) - (a_h < b_h);
}
/*
* Simple swapping of two ip_vs_sch_buckets.
*
* Should not have overflow or underflow issues.
*/
static inline void ip_vs_sch_swap(void *a, void *b, int size)
{
struct ip_vs_sch_bucket *a_bucket = (struct ip_vs_sch_bucket *) a;
struct ip_vs_sch_bucket *b_bucket = (struct ip_vs_sch_bucket *) b;
u32 t_hashkey = a_bucket->hashkey;
struct ip_vs_dest *t_dest = a_bucket->dest;
a_bucket->hashkey = b_bucket->hashkey;
b_bucket->hashkey = t_hashkey;
a_bucket->dest = b_bucket->dest;
b_bucket->dest = t_dest;
}
/*
* Returns hash value for IPVS SCH entry
*/
/*
 * Hash an (address, port) pair onto the ring.  IPv6 addresses are
 * XOR-folded down to 32 bits first.
 */
static inline u32
ip_vs_sch_hashkey(int af, const union nf_inet_addr *addr, __be16 port)
{
	__be32 folded = addr->ip;

#ifdef CONFIG_IP_VS_IPV6
	if (af == AF_INET6)
		folded = addr->ip6[0] ^ addr->ip6[1] ^
			 addr->ip6[2] ^ addr->ip6[3];
#endif
	/* combine in host byte order so the mix is endian-independent */
	return murmur_32((ntohs(port) + ntohl(folded)));
}
/*
* Get ip_vs_dest associated with supplied parameters using a consistent hash.
*
* The strategy is to find the index that corresponds to the clockwise closest hash
* entry. If the server corresponding to the selected index is unavailable, continue
* clockwise, looking for an available server. If no servers are available, return NULL.
*/
/*
 * Look up the dest for (addr, port) via consistent hashing: find the
 * clockwise-closest bucket, then walk clockwise past unavailable servers.
 * Returns NULL when every server is unavailable.  Caller must hold the
 * RCU read lock (s->table is rcu_dereference()d).
 */
static inline struct ip_vs_dest *
ip_vs_sch_get(struct ip_vs_service *svc, struct ip_vs_sch_state *s,
	      const union nf_inet_addr *addr, __be16 port)
{
	struct ip_vs_sch_table *table = rcu_dereference(s->table);
	struct ip_vs_sch_bucket key = {
		ip_vs_sch_hashkey(svc->af, addr, port), NULL
	};
	u32 n = table->length;
	u32 start = clockwise_index(&key, table->buckets, n,
				    sizeof(struct ip_vs_sch_bucket),
				    ip_vs_sch_cmp);
	u32 step;

	for (step = 0; step < n; ++step) {
		u32 idx = (start + step) % n;
		struct ip_vs_dest *dest = table->buckets[idx].dest;

		if (!dest)
			break;
		if (!is_unavailable(dest))
			return dest;
		IP_VS_DBG_BUF(6, "SCH: selected unavailable "
			      "server %s:%d (index %d), reselecting",
			      IP_VS_DBG_ADDR(svc->af, &dest->addr),
			      ntohs(dest->port), idx);
	}
	return NULL;
}
static void ip_vs_sch_table_reclaim(struct rcu_head *rp)
{
struct ip_vs_sch_table *t;
int i;
t = container_of(rp, struct ip_vs_sch_table, rcu_head);
for (i=0; i < t->length; ++i) {
ip_vs_dest_put(t->buckets[i].dest);
}
IP_VS_DBG(6, "Freed old SCH hash table (memory=%Zdbytes) for current service\n",
sizeof(struct ip_vs_sch_bucket) * (t->length));
kfree(t->buckets);
kfree(t);
}
/*
* Assign all the hash buckets of the specified table with the service.
*/
static int
ip_vs_sch_reassign(struct ip_vs_sch_state *s, struct ip_vs_service *svc)
{
int i;
struct ip_vs_sch_table *nt, *ot;
struct ip_vs_sch_bucket *b;
struct list_head *p;
struct ip_vs_dest *dest;
u32 hashkey;
int weight, num_dests;
p = &svc->destinations;
num_dests = 0;
while ((p = p->next) != &svc->destinations) {
dest = list_entry(p, struct ip_vs_dest, n_list);
weight = atomic_read(&dest->weight);
num_dests += weight < 1 ? 1 : weight;
}
nt = kzalloc(sizeof(struct ip_vs_sch_table), GFP_KERNEL);
if (nt == NULL)
return -ENOMEM;
nt->buckets = kzalloc(sizeof(struct ip_vs_sch_bucket)*num_dests, GFP_KERNEL);
if (nt->buckets == NULL) {
kfree(nt);
return -ENOMEM;
}
IP_VS_DBG(6, "SCH hash table (memory=%Zdbytes) allocated for current service\n",
sizeof(struct ip_vs_sch_bucket)*num_dests);
nt->length = num_dests;
p = &svc->destinations;
b = nt->buckets;
while ((p = p->next) != &svc->destinations) {
dest = list_entry(p, struct ip_vs_dest, n_list);
weight = atomic_read(&dest->weight);
i = 0;
hashkey = ip_vs_sch_hashkey(svc->af, &dest->addr, dest->port);
do {
ip_vs_dest_hold(dest);
b->dest = dest;
b->hashkey = hashkey;
hashkey = murmur_32(hashkey);
IP_VS_DBG_BUF(6, "assigned i: %d dest: %s weight: %d hash: %x\n",
i, IP_VS_DBG_ADDR(svc->af, &dest->addr), weight, b->hashkey);
++b;
} while (++i < weight);
}
sort(nt->buckets, nt->length, sizeof(struct ip_vs_sch_bucket),
ip_vs_sch_cmp, ip_vs_sch_swap);
ot = rcu_dereference_protected(s->table, 1);
RCU_INIT_POINTER(s->table, nt);
if (ot != NULL)
call_rcu(&ot->rcu_head, ip_vs_sch_table_reclaim);
return 0;
}
/* Allocate per-service state and build the initial hash ring. */
static int ip_vs_sch_init_svc(struct ip_vs_service *svc)
{
	struct ip_vs_sch_state *s;
	int ret;

	/* allocate the SCH state for this service */
	s = kzalloc(sizeof(struct ip_vs_sch_state), GFP_KERNEL);
	if (s == NULL)
		return -ENOMEM;

	/* The original ignored this return value; on -ENOMEM s->table
	 * stayed NULL and the first ip_vs_sch_get() dereferenced it.
	 * Propagate the failure instead. */
	ret = ip_vs_sch_reassign(s, svc);
	if (ret < 0) {
		kfree(s);
		return ret;
	}

	svc->sched_data = s;
	return 0;
}
/* Tear down per-service state; the ring and the state are both freed
 * only after an RCU grace period. */
static void ip_vs_sch_done_svc(struct ip_vs_service *svc)
{
	struct ip_vs_sch_state *s = svc->sched_data;
	struct ip_vs_sch_table *t = rcu_dereference_protected(s->table, 1);

	/* release the table itself; guard against a service whose initial
	 * ring allocation failed and left s->table NULL */
	if (t)
		call_rcu(&t->rcu_head, ip_vs_sch_table_reclaim);
	kfree_rcu(s, rcu_head);
}
/* add/del/upd_dest hook: any destination change rebuilds the whole ring. */
static int ip_vs_sch_dest_changed(struct ip_vs_service *svc,
				  struct ip_vs_dest *dest)
{
	struct ip_vs_sch_state *s = svc->sched_data;

	/* Report -ENOMEM to the caller instead of silently dropping it;
	 * on failure ip_vs_sch_reassign() leaves the old ring intact. */
	return ip_vs_sch_reassign(s, svc);
}
/* Helper function to get port number */
static inline __be16
ip_vs_sch_get_port(const struct sk_buff *skb, struct ip_vs_iphdr *iph)
{
__be16 port;
struct tcphdr _tcph, *th;
struct udphdr _udph, *uh;
sctp_sctphdr_t _sctph, *sh;
switch (iph->protocol) {
case IPPROTO_TCP:
th = skb_header_pointer(skb, iph->len, sizeof(_tcph), &_tcph);
if (unlikely(th == NULL))
return 0;
port = th->source;
break;
case IPPROTO_UDP:
uh = skb_header_pointer(skb, iph->len, sizeof(_udph), &_udph);
if (unlikely(uh == NULL))
return 0;
port = uh->source;
break;
case IPPROTO_SCTP:
sh = skb_header_pointer(skb, iph->len, sizeof(_sctph), &_sctph);
if (unlikely(sh == NULL))
return 0;
port = sh->source;
break;
default:
port = 0;
}
return port;
}
/*
* Source Consistent Hashing scheduling
*/
/*
 * Scheduler entry point: hash the packet's source address (and, with the
 * SCHED2 service flag, its source port) onto the ring and return the
 * selected dest, or NULL when none is available.
 */
static struct ip_vs_dest *
ip_vs_sch_schedule(struct ip_vs_service *svc, const struct sk_buff *skb,
		   struct ip_vs_iphdr *iph)
{
	struct ip_vs_sch_state *s = svc->sched_data;
	struct ip_vs_dest *dest;
	__be16 port = 0;

	IP_VS_DBG(6, "ip_vs_sch_schedule(): Scheduling...\n");

	/* port stays 0 unless the service opted in to port hashing */
	if (svc->flags & IP_VS_SVC_F_SCHED2)
		port = ip_vs_sch_get_port(skb, iph);

	dest = ip_vs_sch_get(svc, s, &iph->saddr, port);
	if (dest == NULL) {
		ip_vs_scheduler_err(svc, "no destination available");
		return NULL;
	}

	IP_VS_DBG_BUF(6, "SCH: source IP address %s --> server %s:%d\n",
		      IP_VS_DBG_ADDR(svc->af, &iph->saddr),
		      IP_VS_DBG_ADDR(svc->af, &dest->addr),
		      ntohs(dest->port));
	return dest;
}
/*
* IPVS SCH Scheduler structure
*/
static struct ip_vs_scheduler ip_vs_sch_scheduler =
{
	.name = "ip_vs_sch",
	.refcnt = ATOMIC_INIT(0),
	.module = THIS_MODULE,
	.n_list = LIST_HEAD_INIT(ip_vs_sch_scheduler.n_list),
	/* build the ring on service creation, tear it down on removal */
	.init_service = ip_vs_sch_init_svc,
	.done_service = ip_vs_sch_done_svc,
	/* any destination change triggers a full ring rebuild */
	.add_dest = ip_vs_sch_dest_changed,
	.del_dest = ip_vs_sch_dest_changed,
	.upd_dest = ip_vs_sch_dest_changed,
	.schedule = ip_vs_sch_schedule,
};
/* Module load: register this scheduler with the IPVS core. */
static int __init ip_vs_sch_init(void)
{
	int ret;

	IP_VS_DBG(6, "ip_vs_sch_init(): Initializing...\n");
	ret = register_ip_vs_scheduler(&ip_vs_sch_scheduler);
	return ret;
}
/* Module unload: unregister and wait for outstanding RCU callbacks. */
static void __exit ip_vs_sch_cleanup(void)
{
	IP_VS_DBG(6, "ip_vs_sch_cleanup(): Cleaning up...\n");
	unregister_ip_vs_scheduler(&ip_vs_sch_scheduler);
	/* synchronize_rcu() only waits for a grace period; it does NOT wait
	 * for queued call_rcu() callbacks to run.  ip_vs_sch_table_reclaim()
	 * is module text, so pending callbacks must complete before the
	 * module is freed -- rcu_barrier() guarantees exactly that. */
	rcu_barrier();
}
/* Module entry/exit points; GPL license is required for RCU exports. */
module_init(ip_vs_sch_init);
module_exit(ip_vs_sch_cleanup);
MODULE_LICENSE("GPL");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment