Skip to content

Instantly share code, notes, and snippets.

@anaisbetts
Last active December 10, 2015 10:39
Show Gist options
  • Save anaisbetts/4422572 to your computer and use it in GitHub Desktop.
Save anaisbetts/4422572 to your computer and use it in GitHub Desktop.
A non-blocking version of @praeclarum's SqliteAsync.cs using Rx
using System;
using System.Diagnostics.CodeAnalysis;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using ReactiveUI;
namespace Akavache.Sqlite3
{
/// <summary>
/// Non-generic base type for a unit of work handled by KeyedOperationQueue.
/// It lets the queue carry operations with different result types through a
/// single Subject&lt;KeyedOperation&gt; pipeline.
/// </summary>
abstract class KeyedOperation
{
/// <summary>
/// Grouping key. The queue groups operations by this value and concatenates
/// the operations within each group, so operations sharing a key run one after another.
/// </summary>
public string Key { get; set; }
/// <summary>
/// Sequence number assigned when the operation is enqueued. It is not read
/// anywhere in the visible code (presumably why CA1811 is suppressed).
/// </summary>
[SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
public int Id { get; set; }
/// <summary>
/// Starts the operation.
/// </summary>
/// <returns>An observable the queue subscribes to in order to track the operation.</returns>
public abstract IObservable<Unit> EvaluateFunc();
}
/// <summary>
/// A queued operation that produces values of type <typeparamref name="T"/>.
/// The values (and the terminal notification) are published through <see cref="Result"/>,
/// which is the observable handed back to the caller that enqueued the operation.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable", Justification = "Observables are automatically disposed OnComplete")]
class KeyedOperation<T> : KeyedOperation
{
    /// <summary>Factory that creates the observable performing the actual work.</summary>
    public Func<IObservable<T>> Func { get; set; }

    /// <summary>
    /// Replays everything the operation produced, so callers may subscribe
    /// before or after the operation has actually run.
    /// </summary>
    public readonly ReplaySubject<T> Result = new ReplaySubject<T>();

    /// <summary>
    /// Invokes <see cref="Func"/> and multicasts its output into <see cref="Result"/>.
    /// </summary>
    /// <returns>
    /// An observable that mirrors the operation's notifications as <see cref="Unit"/> values.
    /// If the factory itself fails, the returned observable fails with the same exception.
    /// </returns>
    public override IObservable<Unit> EvaluateFunc()
    {
        IConnectableObservable<T> ret;
        try
        {
            ret = Func().Multicast(Result);
        }
        catch (Exception ex)
        {
            // The factory threw synchronously (or returned null). Without this, the
            // exception would only travel through the returned pipeline, Result would
            // never terminate, and whoever subscribed to it would wait forever.
            Result.OnError(ex);
            return Observable.Throw<Unit>(ex);
        }

        // Connect before handing the sequence out: Result is a ReplaySubject,
        // so later subscribers still observe everything that was produced.
        ret.Connect();
        return ret.Select(_ => Unit.Default);
    }
}
/// <summary>
/// Runs operations in the background such that operations sharing a key execute
/// strictly one after another, while operations with different keys may overlap.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable", Justification = "Observables are automatically disposed OnComplete")]
class KeyedOperationQueue
{
    static int sequenceNumber = 1;

    readonly IScheduler scheduler;
    readonly Subject<KeyedOperation> queuedOps = new Subject<KeyedOperation>();
    readonly IConnectableObservable<KeyedOperation> resultObs;

    // Serializes calls to queuedOps.OnNext. Rx observers must be invoked in a
    // serialized fashion, but operations can be enqueued from any thread.
    readonly object enqueueGate = new object();

    /// <summary>
    /// Creates the queue and starts its processing pipeline.
    /// </summary>
    /// <param name="scheduler">
    /// Scheduler used to run synchronous work queued through EnqueueOperation;
    /// defaults to RxApp.TaskpoolScheduler when null.
    /// </param>
    [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope", Justification = "Automatically disposed when the observable completes.")]
    public KeyedOperationQueue(IScheduler scheduler = null)
    {
        this.scheduler = scheduler ?? RxApp.TaskpoolScheduler;

        // One group per key; Concat runs a group's operations sequentially,
        // Merge lets the different groups progress independently.
        resultObs = queuedOps
            .GroupBy(x => x.Key)
            .Select(x => x.Select(ProcessOperation).Concat())
            .Merge()
            .Multicast(new Subject<KeyedOperation>());

        resultObs.Connect();
    }

    /// <summary>
    /// Queue an operation to run in the background. All operations with the same key will run in sequence,
    /// waiting for the previous operation to complete.
    /// </summary>
    /// <param name="key">The key to use</param>
    /// <param name="action">A method to run in the background</param>
    /// <returns>A future representing when the operation completes</returns>
    public IObservable<Unit> EnqueueOperation(string key, Action action)
    {
        return EnqueueOperation<Unit>(key, () =>
        {
            action();
            return Unit.Default;
        });
    }

    /// <summary>
    /// Queue an operation to run in the background that returns a value. All operations with the same key will run in sequence,
    /// waiting for the previous operation to complete.
    /// </summary>
    /// <param name="key">The key to use</param>
    /// <param name="calculationFunc">A method to run in the background that returns a single value</param>
    /// <returns>A future value</returns>
    public IObservable<T> EnqueueOperation<T>(string key, Func<T> calculationFunc)
    {
        return EnqueueObservableOperation(key, () => SafeStart(calculationFunc));
    }

    /// <summary>
    /// Queue an operation to run in the background that returns a stream of values. All operations with the same key will run in sequence,
    /// waiting for the previous operation to complete.
    /// Use this overload to queue an operation that already returns an IObservable.
    /// </summary>
    /// <param name="key">The key to use; null is mapped to a shared "__NONE__" key</param>
    /// <param name="asyncCalculationFunc">A method to run in the background that returns a stream of values</param>
    /// <returns>A future stream of values</returns>
    public IObservable<T> EnqueueObservableOperation<T>(string key, Func<IObservable<T>> asyncCalculationFunc)
    {
        int id = Interlocked.Increment(ref sequenceNumber);
        key = key ?? "__NONE__";

        var item = new KeyedOperation<T>
        {
            Key = key,
            Id = id,
            Func = asyncCalculationFunc,
        };

        // Subject<T> and the GroupBy operator behind it expect serialized OnNext
        // calls; concurrent enqueues from several threads would otherwise race.
        // Monitor locks are re-entrant, so an operation that enqueues further work
        // on the same thread while being started does not deadlock.
        lock (enqueueGate)
        {
            queuedOps.OnNext(item);
        }

        return item.Result;
    }

    /// <summary>
    /// Wraps an operation so that it starts only when subscribed to and so that a
    /// failure does not tear down the sequence of operations queued behind it.
    /// </summary>
    IObservable<KeyedOperation> ProcessOperation(KeyedOperation operation)
    {
        return Observable.Defer(operation.EvaluateFunc)
            .Select(_ => operation)
            .Catch(Observable.Return(operation));
    }

    /// <summary>
    /// Runs <paramref name="calculationFunc"/> on the queue's scheduler and publishes
    /// its return value, or the exception it threw, through the returned observable.
    /// </summary>
    IObservable<T> SafeStart<T>(Func<T> calculationFunc)
    {
        var ret = new AsyncSubject<T>();

        Observable.Start(() =>
        {
            try
            {
                var val = calculationFunc();
                ret.OnNext(val);
                ret.OnCompleted();
            }
            catch (Exception ex)
            {
                ret.OnError(ex);
            }
        }, scheduler);

        return ret;
    }
}
}
//
// Copyright (c) 2012 Krueger Systems, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akavache.Sqlite3;
using ReactiveUI;
namespace SQLite
{
/// <summary>
/// A composable table query whose terminal operations return observables instead
/// of blocking. The composition methods return a query object; in the implementation
/// in this file each one forwards to the same-named TableQuery method and wraps the
/// result in a new query instance.
/// </summary>
/// <typeparam name="T">The mapped row type.</typeparam>
public interface IAsyncTableQuery<T> where T : new()
{
/// <summary>Returns a query with <paramref name="predExpr"/> applied as a filter.</summary>
IAsyncTableQuery<T> Where (Expression<Func<T, bool>> predExpr);
/// <summary>Returns a query with Skip(<paramref name="n"/>) applied.</summary>
IAsyncTableQuery<T> Skip (int n);
/// <summary>Returns a query with Take(<paramref name="n"/>) applied.</summary>
IAsyncTableQuery<T> Take (int n);
/// <summary>Returns a query ordered by <paramref name="orderExpr"/>.</summary>
IAsyncTableQuery<T> OrderBy<U> (Expression<Func<T, U>> orderExpr);
/// <summary>Returns a query ordered by <paramref name="orderExpr"/>, descending.</summary>
IAsyncTableQuery<T> OrderByDescending<U> (Expression<Func<T, U>> orderExpr);
/// <summary>Queues the query for execution and yields its rows as a list.</summary>
IObservable<List<T>> ToListAsync ();
/// <summary>Queues the query for execution and yields its row count.</summary>
IObservable<int> CountAsync ();
/// <summary>Queues the query for execution and yields the element at <paramref name="index"/>.</summary>
IObservable<T> ElementAtAsync (int index);
/// <summary>Queues the query for execution and yields its first element.</summary>
IObservable<T> FirstAsync ();
/// <summary>Queues the query for execution and yields the result of FirstOrDefault on the underlying query.</summary>
IObservable<T> FirstOrDefaultAsync ();
}
/// <summary>
/// Observable-based wrapper around a pooled SQLite connection. Every database call is
/// handed to a shared <c>KeyedOperationQueue</c> keyed by the database path, so calls
/// against the same database file run one after another in the background.
/// </summary>
public class SQLiteAsyncConnection
{
    static KeyedOperationQueue _opQueue = new KeyedOperationQueue (RxApp.TaskpoolScheduler);

    SQLiteConnectionString _connectionString;

    /// <summary>Creates a wrapper for the database at <paramref name="databasePath"/>.</summary>
    /// <param name="databasePath">Path of the database file.</param>
    /// <param name="storeDateTimeAsTicks">Passed through to the connection string.</param>
    public SQLiteAsyncConnection (string databasePath, bool storeDateTimeAsTicks = false)
    {
        _connectionString = new SQLiteConnectionString (databasePath, storeDateTimeAsTicks);
    }

    /// <summary>Fetches the pooled connection for this wrapper's connection string.</summary>
    SQLiteConnectionWithoutLock GetConnection ()
    {
        return SQLiteConnectionPool.Shared.GetConnection (_connectionString);
    }

    /// <summary>
    /// Queues <paramref name="work"/> under this database's key. The pooled connection is
    /// looked up only when the queued operation actually runs, not when it is enqueued.
    /// </summary>
    IObservable<TResult> EnqueueOnConnection<TResult> (Func<SQLiteConnectionWithoutLock, TResult> work)
    {
        return _opQueue.EnqueueOperation<TResult> (_connectionString.DatabasePath, () => work (GetConnection ()));
    }

    /// <summary>Creates the table for <typeparamref name="T"/>.</summary>
    public IObservable<CreateTablesResult> CreateTableAsync<T> ()
        where T : new ()
    {
        return CreateTablesAsync (typeof (T));
    }

    /// <summary>Creates the tables for the two given types.</summary>
    public IObservable<CreateTablesResult> CreateTablesAsync<T, T2> ()
        where T : new ()
        where T2 : new ()
    {
        return CreateTablesAsync (typeof (T), typeof (T2));
    }

    /// <summary>Creates the tables for the three given types.</summary>
    public IObservable<CreateTablesResult> CreateTablesAsync<T, T2, T3> ()
        where T : new ()
        where T2 : new ()
        where T3 : new ()
    {
        return CreateTablesAsync (typeof (T), typeof (T2), typeof (T3));
    }

    /// <summary>Creates the tables for the four given types.</summary>
    public IObservable<CreateTablesResult> CreateTablesAsync<T, T2, T3, T4> ()
        where T : new ()
        where T2 : new ()
        where T3 : new ()
        where T4 : new ()
    {
        return CreateTablesAsync (typeof (T), typeof (T2), typeof (T3), typeof (T4));
    }

    /// <summary>Creates the tables for the five given types.</summary>
    public IObservable<CreateTablesResult> CreateTablesAsync<T, T2, T3, T4, T5> ()
        where T : new ()
        where T2 : new ()
        where T3 : new ()
        where T4 : new ()
        where T5 : new ()
    {
        return CreateTablesAsync (typeof (T), typeof (T2), typeof (T3), typeof (T4), typeof (T5));
    }

    /// <summary>
    /// Calls CreateTable for each type in <paramref name="types"/> and records the
    /// per-type return value in the resulting <see cref="CreateTablesResult"/>.
    /// </summary>
    public IObservable<CreateTablesResult> CreateTablesAsync (params Type[] types)
    {
        return EnqueueOnConnection<CreateTablesResult> (conn => {
            var summary = new CreateTablesResult ();
            foreach (Type tableType in types) {
                summary.Results[tableType] = conn.CreateTable (tableType);
            }
            return summary;
        });
    }

    /// <summary>Queues a DropTable call for <typeparamref name="T"/>.</summary>
    public IObservable<int> DropTableAsync<T> ()
        where T : new ()
    {
        return EnqueueOnConnection<int> (conn => conn.DropTable<T> ());
    }

    /// <summary>Queues an Insert of <paramref name="item"/>.</summary>
    public IObservable<int> InsertAsync (object item)
    {
        return EnqueueOnConnection<int> (conn => conn.Insert (item));
    }

    /// <summary>Queues an Update of <paramref name="item"/>.</summary>
    public IObservable<int> UpdateAsync (object item)
    {
        return EnqueueOnConnection<int> (conn => conn.Update (item));
    }

    /// <summary>Queues a Delete of <paramref name="item"/>.</summary>
    public IObservable<int> DeleteAsync (object item)
    {
        return EnqueueOnConnection<int> (conn => conn.Delete (item));
    }

    /// <summary>Queues a Get by primary key.</summary>
    public IObservable<T> GetAsync<T> (object pk)
        where T : new ()
    {
        return EnqueueOnConnection<T> (conn => conn.Get<T> (pk));
    }

    /// <summary>Queues a Find by primary key.</summary>
    public IObservable<T> FindAsync<T> (object pk)
        where T : new ()
    {
        return EnqueueOnConnection<T> (conn => conn.Find<T> (pk));
    }

    /// <summary>Queues a Get using <paramref name="predicate"/>.</summary>
    public IObservable<T> GetAsync<T> (Expression<Func<T, bool>> predicate)
        where T : new ()
    {
        return EnqueueOnConnection<T> (conn => conn.Get<T> (predicate));
    }

    /// <summary>Queues a Find using <paramref name="predicate"/>.</summary>
    public IObservable<T> FindAsync<T> (Expression<Func<T, bool>> predicate)
        where T : new ()
    {
        return EnqueueOnConnection<T> (conn => conn.Find<T> (predicate));
    }

    /// <summary>Queues execution of <paramref name="query"/> with the given arguments.</summary>
    public IObservable<int> ExecuteAsync (string query, params object[] args)
    {
        return EnqueueOnConnection<int> (conn => conn.Execute (query, args));
    }

    /// <summary>Queues an InsertAll of <paramref name="items"/>.</summary>
    public IObservable<int> InsertAllAsync (IEnumerable items)
    {
        return EnqueueOnConnection<int> (conn => conn.InsertAll (items));
    }

    /// <summary>
    /// Queues <paramref name="action"/> to run between BeginTransaction and Commit.
    /// If the action (or the commit) throws, the transaction is rolled back and the
    /// exception is rethrown.
    /// </summary>
    public IObservable<Unit> RunInTransactionAsync (Action<SQLiteConnection> action)
    {
        Action transactionalWork = () => {
            var conn = GetConnection ();
            conn.BeginTransaction ();
            try {
                action (conn);
                conn.Commit ();
            } catch (Exception) {
                conn.Rollback ();
                throw;
            }
        };

        return _opQueue.EnqueueOperation (_connectionString.DatabasePath, transactionalWork);
    }

    /// <summary>Starts a composable query over the table for <typeparamref name="T"/>.</summary>
    public IAsyncTableQuery<T> Table<T> ()
        where T : new ()
    {
        //
        // This isn't async as the underlying connection doesn't go out to the database
        // until the query is performed. The Async methods are on the query itself.
        //
        TableQuery<T> query = GetConnection ().Table<T> ();
        return new AsyncTableQuery<T> (query, _opQueue, _connectionString);
    }

    /// <summary>Queues a command built from <paramref name="sql"/> and yields its scalar result.</summary>
    public IObservable<T> ExecuteScalarAsync<T> (string sql, params object[] args)
    {
        return EnqueueOnConnection<T> (conn => {
            var command = conn.CreateCommand (sql, args);
            return command.ExecuteScalar<T> ();
        });
    }

    /// <summary>Queues <paramref name="sql"/> as a query and yields the resulting rows.</summary>
    public IObservable<List<T>> QueryAsync<T> (string sql, params object[] args)
        where T : new ()
    {
        return EnqueueOnConnection<List<T>> (conn => conn.Query<T> (sql, args));
    }

    //
    // TODO: Bind to AsyncConnection.GetConnection instead so that delayed
    // execution can still work after a Pool.Reset.
    //
    /// <summary>
    /// IAsyncTableQuery implementation that wraps a TableQuery and pushes its
    /// terminal operations through the operation queue.
    /// </summary>
    class AsyncTableQuery<T> : IAsyncTableQuery<T> where T : new ()
    {
        TableQuery<T> _innerQuery;
        KeyedOperationQueue _opQueue;
        SQLiteConnectionString _connString;

        public AsyncTableQuery (TableQuery<T> innerQuery, KeyedOperationQueue opQueue, SQLiteConnectionString connString)
        {
            _innerQuery = innerQuery;
            _opQueue = opQueue;
            _connString = connString;
        }

        /// <summary>Wraps a derived TableQuery, keeping this query's queue and connection string.</summary>
        IAsyncTableQuery<T> Wrap (TableQuery<T> derived)
        {
            return new AsyncTableQuery<T> (derived, _opQueue, _connString);
        }

        /// <summary>Queues <paramref name="work"/> under the database path of this query.</summary>
        IObservable<TResult> EnqueueQuery<TResult> (Func<TResult> work)
        {
            return _opQueue.EnqueueOperation<TResult> (_connString.DatabasePath, work);
        }

        public IAsyncTableQuery<T> Where (Expression<Func<T, bool>> predExpr)
        {
            return Wrap (_innerQuery.Where (predExpr));
        }

        public IAsyncTableQuery<T> Skip (int n)
        {
            return Wrap (_innerQuery.Skip (n));
        }

        public IAsyncTableQuery<T> Take (int n)
        {
            return Wrap (_innerQuery.Take (n));
        }

        public IAsyncTableQuery<T> OrderBy<U> (Expression<Func<T, U>> orderExpr)
        {
            return Wrap (_innerQuery.OrderBy<U> (orderExpr));
        }

        public IAsyncTableQuery<T> OrderByDescending<U> (Expression<Func<T, U>> orderExpr)
        {
            return Wrap (_innerQuery.OrderByDescending<U> (orderExpr));
        }

        public IObservable<List<T>> ToListAsync ()
        {
            return EnqueueQuery<List<T>> (() => _innerQuery.ToList ());
        }

        public IObservable<int> CountAsync ()
        {
            return EnqueueQuery<int> (() => _innerQuery.Count ());
        }

        public IObservable<T> ElementAtAsync (int index)
        {
            return EnqueueQuery<T> (() => _innerQuery.ElementAt (index));
        }

        public IObservable<T> FirstAsync ()
        {
            return EnqueueQuery<T> (() => _innerQuery.First ());
        }

        public IObservable<T> FirstOrDefaultAsync ()
        {
            return EnqueueQuery<T> (() => _innerQuery.FirstOrDefault ());
        }
    }
}
/// <summary>
/// Outcome of a CreateTablesAsync call: one entry per requested type, holding the
/// value that CreateTable returned for it.
/// </summary>
public class CreateTablesResult
{
    readonly Dictionary<Type, int> _results = new Dictionary<Type, int> ();

    /// <summary>Per-type CreateTable return values, keyed by the mapped type.</summary>
    public Dictionary<Type, int> Results
    {
        get { return _results; }
    }

    // Instances are only created by code in this assembly.
    internal CreateTablesResult ()
    {
    }
}
/// <summary>
/// Process-wide cache of open connections, one per distinct connection string.
/// All access to the cache is guarded by a single lock.
/// </summary>
class SQLiteConnectionPool
{
    /// <summary>Pairs a connection string with the connection opened for it.</summary>
    class Entry
    {
        public SQLiteConnectionString ConnectionString { get; private set; }
        public SQLiteConnectionWithoutLock Connection { get; private set; }

        public Entry (SQLiteConnectionString connectionString)
        {
            this.ConnectionString = connectionString;
            this.Connection = new SQLiteConnectionWithoutLock (connectionString);
        }

        /// <summary>Disposes the connection and drops the reference to it.</summary>
        public void OnApplicationSuspended ()
        {
            this.Connection.Dispose ();
            this.Connection = null;
        }
    }

    static readonly SQLiteConnectionPool _shared = new SQLiteConnectionPool ();

    readonly object _entriesLock = new object ();
    readonly Dictionary<string, Entry> _entries = new Dictionary<string, Entry> ();

    /// <summary>
    /// Gets the singleton instance of the connection pool.
    /// </summary>
    public static SQLiteConnectionPool Shared
    {
        get { return _shared; }
    }

    /// <summary>
    /// Returns the cached connection for <paramref name="connectionString"/>, opening
    /// and caching a new one the first time a given connection string is seen.
    /// </summary>
    public SQLiteConnectionWithoutLock GetConnection (SQLiteConnectionString connectionString)
    {
        lock (_entriesLock) {
            string cacheKey = connectionString.ConnectionString;

            Entry pooled;
            if (_entries.TryGetValue (cacheKey, out pooled)) {
                return pooled.Connection;
            }

            pooled = new Entry (connectionString);
            _entries.Add (cacheKey, pooled);
            return pooled.Connection;
        }
    }

    /// <summary>
    /// Closes all connections managed by this pool.
    /// </summary>
    public void Reset ()
    {
        lock (_entriesLock) {
            foreach (Entry pooled in _entries.Values) {
                pooled.OnApplicationSuspended ();
            }
            _entries.Clear ();
        }
    }

    /// <summary>
    /// Call this method when the application is suspended.
    /// </summary>
    /// <remarks>Behaviour here is to close any open connections.</remarks>
    public void ApplicationSuspended ()
    {
        Reset ();
    }
}
/// <summary>
/// SQLiteConnection subclass used for the pooled connections. It adds no members
/// of its own; presumably the name reflects that callers go through the operation
/// queue rather than taking a lock around the connection (confirm against usage).
/// </summary>
class SQLiteConnectionWithoutLock : SQLiteConnection
{
/// <summary>
/// Opens a connection using the database path and StoreDateTimeAsTicks setting
/// carried by <paramref name="connectionString"/>.
/// </summary>
public SQLiteConnectionWithoutLock (SQLiteConnectionString connectionString)
: base (connectionString.DatabasePath, connectionString.StoreDateTimeAsTicks)
{
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment