Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Reflection;
using System.Text;
using System.Threading;
using Misuzilla.Applications.TwitterIrcGateway.AddIns.Console;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Converters;
namespace Misuzilla.Applications.TwitterIrcGateway.AddIns.UserStream
{
public class UserStreamAddIn : AddInBase
{
private HashSet<Int64> _friendIds;
private Thread _workerThread;
private Boolean _isRunning;
private HttpWebRequest _webRequest;
public UserStreamConfig Config { get; set; }
public override void Initialize()
{
// XXX:
ServicePointManager.DefaultConnectionLimit = 1000;
ServicePointManager.MaxServicePoints = 0;
CurrentSession.AddInsLoadCompleted += (sender, e) =>
{
CurrentSession.AddInManager.GetAddIn<ConsoleAddIn>().RegisterContext<UserStreamContext>();
Config = CurrentSession.AddInManager.GetConfig<UserStreamConfig>();
Setup(Config.Enabled);
};
}
public override void Uninitialize()
{
Setup(false);
}
internal void Setup(Boolean isStart)
{
if (_workerThread != null)
{
_isRunning = false;
if (_webRequest != null)
{
_webRequest.Abort();
_webRequest = null;
}
_workerThread.Abort();
_workerThread.Join(200);
_workerThread = null;
}
if (isStart)
{
_friendIds = new HashSet<Int64>();
_workerThread = new Thread(WorkerProcedureEntry);
_workerThread.Start();
_isRunning = true;
}
}
private void WorkerProcedureEntry()
{
while (true)
{
try
{
WorkerProcedure();
}
catch (ThreadAbortException)
{
_isRunning = false;
// rethrow
}
catch (Exception e)
{
CurrentSession.SendServerErrorMessage("UserStream: " + e.Message);
}
if (!Config.AutoRestart)
break;
// 適当に 60 秒待機
Thread.Sleep(60 * 1000);
}
_isRunning = false;
}
private IEnumerable<String> EnumerateLines(String url)
{
_webRequest = CurrentSession.TwitterService.OAuthClient.CreateRequest(
new Uri(url),
TwitterOAuth.HttpMethod.GET);
_webRequest.ServicePoint.ConnectionLimit = 1000;
_webRequest.Timeout = 30 * 1000;
using (var response = _webRequest.GetResponse())
using (var stream = response.GetResponseStream())
{
StreamReader sr = new StreamReader(stream, Encoding.UTF8);
while (!sr.EndOfStream && _isRunning)
{
var line = sr.ReadLine();
if (!String.IsNullOrEmpty(line))
yield return line;
}
}
}
private void WorkerProcedure()
{
var options = new Dictionary<String, String>();
if (Config.AllAtMode)
options["replies"] = "all";
String optionsString = String.Join("&", options.Select(kv => String.Format("{0}={1}", kv.Key, kv.Value)).ToArray());
String url = "https://userstream.twitter.com/1.1/user.json";
if (!String.IsNullOrEmpty(optionsString))
url += "?" + optionsString;
Boolean isFirstTime = true;
var stream = EnumerateLines(url);
foreach (var json in stream)
{
try
{
if (isFirstTime)
{
var friendIds = JsonConvert.DeserializeObject<FriendsObject>(json);
_friendIds.UnionWith(friendIds.Friends);
isFirstTime = false;
}
else
{
var jsonObject = JsonConvert.DeserializeObject<JObject>(json);
if (jsonObject["event"] != null)
{
var eventObject = JsonConvert.DeserializeObject<EventObject>(json);
if (eventObject.Event == "follow" && eventObject.Source.Id == CurrentSession.TwitterUser.Id)
_friendIds.Add(eventObject.Target.Id);
}
else if (jsonObject["user"] != null)
{
var statusObject = JsonConvert.DeserializeObject<Status>(json);
if (Config.IsThroughMyPostFromUserStream && statusObject.Id == CurrentSession.TwitterUser.Id)
continue;
Boolean friendCheckRequired = false;
CurrentSession.TwitterService.ProcessStatus(statusObject,
(s) => CurrentSession.ProcessTimelineStatus(s, ref friendCheckRequired, false, false));
}
}
}
catch (Exception e)
{
CurrentSession.SendServerErrorMessage("UserStream: " + e.Message);
throw e;
}
}
}
}
[Description("User Stream設定コンテキストに切り替えます")]
public class UserStreamContext : Context
{
public override IConfiguration[] Configurations
{
get
{
return new[] { CurrentSession.AddInManager.GetAddIn<UserStreamAddIn>().Config };
}
}
[Description("User Stream を有効にします")]
public void Enable()
{
var config = CurrentSession.AddInManager.GetConfig<UserStreamConfig>();
config.Enabled = true;
CurrentSession.AddInManager.SaveConfig(config);
CurrentSession.AddInManager.GetAddIn<UserStreamAddIn>().Setup(config.Enabled);
Console.NotifyMessage("User Stream を有効にしました。");
}
[Description("User Stream を無効にします")]
public void Disable()
{
var config = CurrentSession.AddInManager.GetConfig<UserStreamConfig>();
config.Enabled = false;
CurrentSession.AddInManager.SaveConfig(config);
CurrentSession.AddInManager.GetAddIn<UserStreamAddIn>().Setup(config.Enabled);
Console.NotifyMessage("User Stream を無効にしました。");
}
protected override void OnConfigurationChanged(IConfiguration config, MemberInfo memberInfo, object value)
{
CurrentSession.AddInManager.SaveConfig(config);
}
}
public class UserStreamConfig : IConfiguration
{
[Browsable(false)]
public Boolean Enabled { get; set; }
[Description("all@と同じ挙動になるかどうかを指定します。")]
public Boolean AllAtMode { get; set; }
[Description("自分のポストをUser Streamから拾わないようにするかどうかを指定します。")]
public Boolean IsThroughMyPostFromUserStream { get; set; }
[Description("切断された際に自動的に再接続を試みるかどうかを指定します。")]
public Boolean AutoRestart { get; set; }
}
class FriendsObject
{
[JsonProperty("friends")]
public Int64[] Friends { get; set; }
}
class EventTarget
{
[JsonProperty("id")]
public Int64 Id { get; set; }
}
class EventObject
{
[JsonProperty("event")]
public String Event { get; set; }
[JsonProperty("target")]
public EventTarget Target { get; set; }
[JsonProperty("source")]
public EventTarget Source { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment