|
|
'use strict';
const net = require('net'), tls = require('tls'), EventParser = require('../entities/EventParser.js'), Message = require('js-message'), fs = require('fs'), Queue = require('@node-ipc/js-queue'), Events = require('event-pubsub');
let eventParser = new EventParser();
class Client extends Events{ constructor(config,log){ super(); Object.assign( this, { Client : Client, config : config, queue : new Queue, socket : false, connect : connect, emit : emit, log : log, retriesRemaining:config.maxRetries||0, explicitlyDisconnected: false } );
eventParser=new EventParser(this.config); } }
function emit(type,data){ this.log('dispatching event to ', this.id, this.path, ' : ', type, ',', data);
let message=new Message; message.type=type; message.data=data;
if(this.config.rawBuffer){ message=Buffer.from(type,this.config.encoding); }else{ message=eventParser.format(message); }
if(!this.config.sync){ this.socket.write(message); return; }
this.queue.add( syncEmit.bind(this,message) ); }
function syncEmit(message){ this.log('dispatching event to ', this.id, this.path, ' : ', message); this.socket.write(message); }
function connect(){ //init client object for scope persistance especially inside of socket events.
let client=this;
client.log('requested connection to ', client.id, client.path); if(!this.path){ client.log('\n\n######\nerror: ', client.id ,' client has not specified socket path it wishes to connect to.'); return; }
const options={};
if(!client.port){ client.log('Connecting client on Unix Socket :', client.path);
options.path=client.path;
if (process.platform ==='win32' && !client.path.startsWith('\\\\.\\pipe\\')){ options.path = options.path.replace(/^\//, ''); options.path = options.path.replace(/\//g, '-'); options.path= `\\\\.\\pipe\\${options.path}`; }
client.socket = net.connect(options); }else{ options.host=client.path; options.port=client.port;
if(client.config.interface.localAddress){ options.localAddress=client.config.interface.localAddress; }
if(client.config.interface.localPort){ options.localPort=client.config.interface.localPort; }
if(client.config.interface.family){ options.family=client.config.interface.family; }
if(client.config.interface.hints){ options.hints=client.config.interface.hints; }
if(client.config.interface.lookup){ options.lookup=client.config.interface.lookup; }
if(!client.config.tls){ client.log('Connecting client via TCP to', options); client.socket = net.connect(options); }else{ client.log('Connecting client via TLS to', client.path ,client.port,client.config.tls); if(client.config.tls.private){ client.config.tls.key=fs.readFileSync(client.config.tls.private); } if(client.config.tls.public){ client.config.tls.cert=fs.readFileSync(client.config.tls.public); } if(client.config.tls.trustedConnections){ if(typeof client.config.tls.trustedConnections === 'string'){ client.config.tls.trustedConnections=[client.config.tls.trustedConnections]; } client.config.tls.ca=[]; for(let i=0; i<client.config.tls.trustedConnections.length; i++){ client.config.tls.ca.push( fs.readFileSync(client.config.tls.trustedConnections[i]) ); } }
Object.assign(client.config.tls,options);
client.socket = tls.connect( client.config.tls ); } }
client.socket.setEncoding(this.config.encoding);
client.socket.on( 'error', function(err){ client.log('\n\n######\nerror: ', err); client.publish('error', err);
} );
client.socket.on( 'connect', function connectionMade(){ client.publish('connect'); client.retriesRemaining=client.config.maxRetries; client.log('retrying reset'); } );
client.socket.on( 'close', function connectionClosed(){ client.log('connection closed' ,client.id , client.path, client.retriesRemaining, 'tries remaining of', client.config.maxRetries );
if( client.config.stopRetrying || client.retriesRemaining<1 || client.explicitlyDisconnected
){ client.publish('disconnect'); client.log( (client.config.id), 'exceeded connection rety amount of', ' or stopRetrying flag set.' );
client.socket.destroy(); client.publish('destroy'); client=undefined;
return; }
setTimeout( function retryTimeout(){ if (client.explicitlyDisconnected) { return; } client.retriesRemaining--; client.connect(); }.bind(null,client), client.config.retry );
client.publish('disconnect'); } );
client.socket.on( 'data', function(data) { client.log('## received events ##'); if(client.config.rawBuffer){ client.publish( 'data', Buffer.from(data,client.config.encoding) ); if(!client.config.sync){ return; }
client.queue.next(); return; }
if(!this.ipcBuffer){ this.ipcBuffer=''; }
data=(this.ipcBuffer+=data);
if(data.slice(-1)!=eventParser.delimiter || data.indexOf(eventParser.delimiter) == -1){ client.log('Messages are large, You may want to consider smaller messages.'); return; }
this.ipcBuffer='';
const events = eventParser.parse(data); const eCount = events.length; for(let i=0; i<eCount; i++){ let message=new Message; message.load(events[i]);
client.log('detected event', message.type, message.data); client.publish( message.type, message.data ); }
if(!client.config.sync){ return; }
client.queue.next(); } ); }
module.exports=Client;
|