Skip to content

Instantly share code, notes, and snippets.

@kirkshoop
Created August 31, 2013 19:48
Show Gist options
  • Save kirkshoop/6400212 to your computer and use it in GitHub Desktop.
Save kirkshoop/6400212 to your computer and use it in GitHub Desktop.
transformation of SaveAsync in a Windows 8.1 sample app to use RX++.
std::shared_ptr<rx::Observable<bool>> SuspensionManager::ReactiveSave(void)
{
...
// Serialize the session state synchronously to avoid asynchronous access to shared
// state
auto sessionData = ref new InMemoryRandomAccessStream();
auto sessionDataWriter = ref new DataWriter(sessionData->GetOutputStreamAt(0));
WriteObject(sessionDataWriter, _sessionState);
// one-time construction of reactive functions needed to save.
auto reactiveStore = rxrt::FromAsyncPattern<DataWriter^>(
[](DataWriter^ dw){
return dw->StoreAsync(); });
auto reactiveFlush = rxrt::FromAsyncPattern<DataWriter^>(
[](DataWriter^ dw){
return dw->FlushAsync(); });
auto reactiveCreateFile = rxrt::FromAsyncPattern<StorageFolder^, String^, CreationCollisionOption >(
[](StorageFolder^ folder, String^ name, CreationCollisionOption option){
return folder->CreateFileAsync(name, option); });
auto reactiveOpenFile = rxrt::FromAsyncPattern<StorageFile^, FileAccessMode >(
[](StorageFile^ f, FileAccessMode mode){
return f->OpenAsync(mode); });
auto reactiveCopy = rxrt::FromAsyncPattern<IInputStream^, IOutputStream^ >(
[](IInputStream^ in, IOutputStream^ out){
return RandomAccessStream::CopyAndCloseAsync(in, out); });
// Begin the asynchronous process
// of writing the result to disk
return observable(from(reactiveStore(sessionDataWriter))
.select_many([=](unsigned int){
return reactiveFlush(sessionDataWriter); })
.select_many([=](Boolean){
return reactiveCreateFile(
ApplicationData::Current->LocalFolder,
sessionStateFilename,
CreationCollisionOption::ReplaceExisting); })
.select_many([=](StorageFile^ file){
return reactiveOpenFile(file, FileAccessMode::ReadWrite); })
.select_many([=](IRandomAccessStream^ stream){
return reactiveCopy(
sessionData->GetInputStreamAt(0),
stream->GetOutputStreamAt(0)); })
.select([](UINT64){ // convert to bool observable
return true; }) // success! call onnext with true
.publish(false) // only save once even if the caller subscribes
// more than once. initially onnext will be called with false
.connect_forever()); // save now, even if the caller does not subscribe
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment