Skip to content

Instantly share code, notes, and snippets.

@linxGnu
Created September 13, 2019 00:38
Show Gist options
  • Save linxGnu/b488997a0e62b3f6a7060ba2af6391ea to your computer and use it in GitHub Desktop.
Save linxGnu/b488997a0e62b3f6a7060ba2af6391ea to your computer and use it in GitHub Desktop.
Connect to SMSC for sending MT and receiving MO via Session Manager
package daemon
import (
"context"
"fmt"
"math"
"os"
"strconv"
"sync"
"time"
"github.com/linxGnu/gosmpp"
"github.com/linxGnu/gosmpp/Data"
"github.com/linxGnu/gosmpp/Exception"
"github.com/linxGnu/gosmpp/PDU"
"github.com/linxGnu/gosmpp/Utils"
)
// Bind states. Neither constant is referenced in the visible part of this
// file.
const (
stateUnBind = iota
stateBind
)
// MT send outcomes. Neither constant is referenced in the visible part of
// this file.
const (
statusSendMTFail = iota
statusSendMTSuccess
)
// dataCodingASCII is the data_coding value createSubmitSM puts on every
// outgoing SubmitSM.
// NOTE(review): in SMPP 3.4 data_coding 0 means "SMSC default alphabet",
// which is not necessarily ASCII — confirm with the SMSC operator.
const dataCodingASCII byte = 0
// instance is the package-wide session manager; it is assigned by
// NewSMPPSessionManager and returned by GetSMPPInstace.
var instance SessionInterface
// lock is held by NewSMPPSessionManager while it replaces instance.
var lock sync.RWMutex
// PDU config: field values createSubmitSM applies to every outgoing SubmitSM.
// Entries without an initializer keep the zero value.
// NOTE(review): per SMPP 3.4 a TON of 5 is "alphanumeric" and TON/NPI 1/1 is
// "international / ISDN (E.164)" — confirm these match the SMSC account.
var (
pduSrcAddrTon byte = 5
pduSrcAddrNpi byte
pduDesAddrTon byte = 1
pduDesAddrNpi byte = 1
pduProtocolID byte
pduRegisterDelivery byte
pduReplaceIfPresentFlag byte
pduEsmClass byte
)
// SessionInterface is the set of session lifecycle operations exposed to the
// rest of the program through GetSMPPInstace. SessionManager is the
// implementation assigned to the package instance in this file.
type SessionInterface interface {
// Bind connects to the SMSC and starts the worker goroutines.
Bind() error
// Rebind unbinds the current session and binds again.
Rebind() error
// Destroy unbinds the session as part of shutdown.
Destroy()
// UnBind stops the workers and sends an unbind request to the SMSC.
UnBind() (unbindResp *PDU.UnbindResp, err *Exception.Exception)
}
// SessionManager owns one SMPP session together with the goroutines that
// submit MT messages, process incoming PDUs and send enquire-link requests.
type SessionManager struct {
// session is set by Bind and reset to nil by UnBind.
session *gosmpp.Session
// sessionLock guards session.
sessionLock sync.RWMutex
// config is the SMPP connection configuration captured at construction.
config config.SMPPConnection
lg logger.Logger
// moHandler receives the sender and content of every incoming MO.
moHandler *handler.MOHandler
// PDU event handle channel
// HandleEvent sends into it; handleEventWorker goroutines receive from it.
eventChan chan *gosmpp.ServerPDUEvent
// wg tracks the worker goroutines started by Bind; UnBind waits on it.
wg *sync.WaitGroup
// firstTimeBind makes Bind start the enquire-link loop only once.
firstTimeBind bool
// seq is the last value handed out by incrementSequence; seqLock guards it.
seq int
seqLock sync.RWMutex
// use for stop workers
// ctx and cancel are recreated by every Bind and cancelled by UnBind.
ctx context.Context
cancel context.CancelFunc
}
// Bind connects to the SMSC, binds as a transceiver with the manager itself
// registered as PDU listener, and starts the MO/MT worker goroutines.
//
// Failure handling is fatal by design:
//   - if the TCP connection cannot be established after three attempts the
//     process exits with status 2;
//   - if the bind request fails or is answered with a non-zero command
//     status the function panics so that the hosting daemon goes down too.
//
// When it returns at all, the returned error is always nil. The session lock
// is held for the whole call.
func (s *SessionManager) Bind() error {
	s.sessionLock.Lock()
	defer s.sessionLock.Unlock()

	// Fresh cancellation scope for the workers started below.
	s.ctx, s.cancel = context.WithCancel(context.Background())

	// Retry the TCP connect a few times before giving up.
	var connection *gosmpp.TCPIPConnection
	var err error
	for i := 0; i < 3; i++ {
		connection, err = gosmpp.NewTCPIPConnectionWithAddrPort(s.config.Address, s.config.Port)
		if err == nil {
			break
		}
		time.Sleep(50 * time.Millisecond)
	}
	if err != nil {
		s.lg.Error(&model.LogFormat{Action: "Bind", Message: "failed to connect SMPP", Err: err})
		// Short pause before exiting — presumably to let the logger flush.
		time.Sleep(10 * time.Millisecond)
		os.Exit(2)
	}

	s.session = gosmpp.NewSessionWithConnection(connection)
	s.session.EnableStateChecking()

	request := PDU.NewBindTransceiver()
	request.SetSystemId(s.config.SystemID)
	request.SetPassword(s.config.Password)
	request.SetSystemType(s.config.SystemType)

	// try to bind
	resp, e := s.session.BindWithListener(request, s)
	if e != nil || resp.GetCommandStatus() != 0 {
		// Work out the panic message first: e is nil when the SMSC answered
		// but rejected the bind, so it must not be dereferenced blindly.
		var reason string
		switch {
		case e == nil:
			reason = fmt.Sprintf("bind SMPP rejected with command status %d", resp.GetCommandStatus())
			s.lg.Error(&model.LogFormat{Action: "Bind", Message: "Try to bind SMPP", Err: reason, Data: resp})
		case e.Error != nil:
			reason = e.Error.Error()
			s.lg.Error(&model.LogFormat{Action: "Bind", Message: "Try to bind SMPP", Err: e})
		default:
			reason = "bind SMPP failed"
			s.lg.Error(&model.LogFormat{Action: "Bind", Message: "Try to bind SMPP", Err: e})
		}
		connection.Close()
		s.session = nil
		panic(reason) // let panic so that webserver daemon is also closed
	}
	if s.config.DebugMode {
		s.lg.Debug(&model.LogFormat{Action: "Bind", Message: "Try to bind SMPP", Data: resp})
	}
	s.lg.Info(&model.LogFormat{Action: "Bind", Message: "Successful"})

	// start pdu event handlers buffered
	for i := 0; i < s.config.NumberOfMOHandler; i++ {
		s.wg.Add(1)
		go s.handleEventWorker()
	}

	// start submitSM worker
	for i := 0; i < s.config.NumberOfMTHandler; i++ {
		s.wg.Add(1)
		go s.submitSMWorker()
	}

	// start enquire link: only on the first bind, and only when an interval
	// is configured.
	if s.firstTimeBind {
		s.firstTimeBind = false
		if s.config.EnquireLinkIntervalSec > 0 {
			go s.doEnquireLink()
		}
	}
	return nil
}
// Rebind tears the current session down with UnBind and then calls Bind
// again, returning whatever Bind returns. Every call increments the "rebind"
// counter first.
func (s *SessionManager) Rebind() error {
SessionManagerCounter.Incr("rebind")
// The unbind response and error are discarded here.
s.UnBind()
return s.Bind()
}
// Destroy shuts the session down by calling UnBind and discarding its result.
// NOTE(review): no explicit connection close happens here; whether the TCP
// connection is closed depends on what session.Unbind does — confirm.
func (s *SessionManager) Destroy() {
s.UnBind()
}
// UnBind stops the worker goroutines and unbinds the SMPP session.
//
// It is a no-op returning (nil, nil) when there is no session. Otherwise it
// cancels the worker context, waits for every worker tracked by the wait
// group, sends the unbind request and clears the session. The response and
// error from the unbind request are returned as-is; the session is cleared
// even when the request fails.
func (s *SessionManager) UnBind() (unbindResp *PDU.UnbindResp, err *Exception.Exception) {
	s.lg.Info(&model.LogFormat{Action: "start Unbind SMPP"})

	// Snapshot what we need under the read lock: Bind rewrites both fields
	// while holding the write lock.
	s.sessionLock.RLock()
	bound := s.session != nil
	cancel := s.cancel
	s.sessionLock.RUnlock()
	if !bound {
		return nil, nil
	}

	s.lg.Info(&model.LogFormat{Action: "Unbind SMPP"})

	// notify all worker to stop; the lock is NOT held while waiting because
	// workers take the read lock around their submits.
	cancel()
	s.wg.Wait()
	s.lg.Info(&model.LogFormat{Action: "Close chan done"})

	// Do session lock
	s.sessionLock.Lock()
	defer s.sessionLock.Unlock()

	// The lock was released while waiting, so another UnBind may have
	// finished in the meantime; re-check instead of dereferencing nil.
	if s.session == nil {
		return nil, nil
	}

	unbindResp, err = s.session.Unbind()
	if err != nil {
		s.lg.Error(&model.LogFormat{Action: "Unbind SMPP", Err: err})
	} else if s.config.DebugMode {
		s.lg.Debug(&model.LogFormat{Action: "Unbind SMPP", Data: unbindResp})
	}
	s.lg.Info(&model.LogFormat{Action: "Close chan done"})
	s.session = nil
	return unbindResp, err
}
// handleEventWorker is the body of one MO worker goroutine: it passes every
// PDU event received from the event channel to handleEvent and exits, marking
// itself done on the wait group, once the manager context is cancelled.
func (s *SessionManager) handleEventWorker() {
	defer s.wg.Done()

	for {
		select {
		case ev := <-s.eventChan:
			s.handleEvent(ev)

		case <-s.ctx.Done():
			return
		}
	}
}
// handleEvent processes a single PDU event coming from the SMSC.
//
// Nil events and events without a PDU are ignored. A throttled or queue-full
// command status is logged and counted regardless of the PDU type. After
// that:
//   - DeliverSM is answered first; when its esm_class is 0 it is treated as
//     an MO and the sender address and decoded text go to the MO handler;
//   - Unbind and EnquireLink requests are answered;
//   - SubmitSMResp is only logged.
//
// Failures to build a response or to decode the MO text are logged instead of
// being dropped silently; MO processing still proceeds with whatever text was
// decoded.
func (s *SessionManager) handleEvent(event *gosmpp.ServerPDUEvent) {
	if event == nil {
		return
	}
	t := event.GetPDU()
	if t == nil {
		return
	}

	commandStatus := t.GetCommandStatus()
	switch commandStatus {
	case Data.ESME_RTHROTTLED:
		s.lg.Warn(&model.LogFormat{Action: "handleEventWorker", Message: "Throttled", Data: t})
		mtCounter.Incr("throttled")
	case Data.ESME_RMSGQFUL:
		s.lg.Warn(&model.LogFormat{Action: "handleEventWorker", Message: "Queue full", Data: t})
		mtCounter.Incr("queue_full")
	}

	switch v := t.(type) {
	case *PDU.DeliverSM:
		if s.config.DebugMode {
			s.lg.Debug(&model.LogFormat{Action: "DeliverSM", Data: v})
		}

		// Acknowledge before doing any processing.
		if response, err := v.GetResponse(); err != nil {
			s.lg.Error(&model.LogFormat{Action: "DeliverSM", Message: "Can not build response", Err: err})
		} else {
			s.session.Respond(response)
		}

		if v.GetEsmClass() == 0 { // receive MO from user
			moContent, decodeErr := v.GetShortMessageWithEncoding(Data.ENC_UTF8)
			if decodeErr != nil {
				s.lg.Warn(&model.LogFormat{Action: "ReceiveMO", Message: "Can not decode short message", Err: decodeErr})
			}

			if v.GetSourceAddr() != nil {
				isdn := v.GetSourceAddr().GetAddress()
				s.lg.Info(&model.LogFormat{Action: "ReceiveMO", Data: map[string]interface{}{
					"isdn":    isdn,
					"content": moContent,
				}})
				if err := s.moHandler.ProcessMO(isdn, moContent); err != nil {
					s.lg.Error(&model.LogFormat{Action: "ProcessMO", Err: err})
				}
			}
		}

	case *PDU.Unbind:
		s.lg.Info(&model.LogFormat{Action: "Unbind SMSC", Message: "Received"})
		if response, err := v.GetResponse(); err != nil {
			s.lg.Error(&model.LogFormat{Action: "Unbind SMSC", Message: "Can not build response", Err: err})
		} else {
			s.lg.Info(response)
			s.session.Respond(response)
		}

	case *PDU.EnquireLink:
		s.lg.Info(&model.LogFormat{Action: "EnquireLink SMSC", Message: "Received"})
		if response, err := v.GetResponse(); err != nil {
			s.lg.Error(&model.LogFormat{Action: "EnquireLink SMSC", Message: "Can not build response", Err: err})
		} else {
			s.lg.Info(response)
			s.session.Respond(response)
		}

	case *PDU.SubmitSMResp:
		// we dont care submitSMResp at time
		s.lg.Info(&model.LogFormat{Action: "SubmitSMResp SMSC", Message: "Received"})
	}
}
// submitSMWorker is the body of one MT worker goroutine. It takes MT messages
// from the HTTP MT channel, decrements the channel gauge, submits each one
// through handleMT and, when MTIntervalMiliSec is positive, pauses for that
// many milliseconds before taking the next message. The goroutine ends, and
// marks itself done on the wait group, when the manager context is cancelled.
func (s *SessionManager) submitSMWorker() {
	defer s.wg.Done()

	for {
		select {
		case <-s.ctx.Done():
			return

		case mt := <-core.GetHTTPMTChan():
			// receives MT from webserver
			s.lg.Info(&model.LogFormat{Action: "Receive MT from Channel", Data: mt})
			core.MTChanGauge.Dec()
			s.handleMT(mt)

			if pause := s.config.MTIntervalMiliSec; pause > 0 {
				time.Sleep(time.Duration(pause) * time.Millisecond)
			}
		}
	}
}
// handleMT submits one MT message to the SMSC.
//
// Content shorter than 140 bytes goes out as a single SubmitSM. Longer
// content is cut into 134-byte slices; each slice is prefixed with a 6-byte
// concatenation header and sent as its own SubmitSM with esm_class 64. When
// MTIntervalMiliSec is positive the function pauses that long between
// consecutive parts (not after the last one).
//
// Every submit is counted as "success" or "error" on mtCounter. Content that
// would need more than 255 parts cannot be described by the one-byte part
// count in the header, so it is rejected, logged and counted as an error.
//
// Splitting is done on byte boundaries, so a multi-byte UTF-8 character can
// be cut between two parts.
func (s *SessionManager) handleMT(v *model.MT) {
	const (
		singleLimit = 140 // content at or above this length is sent multipart
		partSize    = 134 // payload bytes per part: 140 minus the 6-byte header
		maxParts    = 255 // the header stores the part count in a single byte
	)

	// we dont care submitSMResp now. So we dont save seq to Redis
	if len(v.Info) < singleLimit {
		submitSM := s.createSubmitSM(v.EncryptedIsdn, v.Info, v.Alias)
		submitSM.SetSequenceNumber(int32(s.incrementSequence()))
		if s.config.DebugMode {
			s.lg.Debug(&model.LogFormat{Action: "SUBMIT_SM", Data: submitSM})
		}

		s.sessionLock.RLock()
		if _, e := s.session.Submit(submitSM); e != nil {
			s.lg.Error(&model.LogFormat{Action: "SUBMIT_SM", Data: submitSM, Err: e})
			mtCounter.Incr("error")
		} else {
			s.lg.Info(&model.LogFormat{Action: "SUBMIT_SM", Message: "Success", Data: submitSM})
			mtCounter.Incr("success")
		}
		s.sessionLock.RUnlock()
		return
	}

	s.lg.Info(&model.LogFormat{Action: "SUBMIT_SM_MULTI", Data: v})

	payload := []byte(v.Info)
	total := len(payload)
	partCount := int(math.Ceil(float64(total) / partSize))
	if partCount > maxParts {
		// byte(partCount) would wrap around and announce a wrong total.
		s.lg.Error(&model.LogFormat{
			Action:  "SUBMIT_SM_MULTI",
			Message: "Message needs " + strconv.Itoa(partCount) + " parts, limit is " + strconv.Itoa(maxParts),
			Data:    v,
		})
		mtCounter.Incr("error")
		return
	}
	totalPart := byte(partCount)

	// One reference number shared by all parts; only its low byte is sent.
	uuID := s.incrementSequence()

	// Stays true until one part has been submitted successfully.
	firstMTInSeq := true
	partNum := 1
	for start := 0; start < total; start += partSize {
		end := start + partSize
		if end > total {
			end = total
		}

		// https://help.nexmo.com/hc/en-us/articles/204015653-Sending-Concatenated-Messages-via-SMPP
		part := []byte{5, 0, 3, byte(uuID), totalPart, byte(partNum)}
		part = append(part, payload[start:end]...)

		submitSM := s.createSubmitSM(v.EncryptedIsdn, "", v.Alias)
		submitSM.SetShortMessageData(Utils.NewBuffer(part))
		submitSM.SetEsmClass(byte(64))
		if s.config.DebugMode {
			s.lg.Debug(&model.LogFormat{Action: "SUBMIT_SM", Message: "Req", Data: submitSM})
		}

		// Until a part has gone through, the PDU keeps the sequence number it
		// was created with; later parts draw a fresh one.
		// NOTE(review): unclear whether the first part was meant to reuse
		// uuID instead — behaviour kept as-is.
		var seq int32
		if firstMTInSeq {
			seq = submitSM.GetSequenceNumber()
		} else {
			seq = int32(s.incrementSequence())
		}
		submitSM.SetSequenceNumber(seq)

		s.sessionLock.RLock()
		s.lg.Info(&model.LogFormat{Action: "SUBMIT_SM_MULTI", Message: "Part #" + strconv.Itoa(partNum)})
		if resp, e := s.session.Submit(submitSM); e == nil {
			firstMTInSeq = false
			s.lg.Info(&model.LogFormat{Action: "SUBMIT_SM_MULTI", Message: "Part #" + strconv.Itoa(partNum) + ". Success"})
			mtCounter.Incr("success")
		} else {
			s.lg.Error(&model.LogFormat{Action: "SUBMIT_SM_MULTI", Message: "Part #" + strconv.Itoa(partNum), Err: e, Data: resp})
			mtCounter.Incr("error")
		}
		s.sessionLock.RUnlock()

		partNum++

		// Pause between parts only while more remain. The previous check
		// compared the byte offset with the part count, so it only ever
		// paused after the first part.
		if end < total && s.config.MTIntervalMiliSec > 0 {
			time.Sleep(time.Duration(s.config.MTIntervalMiliSec) * time.Millisecond)
		}
	}
}
// createSubmitSM builds a SubmitSM addressed to receiver.
//
// The source address is alias when one is given and the configured short code
// otherwise. Address errors are logged but do not abort construction. TON/NPI
// and the remaining PDU flags come from the package-level PDU config. The
// short message is only set when content is non-empty, encoded as UTF-8.
func (s *SessionManager) createSubmitSM(receiver, content, alias string) *PDU.SubmitSM {
	sm := PDU.NewSubmitSM()

	// Source address: alias wins over the short code.
	srcAddr := PDU.NewAddress()
	if alias != "" {
		if aliasErr := srcAddr.SetAddress(alias); aliasErr != nil {
			s.lg.Error(&model.LogFormat{Action: "CREATE_SUBMIT_SM_ALIAS", Message: "INIT SrcAddr " + alias, Err: aliasErr})
		}
	} else {
		if codeErr := srcAddr.SetAddress(s.config.ShortCode); codeErr != nil {
			s.lg.Error(&model.LogFormat{Action: "CREATE_SUBMIT_SM_SHORTCODE", Message: "INIT SrcAddr " + s.config.ShortCode, Err: codeErr})
		}
	}
	srcAddr.SetTon(pduSrcAddrTon)
	srcAddr.SetNpi(pduSrcAddrNpi)
	sm.SetSourceAddr(srcAddr)

	// Destination address.
	desAddr := PDU.NewAddress()
	if destErr := desAddr.SetAddress(receiver); destErr != nil {
		s.lg.Error(&model.LogFormat{Action: "CREATE_SUBMIT_SM", Message: "INIT DestAddr", Err: destErr})
	}
	desAddr.SetTon(pduDesAddrTon)
	desAddr.SetNpi(pduDesAddrNpi)
	sm.SetDestAddr(desAddr)

	// Fixed PDU options.
	sm.SetProtocolId(pduProtocolID)
	sm.SetRegisteredDelivery(pduRegisterDelivery)
	sm.SetReplaceIfPresentFlag(pduReplaceIfPresentFlag)
	sm.SetEsmClass(pduEsmClass)
	sm.SetDataCoding(dataCodingASCII)

	// Multipart callers pass empty content and attach the payload themselves.
	if len(content) == 0 {
		return sm
	}
	sm.SetShortMessageWithEncoding(content, Data.ENC_UTF8)
	return sm
}
// incrementSequence advances the sequence counter by the configured SeqMod
// step and returns the new value. Once the counter exceeds 2,000,000,000 it
// restarts at 1. Safe for concurrent use: the update runs under seqLock.
func (s *SessionManager) incrementSequence() int {
	s.seqLock.Lock()
	defer s.seqLock.Unlock()

	next := s.seq + s.config.SeqMod
	if next > 2000000000 {
		next = 1
	}
	s.seq = next
	return next
}
// doEnquireLink is the keep-alive loop. Every EnquireLinkIntervalSec seconds
// it sends an enquire-link request on the current session (if there is one)
// and triggers Rebind when the request fails.
//
// Bind starts this goroutine only once, on the first bind, so the loop has to
// survive rebinds: after a successful Rebind it keeps running against the new
// session. It used to return at that point, which silently ended the
// keep-alive after the first reconnect. The loop ends when the manager
// context is found cancelled at the start of an iteration, or when Rebind
// reports an error.
func (s *SessionManager) doEnquireLink() {
	defer func() {
		s.lg.Info(&model.LogFormat{Action: "doEnquireLink", Message: "Done"})
	}()

	interval := time.Duration(s.config.EnquireLinkIntervalSec) * time.Second

	for {
		// s.ctx is replaced by every Bind, so it is re-read on each pass.
		select {
		case <-s.ctx.Done():
			return
		default:
		}

		// do enquire link
		s.sessionLock.RLock()
		if s.session == nil {
			s.sessionLock.RUnlock()
			time.Sleep(interval)
			continue
		}
		resp, err := s.session.EnquireLink(PDU.NewEnquireLink())
		if err != nil {
			// Release the read lock first: Rebind needs the write lock.
			s.sessionLock.RUnlock()
			if s.config.DebugMode {
				s.lg.Debug(&model.LogFormat{Action: "doEnquireLink", Err: err, Data: resp})
			}

			// trigger rebind
			if rebindErr := s.Rebind(); rebindErr != nil {
				s.lg.Error(&model.LogFormat{Action: "doEnquireLink", Message: "Rebind failed", Err: rebindErr})
				return
			}
			time.Sleep(interval)
			continue
		}

		if s.config.DebugMode && s.config.HeartBeat {
			s.lg.Info(&model.LogFormat{Action: "SMSC Hertbeat", Message: "Resp", Data: resp})
		}
		s.lg.Info(&model.LogFormat{Action: "enquire link", Err: err, Data: resp})
		s.sessionLock.RUnlock()

		time.Sleep(interval)
	}
}
// HandleEvent is the PDU listener callback registered with the session in
// Bind. It hands the event to the worker pool through the event channel; the
// send blocks while the channel buffer is full. A panic raised during the
// hand-off is recovered, logged and returned wrapped in an Exception;
// otherwise the result is nil.
func (s *SessionManager) HandleEvent(event *gosmpp.ServerPDUEvent) (ex *Exception.Exception) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		s.lg.Error(&model.LogFormat{Action: "HandleEvent", Err: recovered})
		ex = Exception.NewException(fmt.Errorf("%v", recovered))
	}()

	s.eventChan <- event
	return nil
}
// GetSMPPInstace returns the package-level session manager instance. The
// result is nil until NewSMPPSessionManager has assigned it. The package lock
// is not taken for this read.
func GetSMPPInstace() SessionInterface {
return instance
}
// NewSMPPSessionManager replaces the package-level session manager and
// returns the daemon function that shuts it down.
//
// Under the package lock it unbinds any previous instance, reads the SMPP
// configuration, creates the HTTP MT channel and installs a fresh
// SessionManager. Binding runs in a background goroutine, so this call
// returns before the session is bound. The returned function blocks until ctx
// is done and then destroys the current instance. The returned error is
// always nil.
func NewSMPPSessionManager(ctx context.Context) (fn model.Daemon, err error) {
	// init session
	lock.Lock()
	defer lock.Unlock()

	if previous := instance; previous != nil {
		previous.UnBind()
	}

	conf := config.Get().SMPPConnection
	lg := logger.MustGet("SMPP-Session")
	core.NewHTTPMTChan()

	manager := &SessionManager{
		firstTimeBind: true,
		config:        conf,
		moHandler:     handler.NewMOHandler(),
		wg:            &sync.WaitGroup{},
		lg:            lg,
		eventChan:     make(chan *gosmpp.ServerPDUEvent, conf.NumberOfMOHandler),
	}
	instance = manager

	// Bind asynchronously.
	go func() {
		bindErr := GetSMPPInstace().Bind()
		if bindErr != nil {
			lg.Error(bindErr)
		}
	}()

	// Shutdown hook: waits for cancellation, then tears the session down.
	fn = func() {
		<-ctx.Done()
		lg.Warn("start destroy")
		GetSMPPInstace().Destroy()
		lg.Warn("Gracefully stop SessionManager")
	}
	return fn, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment