Files
habitica/website/server/libs/queue/index.js
Matteo Pagliazzi 70dec611e3 Make Kafka dep optional (#10129)
* Made kafka optional

* make kafka optional
2018-03-14 18:21:39 +01:00

60 lines
1.6 KiB
JavaScript

import nconf from 'nconf';
import logger from '../logger';
// Every Kafka setting lives under the `KAFKA:` key of the nconf configuration.
const readKafkaSetting = (name) => nconf.get(`KAFKA:${name}`);

const GROUP_ID = readKafkaSetting('GROUP_ID');
const CLOUDKARAFKA_BROKERS = readKafkaSetting('CLOUDKARAFKA_BROKERS');
const CLOUDKARAFKA_USERNAME = readKafkaSetting('CLOUDKARAFKA_USERNAME');
const CLOUDKARAFKA_PASSWORD = readKafkaSetting('CLOUDKARAFKA_PASSWORD');
const CLOUDKARAFKA_TOPIC_PREFIX = readKafkaSetting('CLOUDKARAFKA_TOPIC_PREFIX');

// The only topic this module publishes to: "<configured prefix>-default".
const prefix = CLOUDKARAFKA_TOPIC_PREFIX;
const topic = `${prefix}-default`;

// Module-level producer instance; it is only assigned inside createProducer().
let producer;

// node-rdkafka is an optional dependency: when it cannot be required,
// Kafka stays null instead of the failure propagating at import time.
let Kafka = null;
try {
  Kafka = require('node-rdkafka'); // eslint-disable-line global-require
} catch (er) {
  // Module not available: keep Kafka as null.
}
/**
 * Creates the module-level Kafka producer and starts connecting it.
 *
 * The producer is configured for SASL_SSL with SCRAM-SHA-256, using the
 * credentials and broker list read from the KAFKA configuration.
 * Must only be called when `Kafka` (node-rdkafka) was loaded successfully,
 * since it dereferences `Kafka.Producer` unconditionally.
 */
function createProducer () {
  const kafkaConf = {
    'group.id': GROUP_ID,
    // Comma separated broker string from the config becomes a list of brokers.
    'metadata.broker.list': CLOUDKARAFKA_BROKERS ? CLOUDKARAFKA_BROKERS.split(',') : '',
    'socket.keepalive.enable': true,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': CLOUDKARAFKA_USERNAME,
    'sasl.password': CLOUDKARAFKA_PASSWORD,
    debug: 'generic,broker,security',
  };

  producer = new Kafka.Producer(kafkaConf);

  // connect() is called without a callback, so nothing else in this module
  // would report a broker or authentication problem. node-rdkafka reports
  // client errors through the 'event.error' event; log them so a producer
  // that cannot connect does not fail silently.
  // The listener is attached before connect() so early errors are not missed.
  producer.on('event.error', (err) => {
    logger.error(err);
  });

  producer.connect();
}
// A producer is only started when node-rdkafka was loaded and every
// Kafka setting has a truthy value.
const kafkaRequirements = [
  Kafka,
  GROUP_ID,
  CLOUDKARAFKA_BROKERS,
  CLOUDKARAFKA_USERNAME,
  CLOUDKARAFKA_PASSWORD,
  CLOUDKARAFKA_TOPIC_PREFIX,
];

if (kafkaRequirements.every(Boolean)) {
  createProducer();
}

// On process exit, disconnect the producer if one exists and is connected.
process.on('exit', () => {
  if (!producer) return;
  if (!producer.isConnected()) return;
  producer.disconnect();
});
const api = {};

/**
 * Publishes a message to the module's Kafka topic.
 *
 * Does nothing when there is no producer or the producer is not connected.
 * Errors thrown while serialising or producing are logged, never rethrown,
 * so callers are not affected by queue failures.
 *
 * @param {*} message - Value to send; it is serialised with JSON.stringify.
 * @param {*} [key] - Forwarded unchanged as the fourth argument of producer.produce.
 * @returns {undefined}
 */
api.sendMessage = function sendMessage (message, key) {
  if (!producer || !producer.isConnected()) return;

  try {
    // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
    const payload = Buffer.from(JSON.stringify(message));
    // -1 is passed as the partition argument (presumably "let the client pick" — confirm in node-rdkafka docs).
    producer.produce(topic, -1, payload, key);
  } catch (err) {
    logger.error(err);
  }
};
module.exports = api;