SnowFlake.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. import time
  2. import logging
  3. # 64 位 id 的划分,通常机器位和数据位各为 5 位
  4. WORKER_ID_BITS = 5 # 机器位
  5. DATACENTER_ID_BITS = 5 # 数据位
  6. SEQUENCE_BITS = 12 # 循环位
  7. # 最大取值计算,计算机中负数表示为他的补码
  8. MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS) # 2**5 -1 =31
  9. MAX_DATACENTER_ID = -1 ^ (-1 << DATACENTER_ID_BITS)
  10. # 移位偏移计算
  11. WORKER_ID_SHIFT = SEQUENCE_BITS
  12. DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS
  13. TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS
  14. # X序号循环掩码
  15. SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)
  16. # Twitter 元年时间戳
  17. TWEPOCH = 1288834974657
  18. logger = logging.getLogger('雪花算法')
  19. class IdWorker(object):
  20. '''
  21. 用于生成IDS.
  22. '''
  23. def __init__(self, datacenter_id, worker_id, sequence=0):
  24. '''
  25. 初始化方法
  26. :param datacenter_id:数据id
  27. :param worker_id:机器id
  28. :param sequence:序列码
  29. '''
  30. if worker_id > MAX_WORKER_ID or worker_id < 0:
  31. raise ValueError('worker_id 值越界')
  32. if datacenter_id > MAX_DATACENTER_ID or datacenter_id < 0:
  33. raise ValueError('datacenter_id 值越界')
  34. self.worker_id = worker_id
  35. self.datacenter_id = datacenter_id
  36. self.sequence = sequence
  37. self.last_timestamp = -1 # 上次计算的时间戳
  38. def get_id(self):
  39. '''
  40. 获取新的ID.
  41. :return:
  42. '''
  43. # 获取当前时间戳
  44. timestamp = int(time.time() * 1000)
  45. if timestamp == self.last_timestamp:
  46. # 同一毫秒的处理。
  47. self.sequence = (self.sequence + 1) & SEQUENCE_MASK
  48. if self.sequence == 0:
  49. timestamp = self._til_next_millis(self.last_timestamp)
  50. else:
  51. self.sequence = 0
  52. self.last_timestamp = timestamp
  53. new_id = (((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT) | (self.datacenter_id << DATACENTER_ID_SHIFT) | (self.worker_id << WORKER_ID_SHIFT)) | self.sequence
  54. return new_id
  55. def _til_next_millis(self, last_timestamp):
  56. '''
  57. 等到下一毫秒。
  58. :param last_timestamp:
  59. :return:
  60. '''
  61. timestamp = int(time.time() * 1000)
  62. while timestamp <= last_timestamp:
  63. timestamp = int(time.time() * 1000)
  64. return timestamp
  65. if __name__ == '__main__':
  66. worker = IdWorker(1, 2, 0)
  67. print(worker.get_id())