Skip to content

Instantly share code, notes, and snippets.

@hongruiqi
Created January 26, 2014 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hongruiqi/8633181 to your computer and use it in GitHub Desktop.
Save hongruiqi/8633181 to your computer and use it in GitHub Desktop.
package main
import (
	"encoding/json"
	"errors"
	"log"
	"net/http"
	"net/url"
	"sync"
	"time"
)
// Configuration for the upstream air-quality API and for this server's timeouts.
const (
// AIRQUALITY_URL is the upstream API endpoint; the requested city is appended
// to it to form the request URL.
AIRQUALITY_URL = "http://api100.duapp.com/airquality/?appkey=trialuser&city="
// AIRQUALITY_CACHE_TIMEOUT is the cache timeout NewAirQualityHandler passes to
// NewAirQualityCache.
AIRQUALITY_CACHE_TIMEOUT = 5 * time.Minute
// RESPONSE_TIMEOUT is how long ServeHTTP waits for a result before replying
// that the server is busy.
RESPONSE_TIMEOUT = 30 * time.Second
)
// AirQuality is one record decoded from the upstream API's JSON response and
// re-encoded as JSON for clients. There are no struct tags, so encoding/json
// uses the field names themselves as JSON keys.
// NOTE(review): the meaning of each field is defined by the upstream API and
// is not visible in this file.
type AirQuality struct {
Title string
Description string
PicUrl string
Url string
}
// AirQualityCacheEntry is a cached AirQuality together with the time it was
// stored.
type AirQualityCacheEntry struct {
// Quality is the cached value.
Quality *AirQuality
// Time is when the entry was stored; Get compares its age against the
// cache timeout.
Time time.Time
}
// AirQualityCache caches AirQuality values keyed by city.
// Expiry is checked on read: when Get finds an entry older than timeout, it
// deletes that entry from the cache.
type AirQualityCache struct {
// cache maps a city to its cached entry.
cache map[string]*AirQualityCacheEntry
// cacheLock guards cache.
cacheLock sync.Mutex
// timeout is the maximum age of an entry before Get treats it as expired.
timeout time.Duration
}
// NewAirQualityCache returns an empty cache whose entries are considered
// expired once they are older than timeout.
func NewAirQualityCache(timeout time.Duration) *AirQualityCache {
	c := new(AirQualityCache)
	c.cache = map[string]*AirQualityCacheEntry{}
	c.timeout = timeout
	return c
}
// Get returns the cached air quality for city, or nil if there is no entry
// or the entry is older than the cache timeout. An expired entry is removed
// from the cache before nil is returned.
func (h *AirQualityCache) Get(city string) *AirQuality {
	h.cacheLock.Lock()
	defer h.cacheLock.Unlock()

	entry, ok := h.cache[city]
	if !ok {
		return nil
	}
	if time.Since(entry.Time) > h.timeout {
		// Expired: evict so the map does not keep serving stale data.
		log.Printf("delete %s's air quality from cache: timeout", city)
		delete(h.cache, city)
		return nil
	}
	return entry.Quality
}
// Set stores quality as the cache entry for city, stamped with the current
// time, replacing any existing entry for that city.
func (h *AirQualityCache) Set(city string, quality *AirQuality) {
	stamped := &AirQualityCacheEntry{
		Time:    time.Now(),
		Quality: quality,
	}
	log.Printf("store %s's air quality into cache", city)

	h.cacheLock.Lock()
	defer h.cacheLock.Unlock()
	h.cache[city] = stamped
}
// AirQualityAndError pairs a fetched AirQuality with the error, if any, from
// fetching it, so that both can be sent as one value over a channel.
type AirQualityAndError struct {
*AirQuality
Error error
}
// AirQualityHandler is the HTTP handler that serves air-quality queries.
type AirQualityHandler struct {
// cache holds previously fetched results.
cache *AirQualityCache
// fetchMutex holds one mutex per city, created on demand by getCityLock.
fetchMutex map[string]*sync.Mutex
// mu guards fetchMutex.
mu sync.Mutex
}
// NewAirQualityHandler returns a handler with an empty result cache that uses
// AIRQUALITY_CACHE_TIMEOUT as its timeout, and an empty per-city lock table.
func NewAirQualityHandler() *AirQualityHandler {
	handler := new(AirQualityHandler)
	handler.cache = NewAirQualityCache(AIRQUALITY_CACHE_TIMEOUT)
	handler.fetchMutex = map[string]*sync.Mutex{}
	return handler
}
// getCityLock returns the mutex dedicated to city, creating and registering
// it on first use. The lookup and the insertion happen under h.mu.
func (h *AirQualityHandler) getCityLock(city string) *sync.Mutex {
	h.mu.Lock()
	defer h.mu.Unlock()

	cityLock, found := h.fetchMutex[city]
	if found {
		return cityLock
	}

	cityLock = &sync.Mutex{}
	h.fetchMutex[city] = cityLock
	return cityLock
}
// fetchAirQuality returns the air quality for city.
//
// It first looks in the cache and returns a hit directly. On a miss it
// queries the upstream API, stores the first record of the response in the
// cache, and returns it. Fetches for the same city are serialized through
// that city's lock, so callers that waited on the lock find the result in the
// cache instead of issuing another upstream request.
//
// It returns a non-nil error (and a nil quality) when the request fails, the
// upstream replies with a non-200 status, the body is not valid JSON, or the
// response contains no usable record.
func (h *AirQualityHandler) fetchAirQuality(city string) (quality *AirQuality, err error) {
	lockCity := h.getCityLock(city)
	lockCity.Lock()
	defer lockCity.Unlock()

	// Try the cache first.
	if quality = h.cache.Get(city); quality != nil {
		log.Printf("air quality of %s get from cache", city)
		return quality, nil
	}

	// Query the upstream API. The city comes straight from the client, so it
	// is escaped to keep characters such as '&', '#' or spaces from altering
	// the query string.
	// NOTE(review): http.Get uses http.DefaultClient, which has no timeout; a
	// hung upstream keeps this city's lock held until the request ends.
	resp, err := http.Get(AIRQUALITY_URL + url.QueryEscape(city))
	if err != nil {
		log.Println(err)
		return nil, err
	}
	// The body must be closed or the underlying connection is leaked.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		err = errors.New("unexpected upstream status: " + resp.Status)
		log.Println(err)
		return nil, err
	}

	// Decode the response: a JSON array of records.
	var ret []*AirQuality
	if err = json.NewDecoder(resp.Body).Decode(&ret); err != nil {
		log.Println(err)
		return nil, err
	}
	// A JSON null element decodes to a nil pointer; treat it like an empty
	// result rather than caching and returning nil as a success.
	if len(ret) < 1 || ret[0] == nil {
		log.Printf("invalid result: %v", ret)
		return nil, errors.New("invalid result")
	}
	quality = ret[0]

	// Store the result in the cache.
	h.cache.Set(city, quality)
	log.Printf("air quality of %s get from net", city)
	return quality, nil
}
// getAirQuality is the asynchronous form of fetchAirQuality: it starts the
// fetch in a new goroutine and returns the channel on which the result and
// its error will be delivered.
func (h *AirQualityHandler) getAirQuality(city string) chan *AirQualityAndError {
	// Capacity 1 lets the goroutine deliver its result and finish even when
	// no one receives from the channel.
	results := make(chan *AirQualityAndError, 1)
	go func(out chan<- *AirQualityAndError) {
		res := new(AirQualityAndError)
		res.AirQuality, res.Error = h.fetchAirQuality(city)
		out <- res
	}(results)
	return results
}
// ServeHTTP answers an air-quality query for the "city" form value.
//
// On success it writes the AirQuality as JSON. If the fetch fails it replies
// 500 with a short error text. If no result arrives within RESPONSE_TIMEOUT
// it replies 503 with a "busy" text; the fetch goroutine keeps running in the
// background, so its result can still populate the cache for later requests.
func (h *AirQualityHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	city := r.FormValue("city")

	select {
	case result := <-h.getAirQuality(city):
		if result.Error != nil {
			// Send a real error status so clients can tell failure from
			// success without parsing the body.
			w.Header().Set("Content-Type", "text/plain; charset=utf-8")
			w.WriteHeader(http.StatusInternalServerError)
			if _, err := w.Write([]byte("服务器错误!")); err != nil {
				log.Println(err)
			}
			return
		}
		w.Header().Set("Content-Type", "application/json; charset=utf-8")
		if err := json.NewEncoder(w).Encode(result.AirQuality); err != nil {
			log.Println(err)
		}
	case <-time.After(RESPONSE_TIMEOUT):
		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
		w.WriteHeader(http.StatusServiceUnavailable)
		if _, err := w.Write([]byte("服务器繁忙,请稍后再试!")); err != nil {
			log.Println(err)
		}
	}
}
// main registers the air-quality handler at /air_quality and serves HTTP on
// port 8000, panicking if the server stops with an error.
func main() {
	http.Handle("/air_quality", NewAirQualityHandler())
	if err := http.ListenAndServe(":8000", nil); err != nil {
		panic(err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment