Skip to content

Instantly share code, notes, and snippets.

@Gozala
Created June 29, 2018 17:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Gozala/17a99fffe6838b95fdb7ede312bea480 to your computer and use it in GitHub Desktop.
Save Gozala/17a99fffe6838b95fdb7ede312bea480 to your computer and use it in GitHub Desktop.
// @flow
const EventEmitter = require("events")
const Buffer = require("buffer").Buffer
module.exports = UPDSocket => {
/*::
interface SocketOptions {
type: 'udp4' | 'udp6';
reuseAddr?: boolean;
recvBufferSize?:number;
sendBufferSize?:number;
lookup:?(hostname, (?Error, string, string) => void;
}
*/
class ERR_INVALID_ARG_TYPE extends TypeError {
constructor(name, expected, actual) {
// determiner: 'must be' or 'must not be'
let determiner
if (typeof expected === "string" && expected.startsWith("not ")) {
determiner = "must not be"
expected = expected.replace(/^not /, "")
} else {
determiner = "must be"
}
let msg
if (name.endsWith(" argument")) {
// For cases like 'first argument'
msg = `The ${name} ${determiner} ${oneOf(expected, "type")}`
} else {
const type = name.includes(".") ? "property" : "argument"
msg = `The "${name}" ${type} ${determiner} ${oneOf(expected, "type")}`
}
// TODO(BridgeAR): Improve the output by showing `null` and similar.
msg += `. Received type ${typeof actual}`
super(this.message)
}
}
class ERR_MISSING_ARGS extends TypeError {
constructor(...args) {
let msg = "The "
const len = args.length
args = args.map(a => `"${a}"`)
switch (len) {
case 1:
msg += `${args[0]} argument`
break
case 2:
msg += `${args[0]} and ${args[1]} arguments`
break
default:
msg += args.slice(0, len - 1).join(", ")
msg += `, and ${args[len - 1]} arguments`
break
}
super(`${msg} must be specified`)
}
}
class ERR_SOCKET_ALREADY_BOUND extends Error {
constructor() {
super("Socket is already bound")
}
}
class ERR_SOCKET_BAD_BUFFER_SIZE extends TypeError {
constructor() {
super("Buffer size must be a positive integer")
}
}
class ERR_SOCKET_BAD_PORT extends RangeError {
constructor(port) {
super("Port should be > 0 and < 65536. Received ${port}.")
}
}
class ERR_SOCKET_BAD_TYPE extends TypeError {
constructor() {
super("Bad socket type specified. Valid types are: udp4, udp6")
}
}
class ERR_SOCKET_BUFFER_SIZE extends Error {
constructor() {
super("Could not get or set buffer size")
}
}
class ERR_SOCKET_CANNOT_SEND extends Error {
constructor() {
super("Unable to send data")
}
}
class ERR_SOCKET_DGRAM_NOT_RUNNING extends Error {
constructor() {
super("Not running")
}
}
const errnoException = (err, syscall, original) => {
const code = err.message
const message = original
? `${syscall} ${code} ${original}`
: `${syscall} ${code}`
// eslint-disable-next-line no-restricted-syntax
const ex = new Error(message)
// TODO(joyeecheung): errno is supposed to err, like in uvException
ex.code = ex.errno = code
ex.syscall = syscall
return ex
}
function sliceBuffer(buffer, offset, length) {
if (typeof buffer === "string") {
buffer = Buffer.from(buffer)
} else if (!isUint8Array(buffer)) {
throw new ERR_INVALID_ARG_TYPE(
"buffer",
["Buffer", "Uint8Array", "string"],
buffer
)
}
offset = offset >>> 0
length = length >>> 0
return buffer.slice(offset, offset + length)
}
function fixBufferList(list) {
const newlist = new Array(list.length)
for (var i = 0, l = list.length; i < l; i++) {
var buf = list[i]
if (typeof buf === "string") newlist[i] = Buffer.from(buf)
else if (!isUint8Array(buf)) return null
else newlist[i] = buf
}
return newlist
}
class Socket extends EventEmitter {
_healthCheck() {
if (!this._handle) {
// Error message from dgram_legacy.js.
throw new ERR_SOCKET_DGRAM_NOT_RUNNING()
}
}
send(message, offset, length, port, host, callback) {
let list
if (address || (port && typeof port !== "function")) {
buffer = sliceBuffer(buffer, offset, length)
} else {
callback = port
port = offset
address = length
}
if (!Array.isArray(buffer)) {
if (typeof buffer === "string") {
list = [Buffer.from(buffer)]
} else if (!isUint8Array(buffer)) {
throw new ERR_INVALID_ARG_TYPE(
"buffer",
["Buffer", "Uint8Array", "string"],
buffer
)
} else {
list = [buffer]
}
} else if (!(list = fixBufferList(buffer))) {
throw new ERR_INVALID_ARG_TYPE(
"buffer list arguments",
["Buffer", "string"],
buffer
)
}
port = port >>> 0
if (port === 0 || port > 65535) throw new ERR_SOCKET_BAD_PORT(port)
// Normalize callback so it's either a function or undefined but not anything
// else.
if (typeof callback !== "function") callback = undefined
if (typeof address === "function") {
callback = address
address = undefined
} else if (address && typeof address !== "string") {
throw new ERR_INVALID_ARG_TYPE("address", ["string", "falsy"], address)
}
this._healthCheck()
if (this._bindState === BIND_STATE_UNBOUND) {
this.bind({ port: 0, exclusive: true }, null)
}
if (list.length === 0) {
list.push(Buffer.alloc(0))
}
this.schedule(new Send(this, list, port, address, callback))
this.awake()
}
close(callback) {
if (typeof callback === "function") {
this.on("close", callback)
}
this.schedule(new Close(this))
this.awake()
}
address() {
this._healthCheck()
return this._handle.address
}
setMulticastLoopback(flag /*:boolean*/) {
this._healthCheck()
try {
UPDSocket.setMulticastLoopback(this._handle, boolean)
} catch (error) {
throw errnoException(error, "setMulticastLoopback")
}
}
setMulticastInterface(interfaceAddress /*:string*/) {
this._healthCheck()
if (typeof interfaceAddress !== "string") {
throw new ERR_INVALID_ARG_TYPE(
"interfaceAddress",
"string",
interfaceAddress
)
}
try {
UPDSocket.setMulticastInterface(this._handle, interfaceAddress)
} catch (error) {
throw errnoException(error, "setMulticastInterface")
}
}
addMembership(
multicastAddress /*:string*/,
interfaceAddress /*::?:string*/
) {
this._healthCheck()
if (!multicastAddress) {
throw new ERR_MISSING_ARGS("multicastAddress")
}
try {
UPDSocket.addMembership(
this._handle,
multicastAddress,
interfaceAddress
)
} catch (error) {
throw errnoException(error, "addMembership")
}
}
dropMembership(
multicastAddress /*:string*/,
interfaceAddress /*::?:string*/
) {
this._healthCheck()
if (!multicastAddress) {
throw new ERR_MISSING_ARGS("multicastAddress")
}
try {
UPDSocket.dropMembership(
this._handle,
multicastAddress,
interfaceAddress
)
} catch (error) {
throw errnoException(error, "dropMembership")
}
}
bind(port, address, callback) {
if (this.socket == null) {
this.createSocket(
{
port: port === 0 ? undefined : port,
host: address
},
callback
)
} else {
throw Error("Can't call bind on socket more than once")
}
}
async createSocket(options, callback) {
try {
const socket = await UDPSocket.create(options)
this.socket = socket
this.emit("listening", this)
callback(null)
} catch (error) {
this.emit("error", error)
callback(error)
}
}
}
class Send {
constructor(socket, list, port, address, callback) {
this.socket = socket
this.list = list
this.port = port
this.address = address
this.callback = callback
}
async perform() {
const { socket, list, port, address, callback } = this
const { _handle } = socket
if (_handle) {
try {
for (const { buffer } of list) {
await UDPSocket.send(_handle, address, port, buffer)
}
callback(null)
} catch (error) {
if (callback) {
callback(error)
}
}
}
}
}
class Close {
construrctor(socket) {
this.socket = socket
}
async perform() {
try {
const { socket } = this
socket._healthCheck()
const { _handle } = socket
socket._handle = null
await UDPSocket.close(_handle)
socket.emit("close")
} catch (error) {
socket.emit("error", error)
}
}
}
const createSocket = (options: SocketOptions) => {
return new Socket()
}
return { createSocket }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment