Skip to content

Instantly share code, notes, and snippets.

@mccoder
Created June 17, 2010 19:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mccoder/442605 to your computer and use it in GitHub Desktop.
Save mccoder/442605 to your computer and use it in GitHub Desktop.
#encoding: utf-8
=begin
Copyright 2010 Denis Kokorin <virkdi@mail.ru>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
=end
include_pligins(['joffice'])
require 'never_block'
require 'nokogiri'
require 'feed-normalizer'
require 'sanitize'
require 'uri'
require 'digest/sha1'
module JOffice
module Rss
class RssUpdateService
ERROR_ATTEMPT=5
# Returns the SubscriptionHelper instance for this service, creating it on
# first use and reusing the same object afterwards.
def subscription_helper
  return @subscription_helper if @subscription_helper

  @subscription_helper = JOffice::Rss::Datasource::SubscriptionHelper.new
end
# Returns the UpdateClient used to download feeds, creating it on first use
# and reusing the same object afterwards.
def update_client
  return @update_client if @update_client

  @update_client = JOffice::Rss::UpdateClient.new
end
# Returns the XSLT stylesheet built from templates/html2text.xsl (located
# relative to this file). The file is read and handed to Nokogiri::XSLT only
# on the first call; later calls return the cached object.
def html2text_xsl
  @html2text_xsl ||= begin
    stylesheet_path = File.join(File.dirname(__FILE__), 'templates', 'html2text.xsl')
    stylesheet_source = File.read(stylesheet_path)
    Nokogiri::XSLT(stylesheet_source)
  end
end
# Starts feed updating.
#
# * Creates an EventMachine::PeriodicTimer (kept in @update_thread) that fires
#   every 15 minutes. Each tick, inside a new fiber, it loads the active feeds
#   with at least one subscription whose next_update_at is either in the past
#   or not set, and passes them to update_feeds.
# * Immediately, inside another fiber and wrapped in a FiberLogger block, it
#   loads the active feeds whose next_update_at is not set and passes them to
#   update_feeds.
def start
  @update_thread = EventMachine::PeriodicTimer.new(15.minutes) do
    spawn do
      ::XmppUser.update_timezone(0)
      # A nil bound to "next_update_at=?" is rendered as "next_update_at=NULL",
      # which SQL never evaluates to true, so unscheduled feeds were skipped
      # by the timer. The NULL check has to be spelled "IS NULL".
      due_feeds = JOffice::Rss::Feed.find(
        :all,
        :conditions => [
          'is_active=? and (next_update_at <= ? or next_update_at IS NULL) AND subscriptions_count>0 ',
          true,
          Time.new
        ]
      )
      update_feeds(due_feeds)
    end
  end
  spawn do
    ::XmppUser.update_timezone(0)
    JOffice::FiberLogger.block(:name=>"Update feeds", :flush=>true, :raise=>false) do
      # Hash conditions already translate a nil value into "IS NULL".
      unscheduled_feeds = JOffice::Rss::Feed.find(:all, :conditions=>{:next_update_at=> nil, :is_active=>true})
      update_feeds(unscheduled_feeds)
    end
  end
end
# Runs the given block inside a new Fiber and resumes that fiber right away.
# Returns whatever the first resume returns: the block's value if it runs to
# completion, or the value passed to Fiber.yield if the block suspends.
def spawn
  fiber = Fiber.new { yield }
  fiber.resume
end
# Returns an NB::Pool::FiberPool constructed with the argument 5, creating it
# on first use and reusing the same object afterwards.
def fiber_pool
  return @fiber_pool if @fiber_pool

  @fiber_pool = NB::Pool::FiberPool.new(5)
end
# Downloads every feed in +feeds+ and dispatches on the resulting status.
#
# Each feed is handled in its own fiber (via spawn) inside a FiberLogger
# block named "Update feed"; the status and the feed URI are recorded in the
# block's attributes. Dispatch by numeric status:
#   200      -> feed_successful_updated
#   301, 302 -> feed_not_moved
#   304      -> feed_not_modified
#   other    -> feed_error_update
#
# feeds - collection of feed records, or nil (in which case nothing happens).
#
# Returns nil when +feeds+ is nil/false, otherwise the result of feeds.each.
def update_feeds(feeds)
  return unless feeds

  feeds.each do |feed|
    spawn do
      JOffice::FiberLogger.block(:name=>"Update feed", :flush=>true, :raise=>false) do |block|
        # Captured before the download; the handlers use it as the base for
        # the next scheduled update.
        started_at = Time.new
        feed_uri = URI(feed.uri)
        status, parsed_feed, body, headers =
          update_client.download(feed_uri, feed.last_update_at, feed.id)
        block.attributes[:status] = status
        block.attributes[:uri] = feed.uri
        Fiber.sleep_to_next_tick
        code = status.to_i
        if code == 200
          feed_successful_updated(feed, parsed_feed, started_at, feed_uri)
        elsif [301, 302].include?(code)
          feed_not_moved(feed, status, body, headers, started_at)
        elsif code == 304
          feed_not_modified(feed, started_at)
        else
          feed_error_update(feed, status, body, started_at)
        end
      end
    end
  end
end
# Handles a feed that was downloaded successfully.
#
# Picks an importer by the class of +content+ (Feedzirra::Parser::Atom ->
# JOffice::Rss::Import::Atom, Feedzirra::Parser::RSS ->
# JOffice::Rss::Import::Rss) and runs it. When the import returns a truthy
# value, the feed's subscriptions are loaded inside a Feed transaction and, if
# there are any, their ids are passed to FeedSubscription.updated and to
# subscription_helper.clean_unreaded_for_subscriptions.
#
# feed         - feed record being updated
# content      - parsed feed object
# last_updated - time the update started
# uri          - URI the feed was downloaded from
#
# Returns nil when +content+ is of an unknown class (after printing the
# problem); otherwise the value of the final debug print (nil unless $debug).
def feed_successful_updated(feed, content, last_updated, uri)
  importer_class =
    case content
    when Feedzirra::Parser::Atom then JOffice::Rss::Import::Atom
    when Feedzirra::Parser::RSS then JOffice::Rss::Import::Rss
    end

  if importer_class.nil?
    p "[#{Time.new}]Error unknown feed type #{content.class}"
    pp feed
    return nil
  end

  importer = importer_class.new(feed, content, uri, last_updated, html2text_xsl)
  # NOTE(review): id_factory is not defined anywhere in the visible source;
  # confirm it exists, otherwise this line raises when $debug is set.
  p "Update #{feed.uri} #{id_factory}" if $debug

  if importer.import
    JOffice::Rss::Feed.transaction do
      subscriptions = JOffice::Rss::Datasource::FeedSubscription.find_all_by_feed_id(feed.id)
      next if subscriptions.blank?

      subscriptions_ids = subscriptions.map { |subscription| subscription.id }
      JOffice::Rss::Datasource::FeedSubscription.updated(subscriptions_ids)
      subscription_helper.clean_unreaded_for_subscriptions(subscriptions_ids)
    end
  end

  # Step N. Remove entries older than 6 months (disabled)
  # if (feed.entries_count-feed.entries_in_feed)>10
  #   expired_entries=JOffice::Rss::Entry.find(:select=>'id', :conditions=>['feed_id=? and created_at<?',feed.id,Time.new-6.month])
  # end
  p "*"*50 if $debug
end
# Handles a 301/302 answer for a feed.
#
# If the Location header holds an http:// address, the current URI is kept in
# feed.uri_old and feed.uri is switched to the new address. Otherwise the
# answer is counted as a failed attempt: error_attempt is incremented and the
# next update is scheduled 30 minutes after +last_updated+, or 3 hours after
# it once error_attempt exceeds ERROR_ATTEMPT.
#
# feed         - feed record; saved at the end in both cases
# status       - response status (only used in the printed message)
# content      - response body (unused)
# headers      - response headers; 'location' may be a String or an Array
# last_updated - time the update started
#
# Returns the result of feed.save.
def feed_not_moved(feed,status,content,headers, last_updated)
  p "Feed moved '#{feed.uri}'=#{status} to => #{headers['location']}"
  # Array() copes with a missing header (nil), an Array of values and a plain
  # String. The previous "(value || []).first" broke on a String: it raises
  # NoMethodError in plain Ruby, and with ActiveSupport's String#first it
  # yields only the first character, so the redirect was never followed.
  target = Array(headers['location']).first
  # NOTE(review): only http:// targets are followed; https:// redirects are
  # treated as failures - confirm whether the update client supports https.
  if target && target.start_with?('http://')
    feed.uri_old = feed.uri
    feed.uri = target
  else
    feed.error_attempt += 1
    retry_delay = feed.error_attempt > ERROR_ATTEMPT ? 3.hours : 30.minutes
    feed.next_update_at = last_updated + retry_delay
  end
  feed.save
end
# Handles a 304 (not modified) answer for a feed.
#
# Resets the error counter, caps feed.update_interval at 60 and schedules the
# next update max(10, update_interval) minutes after +last_updated+.
#
# feed         - feed record; saved at the end
# last_updated - time the update started
#
# Returns the result of feed.save.
def feed_not_modified(feed, last_updated)
  feed.error_attempt = 0
  # Ruby's Math module has no min/max functions, so Math.min / Math.max raise
  # NoMethodError; Array#min / Array#max compute the same values.
  feed.update_interval = [60, feed.update_interval].min
  feed.next_update_at = last_updated + [10, feed.update_interval].max.minutes
  feed.save
end
# Handles any status that is not 200, 301, 302 or 304.
#
# Prints the failure, schedules the next update 30 minutes after
# +last_updated+ (1 hour once the feed's current error_attempt already exceeds
# ERROR_ATTEMPT) and then increments error_attempt.
#
# feed         - feed record; saved at the end
# status       - response status (only used in the printed message)
# content      - response body (unused)
# last_updated - time the update started
#
# Returns the result of feed.save.
def feed_error_update(feed, status,content, last_updated)
  p "Error update '#{feed.uri}'=#{status} '#{feed.error_attempt}'"
  # The delay is chosen from the counter value *before* it is incremented.
  retry_delay =
    if feed.error_attempt > ERROR_ATTEMPT
      1.hours
    else
      30.minutes
    end
  feed.next_update_at = last_updated + retry_delay
  feed.error_attempt += 1
  feed.save
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment