Created
January 26, 2014 14:04
-
-
Save hongruiqi/8633181 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"errors" | |
"log" | |
"net/http" | |
"sync" | |
"time" | |
) | |
const ( | |
AIRQUALITY_URL = "http://api100.duapp.com/airquality/?appkey=trialuser&city=" | |
AIRQUALITY_CACHE_TIMEOUT = 5 * time.Minute | |
RESPONSE_TIMEOUT = 30 * time.Second | |
) | |
type AirQuality struct { | |
Title string | |
Description string | |
PicUrl string | |
Url string | |
} | |
type AirQualityCacheEntry struct { | |
Quality *AirQuality | |
Time time.Time | |
} | |
// AirQuality的缓存 | |
// 从缓存中取AirQuality时,若缓存超时,则从缓存中删除 | |
type AirQualityCache struct { | |
cache map[string]*AirQualityCacheEntry | |
cacheLock sync.Mutex | |
timeout time.Duration | |
} | |
// timeout指定缓存超时时间 | |
func NewAirQualityCache(timeout time.Duration) *AirQualityCache { | |
return &AirQualityCache{cache: make(map[string]*AirQualityCacheEntry), timeout: timeout} | |
} | |
// 根据city从缓存中获取空气质量,若不在缓存中,返回nil | |
func (h *AirQualityCache) Get(city string) *AirQuality { | |
h.cacheLock.Lock() | |
defer h.cacheLock.Unlock() | |
if entry, ok := h.cache[city]; !ok { | |
return nil | |
} else if time.Now().Sub(entry.Time) > h.timeout { // 超时删除 | |
log.Printf("delete %s' air quality from cache: timeout", city) | |
delete(h.cache, city) | |
return nil | |
} else { | |
return entry.Quality | |
} | |
} | |
// 设置缓存项 | |
func (h *AirQualityCache) Set(city string, quality *AirQuality) { | |
now := time.Now() | |
entry := &AirQualityCacheEntry{Quality: quality, Time: now} | |
log.Printf("store %s's air quality into cache", city) | |
h.cacheLock.Lock() | |
defer h.cacheLock.Unlock() | |
h.cache[city] = entry | |
} | |
type AirQualityAndError struct { | |
*AirQuality | |
Error error | |
} | |
// 查询空气质量的处理器 | |
type AirQualityHandler struct { | |
cache *AirQualityCache | |
fetchMutex map[string]*sync.Mutex | |
mu sync.Mutex | |
} | |
func NewAirQualityHandler() *AirQualityHandler { | |
return &AirQualityHandler{ | |
cache: NewAirQualityCache(AIRQUALITY_CACHE_TIMEOUT), | |
fetchMutex: make(map[string]*sync.Mutex), | |
} | |
} | |
// 返回每个城市独立的锁,若不存在则自动创建 | |
func (h *AirQualityHandler) getCityLock(city string) *sync.Mutex { | |
h.mu.Lock() | |
defer h.mu.Unlock() | |
if muCity, ok := h.fetchMutex[city]; !ok { | |
muCity = new(sync.Mutex) | |
h.fetchMutex[city] = muCity | |
return muCity | |
} else { | |
return muCity | |
} | |
} | |
// 返回空气质量 | |
// 首先在缓存中查找,若存在,则直接返回,否则 | |
// 通过API接口获取数据,并存储到缓存中 | |
func (h *AirQualityHandler) fetchAirQuality(city string) (quality *AirQuality, err error) { | |
lockCity := h.getCityLock(city) | |
lockCity.Lock() | |
defer lockCity.Unlock() | |
// 尝试从缓存中获取 | |
if quality = h.cache.Get(city); quality != nil { | |
log.Printf("air quality of %s get from cache", city) | |
return | |
} | |
// 通过API获取 | |
resp, err := http.Get(AIRQUALITY_URL + city) | |
if err != nil { | |
log.Println(err) | |
return | |
} | |
// 处理返回数据 | |
var ret []*AirQuality | |
err = json.NewDecoder(resp.Body).Decode(&ret) | |
if err != nil { | |
log.Println(err) | |
return | |
} | |
if len(ret) < 1 { | |
log.Printf("invalid result: %v", ret) | |
err = errors.New("invalid result") | |
return | |
} | |
quality = ret[0] | |
// 存入缓存中 | |
h.cache.Set(city, quality) | |
log.Printf("air quality of %s get from net", city) | |
return | |
} | |
// 获取空气质量异步接口,返回传输空气质量和错误信息的channel | |
func (h *AirQualityHandler) getAirQuality(city string) chan *AirQualityAndError { | |
c := make(chan *AirQualityAndError, 1) | |
go func() { | |
quality, err := h.fetchAirQuality(city) | |
c <- &AirQualityAndError{AirQuality: quality, Error: err} | |
}() | |
return c | |
} | |
func (h *AirQualityHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
city := r.FormValue("city") | |
// 获取空气数据,若超过指定的响应时间,则后台继续获取 | |
// 同时向客户端响应服务器繁忙 | |
select { | |
case airQualityAndError := <-h.getAirQuality(city): | |
if airQualityAndError.Error != nil { | |
w.Write([]byte("服务器错误!")) | |
return | |
} | |
json.NewEncoder(w).Encode(airQualityAndError.AirQuality) | |
return | |
case <-time.After(RESPONSE_TIMEOUT): | |
w.Write([]byte("服务器繁忙,请稍后再试!")) | |
} | |
} | |
func main() { | |
http.Handle("/air_quality", NewAirQualityHandler()) | |
err := http.ListenAndServe(":8000", nil) | |
if err != nil { | |
panic(err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment