- 用你熟悉的编程语言实现一致性 hash 算法。
- 编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。
代码只测试用,大部分功能耦合在一个类里,需要后续重构
package com.zhk.consistentHash;
/**
 * Strategy interface for mapping a string key to a 32-bit hash value.
 *
 * <p>Implementations must be deterministic: the same input string always
 * produces the same hash, since both virtual nodes and data keys are placed
 * on the ring through this function.
 */
@FunctionalInterface
public interface Ihash {

    /**
     * Computes a 32-bit hash of the given key.
     *
     * @param o the key to hash; must not be null
     * @return the hash value (the full int range may be used, including negatives)
     */
    int hash(String o);
}
package com.zhk.consistentHash;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
/**
* @author zhuhk
* @create 2020-07-06 8:34 下午
* @Version 1.0
**/
/**
 * Ketama-style hash strategy: hashes a key with MD5 and folds the first four
 * digest bytes into a 32-bit integer (byte 3 most significant), matching the
 * classic Ketama derivation used by memcached clients.
 *
 * @author zhuhk
 * @create 2020-07-06 8:34 PM
 * @Version 1.0
 **/
public class KetamaHashStrategy implements Ihash {

    /** Shared prototype digest; cloned per call so hashing is thread-safe. */
    private static final MessageDigest md5Digest;

    static {
        try {
            md5Digest = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            // MD5 is required by the JCA spec, so this should never happen.
            throw new RuntimeException("MD5 not supported", e);
        }
    }

    /**
     * Hashes the given key into a 32-bit value.
     *
     * @param origin the key to hash; must not be null
     * @return an int assembled from digest bytes 3..0 (little-endian order)
     */
    @Override
    public int hash(String origin) {
        byte[] bKey = computeMd5(origin);
        long rv = ((long) (bKey[3] & 0xFF) << 24)
                | ((long) (bKey[2] & 0xFF) << 16)
                | ((long) (bKey[1] & 0xFF) << 8)
                | (bKey[0] & 0xFF);
        return (int) (rv & 0xffffffffL);
    }

    /**
     * Computes the MD5 digest of the given key.
     *
     * @param k the key to digest; must not be null
     * @return the 16-byte MD5 digest
     */
    public static byte[] computeMd5(String k) {
        MessageDigest md5;
        try {
            // Clone the prototype instead of sharing it: MessageDigest is
            // stateful and not thread-safe.
            md5 = (MessageDigest) md5Digest.clone();
        } catch (CloneNotSupportedException e) {
            throw new RuntimeException("clone of MD5 not supported", e);
        }
        // BUGFIX: the no-arg getBytes() uses the platform default charset,
        // which makes hashes of non-ASCII keys differ across machines.
        // Pin UTF-8 so the ring layout is reproducible everywhere.
        md5.update(k.getBytes(StandardCharsets.UTF_8));
        return md5.digest();
    }
}
package com.zhk.consistentHash;
import java.util.*;
/**
* @author zhuhk
* @create 2020-07-06 8:34 下午
* @Version 1.0
**/
/**
 * Consistent hashing with virtual nodes: places each real node on the ring
 * {@code VN_SUM} times, assigns each key to the first node clockwise from the
 * key's hash, and (in {@link #main}) measures load balance and data migration
 * when the cluster grows from 10 to 12 nodes.
 *
 * <p>NOTE(review): state is static and methods mutate shared maps — fine for a
 * throwaway benchmark (as the file header says), but not reusable as-is.
 *
 * @author zhuhk
 * @create 2020-07-06 8:34 PM
 * @Version 1.0
 **/
public class ConsistentHashWithVN {

    private final Ihash myHash;

    ConsistentHashWithVN(Ihash ihash) {
        this.myHash = ihash;
    }

    private static final int VN_SUM = 100;           // virtual nodes per real node
    private static final int DATA_ACCOUNT = 1000000; // 1,000,000 KV entries

    private static List<String> realNode = new ArrayList<>();
    private static SortedMap<Integer, String> sortedMap = new TreeMap<>();
    private static SortedMap<Integer, String> newSortedMap = new TreeMap<>();

    /** Generates {@code DATA_ACCOUNT} random integer keys. */
    private List<Integer> initData() {
        List<Integer> datas = new ArrayList<>(DATA_ACCOUNT); // presize: count is known
        Random random = new Random();
        for (int i = 0; i < DATA_ACCOUNT; i++) {
            datas.add(random.nextInt());
        }
        return datas;
    }

    /**
     * Registers {@code n} real nodes and inserts {@code VN_SUM} virtual nodes
     * for each into the given ring.
     *
     * @param n         number of real nodes (addresses 192.168.0.0 .. 192.168.0.(n-1))
     * @param sortedMap the ring to populate (hash -> real node)
     */
    public void initNode(int n, SortedMap<Integer, String> sortedMap) {
        // BUGFIX: realNode is static and was never cleared, so the second
        // initNode(12, ...) call in main re-added nodes 0-9 on top of the
        // existing list (22 entries with duplicates). Clear it first.
        realNode.clear();
        for (int i = 0; i < n; i++) {
            realNode.add("192.168.0." + i);
        }
        for (String node : realNode) {
            for (int i = 0; i < VN_SUM; i++) {
                String virtualNodeName = node + "VN" + i;
                sortedMap.put(myHash.hash(virtualNodeName), node);
            }
        }
    }

    /**
     * Finds the real node responsible for a key: the first virtual node at or
     * clockwise after the key's hash, wrapping to the ring's start if needed.
     *
     * @param keyString the data key
     * @param sortedMap the ring to search
     * @return the owning real node, or null if the ring is null
     */
    public String getNode(String keyString, SortedMap<Integer, String> sortedMap) {
        if (sortedMap == null) return null;
        int hash = myHash.hash(keyString);
        SortedMap<Integer, String> subMap = sortedMap.tailMap(hash);
        // Empty tail means we passed the last virtual node — wrap around.
        hash = subMap.isEmpty() ? sortedMap.firstKey() : subMap.firstKey();
        return sortedMap.get(hash);
    }

    /**
     * Prints distribution statistics (mean, max, min, range, standard
     * deviation) for the per-node data counts.
     *
     * @param nodeCount data count per node
     * @param dataNum   total number of data items
     * @param nodeNum   number of real nodes
     */
    public static void analyze(Map<String, Integer> nodeCount, int dataNum, int nodeNum) {
        double average = (double) dataNum / nodeNum;
        IntSummaryStatistics s1
                = nodeCount.values().stream().mapToInt(Integer::intValue).summaryStatistics();
        int max = s1.getMax();
        int min = s1.getMin();
        int range = max - min;
        // BUGFIX: the original computed the mean ABSOLUTE deviation but
        // labeled it "标准差". True standard deviation is the square root of
        // the mean squared deviation.
        double standardDeviation = Math.sqrt(
                nodeCount.values().stream()
                        .mapToDouble(c -> (c - average) * (c - average))
                        .average()
                        .orElse(0.0));
        System.out.println(String.format("平均值:%.2f", average));
        System.out.println(String.format("最大值:%d,(%.2f%%)", max, 100.0 * max / average));
        System.out.println(String.format("最小值:%d,(%.2f%%)", min, 100.0 * min / average));
        System.out.println(String.format("极差:%d,(%.2f%%)", range, 100.0 * range / average));
        System.out.println(String.format("标准差:%.2f,(%.2f%%)", standardDeviation, 100.0 * standardDeviation / average));
    }

    public static void main(String[] args) {
        ConsistentHashWithVN consistentHashWithVN = new ConsistentHashWithVN(new KetamaHashStrategy());
        List<Integer> datas = consistentHashWithVN.initData();
        consistentHashWithVN.initNode(10, sortedMap);
        Map<String, Integer> dataCount = new HashMap<>();
        for (Integer data : datas) {
            String node = consistentHashWithVN.getNode(data.toString(), sortedMap);
            dataCount.merge(node, 1, Integer::sum); // idiomatic counting
        }
        analyze(dataCount, DATA_ACCOUNT, 10);

        // Measure how many keys move when the cluster grows from 10 to 12 nodes.
        int migrateCount = 0;
        consistentHashWithVN.initNode(12, newSortedMap);
        for (Integer data : datas) {
            String node = consistentHashWithVN.getNode(data.toString(), sortedMap);
            String newNode = consistentHashWithVN.getNode(data.toString(), newSortedMap);
            if (!node.equals(newNode)) {
                migrateCount++;
            }
        }
        System.out.println(String.format("数据迁移量:%d(%.2f%%)", migrateCount, migrateCount * 100.0 / datas.size()));
    }
}
运行结果如下:
Connected to the target VM, address: '127.0.0.1:58246', transport: 'socket'
平均值:100000.00
最大值:119258,(119.26%)
最小值:80506,(80.51%)
极差:38752,(38.75%)
标准差:7590.60,(7.59%)
数据迁移量:159123(15.91%)
Disconnected from the target VM, address: '127.0.0.1:58246', transport: 'socket'
Process finished with exit code 0