• 用你熟悉的编程语言实现一致性 hash 算法。
  • 编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。

代码只测试用,大部分功能耦合在一个类里,需要后续重构

package com.zhk.consistentHash;

/**
 * Strategy interface mapping a string to a 32-bit position on the hash ring.
 * Implementations must be deterministic: equal inputs yield equal hashes.
 */
@FunctionalInterface
public interface Ihash {

    /**
     * Hashes the given string.
     *
     * @param o the string to hash (a key or a virtual-node name)
     * @return the 32-bit hash value
     */
    int hash(String o);
}
package com.zhk.consistentHash;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * @author zhuhk
 * @create 2020-07-06 8:34 下午
 * @Version 1.0
 **/
/**
 * Ketama-style hash strategy: MD5-digests the input string and folds the
 * first four digest bytes (little-endian) into a 32-bit hash value.
 *
 * @author zhuhk
 * @create 2020-07-06 8:34 下午
 * @Version 1.0
 **/
public class KetamaHashStrategy implements Ihash {

    // Template digest; cloned per call because MessageDigest instances are
    // stateful and not thread-safe.
    private static MessageDigest md5Digest;

    static {
        try {
            md5Digest = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 not supported", e);
        }
    }

    /**
     * Hashes the given string.
     *
     * @param origin string to hash (a key or a virtual-node name)
     * @return 32-bit hash built from the first 4 MD5 digest bytes
     */
    @Override
    public int hash(String origin) {
        byte[] bKey = computeMd5(origin);
        // Combine digest bytes 3..0 little-endian, as the Ketama algorithm does.
        long rv = ((long) (bKey[3] & 0xFF) << 24)
                | ((long) (bKey[2] & 0xFF) << 16)
                | ((long) (bKey[1] & 0xFF) << 8)
                | (bKey[0] & 0xFF);
        return (int) (rv & 0xffffffffL);
    }

    /**
     * Get the md5 of the given key.
     *
     * @param k string to digest
     * @return the 16-byte MD5 digest
     */
    public static byte[] computeMd5(String k) {
        MessageDigest md5;
        try {
            // Cloning the prototype is cheaper than getInstance() per call
            // and keeps this method thread-safe.
            md5 = (MessageDigest) md5Digest.clone();
        } catch (CloneNotSupportedException e) {
            throw new RuntimeException("clone of MD5 not supported", e);
        }
        // Fix: use an explicit charset. The old k.getBytes() used the platform
        // default charset, so the same key could hash differently on machines
        // with different default encodings.
        md5.update(k.getBytes(StandardCharsets.UTF_8));
        return md5.digest();
    }
}
package com.zhk.consistentHash;

import java.util.*;

/**
 * @author zhuhk
 * @create 2020-07-06 8:34 下午
 * @Version 1.0
 **/
/**
 * Consistent hashing ring with virtual nodes, plus a small benchmark that
 * distributes 1,000,000 random keys over 10 nodes, reports load-balance
 * statistics, and measures the migration ratio when the cluster grows to 12.
 *
 * @author zhuhk
 * @create 2020-07-06 8:34 下午
 * @Version 1.0
 **/
public class ConsistentHashWithVN {

    /** Hash strategy used for both virtual-node placement and key lookup. */
    private final Ihash myHash;

    ConsistentHashWithVN(Ihash ihash) {
        this.myHash = ihash;
    }

    private static final int VN_SUM = 100;           // 虚拟节点个数: virtual nodes per real node
    private static final int DATA_ACCOUNT = 1000000; // 100w kv数据: number of KV entries
    private static SortedMap<Integer, String> sortedMap = new TreeMap<>();
    private static SortedMap<Integer, String> newSortedMap = new TreeMap<>();

    /**
     * Generates DATA_ACCOUNT random integer keys to act as the KV data set.
     *
     * @return list of random keys
     */
    private List<Integer> initData() {
        List<Integer> datas = new ArrayList<>(DATA_ACCOUNT); // presized to avoid regrows
        Random random = new Random();
        for (int i = 0; i < DATA_ACCOUNT; i++) {
            datas.add(random.nextInt());
        }
        return datas;
    }

    /**
     * Populates the given ring with n real nodes, each represented by VN_SUM
     * virtual nodes.
     *
     * Fix: node names are generated locally. The old version appended them to
     * a shared static list, so a second call (the migration test) duplicated
     * every previously created node and re-hashed its virtual nodes.
     * The raw SortedMap parameter is now properly parameterized.
     *
     * @param n         number of real nodes, named 192.168.0.0 .. 192.168.0.(n-1)
     * @param sortedMap hash ring to fill: virtual-node hash -> real node name
     */
    public void initNode(int n, SortedMap<Integer, String> sortedMap) {
        for (int i = 0; i < n; i++) {
            String node = "192.168.0." + i;
            for (int vn = 0; vn < VN_SUM; vn++) {
                String virtualNodeName = node + "VN" + vn;
                sortedMap.put(myHash.hash(virtualNodeName), node);
            }
        }
    }

    /**
     * Locates the node responsible for a key: the first virtual node at or
     * after the key's hash, wrapping to the start of the ring if none follows.
     *
     * @param keyString key to place
     * @param sortedMap hash ring to search
     * @return owning node name, or null when the ring is null or empty
     */
    public String getNode(String keyString, SortedMap<Integer, String> sortedMap) {
        if (sortedMap == null || sortedMap.isEmpty()) {
            return null; // empty ring would make firstKey() throw below
        }
        int hash = myHash.hash(keyString);
        SortedMap<Integer, String> subMap = sortedMap.tailMap(hash);
        Integer target = subMap.isEmpty() ? sortedMap.firstKey() : subMap.firstKey();
        return sortedMap.get(target);
    }

    /**
     * 统计工具 — prints distribution statistics for the per-node key counts.
     *
     * Fix: the "标准差" line now reports the true standard deviation (square
     * root of the mean squared deviation); the old code printed the mean
     * absolute deviation under that label, which is not what the task asked
     * to measure.
     *
     * @param nodeCount keys stored per node
     * @param dataNum   total number of keys distributed
     * @param nodeNum   number of real nodes (defines the expected average)
     */
    public static void analyze(Map<String, Integer> nodeCount, int dataNum, int nodeNum) {
        double average = (double) dataNum / nodeNum;

        IntSummaryStatistics s1
                = nodeCount.values().stream().mapToInt(Integer::intValue).summaryStatistics();
        int max = s1.getMax();
        int min = s1.getMin();
        int range = max - min;
        double standardDeviation = Math.sqrt(
                nodeCount.values().stream()
                        .mapToDouble(n -> (n - average) * (n - average))
                        .average()
                        .orElse(0));

        System.out.println(String.format("平均值:%.2f", average));
        System.out.println(String.format("最大值:%d,(%.2f%%)", max, 100.0 * max / average));
        System.out.println(String.format("最小值:%d,(%.2f%%)", min, 100.0 * min / average));
        System.out.println(String.format("极差:%d,(%.2f%%)", range, 100.0 * range / average));
        System.out.println(String.format("标准差:%.2f,(%.2f%%)", standardDeviation, 100.0 * standardDeviation / average));
    }

    public static void main(String[] args) {
        ConsistentHashWithVN consistentHashWithVN = new ConsistentHashWithVN(new KetamaHashStrategy());
        List<Integer> datas = consistentHashWithVN.initData();
        consistentHashWithVN.initNode(10, sortedMap);

        // Count how many keys land on each node.
        Map<String, Integer> dataCount = new HashMap<>();
        for (Integer data : datas) {
            String node = consistentHashWithVN.getNode(data.toString(), sortedMap);
            dataCount.merge(node, 1, Integer::sum);
        }
        analyze(dataCount, DATA_ACCOUNT, 10);

        // 计算节点数据迁移量: rebuild the ring with 12 nodes (two added) and
        // count the keys whose owner changed.
        int migrateCount = 0;
        consistentHashWithVN.initNode(12, newSortedMap);
        for (Integer data : datas) {
            String node = consistentHashWithVN.getNode(data.toString(), sortedMap);
            String newNode = consistentHashWithVN.getNode(data.toString(), newSortedMap);
            if (!node.equals(newNode)) {
                migrateCount++;
            }
        }
        System.out.println(String.format("数据迁移量:%d(%.2f%%)", migrateCount, migrateCount * 100.0 / datas.size()));
    }

}

运行结果如下:


Connected to the target VM, address: '127.0.0.1:58246', transport: 'socket'
平均值:100000.00
最大值:119258,(119.26%)
最小值:80506,(80.51%)
极差:38752,(38.75%)
标准差:7590.60,(7.59%)(注:此版本代码实际计算的是平均绝对偏差,并非真正的标准差)
数据迁移量:159123(15.91%)
Disconnected from the target VM, address: '127.0.0.1:58246', transport: 'socket'

Process finished with exit code 0