Skip to content

Instantly share code, notes, and snippets.

@miyamuko
Forked from youz/xyttr-streaming-api.l
Created January 15, 2012 11:48
Show Gist options
  • Save miyamuko/1615580 to your computer and use it in GitHub Desktop.
Save miyamuko/1615580 to your computer and use it in GitHub Desktop.
xl-winhttp版、xyttrでstreaming-apiを使用 (とりあえずfilter.jsonのtrackのみ)
;;; -*- mode: lisp; package: xyttr -*-
(eval-when (:compile-toplevel :load-toplevel :execute)
(require "cmu_loop")
(require "json")
(require "xl-winhttp")
(require "xyttr"))
(in-package :xyttr)
(defconstant *stream-api-host* "stream.twitter.com")
(defconstant *stream-api-method* "/1/statuses/filter.json")
(defconstant *stream-api-verb* 'post)
(defvar *streaming-session* nil)
(defvar *streaming-connection* nil)
(defvar *streaming-request* nil)
(defvar *streaming-buffer* nil)
(defvar *streaming-data* nil)
(defun url-encode (e)
(si:www-url-encode
(convert-encoding-from-internal *encoding-utf8n* (format nil "~A" e))))
(defun winhttp-session ()
(or *streaming-session*
(setf *streaming-session*
(winhttp:open :user-agent (format nil "xyzzy/~A" (software-version))
:async t)
)))
;; ref. https://dev.twitter.com/docs/streaming-api/methods
(defun streaming-connect (params &key user callback)
(let* ((cn (winhttp:connect (winhttp-session) *stream-api-host*))
(path *stream-api-method*)
(cred (list :consumer-key *consumer-key* :consumer-secret *consumer-secret*
:token *token* :token-secret *token-secret*))
(auth-url (format nil "https://~A~A" *stream-api-host* *stream-api-method*))
(auth (oauth:auth-header cred *stream-api-verb* auth-url params))
(req (winhttp:open-request cn *stream-api-verb* path :secure t)))
(setq *streaming-connection* cn)
(setq *streaming-request* req)
(message "Connecting stream.twitter.com ...")
;; リクエストヘッダの設定
(winhttp:add-request-headers req `(:Connection "Keep-Alive"
:Authorization ,auth
))
(when (member *stream-api-verb* '(post put) :test #'string-equal)
(winhttp:add-request-headers req `(:Content-Type "application/x-www-form-urlencoded")))
;; コールバックの設定
(setf (winhttp:request-context req) callback)
(winhttp:set-status-callback req 'streaming-callback)
;; リクエスト送信開始
(winhttp:send-request req :data (format nil "~{~A=~A~^&~}" (mapcar #'url-encode params)))
))
(defun streaming-callback (&rest args)
(flet ((fire (event req &rest args)
(handler-case
(apply (winhttp:request-context req) event req args)
(error (c)
(msgbox "~A" c)))))
(alexandria:destructuring-case args
;; リクエスト送信完了
((:send-request-complete req)
(winhttp:receive-response req))
;; レスポンスヘッダ受信完了
((:headers-available req)
(case (winhttp:query-response-header req :status-code :type :number)
(200
(fire :onconnect req)
(winhttp:query-data-available req))
(t
(fire :onerror req)
(winhttp:close-handle req))))
;; レスポンスボディ受信チェック
((:data-available req n)
(if (< 0 n)
(winhttp:read-data req n)
(start-timer 1 #'(lambda ()
(winhttp:query-data-available req)))))
;; レスポンスボディ受信
((:read-complete req data n)
(fire :ondata req data n)
(winhttp:query-data-available req))
;; レスポンスボディ受信完了
((:handle-closing req hinternet)
(fire :onclose req))
;; 送信エラー
((:request-error req type error)
(fire :onerror req type error)
(winhttp:close-handle req))
((otherwise req &rest params)
(message "~S" (cons (car args) params)))
)))
(defun streaming-close ()
(when *streaming-connection*
(winhttp:close-handle *streaming-connection*)
(setq *streaming-connection* nil))
(when *streaming-request*
(winhttp:close-handle *streaming-request*)
(setq *streaming-request* nil)))
(defun streaming-receive-statuses (data)
(let ((statuses nil))
(setf *streaming-data* (concat *streaming-data* data))
(while (string-match "\\(\r\n\\|[\r\n]\\)" *streaming-data*)
(let ((json-txt (substring *streaming-data* 0 (match-beginning 0))))
(setf *streaming-data* (substring *streaming-data* (match-end 0)))
(unless (string= json-txt "")
(push (json:json-decode json-txt) statuses))))
(nreverse statuses)))
(defun streaming-start (params buf &key (user *default-user*))
(labels ((closer (b)
(when (eq b buf)
(delete-hook '*before-delete-buffer-hook* #'closer)
(streaming-close)
(setq *streaming-buffer* nil))
t))
(set-buffer buf)
(streaming-connect
params
:user user
:callback #'(lambda (&rest args)
(alexandria:destructuring-case args
((:onconnect req)
(message "接続しました")
(set-buffer buf)
(make-local-variable '*before-delete-buffer-hook*)
(add-hook '*before-delete-buffer-hook* #'closer)
(setq *streaming-buffer* buf))
((:ondata req data n)
(if (deleted-buffer-p buf)
(winhttp:close-handle req)
(whenlet res (streaming-receive-statuses data)
(with-set-buffer
(timeline-draw-statuses buf res))
(refresh-screen))))
((:onclose req)
(streaming-close))
((:onerror req type error)
(unless (eq (winhttp:winhttp-condition-error-keyword error)
:operation-cancelled)
(message-box (format nil "~A~%~A"
(winhttp:query-request-header req :raw-headers-crlf)
(winhttp:query-response-header req :raw-headers-crlf))
"接続失敗"
(list :ok :exclamation))))
))
)))
;;; command
(defun user::xyttr-stream-track (track)
(interactive "sTrack: ")
(when *streaming-connection*
(if (eq (message-box (format nil "~Aでのストリーミングを終了しますか?"
(buffer-name *streaming-buffer*))
"確認" (list :question :yes-no)) :yes)
(streaming-close)
(quit)))
(setq *streaming-connection* nil
*streaming-request* nil
*streaming-buffer* nil)
(let ((buf (get-buffer-create (format nil "*tws*: ~A" track))))
(when *use-frame*
(select-pseudo-frame
(or (find-pseudo-frame *frame-name*)
(new-pseudo-frame *frame-name*))))
(set-buffer buf)
(xyttr-timeline-mode)
; (setf (modeconf-mode buffer-modeconf) :xyttr-stream-track)
(streaming-start `(:track ,track) buf :user *default-user*)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment