Create a gist now

Instantly share code, notes, and snippets.

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Reflection;
using System.ServiceModel.Dispatcher;
using System.Text;
using System.Threading;
using Misuzilla.Applications.TwitterIrcGateway.AddIns.Console;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Json;
using System.Xml;
using System.Xml.Linq;
namespace Misuzilla.Applications.TwitterIrcGateway.AddIns.UserStream
{
/// <summary>
/// Add-in that reads the Twitter User Stream on a dedicated worker thread and hands
/// incoming statuses to the current session's timeline processing.
/// </summary>
public class UserStreamAddIn : AddInBase
{
    /// <summary>
    /// IDs of the users the session user follows. Seeded from the first stream message
    /// and extended whenever a "follow" event issued by the session user arrives.
    /// </summary>
    private HashSet<Int64> _friendIds;

    /// <summary>Thread running <see cref="WorkerProcedureEntry"/>; null while stopped.</summary>
    private Thread _workerThread;

    /// <summary>
    /// Stop flag polled by the worker in <see cref="EnumerateLines"/>. Declared volatile
    /// because it is written by <see cref="Setup"/> on the caller's thread and read on the worker thread.
    /// </summary>
    private volatile Boolean _isRunning;

    /// <summary>The streaming request currently in flight, kept so that <see cref="Setup"/> can abort it.</summary>
    private HttpWebRequest _webRequest;

    /// <summary>Configuration of this add-in; loaded once all add-ins have finished loading.</summary>
    public UserStreamConfig Config { get; set; }

    /// <summary>
    /// Adjusts the process-wide connection limits and defers the real start-up until
    /// every add-in has been loaded (the console add-in is needed to register the context).
    /// </summary>
    public override void Initialize()
    {
        // XXX: these are process-wide settings, not scoped to this add-in.
        ServicePointManager.DefaultConnectionLimit = 1000;
        ServicePointManager.MaxServicePoints = 0;
        CurrentSession.AddInsLoadCompleted += (sender, e) =>
        {
            CurrentSession.AddInManager.GetAddIn<ConsoleAddIn>().RegisterContext<UserStreamContext>();
            Config = CurrentSession.AddInManager.GetConfig<UserStreamConfig>();
            Setup(Config.Enabled);
        };
    }

    /// <summary>Stops the worker thread when the add-in is unloaded.</summary>
    public override void Uninitialize()
    {
        Setup(false);
    }

    /// <summary>
    /// Stops any running worker and, when <paramref name="isStart"/> is true, starts a fresh one.
    /// </summary>
    /// <param name="isStart">true to (re)start streaming; false to only stop it.</param>
    internal void Setup(Boolean isStart)
    {
        if (_workerThread != null)
        {
            _isRunning = false;

            // Work on a local copy: the worker thread assigns the field as well.
            HttpWebRequest pendingRequest = _webRequest;
            if (pendingRequest != null)
            {
                pendingRequest.Abort();
                _webRequest = null;
            }

            _workerThread.Abort();
            _workerThread.Join(200);
            _workerThread = null;
        }

        if (isStart)
        {
            _friendIds = new HashSet<Int64>();

            // Raise the flag BEFORE the thread starts. Setting it afterwards let the worker
            // observe "false" in EnumerateLines and abandon the stream immediately.
            _isRunning = true;
            _workerThread = new Thread(WorkerProcedureEntry);
            _workerThread.Start();
        }
    }

    /// <summary>
    /// Worker thread entry point. Runs <see cref="WorkerProcedure"/> and, when
    /// <see cref="UserStreamConfig.AutoRestart"/> is set, reconnects after a pause.
    /// </summary>
    private void WorkerProcedureEntry()
    {
        while (true)
        {
            try
            {
                WorkerProcedure();
            }
            catch (ThreadAbortException)
            {
                _isRunning = false;
                // No explicit rethrow needed: the runtime re-raises ThreadAbortException
                // automatically at the end of this catch block.
            }
            catch (Exception e)
            {
                CurrentSession.SendServerErrorMessage("UserStream: " + e.Message);
            }

            if (!Config.AutoRestart)
                break;

            // Wait an arbitrary 60 seconds before reconnecting.
            Thread.Sleep(60 * 1000);
        }
        _isRunning = false;
    }

    /// <summary>
    /// Opens an OAuth-signed GET request to <paramref name="url"/> and lazily yields every
    /// non-empty line of the response until the stream ends or the stop flag is cleared.
    /// </summary>
    /// <param name="url">Absolute URL of the streaming endpoint.</param>
    /// <returns>A lazily evaluated sequence of non-empty response lines.</returns>
    private IEnumerable<String> EnumerateLines(string url)
    {
        // Configure through a local: Setup() may null the field on another thread at any time.
        HttpWebRequest request = CurrentSession.TwitterService.OAuthClient.CreateRequest(
            new Uri(url),
            TwitterOAuth.HttpMethod.GET);
        _webRequest = request;
        request.ServicePoint.ConnectionLimit = 1000;
        request.Timeout = 30 * 1000;

        using (var response = request.GetResponse())
        using (var stream = response.GetResponseStream())
        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            while (!reader.EndOfStream && _isRunning)
            {
                var line = reader.ReadLine();
                if (!String.IsNullOrEmpty(line))
                    yield return line;
            }
        }
    }

    /// <summary>
    /// Reads one JSON payload into <typeparamref name="T"/> using the given serializer.
    /// </summary>
    /// <returns>The deserialized object, or null when the payload is not a <typeparamref name="T"/>.</returns>
    private static T Deserialize<T>(DataContractJsonSerializer serializer, Byte[] jsonBytes) where T : class
    {
        using (var buffer = new MemoryStream(jsonBytes))
            return serializer.ReadObject(buffer) as T;
    }

    /// <summary>
    /// Consumes the user stream: the first message seeds the friend list, "follow" events
    /// extend it, and status messages are forwarded to the session timeline when they pass the filters.
    /// </summary>
    private void WorkerProcedure()
    {
        Boolean isFirstLine = true;
        foreach (var json in EnumerateLines("https://userstream.twitter.com/2/user.json"))
        {
            try
            {
                Byte[] jsonBytes = Encoding.UTF8.GetBytes(json);
                XElement element;
                using (var jsonReader = JsonReaderWriterFactory.CreateJsonReader(jsonBytes, XmlDictionaryReaderQuotas.Max))
                    element = XElement.Load(jsonReader);

                if (isFirstLine)
                {
                    // Clear the flag first: if this message turns out not to carry a friend list,
                    // later messages must not keep being mistaken for one (and silently dropped).
                    isFirstLine = false;

                    // Fetch the friend list.
                    var friends = Deserialize<_FriendsObject>(_FriendsObject.Serializer, jsonBytes);
                    if (friends != null && friends.Friends != null)
                        _friendIds.UnionWith(friends.Friends);
                }
                else if (element.Element("event") != null)
                {
                    var eventObject = Deserialize<_EventObject>(_EventObject.Serializer, jsonBytes);
                    if (eventObject != null &&
                        eventObject.Event == "follow" &&
                        eventObject.Source != null &&
                        eventObject.Target != null &&
                        eventObject.Source.Id == CurrentSession.TwitterUser.Id)
                    {
                        _friendIds.Add(eventObject.Target.Id);
                    }
                }
                else if (element.Element("user") != null)
                {
                    var rawStatus = Deserialize<_Status>(_Status.Serializer, jsonBytes);
                    if (rawStatus == null)
                        continue;
                    var status = rawStatus.ToOriginal();

                    // Skip the session user's own posts. The author's ID must be compared here;
                    // the previous code compared the status ID with the user ID, which never matches.
                    if (Config.IsThroughMyPostFromUserStream &&
                        status.User != null &&
                        status.User.Id == CurrentSession.TwitterUser.Id)
                    {
                        continue;
                    }

                    // Deliver when: all@ mode is on, OR the status is not a reply,
                    // OR it is a reply addressed to someone in the friend list.
                    Boolean shouldDeliver = Config.AllAtMode || String.IsNullOrEmpty(status.InReplyToUserId);
                    if (!shouldDeliver)
                    {
                        Int64 replyToUserId;
                        shouldDeliver =
                            Int64.TryParse(status.InReplyToUserId, NumberStyles.Integer, CultureInfo.InvariantCulture, out replyToUserId) &&
                            _friendIds.Contains(replyToUserId);
                    }

                    if (shouldDeliver)
                    {
                        Boolean friendCheckRequired = false;
                        CurrentSession.TwitterService.ProcessStatus(status,
                            (s) => CurrentSession.ProcessTimelineStatus(s, ref friendCheckRequired, false, false));
                    }
                }
            }
            catch (Exception)
            {
                // Deliberate best effort: one malformed or unexpected message must not tear
                // down the whole stream, so it is skipped and reading continues.
                continue;
            }
        }
    }
}
/// <summary>
/// Console context for the User Stream add-in: exposes its configuration and the
/// commands that switch streaming on and off.
/// </summary>
[Description("User Stream設定コンテキストに切り替えます")]
public class UserStreamContext : Context
{
    /// <summary>The configuration objects editable from this context (the add-in's current config).</summary>
    public override IConfiguration[] Configurations
    {
        get
        {
            UserStreamAddIn addIn = CurrentSession.AddInManager.GetAddIn<UserStreamAddIn>();
            return new[] { addIn.Config };
        }
    }

    /// <summary>Marks the stream as enabled, persists the setting and (re)starts the add-in's worker.</summary>
    [Description("User Stream を有効にします")]
    public void Enable()
    {
        var manager = CurrentSession.AddInManager;
        UserStreamConfig streamConfig = manager.GetConfig<UserStreamConfig>();

        streamConfig.Enabled = true;
        manager.SaveConfig(streamConfig);

        manager.GetAddIn<UserStreamAddIn>().Setup(streamConfig.Enabled);
        Console.NotifyMessage("User Stream を有効にしました。");
    }

    /// <summary>Marks the stream as disabled, persists the setting and stops the add-in's worker.</summary>
    [Description("User Stream を無効にします")]
    public void Disable()
    {
        var manager = CurrentSession.AddInManager;
        UserStreamConfig streamConfig = manager.GetConfig<UserStreamConfig>();

        streamConfig.Enabled = false;
        manager.SaveConfig(streamConfig);

        manager.GetAddIn<UserStreamAddIn>().Setup(streamConfig.Enabled);
        Console.NotifyMessage("User Stream を無効にしました。");
    }

    /// <summary>Persists the configuration whenever one of its members is changed.</summary>
    protected override void OnConfigurationChanged(IConfiguration config, MemberInfo memberInfo, object value)
    {
        // memberInfo and value are not needed here: the whole config object is saved.
        CurrentSession.AddInManager.SaveConfig(config);
    }
}
/// <summary>Settings of the User Stream add-in.</summary>
public class UserStreamConfig : IConfiguration
{
    /// <summary>Whether streaming is switched on. Marked non-browsable; toggled by the Enable/Disable commands.</summary>
    [Browsable(false)]
    public bool Enabled { get; set; }

    /// <summary>Whether to behave the same as "all@" (deliver statuses regardless of the reply filter).</summary>
    [Description("all@と同じ挙動になるかどうかを指定します。")]
    public bool AllAtMode { get; set; }

    /// <summary>Whether the session user's own posts coming from the stream are skipped.</summary>
    [Description("自分のポストをUser Streamから拾わないようにするかどうかを指定します。")]
    public bool IsThroughMyPostFromUserStream { get; set; }

    /// <summary>Whether to try to reconnect automatically after the stream is disconnected.</summary>
    [Description("切断された際に自動的に再接続を試みるかどうかを指定します。")]
    public bool AutoRestart { get; set; }
}
/// <summary>
/// JSON data contract for a user object; converted to the gateway's <c>User</c> type
/// through <see cref="ToOriginal"/>.
/// </summary>
[DataContract]
class _User
{
    // NOTE(review): declared as a 32-bit integer - confirm the "id" values received always fit.
    [DataMember(Name = "id")]
    public int Id { get; set; }

    [DataMember(Name = "name")]
    public string Name { get; set; }

    [DataMember(Name = "screen_name")]
    public string ScreenName { get; set; }

    [DataMember(Name = "location")]
    public string Location { get; set; }

    [DataMember(Name = "description")]
    public string Description { get; set; }

    [DataMember(Name = "profile_image_url")]
    public string ProfileImageUrl { get; set; }

    [DataMember(Name = "url")]
    public string Url { get; set; }

    [DataMember(Name = "protected")]
    public bool Protected { get; set; }

    [DataMember(Name = "status")]
    public _Status Status { get; set; }

    /// <summary>Raw "following" member; kept as object so that a missing/null value can be represented.</summary>
    [DataMember(Name = "following")]
    public object _following { get; set; }

    /// <summary>The "following" member as a boolean; false when the raw value is null.</summary>
    public bool Following
    {
        get { return _following != null && (bool)_following; }
    }

    /// <summary>Copies every member into a new gateway <c>User</c> instance.</summary>
    /// <returns>The converted user; its Status is null when this user carries no status.</returns>
    public User ToOriginal()
    {
        var converted = new User();
        converted.Id = this.Id;
        converted.Name = this.Name;
        converted.ScreenName = this.ScreenName;
        converted.Location = this.Location;
        converted.Description = this.Description;
        converted.ProfileImageUrl = this.ProfileImageUrl;
        converted.Url = this.Url;
        converted.Protected = this.Protected;
        converted.Status = (this.Status == null) ? null : this.Status.ToOriginal();
        converted.Following = this.Following;
        return converted;
    }
}
/// <summary>
/// JSON data contract for a status (tweet) object; converted to the gateway's
/// <c>Status</c> type through <see cref="ToOriginal"/>.
/// </summary>
[DataContract]
class _Status
{
    /// <summary>Shared serializer used to read a status from a JSON payload.</summary>
    public static readonly DataContractJsonSerializer Serializer = new DataContractJsonSerializer(typeof(_Status));

    /// <summary>Raw "created_at" text as received.</summary>
    [DataMember(Name = "created_at")]
    public string _created_at { get; set; }

    /// <summary>"created_at" parsed with the fixed format below using the invariant culture.</summary>
    public DateTime CreatedAt
    {
        get
        {
            return DateTime.ParseExact(_created_at, "ddd MMM dd HH:mm:ss zz00 yyyy", CultureInfo.InvariantCulture.DateTimeFormat);
        }
    }

    [DataMember(Name = "id")]
    public long Id { get; set; }

    [DataMember(Name = "in_reply_to_status_id")]
    public string InReplyToStatusId { get; set; }

    [DataMember(Name = "in_reply_to_user_id")]
    public string InReplyToUserId { get; set; }

    [DataMember(Name = "retweeted_status")]
    public _Status RetweetedStatus { get; set; }

    [DataMember(Name = "text")]
    public string Text { get; set; }

    [DataMember(Name = "user")]
    public _User User { get; set; }

    [DataMember(Name = "source")]
    public string Source { get; set; }

    [DataMember(Name = "favorited")]
    public string Favorited { get; set; }

    [DataMember(Name = "truncated")]
    public bool Truncated { get; set; }

    [DataMember(Name = "retweet_count")]
    public string RetweetCount;

    [DataMember(Name = "retweeted")]
    public bool Retweeted;

    [DataMember(Name = "entities")]
    public _Entities Entities;

    /// <summary>Copies every member into a new gateway <c>Status</c> instance.</summary>
    /// <returns>
    /// The converted status; nested retweeted status, user and entities are converted
    /// recursively and are null when absent here.
    /// </returns>
    public Status ToOriginal()
    {
        var converted = new Status();
        converted.CreatedAt = this.CreatedAt;
        converted.Id = this.Id;
        converted.InReplyToStatusId = this.InReplyToStatusId;
        converted.InReplyToUserId = this.InReplyToUserId;
        converted.RetweetedStatus = (this.RetweetedStatus == null) ? null : this.RetweetedStatus.ToOriginal();
        converted._textOriginal = this.Text;
        converted.User = (this.User == null) ? null : this.User.ToOriginal();
        converted.Source = this.Source;
        converted.Favorited = this.Favorited;
        converted.Truncated = this.Truncated;
        converted.RetweetCount = this.RetweetCount;
        converted.Retweeted = this.Retweeted;
        converted.Entities = (this.Entities == null) ? null : this.Entities.ToOriginal();
        return converted;
    }
}
/// <summary>
/// Represents the entity information attached to a status (URLs and hashtags).
/// </summary>
[DataContract(Name = "entities")]
public class _Entities
{
    //[XmlElement("media")]
    //public MediaEntity[] Media;

    [DataMember(Name = "urls")]
    public _UrlEntity[] Urls;

    [DataMember(Name = "hashtags")]
    public _HashtagEntity[] Hashtags;

    /// <summary>Converts each contained entity into its gateway counterpart.</summary>
    /// <returns>A new <c>Entities</c>; an array is null when the matching array here is null.</returns>
    public Entities ToOriginal()
    {
        UrlEntity[] convertedUrls = null;
        if (this.Urls != null)
        {
            convertedUrls = (from url in this.Urls select url.ToOriginal()).ToArray();
        }

        HashtagEntity[] convertedHashtags = null;
        if (this.Hashtags != null)
        {
            convertedHashtags = (from tag in this.Hashtags select tag.ToOriginal()).ToArray();
        }

        var converted = new Entities();
        converted.Urls = convertedUrls;
        converted.Hashtags = convertedHashtags;
        return converted;
    }
}
/// <summary>
/// Represents the information of a URL entity.
/// </summary>
[DataContract(Name = "url")]
public class _UrlEntity
{
    // NOTE(review): populated from JSON members named "start"/"end" - confirm the stream actually sends these names.
    [DataMember(Name = "start")]
    public int Start;

    [DataMember(Name = "end")]
    public int End;

    [DataMember(Name = "url")]
    public string Url;

    [DataMember(Name = "display_url")]
    public string DisplayUrl;

    [DataMember(Name = "expanded_url")]
    public string ExpandedUrl;

    /// <summary>Copies every member into a new gateway <c>UrlEntity</c>.</summary>
    public UrlEntity ToOriginal()
    {
        var converted = new UrlEntity();
        converted.Start = this.Start;
        converted.End = this.End;
        converted.Url = this.Url;
        converted.DisplayUrl = this.DisplayUrl;
        converted.ExpandedUrl = this.ExpandedUrl;
        return converted;
    }
}
/// <summary>
/// Represents the information of a hashtag entity.
/// </summary>
[DataContract(Name = "hashtag")]
public class _HashtagEntity
{
    // NOTE(review): populated from JSON members named "start"/"end" - confirm the stream actually sends these names.
    [DataMember(Name = "start")]
    public int Start;

    [DataMember(Name = "end")]
    public int End;

    [DataMember(Name = "text")]
    public string Text;

    /// <summary>Copies every member into a new gateway <c>HashtagEntity</c>.</summary>
    public HashtagEntity ToOriginal()
    {
        var converted = new HashtagEntity();
        converted.Start = this.Start;
        converted.End = this.End;
        converted.Text = this.Text;
        return converted;
    }
}
/// <summary>JSON data contract for a message carrying a "friends" array of user IDs.</summary>
[DataContract]
class _FriendsObject
{
    /// <summary>Shared serializer used to read a friends message from a JSON payload.</summary>
    public static readonly DataContractJsonSerializer Serializer =
        new DataContractJsonSerializer(typeof(_FriendsObject));

    /// <summary>User IDs listed under "friends"; null when the member is absent.</summary>
    [DataMember(Name = "friends")]
    public long[] Friends { get; set; }
}
/// <summary>JSON data contract for the user referenced by an event; only its "id" is read.</summary>
[DataContract]
class _EventTarget
{
    /// <summary>Value of the "id" member.</summary>
    [DataMember(Name = "id")]
    public long Id { get; set; }
}
/// <summary>JSON data contract for an event message: the event name plus its source and target users.</summary>
[DataContract]
class _EventObject
{
    /// <summary>Shared serializer used to read an event message from a JSON payload.</summary>
    public static readonly DataContractJsonSerializer Serializer =
        new DataContractJsonSerializer(typeof(_EventObject));

    /// <summary>Value of the "event" member (for example "follow").</summary>
    [DataMember(Name = "event")]
    public string Event { get; set; }

    /// <summary>Value of the "target" member.</summary>
    [DataMember(Name = "target")]
    public _EventTarget Target { get; set; }

    /// <summary>Value of the "source" member.</summary>
    [DataMember(Name = "source")]
    public _EventTarget Source { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment