
    2Bf                     ~    d dl Z d dlZ	 d dlZddlmZ  e j                  d      Z G d de      Zy# e$ r dZY -w xY w)    N   )PubSubManagersocketioc                   >     e Zd ZdZdZ	 	 d fd	Zd Zd Zd Z xZ	S )KafkaManageraI  Kafka based client manager.

    This class implements a Kafka backend for event sharing across multiple
    processes.

    To use a Kafka backend, initialize the :class:`Server` instance as
    follows::

        url = 'kafka://hostname:port'
        server = socketio.Server(client_manager=socketio.KafkaManager(url))

    :param url: The connection URL for the Kafka server. For a default Kafka
                store running on the same host, use ``kafka://``.
    :param channel: The channel name (topic) on which the server sends and
                    receives notifications. Must be the same in all the
                    servers.
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving.
    kafkac                    t         t        d      t        t        |   ||       |dk7  r|dd  nd| _        t        j                  | j
                        | _        t        j                  | j                  | j
                        | _
        y )NzZkafka-python package is not installed (Run "pip install kafka-python" in your virtualenv).)channel
write_onlyzkafka://   zlocalhost:9092)bootstrap_servers)r   RuntimeErrorsuperr   __init__	kafka_urlKafkaProducerproducerKafkaConsumerr
   consumer)selfurlr
   r   	__class__s       V/var/www/highfloat_scraper/venv/lib/python3.12/site-packages/socketio/kafka_manager.pyr   zKafkaManager.__init__%   s    =  . / / 	lD*76@ 	+ 	B %(:$5QR;K++dnnM++DLL>BnnN    c                     | j                   j                  | j                  t        j                  |             | j                   j                          y )N)value)r   sendr
   pickledumpsflush)r   datas     r   _publishzKafkaManager._publish4   s6    4<<v||D/ABr   c              #   6   K   | j                   D ]  }|  y wN)r   r   messages     r   _kafka_listenzKafkaManager._kafka_listen8   s     }} 	GM	s   c              #      K   | j                         D ]=  }|j                  | j                  k(  st        j                  |j
                         ? y wr$   )r'   topicr
   r   loadsr   r%   s     r   _listenzKafkaManager._listen<   sA     ))+ 	2G}},ll7==11	2s
   -A$A)zkafka://localhost:9092r   F)
__name__
__module____qualname____doc__namer   r"   r'   r+   __classcell__)r   s   @r   r   r      s*    ( D=G!N2r   r   )	loggingr   r   ImportErrorpubsub_managerr   	getLoggerloggerr    r   r   <module>r8      sL      *			:	&12= 12  Es   2 <<