Skip to content

Instantly share code, notes, and snippets.

@robhruska
Created October 23, 2015 20:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robhruska/824d85a14e6b1fe46386 to your computer and use it in GitHub Desktop.
UserMigrationJob.cs
/*
The MIT License (MIT)
Copyright (c) 2015 Agile Sports Technologies, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Hudl.Config;
namespace UserMigrationJob
{
// A bespoke console application written to migrate user records in sequential batches.
// Since it's a throwaway job, it's not the most organized or well-factored class, but the
// code is solid and gives good consideration to edge cases, usability, and ensuring a
// stable and predictable migration.
class Program
{
// Cleared by the "exit"/"quit" command (or end of input) to stop the interactive loop in Main.
private static bool _isRunning = true;
private const string EurekaUrl = "http://example.com:8080/eureka/v2/apps/HUDL_USERS";
// One HttpClient instance is shared by every request the job makes.
private static readonly HttpClient Http = new HttpClient();
private const ConsoleColor DefaultColor = ConsoleColor.White;
// On any eureka lookup the job will cache known user webservers.
private static List<Uri> _webservers;
// Point user id that every webserver reported on the last state query; null when the states
// disagreed, a webserver could not be read, or there were no webservers.
private static long? ConsistentPointUserId;
// Running totals added by each fully successful batch; reset by the "clear-counts" command.
private static long MigratedCount = 0;
private static long NonexistentCount = 0;
// Raised by the Ctrl+C handler, which may run on a different thread than the run-job loop that
// polls it; volatile makes sure the loop observes the write.
private static volatile bool IsCanceled = false;
// Entry point: installs the Ctrl+C handler, prints the banner, loads the webserver list and
// states, then runs an interactive command loop until "exit"/"quit" or end of input. Any
// exception escaping the loop (or startup) is printed as "Crashed." and rethrown.
static void Main(string[] args)
{
    Console.CancelKeyPress += HandleCancel;
    Console.Title = "Migrate Users";
    Console.ForegroundColor = ConsoleColor.Cyan;
    WriteLine(" +----------------------------------------------------------------------+");
    WriteLine(" | I will not fear. |");
    WriteLine(" | Fear is the mind-killer. |");
    WriteLine(" | Fear is the little-death that brings total obliteration. |");
    WriteLine(" | I will face my fear. |");
    WriteLine(" | I will permit it to pass over me and through me. |");
    WriteLine(" | And when it has gone past I will turn the inner eye to see its path. |");
    WriteLine(" | Where the fear has gone there will be nothing. Only I will remain. |");
    WriteLine(" +----------------------------------------------------------------------+");
    WriteLine();
    Console.ForegroundColor = DefaultColor;
    try
    {
        // Startup talks to eureka and the webservers, so its failures get the same
        // "Crashed." report as failures inside the loop.
        Initialize();
        Console.WriteLine();
        Console.WriteLine("Control+C will safely stop the 'run-job' migration job, but not quit. Control+Break will hard-terminate the program. Use 'exit' or 'quit' for a graceful termination.");
        while (_isRunning)
        {
            WriteLine();
            Console.ForegroundColor = ConsoleColor.Gray;
            WriteLine("Commands:");
            WriteLine(" get-state # Prints current state of all webservers.");
            WriteLine(" set-state <point-userid> <batch-size> # Sets state on all webservers.");
            WriteLine(" migrate-one <userid> # Migrates a user.");
            WriteLine(" migrate-batch <userid> <batch-size> <parallelism?> # Migrates a batch of users.");
            WriteLine(" run-job <userid> <batch-size> <stop-after-userid> <sleep-millis?> # Migrates batches back-to-back until completion or failure.");
            WriteLine(" clear-counts # Resets the Migrated/Nonexistent counters.");
            Console.ForegroundColor = DefaultColor;
            WriteLine();
            var command = Prompt();
            if (command == null)
            {
                // Console.ReadLine returns null once input is exhausted (e.g. redirected stdin);
                // without this the loop would reprint the menu forever.
                _isRunning = false;
                break;
            }
            ProcessInputCommand(command);
            WriteLine();
        }
    }
    catch (Exception e)
    {
        WriteException(e, "Crashed.");
        throw;
    }
}
// Parses and runs one command line typed at the prompt. "exit"/"quit" end the session; any
// other command is dispatched on its exact first word. ArgumentExceptions (bad usage or bad
// arguments) print just their message, other exceptions print in full; errors are reported
// here instead of being propagated to the command loop.
private static void ProcessInputCommand(string command)
{
    if (string.IsNullOrWhiteSpace(command))
    {
        return;
    }
    command = command.Trim();
    if (command == "exit" || command == "quit")
    {
        _isRunning = false;
        return;
    }
    // Split on runs of spaces/tabs so a doubled space doesn't change the argument count, and
    // match the whole first word: a prefix test would also accept e.g. "get-stateX".
    var split = command.Split(new[] { ' ', '\t' }, StringSplitOptions.RemoveEmptyEntries);
    try
    {
        switch (split[0])
        {
            case "get-state":
                QueryCurrentWebserverStates(EurekaUrl);
                return;
            case "set-state":
            {
                const string usage = "Usage: set-state <point-userid> <batch-size>";
                if (split.Length != 3)
                {
                    throw new ArgumentException(usage);
                }
                var userId = ParseLongArgument(split[1], usage);
                var batch = ParseLongArgument(split[2], usage);
                var success = SetAllWebserversState(userId, batch);
                if (!success)
                {
                    WriteLineError("ERROR: Not all webservers state updates were successful. Make them consistent before migrating anything.");
                }
                QueryCurrentWebserverStates(EurekaUrl);
                return;
            }
            case "migrate-one":
            {
                const string usage = "Usage: migrate-one <userid>";
                if (split.Length != 2)
                {
                    throw new ArgumentException(usage);
                }
                var userId = ParseLongArgument(split[1], usage);
                WriteLine();
                MigrateOneUser(userId);
                return;
            }
            case "migrate-batch":
            {
                const string usage = "Usage: migrate-batch <start-userid> <batch-size> <parallelism?>";
                if (split.Length < 3 || 4 < split.Length)
                {
                    throw new ArgumentException(usage);
                }
                var userId = ParseIntArgument(split[1], usage);
                var batch = ParseIntArgument(split[2], usage);
                var parallelism = (split.Length == 4 ? ParseIntArgument(split[3], usage) : 10);
                if (batch <= 0 || parallelism <= 0)
                {
                    throw new ArgumentException("batch-size and parallelism must be positive. " + usage);
                }
                WriteLine();
                MigrateBatch(userId, batch, parallelism);
                return;
            }
            case "run-job":
            {
                const string usage = "Usage: run-job <start-userid> <batch-size> <stop-after-userid> <sleep-millis?>";
                if (split.Length < 4 || 5 < split.Length)
                {
                    throw new ArgumentException(usage);
                }
                var userId = ParseIntArgument(split[1], usage);
                var batch = ParseIntArgument(split[2], usage);
                var stopId = ParseIntArgument(split[3], usage);
                // A non-positive batch would never advance the job, looping forever.
                if (batch <= 0)
                {
                    throw new ArgumentException("batch-size must be positive. " + usage);
                }
                TimeSpan? sleep = null;
                if (split.Length == 5)
                {
                    var sleepMillis = ParseIntArgument(split[4], usage);
                    // Thread.Sleep rejects negative spans, except -1 ms which means "forever".
                    if (sleepMillis < 0)
                    {
                        throw new ArgumentException("sleep-millis must not be negative. " + usage);
                    }
                    sleep = TimeSpan.FromMilliseconds(sleepMillis);
                }
                WriteLine();
                MigrateInBatchesUpTo(userId, batch, stopId, 10, sleep);
                return;
            }
            case "clear-counts":
                MigratedCount = 0;
                NonexistentCount = 0;
                return;
            default:
                // The command goes in as a format argument so braces typed by the user are
                // printed rather than parsed as format items.
                WriteLineError("I don't know how to \"{0}\"", command);
                return;
        }
    }
    catch (ArgumentException e)
    {
        WriteLineError("{0}", e.Message);
    }
    catch (Exception e)
    {
        WriteException(e);
    }
}

// Parses a whole-number command argument; on failure throws an ArgumentException carrying the
// command's usage text, so the user sees usage instead of a FormatException stack trace.
private static long ParseLongArgument(string value, string usage)
{
    long parsed;
    if (!long.TryParse(value, out parsed))
    {
        throw new ArgumentException(usage);
    }
    return parsed;
}

// Int32 counterpart of ParseLongArgument (also rejects out-of-range values with the usage text).
private static int ParseIntArgument(string value, string usage)
{
    int parsed;
    if (!int.TryParse(value, out parsed))
    {
        throw new ArgumentException(usage);
    }
    return parsed;
}
// Console.CancelKeyPress handler. Control+C is intercepted: it raises IsCanceled (polled
// between run-job batches) and cancels the default termination so the program keeps running.
// Any other key (Control+Break) is left alone, so the process terminates.
private static void HandleCancel(object obj, ConsoleCancelEventArgs args)
{
    var isControlC = args.SpecialKey == ConsoleSpecialKey.ControlC;
    if (!isControlC)
    {
        return;
    }

    IsCanceled = true;
    args.Cancel = true;
}
// One-time startup: gives Hudl.Config a working provider, loads the users webserver list from
// eureka with progress output, then prints each webserver's migration state from that list.
private static void Initialize()
{
    // The config file itself doesn't need to exist; constructing this provider only puts
    // ConfigProvider into a known, non-null state.
    var placeholderProvider = new FileConfigurationProvider(@"c:\temp", "unused-config.txt");
    ConfigProvider.UseProvider(placeholderProvider);

    const bool verboseLoad = true;
    ReloadUsersWebservers(EurekaUrl, verboseLoad);

    // The webserver list was loaded just above, so don't fetch it from eureka a second time.
    const bool reloadBeforeQuery = false;
    QueryCurrentWebserverStates(EurekaUrl, reloadBeforeQuery);
}
// Fetches the eureka application document at eurekaUrl (10 s timeout) and caches one http Uri
// per <hostName> element in _webservers. Every host is paired with the FIRST <port> value in the
// document, i.e. all instances are assumed to listen on the same port.
// Throws when eureka answers with a non-success status, lists no hosts, or has no valid port;
// _webservers is left unchanged in those cases.
private static void ReloadUsersWebservers(string eurekaUrl, bool verbose = true)
{
    if (verbose)
    {
        WriteLine();
        WriteLine("Loading users webservers from eureka...");
    }
    var ipRegex = new Regex("<hostName[^>]*>([^<]+)</hostName>");
    // Alternative: address instances by private IP instead of host name.
    //var ipRegex = new Regex("<local-ipv4[^>]*>([^<]+)</local-ipv4>", RegexOptions.IgnoreCase);
    var portRegex = new Regex("<port[^>]*>([^<]+)</port>", RegexOptions.IgnoreCase);
    string content;
    using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)))
    using (var response = Http.GetAsync(new Uri(eurekaUrl), cts.Token).Result)
    {
        // An error page would otherwise surface as a misleading "no servers"/port parse error.
        response.EnsureSuccessStatusCode();
        content = response.Content.ReadAsStringAsync().Result;
    }
    // Check for hosts before the port, so an empty listing reports the real problem.
    var ipMatches = ipRegex.Matches(content);
    if (ipMatches.Count == 0)
    {
        throw new Exception("No users servers found in eureka");
    }
    var portMatch = portRegex.Match(content);
    ushort port;
    if (!portMatch.Success || !ushort.TryParse(portMatch.Groups[1].Value.Trim(), out port))
    {
        throw new Exception("No valid users server port found in eureka");
    }
    var uris = new List<Uri>();
    foreach (Match match in ipMatches)
    {
        var host = match.Groups[1].Value.Trim();
        uris.Add(new UriBuilder("http", host, port).Uri);
    }
    if (verbose) WriteLine(" Caching {0} users webservers + port {1}", uris.Count, port);
    foreach (var uri in uris)
    {
        if (verbose) WriteLine(" {0}", uri);
    }
    _webservers = uris;
}
// Prints the migration state ("<point>,<batch>") reported by each cached webserver (5 s timeout
// each) and records the shared point in ConsistentPointUserId when all servers agree. The field
// is set to null when a server can't be read (which also stops the query early), when the states
// disagree, or when there are no servers. Finally prints the mongo state, unless a read failed.
// queryFirst: reload the webserver list from eureka first; failures of that reload propagate.
private static void QueryCurrentWebserverStates(string eurekaUrl, bool queryFirst = true)
{
    if (queryFirst)
    {
        ReloadUsersWebservers(eurekaUrl, false);
    }
    WriteLine();
    WriteLine("Loading migration state from webservers...");
    var states = new HashSet<string>();
    foreach (var webserver in _webservers)
    {
        var uri = new Uri(webserver, "/migration/current-state");
        try
        {
            string result;
            using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
            using (var response = Http.GetAsync(uri, cts.Token).Result)
            {
                if (response.StatusCode != HttpStatusCode.OK)
                {
                    throw new Exception("Error reading state from webserver " + uri);
                }
                // Trim so a trailing newline from one server doesn't look like a different state.
                result = response.Content.ReadAsStringAsync().Result.Trim();
            }
            var split = result.Split(',');
            if (split.Length < 2)
            {
                throw new Exception("Unexpected state \"" + result + "\" from webserver " + uri);
            }
            var point = split[0];
            var batch = split[1];
            var warning = "";
            if (!states.Contains(result) && states.Count > 0)
            {
                warning = " [STATE MISMATCH]";
            }
            states.Add(result);
            WriteLine(" Webserver state: Point={0} Batch={1} Host={2} {3}", point, batch, webserver, warning);
        }
        catch (Exception e)
        {
            ConsistentPointUserId = null;
            WriteException(e, string.Format("Error loading state from {0}", uri));
            return;
        }
    }
    if (states.Count > 1)
    {
        ConsistentPointUserId = null;
        WriteLine();
        WriteLineError("Webserver states are inconsistent, you should send a set-state to sync them.");
    }
    else if (states.Count == 0)
    {
        ConsistentPointUserId = null;
        WriteLine();
        WriteLineError("There are no webservers to talk to.");
    }
    else
    {
        ConsistentPointUserId = long.Parse(states.First().Split(',')[0]);
    }
    LoadMongoState();
}
// POSTs point/batch to /migration/set-state on every cached webserver (5 s timeout each).
// Per-server results print when verbose; failures always print. If at least one server accepted
// the state, it is also saved to mongo, retrying once after a 1 s pause.
// Returns true only if every server accepted the state and, when attempted, the mongo save
// eventually succeeded.
private static bool SetAllWebserversState(long point, long batch, bool verbose = true)
{
    if (verbose)
    {
        WriteLine();
        WriteLine("Setting migration state on webservers to Point={0} Batch={1}", point, batch);
    }
    var overallSuccess = true;
    var anySuccess = false;
    foreach (var webserver in _webservers)
    {
        if (verbose) Console.Write(" {0} ", webserver);
        var uri = new Uri(webserver, "/migration/set-state");
        try
        {
            using (var request = new HttpRequestMessage(HttpMethod.Post, uri)
            {
                Content = new StringContent(string.Format("point={0}&batch={1}", point, batch), Encoding.UTF8, "application/x-www-form-urlencoded"),
            })
            using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
            using (var response = Http.SendAsync(request, cts.Token).Result)
            {
                if (response.StatusCode != HttpStatusCode.OK)
                {
                    // In quiet mode the host hasn't been printed yet; name it so the failure is attributable.
                    if (!verbose) Console.Write(" {0} ", webserver);
                    // Reason phrase passed as an argument so braces in it can't break formatting.
                    WriteLineError("ERROR HTTP {0} ({1})", (int)response.StatusCode, response.ReasonPhrase);
                    overallSuccess = false;
                }
                else
                {
                    anySuccess = true;
                    if (verbose) WriteLineSuccess("SUCCESS HTTP {0}", (int)response.StatusCode);
                }
            }
        }
        catch (Exception e)
        {
            WriteException(e, "Error setting webserver state to " + point + " " + batch + " for " + uri);
            overallSuccess = false;
        }
    }
    // If we managed to set the state anywhere, our ideal situation is one where we can
    // sync the rest of the servers to that state. Save it to mongo so that if we restart
    // them they'll load our intended state from there.
    if (anySuccess)
    {
        try
        {
            SaveMongoState(point, batch);
        }
        catch (Exception e)
        {
            Console.ForegroundColor = ConsoleColor.DarkYellow;
            WriteLine("Error saving mongo state: {0}", e.Message);
            WriteLine();
            WriteLine("Trying one more time.");
            Console.ForegroundColor = DefaultColor;
            Thread.Sleep(TimeSpan.FromSeconds(1));
            try
            {
                SaveMongoState(point, batch);
            }
            catch (Exception retryException)
            {
                // Report the retry's own failure; the first attempt's message was printed above.
                WriteException(retryException, "Error saving mongo state after two tries, things may be kind of messed up");
                return false;
            }
        }
    }
    return overallSuccess;
}
// Re-queries every webserver's state, then decides whether a migration may start at
// startUserId. There must be at least one webserver, all of them must report the same point
// user id, and startUserId must equal that point. Otherwise prints the reason and returns false.
private static bool EnsureMigratableState(long startUserId)
{
    QueryCurrentWebserverStates(EurekaUrl);

    var hasWebservers = _webservers != null && _webservers.Count > 0;
    if (!hasWebservers)
    {
        WriteLine();
        WriteLineError("There are no users webservers to talk to.");
        return false;
    }

    var consistentPoint = ConsistentPointUserId;
    if (!consistentPoint.HasValue)
    {
        WriteLine();
        WriteLineError("The webserver states aren't consistent. Use set-state to correct this and try again.");
        return false;
    }

    var startsAtPoint = consistentPoint.Value == startUserId;
    if (!startsAtPoint)
    {
        WriteLine();
        WriteLineError("I won't start a migration for a user that's not the current point user id (which is {0}).", consistentPoint.Value);
    }
    return startsAtPoint;
}
// Prints the migration state stored via MongoConnector, or "No mongo state" when
// MongoConnector.GetMigrationState returns null.
private static void LoadMongoState()
{
    var persisted = MongoConnector.GetMigrationState();
    if (persisted != null)
    {
        WriteLine("Mongo state is Point={0} Batch={1}", persisted.PointUserId, persisted.BatchLength);
    }
    else
    {
        WriteLine("No mongo state");
    }
}
// Stores the given point/batch migration state through MongoConnector.SetMigrationState.
// Exceptions from the connector propagate to the caller.
private static void SaveMongoState(long point, long batch)
{
    var pointUserId = point;
    var batchLength = batch;
    MongoConnector.SetMigrationState(pointUserId, batchLength);
}
// Migrates a single user by POSTing userId to /migration/migrate-user on the first cached
// webserver (5 s timeout). Does nothing unless EnsureMigratableState(userId) passes. Prints
// SUCCESS, or ERROR plus the response body; transport errors and timeouts propagate.
private static void MigrateOneUser(long userId)
{
    if (!EnsureMigratableState(userId))
    {
        return;
    }
    Console.Write("Migrating User={0} ", userId);
    var uri = new Uri(_webservers[0], "/migration/migrate-user");
    using (var request = new HttpRequestMessage(HttpMethod.Post, uri)
    {
        Content = new StringContent(string.Format("userId={0}", userId), Encoding.UTF8, "application/x-www-form-urlencoded"),
    })
    using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
    using (var response = Http.SendAsync(request, cts.Token).Result)
    {
        if (response.StatusCode != HttpStatusCode.OK)
        {
            var content = response.Content.ReadAsStringAsync().Result;
            WriteLineError("ERROR HTTP {0} ({1})", (int)response.StatusCode, response.ReasonPhrase);
            // The body goes in as an argument, not as the format string: an error body containing
            // '{' or '}' (e.g. JSON) would otherwise throw FormatException instead of printing.
            WriteLineError("{0}", content);
        }
        else
        {
            WriteLineSuccess("SUCCESS HTTP {0}", (int)response.StatusCode);
        }
    }
}
// Only call this from within MigrateBatch. Used to help parallelize.
// POSTs one chunk (startUserId, batchLength, perNodeParallelism) to /migration/migrate-batch on
// serverUri with a 15 s timeout and returns the response; the caller owns and disposes it.
// Transport errors and timeouts surface as exceptions from the blocking .Result.
private static HttpResponseMessage _DoMigrateBatchOnWebserver(int startUserId, int batchLength, int perNodeParallelism, Uri serverUri)
{
    var uri = new Uri(serverUri, "/migration/migrate-batch");
    // The request and its timer-backed token source are only needed until SendAsync completes;
    // HttpClient buffers the response content by default, so the response stays readable.
    using (var request = new HttpRequestMessage(HttpMethod.Post, uri)
    {
        Content = new StringContent(
            string.Format("startUserId={0}&batchLength={1}&parallelism={2}", startUserId, batchLength, perNodeParallelism),
            Encoding.UTF8,
            "application/x-www-form-urlencoded"),
    })
    using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)))
    {
        return Http.SendAsync(request, cts.Token).Result;
    }
}
// Serializes the result lines printed by MigrateBatch's concurrent chunk tasks, so one task's
// console color change can't bleed into another task's line.
private static readonly object ChunkOutputLock = new object();

// Migrates users startUserId..startUserId + batchLength - 1 by splitting the range into
// contiguous chunks, at most one per cached webserver, and running them concurrently.
// When ensure is true, EnsureMigratableState(startUserId) must pass first.
// Returns true only if every chunk succeeded, in which case the chunk totals are added to
// MigratedCount/NonexistentCount. A non-positive batchLength is rejected (prints, returns false).
private static bool MigrateBatch(int startUserId, int batchLength, int perNodeParallelism, bool ensure = true)
{
    if (batchLength <= 0)
    {
        WriteLineError("Batch size must be positive (got {0}).", batchLength);
        return false;
    }
    if (ensure && !EnsureMigratableState(startUserId))
    {
        return false;
    }
    WriteLine("Migrating User={0} to User={1}: ", startUserId, startUserId + batchLength - 1);
    var tasks = new List<Task<ChunkResult>>();
    var servers = new List<Uri>(_webservers);
    // Ceiling division yields at most servers.Count chunks, so serverIndex never runs past the list.
    var chunkSize = (int)Math.Ceiling(batchLength / (double)servers.Count);
    var chunkStartId = startUserId;
    var endUserId = (startUserId + batchLength) - 1;
    var serverIndex = 0;
    var stopwatch = Stopwatch.StartNew();
    while (chunkStartId <= endUserId)
    {
        var server = servers[serverIndex];
        var thisStartId = chunkStartId;
        var thisChunkSize = Math.Min(chunkSize, (endUserId - thisStartId) + 1);
        tasks.Add(Task.Run(() => MigrateChunk(thisStartId, thisChunkSize, perNodeParallelism, server)));
        serverIndex++;
        chunkStartId += chunkSize;
    }
    var results = Task.WhenAll(tasks).Result;
    stopwatch.Stop();
    WriteLine(" {0} ms", stopwatch.ElapsedMilliseconds);
    var success = results.All(result => result != null);
    if (success)
    {
        MigratedCount += results.Sum(result => result.Migrated);
        NonexistentCount += results.Sum(result => result.Nonexistent);
    }
    return success;
}

// Runs one chunk on one webserver and prints exactly one result line for it. On HTTP 200 the
// first three response lines are read positionally as key=value counts (migrated, nonexistent,
// verified). Returns those counts, or null on any failure: non-200 status, unparseable body,
// verified count different from migrated count, transport error or timeout.
private static ChunkResult MigrateChunk(int startUserId, int chunkSize, int perNodeParallelism, Uri server)
{
    Action writeFunc = null;
    try
    {
        using (var response = _DoMigrateBatchOnWebserver(startUserId, chunkSize, perNodeParallelism, server))
        {
            var content = response.Content.ReadAsStringAsync().Result;
            // Accept both \r\n and \n line endings regardless of this machine's Environment.NewLine.
            var lines = content.Split(new[] { "\r\n", "\n" }, StringSplitOptions.RemoveEmptyEntries);
            var logPrefix = string.Format("Chunk StartUser={0} EndUser={1}", startUserId, (startUserId + chunkSize) - 1);
            if (response.StatusCode == HttpStatusCode.OK)
            {
                var migrated = lines[0].Split('=')[1].Trim();
                var nonexistent = lines[1].Split('=')[1].Trim();
                var verified = lines[2].Split('=')[1].Trim();
                var result = new ChunkResult
                {
                    Migrated = long.Parse(migrated),
                    Nonexistent = long.Parse(nonexistent),
                    Verified = long.Parse(verified),
                };
                // Compare the parsed counts rather than their text ("05" and "5" are the same count).
                if (result.Verified != result.Migrated)
                {
                    writeFunc = () => WriteLineError(" {0} VERIFICATION COUNT MISMATCH ({1})", logPrefix, string.Format("Migrated {0} users but only {1} were verified", migrated, verified));
                    return null;
                }
                writeFunc = () => WriteLineSuccess(" {0} SUCCESS Migrated={1} Nonexistent={2} Verified={3}", logPrefix, migrated, nonexistent, verified);
                return result;
            }
            if (response.StatusCode == HttpStatusCode.BadRequest)
            {
                // Captured now: the response is disposed before the finally block prints.
                var reason = response.ReasonPhrase;
                writeFunc = () => WriteLineError(" {0} BAD REQUEST ({1})", logPrefix, reason);
                return null;
            }
            writeFunc = () => WriteLineError(" {0} MIGRATION ERROR {1}", logPrefix, string.Join(" | ", lines));
            return null;
        }
    }
    catch (Exception e)
    {
        writeFunc = () => WriteException(e, string.Format("Error migrating batch chunk on server {0}", server));
        return null;
    }
    finally
    {
        if (writeFunc != null)
        {
            lock (ChunkOutputLock)
            {
                writeFunc();
            }
        }
    }
}
// Counts parsed from one webserver's migrate-batch response for a single chunk of users.
private class ChunkResult
{
    // "Migrated" count reported for the chunk.
    public long Migrated { get; set; }

    // "Verified" count reported for the chunk; MigrateBatch treats a mismatch with Migrated as failure.
    public long Verified { get; set; }

    // "Nonexistent" count reported for the chunk (presumably ids with no user record — confirm).
    public long Nonexistent { get; set; }
}
// The "run-job" loop: migrates users startUserId..stopAfterId in consecutive batches of at most
// batchLength. Before each batch every webserver's state is set to (batch start, batch length).
// The loop stops at the first failure, or after the current batch when Control+C raises
// IsCanceled, optionally sleeping between batches. Afterwards the webservers are set to batch
// length 0 with the point at the first user not yet migrated, and the final states are printed.
private static void MigrateInBatchesUpTo(int startUserId, int batchLength, int stopAfterId, int perNodeParallelism = 10, TimeSpan? sleep = null)
{
    // A Control+C pressed while no job was running must not cancel this job.
    IsCanceled = false;
    // A non-positive batch length would never advance userId, so the loop would never end.
    if (batchLength <= 0)
    {
        WriteLine();
        WriteLineError("Batch size must be positive (got {0}).", batchLength);
        return;
    }
    // Thread.Sleep rejects negative spans, except -1 ms which would sleep forever.
    if (sleep.HasValue && sleep.Value < TimeSpan.Zero)
    {
        WriteLine();
        WriteLineError("Sleep must not be negative (got {0} ms).", sleep.Value.TotalMilliseconds);
        return;
    }
    // With an empty range the final reset below would move the point backwards to stopAfterId + 1.
    if (stopAfterId < startUserId)
    {
        WriteLine();
        WriteLineError("Stop-after user id {0} is before start user id {1}; nothing to migrate.", stopAfterId, startUserId);
        return;
    }
    if (!EnsureMigratableState(startUserId))
    {
        return;
    }
    WriteLine();
    var userId = startUserId;
    var faulted = false;
    var canceled = false;
    var stopwatch = Stopwatch.StartNew();
    // Users attempted (including a batch that then failed), hence "rough".
    var roughCount = 0;
    while (userId <= stopAfterId)
    {
        try
        {
            // The final batch is clipped so it ends exactly at stopAfterId.
            var length = Math.Min(batchLength, (stopAfterId - userId) + 1);
            roughCount += length;
            if (!SetAllWebserversState(userId, length, false))
            {
                faulted = true;
                break;
            }
            var result = MigrateBatch(userId, length, perNodeParallelism, false);
            if (!result)
            {
                faulted = true;
                break;
            }
            // Advance by the clipped length so userId lands exactly on stopAfterId + 1 after the
            // last batch; on cancel it becomes the new point, and overshooting would skip users.
            userId += length;
            if (sleep != null)
            {
                Thread.Sleep(sleep.Value);
            }
        }
        catch (Exception e)
        {
            WriteException(e);
            faulted = true;
            break;
        }
        if (IsCanceled)
        {
            canceled = true;
            WriteLine();
            Console.ForegroundColor = ConsoleColor.Yellow;
            WriteLine("Canceled.");
            Console.ForegroundColor = DefaultColor;
            IsCanceled = false;
            break;
        }
    }
    stopwatch.Stop();
    WriteLine();
    WriteLine("~{0} users took {1} ms", roughCount, stopwatch.ElapsedMilliseconds);
    WriteLine("Migrated: {0}; Nonexistent: {1}", MigratedCount, NonexistentCount);
    if (faulted || canceled)
    {
        if (faulted)
        {
            WriteLine();
            WriteLineError("Migration aborted. Check the output from the last batch for errors. Resetting webservers to an unlocked state at User={0}", userId);
        }
        SetAllWebserversState(userId, 0);
    }
    else
    {
        SetAllWebserversState(stopAfterId + 1, 0);
    }
    QueryCurrentWebserverStates(EurekaUrl);
}
// Writes one line in red, then restores DefaultColor even if writing throws. With args, line is
// a composite format string; without args it is written verbatim, so messages that contain '{'
// or '}' (exception text, user input, server responses) can't raise a FormatException.
private static void WriteLineError(string line, params object[] args)
{
    Console.ForegroundColor = ConsoleColor.Red;
    try
    {
        if (args == null || args.Length == 0)
        {
            Console.WriteLine(line);
        }
        else
        {
            Console.WriteLine(line, args);
        }
    }
    finally
    {
        Console.ForegroundColor = DefaultColor;
    }
}
// Writes the exception's full text (its ToString()) as one red line, then restores DefaultColor.
private static void WriteLineError(Exception e)
{
    object payload = e;
    Console.ForegroundColor = ConsoleColor.Red;
    Console.Out.WriteLine(payload);
    Console.ForegroundColor = DefaultColor;
}
// Writes message (no newline) in red, then restores DefaultColor even if writing throws. With
// args, message is a composite format string; without args it is written verbatim, so braces in
// the text can't raise a FormatException.
private static void WriteError(string message, params object[] args)
{
    Console.ForegroundColor = ConsoleColor.Red;
    try
    {
        if (args == null || args.Length == 0)
        {
            Console.Write(message);
        }
        else
        {
            Console.Write(message, args);
        }
    }
    finally
    {
        Console.ForegroundColor = DefaultColor;
    }
}
// Prints, in red: a blank line, the optional context message (verbatim), then the exception's
// full text; afterwards restores DefaultColor.
private static void WriteException(Exception e, string additionalMessage = null)
{
    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine();
    var hasContext = additionalMessage != null;
    if (hasContext)
    {
        Console.WriteLine(additionalMessage);
    }
    Console.WriteLine(e);
    Console.ForegroundColor = DefaultColor;
}
// Writes one line in green, then restores DefaultColor even if writing throws. With args, line
// is a composite format string; without args it is written verbatim, so braces in the text can't
// raise a FormatException.
private static void WriteLineSuccess(string line, params object[] args)
{
    Console.ForegroundColor = ConsoleColor.Green;
    try
    {
        if (args == null || args.Length == 0)
        {
            Console.WriteLine(line);
        }
        else
        {
            Console.WriteLine(line, args);
        }
    }
    finally
    {
        Console.ForegroundColor = DefaultColor;
    }
}
// Writes message (no newline) in green, then restores DefaultColor even if writing throws. With
// args, message is a composite format string; without args it is written verbatim, so braces in
// the text can't raise a FormatException.
private static void WriteSuccess(string message, params object[] args)
{
    Console.ForegroundColor = ConsoleColor.Green;
    try
    {
        if (args == null || args.Length == 0)
        {
            Console.Write(message);
        }
        else
        {
            Console.Write(message, args);
        }
    }
    finally
    {
        Console.ForegroundColor = DefaultColor;
    }
}
// In yellow: optionally prints a prompt line, then "> ", and reads one line of input.
// Restores DefaultColor and returns Console.ReadLine's result (null when input is exhausted).
private static string Prompt(string prompt = null)
{
    Console.ForegroundColor = ConsoleColor.Yellow;
    var hasPromptText = prompt != null;
    if (hasPromptText)
    {
        Console.WriteLine(prompt);
    }
    Console.Write("> ");
    string entered = Console.ReadLine();
    Console.ForegroundColor = DefaultColor;
    return entered;
}
// Writes one line in the current color. With args, line is a composite format string; without
// args it is written verbatim, so text containing '{' or '}' can't raise a FormatException.
private static void WriteLine(string line, params object[] args)
{
    if (args == null || args.Length == 0)
    {
        Console.WriteLine(line);
        return;
    }
    Console.WriteLine(line, args);
}
// Writes an empty line (just the newline) to standard output.
private static void WriteLine()
{
    var stdout = Console.Out;
    stdout.WriteLine();
}
// Writes message (no newline) in the current color. With args, message is a composite format
// string; without args it is written verbatim, so braces can't raise a FormatException.
private static void Write(string message, params object[] args)
{
    if (args == null || args.Length == 0)
    {
        Console.Write(message);
        return;
    }
    Console.Write(message, args);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment