Last active
August 29, 2015 14:11
-
-
Save sdboyer/4b116fd78d8bad07a9ff to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package transduce_mode_s | |
// HELLO FRIEND. this is an example implementation of transducers in Go, | |
// created in response to a suggestion on the golang-nuts mailing list: | |
// https://groups.google.com/d/msg/golang-nuts/pt8hRcvzeIQ/QXfLxvJrt5kJ | |
// | |
// this is untested (since it's mock code anyway), and may well contain | |
// bugs. as long as they're minor, that's fine, because this is just intended | |
// as an illustrative example. | |
import t "github.com/sdboyer/transducers-go" | |
// for simplicity's sake let's pretend this channel is how we get messages | |
// from the PPM receiver. messages are guaranteed to be a complete DF message | |
// packet, though they may be corrupted/garbled. | |
var magicalInput = make(chan []byte, 0) | |
// again, for simplicity's sake, we're pretending sending into this channel | |
// is the equivalent of "sending it upstream" in Jason's original spec. this | |
// is where completed plane state data goes. | |
var completedOut = make(chan aircraftData, 0) | |
// garbled messages should go here. my guess is that the correct behavior for | |
// this application is usually to inform an external piece of hardware to re-request | |
// data from the aircraft in the event of a garbled message, so implementing it is | |
// out of scope for this example. this is just here for illustrative purposes. | |
var garbleChan = make(chan interface{}) | |
// this defines things in an order way that mostly parallels how the | |
// values will pass through the transduction process, but remember that | |
// none of this actually does anything until we wire it together. the | |
// wiring up at the bottom of this function. | |
func main() { | |
// the transducer stack for the main entry point. filters garbled | |
// messages, then sets up the address for extraction. the | |
// bottom reducer here then proxies the message along to individual | |
// transduction processors, one per aircraft being tracked. | |
// | |
// in general, we want to keep the processing here minimal because | |
// this is pre-fanout, and thus has to bear the full weight of inpu | |
entryTransform := []t.Transducer{ | |
// first step - validate messages. those that fail are | |
// ejected into the garbleChan by the Escape transducer (that is, | |
// when the basicMessageValidation predicate func returns true) | |
t.Escape(basicMessageValidation, garbleChan, true), | |
// if we made it here, the message passed validation. so now, | |
// we're gonna wrap the simple byte slice in a DFMessage struct. | |
// this is quite useful later, too. | |
t.Map(toDFMessage), | |
} | |
// the map of aircraft to the channel entry point into their transduction | |
// processing stack | |
aircraftProcs := make(map[ICAOAddress]chan<- interface{}) | |
// the transducer stack we'll use to process messages for individual aircraft. | |
// we'll fill this in in a minute, but have to declare it up here because | |
// entryBottom needs to use it. | |
var aircraftTransform []t.Transducer | |
// this is the "bottom reducer" for the main entry transduction process. | |
// it's responsible for fanout: it creates a new transduction process in | |
// a goroutine for each unique aircraft that we have a message from, | |
// and sends all subsequent messages into that pipeline via a channel. | |
var entryBottom t.ReduceStep = func(accum interface{}, v interface{}) (interface{}, bool) { | |
// we know v is gonna fulfill DFBasic because the makeIdGettable mapper | |
// always returns the appropriate type. | |
val := v.(DFMessage) | |
if c, exists := aircraftProcs[val.Address()]; !exists { | |
// first time we're seeing a message for this aircraft. give it its | |
// own processing stack, in its own goroutine, fed from this channel, | |
// and register that channel into our map of aircraft codes -> channels. | |
aircraftInput := make(chan interface{}, 0) | |
aircraftProcs[val.Address()] = aircraftInput | |
// kick off the goroutine that will handle this aircraft. | |
go func() { | |
// aircraftReducer() provides the bottom reducer here. It'll start us from | |
// an empty AircraftData struct, then incrementally add stuff onto the AircraftData | |
// until it's complete. At that point, the bottom reducer signals termination, | |
// the accumulator returns all the way back up and out from the Transduce | |
// processor, and into our 'final' variable here. | |
final := t.Transduce(aircraftInput, aircraftReducer(), aircraftTransform...).(aircraftData) | |
delete(aircraftProcs, val.Address()) | |
close(aircraftInput) | |
// this is fan-in. having a single channel for this might make | |
// things look kinda artificially simple, but it still seems | |
// like a reasonably good approximation for this case. | |
completedOut <- final | |
}() | |
} else { | |
// we've seen this craft already. send the message along to the processor | |
// we already created. (note - unsynchronized access to the aircraftProcs | |
// map here does mean possibility of panic from sending on closed channel) | |
c <- val | |
} | |
// dummy return value, we're not gonna use it. | |
return struct{}{}, false | |
} | |
// that's everything for the main entry processor. now, we define the transducer | |
// for individual aircraft and actual message/data processing. | |
// ##### HEY HEY HEY HEY HEY THIS IS WHERE IT GETS COOL ##### | |
// | |
// there are two basic things happening here: sniffing the DF message type out | |
// from the message byte array, and dropping the data found therein into our | |
// AircraftData. | |
// | |
// the conventional way to do the sniff+convert is probably with a single Mapper and | |
// a big grandaddy switch statement, wherein we maybe call out to functions | |
// dedicated to each of the message types, and they return a struct corresponding | |
// to their message type (something like `func ProcessDF1 (b []byte) DF1`). | |
// | |
// this defeats the purpose of using transducers, though, because one of the major | |
// principles behind transducers is composition: break things down into really | |
// small parts, then adroitly recombine them. | |
// | |
// going the DF0, DF1, DF... struct type route has another problem: it means | |
// we'd need to teach the bottom reducer about how to handle each and every | |
// one of those struct types. so, another giant grandaddy switch statement. | |
// | |
// so, instead of having any DF message type-specific structs, we just focus on | |
// the discrete values that the messages contain. this is accomplished by having | |
// a generic DFMessage struct that contains an []interface{}, and is passed through | |
// a series of mappers that are each responsible for seeing if the current message | |
// is the type they handle, and if so, extracting all the discrete typed values from the | |
// message and appending them onto the []interface{}. (we actually converted to DFMessage | |
// at the end of the main entry pipeline). | |
// | |
// now, the bottom reducer doesn't have to know or care about the type of message that | |
// came through. instead, it ranges across the []interface{} and with a big type switch, | |
// grabs out whatever data it wants and tucks it into the appropriate place in the | |
// AircraftData struct. still a big type switch, but better, because now it's focused | |
// directly on the data the AircraftData wants, rather than through the layer of | |
// indirection of where those datum appear on each message type. | |
// | |
// NOTE: this example is treating the problem space as a very confined problem - there's | |
// just one type of AircraftData, and all MODE-S transmitters can produce the same | |
// data in the same way. the former is unlikely to be true, and the second is | |
// definitely untrue. thus, a real version of this application would probably want | |
// to create transduction stacks that vary slightly for different transmitters, | |
// and possibly use different versions of AircraftData. | |
// | |
// with a conventional, monolithic approach, code starts exploding fast you need | |
// a separate giant switch statement for each transmitter type, and a separate | |
// gianter switch for each AircraftData type. with this composed, value-oriented | |
// approach, you just swap out, say, the DF4 mapper for DF4_custom, and | |
// everything's hunky dory. (See DF4 for example, others are empty) | |
aircraftTransform = []t.Transducer{ | |
t.Map(df0), | |
t.Map(df4), | |
t.Map(df5), | |
t.Map(df11), | |
t.Map(df16), | |
t.Map(df17), | |
t.Map(df20), | |
t.Map(df21), | |
t.Map(df24), | |
} | |
// remember another benefit here, too - call t.AttachLoggers() on aircraftTransform, | |
// and there's immediately debug output at every transducer boundary. | |
// we now have everything we need to hook it all up. first, we create an | |
// appropriately-typed input channel, then start up its transduction processor | |
// in a goroutine. | |
// buffer is fairly large because this is the main loop and better to give | |
// it some breathing room before things get backed up | |
mainInput := make(chan interface{}, 100) | |
// we use the Transduce processor here, as we did above, because this is a push/eager | |
// pipeline: we want values to propagate through it as soon as they come in, | |
// rather than waiting for something at the end to *pull* the values through. | |
go t.Transduce(mainInput, entryBottom, entryTransform...) | |
// then we just range over the magical input and send values in. | |
for v := range magicalInput { | |
mainInput <- v | |
} | |
} | |
// Discrete data types for data encoded in some/all of the DF packets. | |
type ICAOAddress [3]byte | |
type Lat float64 | |
type Lon float64 | |
type Altitude int | |
func (m DFMessage) Address() ICAOAddress { | |
switch len(m.Msg) { | |
case 7: | |
return [3]byte{m.Msg[4], m.Msg[5], m.Msg[6]} | |
case 14: | |
return [3]byte{m.Msg[11], m.Msg[12], m.Msg[13]} | |
default: | |
panic("wrong message length") | |
} | |
} | |
// A struct representing the final AircraftData we want to build up. | |
type aircraftData struct { | |
ICAOAddress | |
Lat | |
Lon | |
Altitude | |
// probably a bunch of other specific stuff that i don't know | |
} | |
type DFMessage struct { | |
Msg []byte | |
Type uint8 | |
Data []interface{} | |
} | |
// This creates a reducer that's specially designed to consume data extractable | |
// from DF messages, and assemble them onto a AircraftData | |
func aircraftReducer() t.Reducer { | |
r := t.CreateStep(func(accum interface{}, v interface{}) (interface{}, bool) { | |
// all the work we've done so far on putting together the plane data | |
// always comes back into this function as the accumulator. note that | |
// this use pattern is a totally conventional reduction process: we use | |
// the value to build up the accumulator on each pass. | |
pd := accum.(aircraftData) | |
val := v.(DFMessage) | |
for _, datum := range val.Data { | |
// and here's where we do the big type switch, to figure out how/where | |
// we want to stick each bit of data, based on its type. | |
switch d := datum.(type) { | |
case ICAOAddress: | |
pd.ICAOAddress = d | |
case Lat: | |
pd.Lat = d | |
case Lon: | |
pd.Lon = d | |
case Altitude: | |
pd.Altitude = d | |
} | |
} | |
// if plane data is complete, we send back the termination signal, which | |
// guarantees no more values will come down the pipeline. | |
return pd, aircraftDataIsComplete(pd) | |
}) | |
// This is the init function - the Transducer processor we're using calls it | |
// to create the initial value for the accumulator. This is what guarantees that | |
// the dynamic type of accum in the preceding closure is AircraftData, even on the | |
// first pass through the transduction stack. | |
r.I = func() interface{} { | |
return aircraftData{} | |
} | |
return r | |
} | |
func toDFMessage(v interface{}) interface{} { | |
val := v.([]byte) | |
return DFMessage{ | |
Msg: val, | |
Type: val[0] &^ 224, // mask out the three high bits. note, probably wrong because endianness | |
Data: make([]interface{}, 0), // this does mean heap alloc i think, ugh | |
} | |
} | |
func df4(v interface{}) interface{} { | |
val := v.(DFMessage) | |
// only do stuff if message type is 4 | |
if val.Type != 4 { | |
return v | |
} | |
// i know DF4 reports altitude, but i don't know where it is exactly. | |
// so this is wrong, but illustrative of the pattern. | |
// | |
// for simplicity, and because my bit-extracting-fu is terrible, let's just | |
// assume altitude is expressed using all of the 4th byte. (also ignoring | |
// big endianness of the stream, here) | |
val.Data = append(val.Data, val.Msg[3]) | |
return val | |
} | |
// BUSINESS LOGIC, OH YES | |
func df0(v interface{}) interface{} { | |
return v | |
} | |
func df5(v interface{}) interface{} { | |
return v | |
} | |
func df11(v interface{}) interface{} { | |
return v | |
} | |
func df16(v interface{}) interface{} { | |
return v | |
} | |
func df17(v interface{}) interface{} { | |
return v | |
} | |
func df20(v interface{}) interface{} { | |
return v | |
} | |
func df21(v interface{}) interface{} { | |
return v | |
} | |
func df24(v interface{}) interface{} { | |
return v | |
} | |
func aircraftDataIsComplete(pd aircraftData) bool { | |
// all this does is see if struct fields are at their zero values. | |
// so, this is hardly a real check - just intended to be illustrative. | |
switch { | |
case pd.ICAOAddress == [3]byte{0, 0, 0}: | |
return false | |
case pd.Lat == 0, pd.Lon == 0, pd.Altitude == 0: | |
return false | |
default: | |
return true | |
} | |
} | |
// Filterer function. This is responsible for doing basic message validation; | |
// if the validation fails, this returns true, and the Escape transducer | |
// sends the message into the garbleChan. | |
func basicMessageValidation(v interface{}) bool { | |
// Transducers do necessitate type assertions inside predicate funcs. | |
val := v.([]byte) | |
// check to make sure the message is 14 bytes/112 bits, or 7 bytes/56 bits, long. | |
// probably not necessary, but since i have no clue how parity checking works, | |
// i'm putting this here as illustrative of a validation mechanism. | |
return len(val) != 14 && len(val) != 7 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment