""" @FileName:3666.py @Description:PyCharm @Author:Jerry Pan @Time:2023/5/3 17:50 @Department:技术部 @Copyright:©2019-2023 """ import time import logging import re class InputError(Exception): """Exception raised for errors in the input. Attributes: expression -- input expression in which the error occurred message -- explanation of the error """ def __init__(self, expression, message): self.expression = expression self.message = message class InvalidUserAgentError(Exception): def __init__( self, message=None, ): self.message = message class InvalidSystemClock(Exception): def __init__( self, message=None, ): self.message = message class IdWorker(object): def __init__(self, worker_id, data_center_id): self.worker_id = worker_id self.data_center_id = data_center_id self.user_agent_parser = re.compile("[a-zA-Z][a-zA-Z\-0-9]*") self.logger = logging.getLogger("idworker") # stats self.ids_generated = 0 self.twepoch = 1483200000000 self.sequence = 0 self.worker_id_bits = 5 self.data_center_id_bits = 5 self.max_worker_id = -1 ^ (-1 << self.worker_id_bits) self.max_data_center_id = -1 ^ (-1 << self.data_center_id_bits) self.sequence_bits = 12 self.worker_id_shift = self.sequence_bits self.data_center_id_shift = self.sequence_bits + self.worker_id_bits self.timestamp_left_shift = ( self.sequence_bits + self.worker_id_bits + self.data_center_id_bits ) self.sequence_mask = -1 ^ (-1 << self.sequence_bits) self.last_timestamp = -1 # Sanity check for worker_id if self.worker_id > self.max_worker_id or self.worker_id < 0: raise InputError( "worker_id", "worker id can't be greater than %i or less than 0" % self.max_worker_id, ) if self.data_center_id > self.max_data_center_id or self.data_center_id < 0: raise InputError( "data_center_id", "data center id can't be greater than %i or less than 0" % self.max_data_center_id, ) self.logger.info( "worker starting. timestamp left shift %d, data center id bits %d, worker id bits %d, sequence bits %d, worker id %d" % ( self.timestamp_left_shift, self.data_center_id_bits, self.worker_id_bits, self.sequence_bits, self.worker_id, ) ) def _time_gen(self): return int(time.time() * 1000) def _till_next_millis(self, last_timestamp): timestamp = self._time_gen() while timestamp <= last_timestamp: timestamp = self._time_gen() return timestamp def _next_id(self): timestamp = self._time_gen() if self.last_timestamp > timestamp: self.logger.warning( "clock is moving backwards. Rejecting request until %i" % self.last_timestamp ) raise InvalidSystemClock( "Clock moved backwards. Refusing to generate id for %i milliseocnds" % self.last_timestamp ) if self.last_timestamp == timestamp: self.sequence = (self.sequence + 1) & self.sequence_mask if self.sequence == 0: timestamp = self._till_next_millis(self.last_timestamp) else: self.sequence = 0 self.last_timestamp = timestamp new_id = ( ((timestamp - self.twepoch) << self.timestamp_left_shift) | (self.data_center_id << self.data_center_id_shift) | (self.worker_id << self.worker_id_shift) | self.sequence ) self.ids_generated += 1 return new_id def _valid_user_agent(self, user_agent): return self.user_agent_parser.search(user_agent) is not None def get_worker_id(self): return self.worker_id def get_timestamp(self): return self._time_gen() def get_id(self): new_id = self._next_id() self.logger.debug( "id: %i worker_id: %i data_center_id: %i" % (new_id, self.worker_id, self.data_center_id) ) return new_id def get_datacenter_id(self): return self.data_center_id if __name__ == "__main__": a = time.time() id_worker = IdWorker(2, 3) new_id = id_worker.get_id() print(new_id) s = set() for _ in range(1000000): s.add(id_worker.get_id()) print(len(s)) print(time.time() - a) # import uuid # # a = time.time() # s = set() # print(uuid.uuid4().hex) # for _ in range(1000000): # s.add(uuid.uuid4().hex) # print(len(s)) # print(time.time() - a)