Added kafka queue and initial messages for delete account (#10036)

* Added kafka queue and initial messages for delete account

* Checked for env vars
This commit is contained in:
Keith Holliday
2018-02-27 09:57:37 -07:00
committed by GitHub
parent 7dcd550209
commit be71c5f844
5 changed files with 68 additions and 0 deletions

View File

@@ -105,5 +105,12 @@
"LOGGLY" : {
"TOKEN" : "example-token",
"SUBDOMAIN" : "example-subdomain"
},
"KAFKA": {
"GROUP_ID": "",
"CLOUDKARAFKA_BROKERS": "",
"CLOUDKARAFKA_USERNAME": "",
"CLOUDKARAFKA_PASSWORD": "",
"CLOUDKARAFKA_TOPIC_PREFIX": ""
}
}

14
package-lock.json generated
View File

@@ -1945,6 +1945,11 @@
"resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-1.11.0.tgz",
"integrity": "sha1-RqoXUftqL5PuXmibsQh9SxTGwgU="
},
"bindings": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/bindings/-/bindings-1.3.0.tgz",
"integrity": "sha512-DpLh5EzMR2kzvX1KIlVC0VkC3iZtHKTgdtZ0a3pglBZdaQFjt5S9g9xd1lE+YvXyfd6mtCeRnrUfOLYiTMlNSw=="
},
"bitsyntax": {
"version": "0.0.4",
"resolved": "https://registry.npmjs.org/bitsyntax/-/bitsyntax-0.0.4.tgz",
@@ -13888,6 +13893,15 @@
}
}
},
"node-rdkafka": {
"version": "2.2.3",
"resolved": "https://registry.npmjs.org/node-rdkafka/-/node-rdkafka-2.2.3.tgz",
"integrity": "sha1-BdjK/brye/Ho7yuOW/Oa+1mi5wE=",
"requires": {
"bindings": "1.3.0",
"nan": "2.6.2"
}
},
"node-sass": {
"version": "4.7.2",
"resolved": "https://registry.npmjs.org/node-sass/-/node-sass-4.7.2.tgz",

View File

@@ -71,6 +71,7 @@
"morgan": "^1.7.0",
"nconf": "^0.10.0",
"node-gcm": "^0.14.4",
"node-rdkafka": "^2.2.3",
"node-sass": "^4.5.0",
"nodemailer": "^4.5.0",
"ora": "^2.0.0",

View File

@@ -16,6 +16,7 @@ import {
getUserInfo,
sendTxn as txnEmail,
} from '../../libs/email';
import Queue from '../../libs/queue';
import nconf from 'nconf';
import get from 'lodash/get';
@@ -432,6 +433,8 @@ api.deleteUser = {
]);
}
if (feedback) Queue.sendMessage({feedback, username: user.profile.name}, user._id);
res.analytics.track('account delete', {
uuid: user._id,
hitType: 'event',

View File

@@ -0,0 +1,43 @@
import Kafka from 'node-rdkafka';
import nconf from 'nconf';

// CloudKarafka / Kafka connection settings, read from the KAFKA config section.
const GROUP_ID = nconf.get('KAFKA:GROUP_ID');
const CLOUDKARAFKA_BROKERS = nconf.get('KAFKA:CLOUDKARAFKA_BROKERS');
const CLOUDKARAFKA_USERNAME = nconf.get('KAFKA:CLOUDKARAFKA_USERNAME');
const CLOUDKARAFKA_PASSWORD = nconf.get('KAFKA:CLOUDKARAFKA_PASSWORD');
const CLOUDKARAFKA_TOPIC_PREFIX = nconf.get('KAFKA:CLOUDKARAFKA_TOPIC_PREFIX');

const kafkaConf = {
  'group.id': GROUP_ID,
  // NOTE(review): librdkafka documents `metadata.broker.list` as a
  // comma-separated *string*; confirm the array produced by split(',') is
  // accepted by the pinned node-rdkafka version.
  'metadata.broker.list': CLOUDKARAFKA_BROKERS ? CLOUDKARAFKA_BROKERS.split(',') : '',
  'socket.keepalive.enable': true,
  'security.protocol': 'SASL_SSL',
  'sasl.mechanisms': 'SCRAM-SHA-256',
  'sasl.username': CLOUDKARAFKA_USERNAME,
  'sasl.password': CLOUDKARAFKA_PASSWORD,
  debug: 'generic,broker,security',
};

// All messages currently go to a single "<prefix>-default" topic.
const topic = `${CLOUDKARAFKA_TOPIC_PREFIX}-default`;

// The producer connects once at module load; sendMessage no-ops until the
// connection is established (connect() is asynchronous).
const producer = new Kafka.Producer(kafkaConf);
producer.connect();

// NOTE(review): 'exit' handlers may only perform synchronous work, and
// disconnect() is asynchronous — a clean broker disconnect is not guaranteed.
process.on('exit', () => {
  if (producer.isConnected()) producer.disconnect();
});

const api = {};

/**
 * Fire-and-forget publish of `message` (JSON-serialized) to the default topic.
 * Messages sent before the producer has connected are silently dropped.
 *
 * @param {Object} message - Payload; must be JSON-serializable.
 * @param {String} key - Kafka message key (e.g. a user id), used for partitioning.
 */
api.sendMessage = function sendMessage (message, key) {
  if (!producer.isConnected()) return;
  try {
    // Partition -1 lets librdkafka choose the partition. Buffer.from replaces
    // the deprecated (and unsafe) `new Buffer(...)` constructor.
    producer.produce(topic, -1, Buffer.from(JSON.stringify(message)), key);
  } catch (e) {
    // Don't let queue failures break the request path, but don't swallow
    // the error silently either. @TODO: send these to Loggly?
    console.error('Kafka produce failed:', e); // eslint-disable-line no-console
  }
};

module.exports = api;