Skip to content

Instantly share code, notes, and snippets.

@sdboyer
Last active August 29, 2015 14:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sdboyer/4b116fd78d8bad07a9ff to your computer and use it in GitHub Desktop.
Save sdboyer/4b116fd78d8bad07a9ff to your computer and use it in GitHub Desktop.
package transduce_mode_s
// HELLO FRIEND. this is an example implementation of transducers in Go,
// created in response to a suggestion on the golang-nuts mailing list:
// https://groups.google.com/d/msg/golang-nuts/pt8hRcvzeIQ/QXfLxvJrt5kJ
//
// this is untested (since it's mock code anyway), and may well contain
// bugs. as long as they're minor, that's fine, because this is just intended
// as an illustrative example.
import t "github.com/sdboyer/transducers-go"
// for simplicity's sake let's pretend this channel is how we get messages
// from the PPM receiver. messages are guaranteed to be a complete DF message
// packet, though they may be corrupted/garbled.
var magicalInput = make(chan []byte, 0)
// again, for simplicity's sake, we're pretending sending into this channel
// is the equivalent of "sending it upstream" in Jason's original spec. this
// is where completed plane state data goes.
var completedOut = make(chan aircraftData, 0)
// garbled messages should go here. my guess is that the correct behavior for
// this application is usually to inform an external piece of hardware to re-request
// data from the aircraft in the event of a garbled message, so implementing it is
// out of scope for this example. this is just here for illustrative purposes.
var garbleChan = make(chan interface{})
// this defines things in an order way that mostly parallels how the
// values will pass through the transduction process, but remember that
// none of this actually does anything until we wire it together. the
// wiring up at the bottom of this function.
func main() {
// the transducer stack for the main entry point. filters garbled
// messages, then sets up the address for extraction. the
// bottom reducer here then proxies the message along to individual
// transduction processors, one per aircraft being tracked.
//
// in general, we want to keep the processing here minimal because
// this is pre-fanout, and thus has to bear the full weight of inpu
entryTransform := []t.Transducer{
// first step - validate messages. those that fail are
// ejected into the garbleChan by the Escape transducer (that is,
// when the basicMessageValidation predicate func returns true)
t.Escape(basicMessageValidation, garbleChan, true),
// if we made it here, the message passed validation. so now,
// we're gonna wrap the simple byte slice in a DFMessage struct.
// this is quite useful later, too.
t.Map(toDFMessage),
}
// the map of aircraft to the channel entry point into their transduction
// processing stack
aircraftProcs := make(map[ICAOAddress]chan<- interface{})
// the transducer stack we'll use to process messages for individual aircraft.
// we'll fill this in in a minute, but have to declare it up here because
// entryBottom needs to use it.
var aircraftTransform []t.Transducer
// this is the "bottom reducer" for the main entry transduction process.
// it's responsible for fanout: it creates a new transduction process in
// a goroutine for each unique aircraft that we have a message from,
// and sends all subsequent messages into that pipeline via a channel.
var entryBottom t.ReduceStep = func(accum interface{}, v interface{}) (interface{}, bool) {
// we know v is gonna fulfill DFBasic because the makeIdGettable mapper
// always returns the appropriate type.
val := v.(DFMessage)
if c, exists := aircraftProcs[val.Address()]; !exists {
// first time we're seeing a message for this aircraft. give it its
// own processing stack, in its own goroutine, fed from this channel,
// and register that channel into our map of aircraft codes -> channels.
aircraftInput := make(chan interface{}, 0)
aircraftProcs[val.Address()] = aircraftInput
// kick off the goroutine that will handle this aircraft.
go func() {
// aircraftReducer() provides the bottom reducer here. It'll start us from
// an empty AircraftData struct, then incrementally add stuff onto the AircraftData
// until it's complete. At that point, the bottom reducer signals termination,
// the accumulator returns all the way back up and out from the Transduce
// processor, and into our 'final' variable here.
final := t.Transduce(aircraftInput, aircraftReducer(), aircraftTransform...).(aircraftData)
delete(aircraftProcs, val.Address())
close(aircraftInput)
// this is fan-in. having a single channel for this might make
// things look kinda artificially simple, but it still seems
// like a reasonably good approximation for this case.
completedOut <- final
}()
} else {
// we've seen this craft already. send the message along to the processor
// we already created. (note - unsynchronized access to the aircraftProcs
// map here does mean possibility of panic from sending on closed channel)
c <- val
}
// dummy return value, we're not gonna use it.
return struct{}{}, false
}
// that's everything for the main entry processor. now, we define the transducer
// for individual aircraft and actual message/data processing.
// ##### HEY HEY HEY HEY HEY THIS IS WHERE IT GETS COOL #####
//
// there are two basic things happening here: sniffing the DF message type out
// from the message byte array, and dropping the data found therein into our
// AircraftData.
//
// the conventional way to do the sniff+convert is probably with a single Mapper and
// a big grandaddy switch statement, wherein we maybe call out to functions
// dedicated to each of the message types, and they return a struct corresponding
// to their message type (something like `func ProcessDF1 (b []byte) DF1`).
//
// this defeats the purpose of using transducers, though, because one of the major
// principles behind transducers is composition: break things down into really
// small parts, then adroitly recombine them.
//
// going the DF0, DF1, DF... struct type route has another problem: it means
// we'd need to teach the bottom reducer about how to handle each and every
// one of those struct types. so, another giant grandaddy switch statement.
//
// so, instead of having any DF message type-specific structs, we just focus on
// the discrete values that the messages contain. this is accomplished by having
// a generic DFMessage struct that contains an []interface{}, and is passed through
// a series of mappers that are each responsible for seeing if the current message
// is the type they handle, and if so, extracting all the discrete typed values from the
// message and appending them onto the []interface{}. (we actually converted to DFMessage
// at the end of the main entry pipeline).
//
// now, the bottom reducer doesn't have to know or care about the type of message that
// came through. instead, it ranges across the []interface{} and with a big type switch,
// grabs out whatever data it wants and tucks it into the appropriate place in the
// AircraftData struct. still a big type switch, but better, because now it's focused
// directly on the data the AircraftData wants, rather than through the layer of
// indirection of where those datum appear on each message type.
//
// NOTE: this example is treating the problem space as a very confined problem - there's
// just one type of AircraftData, and all MODE-S transmitters can produce the same
// data in the same way. the former is unlikely to be true, and the second is
// definitely untrue. thus, a real version of this application would probably want
// to create transduction stacks that vary slightly for different transmitters,
// and possibly use different versions of AircraftData.
//
// with a conventional, monolithic approach, code starts exploding fast you need
// a separate giant switch statement for each transmitter type, and a separate
// gianter switch for each AircraftData type. with this composed, value-oriented
// approach, you just swap out, say, the DF4 mapper for DF4_custom, and
// everything's hunky dory. (See DF4 for example, others are empty)
aircraftTransform = []t.Transducer{
t.Map(df0),
t.Map(df4),
t.Map(df5),
t.Map(df11),
t.Map(df16),
t.Map(df17),
t.Map(df20),
t.Map(df21),
t.Map(df24),
}
// remember another benefit here, too - call t.AttachLoggers() on aircraftTransform,
// and there's immediately debug output at every transducer boundary.
// we now have everything we need to hook it all up. first, we create an
// appropriately-typed input channel, then start up its transduction processor
// in a goroutine.
// buffer is fairly large because this is the main loop and better to give
// it some breathing room before things get backed up
mainInput := make(chan interface{}, 100)
// we use the Transduce processor here, as we did above, because this is a push/eager
// pipeline: we want values to propagate through it as soon as they come in,
// rather than waiting for something at the end to *pull* the values through.
go t.Transduce(mainInput, entryBottom, entryTransform...)
// then we just range over the magical input and send values in.
for v := range magicalInput {
mainInput <- v
}
}
// Discrete data types for data encoded in some/all of the DF packets.
type ICAOAddress [3]byte
type Lat float64
type Lon float64
type Altitude int
func (m DFMessage) Address() ICAOAddress {
switch len(m.Msg) {
case 7:
return [3]byte{m.Msg[4], m.Msg[5], m.Msg[6]}
case 14:
return [3]byte{m.Msg[11], m.Msg[12], m.Msg[13]}
default:
panic("wrong message length")
}
}
// A struct representing the final AircraftData we want to build up.
type aircraftData struct {
ICAOAddress
Lat
Lon
Altitude
// probably a bunch of other specific stuff that i don't know
}
type DFMessage struct {
Msg []byte
Type uint8
Data []interface{}
}
// This creates a reducer that's specially designed to consume data extractable
// from DF messages, and assemble them onto a AircraftData
func aircraftReducer() t.Reducer {
r := t.CreateStep(func(accum interface{}, v interface{}) (interface{}, bool) {
// all the work we've done so far on putting together the plane data
// always comes back into this function as the accumulator. note that
// this use pattern is a totally conventional reduction process: we use
// the value to build up the accumulator on each pass.
pd := accum.(aircraftData)
val := v.(DFMessage)
for _, datum := range val.Data {
// and here's where we do the big type switch, to figure out how/where
// we want to stick each bit of data, based on its type.
switch d := datum.(type) {
case ICAOAddress:
pd.ICAOAddress = d
case Lat:
pd.Lat = d
case Lon:
pd.Lon = d
case Altitude:
pd.Altitude = d
}
}
// if plane data is complete, we send back the termination signal, which
// guarantees no more values will come down the pipeline.
return pd, aircraftDataIsComplete(pd)
})
// This is the init function - the Transducer processor we're using calls it
// to create the initial value for the accumulator. This is what guarantees that
// the dynamic type of accum in the preceding closure is AircraftData, even on the
// first pass through the transduction stack.
r.I = func() interface{} {
return aircraftData{}
}
return r
}
func toDFMessage(v interface{}) interface{} {
val := v.([]byte)
return DFMessage{
Msg: val,
Type: val[0] &^ 224, // mask out the three high bits. note, probably wrong because endianness
Data: make([]interface{}, 0), // this does mean heap alloc i think, ugh
}
}
func df4(v interface{}) interface{} {
val := v.(DFMessage)
// only do stuff if message type is 4
if val.Type != 4 {
return v
}
// i know DF4 reports altitude, but i don't know where it is exactly.
// so this is wrong, but illustrative of the pattern.
//
// for simplicity, and because my bit-extracting-fu is terrible, let's just
// assume altitude is expressed using all of the 4th byte. (also ignoring
// big endianness of the stream, here)
val.Data = append(val.Data, val.Msg[3])
return val
}
// BUSINESS LOGIC, OH YES
func df0(v interface{}) interface{} {
return v
}
func df5(v interface{}) interface{} {
return v
}
func df11(v interface{}) interface{} {
return v
}
func df16(v interface{}) interface{} {
return v
}
func df17(v interface{}) interface{} {
return v
}
func df20(v interface{}) interface{} {
return v
}
func df21(v interface{}) interface{} {
return v
}
func df24(v interface{}) interface{} {
return v
}
func aircraftDataIsComplete(pd aircraftData) bool {
// all this does is see if struct fields are at their zero values.
// so, this is hardly a real check - just intended to be illustrative.
switch {
case pd.ICAOAddress == [3]byte{0, 0, 0}:
return false
case pd.Lat == 0, pd.Lon == 0, pd.Altitude == 0:
return false
default:
return true
}
}
// Filterer function. This is responsible for doing basic message validation;
// if the validation fails, this returns true, and the Escape transducer
// sends the message into the garbleChan.
func basicMessageValidation(v interface{}) bool {
// Transducers do necessitate type assertions inside predicate funcs.
val := v.([]byte)
// check to make sure the message is 14 bytes/112 bits, or 7 bytes/56 bits, long.
// probably not necessary, but since i have no clue how parity checking works,
// i'm putting this here as illustrative of a validation mechanism.
return len(val) != 14 && len(val) != 7
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment