Skip to content

Instantly share code, notes, and snippets.

@nazavode
Last active October 30, 2017 17:32
Show Gist options
  • Save nazavode/8e22cbef33a09e15c1fd01fae441990e to your computer and use it in GitHub Desktop.
Save nazavode/8e22cbef33a09e15c1fd01fae441990e to your computer and use it in GitHub Desktop.
# Set LED output proportional to input
# A timer triggers an analog read; the reading is mapped linearly and
# drives the PWM duty cycle of the LED.
timer(Timer) OUT -> TRIGGER potmeter(AnalogRead)
potmeter() OUT -> IN map(MapLinear) OUT -> DUTYCYCLE led(PwmWrite)
# Config
'10' -> INTERVAL timer
# Input range 0..1023 is mapped onto output range 0..100
'0' -> INMIN map(), '1023' -> INMAX map()
'0' -> OUTMIN map(), '100' -> OUTMAX map()
# Pin assignments
board(ArduinoUno) PINA0 -> PIN potmeter()
board() PIN10 -> PIN led()
#timer(Timer) OUT -> IN toggle(ToggleBoolean) OUT -> IN split(Split), '1000' -> INTERVAL timer
timer(Timer) OUT -> TRIGGER read(DigitalRead) OUT -> IN split(Split), '5' -> INTERVAL timer
# 'true' -> IN split(Split)
board(ATUSBKEY) PORTA0 -> PIN outA(DigitalWrite), split OUT1 -> IN outA
board PORTA1 -> PIN outB(DigitalWrite), split OUT2 -> IN outB
board PORTA2 -> PIN outC(DigitalWrite), split OUT3 -> IN outC
board PORTA3 -> PIN outD(DigitalWrite), split OUT4 -> IN outD
board PORTA4 -> PIN outE(DigitalWrite), split OUT5 -> IN outE
board PORTA5 -> PIN outF(DigitalWrite), split OUT6 -> IN outF
board PORTA6 -> PIN outG(DigitalWrite), split OUT7 -> IN outG
board PORTA7 -> PIN outH(DigitalWrite), split OUT8 -> IN outH
board PORTF0 -> PIN read
in(ReadCapacitivePin) OUT -> IN outA(DigitalWrite)
timer(Timer) OUT -> TRIGGER in
# Config
'60' -> THRESHOLD in
'10' -> INTERVAL timer
board(ArduinoUno) PIN2 -> PIN in
board PIN13 -> PIN outA
#!/usr/bin/env python
import time
import argparse
import subprocess
import contextlib
from os import path
from textx.metamodel import metamodel_from_file
from textx.export import model_export
# Command line interface: one or more FBP source files plus an optional
# debug switch.
PARSER = argparse.ArgumentParser(
    description='parse and generate graphs from FBP sources.',
)
PARSER.add_argument(
    'fbp_sources',
    metavar='INFILE',
    type=str,
    nargs='+',
    help='path to the FBP source file',
)
PARSER.add_argument(
    '--debug',
    dest='debug',
    action='store_true',
    help='enable debug mode',
)

# The grammar file is looked up next to this script.
HERE = path.abspath(path.dirname(__file__))
METAMODEL_FILENAME = path.join(HERE, 'fbp.tx')
@contextlib.contextmanager
def timeit(msg):
    """Context manager that prints how long the wrapped block took.

    On normal exit of the block, prints ``msg`` followed by the elapsed
    wall-clock time in milliseconds. If the block raises, the exception
    propagates and nothing is printed.
    """
    started = time.perf_counter()
    yield
    elapsed_ms = (time.perf_counter() - started) * 1000
    print(f"{msg} [{elapsed_ms} ms]")
if __name__ == '__main__':
    # Script entry point: build the textX metamodel once, then for every
    # input file build its model, export it to a temporary .dot file and
    # render that file to PNG with Graphviz' `dot`.
    import os  # local import: only needed for temp-file cleanup below

    args = PARSER.parse_args()
    with timeit(f"built metamodel from grammar {METAMODEL_FILENAME!r}"):
        metamodel = metamodel_from_file(METAMODEL_FILENAME)
    for fbp_filename in args.fbp_sources:
        model_filename = path.abspath(fbp_filename)
        dot_filename = model_filename + '.dot'
        png_filename = model_filename + '.png'
        with timeit(f"built ast from source {model_filename!r}"):
            ast = metamodel.model_from_file(model_filename, debug=args.debug)
        with timeit(f"rendered graph to {png_filename!r}"):
            model_export(ast, dot_filename)
            try:
                # check=True: a failing `dot` must not be reported as a
                # successfully rendered graph.
                subprocess.run(
                    ['dot', '-Tpng', '-o', png_filename, dot_filename],
                    check=True,
                )
            finally:
                # Remove the intermediate file without spawning `rm`, so the
                # cleanup also works where no `rm` binary exists; a missing
                # file is ignored just like `rm -f` did.
                with contextlib.suppress(FileNotFoundError):
                    os.remove(dot_filename)
# Added exports to original example source
INPORT = Read.IN:INPUTFILE
OUTPORT = Display.OUT:DISPLAY
'package.json' -> IN Read OUT -> IN Count(Counter) OUT -> IN Display(Output)
# Read a file
'package.json' -> IN Read(ReadFile:meta=1)
# Split the file contents by newlines
Read() OUT -> IN Split(SplitStr)
# Count the packets
Split() OUT -> IN Count(Counter) OUT -> IN Display(Output)
# Send the total count to display
Count() COUNT -> IN Display(Output)
# Display also file read errors
Read() ERROR -> IN Display()
in(SerialIn) OUT -> IN f(Forward)
f() OUT -> IN out(SerialOut)
"""
*Flow Based Programming* DSL parsing.
Grammar definition and parsing to build
graphs from textual descriptions written using the
`Flow Based Programming Domain-specific Language <https://github.com/flowbased/fbp>`_.
"""
from arpeggio import (
ParserPython,
PTNodeVisitor,
visit_parse_tree,
Optional,
ZeroOrMore,
OneOrMore,
EOF,
)
from arpeggio import NoMatch as ArpeggioSyntaxError
from arpeggio import SemanticError as ArpeggioSemanticError
from arpeggio import RegExMatch as M
__all__ = [
'parse',
'FBPVisitor',
'ParseError',
'SemanticError',
'SyntaxError',
]
class ParseError(Exception):
    """Base class of the errors raised by this module."""
class SemanticError(ParseError, ArpeggioSemanticError):
    """Semantic error; both a ``ParseError`` and an Arpeggio ``SemanticError``."""
class SyntaxError(ParseError, ArpeggioSyntaxError):
    """Syntax error; both a ``ParseError`` and an Arpeggio ``NoMatch``.

    Note that inside this module the name shadows the builtin ``SyntaxError``.
    """
###############################################################################
# PEG definition of FBP language #
# Since PEG merges lexing and parsing, lexical (token) rules are mixed with #
# the syntactic grammar rules below. #
###############################################################################
# Terminals
def TRUE():
    """Keyword for the boolean literal ``true``."""
    return 'true'


def FALSE():
    """Keyword for the boolean literal ``false``."""
    return 'false'


def BOOL():
    """A boolean literal: either ``TRUE`` or ``FALSE``."""
    return [TRUE, FALSE]


def NULL():
    """Keyword for the ``null`` literal."""
    return 'null'
def ID():
    """Identifier: a word character that is not a digit, then word characters."""
    pattern = r'[^\d\W]\w*\b'
    return M(pattern)


def INT():
    """Integer literal with an optional leading sign."""
    pattern = r'[-+]?[0-9]+\b'
    return M(pattern)


def FLOAT():
    """Float literal: optional sign and integer part, mandatory fraction, optional exponent."""
    pattern = r'[-+]?[0-9]*\.[0-9]+([eE][-+]?[0-9]+)?\b'
    return M(pattern)
def STRING():
    """String literal in double or single quotes.

    Inside the literal, the delimiting quote may appear when escaped with a
    backslash (``\\"`` in double-quoted, ``\\'`` in single-quoted strings).
    """
    # The single-quoted alternative mirrors the double-quoted one. The
    # previous text contained a stray ``\graph`` fragment that terminated the
    # raw string early and was not a valid regular expression.
    return M(r'("(\\"|[^"])*")|(\'(\\\'|[^\'])*\')')
def NUMBER():
    """Numeric literal; ``FLOAT`` is listed before ``INT``."""
    return [FLOAT, INT]


# Base grammar
def Rvalue():
    """Any literal value: boolean, null, string or number."""
    return [BOOL, NULL, STRING, NUMBER]


def Port():
    """A port name, i.e. a plain identifier."""
    return ID


def KeyValue():
    """A single ``key=value`` metadata entry."""
    return ID, '=', Rvalue


def Meta():
    """One or more ``KeyValue`` entries separated by commas."""
    return OneOrMore(KeyValue, sep=',')


def ProcessRef():
    """Reference to a process: its name, optionally followed by ``()``."""
    return ID, Optional("(", ")")


def ProcessDef():
    """Process definition: a name followed by its component in parentheses."""
    return ID, '(', Component, ')'


def Process():
    """Either a process definition or a process reference."""
    return [ProcessDef, ProcessRef]


def Component():
    """Component name, optionally followed by ``:`` and metadata."""
    return ID, Optional(':', Meta)


def Link():
    """``PORT -> [meta] PORT``; the bracketed metadata is optional."""
    return Port, '->', Optional('[', Meta, ']'), Port
# Statements
def Comment():
    """Line comment: ``#`` up to the end of the line."""
    return M(r'#.*')


def InputExport():
    """``INPORT = <process>.<port>:<alias>`` statement."""
    return 'INPORT', '=', ProcessRef, '.', Port, ':', Port


def OutputExport():
    """``OUTPORT = <process>.<port>:<alias>`` statement."""
    return 'OUTPORT', '=', ProcessRef, '.', Port, ':', Port
# Connection parts
def ConnectionRHS():
    """A process followed by zero or more ``Link ConnectionRHS`` continuations."""
    return Process, ZeroOrMore(Link, ConnectionRHS)


def ProcConnection():
    """Connection that starts from a process."""
    return Process, Link, ConnectionRHS


def DataConnection():
    """Connection that starts from a literal value sent to a port."""
    return Rvalue, '->', Port, ConnectionRHS


def Connection():
    """Either a data connection or a process connection."""
    return [DataConnection, ProcConnection]


# Translation unit
def graph():
    """Root rule: optional export statements, one or more connections, end of input."""
    return ZeroOrMore([InputExport, OutputExport]), OneOrMore(Connection), EOF
################################################################################
# Stand-in used wherever a visitor needs to return "None" as a real value.
NONE_PLACEHOLDER = Ellipsis


def fixnone(value):
    """Map the placeholder back to ``None``; return any other value unchanged.

    Arpeggio drops a node when its visitor method returns ``None``, so
    visitors return ``NONE_PLACEHOLDER`` instead and consumers call this
    function to restore the real ``None``.
    """
    if value is NONE_PLACEHOLDER:
        return None
    return value
class FBPVisitor(PTNodeVisitor):
    """Parse-tree visitor that fills a graph object from an FBP parse tree.

    The terminal visitors convert matched text into Python values; the other
    visitors register processes, connections, initial data and exports on
    ``self.graph`` and ``visit_graph`` finally returns that graph.

    NOTE(review): ``Graph``, ``Metadata``, ``InitData`` and ``Export`` are
    used below but are not defined in the visible part of this module, and
    the module-level names ``Component`` and ``Connection`` visible here are
    grammar rule functions that take no arguments — confirm where the data
    classes are expected to come from.
    """

    def __init__(self, *args, graph=None, **kwargs):
        """Create the visitor.

        :param graph: graph object to populate; a new ``Graph()`` is created
            only when no graph is given, so an empty (falsy) graph passed by
            the caller is kept.
        Remaining arguments are forwarded to ``PTNodeVisitor``.
        """
        self.graph = Graph() if graph is None else graph
        self.Exports = []
        # ``self`` is bound by super() already; passing it again would shift
        # it into the first real parameter of the base initializer.
        super().__init__(*args, **kwargs)

    def visit_TRUE(self, node, children):
        return True

    def visit_FALSE(self, node, children):
        return False

    def visit_NULL(self, node, children):
        # A plain None would make Arpeggio drop the node: use the placeholder.
        return NONE_PLACEHOLDER

    def visit_STRING(self, node, children):
        return str(node.value)

    def visit_FLOAT(self, node, children):
        return float(node.value)

    def visit_INT(self, node, children):
        return int(node.value)

    def visit_ID(self, node, children):
        return str(node.value)

    def visit_KeyValue(self, node, children):
        """Return a one-item dict ``{key: value}`` with ``null`` restored to None."""
        return {children[0]: fixnone(children[1])}

    def visit_Meta(self, node, children):
        """Merge the one-item dicts produced by ``visit_KeyValue`` into one dict."""
        merged = {}
        for item in children:
            # Only key/value results are merged; anything else that may show
            # up among the children (e.g. separator matches) is ignored.
            if isinstance(item, dict):
                merged.update(item)
        return merged

    def visit_ProcessRef(self, node, children):
        return children[0]

    def second_ProcessRef(self, procname):
        """Resolve a process reference against the graph.

        :raises ArpeggioSemanticError: if ``self.graph.get_process`` returns
            a falsy value for ``procname``.
        """
        # Second pass for Process references: we
        # have to resolve any previously unresolved
        # references here, this is our last chance.
        proc = self.graph.get_process(procname)
        if not proc:
            raise ArpeggioSemanticError(f'reference to undefined Process: {procname!r}')
        return proc

    def visit_ProcessDef(self, node, children):
        """Register a process definition on the graph and return it.

        :raises ArpeggioSemanticError: if a different definition is already
            stored for the same process name.
        """
        name = children[0]
        Component = children[1]
        proc = dict(name=name, Component=Component)
        # Check if previous Process reference has been added to graph
        # due to an unresolved connection (at node 'visit_ConnectionRHS')
        prev = self.graph.node.get(name, {}).get('Process', None)  # TODO fix Graph isolation
        if prev:
            if prev != proc:
                raise ArpeggioSemanticError(
                    'Process multiple declarations mismatch: {!r} != {!r}'.format(proc, prev))
        else:
            # Define Process: any reference previously added to
            # graph is going to be overwritten
            self.graph.add_process(proc)
        return proc

    def visit_Component(self, node, children):
        name = children[0]
        # Metadata is optional: fall back to an empty ``Metadata()``.
        meta = children.results.get('Meta', [Metadata()])[0]
        return Component(name, meta)

    def visit_Port(self, node, children):
        return node.value

    def visit_Link(self, node, children):
        """Return the link metadata, or an empty ``Metadata()`` when absent."""
        meta = children.results.get('Meta', [Metadata()])[0]
        return meta

    def visit_connection_op(self, node, children):
        # NOTE(review): the visible grammar has no rule named
        # ``connection_op``; presumably this is never dispatched — confirm.
        inport, meta, outport = children
        return Connection(inport, outport, meta)

    def visit_ConnectionRHS(self, node, children):
        """Register the connection to the right-hand side and return the left process."""
        if len(children) == 1:
            return children[0]
        else:
            lhs, op, rhs = children
            self.graph.add_connection(lhs, rhs, op)
            return lhs

    def visit_connection(self, node, children):
        # NOTE(review): the visible grammar names this rule ``Connection``,
        # not ``connection``; presumably this is never dispatched — confirm.
        if len(children) > 1:
            # We have a dataLink
            data, Linkmeta, inport, Process = children
            self.graph.add_init_data(Process, inport, InitData(data, Linkmeta))

    def visit_InputExport(self, node, children):
        # direction = children[0]  # TODO
        Process = children[1]
        Port = children[2]
        port_alias = children[3]
        self.graph.Export(Export(Process, Port, port_alias))

    def visit_graph(self, node, children):
        return self.graph
################################################################################
# Default parser instance: ``graph`` is the root rule and ``Comment`` is the
# comment rule (the rule is defined with a capital C; ``comment`` does not
# exist in this module).
FBParser = ParserPython(graph, Comment)
def parse(source, debug=False, visitor=None, parser=None):
    """Parse FBP ``source`` and return the result of visiting its parse tree.

    :param source: FBP program text.
    :param debug: passed as ``debug=`` to the visitor constructor.
    :param visitor: visitor *class* to instantiate; defaults to ``FBPVisitor``.
    :param parser: parser instance to use; defaults to ``FBParser``.
    :raises SemanticError: when Arpeggio reports a semantic error.
    :raises SyntaxError: when the source does not match the grammar.
    """
    visitor_cls = visitor or FBPVisitor
    parser = parser or FBParser
    try:
        tree = parser.parse(source)
        return visit_parse_tree(tree, visitor_cls(debug=debug))
    # Translate exceptions to have a self-contained API:
    except ArpeggioSemanticError as e:
        raise SemanticError(str(e)) from e
    except ArpeggioSyntaxError as e:
        # Arpeggio's NoMatch initializer takes (rules, position, parser), so
        # the translated exception is rebuilt from those attributes; a bare
        # message string would make the constructor itself fail.
        raise SyntaxError(e.rules, e.position, e.parser) from e
// Flow Based Programming DSL grammar
// For FBP language definition: https://github.com/flowbased/fbp
// This grammar is written in textX meta-language, for further info: https://github.com/igordejanovic/textX
// Translation unit and subsections
// Entry rule: an optional block of port exports followed by the graph body.
TranslationUnit:
Exports? Graph;
// One or more INPORT/OUTPORT export statements, in any order.
Exports:
(ExportInputPort | ExportOutputPort)+;
// The graph body: at least one connection statement.
Graph:
Connection+;
// Statements
// A connection chain, optionally prefixed by a literal value that is linked
// to a port (initial data).
Connection:
(Rvalue Link Port)? ConnectionRHS;
// A process (declaration or reference), optionally followed by a connection
// operator and a further chain (right recursion).
ConnectionRHS:
(ProcessDecl | ProcessRef) (ConnectionOp ConnectionRHS)?;
// Source port, link arrow (with optional metadata), destination port.
ConnectionOp:
Port Link Port;
// INPORT = <process>.<port>:<exported name>
ExportInputPort:
"INPORT" "=" ExportRHS;
// OUTPORT = <process>.<port>:<exported name>
ExportOutputPort:
"OUTPORT" "=" ExportRHS;
// Right-hand side shared by both export statements.
ExportRHS:
ProcessRef "." Port ":" Port;
// Expressions
// The arrow, optionally followed by bracketed metadata.
Link:
"->" ("[" Metadata "]")?;
// Component name, optionally followed by ':' and metadata.
Component:
ID (":" Metadata)?;
// Process declaration: name plus its component in parentheses.
ProcessDecl:
ID "(" Component ")";
// Process reference: name, optionally followed by empty parentheses.
ProcessRef:
ID ("(" ")")?;
// Comma-separated list of entries.
// NOTE(review): `ID = Rvalue` uses a bare `=`, not a quoted "=" literal;
// presumably `ID "=" Rvalue` (key, equals sign, value) was intended — confirm.
Metadata:
ID = Rvalue ("," ID = Rvalue)*;
Port:
ID;
// Literal values; FLOAT is tried before INT.
Rvalue:
FLOAT | INT | BOOL | NULL | STRING;
NULL:
"null";
// Line comment: '#' up to the end of the line.
Comment:
/#.*$/;
"""
Parser for FBP grammar.
"""
from collections import namedtuple
from functools import partial
from os import path
from typing import Any, Dict, Callable

from textx.exceptions import TextXSemanticError
# The default grammar file lives next to this module.
here = path.abspath(path.dirname(__file__))
DEFAULT_GRAMMAR_FILENAME = path.join(here, 'fbp.tx')

# Plain immutable records that make up a parsed graph.
Process = namedtuple('Process', ['id', 'component', 'meta'])
Export = namedtuple('Export', ['process', 'port', 'export'])
# The record's type name is 'InitConnection' although it is bound to `Data`.
Data = namedtuple('InitConnection', ['data', 'dest'])
ConnectionEndpoint = namedtuple('ConnectionEndpoint', ['process', 'port'])
Connection = namedtuple('Connection', ['source', 'meta', 'dest'])
Graph = namedtuple(
    'Graph',
    ['in_exports', 'out_exports', 'processes', 'connections', 'data'],
)
def _processdef_obj_processor(processdef: Any, registry: Dict[str, Process]) -> None:
    """Record a process definition in ``registry``.

    :param processdef: model object exposing ``id`` and ``component``
        (which in turn exposes ``id`` and, optionally, ``meta``).
    :param registry: mapping of process id to ``Process``; updated in place.
    :raises TextXSemanticError: if a process with the same id is already
        registered.
    """
    p_id = processdef.id
    component = processdef.component
    c_id = component.id
    # Component meta is optional. The attribute may be missing altogether or
    # present but unset (None); both cases fall back to an empty mapping.
    c_meta = getattr(component, 'meta', None) or {}
    if p_id in registry:
        raise TextXSemanticError(f'process {p_id!r} already defined')
    registry[p_id] = Process(id=p_id, component=c_id, meta=c_meta)
def _processref_obj_processor(processref: Any, registry: Dict[str, Any]) -> None:
p_id = processdef.id
if p_id not in registry:
raise TextXSemanticError(f'reference to undefined process {p_id!r}')
def _export_obj_processor(export: Any, registry: Dict[str, Export]) -> None:
    """Record an exported port in ``registry`` under its alias.

    :param export: model object exposing ``export`` (the alias), ``process``
        and ``port``.
    :param registry: mapping of alias to ``Export``; updated in place.
    :raises TextXSemanticError: if the alias is already registered.
    """
    alias, proc, port = export.export, export.process, export.port
    if alias in registry:
        raise TextXSemanticError(f'exported port alias {alias!r} already defined')
    registry[alias] = Export(process=proc, port=port, export=alias)
def _connectionrhs_obj_processor(connrhs: Any, registry: Dict[str, Any]) -> None:
    """Record the connection (or initial data) that ends at ``connrhs``.

    The destination endpoint is built from ``connrhs[0]`` and the ``dest`` of
    the parent's second ``op`` item. When the parent rule is
    ``InitConnection`` a ``Data`` record is stored for that endpoint;
    otherwise a ``Connection`` is stored under both its source and its
    destination endpoint.

    NOTE(review): the registry is keyed by ``ConnectionEndpoint`` tuples, not
    by ``str`` as the annotation says — confirm the intended key type.

    :raises TextXSemanticError: if either endpoint is already in ``registry``.
    """
    parent = connrhs.parent
    parent_rule = parent.__class__.__name__
    rhs = ConnectionEndpoint(process=connrhs[0], port=parent.op[1].dest)
    if rhs in registry:
        # TODO grammar doesn't support array ports yet, so endpoints must be connected once
        # (a space was missing between the process name and "already")
        raise TextXSemanticError(f'port {rhs.port!r} of process {rhs.process!r} '
                                 'already connected')
    if parent_rule == 'InitConnection':
        registry[rhs] = Data(data=parent.op[0], dest=rhs)
    else:  # parent_rule == ProcConnection
        meta = getattr(parent.op[1], 'meta', {})
        lhs = ConnectionEndpoint(process=parent.op[0], port=parent.op[1].source)
        if lhs in registry:
            # TODO grammar doesn't support array ports yet, so endpoints must be connected once
            raise TextXSemanticError(f'port {lhs.port!r} of process {lhs.process!r} already connected')
        registry[rhs] = registry[lhs] = \
            Connection(source=lhs, meta=meta, dest=rhs)
def parse(source: str, metamodel: Any, debug: bool=False) -> Graph:
    """Build the model of ``source`` with ``metamodel`` and convert it to a graph.

    Object processors for ``ProcessDef`` and ``ProcessRef`` are registered on
    ``metamodel`` (sharing one fresh process registry) before the model is
    built from the source string; the model is then handed to ``parse_ast``.

    NOTE(review): ``parse_ast`` is not defined in the visible part of this
    module, and ``export_registry`` is created but never used — confirm.
    """
    proc_registry = {}
    export_registry = {}
    metamodel.register_obj_processors({
        'ProcessDef': partial(_processdef_obj_processor, registry=proc_registry),
        'ProcessRef': partial(_processref_obj_processor, registry=proc_registry),
    })
    return parse_ast(metamodel.model_from_str(source, debug=debug))
// Flow Based Programming DSL grammar
// For FBP language definition: https://github.com/flowbased/fbp
// This grammar is written in textX meta-language: https://github.com/igordejanovic/textX
// Entry rule: zero or more export statements (collected in `exports`)
// followed by one or more connections (collected in `connections`).
TranslationUnit:
exports *= Export
connections += Connection
;
// Exports
// An export is either an input or an output port export.
Export:
InputExport | OutputExport;
// INPORT = <process>.<port>:<exported name>
InputExport:
'INPORT' '=' process=ProcessRef '.' port=Port ':' export=Port;
// OUTPORT = <process>.<port>:<exported name>
OutputExport:
'OUTPORT' '=' process=ProcessRef '.' port=Port ':' export=Port;
// Connections
// A connection starts either from a literal value or from a process.
Connection:
InitConnection | ProcConnection;
// Literal value sent to a port of the process that starts the chain.
// All parts are assigned to the same attribute `op`.
InitConnection:
op=Rvalue '->' op=Port op=ConnectionRHS;
// Process, link, and the rest of the chain; again all assigned to `op`.
ProcConnection:
op=Process op=Link op=ConnectionRHS;
// A process followed by any number of further link/chain pairs; a trailing
// comma is accepted.
ConnectionRHS:
op=Process (op=Link op=ConnectionRHS)* ','?;
// Source port, arrow, optional bracketed metadata, destination port.
Link:
source=Port '->' ('[' meta=Map ']')? dest=Port;
// A process is either defined in place or referenced by name.
Process:
ProcessDef | ProcessRef;
// Definition: name plus its component in parentheses.
ProcessDef:
id=EntityID '(' component=Component ')';
// Reference: name, optionally followed by empty parentheses.
ProcessRef:
id=EntityID ('(' ')')?;
// Component name, optionally followed by ':' and a metadata map.
Component:
id=EntityID (':' meta=Map)?;
// Zero or more comma-separated key/value pairs.
Map:
items*=KeyValue[','];
KeyValue:
k=EntityID '=' v=Rvalue;
Port:
id=EntityID;
// Literal values; FLOAT is tried before INT.
Rvalue:
FLOAT | INT | NULL | BOOL | STRING;
// We need a dedicated match for null to be able to cast the AST node to
// python's None.
NULL:
'null';
// Redefined ID rule.
// Since we have recursive rules, during RHS recursion an Rvalue ('true', for
// example) will end up being matched by ID. To prevent this, we introduce
// our custom rule for identifiers that prevents matching Rvalue-s using
// negative lookahead.
// For info: http://www.igordejanovic.net/textX/grammar/#syntactic-predicates
EntityID:
!Rvalue ID;
// Redefined FLOAT rule.
// The default FLOAT rule seems to behave like /\d+(\.\d+)?/ (the dot is
// optional), so:
// * with (FLOAT | INT) any INT is matched by FLOAT, and
// * with (INT | FLOAT) the integer part of any float constant is greedy
// matched by INT and the subsequent dot is an unavoidable model error.
// In this rule we are copying python's type inference semantics for literals.
FLOAT:
/\d+\.(\d+)?/;
Comment:
/#.*$/;
# Thermostat
# A timer triggers the temperature reading, which feeds a hysteresis latch.
timer(Timer) OUT -> TRIGGER thermometer(ReadDallasTemperature)
thermometer() OUT -> IN hysteresis(HysteresisLatch)
# On/Off switch
hysteresis() OUT -> IN switch(BreakBeforeMake)
switch() OUT1 -> IN ia(InvertBoolean) OUT -> IN turnOn(DigitalWrite)
switch() OUT2 -> IN ic(InvertBoolean) OUT -> IN turnOff(DigitalWrite)
# Feedback cycle to switch required for synchronizing break-before-make logic
turnOn() OUT -> IN ib(InvertBoolean) OUT -> MONITOR1 switch()
turnOff() OUT -> IN id(InvertBoolean) OUT -> MONITOR2 switch()
# Config
'5000' -> INTERVAL timer() # milliseconds
'true' -> ENABLE timer
'4' -> LOWTHRESHOLD hysteresis() # Celsius
'5' -> HIGHTHRESHOLD hysteresis() # Celsius
'["0x28", "0xAF", "0x1C", "0xB2", "0x04", "0x00", "0x00", "0x33"]' -> ADDRESS thermometer()
# Pin assignments
board(ArduinoUno) PIN9 -> PIN thermometer()
board() PIN12 -> PIN turnOff()
board() PIN11 -> PIN turnOn()

Interfaccia funzionale

Definizione componenti

import fbp

@fbp.inport('addend')
@fbp.inport('augend')
@fbp.outport('result')
async def sum(addend, augend, result):
    a = await addend.pop()
    b = await augend.pop()
    await result.push(a + b)
  • Le componenti sono coroutine
  • Le porte sono code FIFO (asyncio.Queue). Se le operazioni bloccano o meno dipende dalla capacità dell'arco a cui la porta è collegata (decisa in definizione della rete).

Attivazione

FBP dice che un processo è attivato non appena un pacchetto arriva ad una porta di input.

Disattivazione

Chiarire cosa succede quando un processo si disattiva (esce?)

Porte

Porte di input opzionali

Probabilmente, se il componente ha bisogno di avere pacchetti su tutte le porte, ha senso attivare quando tutte le porte di input hanno almeno un pacchetto. Si potrebbe permettere di specificare porte obbligatorie (default?) e opzionali:

@fbp.inport('a')
@fbp.inport('b')
async def foo(a, b):
    ...

In questo caso, serve un pacchetto sia su a che su b.

@fbp.inport('a')
@fbp.inport('b', optional=True)
async def foo(a, b):
    ...

In questo caso, un pacchetto su b non attiva il processo mentre ne basta uno su a.

Chiusura porte

FBP dice che le porte possono essere chiuse, alla stregua dei channel Go. Questo dovrebbe causare la disattivazione dei processi a monte, anche se questo aspetto non è molto chiaro.

Chiarire chiusura porte

Tipizzazione

Potrebbe essere interessante permettere di tipizzare le porte per verificare la coerenza delle connessioni in fase di definizione della rete:

from typing import Any

@fbp.inport('a', type=Any)
@fbp.inport('b', type=int)
async def foo(a, b):
    ...

Usando type=Any come default, la tipizzazione è spenta.

Porte vettoriali

FBP introduce il concetto di array port che permette di collegare più archi a endpoint indicizzati appartenenti ad una singola porta. È molto utile per limitare la proliferazione di porte in componenti complessi e per permettere la connessione di input/output in numero non noto a priori.

from typing import Any

@fbp.inport('inbound', type=Any, array=True)
@fbp.outport('outbound', type=Any)
async def multiplex(inbound, outbound):
    """Forward every value received on the ``inbound`` array port to ``outbound``."""
    # Runs a select over all the inbound channels; `outbound` is pushed to,
    # so it is declared as an output port.
    async for value, index in inbound.pop():
        await outbound.push(value)

Un altro caso dove l'utilizzo di array port permette di limitare molto il numero di componenti necessari:

from typing import Any

@fbp.inport('condition', type=Any)
@fbp.inport('inbound', type=Any, array=True)
@fbp.outport('outbound', type=Any, array=True)
async def filter(condition, inbound, outbound):
    """Forward each inbound value that satisfies ``condition`` to the output with the same index."""
    async for value, index in inbound.pop():
        if condition(value):
            # Send on the corresponding output port
            await outbound[index].push(value)

Estensioni al modello FBP

Ambiente

Quando tutti i processi devono accedere a valori globali, potrebbe essere utile fornire una sorta di ambiente. Per evitare problemi di sincronizzazione e che l'ambiente venga usato in modo improprio, i valori delle variabili d'ambiente dovrebbero essere readonly. Potrebbe essere riempito con il contenuto dell'ambiente in cui gira il runtime e altra roba per la specifica istanza della rete.

@fbp.inport('a')
@fbp.inport('b')
@fbp.env('env')
async def foo(a, b, env):
    ...

Stato

A differenza di altri modelli, in FBP i processi hanno stato e, finché non si disattivano, possono continuare a consumare input/produrre output a piacere. Questo è l'aspetto più critico in relazione alla persistenza/ripristino dello stato: l'onere è scaricato completamente sull'utente.

Un modo sarebbe quello di fornire una sorta di storage in cui, qualsiasi cosa venga scritto, entra a far parte dello stato del processo e può venire fornito già riempito in attivazione per richiedere il ripristino di uno stato precedente:

import aiofiles

@fbp.inport('filename', type=str)
@fbp.outport('line', type=str)
@fbp.state('state')
async def cat(filename, line, state):
    async with aiofiles.open(await filename.pop(), mode='r') as f:
        # restore state if present
        fpos = state.get('fpos', 0)
        await f.seek(fpos)
        async for fline in f:
            await line.push(fline)
            # save state
            state['fpos'] = await f.tell()

Multiplexing

Probabilmente è utile (indispensabile?) avere un meccanismo di multiplexing di più porte in ingresso, alla stregua della select di Go:

@fbp.inport('a')
@fbp.inport('b')
@fbp.inport('c')
@fbp.outport('fastest')
async def speedtest(a, b, c, fastest):
    value, port = await fbp.select(a, b, c)
    # Così sarebbe più generica (funziona con qualsiasi coro) ma più verbosa:
    # value, port = await fbp.select(a.pop(), b.pop())
    if port == a:
        print('a wins!')
    elif port == b:
        print('b wins!')
    else:
        print('c wins!')
    await fastest.push(value)

Senza switch le catene di if/elif possono diventare problematiche da gestire. Una possibilità per limitarle è quella di introdurre il multiplexing a livello di definizione dell'interfaccia del componente:

@fbp.inport('a')
@fbp.inport('b')
@fbp.inport('c')
@fbp.outport('fastest', multiplex=('a', 'b', 'c'))
async def speedtest(a, b, c, fastest):
    async def on_a():
        print('a wins!')

    async def on_b():
        print('b wins!')

    async def on_c():
        print('c wins!')
    
    # ...

Serializzazione/persistenza

Riguardo la persistenza dello stato della rete, un approccio brutale e sempre valido è quello di salvare nello store di turno la rappresentazione pickle delle entità che devono essere ripristinate. Per serializzare con più criterio l'utente potrebbe avere la possibilità di specificare uno schema per ogni porta:

from marshmallow import Schema, fields

class Point(Schema):
    x = fields.Int()
    y = fields.Int()
    z = fields.Int()
    description = fields.Str()


@fbp.inport('name', type=str)
@fbp.outport('position', type=Point)
# @fbp.outport('position', type=dict, schema=Point)  # altra possibilità
async def whereami(name, position):
    ...

Questo permetterebbe di utilizzare diversi tipi di backend di persistenza in modo efficiente (ad es.: se sql, lo schema potrebbe essere usato per definire lo schema delle tabelle).

Rimane il fatto che, come buona pratica, i pacchetti dovrebbero essere sempre JSON-serializzabili. Si può forzarlo?

from marshmallow import Schema, fields
class ConnectionEndpointSchema(Schema):
    """Schema of a connection endpoint: a process name and a port name."""

    process = fields.Str()
    port = fields.Str()
class ExportSchema(Schema):
    """Schema of an exported port: the endpoint and the name it is exported as."""

    endpoint = fields.Nested(ConnectionEndpointSchema)
    export = fields.Str()
class ProcessSchema(Schema):
    """Schema of a process: its id, its component name and a metadata dict."""

    id = fields.Str()
    component = fields.Str()
    meta = fields.Dict()
class InitConnectionSchema(Schema):
    """Schema of initial data: a raw value and the endpoint it is sent to."""

    dest = fields.Nested(ConnectionEndpointSchema)
    data = fields.Raw()
class ConnectionSchema(Schema):
    """Schema of a connection: source and destination endpoints plus metadata."""

    source = fields.Nested(ConnectionEndpointSchema)
    dest = fields.Nested(ConnectionEndpointSchema)
    meta = fields.Dict()
class GraphSchema(Schema):
    """Schema of a whole graph: exports, connections, initial data, processes and an id."""

    in_exports = fields.Nested(ExportSchema, many=True)
    out_exports = fields.Nested(ExportSchema, many=True)
    connections = fields.Nested(ConnectionSchema, many=True)
    data = fields.Nested(InitConnectionSchema, many=True)
    processes = fields.Nested(ProcessSchema, many=True)
    id = fields.Str()
# Hello world in the Flow-Based Programming notation for NoFlo
'Hello, World!' -> IN Display(Output)
'30' -> PIXELS ledchain(LedChainWS)
# If wanting to use software SPI instead
# '2' -> PINDATA ledchain
# '3' -> PINCLK ledchain
'true' -> HWSPI ledchain
'[29, "0xFF00FF"]' -> IN ledchain
'true' -> SHOW ledchain
#INPORT=ledchain.IN:PIXEL
#OUTPORT=ledchain.READY:READY
'5' -> IN matrix(LedMatrixMax)
board(ArduinoUno) PIN8 -> PINCLK matrix
board() PIN7 -> PINDIN matrix
board() PIN6 -> PINCS matrix
in(MonitorPin) OUT -> IN split(Split)
split OUT1 -> IN outA(DigitalWrite)
split OUT2 -> IN outB(DigitalWrite)
# Config
board(ArduinoUno) PIN2 -> PIN in
board PIN11 -> PIN outA
board PIN10 -> PIN outB
'6' -> PIN ledchain(LedChainNeoPixel)
'30' -> PIXELS ledchain
'[29, "0xFF00FF"]' -> IN ledchain
'true' -> SHOW ledchain
#INPORT=ledchain.IN:PIXEL
#OUTPORT=ledchain.READY:READY
# This graph provides the base infrastructure for retrieving
# semantic information from Excel spreadsheets.
#
# You'll need to provide at least the following information to
# the ports of this network:
#
# * READ.SOURCE: Filename of the spreadsheet document
# * SLICEROWS.BEGIN: How many rows to skip from the start of a sheet
# * SLICEROWS.END: How many rows to skip from the end of a sheet (optional)
# * GROUPBYROWLABEL.KEY: Which column provides the label of the row
# * SLICECOLUMNS.BEGIN: How many columns to skip from the start of a sheet
# * SLICECOLUMNS.END: How many columns to skip from the end of a sheet (optional)
#
# In the end the output will be provided through the ENTITIZE.OUT port.
#
# This network uses Apache Tika for parsing the spreadsheet into
# XHTML. Ensure that Tika can be found in the location set below:
'tika-app-0.9.jar' -> TIKA Read(ReadDocument)
# If reading fails, just display the error
Read() ERROR -> IN Display(Output)
# Parse the file to JSON, get all spreadsheets from it
Read() OUT -> IN Parse(ParseXml)
# We're only interested in DIVs inside the BODY
'body' -> KEY GetBody(GetObjectKey)
'div' -> KEY GetDiv(GetObjectKey)
# Read DIVs and pass them forward
Parse() OUT -> IN GetBody() OUT -> IN GetDiv()
# Spreadsheet title is in a H1
'h1' -> KEY GroupByTableId(GroupByObjectKey)
# Group the data by spreadsheet titles
GetDiv() OUT -> IN GetSheet(GetObjectKey) OUT -> IN GroupByTableId()
# Get spreadsheet table and the rows from it
'table' -> KEY GetTable(GetObjectKey)
'tbody' -> KEY GetTBody(GetObjectKey)
'tr' -> KEY GetTR(GetObjectKey)
GroupByTableId() OUT -> IN GetTable() OUT -> IN GetTBody() OUT -> IN GetTR()
# Process each row individually and get the cells
'td' -> KEY GetTD(GetObjectKey)
GetTR() OUT -> IN SliceRows(SliceArray) OUT -> IN SplitRows(SplitArray) OUT -> IN GetTD()
# Group by the row label, and collect into objects
GetTD() OUT -> IN GroupByRowLabel(GroupByObjectKey) OUT -> IN SliceColumns(SliceArray) OUT -> IN Collect(CollectGroups)
# If no columns are found, display that as an error message
SliceColumns() ERROR -> IN Display()
# Turn the columns into objects
Collect() OUT -> IN SplitEntities(SplitArray) OUT -> IN Entitize(PropertiesToObjects)
timer(Timer) OUT -> TRIGGER button(DigitalRead)
button() OUT -> IN inverted(InvertBoolean)
inverted() OUT -> IN led(DigitalWrite)
# Config
board(ArduinoUno) PIN2 -> PIN button()
board() PIN3 -> PIN led()
INPORT=a.IN:IN
OUTPORT=b.OUT:OUT
a(Forward) OUT -> IN b(Forward)
@alalazo
Copy link

alalazo commented Oct 14, 2017

Non so se la sintassi sia migliore, ma un'alternativa per il multiplexing potrebbe essere qualcosa tipo:

@fbp.inport('a')
@fbp.inport('b')
@fbp.inport('c')
@fbp.outport('fastest', multiplex=('a', 'b', 'c'))
async def speedtest(a, b, c, fastest):
    async def on_a():
        print('a wins!')

    async def on_b():
        print('b wins!')

    async def on_c():
        print('c wins!')

Il decoratore può occuparsi di registrare le chiamate in un dizionario + preambolo e chiusura. La logica sarebbe qualcosa tipo:

on_ports = {}
# Registro le coroutine nel dizionario
# ... 
value, port = await fbp.select(a, b, c)
await onports[port]()
await fastest.push(value)

@alalazo
Copy link

alalazo commented Oct 14, 2017

La tipizzazione delle porte mi sembra centrata. Si può passare da averla spenta, ad avere un'ABC, ad avere un tipo concreto con la stessa API.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment