o
    Ìv›a0  ã                   @   s`   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
Z
ddlZG dd„ deƒZdS )	zKafka interface class.é    N)Úlogger)Ú	iteritems)ÚGlancesExport)ÚKafkaProducerc                       s>   e Zd ZdZd‡ fdd„	Zdd„ Zdd„ Z‡ fd	d
„Z‡  ZS )ÚExportz+This class manages the Kafka export module.Nc                    s^   t t| ƒj||d d| _d| _d| _| jdg d¢ddgd| _| js(t 	d¡ |  
¡ | _dS )	zInit the Kafka export IF.)ÚconfigÚargsNÚkafka)ÚhostÚportÚtopicÚcompressionÚtags)ZmandatoriesÚoptionsé   )Úsuperr   Ú__init__r   r   r   Z	load_confÚexport_enableÚsysÚexitÚinitÚclient)Úselfr   r   ©Ú	__class__© ú?/usr/lib/python3/dist-packages/glances/exports/glances_kafka.pyr   %   s   ÿý
zExport.__init__c              
   C   sˆ   | j sdS d | j| j¡}zt|dd„ | jd}W n! ty: } zt d||f ¡ t	 
d¡ W Y d}~|S d}~ww t d| ¡ |S )	z(Init the connection to the Kafka server.Nz{}:{}c                 S   s   t  | ¡ d¡S )Núutf-8)ÚjsonÚdumpsÚencode)Úvr   r   r   Ú<lambda>F   s    zExport.init.<locals>.<lambda>)Zbootstrap_serversZvalue_serializerZcompression_typez&Cannot connect to Kafka server %s (%s)r   z Connected to the Kafka server %s)r   Úformatr
   r   r   r   Ú	Exceptionr   Zcriticalr   r   Úinfo)r   Z
server_uriÚsÚer   r   r   r   <   s    
þ€úzExport.initc              
   C   s”   t  d |¡¡ tt||ƒƒ}| jdur| |  | j¡¡ z| jj	| j
| d¡|d W dS  tyI } zt  d ||¡¡ W Y d}~dS d}~ww )z%Write the points to the kafka server.zExport {} stats to KafkaNr   )ÚkeyÚvaluez$Cannot export {} stats to Kafka ({}))r   Údebugr#   ÚdictÚzipr   ÚupdateZ
parse_tagsr   Úsendr   r    r$   Úerror)r   ÚnameÚcolumnsZpointsÚdatar'   r   r   r   ÚexportP   s   

ý €ÿzExport.exportc                    s&   | j  ¡  | j  ¡  tt| ƒ ¡  dS )zClose the Kafka export module.N)r   ÚflushÚcloser   r   r   )r   r   r   r   r   d   s   

zExport.exit)NN)	Ú__name__Ú
__module__Ú__qualname__Ú__doc__r   r   r3   r   Ú__classcell__r   r   r   r   r   !   s    r   )r9   r   Zglances.loggerr   Zglances.compatr   Zglances.exports.glances_exportr   r	   r   r   Úcodecsr   r   r   r   r   Ú<module>   s   