Created April 23, 2019 20:37
Experimenting with Kafka: Simple Socket Server
import * as dotenv from 'dotenv';
import { Server, Message, WebSocketClient } from 'ws';
import * as express from 'express';
import { kafkaSubscribe } from './consumer';

dotenv.config();

// Fall back to 3210 when PORT is unset or not a number
const PORT: number = parseInt(process.env.PORT || '', 10) || 3210;
const app = express();

// Serve static files
app.use(express.static('./'));

const server = new Server({ server: app.listen(PORT) });

// Broadcast a Kafka message to every connected WebSocket client
function send(message: Message): void {
    server.clients.forEach(
        (client: WebSocketClient): void => {
            client.send(message.value);
        }
    );
}

server.on(
    'connection',
    (): void => {
        // subscribe to the `test` stream
        // (this runs once per connection, so kafkaSubscribe is called again for each new client)
        kafkaSubscribe(
            'test',
            (message: Message): void => {
                send(message);
            }
        );
    }
);

console.log(`Server listening: http://localhost:${PORT}`);
Ohhh, you probably forgot to put the callback here?
consumer.on('message', function (message) {
    cb(message);
});
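For anyone following along, here is a minimal sketch of how `kafkaSubscribe` could accept the callback as its second parameter and forward each message to it. The real `./consumer` module is not shown in this gist, so everything here is an assumption: `FakeConsumer` is a hypothetical stand-in built on Node's `EventEmitter` in place of an actual Kafka consumer, and the `Message` shape is reduced to the fields the server uses.

```typescript
import { EventEmitter } from 'events';

// Assumed message shape: the server code above only reads `value`.
interface Message {
    topic: string;
    value: string;
}

type MessageHandler = (message: Message) => void;

// Hypothetical stand-in for a Kafka consumer: it only emits 'message' events.
class FakeConsumer extends EventEmitter {
    readonly topic: string;

    constructor(topic: string) {
        super();
        this.topic = topic;
    }
}

// Sketch of kafkaSubscribe: the second parameter `cb` is called for every message.
function kafkaSubscribe(topic: string, cb: MessageHandler): FakeConsumer {
    const consumer = new FakeConsumer(topic);
    consumer.on('message', (message: Message): void => {
        cb(message);
    });
    return consumer;
}

// Usage: collect message values the same way the server's `send` would receive them.
const received: string[] = [];
const consumer = kafkaSubscribe('test', (message: Message): void => {
    received.push(message.value);
});

// Simulate a message arriving on the `test` topic.
consumer.emit('message', { topic: 'test', value: 'hello' });
console.log(received);
```

With a real consumer, the same pattern should apply: create the consumer for the topic inside `kafkaSubscribe` and register `cb` in its `message` handler, which is what the server code relies on when it passes a function as the second argument.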
In your `kafkaSubscribe` function, there was no second parameter. How come there is a second parameter here?