# Set LED output proportional to input
timer(Timer) OUT -> TRIGGER potmeter(AnalogRead)
potmeter() OUT -> IN map(MapLinear) OUT -> DUTYCYCLE led(PwmWrite)
# Config
'10' -> INTERVAL timer
'0' -> INMIN map(), '1023' -> INMAX map()
'0' -> OUTMIN map(), '100' -> OUTMAX map()
board(ArduinoUno) PINA0 -> PIN potmeter()
board() PIN10 -> PIN led()
#timer(Timer) OUT -> IN toggle(ToggleBoolean) OUT -> IN split(Split), '1000' -> INTERVAL timer
timer(Timer) OUT -> TRIGGER read(DigitalRead) OUT -> IN split(Split), '5' -> INTERVAL timer
# 'true' -> IN split(Split)
board(ATUSBKEY) PORTA0 -> PIN outA(DigitalWrite), split OUT1 -> IN outA
board PORTA1 -> PIN outB(DigitalWrite), split OUT2 -> IN outB
board PORTA2 -> PIN outC(DigitalWrite), split OUT3 -> IN outC
board PORTA3 -> PIN outD(DigitalWrite), split OUT4 -> IN outD
board PORTA4 -> PIN outE(DigitalWrite), split OUT5 -> IN outE
board PORTA5 -> PIN outF(DigitalWrite), split OUT6 -> IN outF
board PORTA6 -> PIN outG(DigitalWrite), split OUT7 -> IN outG
board PORTA7 -> PIN outH(DigitalWrite), split OUT8 -> IN outH
board PORTF0 -> PIN read
in(ReadCapacitivePin) OUT -> IN outA(DigitalWrite)
timer(Timer) OUT -> TRIGGER in
# Config
'60' -> THRESHOLD in
'10' -> INTERVAL timer
board(ArduinoUno) PIN2 -> PIN in
board PIN13 -> PIN outA
#!/usr/bin/env python

import time
import argparse
import subprocess
import contextlib
from os import path

from textx.metamodel import metamodel_from_file
from textx.export import model_export

PARSER = argparse.ArgumentParser(description='parse and generate graphs from FBP sources.')
PARSER.add_argument('fbp_sources', metavar='INFILE', type=str, nargs='+',
                    help='path to the FBP source file')
PARSER.add_argument('--debug', dest='debug', action='store_true',
                    help='enable debug mode')

HERE = path.abspath(path.dirname(__file__))
METAMODEL_FILENAME = path.join(HERE, 'fbp.tx')


@contextlib.contextmanager
def timeit(msg):
    t = time.perf_counter()
    yield
    t = time.perf_counter() - t
    print(f"{msg} [{t*1000} ms]")


if __name__ == '__main__':
    args = PARSER.parse_args()
    with timeit(f"built metamodel from grammar {METAMODEL_FILENAME!r}"):
        metamodel = metamodel_from_file(METAMODEL_FILENAME)
    for fbp_filename in args.fbp_sources:
        model_filename = path.abspath(fbp_filename)
        dot_filename = model_filename + '.dot'
        png_filename = model_filename + '.png'
        with timeit(f"built ast from source {model_filename!r}"):
            ast = metamodel.model_from_file(model_filename, debug=args.debug)
        with timeit(f"rendered graph to {png_filename!r}"):
            model_export(ast, dot_filename)
            subprocess.run(['dot', '-Tpng', '-o', png_filename, dot_filename])
            subprocess.run(['rm', '-f', dot_filename])
# Exports added to the original example source
INPORT = Read.IN:INPUTFILE
OUTPORT = Display.OUT:DISPLAY
'package.json' -> IN Read OUT -> IN Count(Counter) OUT -> IN Display(Output)
# Read a file
'package.json' -> IN Read(ReadFile:meta=1)
# Split the file contents by newlines
Read() OUT -> IN Split(SplitStr)
# Count the packets
Split() OUT -> IN Count(Counter) OUT -> IN Display(Output)
# Send the total count to display
Count() COUNT -> IN Display(Output)
# Display also file read errors
Read() ERROR -> IN Display()
in(SerialIn) OUT -> IN f(Forward)
f() OUT -> IN out(SerialOut)
"""
*Flow Based Programming* DSL parsing.
Grammar definition and parsing to build
graphs from textual descriptions written using the
`Flow Based Programming Domain-specific Language <https://github.com/flowbased/fbp>`_.
"""
from arpeggio import (
ParserPython,
PTNodeVisitor,
visit_parse_tree,
Optional,
ZeroOrMore,
OneOrMore,
EOF,
)
from arpeggio import NoMatch as ArpeggioSyntaxError
from arpeggio import SemanticError as ArpeggioSemanticError
from arpeggio import RegExMatch as M
__all__ = [
'parse',
'FBPVisitor',
'ParseError',
'SemanticError',
'SyntaxError',
]
class ParseError(Exception):
pass
class SemanticError(ParseError, ArpeggioSemanticError):
pass
class SyntaxError(ParseError, ArpeggioSyntaxError):
pass
###############################################################################
# PEG definition of the FBP language.                                         #
# Since PEG merges lexing and parsing, lexical (terminal) rules are mixed in  #
# with the grammar rules.                                                     #
###############################################################################
# Terminals
def TRUE(): return M(r'(True|true|1)\b')
def FALSE(): return M(r'(False|false|0)\b')
def BOOL(): return [TRUE, FALSE]
def NULL(): return 'null'
def ID(): return M(r'[^\d\W]\w*\b')
def INT(): return M(r'[-+]?[0-9]+\b')
def FLOAT(): return M(r'[-+]?[0-9]*\.[0-9]+([eE][-+]?[0-9]+)?\b')
def STRING(): return M(r'("(\\"|[^"])*")|(\'(\\\'|[^\'])*\')')
def NUMBER(): return [FLOAT, INT]
# Base grammar
def rvalue(): return [BOOL, NULL, STRING, NUMBER]
def port(): return ID
def kwvalue(): return ID, '=', rvalue
def metadata(): return ZeroOrMore(kwvalue, sep=',')
def process_ref(): return ID, Optional("(", ")")
def process_def(): return ID, '(', component, ')'
def component(): return ID, Optional(':', metadata)
def link(): return '->', Optional('[', metadata, ']')
# Statements
def comment(): return M(r'#.*')
def export(): return ['INPORT', 'OUTPORT'], '=', process_ref, '.', port, ':', port
# Connection parts
def connection_op(): return port, link, port
def connection_rhs(): return [process_def, process_ref], Optional(connection_op, connection_rhs)
def connection(): return Optional(rvalue, link, port), connection_rhs
# Translation unit
def graph(): return ZeroOrMore(export), OneOrMore(connection), EOF
################################################################################
NONE_PLACEHOLDER = Ellipsis


def fixnone(value):
    # Arpeggio drops a node if the visitor method
    # returns None, so we need to use a placeholder.
    # This function restores the None value if needed.
    return None if value is NONE_PLACEHOLDER else value
class FBPVisitor(PTNodeVisitor):

    def __init__(self, *args, graph=None, **kwargs):
        # Graph (and the Metadata/Component/Connection/InitData/Export helpers
        # used below) come from a companion graph module not included in this gist.
        self.graph = graph or Graph()
        self.exports = []
        super().__init__(*args, **kwargs)

    def visit_TRUE(self, node, children):
        return True

    def visit_FALSE(self, node, children):
        return False

    def visit_NULL(self, node, children):
        return NONE_PLACEHOLDER

    def visit_STRING(self, node, children):
        return str(node.value)

    def visit_FLOAT(self, node, children):
        return float(node.value)

    def visit_INT(self, node, children):
        return int(node.value)

    def visit_ID(self, node, children):
        return str(node.value)

    def visit_kwvalue(self, node, children):
        return {children[0]: fixnone(children[1])}

    def visit_metadata(self, node, children):
        return {k: fixnone(v) for x in children for k, v in x.items()}

    def visit_process_ref(self, node, children):
        return children[0]

    def second_process_ref(self, procname):
        # Second pass for process references: we
        # have to resolve any previously unresolved
        # references here, this is our last chance.
        proc = self.graph.get_process(procname)
        if not proc:
            raise ArpeggioSemanticError(f'reference to undefined process: {procname!r}')
        return proc

    def visit_process_def(self, node, children):
        name = children[0]
        component = children[1]
        proc = dict(name=name, component=component)
        # Check if a previous process reference has been added to the graph
        # due to an unresolved connection (at node 'visit_connection_rhs')
        prev = self.graph.node.get(name, {}).get('process', None)  # TODO fix Graph isolation
        if prev:
            if prev != proc:
                raise ArpeggioSemanticError(
                    'process multiple declarations mismatch: {!r} != {!r}'.format(proc, prev))
        else:
            # Define process: any reference previously added to
            # the graph is going to be overwritten
            self.graph.add_process(proc)
        return proc

    def visit_component(self, node, children):
        name = children[0]
        meta = children.results.get('metadata', [Metadata()])[0]
        return Component(name, meta)

    def visit_port(self, node, children):
        return node.value

    def visit_link(self, node, children):
        meta = children.results.get('metadata', [Metadata()])[0]
        return meta

    def visit_connection_op(self, node, children):
        inport, meta, outport = children
        return Connection(inport, outport, meta)

    def visit_connection_rhs(self, node, children):
        if len(children) == 1:
            return children[0]
        else:
            lhs, op, rhs = children
            self.graph.add_connection(lhs, rhs, op)
            return lhs

    def visit_connection(self, node, children):
        if len(children) > 1:
            # We have a datalink
            data, linkmeta, inport, process = children
            self.graph.add_init_data(process, inport, InitData(data, linkmeta))

    def visit_export(self, node, children):
        # direction = children[0]  # TODO
        process = children[1]
        port = children[2]
        port_alias = children[3]
        self.graph.export(Export(process, port, port_alias))

    def visit_graph(self, node, children):
        return self.graph
################################################################################
FBParser = ParserPython(graph, comment)


def parse(source, debug=False, visitor=None, parser=None):
    visitor_cls = visitor or FBPVisitor
    parser = parser or FBParser
    try:
        tree = parser.parse(source)
        return visit_parse_tree(tree, visitor_cls(debug=debug))
    # Translate exceptions to have a self-contained API:
    except ArpeggioSemanticError as e:
        raise SemanticError(str(e)) from e
    except ArpeggioSyntaxError as e:
        raise SyntaxError(str(e)) from e
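
A quick usage sketch (hypothetical: Graph, Metadata, Component, Connection, InitData and Export are expected to come from a companion graph module that is not included in this gist):

# Assumes the companion graph module is importable.
graph = parse("a(Forward) OUT -> IN b(Forward)")
print(graph)
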
// Flow Based Programming DSL grammar
// For FBP language definition: https://github.com/flowbased/fbp
// This grammar is written in the textX meta-language, for further info: https://github.com/igordejanovic/textX

// Translation unit and subsections
TranslationUnit:
    Exports? Graph;

Exports:
    (ExportInputPort | ExportOutputPort)+;

Graph:
    Connection+;

// Statements
Connection:
    (Rvalue Link Port)? ConnectionRHS;

ConnectionRHS:
    (ProcessDecl | ProcessRef) (ConnectionOp ConnectionRHS)?;

ConnectionOp:
    Port Link Port;

ExportInputPort:
    "INPORT" "=" ExportRHS;

ExportOutputPort:
    "OUTPORT" "=" ExportRHS;

ExportRHS:
    ProcessRef "." Port ":" Port;

// Expressions
Link:
    "->" ("[" Metadata "]")?;

Component:
    ID (":" Metadata)?;

ProcessDecl:
    ID "(" Component ")";

ProcessRef:
    ID ("(" ")")?;

Metadata:
    ID "=" Rvalue ("," ID "=" Rvalue)*;

Port:
    ID;

Rvalue:
    FLOAT | INT | BOOL | NULL | STRING;

NULL:
    "null";

Comment:
    /#.*$/;
"""
Parser for FBP grammar.
"""
from os import path
from functools import partial
from typing import Any, Dict, Callable
from collections import namedtuple
here = path.abspath(path.dirname(__file__))
DEFAULT_GRAMMAR_FILENAME = path.join(here, 'fbp.tx')
Process = namedtuple('Process', 'id component meta')
Export = namedtuple('Export', 'process port export')
Data = namedtuple('InitConnection', 'data dest')
ConnectionEndpoint = namedtuple('ConnectionEndpoint', 'process port')
Connection = namedtuple('Connection', 'source meta dest')
Graph = namedtuple('Graph', 'in_exports out_exports processes connections data')
def _processdef_obj_processor(processdef: Any, registry: Dict[str, Process]) -> None:
p_id = processdef.id
c_id = processdef.component.id
c_meta = getattr(processdef.component, 'meta', {}) # component meta is optional
if p_id in registry:
raise TextXSemanticError(f'process {p_id!r} already defined')
else:
registry[p_id] = Process(id=p_id, component=c_id, meta=c_meta)
def _processref_obj_processor(processref: Any, registry: Dict[str, Any]) -> None:
p_id = processdef.id
if p_id not in registry:
raise TextXSemanticError(f'reference to undefined process {p_id!r}')
def _export_obj_processor(export: Any, registry: Dict[str, Export]) -> None:
e_id = export.export
e_proc = export.process
e_port = export.port
if e_id in registry:
raise TextXSemanticError(f'exported port alias {e_id!r} already defined')
else:
registry[e_id] = Export(process=e_proc, port=e_port, export=e_id)
def _connectionrhs_obj_processor(connrhs: Any, registry: Dict[str, Any]) -> None:
parent = connrhs.parent
parent_rule = parent.__class__.__name__
rhs = ConnectionEndpoint(process=connrhs[0], port=parent.op[1].dest)
if rhs in registry:
# TODO grammar doesn't support array ports yet, so endpoints must be connected once
raise TextXSemanticError(f'port {rhs.port!r} of process {rhs.process!r}'
'already connected')
if parent_rule == 'InitConnection':
registry[rhs] = Data(data=parent.op[0], dest=rhs)
else: # parent_rule == ProcConnection
meta = getattr(parent.op[1], 'meta', {})
lhs = ConnectionEndpoint(process=parent.op[0], port=parent.op[1].source)
if lhs in registry:
# TODO grammar doesn't support array ports yet, so endpoints must be connected once
raise TextXSemanticError(f'port {lhs.port!r} of process {lhs.process!r} already connected')
registry[rhs] = registry[lhs] = \
Connection(source=lhs, meta=meta, dest=rhs)
def parse(source: str, metamodel: Any, debug: bool=False) -> Graph:
proc_registry = {}
export_registry = {}
obj_processors = {
'ProcessDef': partial(_processdef_obj_processor, registry=proc_registry),
'ProcessRef': partial(_processref_obj_processor, registry=proc_registry),
}
metamodel.register_obj_processors(obj_processors)
ast = metamodel.model_from_str(source, debug=debug)
return parse_ast(ast)
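
A hypothetical driver for this module (parse_ast is referenced above but defined elsewhere in the companion code; the grammar file is the fbp.tx shipped alongside):

from textx.metamodel import metamodel_from_file

metamodel = metamodel_from_file(DEFAULT_GRAMMAR_FILENAME)
graph = parse("a(Forward) OUT -> IN b(Forward)", metamodel)
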
import asyncio
import inspect
import typing

T = typing.TypeVar('T')


class Inport(typing.Generic[T]):
    optional = False

    def __init__(self):
        self.queue = asyncio.Queue()

    async def pop(self) -> T:
        result = await self.queue.get()
        return result

    def empty(self):
        return self.queue.empty()


class Outport(typing.Generic[T]):
    def __init__(self):
        self.queue = asyncio.Queue()

    async def push(self, item: T):
        await self.queue.put(item)


class Component:
    def __init__(self, coro, inports, outports):
        self.coro = coro
        self.inports = inports
        self.outports = outports
        self._terminate = False

    def configure_ports(self, ports, **kwargs):
        for x in ports:
            for k, v in kwargs.items():
                setattr(self.inports[x], k, v)

    async def __call__(self):
        while True:
            # If all the mandatory inports have values, activate the coroutine
            if all(not x.queue.empty() for x in self.inports.values() if not x.optional):
                kwargs = {k: v for k, v in self.inports.items()}
                kwargs.update(self.outports)
                await self.coro(**kwargs)
                continue
            # Sleep for a while
            print("Waiting...")
            await asyncio.sleep(1.0)
            if self._terminate:
                print("Closing component")
                break

    async def terminate(self):
        print("Scheduling termination")
        self._terminate = True


def component():
    def decorator(coro):
        s = inspect.signature(coro)
        inports = {
            k: Inport() for k, v in s.parameters.items() if issubclass(v.annotation, Inport)
        }
        outports = {
            k: Outport() for k, v in s.parameters.items() if issubclass(v.annotation, Outport)
        }
        return Component(coro, inports, outports)
    return decorator
// Flow Based Programming DSL grammar
// For FBP language definition: https://github.com/flowbased/fbp
// This grammar is written in the textX meta-language: https://github.com/igordejanovic/textX

TranslationUnit:
    exports *= Export
    connections += Connection
;

// Exports
Export:
    InputExport | OutputExport;

InputExport:
    'INPORT' '=' process=ProcessRef '.' port=Port ':' export=Port;

OutputExport:
    'OUTPORT' '=' process=ProcessRef '.' port=Port ':' export=Port;

// Connections
Connection:
    InitConnection | ProcConnection;

InitConnection:
    op=Rvalue '->' op=Port op=ConnectionRHS;

ProcConnection:
    op=Process op=Link op=ConnectionRHS;

ConnectionRHS:
    op=Process (op=Link op=ConnectionRHS)* ','?;

Link:
    source=Port '->' ('[' meta=Map ']')? dest=Port;

Process:
    ProcessDef | ProcessRef;

ProcessDef:
    id=EntityID '(' component=Component ')';

ProcessRef:
    id=EntityID ('(' ')')?;

Component:
    id=EntityID (':' meta=Map)?;

Map:
    items*=KeyValue[','];

KeyValue:
    k=EntityID '=' v=Rvalue;

Port:
    id=EntityID;

Rvalue:
    FLOAT | INT | NULL | BOOL | STRING;

// We need a dedicated match for null to be able to cast the AST node to
// Python's None.
NULL:
    'null';

// Redefined ID rule.
// Since we have recursive rules, during RHS recursion an Rvalue ('true', for
// example) would end up being matched by ID. To prevent this, we introduce
// a custom rule for identifiers that refuses to match Rvalue-s, using
// negative lookahead.
// For info: http://www.igordejanovic.net/textX/grammar/#syntactic-predicates
EntityID:
    !Rvalue ID;

// Redefined FLOAT rule.
// The default FLOAT rule seems to behave like /\d+(\.\d+)?/ (the dot is
// optional), so:
// * with (FLOAT | INT) any INT is matched by FLOAT, and
// * with (INT | FLOAT) the integer part of any float constant is greedily
//   matched by INT and the subsequent dot becomes an unavoidable model error.
// This rule copies Python's type-inference semantics for literals.
FLOAT:
    /\d+\.(\d+)?/;

Comment:
    /#.*$/;
# Thermostat
timer(Timer) OUT -> TRIGGER thermometer(ReadDallasTemperature)
thermometer() OUT -> IN hysteresis(HysteresisLatch)
# On/Off switch
hysteresis() OUT -> IN switch(BreakBeforeMake)
switch() OUT1 -> IN ia(InvertBoolean) OUT -> IN turnOn(DigitalWrite)
switch() OUT2 -> IN ic(InvertBoolean) OUT -> IN turnOff(DigitalWrite)
# Feedback cycle to switch required for synchronizing break-before-make logic
turnOn() OUT -> IN ib(InvertBoolean) OUT -> MONITOR1 switch()
turnOff() OUT -> IN id(InvertBoolean) OUT -> MONITOR2 switch()
# Config
'5000' -> INTERVAL timer() # milliseconds
'true' -> ENABLE timer
'4' -> LOWTHRESHOLD hysteresis() # Celsius
'5' -> HIGHTHRESHOLD hysteresis() # Celsius
'["0x28", "0xAF", "0x1C", "0xB2", "0x04", "0x00", "0x00", "0x33"]' -> ADDRESS thermometer()
board(ArduinoUno) PIN9 -> PIN thermometer()
board() PIN12 -> PIN turnOff()
board() PIN11 -> PIN turnOn()

Functional interface

Component definition

import fbp

@fbp.inport('addend')
@fbp.inport('augend')
@fbp.outport('result')
async def sum(addend, augend, result):
    a = await addend.pop()
    b = await augend.pop()
    await result.push(a + b)
  • Components are coroutines
  • Ports are FIFO queues (asyncio.Queue). Whether operations block depends on the capacity of the arc the port is attached to, decided when the network is defined (see the sketch below).
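
A bounded arc maps directly onto asyncio: a queue created with maxsize makes push block when the arc is full. A minimal sketch (the capacity parameter is an assumption, not part of the notes above):

import asyncio

class BoundedOutport:
    # Arc capacity is decided at network definition time; push()
    # suspends while the underlying queue is full.
    def __init__(self, capacity=1):
        self.queue = asyncio.Queue(maxsize=capacity)

    async def push(self, item):
        await self.queue.put(item)  # awaits while the arc is full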

Activation

FBP states that a process is activated as soon as a packet arrives at an input port.
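
A minimal polling sketch of this activation rule, along the lines of the Component.__call__ loop shown later in this gist (names are illustrative):

import asyncio

async def run_when_ready(component):
    # Fire the component coroutine whenever every required
    # (non-optional) inport holds at least one packet.
    required = [p for p in component.inports.values() if not p.optional]
    while True:
        if all(not p.empty() for p in required):
            await component.coro(**component.inports, **component.outports)
        else:
            await asyncio.sleep(0)  # yield control to the event loop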

Deactivation

Clarify what happens when a process deactivates (does it exit?)

Ports

Optional input ports

If a component needs packets on all of its ports, it probably makes sense to activate it when every input port holds at least one packet. We could allow mandatory (the default?) and optional ports to be specified:

@fbp.inport('a')
@fbp.inport('b')
async def foo(a, b):
    ...

In this case a packet is required on both a and b.

@fbp.inport('a')
@fbp.inport('b', optional=True)
async def foo(a, b):
    ...

In this case a packet on b does not activate the process, while a single packet on a is enough.

Closing ports

FBP states that ports can be closed, much like Go channels. This should cause the deactivation of the upstream processes, although this aspect is not very clear.

Clarify how port closing works (see the sketch below).
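
One possible semantics, sketched with a sentinel packet (everything below is an assumption, mirroring how closed Go channels signal their readers):

import asyncio

_CLOSED = object()  # sentinel marking the end of the stream

class ClosableInport:
    def __init__(self):
        self.queue = asyncio.Queue()
        self._closed = False

    async def close(self):
        await self.queue.put(_CLOSED)

    async def pop(self):
        # Raising on a closed port lets the reading process deactivate.
        if self._closed:
            raise EOFError('port closed')
        item = await self.queue.get()
        if item is _CLOSED:
            self._closed = True
            raise EOFError('port closed')
        return item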

Typing

It could be interesting to allow ports to be typed, so that connection coherence can be verified when the network is defined:

from typing import Any

@fbp.inport('a', type=Any)
@fbp.inport('b', type=int)
async def foo(a, b):
    ...

With type=Any as the default, typing is effectively turned off.
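
A sketch of the connection-coherence check this would enable at network-definition time (check_connection is a hypothetical helper, not part of the proposed API):

from typing import Any

def check_connection(source_type, dest_type):
    # A connection is coherent when either end is untyped (Any)
    # or the source type is a subclass of the destination type.
    if Any in (source_type, dest_type):
        return True
    return issubclass(source_type, dest_type)

assert check_connection(bool, int)      # bool is a subclass of int
assert check_connection(str, Any)       # Any disables the check
assert not check_connection(str, int)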

Array ports

FBP introduces the concept of array ports, which lets multiple arcs be attached to indexed endpoints belonging to a single port. This is very useful to limit the proliferation of ports in complex components and to allow connecting a number of inputs/outputs that is not known in advance.

from typing import Any

@fbp.inport('inbound', type=Any, array=True)
@fbp.outport('outbound', type=Any)
async def multiplex(inbound, outbound):
    async for value, index in inbound.pop():  # performs a select over all incoming channels
        await outbound.push(value)

Another case where array ports greatly reduce the number of components needed:

from typing import Any

@fbp.inport('condition', type=Any)
@fbp.inport('inbound', type=Any, array=True)
@fbp.outport('outbound', type=Any, array=True)
async def filter(condition, inbound, outbound):
    async for value, index in inbound.pop():
        if condition(value):
            # Send on the corresponding output port
            await outbound[index].push(value)
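
The indexed access used by filter above suggests that an array port can be little more than a list of channels; a hypothetical sketch:

import asyncio

class _Channel:
    def __init__(self):
        self.queue = asyncio.Queue()

    async def push(self, item):
        await self.queue.put(item)

class ArrayOutport:
    # outbound[index].push(value) addresses a single arc among
    # those attached to the port.
    def __init__(self, size):
        self._channels = [_Channel() for _ in range(size)]

    def __getitem__(self, index):
        return self._channels[index]

    def __len__(self):
        return len(self._channels)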

Extensions to the FBP model

Environment

When all processes need access to global values, it could be useful to provide some kind of environment. To avoid synchronization problems and improper use, the values of environment variables should be read-only. The environment could be filled with the contents of the environment the runtime runs in, plus other data for the specific network instance.

@fbp.inport('a')
@fbp.inport('b')
@fbp.env('env')
async def foo(a, b, env):
    ...
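
Read-only access comes almost for free from the standard library; a sketch (make_env is a hypothetical helper):

import os
import types

def make_env(**instance_values):
    # Merge the runtime's process environment with values specific to
    # this network instance, then freeze the result: writes raise TypeError.
    return types.MappingProxyType({**os.environ, **instance_values})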

State

Unlike other models, in FBP processes are stateful and, until they deactivate, can keep consuming input and producing output at will. This is the most critical aspect with respect to persisting and restoring state: the burden is shifted entirely onto the user.

One approach would be to provide a kind of storage where anything that gets written becomes part of the process state, and which can be handed over pre-filled at activation to request the restore of a previous state:

import aiofiles

@fbp.inport('filename', type=str)
@fbp.outport('line', type=str)
@fbp.state('state')
async def cat(filename, line, state):
    async with aiofiles.open(await filename.pop(), mode='r') as f:
        # restore state if present
        fpos = state.get('fpos', 0)
        await f.seek(fpos)
        async for fline in f:
            await line.push(fline)
            # save state
            state['fpos'] = await f.tell()

Multiplexing

It is probably useful (indispensable?) to have a mechanism for multiplexing several input ports, much like Go's select:

@fbp.inport('a')
@fbp.inport('b')
@fbp.inport('c')
@fbp.outport('fastest')
async def speedtest(a, b, c, fastest):
    value, port = await fbp.select(a, b, c)
    # This would be more generic (it works with any coroutine) but more verbose:
    # value, port = await fbp.select(a.pop(), b.pop())
    if port == a:
        print('a wins!')
    elif port == b:
        print('b wins!')
    else:
        print('c wins!')
    await fastest.push(value)
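
fbp.select is not implemented in these notes; a minimal sketch on top of asyncio.wait (ignoring the cancellation subtleties around Queue.get):

import asyncio

async def select(*ports):
    # Race one pop() per port; return the first value that arrives
    # together with the port it came from.
    tasks = {asyncio.ensure_future(port.pop()): port for port in ports}
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    winner = done.pop()
    return winner.result(), tasks[winner]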

Without a switch statement, if/elif chains can become hard to manage. One way to keep them in check is to introduce the multiplexing at the level of the component's interface definition:

@fbp.inport('a')
@fbp.inport('b')
@fbp.inport('c')
@fbp.outport('fastest', multiplex=('a', 'b', 'c'))
async def speedtest(a, b, c, fastest):
    async def on_a():
        print('a wins!')

    async def on_b():
        print('b wins!')

    async def on_c():
        print('c wins!')
    
    # ...

Serialization/persistence

Regarding persistence of the network state, a brute-force but always viable approach is to save the pickled representation of the entities to be restored into whatever store is in use. To serialize more deliberately, the user could be given the option to specify a schema for each port:

from marshmallow import Schema, fields

class Point(Schema):
    x = fields.Int()
    y = fields.Int()
    z = fields.Int()
    description = fields.Str()


@fbp.inport('name', type=str)
@fbp.outport('position', type=Point)
# @fbp.outport('position', type=dict, schema=Point)  # another possibility
async def whereami(name, position):
    ...

This would make it possible to use different kinds of persistence backends efficiently (e.g. with SQL, the port schema could be used to define the table schema).

The fact remains that, as good practice, packets should always be JSON-serializable. Can this be enforced?
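
It can be enforced at the port boundary; a hypothetical sketch that rejects any packet the standard json encoder cannot handle:

import asyncio
import json

class JsonCheckedOutport:
    def __init__(self):
        self.queue = asyncio.Queue()

    async def push(self, item):
        json.dumps(item)  # raises TypeError for non-serializable packets
        await self.queue.put(item)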

from marshmallow import Schema, fields


class ConnectionEndpointSchema(Schema):
    process = fields.Str()
    port = fields.Str()


class ExportSchema(Schema):
    endpoint = fields.Nested(ConnectionEndpointSchema)
    export = fields.Str()


class ProcessSchema(Schema):
    id = fields.Str()
    component = fields.Str()
    meta = fields.Dict()


class InitConnectionSchema(Schema):
    dest = fields.Nested(ConnectionEndpointSchema)
    data = fields.Raw()


class ConnectionSchema(Schema):
    source = fields.Nested(ConnectionEndpointSchema)
    dest = fields.Nested(ConnectionEndpointSchema)
    meta = fields.Dict()


class GraphSchema(Schema):
    in_exports = fields.Nested(ExportSchema, many=True)
    out_exports = fields.Nested(ExportSchema, many=True)
    connections = fields.Nested(ConnectionSchema, many=True)
    data = fields.Nested(InitConnectionSchema, many=True)
    processes = fields.Nested(ProcessSchema, many=True)
    id = fields.Str()
# Hello world in the Flow-Based Programming notation for NoFlo
'Hello, World!' -> IN Display(Output)
'30' -> PIXELS ledchain(LedChainWS)
# If you want to use software SPI instead
# '2' -> PINDATA ledchain
# '3' -> PINCLK ledchain
'true' -> HWSPI ledchain
'[29, "0xFF00FF"]' -> IN ledchain
'true' -> SHOW ledchain
#INPORT=ledchain.IN:PIXEL
#OUTPORT=ledchain.READY:READY
'5' -> IN matrix(LedMatrixMax)
board(ArduinoUno) PIN8 -> PINCLK matrix
board() PIN7 -> PINDIN matrix
board() PIN6 -> PINCS matrix
in(MonitorPin) OUT -> IN split(Split)
split OUT1 -> IN outA(DigitalWrite)
split OUT2 -> IN outB(DigitalWrite)
# Config
board(ArduinoUno) PIN2 -> PIN in
board PIN11 -> PIN outA
board PIN10 -> PIN outB
'6' -> PIN ledchain(LedChainNeoPixel)
'30' -> PIXELS ledchain
'[29, "0xFF00FF"]' -> IN ledchain
'true' -> SHOW ledchain
#INPORT=ledchain.IN:PIXEL
#OUTPORT=ledchain.READY:READY
# This graph provides the base infrastructure for retrieving
# semantic information from Excel spreadsheets.
#
# You'll need to provide at least the following information to
# the ports of this network:
#
# * READ.SOURCE: Filename of the spreadsheet document
# * SLICEROWS.BEGIN: How many rows to skip from the start of a sheet
# * SLICEROWS.END: How many rows to skip from the end of a sheet (optional)
# * GROUPBYROWLABEL.KEY: Which column provides the label of the row
# * SLICECOLUMNS.BEGIN: How many columns to skip from the start of a sheet
# * SLICECOLUMNS.END: How many columns to skip from the end of a sheet (optional)
#
# In the end the output will be provided through the ENTITIZE.OUT port.
#
# This network uses Apache Tika for parsing the spreadsheet into
# XHTML. Ensure that Tika can be found in the location set below:
'tika-app-0.9.jar' -> TIKA Read(ReadDocument)
# If reading fails, just display the error
Read() ERROR -> IN Display(Output)
# Parse the file to JSON, get all spreadsheets from it
Read() OUT -> IN Parse(ParseXml)
# We're only interested in DIVs inside the BODY
'body' -> KEY GetBody(GetObjectKey)
'div' -> KEY GetDiv(GetObjectKey)
# Read DIVs and pass them forward
Parse() OUT -> IN GetBody() OUT -> IN GetDiv()
# Spreadsheet title is in a H1
'h1' -> KEY GroupByTableId(GroupByObjectKey)
# Group the data by spreadsheet titles
GetDiv() OUT -> IN GetSheet(GetObjectKey) OUT -> IN GroupByTableId()
# Get spreadsheet table and the rows from it
'table' -> KEY GetTable(GetObjectKey)
'tbody' -> KEY GetTBody(GetObjectKey)
'tr' -> KEY GetTR(GetObjectKey)
GroupByTableId() OUT -> IN GetTable() OUT -> IN GetTBody() OUT -> IN GetTR()
# Process each row individually and get the cells
'td' -> KEY GetTD(GetObjectKey)
GetTR() OUT -> IN SliceRows(SliceArray) OUT -> IN SplitRows(SplitArray) OUT -> IN GetTD()
# Group by the row label, and collect into objects
GetTD() OUT -> IN GroupByRowLabel(GroupByObjectKey) OUT -> IN SliceColumns(SliceArray) OUT -> IN Collect(CollectGroups)
# If no columns are found, display that as an error message
SliceColumns() ERROR -> IN Display()
# Turn the columns into objects
Collect() OUT -> IN SplitEntities(SplitArray) OUT -> IN Entitize(PropertiesToObjects)
timer(Timer) OUT -> TRIGGER button(DigitalRead)
button() OUT -> IN inverted(InvertBoolean)
inverted() OUT -> IN led(DigitalWrite)
# Config
board(ArduinoUno) PIN2 -> PIN button()
board() PIN3 -> PIN led()
INPORT=a.IN:IN
OUTPORT=b.OUT:OUT
a(Forward) OUT -> IN b(Forward)
import fbp
import asyncio


@fbp.component()
async def sum(
        addend: fbp.Inport[float],
        augend: fbp.Inport[float],
        result: fbp.Outport[float]
):
    a = await addend.pop()
    if not augend.empty():
        b = await augend.pop()
        r = a + b
        print('{0} + {1} = {2}'.format(a, b, r))
    else:
        r = a
        print('{0} -> {1}'.format(a, r))
    await result.push(r)

sum.configure_ports(['augend'], optional=True)


async def pump_numbers(inport, iterable, delay):
    for index, number in enumerate(iterable):
        await inport.queue.put(number)
        await asyncio.sleep(delay)


async def terminate_after_delay(comp, delay):
    await asyncio.sleep(delay)
    await comp.terminate()
    for ii in range(5, 0, -1):
        print('Shutting down loop in {0} sec.'.format(ii))
        await asyncio.sleep(1.0)
    print('Shutting down')
    loop = asyncio.get_event_loop()
    loop.stop()


loop = asyncio.get_event_loop()
loop.create_task(sum())
loop.create_task(terminate_after_delay(sum, 8.0))
loop.create_task(pump_numbers(sum.inports['addend'], list(range(20)), 0.2))
loop.create_task(pump_numbers(sum.inports['augend'], [10, 13, 15, 16, 22], 0.5))
loop.run_forever()