import fbp
@fbp.inport('addend')
@fbp.inport('augend')
@fbp.outport('result')
async def sum(addend, augend, result):
a = await addend.pop()
b = await augend.pop()
await result.push(a + b)
- Le componenti sono coroutine
- Le porte sono code FIFO (asyncio.Queue). Se le operazioni bloccano o meno dipende dalla capacità dell'arco a cui la porta è collegata (decisa in definizione della rete).
FBP dice che un processo è attivato non appena un pacchetto arriva ad una porta di input.
Chiarire cosa succede quando un processo si disattiva (esce?)
Probabilmente, se il componente ha bisogno di avere pacchetti su tutte le porte, ha senso attivare quando tutte le porte di input hanno almeno un pacchetto. Si potrebbe permettere di specificare porte obbligatorie (default?) e opzionali:
@fbp.inport('a')
@fbp.inport('b')
async def foo(a, b):
...
In questo caso, serve un pacchetto sia su a
che su b
.
@fbp.inport('a')
@fbp.inport('b', optional=True)
async def foo(a, b):
...
In questo caso, un pacchetto su b
non attiva il processo mentre ne basta uno
su a
.
FBP dice che le porte possono essere chiuse, alla stregua dei channel
Go. Questo
dovrebbe causare la disattivazione dei processi a monte, anche se questo aspetto
non è molto chiaro.
Chiarire chiusura porte
Potrebbe essere interessante permettere di tipizzare le porte per verificare la coerenza delle connessioni in fase di definizione della rete:
from typing import Any
@fbp.inport('a', type=Any)
@fbp.inport('b', type=int)
async def foo(a, b):
...
Usando type=Any
come default, la tipizzazione è spenta.
FBP introduce il concetto di array port che permette di collegare più archi a endpoint indicizzati appartenenti ad una singola porta. E' molto utile per limitare la proliferazione di porte in componenti complessi e per permettere la connessione di input/output in numero non noto a priori.
from typing import Any
@fbp.inport('inbound', type=Any, array=True)
@fbp.inport('outbound', type=Any)
async def multiplex(inbound, outbound):
async for value, index in inbound.pop() # esegue una select su tutti i canali in ingresso
await outbound.push(value)
Un altro caso dove l'utilizzo di array port permette di limitare molto il numero di componenti necessari:
from typing import Any
@fbp.inport('condition', type=Any)
@fbp.inport('inbound', type=Any, array=True)
@fbp.outport('outbound', type=Any, array=True)
async def filter(condition, inbound, outbound):
async for value, index in inbound.pop()
if condition(value):
# Spedisce sulla corrispondente porta in uscita
await outbound[index].push(value)
Quando tutti i processi devono accedere a valori globali, potrebbe essere utile fornire una sorta di ambiente. Per evitare problemi di sincronizzazione e che l'ambiente venga usato in modo improprio, i valori delle variabili d'ambiente dovrebbero essere readonly. Potrebbe essere riempito con il contenuto dell'ambiente in cui gira il runtime e altra roba per la specifica istanza della rete.
@fbp.inport('a')
@fbp.inport('b')
@fbp.env('env')
async def foo(a, b, env):
...
A differenza di altri modelli, in FBP i processi hanno stato e, finchè non si disattivano, possono continuare a consumare input/produrre output a piacere. Questo è l'aspetto più critico in relazione alla persistenza/ripristino dello stato: l'onere è scaricato completamente sull'utente.
Un modo sarebbe quello di fornire una sorta di storage in cui, qualsiasi cosa venga scritto, entra a far parte dello stato del processo e può venire fornito già riempito in attivazione per richiedere il ripristino di uno stato precedente:
import aiofiles
@fbp.inport('filename', type=str)
@fbp.outport('line', type=str)
@fbp.state('state')
async def cat(filename, line, state):
async with aiofiles.open(await filename.pop(), mode='r') as f:
# restore state if present
fpos = state.get('fpos', 0)
await f.seek(fpos)
async for fline in f:
await line.push(fline)
# save state
state['fpos'] = await f.tell()
Probabilmente è utile (indispensabile?) avere un meccanismo di multiplexing di
più porte in ingresso, alla stregua della select
di Go:
@fbp.inport('a')
@fbp.inport('b')
@fbp.inport('c')
@fbp.outport('fastest')
async def speedtest(a, b, c, fastest):
value, port = await fbp.select(a, b, c)
# Così sarebbe più generica (funziona con qualsiasi coro) ma più verbosa:
# value, port = await fbp.select(a.pop(), b.pop())
if port == a:
print('a wins!')
elif port == b:
print('b wins!')
else:
print('c wins!')
await fastest.push(value)
Senza switch
le catene di if/elif
possono diventare problematiche da
gestire. Un possibilità per limitarele è quella di introdurre il multiplexing a
livello di definizione dell'interfaccia del componente:
@fbp.inport('a')
@fbp.inport('b')
@fbp.inport('c')
@fbp.outport('fastest', multiplex=('a', 'b', 'c'))
async def speedtest(a, b, c, fastest):
async def on_a():
print('a wins!')
async def on_b():
print('b wins!')
async def on_c():
print('c wins!')
# ...
Riguardo la persistenza dello stato della rete, un approccio brutale e sempre
valido è quello di salvare nello store di turno la rappresentazione pickle
delle entità che devono essere ripristinate. Per serializzare con più criterio
l'utente potrebbe avere la possibilità di specificare uno schema per ogni porta:
from marshmallow import Schema, fields
class Point(Schema):
x = fields.Int()
y = fields.Int()
z = fields.Int()
description = fields.Str()
@fbp.inport('name', type=str)
@fbp.outport('position', type=Point)
# @fbp.outport('position', type=dict, schema=Point) # altra possibilità
async def whereami(name, position):
...
Questo permetterebbe di utilizzare diversi tipi di backend di persistenza in
modo efficiente (ad es.: se sql
, lo schema potrebbe essere usato per definire
lo schema delle tabelle).
Rimane il fatto che, come buona pratica, i pacchetti dovrebbero essere sempre JSON-serializzabili. Si può forzarlo?