# Created June 23, 2021 10:03
# Playing around with a Pulumi dynamic provider.
import binascii
import os
from typing import Optional
from pulumi import Input, Output, ResourceOptions
from pulumi.dynamic import CreateResult, DiffResult, Resource, ResourceProvider, UpdateResult
class MyFileInputs:
    """Input properties for a file resource.

    A plain attribute holder: both constructor arguments are stored
    unchanged under the attribute names ``path`` and ``content``.
    """

    path: Input[str]  # where the file lives
    content: Input[str]  # text to store in the file

    def __init__(self, path, content):
        # Assignment order (path, then content) is kept so that vars(self)
        # lists the attributes in the same order.
        self.path, self.content = path, content
class MyFileProvider(ResourceProvider):
    """Dynamic provider that manages a text file on the local filesystem.

    The file's location and text come from the ``path`` and ``content``
    properties. Every create/update result additionally carries a
    provider-computed ``test_list`` output.
    """

    def _get_me_list(self) -> list:
        """Return the fixed list published as the ``test_list`` output."""
        # Alternative kept for reference: return ','.join(['1', '2', '3'])
        return ['1', '2', '3']

    def create(self, args):
        """Write ``content`` to ``path`` and return the new resource state.

        Args:
            args: Property mapping; must contain ``path`` and ``content``.

        Returns:
            A ``CreateResult`` whose id is ``"myfile-"`` followed by 32 random
            hex characters and whose outputs are ``args`` plus ``test_list``.

        Raises:
            KeyError: If ``path`` or ``content`` is missing from ``args``.
            OSError: If the file cannot be opened or written.
        """
        path = args["path"]
        content = args["content"]
        with open(path, "w") as f:
            f.write(content)
        return CreateResult(
            id_="myfile-" + binascii.b2a_hex(os.urandom(16)).decode("utf-8"),
            outs={**args, 'test_list': self._get_me_list()},
        )

    # Disabled experiment: a check() override that injects test_list into the
    # inputs.
    # def check(self, oldArgs, newArgs):
    #     return CheckResult(inputs={**newArgs, 'test_list': self._get_me_list()}, failures=[])

    def diff(self, id, oldArgs, newArgs):
        """Decide whether the file needs replacing, updating, or nothing.

        Args:
            id: Resource id (unused).
            oldArgs: Previously recorded properties of the resource.
            newArgs: Newly requested properties.

        Returns:
            A ``DiffResult`` that requests
            * replacement of ``path`` and ``content`` when the file no longer
              exists on disk,
            * replacement of ``content`` when the file's text differs from the
              recorded ``content``,
            * otherwise ``changes=True`` only if the old and new properties
              differ once ``test_list`` is left out.

        Raises:
            KeyError: If ``path`` or ``content`` is missing from ``oldArgs``.
            OSError: If the existing file cannot be read.
        """
        path = oldArgs["path"]
        if not os.path.exists(path):
            return DiffResult(
                changes=True, replaces=["path", "content"], delete_before_replace=True
            )

        # Compare against the whole file; reading a single line would flag
        # every multi-line file as drifted.
        with open(path, "r") as f:
            actual_content = f.read()
        if actual_content != oldArgs["content"]:
            return DiffResult(changes=True, replaces=["content"], delete_before_replace=True)

        # test_list is filled in by this provider (see create/update) while the
        # resource always requests it as None, so including it in the
        # comparison would report a change even when nothing was edited.
        computed = ("test_list",)
        old_props = {k: v for k, v in oldArgs.items() if k not in computed}
        new_props = {k: v for k, v in newArgs.items() if k not in computed}
        return DiffResult(changes=old_props != new_props)

    def delete(self, id, args):
        """Remove the managed file.

        A file that is already gone is not treated as an error.

        Args:
            id: Resource id (unused).
            args: Recorded properties; must contain ``path``.
        """
        path = args["path"]
        try:
            os.remove(path)
        except FileNotFoundError:
            # Nothing left to delete.
            pass

    def update(self, id, oldArgs, newArgs):
        """Apply an in-place change: move the file and/or rewrite its text.

        Args:
            id: Resource id (unused).
            oldArgs: Previously recorded properties.
            newArgs: Newly requested properties.

        Returns:
            An ``UpdateResult`` whose outputs are ``newArgs`` plus
            ``test_list``.

        Raises:
            KeyError: If ``path`` or ``content`` is missing from either mapping.
            OSError: If the rename or the write fails.
        """
        old_path = oldArgs["path"]
        new_path = newArgs["path"]
        if old_path != new_path:
            os.rename(old_path, new_path)

        new_content = newArgs["content"]
        if oldArgs["content"] != new_content:
            # Written to new_path: any rename above has already happened.
            with open(new_path, "w") as f:
                f.write(new_content)

        return UpdateResult(outs={**newArgs, 'test_list': self._get_me_list()})
class MyFileResource(Resource):
    """Resource backed by ``MyFileProvider``.

    Declares three outputs: ``path``, ``content`` and ``test_list``.
    """

    path: Output[str]
    content: Output[str]
    test_list: Output[list]

    def __init__(self, name: str, args: MyFileInputs, opts: Optional[ResourceOptions] = None):
        """Register the resource under ``name`` with the attributes of ``args``.

        Args:
            name: Name handed to the base ``Resource`` constructor.
            args: Object whose instance attributes become the properties.
            opts: Optional resource options, passed through unchanged.
        """
        # Copy the attributes of ``args`` and add a ``test_list`` entry set to
        # None (presumably a placeholder for the provider's output -- confirm
        # against the Pulumi dynamic-provider docs).
        props = dict(vars(args))
        props['test_list'] = None
        super().__init__(MyFileProvider(), name, props, opts)
