1
0
mirror of https://github.com/chai2010/advanced-go-programming-book.git synced 2025-05-24 04:22:22 +00:00
This commit is contained in:
chai2010 2018-12-17 18:08:30 +08:00
parent 9ad62fe063
commit 6e50377cee
15 changed files with 1129 additions and 1109 deletions

View File

@ -28,24 +28,24 @@ package main
import (...) import (...)
func echo(wr http.ResponseWriter, r *http.Request) { func echo(wr http.ResponseWriter, r *http.Request) {
msg, err := ioutil.ReadAll(r.Body) msg, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
wr.Write([]byte("echo error")) wr.Write([]byte("echo error"))
return return
} }
writeLen, err := wr.Write(msg) writeLen, err := wr.Write(msg)
if err != nil || writeLen != len(msg) { if err != nil || writeLen != len(msg) {
log.Println(err, "write len:", writeLen) log.Println(err, "write len:", writeLen)
} }
} }
func main() { func main() {
http.HandleFunc("/", echo) http.HandleFunc("/", echo)
err := http.ListenAndServe(":8080", nil) err := http.ListenAndServe(":8080", nil)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }
``` ```
@ -57,15 +57,15 @@ func main() {
```go ```go
//Burrow: http_server.go //Burrow: http_server.go
func NewHttpServer(app *ApplicationContext) (*HttpServer, error) { func NewHttpServer(app *ApplicationContext) (*HttpServer, error) {
... ...
server.mux.HandleFunc("/", handleDefault) server.mux.HandleFunc("/", handleDefault)
server.mux.HandleFunc("/burrow/admin", handleAdmin) server.mux.HandleFunc("/burrow/admin", handleAdmin)
server.mux.Handle("/v2/kafka", appHandler{server.app, handleClusterList}) server.mux.Handle("/v2/kafka", appHandler{server.app, handleClusterList})
server.mux.Handle("/v2/kafka/", appHandler{server.app, handleKafka}) server.mux.Handle("/v2/kafka/", appHandler{server.app, handleKafka})
server.mux.Handle("/v2/zookeeper", appHandler{server.app, handleClusterList}) server.mux.Handle("/v2/zookeeper", appHandler{server.app, handleClusterList})
... ...
} }
``` ```
@ -83,66 +83,66 @@ func NewHttpServer(app *ApplicationContext) (*HttpServer, error) {
```go ```go
func handleKafka(app *ApplicationContext, w http.ResponseWriter, r *http.Request) (int, string) { func handleKafka(app *ApplicationContext, w http.ResponseWriter, r *http.Request) (int, string) {
pathParts := strings.Split(r.URL.Path[1:], "/") pathParts := strings.Split(r.URL.Path[1:], "/")
if _, ok := app.Config.Kafka[pathParts[2]]; !ok { if _, ok := app.Config.Kafka[pathParts[2]]; !ok {
return makeErrorResponse(http.StatusNotFound, "cluster not found", w, r) return makeErrorResponse(http.StatusNotFound, "cluster not found", w, r)
} }
if pathParts[2] == "" { if pathParts[2] == "" {
// Allow a trailing / on requests // Allow a trailing / on requests
return handleClusterList(app, w, r) return handleClusterList(app, w, r)
} }
if (len(pathParts) == 3) || (pathParts[3] == "") { if (len(pathParts) == 3) || (pathParts[3] == "") {
return handleClusterDetail(app, w, r, pathParts[2]) return handleClusterDetail(app, w, r, pathParts[2])
} }
switch pathParts[3] { switch pathParts[3] {
case "consumer": case "consumer":
switch { switch {
case r.Method == "DELETE": case r.Method == "DELETE":
switch { switch {
case (len(pathParts) == 5) || (pathParts[5] == ""): case (len(pathParts) == 5) || (pathParts[5] == ""):
return handleConsumerDrop(app, w, r, pathParts[2], pathParts[4]) return handleConsumerDrop(app, w, r, pathParts[2], pathParts[4])
default: default:
return makeErrorResponse(http.StatusMethodNotAllowed, "request method not supported", w, r) return makeErrorResponse(http.StatusMethodNotAllowed, "request method not supported", w, r)
} }
case r.Method == "GET": case r.Method == "GET":
switch { switch {
case (len(pathParts) == 4) || (pathParts[4] == ""): case (len(pathParts) == 4) || (pathParts[4] == ""):
return handleConsumerList(app, w, r, pathParts[2]) return handleConsumerList(app, w, r, pathParts[2])
case (len(pathParts) == 5) || (pathParts[5] == ""): case (len(pathParts) == 5) || (pathParts[5] == ""):
// Consumer detail - list of consumer streams/hosts? Can be config info later // Consumer detail - list of consumer streams/hosts? Can be config info later
return makeErrorResponse(http.StatusNotFound, "unknown API call", w, r) return makeErrorResponse(http.StatusNotFound, "unknown API call", w, r)
case pathParts[5] == "topic": case pathParts[5] == "topic":
switch { switch {
case (len(pathParts) == 6) || (pathParts[6] == ""): case (len(pathParts) == 6) || (pathParts[6] == ""):
return handleConsumerTopicList(app, w, r, pathParts[2], pathParts[4]) return handleConsumerTopicList(app, w, r, pathParts[2], pathParts[4])
case (len(pathParts) == 7) || (pathParts[7] == ""): case (len(pathParts) == 7) || (pathParts[7] == ""):
return handleConsumerTopicDetail(app, w, r, pathParts[2], pathParts[4], pathParts[6]) return handleConsumerTopicDetail(app, w, r, pathParts[2], pathParts[4], pathParts[6])
} }
case pathParts[5] == "status": case pathParts[5] == "status":
return handleConsumerStatus(app, w, r, pathParts[2], pathParts[4], false) return handleConsumerStatus(app, w, r, pathParts[2], pathParts[4], false)
case pathParts[5] == "lag": case pathParts[5] == "lag":
return handleConsumerStatus(app, w, r, pathParts[2], pathParts[4], true) return handleConsumerStatus(app, w, r, pathParts[2], pathParts[4], true)
} }
default: default:
return makeErrorResponse(http.StatusMethodNotAllowed, "request method not supported", w, r) return makeErrorResponse(http.StatusMethodNotAllowed, "request method not supported", w, r)
} }
case "topic": case "topic":
switch { switch {
case r.Method != "GET": case r.Method != "GET":
return makeErrorResponse(http.StatusMethodNotAllowed, "request method not supported", w, r) return makeErrorResponse(http.StatusMethodNotAllowed, "request method not supported", w, r)
case (len(pathParts) == 4) || (pathParts[4] == ""): case (len(pathParts) == 4) || (pathParts[4] == ""):
return handleBrokerTopicList(app, w, r, pathParts[2]) return handleBrokerTopicList(app, w, r, pathParts[2])
case (len(pathParts) == 5) || (pathParts[5] == ""): case (len(pathParts) == 5) || (pathParts[5] == ""):
return handleBrokerTopicDetail(app, w, r, pathParts[2], pathParts[4]) return handleBrokerTopicDetail(app, w, r, pathParts[2], pathParts[4])
} }
case "offsets": case "offsets":
// Reserving this endpoint to implement later // Reserving this endpoint to implement later
return makeErrorResponse(http.StatusNotFound, "unknown API call", w, r) return makeErrorResponse(http.StatusNotFound, "unknown API call", w, r)
} }
// If we fell through, return a 404 // If we fell through, return a 404
return makeErrorResponse(http.StatusNotFound, "unknown API call", w, r) return makeErrorResponse(http.StatusNotFound, "unknown API call", w, r)
} }
``` ```

View File

@ -6,15 +6,15 @@ restful 是几年前刮起的 API 设计风潮,在 restful 中除了 GET 和 P
```go ```go
const ( const (
MethodGet = "GET" MethodGet = "GET"
MethodHead = "HEAD" MethodHead = "HEAD"
MethodPost = "POST" MethodPost = "POST"
MethodPut = "PUT" MethodPut = "PUT"
MethodPatch = "PATCH" // RFC 5789 MethodPatch = "PATCH" // RFC 5789
MethodDelete = "DELETE" MethodDelete = "DELETE"
MethodConnect = "CONNECT" MethodConnect = "CONNECT"
MethodOptions = "OPTIONS" MethodOptions = "OPTIONS"
MethodTrace = "TRACE" MethodTrace = "TRACE"
) )
``` ```
@ -57,15 +57,15 @@ panic: wildcard route ':id' conflicts with existing children in path '/user/:id'
goroutine 1 [running]: goroutine 1 [running]:
github.com/cch123/httprouter.(*node).insertChild(0xc4200801e0, 0xc42004fc01, 0x126b177, 0x3, 0x126b171, 0x9, 0x127b668) github.com/cch123/httprouter.(*node).insertChild(0xc4200801e0, 0xc42004fc01, 0x126b177, 0x3, 0x126b171, 0x9, 0x127b668)
/Users/caochunhui/go_work/src/github.com/cch123/httprouter/tree.go:256 +0x841 /Users/caochunhui/go_work/src/github.com/cch123/httprouter/tree.go:256 +0x841
github.com/cch123/httprouter.(*node).addRoute(0xc4200801e0, 0x126b171, 0x9, 0x127b668) github.com/cch123/httprouter.(*node).addRoute(0xc4200801e0, 0x126b171, 0x9, 0x127b668)
/Users/caochunhui/go_work/src/github.com/cch123/httprouter/tree.go:221 +0x22a /Users/caochunhui/go_work/src/github.com/cch123/httprouter/tree.go:221 +0x22a
github.com/cch123/httprouter.(*Router).Handle(0xc42004ff38, 0x126a39b, 0x3, 0x126b171, 0x9, 0x127b668) github.com/cch123/httprouter.(*Router).Handle(0xc42004ff38, 0x126a39b, 0x3, 0x126b171, 0x9, 0x127b668)
/Users/caochunhui/go_work/src/github.com/cch123/httprouter/router.go:262 +0xc3 /Users/caochunhui/go_work/src/github.com/cch123/httprouter/router.go:262 +0xc3
github.com/cch123/httprouter.(*Router).GET(0xc42004ff38, 0x126b171, 0x9, 0x127b668) github.com/cch123/httprouter.(*Router).GET(0xc42004ff38, 0x126b171, 0x9, 0x127b668)
/Users/caochunhui/go_work/src/github.com/cch123/httprouter/router.go:193 +0x5e /Users/caochunhui/go_work/src/github.com/cch123/httprouter/router.go:193 +0x5e
main.main() main.main()
/Users/caochunhui/test/go_web/httprouter_learn2.go:18 +0xaf /Users/caochunhui/test/go_web/httprouter_learn2.go:18 +0xaf
exit status 2 exit status 2
``` ```
@ -88,16 +88,16 @@ Pattern: /src/*filepath
```go ```go
r := httprouter.New() r := httprouter.New()
r.NotFound = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { r.NotFound = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("oh no, not found")) w.Write([]byte("oh no, not found"))
}) })
``` ```
或者内部 panic 的时候: 或者内部 panic 的时候:
```go ```go
r.PanicHandler = func(w http.ResponseWriter, r *http.Request, c interface{}) { r.PanicHandler = func(w http.ResponseWriter, r *http.Request, c interface{}) {
log.Printf("Recovering from panic, Reason: %#v", c.(error)) log.Printf("Recovering from panic, Reason: %#v", c.(error))
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(c.(error).Error())) w.Write([]byte(c.(error).Error()))
} }
``` ```
@ -143,9 +143,9 @@ httprouter 的 Router struct 中存储压缩字典树使用的是下述数据结
```go ```go
// 略去了其它部分的 Router struct // 略去了其它部分的 Router struct
type Router struct { type Router struct {
// ... // ...
trees map[string]*node trees map[string]*node
// ... // ...
} }
``` ```

View File

@ -11,13 +11,13 @@
package main package main
func hello(wr http.ResponseWriter, r *http.Request) { func hello(wr http.ResponseWriter, r *http.Request) {
wr.Write([]byte("hello")) wr.Write([]byte("hello"))
} }
func main() { func main() {
http.HandleFunc("/", hello) http.HandleFunc("/", hello)
err := http.ListenAndServe(":8080", nil) err := http.ListenAndServe(":8080", nil)
... ...
} }
``` ```
@ -30,10 +30,10 @@ func main() {
var logger = log.New(os.Stdout, "", 0) var logger = log.New(os.Stdout, "", 0)
func hello(wr http.ResponseWriter, r *http.Request) { func hello(wr http.ResponseWriter, r *http.Request) {
timeStart := time.Now() timeStart := time.Now()
wr.Write([]byte("hello")) wr.Write([]byte("hello"))
timeElapsed := time.Since(timeStart) timeElapsed := time.Since(timeStart)
logger.Println(timeElapsed) logger.Println(timeElapsed)
} }
``` ```
@ -47,30 +47,30 @@ func hello(wr http.ResponseWriter, r *http.Request) {
package main package main
func helloHandler(wr http.ResponseWriter, r *http.Request) { func helloHandler(wr http.ResponseWriter, r *http.Request) {
... // ...
} }
func showInfoHandler(wr http.ResponseWriter, r *http.Request) { func showInfoHandler(wr http.ResponseWriter, r *http.Request) {
... // ...
} }
func showEmailHandler(wr http.ResponseWriter, r *http.Request) { func showEmailHandler(wr http.ResponseWriter, r *http.Request) {
... // ...
} }
func showFriendsHandler(wr http.ResponseWriter, r *http.Request) { func showFriendsHandler(wr http.ResponseWriter, r *http.Request) {
timeStart := time.Now() timeStart := time.Now()
wr.Write([]byte("your friends is tom and alex")) wr.Write([]byte("your friends is tom and alex"))
timeElapsed := time.Since(timeStart) timeElapsed := time.Since(timeStart)
logger.Println(timeElapsed) logger.Println(timeElapsed)
} }
func main() { func main() {
http.HandleFunc("/", helloHandler) http.HandleFunc("/", helloHandler)
http.HandleFunc("/info/show", showInfoHandler) http.HandleFunc("/info/show", showInfoHandler)
http.HandleFunc("/email/show", showEmailHandler) http.HandleFunc("/email/show", showEmailHandler)
http.HandleFunc("/friends/show", showFriendsHandler) http.HandleFunc("/friends/show", showFriendsHandler)
... // ...
} }
``` ```
@ -83,12 +83,12 @@ func main() {
```go ```go
func helloHandler(wr http.ResponseWriter, r *http.Request) { func helloHandler(wr http.ResponseWriter, r *http.Request) {
timeStart := time.Now() timeStart := time.Now()
wr.Write([]byte("hello")) wr.Write([]byte("hello"))
timeElapsed := time.Since(timeStart) timeElapsed := time.Since(timeStart)
logger.Println(timeElapsed) logger.Println(timeElapsed)
// 新增耗时上报 // 新增耗时上报
metrics.Upload("timeHandler", timeElapsed) metrics.Upload("timeHandler", timeElapsed)
} }
``` ```
@ -103,25 +103,25 @@ func helloHandler(wr http.ResponseWriter, r *http.Request) {
```go ```go
func hello(wr http.ResponseWriter, r *http.Request) { func hello(wr http.ResponseWriter, r *http.Request) {
wr.Write([]byte("hello")) wr.Write([]byte("hello"))
} }
func timeMiddleware(next http.Handler) http.Handler { func timeMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(wr http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(wr http.ResponseWriter, r *http.Request) {
timeStart := time.Now() timeStart := time.Now()
// next handler // next handler
next.ServeHTTP(wr, r) next.ServeHTTP(wr, r)
timeElapsed := time.Since(timeStart) timeElapsed := time.Since(timeStart)
logger.Println(timeElapsed) logger.Println(timeElapsed)
}) })
} }
func main() { func main() {
http.Handle("/", timeMiddleware(http.HandlerFunc(hello))) http.Handle("/", timeMiddleware(http.HandlerFunc(hello)))
err := http.ListenAndServe(":8080", nil) err := http.ListenAndServe(":8080", nil)
... ...
} }
``` ```
@ -129,7 +129,7 @@ func main() {
```go ```go
type Handler interface { type Handler interface {
ServeHTTP(ResponseWriter, *Request) ServeHTTP(ResponseWriter, *Request)
} }
``` ```
@ -137,13 +137,13 @@ type Handler interface {
```go ```go
type Handler interface { type Handler interface {
ServeHTTP(ResponseWriter, *Request) ServeHTTP(ResponseWriter, *Request)
} }
type HandlerFunc func(ResponseWriter, *Request) type HandlerFunc func(ResponseWriter, *Request)
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request)
f(w, r) f(w, r)
} }
``` ```
@ -188,19 +188,19 @@ customizedHandler = logger(timeout(ratelimit(helloHandler)))
再直白一些,这个流程在进行请求处理的时候实际上就是不断地进行函数压栈再出栈,有一些类似于递归的执行流: 再直白一些,这个流程在进行请求处理的时候实际上就是不断地进行函数压栈再出栈,有一些类似于递归的执行流:
``` ```
[exec of logger logic] 函数栈: [] [exec of logger logic] 函数栈: []
[exec of timeout logic] 函数栈: [logger] [exec of timeout logic] 函数栈: [logger]
[exec of ratelimit logic] 函数栈: [timeout/logger] [exec of ratelimit logic] 函数栈: [timeout/logger]
[exec of helloHandler logic] 函数栈: [ratelimit/timeout/logger] [exec of helloHandler logic] 函数栈: [ratelimit/timeout/logger]
[exec of ratelimit logic part2] 函数栈: [timeout/logger] [exec of ratelimit logic part2] 函数栈: [timeout/logger]
[exec of timeout logic part2] 函数栈: [logger] [exec of timeout logic part2] 函数栈: [logger]
[exec of logger logic part2] 函数栈: [] [exec of logger logic part2] 函数栈: []
``` ```
功能实现了,但在上面的使用过程中我们也看到了,这种函数套函数的用法不是很美观,同时也不具备什么可读性。 功能实现了,但在上面的使用过程中我们也看到了,这种函数套函数的用法不是很美观,同时也不具备什么可读性。
@ -227,26 +227,26 @@ r.Add("/", helloHandler)
type middleware func(http.Handler) http.Handler type middleware func(http.Handler) http.Handler
type Router struct { type Router struct {
middlewareChain [] middleware middlewareChain [] middleware
mux map[string] http.Handler mux map[string] http.Handler
} }
func NewRouter() *Router{ func NewRouter() *Router{
return &Router{} return &Router{}
} }
func (r *Router) Use(m middleware) { func (r *Router) Use(m middleware) {
r.middlewareChain = append(r.middlewareChain, m) r.middlewareChain = append(r.middlewareChain, m)
} }
func (r *Router) Add(route string, h http.Handler) { func (r *Router) Add(route string, h http.Handler) {
var mergedHandler = h var mergedHandler = h
for i := len(r.middlewareChain) - 1; i >= 0; i-- { for i := len(r.middlewareChain) - 1; i >= 0; i-- {
mergedHandler = r.middlewareChain[i](mergedHandler) mergedHandler = r.middlewareChain[i](mergedHandler)
} }
r.mux[route] = mergedHandler r.mux[route] = mergedHandler
} }
``` ```

View File

@ -12,31 +12,31 @@
```go ```go
type RegisterReq struct { type RegisterReq struct {
Username string `json:"username"` Username string `json:"username"`
PasswordNew string `json:"password_new"` PasswordNew string `json:"password_new"`
PasswordRepeat string `json:"password_repeat"` PasswordRepeat string `json:"password_repeat"`
Email string `json:"email"` Email string `json:"email"`
} }
func register(req RegisterReq) error{ func register(req RegisterReq) error{
if len(req.Username) > 0 { if len(req.Username) > 0 {
if len(req.PasswordNew) > 0 && len(req.PasswordRepeat) > 0 { if len(req.PasswordNew) > 0 && len(req.PasswordRepeat) > 0 {
if req.PasswordNew == req.PasswordRepeat { if req.PasswordNew == req.PasswordRepeat {
if emailFormatValid(req.Email) { if emailFormatValid(req.Email) {
createUser() createUser()
return nil return nil
} else { } else {
return errors.New("invalid email") return errors.New("invalid email")
} }
} else { } else {
return errors.New("password and reinput must be the same") return errors.New("password and reinput must be the same")
} }
} else { } else {
return errors.New("password and password reinput must be longer than 0") return errors.New("password and password reinput must be longer than 0")
} }
} else { } else {
return errors.New("length of username cannot be 0") return errors.New("length of username cannot be 0")
} }
} }
``` ```
@ -46,24 +46,24 @@ func register(req RegisterReq) error{
```go ```go
func register(req RegisterReq) error{ func register(req RegisterReq) error{
if len(req.Username) == 0 { if len(req.Username) == 0 {
return errors.New("length of username cannot be 0") return errors.New("length of username cannot be 0")
} }
if len(req.PasswordNew) == 0 || len(req.PasswordRepeat) == 0 { if len(req.PasswordNew) == 0 || len(req.PasswordRepeat) == 0 {
return errors.New("password and password reinput must be longer than 0") return errors.New("password and password reinput must be longer than 0")
} }
if req.PasswordNew != req.PasswordRepeat { if req.PasswordNew != req.PasswordRepeat {
return errors.New("password and reinput must be the same") return errors.New("password and reinput must be the same")
} }
if emailFormatValid(req.Email) { if emailFormatValid(req.Email) {
return errors.New("invalid email") return errors.New("invalid email")
} }
createUser() createUser()
return nil return nil
} }
``` ```
@ -81,25 +81,25 @@ func register(req RegisterReq) error{
import "gopkg.in/go-playground/validator.v9" import "gopkg.in/go-playground/validator.v9"
type RegisterReq struct { type RegisterReq struct {
// 字符串的 gt=0 表示长度必须 > 0,gt = greater than // 字符串的 gt=0 表示长度必须 > 0,gt = greater than
Username string `validate:"gt=0"` Username string `validate:"gt=0"`
// 同上 // 同上
PasswordNew string `validate:"gt=0"` PasswordNew string `validate:"gt=0"`
// eqfield 跨字段相等校验 // eqfield 跨字段相等校验
PasswordRepeat string `validate:"eqfield=PasswordNew"` PasswordRepeat string `validate:"eqfield=PasswordNew"`
// 合法 email 格式校验 // 合法 email 格式校验
Email string `validate:"email"` Email string `validate:"email"`
} }
validate := validator.New() validate := validator.New()
func validate(req RegisterReq) error { func validate(req RegisterReq) error {
err := validate.Struct(req) err := validate.Struct(req)
if err != nil { if err != nil {
doSomething() doSomething()
return err return err
} }
... ...
} }
``` ```
@ -112,14 +112,17 @@ func validate(req RegisterReq) error {
//... //...
var req = RegisterReq { var req = RegisterReq {
Username : "Xargin", Username : "Xargin",
PasswordNew : "ohno", PasswordNew : "ohno",
PasswordRepeat : "ohn", PasswordRepeat : "ohn",
Email : "alex@abc.com", Email : "alex@abc.com",
} }
err := validate(req) err := validate(req)
fmt.Println(err) // Key: 'RegisterReq.PasswordRepeat' Error:Field validation for 'PasswordRepeat' failed on the 'eqfield' tag fmt.Println(err)
// Key: 'RegisterReq.PasswordRepeat' Error:Field validation for
// 'PasswordRepeat' failed on the 'eqfield' tag
``` ```
如果觉得这个 validator 提供的错误信息不够人性化,例如要把错误信息返回给用户,那就不应该直接显示英文了。可以针对每种 tag 进行错误信息定制,读者可以自行探索。 如果觉得这个 validator 提供的错误信息不够人性化,例如要把错误信息返回给用户,那就不应该直接显示英文了。可以针对每种 tag 进行错误信息定制,读者可以自行探索。
@ -130,11 +133,11 @@ fmt.Println(err) // Key: 'RegisterReq.PasswordRepeat' Error:Field validation for
```go ```go
type Nested struct { type Nested struct {
Email string `validate:"email"` Email string `validate:"email"`
} }
type T struct { type T struct {
Age int `validate:"eq=10"` Age int `validate:"eq=10"`
Nested Nested Nested Nested
} }
``` ```
@ -150,77 +153,81 @@ type T struct {
package main package main
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
) )
type Nested struct { type Nested struct {
Email string `validate:"email"` Email string `validate:"email"`
} }
type T struct { type T struct {
Age int `validate:"eq=10"` Age int `validate:"eq=10"`
Nested Nested Nested Nested
} }
func validateEmail(input string) bool { func validateEmail(input string) bool {
if pass, _ := regexp.MatchString(`^([\w\.\_]{2,10})@(\w{1,}).([a-z]{2,4})$`, input); pass { if pass, _ := regexp.MatchString(
return true `^([\w\.\_]{2,10})@(\w{1,}).([a-z]{2,4})$`, input,
} ); pass {
return false return true
}
return false
} }
func validate(v interface{}) (bool, string) { func validate(v interface{}) (bool, string) {
validateResult := true validateResult := true
errmsg := "success" errmsg := "success"
vt := reflect.TypeOf(v) vt := reflect.TypeOf(v)
vv := reflect.ValueOf(v) vv := reflect.ValueOf(v)
for i := 0; i < vv.NumField(); i++ { for i := 0; i < vv.NumField(); i++ {
fieldVal := vv.Field(i) fieldVal := vv.Field(i)
tagContent := vt.Field(i).Tag.Get("validate") tagContent := vt.Field(i).Tag.Get("validate")
k := fieldVal.Kind() k := fieldVal.Kind()
switch k { switch k {
case reflect.Int: case reflect.Int:
val := fieldVal.Int() val := fieldVal.Int()
tagValStr := strings.Split(tagContent, "=") tagValStr := strings.Split(tagContent, "=")
tagVal, _ := strconv.ParseInt(tagValStr[1], 10, 64) tagVal, _ := strconv.ParseInt(tagValStr[1], 10, 64)
if val != tagVal { if val != tagVal {
errmsg = "validate int failed, tag is: "+ strconv.FormatInt(tagVal, 10) errmsg = "validate int failed, tag is: "+ strconv.FormatInt(
validateResult = false tagVal, 10,
} )
case reflect.String: validateResult = false
val := fieldVal.String() }
tagValStr := tagContent case reflect.String:
switch tagValStr { val := fieldVal.String()
case "email": tagValStr := tagContent
nestedResult := validateEmail(val) switch tagValStr {
if nestedResult == false { case "email":
errmsg = "validate mail failed, field val is: "+ val nestedResult := validateEmail(val)
validateResult = false if nestedResult == false {
} errmsg = "validate mail failed, field val is: "+ val
} validateResult = false
case reflect.Struct: }
// 如果有内嵌的 struct,那么深度优先遍历
// 就是一个递归过程 case reflect.Struct:
valInter := fieldVal.Interface() // 如果有内嵌的 struct那么深度优先遍历
nestedResult, msg := validate(valInter) // 就是一个递归过程
if nestedResult == false { valInter := fieldVal.Interface()
nestedResult, msg := validate(valInter)
if nestedResult == false {
validateResult = false validateResult = false
errmsg = msg errmsg = msg
} }
} }
} }
return validateResult, errmsg return validateResult, errmsg
} }
func main() { func main() {
var a = T{Age: 10, Nested: Nested{Email: "abc@abc.com"}} var a = T{Age: 10, Nested: Nested{Email: "abc@abc.com"}}
validateResult, errmsg := validate(a) validateResult, errmsg := validate(a)
fmt.Println(validateResult, errmsg) fmt.Println(validateResult, errmsg)
} }
``` ```

View File

@ -23,7 +23,7 @@ import _ "github.com/go-sql-driver/mysql"
```go ```go
func init() { func init() {
sql.Register("mysql", &MySQLDriver{}) sql.Register("mysql", &MySQLDriver{})
} }
``` ```
@ -31,7 +31,7 @@ func init() {
```go ```go
type Driver interface { type Driver interface {
Open(name string) (Conn, error) Open(name string) (Conn, error)
} }
``` ```
@ -39,9 +39,9 @@ type Driver interface {
```go ```go
type Conn interface { type Conn interface {
Prepare(query string) (Stmt, error) Prepare(query string) (Stmt, error)
Close() error Close() error
Begin() (Tx, error) Begin() (Tx, error)
} }
``` ```
@ -53,46 +53,46 @@ type Conn interface {
package main package main
import ( import (
"database/sql" "database/sql"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
) )
func main() { func main() {
// db 是一个 sql.DB 类型的对象 // db 是一个 sql.DB 类型的对象
// 该对象线程安全,且内部已包含了一个连接池 // 该对象线程安全,且内部已包含了一个连接池
// 连接池的选项可以在 sql.DB 的方法中设置,这里为了简单省略了 // 连接池的选项可以在 sql.DB 的方法中设置,这里为了简单省略了
db, err := sql.Open("mysql", db, err := sql.Open("mysql",
"user:password@tcp(127.0.0.1:3306)/hello") "user:password@tcp(127.0.0.1:3306)/hello")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer db.Close() defer db.Close()
var ( var (
id int id int
name string name string
) )
rows, err := db.Query("select id, name from users where id = ?", 1) rows, err := db.Query("select id, name from users where id = ?", 1)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer rows.Close() defer rows.Close()
// 必须要把 rows 里的内容读完,或者显式调用 Close() 方法, // 必须要把 rows 里的内容读完,或者显式调用 Close() 方法,
// 否则在 defer 的 rows.Close() 执行之前,连接永远不会释放 // 否则在 defer 的 rows.Close() 执行之前,连接永远不会释放
for rows.Next() { for rows.Next() {
err := rows.Scan(&id, &name) err := rows.Scan(&id, &name)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
log.Println(id, name) log.Println(id, name)
} }
err = rows.Err() err = rows.Err()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }
``` ```
@ -130,7 +130,7 @@ func main() {
# 伪代码 # 伪代码
shopList := [] shopList := []
for product in productList { for product in productList {
shopList = append(shopList, product.GetShop) shopList = append(shopList, product.GetShop)
} }
``` ```
@ -163,8 +163,8 @@ num, err := o.QueryTable("cardgroup").Filter("Cards__Card__Name", cardName).All(
```go ```go
where := map[string]interface{} { where := map[string]interface{} {
"order_id > ?" : 0, "order_id > ?" : 0,
"customer_id != ?" : 0, "customer_id != ?" : 0,
} }
limit := []int{0,100} limit := []int{0,100}
orderBy := []string{"id asc", "create_time desc"} orderBy := []string{"id asc", "create_time desc"}
@ -186,12 +186,12 @@ orders := orderModel.GetList(where, limit, orderBy)
```go ```go
where := map[string]interface{} { where := map[string]interface{} {
"product_id = ?" : 10, "product_id = ?" : 10,
"user_id = ?" : 1232 , "user_id = ?" : 1232 ,
} }
if order_id != 0 { if order_id != 0 {
where["order_id = ?"] = order_id where["order_id = ?"] = order_id
} }
res, err := historyModel.GetList(where, limit, orderBy) res, err := historyModel.GetList(where, limit, orderBy)
@ -209,7 +209,7 @@ res, err := historyModel.GetList(where, limit, orderBy)
```go ```go
const ( const (
getAllByProductIDAndCustomerID = `select * from p_orders where product_id in (:product_id) and customer_id=:customer_id` getAllByProductIDAndCustomerID = `select * from p_orders where product_id in (:product_id) and customer_id=:customer_id`
) )
// GetAllByProductIDAndCustomerID // GetAllByProductIDAndCustomerID
@ -217,25 +217,25 @@ const (
// @param rate_date // @param rate_date
// @return []Order, error // @return []Order, error
func GetAllByProductIDAndCustomerID(ctx context.Context, productIDs []uint64, customerID uint64) ([]Order, error) { func GetAllByProductIDAndCustomerID(ctx context.Context, productIDs []uint64, customerID uint64) ([]Order, error) {
var orderList []Order var orderList []Order
params := map[string]interface{}{ params := map[string]interface{}{
"product_id" : productIDs, "product_id" : productIDs,
"customer_id": customerID, "customer_id": customerID,
} }
// getAllByProductIDAndCustomerID 是 const 类型的 sql 字符串 // getAllByProductIDAndCustomerID 是 const 类型的 sql 字符串
sql, args, err := sqlutil.Named(getAllByProductIDAndCustomerID, params) sql, args, err := sqlutil.Named(getAllByProductIDAndCustomerID, params)
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = dao.QueryList(ctx, sqldbInstance, sql, args, &orderList) err = dao.QueryList(ctx, sqldbInstance, sql, args, &orderList)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return orderList, err return orderList, err
} }
``` ```

View File

@ -39,18 +39,18 @@ Core: 2
Threads: 4 Threads: 4
Graphics/Displays: Graphics/Displays:
Chipset Model: Intel Iris Graphics 6100 Chipset Model: Intel Iris Graphics 6100
Resolution: 2560 x 1600 Retina Resolution: 2560 x 1600 Retina
Memory Slots: Memory Slots:
Size: 4 GB Size: 4 GB
Speed: 1867 MHz Speed: 1867 MHz
Size: 4 GB Size: 4 GB
Speed: 1867 MHz Speed: 1867 MHz
Storage: Storage:
Size: 250.14 GB (250,140,319,744 bytes) Size: 250.14 GB (250,140,319,744 bytes)
Media Name: APPLE SSD SM0256G Media Media Name: APPLE SSD SM0256G Media
Size: 250.14 GB (250,140,319,744 bytes) Size: 250.14 GB (250,140,319,744 bytes)
Medium Type: SSD Medium Type: SSD
``` ```
测试结果: 测试结果:
@ -59,32 +59,32 @@ Storage:
~ wrk -c 10 -d 10s -t10 http://localhost:9090 ~ wrk -c 10 -d 10s -t10 http://localhost:9090
Running 10s test @ http://localhost:9090 Running 10s test @ http://localhost:9090
10 threads and 10 connections 10 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev Thread Stats Avg Stdev Max +/- Stdev
Latency 339.99us 1.28ms 44.43ms 98.29% Latency 339.99us 1.28ms 44.43ms 98.29%
Req/Sec 4.49k 656.81 7.47k 73.36% Req/Sec 4.49k 656.81 7.47k 73.36%
449588 requests in 10.10s, 54.88MB read 449588 requests in 10.10s, 54.88MB read
Requests/sec: 44513.22 Requests/sec: 44513.22
Transfer/sec: 5.43MB Transfer/sec: 5.43MB
~ wrk -c 10 -d 10s -t10 http://localhost:9090 ~ wrk -c 10 -d 10s -t10 http://localhost:9090
Running 10s test @ http://localhost:9090 Running 10s test @ http://localhost:9090
10 threads and 10 connections 10 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev Thread Stats Avg Stdev Max +/- Stdev
Latency 334.76us 1.21ms 45.47ms 98.27% Latency 334.76us 1.21ms 45.47ms 98.27%
Req/Sec 4.42k 633.62 6.90k 71.16% Req/Sec 4.42k 633.62 6.90k 71.16%
443582 requests in 10.10s, 54.15MB read 443582 requests in 10.10s, 54.15MB read
Requests/sec: 43911.68 Requests/sec: 43911.68
Transfer/sec: 5.36MB Transfer/sec: 5.36MB
~ wrk -c 10 -d 10s -t10 http://localhost:9090 ~ wrk -c 10 -d 10s -t10 http://localhost:9090
Running 10s test @ http://localhost:9090 Running 10s test @ http://localhost:9090
10 threads and 10 connections 10 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev Thread Stats Avg Stdev Max +/- Stdev
Latency 379.26us 1.34ms 44.28ms 97.62% Latency 379.26us 1.34ms 44.28ms 97.62%
Req/Sec 4.55k 591.64 8.20k 76.37% Req/Sec 4.55k 591.64 8.20k 76.37%
455710 requests in 10.10s, 55.63MB read 455710 requests in 10.10s, 55.63MB read
Requests/sec: 45118.57 Requests/sec: 45118.57
Transfer/sec: 5.51MB Transfer/sec: 5.51MB
``` ```
多次测试的结果在 4w 左右的 QPS 浮动,响应时间最多也就是 40ms 左右,对于一个 web 程序来说,这已经是很不错的成绩了,我们只是照抄了别人的示例代码,就完成了一个高性能的 `hello world` 服务器,是不是很有成就感? 多次测试的结果在 4w 左右的 QPS 浮动,响应时间最多也就是 40ms 左右,对于一个 web 程序来说,这已经是很不错的成绩了,我们只是照抄了别人的示例代码,就完成了一个高性能的 `hello world` 服务器,是不是很有成就感?
@ -135,7 +135,9 @@ func NewBucketWithRate(rate float64, capacity int64) *Bucket
```go ```go
func (tb *Bucket) Take(count int64) time.Duration {} func (tb *Bucket) Take(count int64) time.Duration {}
func (tb *Bucket) TakeAvailable(count int64) int64 {} func (tb *Bucket) TakeAvailable(count int64) int64 {}
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {} func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (
time.Duration, bool,
) {}
func (tb *Bucket) Wait(count int64) {} func (tb *Bucket) Wait(count int64) {}
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool {} func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool {}
``` ```
@ -223,22 +225,22 @@ current token cnt: 100 2018-06-16 18:17:50.313970334 +0800 CST m=+1.060937371
```go ```go
func TakeAvailable(block bool) bool{ func TakeAvailable(block bool) bool{
var takenResult bool var takenResult bool
if block { if block {
select { select {
case <-tokenBucket: case <-tokenBucket:
takenResult = true takenResult = true
} }
} else { } else {
select { select {
case <-tokenBucket: case <-tokenBucket:
takenResult = true takenResult = true
default: default:
takenResult = false takenResult = false
} }
} }
return takenResult return takenResult
} }
``` ```

View File

@ -29,7 +29,10 @@
这样我们 controller 中的入口函数就变成了下面这样: 这样我们 controller 中的入口函数就变成了下面这样:
```go ```go
func CreateOrder(ctx context.Context, req *CreateOrderStruct) (*CreateOrderRespStruct, error) { func CreateOrder(ctx context.Context, req *CreateOrderStruct) (
*CreateOrderRespStruct, error,
) {
// ...
} }
``` ```
@ -40,26 +43,26 @@ CreateOrder 有两个参数ctx 用来传入 trace_id 一类的需要串联请
```go ```go
// defined in protocol layer // defined in protocol layer
type CreateOrderRequest struct { type CreateOrderRequest struct {
OrderID int64 `json:"order_id"` OrderID int64 `json:"order_id"`
// ... // ...
} }
// defined in controller // defined in controller
type CreateOrderParams struct { type CreateOrderParams struct {
OrderID int64 OrderID int64
} }
func HTTPCreateOrderHandler(wr http.ResponseWriter, r *http.Request) { func HTTPCreateOrderHandler(wr http.ResponseWriter, r *http.Request) {
var req CreateOrderRequest var req CreateOrderRequest
var params CreateOrderParams var params CreateOrderParams
ctx := context.TODO() ctx := context.TODO()
// bind data to req // bind data to req
bind(r, &req) bind(r, &req)
// map protocol binded to protocol-independent // map protocol binded to protocol-independent
map(req, params) map(req, params)
logicResp,err := controller.CreateOrder(ctx, &params) logicResp,err := controller.CreateOrder(ctx, &params)
if err != nil {} if err != nil {}
// ... // ...
} }
``` ```
@ -72,27 +75,27 @@ func HTTPCreateOrderHandler(wr http.ResponseWriter, r *http.Request) {
```go ```go
// http request struct // http request struct
type CreateOrder struct { type CreateOrder struct {
OrderID int64 `json:"order_id" validate:"required"` OrderID int64 `json:"order_id" validate:"required"`
UserID int64 `json:"user_id" validate:"required"` UserID int64 `json:"user_id" validate:"required"`
ProductID int `json:"prod_id" validate:"required"` ProductID int `json:"prod_id" validate:"required"`
Addr string `json:"addr" validate:"required"` Addr string `json:"addr" validate:"required"`
} }
// thrift request struct // thrift request struct
type FeatureSetParams struct { type FeatureSetParams struct {
DriverID int64 `thrift:"driverID,1,required"` DriverID int64 `thrift:"driverID,1,required"`
OrderID int64 `thrift:"OrderID,2,required"` OrderID int64 `thrift:"OrderID,2,required"`
UserID int64 `thrift:"UserID,3,required"` UserID int64 `thrift:"UserID,3,required"`
ProductID int `thrift:"ProductID,4,required"` ProductID int `thrift:"ProductID,4,required"`
Addr string `thrift:"Addr,5,required"` Addr string `thrift:"Addr,5,required"`
} }
// controller input struct // controller input struct
type CreateOrderParams struct { type CreateOrderParams struct {
OrderID int64 OrderID int64
UserID int64 UserID int64
ProductID int ProductID int
Addr string Addr string
} }
``` ```
@ -101,11 +104,11 @@ type CreateOrderParams struct {
```go ```go
type FeatureSetParams struct { type FeatureSetParams struct {
DriverID int64 `thrift:"driverID,1,required" json:"driver_id"` DriverID int64 `thrift:"driverID,1,required" json:"driver_id"`
OrderID int64 `thrift:"OrderID,2,required" json:"order_id"` OrderID int64 `thrift:"OrderID,2,required" json:"order_id"`
UserID int64 `thrift:"UserID,3,required" json:"user_id"` UserID int64 `thrift:"UserID,3,required" json:"user_id"`
ProductID int `thrift:"ProductID,4,required" json:"prod_id"` ProductID int `thrift:"ProductID,4,required" json:"prod_id"`
Addr string `thrift:"Addr,5,required" json:"addr"` Addr string `thrift:"Addr,5,required" json:"addr"`
} }
``` ```

View File

@ -24,13 +24,13 @@
```go ```go
func BusinessProcess(ctx context.Context, params Params) (resp, error){ func BusinessProcess(ctx context.Context, params Params) (resp, error){
ValidateLogin() ValidateLogin()
ValidateParams() ValidateParams()
AntispamCheck() AntispamCheck()
GetPrice() GetPrice()
CreateOrder() CreateOrder()
UpdateUserStatus() UpdateUserStatus()
NotifyDownstreamSystems() NotifyDownstreamSystems()
} }
``` ```
@ -40,13 +40,13 @@ func BusinessProcess(ctx context.Context, params Params) (resp, error){
```go ```go
func CreateOrder() { func CreateOrder() {
ValidateDistrict() // 判断是否是地区限定商品 ValidateDistrict() // 判断是否是地区限定商品
ValidateVIPProduct() // 检查是否是只提供给 vip 的商品 ValidateVIPProduct() // 检查是否是只提供给 vip 的商品
GetUserInfo() // 从用户系统获取更详细的用户信息 GetUserInfo() // 从用户系统获取更详细的用户信息
GetProductDesc() // 从商品系统中获取商品在该时间点的详细信息 GetProductDesc() // 从商品系统中获取商品在该时间点的详细信息
DecrementStorage() // 扣减库存 DecrementStorage() // 扣减库存
CreateOrderSnapshot() // 创建订单快照 CreateOrderSnapshot() // 创建订单快照
return CreateSuccess return CreateSuccess
} }
``` ```
@ -63,12 +63,12 @@ func CreateOrder() {
```go ```go
// OrderCreator 创建订单流程 // OrderCreator 创建订单流程
type OrderCreator interface { type OrderCreator interface {
ValidateDistrict() // 判断是否是地区限定商品 ValidateDistrict() // 判断是否是地区限定商品
ValidateVIPProduct() // 检查是否是只提供给 vip 的商品 ValidateVIPProduct() // 检查是否是只提供给 vip 的商品
GetUserInfo() // 从用户系统获取更详细的用户信息 GetUserInfo() // 从用户系统获取更详细的用户信息
GetProductDesc() // 从商品系统中获取商品在该时间点的详细信息 GetProductDesc() // 从商品系统中获取商品在该时间点的详细信息
DecrementStorage() // 扣减库存 DecrementStorage() // 扣减库存
CreateOrderSnapshot() // 创建订单快照 CreateOrderSnapshot() // 创建订单快照
} }
``` ```
@ -86,30 +86,30 @@ type OrderCreator interface {
```go ```go
import ( import (
"sample.com/travelorder" "sample.com/travelorder"
"sample.com/marketorder" "sample.com/marketorder"
) )
func CreateOrder() { func CreateOrder() {
switch businessType { switch businessType {
case TravelBusiness: case TravelBusiness:
travelorder.CreateOrder() travelorder.CreateOrder()
case MarketBusiness: case MarketBusiness:
marketorder.CreateOrderForMarket() marketorder.CreateOrderForMarket()
default: default:
return errors.New("not supported business") return errors.New("not supported business")
} }
} }
func ValidateUser() { func ValidateUser() {
switch businessType { switch businessType {
case TravelBusiness: case TravelBusiness:
travelorder.ValidateUserVIP() travelorder.ValidateUserVIP()
case MarketBusiness: case MarketBusiness:
marketorder.ValidateUserRegistered() marketorder.ValidateUserRegistered()
default: default:
return errors.New("not supported business") return errors.New("not supported business")
} }
} }
// ... // ...
@ -122,35 +122,35 @@ switch ...
```go ```go
type BusinessInstance interface { type BusinessInstance interface {
ValidateLogin() ValidateLogin()
ValidateParams() ValidateParams()
AntispamCheck() AntispamCheck()
GetPrice() GetPrice()
CreateOrder() CreateOrder()
UpdateUserStatus() UpdateUserStatus()
NotifyDownstreamSystems() NotifyDownstreamSystems()
} }
func entry() { func entry() {
var bi BusinessInstance var bi BusinessInstance
switch businessType { switch businessType {
case TravelBusiness: case TravelBusiness:
bi = travelorder.New() bi = travelorder.New()
case MarketBusiness: case MarketBusiness:
bi = marketorder.New() bi = marketorder.New()
default: default:
return errors.New("not supported business") return errors.New("not supported business")
} }
} }
func BusinessProcess(bi BusinessInstance) { func BusinessProcess(bi BusinessInstance) {
bi.ValidateLogin() bi.ValidateLogin()
bi.ValidateParams() bi.ValidateParams()
bi.AntispamCheck() bi.AntispamCheck()
bi.GetPrice() bi.GetPrice()
bi.CreateOrder() bi.CreateOrder()
bi.UpdateUserStatus() bi.UpdateUserStatus()
bi.NotifyDownstreamSystems() bi.NotifyDownstreamSystems()
} }
``` ```
@ -162,7 +162,7 @@ Go 被人称道的最多的地方是其 interface 设计的正交性,模块之
```go ```go
type Writer interface { type Writer interface {
Write(p []byte) (n int, err error) Write(p []byte) (n int, err error)
} }
``` ```
@ -172,7 +172,7 @@ type Writer interface {
type MyType struct {} type MyType struct {}
func (m MyType) Write(p []byte) (n int, err error) { func (m MyType) Write(p []byte) (n int, err error) {
return 0, nil return 0, nil
} }
``` ```
@ -182,7 +182,7 @@ func (m MyType) Write(p []byte) (n int, err error) {
package log package log
func SetOutput(w io.Writer) { func SetOutput(w io.Writer) {
output = w output = w
} }
``` ```
@ -194,7 +194,7 @@ package my-business
import "xy.com/log" import "xy.com/log"
func init() { func init() {
log.SetOutput(MyType) log.SetOutput(MyType)
} }
``` ```
@ -208,8 +208,8 @@ func init() {
package main package main
type OrderCreator interface { type OrderCreator interface {
ValidateUser() ValidateUser()
CreateOrder() CreateOrder()
} }
type BookOrderCreator struct{} type BookOrderCreator struct{}
@ -217,12 +217,12 @@ type BookOrderCreator struct{}
func (boc BookOrderCreator) ValidateUser() {} func (boc BookOrderCreator) ValidateUser() {}
func createOrder(oc OrderCreator) { func createOrder(oc OrderCreator) {
oc.ValidateUser() oc.ValidateUser()
oc.CreateOrder() oc.CreateOrder()
} }
func main() { func main() {
createOrder(BookOrderCreator{}) createOrder(BookOrderCreator{})
} }
``` ```
@ -231,7 +231,7 @@ func main() {
```shell ```shell
# command-line-arguments # command-line-arguments
./a.go:18:30: cannot use BookOrderCreator literal (type BookOrderCreator) as type OrderCreator in argument to createOrder: ./a.go:18:30: cannot use BookOrderCreator literal (type BookOrderCreator) as type OrderCreator in argument to createOrder:
BookOrderCreator does not implement OrderCreator (missing CreateOrder method) BookOrderCreator does not implement OrderCreator (missing CreateOrder method)
``` ```
所以 interface 也可以认为是一种编译期进行检查的保证类型安全的手段。 所以 interface 也可以认为是一种编译期进行检查的保证类型安全的手段。
@ -242,15 +242,15 @@ func main() {
```go ```go
func entry() { func entry() {
var bi BusinessInstance var bi BusinessInstance
switch businessType { switch businessType {
case TravelBusiness: case TravelBusiness:
bi = travelorder.New() bi = travelorder.New()
case MarketBusiness: case MarketBusiness:
bi = marketorder.New() bi = marketorder.New()
default: default:
return errors.New("not supported business") return errors.New("not supported business")
} }
} }
``` ```
@ -258,12 +258,12 @@ func entry() {
```go ```go
var businessInstanceMap = map[int]BusinessInstance { var businessInstanceMap = map[int]BusinessInstance {
TravelBusiness : travelorder.New(), TravelBusiness : travelorder.New(),
MarketBusiness : marketorder.New(), MarketBusiness : marketorder.New(),
} }
func entry() { func entry() {
bi := businessInstanceMap[businessType] bi := businessInstanceMap[businessType]
} }
``` ```

View File

@ -32,12 +32,12 @@
```go ```go
// pass 3/1000 // pass 3/1000
func passed() bool { func passed() bool {
key := hashFunctions(userID) % 1000 key := hashFunctions(userID) % 1000
if key <= 2 { if key <= 2 {
return true return true
} }
return false return false
} }
``` ```
@ -61,7 +61,7 @@ func passed() bool {
```go ```go
func isTrue() bool { func isTrue() bool {
return true/false according to the rate provided by user return true/false according to the rate provided by user
} }
``` ```
@ -71,11 +71,11 @@ func isTrue() bool {
```go ```go
func isTrue(phone string) bool { func isTrue(phone string) bool {
if hash of phone matches { if hash of phone matches {
return true return true
} }
return false return false
} }
``` ```
@ -123,16 +123,16 @@ func isTrue(phone string) bool {
var cityID2Open = [12000]bool{} var cityID2Open = [12000]bool{}
func init() { func init() {
readConfig() readConfig()
for i:=0;i<len(cityID2Open);i++ { for i:=0;i<len(cityID2Open);i++ {
if city i is opened in configs { if city i is opened in configs {
cityID2Open[i] = true cityID2Open[i] = true
} }
} }
} }
func isPassed(cityID int) bool { func isPassed(cityID int) bool {
return cityID2Open[cityID] return cityID2Open[cityID]
} }
``` ```
@ -142,20 +142,19 @@ func isPassed(cityID int) bool {
var cityID2Open = map[int]struct{}{} var cityID2Open = map[int]struct{}{}
func init() { func init() {
readConfig() readConfig()
for _, city := range openCities { for _, city := range openCities {
cityID2Open[city] = struct{}{} cityID2Open[city] = struct{}{}
} }
} }
func isPassed(cityID int) bool { func isPassed(cityID int) bool {
if _, ok := cityID2Open[cityID]; ok { if _, ok := cityID2Open[cityID]; ok {
return true return true
} }
return false return false
} }
``` ```
按白名单、按业务线、按 UA、按分发渠道发布，本质上和按城市发布是一样的，这里就不再赘述了。
@ -165,20 +164,20 @@ func isPassed(cityID int) bool {
```go ```go
func init() { func init() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
} }
// isPassed reports whether a request falls inside the rollout percentage.
//
// rate is a percentage in the range 0~100. Values >= 100 always pass, values
// <= 0 never pass, and anything in between passes with probability rate/100.
func isPassed(rate int) bool {
	if rate >= 100 {
		return true
	}

	// rand.Intn(100) is uniform over [0, 100), so exactly `rate` of the 100
	// possible outcomes are strictly below rate.
	return rate > 0 && rand.Intn(100) < rate
}
``` ```
@ -193,28 +192,30 @@ hash.go:
```go ```go
package main package main
import "crypto/md5" import (
import "crypto/sha1" "crypto/md5"
import "github.com/spaolacci/murmur3" "crypto/sha1"
"github.com/spaolacci/murmur3"
)
var str = "hello world" var str = "hello world"
func md5Hash() [16]byte { func md5Hash() [16]byte {
return md5.Sum([]byte(str)) return md5.Sum([]byte(str))
} }
func sha1Hash() [20]byte { func sha1Hash() [20]byte {
return sha1.Sum([]byte(str)) return sha1.Sum([]byte(str))
} }
func murmur32() uint32 { func murmur32() uint32 {
return murmur3.Sum32([]byte(str)) return murmur3.Sum32([]byte(str))
} }
func murmur64() uint64 { func murmur64() uint64 {
return murmur3.Sum64([]byte(str)) return murmur3.Sum64([]byte(str))
} }
``` ```
hash_test.go hash_test.go
@ -225,27 +226,27 @@ package main
import "testing" import "testing"
func BenchmarkMD5(b *testing.B) { func BenchmarkMD5(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
md5Hash() md5Hash()
} }
} }
func BenchmarkSHA1(b *testing.B) { func BenchmarkSHA1(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
sha1Hash() sha1Hash()
} }
} }
func BenchmarkMurmurHash32(b *testing.B) { func BenchmarkMurmurHash32(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
murmur32() murmur32()
} }
} }
func BenchmarkMurmurHash64(b *testing.B) { func BenchmarkMurmurHash64(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
murmur64() murmur64()
} }
} }
``` ```
@ -254,12 +255,12 @@ func BenchmarkMurmurHash64(b *testing.B) {
~/t/g/hash_bench git:master go test -bench=. ~/t/g/hash_bench git:master go test -bench=.
goos: darwin goos: darwin
goarch: amd64 goarch: amd64
BenchmarkMD5-4 10000000 180 ns/op BenchmarkMD5-4 10000000 180 ns/op
BenchmarkSHA1-4 10000000 211 ns/op BenchmarkSHA1-4 10000000 211 ns/op
BenchmarkMurmurHash32-4 50000000 25.7 ns/op BenchmarkMurmurHash32-4 50000000 25.7 ns/op
BenchmarkMurmurHash64-4 20000000 66.2 ns/op BenchmarkMurmurHash64-4 20000000 66.2 ns/op
PASS PASS
ok _/Users/caochunhui/test/go/hash_bench 7.050s ok _/Users/caochunhui/test/go/hash_bench 7.050s
``` ```
可见 murmurhash 相比其它的算法有三倍以上的性能提升。 可见 murmurhash 相比其它的算法有三倍以上的性能提升。
@ -274,29 +275,30 @@ ok _/Users/caochunhui/test/go/hash_bench 7.050s
package main package main
import ( import (
"fmt" "fmt"
"github.com/spaolacci/murmur3" "github.com/spaolacci/murmur3"
) )
var bucketSize = 10 var bucketSize = 10
func main() { func main() {
var bucketMap = map[uint64]int{} var bucketMap = map[uint64]int{}
for i := 15000000000; i < 15000000000+10000000; i++ { for i := 15000000000; i < 15000000000+10000000; i++ {
hashInt := murmur64(fmt.Sprint(i)) % uint64(bucketSize) hashInt := murmur64(fmt.Sprint(i)) % uint64(bucketSize)
bucketMap[hashInt]++ bucketMap[hashInt]++
} }
fmt.Println(bucketMap) fmt.Println(bucketMap)
} }
func murmur64(p string) uint64 { func murmur64(p string) uint64 {
return murmur3.Sum64([]byte(p)) return murmur3.Sum64([]byte(p))
} }
``` ```
```shell ```shell
map[7:999475 5:1000359 1:999945 6:1000200 3:1000193 9:1000765 2:1000044 4:1000343 8:1000823 0:997853] map[7:999475 5:1000359 1:999945 6:1000200 3:1000193 9:1000765 2:1000044 \
4:1000343 8:1000823 0:997853]
``` ```
偏差基本都在 1/100 以内,是可以接受的。 偏差基本都在 1/100 以内,是可以接受的。

View File

@ -7,7 +7,6 @@
Twitter 的 snowflake 算法是这种场景下的一个典型解法。先来看看 snowflake 是怎么一回事: Twitter 的 snowflake 算法是这种场景下的一个典型解法。先来看看 snowflake 是怎么一回事:
``` ```
datacenter_id sequence_id datacenter_id sequence_id
unused unused
│ │ │ │
@ -28,7 +27,6 @@ Twitter 的 snowflake 算法是这种场景下的一个典型解法。先来看
│ │ │ │
time in milliseconds worker_id time in milliseconds worker_id
``` ```
首先确定我们的数值是 64 位，int64 类型，被划分为四部分，不含开头的第一个 bit，因为这个 bit 是符号位。用 41 位来表示收到请求时的时间戳，单位为毫秒，然后五位来表示数据中心的 id，然后再五位来表示机器的实例 id，最后是 12 位的循环自增 id(到达 1111 1111 1111 后会归 0)。
@ -82,42 +80,46 @@ mysql> select last_insert_id();
package main package main
import ( import (
"fmt" "fmt"
"os" "os"
"github.com/bwmarrin/snowflake" "github.com/bwmarrin/snowflake"
) )
func main() { func main() {
n, err := snowflake.NewNode(1) n, err := snowflake.NewNode(1)
if err != nil { if err != nil {
println(err) println(err)
os.Exit(1) os.Exit(1)
} }
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
id := n.Generate() id := n.Generate()
fmt.Println("id", id) fmt.Println("id", id)
fmt.Println("node: ", id.Node(), "step: ", id.Step(), "time: ", id.Time(), "\n") fmt.Println(
} "node: ", id.Node(),
"step: ", id.Step(),
"time: ", id.Time(),
"\n",
)
}
} }
``` ```
当然,这个库也给我们留好了定制的后路: 当然,这个库也给我们留好了定制的后路:
```go ```go
// Epoch is set to the twitter snowflake epoch of Nov 04 2010 01:42:54 UTC // Epoch is set to the twitter snowflake epoch of Nov 04 2010 01:42:54 UTC
// You may customize this to set a different epoch for your application. // You may customize this to set a different epoch for your application.
Epoch int64 = 1288834974657 Epoch int64 = 1288834974657
// Number of bits to use for Node // Number of bits to use for Node
// Remember, you have a total 22 bits to share between Node/Step // Remember, you have a total 22 bits to share between Node/Step
NodeBits uint8 = 10 NodeBits uint8 = 10
// Number of bits to use for Step // Number of bits to use for Step
// Remember, you have a total 22 bits to share between Node/Step // Remember, you have a total 22 bits to share between Node/Step
StepBits uint8 = 12 StepBits uint8 = 12
``` ```
Epoch 就是本节开头讲的起始时间，NodeBits 指的是机器编号的位长，StepBits 指的是自增序列的位长。
@ -144,9 +146,9 @@ Settings 数据结构如下:
```go ```go
type Settings struct { type Settings struct {
StartTime time.Time StartTime time.Time
MachineID func() (uint16, error) MachineID func() (uint16, error)
CheckMachineID func(uint16) bool CheckMachineID func(uint16) bool
} }
``` ```
@ -169,57 +171,56 @@ redis 127.0.0.1:6379> SADD base64_encoding_of_last16bits MzI0Mgo=
package main package main
import ( import (
"fmt" "fmt"
"os" "os"
"time" "time"
"github.com/sony/sonyflake" "github.com/sony/sonyflake"
) )
func getMachineID() (uint16, error) { func getMachineID() (uint16, error) {
var machineID uint16 var machineID uint16
var err error var err error
machineID = readMachineIDFromLocalFile() machineID = readMachineIDFromLocalFile()
if machineID == 0 { if machineID == 0 {
machineID, err = generateMachineID() machineID, err = generateMachineID()
if err != nil { if err != nil {
return 0, err return 0, err
} }
} }
return machineID, nil return machineID, nil
} }
func checkMachineID(machineID uint16) bool { func checkMachineID(machineID uint16) bool {
saddResult, err := saddMachineIDToRedisSet() saddResult, err := saddMachineIDToRedisSet()
if err != nil || saddResult == 0 { if err != nil || saddResult == 0 {
return true return true
} }
err := saveMachineIDToLocalFile(machineID) err := saveMachineIDToLocalFile(machineID)
if err != nil { if err != nil {
return true return true
} }
return false return false
} }
func main() { func main() {
t, _ := time.Parse("2006-01-02", "2018-01-01") t, _ := time.Parse("2006-01-02", "2018-01-01")
settings := sonyflake.Settings{ settings := sonyflake.Settings{
StartTime: t, StartTime: t,
MachineID: getMachineID, MachineID: getMachineID,
CheckMachineID: checkMachineID, CheckMachineID: checkMachineID,
} }
sf := sonyflake.NewSonyflake(settings) sf := sonyflake.NewSonyflake(settings)
id, err := sf.NextID() id, err := sf.NextID()
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)
} }
fmt.Println(id) fmt.Println(id)
} }
``` ```

View File

@ -6,24 +6,24 @@
package main package main
import ( import (
"sync" "sync"
) )
// 全局变量 // 全局变量
var counter int var counter int
func main() { func main() {
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
counter++ counter++
}() }()
} }
wg.Wait() wg.Wait()
println(counter) println(counter)
} }
``` ```
@ -47,13 +47,13 @@ func main() {
var wg sync.WaitGroup var wg sync.WaitGroup
var l sync.Mutex var l sync.Mutex
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
l.Lock() l.Lock()
counter++ counter++
l.Unlock() l.Unlock()
}() }()
} }
wg.Wait() wg.Wait()
@ -74,58 +74,58 @@ println(counter)
package main package main
import ( import (
"sync" "sync"
) )
// Lock is a try-lock built on a one-slot buffered channel: the slot holds a
// token while the lock is free and is empty while the lock is held.
type Lock struct {
	c chan struct{}
}

// NewLock returns a Lock in the unlocked state, i.e. with the token already
// placed in its channel.
func NewLock() Lock {
	l := Lock{c: make(chan struct{}, 1)}
	l.c <- struct{}{}
	return l
}

// Lock tries to take the token without blocking. It returns true when the
// lock was acquired and false when the lock is currently held.
func (l Lock) Lock() bool {
	select {
	case <-l.c:
		return true
	default:
		return false
	}
}

// Unlock puts the token back, releasing the lock. Only the holder should call
// it: the channel has capacity 1, so Unlock on a lock that is not held blocks
// until the token is taken.
func (l Lock) Unlock() {
	l.c <- struct{}{}
}
var counter int var counter int
func main() { func main() {
var l = NewLock() var l = NewLock()
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
if !l.Lock() { if !l.Lock() {
// log error // log error
println("lock failed") println("lock failed")
return return
} }
counter++ counter++
println("current counter", counter) println("current counter", counter)
l.Unlock() l.Unlock()
}() }()
} }
wg.Wait() wg.Wait()
} }
``` ```
@ -141,67 +141,66 @@ func main() {
package main package main
import ( import (
"fmt" "fmt"
"sync" "sync"
"time" "time"
"github.com/go-redis/redis" "github.com/go-redis/redis"
) )
func incr() { func incr() {
client := redis.NewClient(&redis.Options{ client := redis.NewClient(&redis.Options{
Addr: "localhost:6379", Addr: "localhost:6379",
Password: "", // no password set Password: "", // no password set
DB: 0, // use default DB DB: 0, // use default DB
}) })
var lockKey = "counter_lock" var lockKey = "counter_lock"
var counterKey = "counter" var counterKey = "counter"
// lock // lock
resp := client.SetNX(lockKey, 1, time.Second*5) resp := client.SetNX(lockKey, 1, time.Second*5)
lockSuccess, err := resp.Result() lockSuccess, err := resp.Result()
if err != nil || !lockSuccess { if err != nil || !lockSuccess {
fmt.Println(err, "lock result: ", lockSuccess) fmt.Println(err, "lock result: ", lockSuccess)
return return
} }
// counter ++ // counter ++
getResp := client.Get(counterKey) getResp := client.Get(counterKey)
cntValue, err := getResp.Int64() cntValue, err := getResp.Int64()
if err == nil { if err == nil {
cntValue++ cntValue++
resp := client.Set(counterKey, cntValue, 0) resp := client.Set(counterKey, cntValue, 0)
_, err := resp.Result() _, err := resp.Result()
if err != nil { if err != nil {
// log err // log err
println("set value error!") println("set value error!")
} }
} }
println("current counter is ", cntValue) println("current counter is ", cntValue)
delResp := client.Del(lockKey) delResp := client.Del(lockKey)
unlockSuccess, err := delResp.Result() unlockSuccess, err := delResp.Result()
if err == nil && unlockSuccess > 0 { if err == nil && unlockSuccess > 0 {
println("unlock success!") println("unlock success!")
} else { } else {
println("unlock failed", err) println("unlock failed", err)
} }
} }
func main() { func main() {
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
incr() incr()
}() }()
} }
wg.Wait() wg.Wait()
} }
``` ```
看看运行结果: 看看运行结果:
@ -233,28 +232,28 @@ setnx 很适合在高并发场景下,用来争抢一些“唯一”的资源
package main package main
import ( import (
"time" "time"
"github.com/samuel/go-zookeeper/zk" "github.com/samuel/go-zookeeper/zk"
) )
func main() { func main() {
c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second) //*10) c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second) //*10)
if err != nil { if err != nil {
panic(err) panic(err)
} }
l := zk.NewLock(c, "/lock", zk.WorldACL(zk.PermAll)) l := zk.NewLock(c, "/lock", zk.WorldACL(zk.PermAll))
err = l.Lock() err = l.Lock()
if err != nil { if err != nil {
panic(err) panic(err)
} }
println("lock succ, do your business logic") println("lock succ, do your business logic")
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
// do some thing // do some thing
l.Unlock() l.Unlock()
println("unlock succ, finish business logic") println("unlock succ, finish business logic")
} }
``` ```
@ -270,34 +269,33 @@ func main() {
package main package main
import ( import (
"log" "log"
"github.com/zieckey/etcdsync" "github.com/zieckey/etcdsync"
) )
func main() { func main() {
m, err := etcdsync.New("/lock", 10, []string{"http://127.0.0.1:2379"}) m, err := etcdsync.New("/lock", 10, []string{"http://127.0.0.1:2379"})
if m == nil || err != nil { if m == nil || err != nil {
log.Printf("etcdsync.New failed") log.Printf("etcdsync.New failed")
return return
} }
err = m.Lock() err = m.Lock()
if err != nil { if err != nil {
log.Printf("etcdsync.Lock failed") log.Printf("etcdsync.Lock failed")
return return
} }
log.Printf("etcdsync.Lock OK") log.Printf("etcdsync.Lock OK")
log.Printf("Get the lock. Do something here.") log.Printf("Get the lock. Do something here.")
err = m.Unlock() err = m.Unlock()
if err != nil { if err != nil {
log.Printf("etcdsync.Unlock failed") log.Printf("etcdsync.Unlock failed")
} else { } else {
log.Printf("etcdsync.Unlock OK") log.Printf("etcdsync.Unlock OK")
} }
} }
``` ```
etcd 中没有像 ZooKeeper 那样的 sequence 节点。所以其锁实现和基于 ZooKeeper 实现的有所不同。在上述示例代码中使用的 etcdsync 的 Lock 流程是: etcd 中没有像 ZooKeeper 那样的 sequence 节点。所以其锁实现和基于 ZooKeeper 实现的有所不同。在上述示例代码中使用的 etcdsync 的 Lock 流程是:
@ -313,56 +311,57 @@ etcd 中没有像 ZooKeeper 那样的 sequence 节点。所以其锁实现和基
package main package main
import ( import (
"fmt" "fmt"
"time" "time"
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
"gopkg.in/redsync.v1" "gopkg.in/redsync.v1"
) )
func newPool(server string) *redis.Pool { func newPool(server string) *redis.Pool {
return &redis.Pool{ return &redis.Pool{
MaxIdle: 3, MaxIdle: 3,
IdleTimeout: 240 * time.Second, IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) { Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", server) c, err := redis.Dial("tcp", server)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return c, err return c, err
}, },
TestOnBorrow: func(c redis.Conn, t time.Time) error { TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING") _, err := c.Do("PING")
return err return err
}, },
} }
} }
func newPools(servers []string) []redsync.Pool { func newPools(servers []string) []redsync.Pool {
pools := []redsync.Pool{} pools := []redsync.Pool{}
for _, server := range servers { for _, server := range servers {
pool := newPool(server) pool := newPool(server)
pools = append(pools, pool) pools = append(pools, pool)
} }
return pools return pools
} }
func main() { func main() {
pools := newPools([]string{"127.0.0.1:6379", "127.0.0.1:6378", "127.0.0.1:6377"}) pools := newPools([]string{
rs := redsync.New(pools) "127.0.0.1:6379", "127.0.0.1:6378", "127.0.0.1:6377",
m := rs.NewMutex("/lock") })
rs := redsync.New(pools)
err := m.Lock() m := rs.NewMutex("/lock")
if err != nil {
panic(err)
}
fmt.Println("lock success")
unlockRes := m.Unlock()
fmt.Println("unlock result: ", unlockRes)
err := m.Lock()
if err != nil {
panic(err)
}
fmt.Println("lock success")
unlockRes := m.Unlock()
fmt.Println("unlock result: ", unlockRes)
} }
``` ```

View File

@ -57,19 +57,22 @@ elasticsearch 是开源分布式搜索引擎的霸主,其依赖于 Lucene 实
```go ```go
func equal() { func equal() {
if postEntry.docID of '天气' == postEntry.docID of '气很' && postEntry.offset + 1 of '天气' == postEntry.offset of '气很' { if postEntry.docID of '天气' == postEntry.docID of '气很' &&
return true postEntry.offset + 1 of '天气' == postEntry.offset of '气很' {
} return true
}
if postEntry.docID of '气很' == postEntry.docID of '很好' && postEntry.offset + 1 of '气很' == postEntry.offset of '很好' { if postEntry.docID of '气很' == postEntry.docID of '很好' &&
return true postEntry.offset + 1 of '气很' == postEntry.offset of '很好' {
} return true
}
if postEntry.docID of '天气' == postEntry.docID of '很好' && postEntry.offset + 2 of '天气' == postEntry.offset of '很好' { if postEntry.docID of '天气' == postEntry.docID of '很好' &&
return true postEntry.offset + 2 of '天气' == postEntry.offset of '很好' {
} return true
}
return false return false
} }
``` ```
@ -171,7 +174,7 @@ if field_1 == 1 && field_2 == 2 && field_3 == 3 && field_4 == 4 {
```go ```go
if field_1 == 1 || field_2 == 2 { if field_1 == 1 || field_2 == 2 {
return true return true
} }
``` ```
@ -193,21 +196,21 @@ es 的 Bool Query 方案,实际上就是用 json 来表达了这种程序语
// 选用 elastic 版本时 // 选用 elastic 版本时
// 注意与自己使用的 elasticsearch 要对应 // 注意与自己使用的 elasticsearch 要对应
import ( import (
elastic "gopkg.in/olivere/elastic.v3" elastic "gopkg.in/olivere/elastic.v3"
) )
var esClient *elastic.Client var esClient *elastic.Client
func initElasticsearchClient(host string, port string) { func initElasticsearchClient(host string, port string) {
var err error var err error
esClient, err = elastic.NewClient( esClient, err = elastic.NewClient(
elastic.SetURL(fmt.Sprintf("http://%s:%s", host, port)), elastic.SetURL(fmt.Sprintf("http://%s:%s", host, port)),
elastic.SetMaxRetries(3), elastic.SetMaxRetries(3),
) )
if err != nil { if err != nil {
// log error // log error
} }
} }
``` ```
@ -216,62 +219,65 @@ func initElasticsearchClient(host string, port string) {
```go ```go
func insertDocument(db string, table string, obj map[string]interface{}) { func insertDocument(db string, table string, obj map[string]interface{}) {
id := obj["id"] id := obj["id"]
var indexName, typeName string var indexName, typeName string
// 数据库中的 database/table 概念,可以简单映射到 es 的 index 和 type // 数据库中的 database/table 概念,可以简单映射到 es 的 index 和 type
// 不过需要注意,因为 es 中的 _type 本质上只是 document 的一个字段 // 不过需要注意,因为 es 中的 _type 本质上只是 document 的一个字段
// 所以单个 index 内容过多会导致性能问题 // 所以单个 index 内容过多会导致性能问题
// 在新版本中 type 已经废弃 // 在新版本中 type 已经废弃
// 为了让不同表的数据落入不同的 index这里我们用 table+name 作为 index 的名字 // 为了让不同表的数据落入不同的 index这里我们用 table+name 作为 index 的名字
indexName = fmt.Sprintf("%v_%v", db, table) indexName = fmt.Sprintf("%v_%v", db, table)
typeName = table typeName = table
//正常情况 // 正常情况
res, err := esClient.Index().Index(indexName).Type(typeName).Id(id).BodyJson(obj).Do() res, err := esClient.Index().Index(indexName).Type(typeName).Id(id).BodyJson(obj).Do()
if err != nil { if err != nil {
// handle error // handle error
} else { } else {
// insert success // insert success
} }
} }
``` ```
获取: 获取:
```go ```go
func query(indexName string, typeName string) (*elastic.SearchResult, error) { func query(indexName string, typeName string) (*elastic.SearchResult, error) {
// 通过 bool must 和 bool should 添加 bool 查询条件 // 通过 bool must 和 bool should 添加 bool 查询条件
q := elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("id", 1), q := elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("id", 1),
elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("male", "m"))) elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("male", "m")))
q = q.Should(elastic.NewMatchPhraseQuery("name", "alex"), q = q.Should(
elastic.NewMatchPhraseQuery("name", "xargin")) elastic.NewMatchPhraseQuery("name", "alex"),
elastic.NewMatchPhraseQuery("name", "xargin"),
)
searchService := esClient.Search(indexName).Type(typeName) searchService := esClient.Search(indexName).Type(typeName)
res, err := searchService.Query(q).Do() res, err := searchService.Query(q).Do()
if err != nil { if err != nil {
// log error // log error
return nil, err return nil, err
} }
return res, nil return res, nil
} }
``` ```
删除: 删除:
```go ```go
func deleteDocument(indexName string, typeName string, obj map[string]interface{}) { func deleteDocument(
id := obj["id"] indexName string, typeName string, obj map[string]interface{},
) {
id := obj["id"]
res, err := esClient.Delete().Index(indexName).Type(typeName).Id(id).Do() res, err := esClient.Delete().Index(indexName).Type(typeName).Id(id).Do()
if err != nil { if err != nil {
// handle error // handle error
} else { } else {
// delete success // delete success
} }
} }
``` ```
@ -284,7 +290,9 @@ func deleteDocument(indexName string, typeName string, obj map[string]interface{
比如我们有一段 bool 表达式，user_id = 1 and (product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1),写成 SQL 是如下形式: 比如我们有一段 bool 表达式，user_id = 1 and (product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1),写成 SQL 是如下形式:
```sql ```sql
select * from xxx where user_id = 1 and (product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1) select * from xxx where user_id = 1 and (
product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1
)
``` ```
写成 es 的 DSL 是如下形式: 写成 es 的 DSL 是如下形式:
@ -399,7 +407,9 @@ select * from wms_orders where update_time >= date_sub(now(), interval 10 minute
当然,考虑到边界情况,我们可以让这个时间段的数据与前一次的有一些重叠: 当然,考虑到边界情况,我们可以让这个时间段的数据与前一次的有一些重叠:
```sql ```sql
select * from wms_orders where update_time >= date_sub(now(), interval 11 minute); select * from wms_orders where update_time >= date_sub(
now(), interval 11 minute
);
``` ```
取最近 11 分钟有变动的数据覆盖更新到 es 中。这种方案的缺点显而易见,我们必须要求业务数据严格遵守一定的规范。比如这里的,必须要有 update_time 字段,并且每次创建和更新都要保证该字段有正确的时间值。否则我们的同步逻辑就会丢失数据。 取最近 11 分钟有变动的数据覆盖更新到 es 中。这种方案的缺点显而易见,我们必须要求业务数据严格遵守一定的规范。比如这里的,必须要有 update_time 字段,并且每次创建和更新都要保证该字段有正确的时间值。否则我们的同步逻辑就会丢失数据。

View File

@ -22,48 +22,47 @@
```go ```go
// endpoints holds the seven host:port address strings used by this example.
var endpoints = []string{
	"100.69.62.1:3232",
	"100.69.62.32:3232",
	"100.69.62.42:3232",
	"100.69.62.81:3232",
	"100.69.62.11:3232",
	"100.69.62.113:3232",
	"100.69.62.101:3232",
}
// The point of this example is this shuffle.
//
// shuffle swaps two randomly chosen positions of slice, once per element.
// Both positions are drawn from the whole slice on every round.
func shuffle(slice []int) {
	n := len(slice)
	for range slice {
		// Calls are evaluated left to right, so the first draw is the first
		// swap position and the second draw is the other one.
		first, second := rand.Intn(n), rand.Intn(n)
		slice[first], slice[second] = slice[second], slice[first]
	}
}
func request(params map[string]interface{}) error { func request(params map[string]interface{}) error {
var indexes = []int {0,1,2,3,4,5,6} var indexes = []int {0,1,2,3,4,5,6}
var err error var err error
shuffle(indexes) shuffle(indexes)
maxRetryTimes := 3 maxRetryTimes := 3
idx := 0 idx := 0
for i := 0; i < maxRetryTimes; i++ { for i := 0; i < maxRetryTimes; i++ {
err = apiRequest(params, indexes[idx]) err = apiRequest(params, indexes[idx])
if err == nil { if err == nil {
break break
} }
idx++ idx++
} }
if err != nil { if err != nil {
// logging // logging
return err return err
} }
return nil return nil
} }
``` ```
我们循环一遍 slice，两两交换，这个和我们平常打牌时常用的洗牌方法类似。看起来没有什么问题。 我们循环一遍 slice，两两交换，这个和我们平常打牌时常用的洗牌方法类似。看起来没有什么问题。
@ -86,11 +85,11 @@ func request(params map[string]interface{}) error {
```go ```go
// shuffle permutes indexes in place with the Fisher-Yates algorithm: walking
// from the last position down to the first, each position is swapped with a
// randomly chosen position at or before it.
func shuffle(indexes []int) {
	for i := len(indexes); i > 0; i-- {
		lastIdx := i - 1
		// rand.Intn(i) draws from [0, i); rand.Int takes no argument and
		// cannot be used to bound the draw.
		idx := rand.Intn(i)
		indexes[lastIdx], indexes[idx] = indexes[idx], indexes[lastIdx]
	}
}
``` ```
@ -98,8 +97,8 @@ func shuffle(indexes []int) {
```go ```go
// shuffle returns a random permutation of the integers [0, n), delegating
// to rand.Perm.
func shuffle(n int) []int {
	return rand.Perm(n)
}
``` ```
@ -125,49 +124,48 @@ rand.Seed(time.Now().UnixNano())
package main package main
import ( import (
"fmt" "fmt"
"math/rand" "math/rand"
"time" "time"
) )
// init seeds the global math/rand source with the current time in
// nanoseconds.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
// shuffle1 swaps two randomly chosen positions of slice, once per element.
// Both positions are drawn from the whole slice on every round.
func shuffle1(slice []int) {
	n := len(slice)
	for range slice {
		// Calls are evaluated left to right, so the draw order matches a
		// two-statement version.
		first, second := rand.Intn(n), rand.Intn(n)
		slice[first], slice[second] = slice[second], slice[first]
	}
}
// shuffle2 permutes indexes in place with the Fisher-Yates algorithm: from
// the last position down to the first, each position is swapped with a
// randomly chosen position at or before it.
func shuffle2(indexes []int) {
	for last := len(indexes) - 1; last >= 0; last-- {
		pick := rand.Intn(last + 1)
		indexes[pick], indexes[last] = indexes[last], indexes[pick]
	}
}
func main() { func main() {
var cnt1 = map[int]int{} var cnt1 = map[int]int{}
for i := 0; i < 1000000; i++ { for i := 0; i < 1000000; i++ {
var sl = []int{0, 1, 2, 3, 4, 5, 6} var sl = []int{0, 1, 2, 3, 4, 5, 6}
shuffle1(sl) shuffle1(sl)
cnt1[sl[0]]++ cnt1[sl[0]]++
} }
var cnt2 = map[int]int{} var cnt2 = map[int]int{}
for i := 0; i < 1000000; i++ { for i := 0; i < 1000000; i++ {
var sl = []int{0, 1, 2, 3, 4, 5, 6} var sl = []int{0, 1, 2, 3, 4, 5, 6}
shuffle2(sl) shuffle2(sl)
cnt2[sl[0]]++ cnt2[sl[0]]++
} }
fmt.Println(cnt1, "\n", cnt2) fmt.Println(cnt1, "\n", cnt2)
} }
``` ```
输出: 输出:

View File

@ -31,12 +31,12 @@
```shell ```shell
etcdctl get /configs/remote_config.json etcdctl get /configs/remote_config.json
{ {
"addr" : "127.0.0.1:1080", "addr" : "127.0.0.1:1080",
"aes_key" : "01B345B7A9ABC00F0123456789ABCDAF", "aes_key" : "01B345B7A9ABC00F0123456789ABCDAF",
"https" : false, "https" : false,
"secret" : "", "secret" : "",
"private_key_path" : "", "private_key_path" : "",
"cert_file_path" : "" "cert_file_path" : ""
} }
``` ```
@ -44,9 +44,9 @@ etcdctl get /configs/remote_config.json
```go ```go
cfg := client.Config{ cfg := client.Config{
Endpoints: []string{"http://127.0.0.1:2379"}, Endpoints: []string{"http://127.0.0.1:2379"},
Transport: client.DefaultTransport, Transport: client.DefaultTransport,
HeaderTimeoutPerRequest: time.Second, HeaderTimeoutPerRequest: time.Second,
} }
``` ```
@ -57,10 +57,10 @@ cfg := client.Config{
```go ```go
// Fetch the configuration node; log.Fatal ends the process on failure, so
// the success path needs no else branch.
resp, err = kapi.Get(context.Background(), "/path/to/your/config", nil)
if err != nil {
	log.Fatal(err)
}
log.Printf("Get is done. Metadata is %q\n", resp)
log.Printf("%q key has %q value\n", resp.Node.Key, resp.Node.Value)
``` ```
@ -72,11 +72,11 @@ if err != nil {
kapi := client.NewKeysAPI(c) kapi := client.NewKeysAPI(c)
w := kapi.Watcher("/path/to/your/config", nil) w := kapi.Watcher("/path/to/your/config", nil)
// Watch the key in the background and log every change until the watcher
// reports an error.
go func() {
	for {
		resp, err := w.Next(context.Background())
		if err != nil {
			// resp may be nil when err is set; reading resp.Node would
			// then panic, so stop watching instead.
			log.Println("watch error:", err)
			return
		}
		log.Println(resp, err)
		log.Println("new values is ", resp.Node.Value)
	}
}()
``` ```
@ -88,79 +88,79 @@ go func() {
package main package main
import (
	"encoding/json"
	"log"
	"time"

	"github.com/coreos/etcd/client"
	"golang.org/x/net/context"
)
var configPath = `/configs/remote_config.json` var configPath = `/configs/remote_config.json`
var kapi client.KeysAPI var kapi client.KeysAPI
// ConfigStruct is the in-memory form of the JSON configuration document.
// Each field is filled from the JSON key named in its tag.
type ConfigStruct struct {
	Addr           string `json:"addr"`
	AesKey         string `json:"aes_key"`
	HTTPS          bool   `json:"https"`
	Secret         string `json:"secret"`
	PrivateKeyPath string `json:"private_key_path"`
	CertFilePath   string `json:"cert_file_path"`
}
var appConfig ConfigStruct var appConfig ConfigStruct
func init() { func init() {
cfg := client.Config{ cfg := client.Config{
Endpoints: []string{"http://127.0.0.1:2379"}, Endpoints: []string{"http://127.0.0.1:2379"},
Transport: client.DefaultTransport, Transport: client.DefaultTransport,
HeaderTimeoutPerRequest: time.Second, HeaderTimeoutPerRequest: time.Second,
} }
c, err := client.New(cfg) c, err := client.New(cfg)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
kapi = client.NewKeysAPI(c) kapi = client.NewKeysAPI(c)
initConfig() initConfig()
} }
func watchAndUpdate() { func watchAndUpdate() {
w := kapi.Watcher(configPath, nil) w := kapi.Watcher(configPath, nil)
go func() { go func() {
// watch 该节点下的每次变化 // watch 该节点下的每次变化
for { for {
resp, err := w.Next(context.Background()) resp, err := w.Next(context.Background())
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
log.Println("new values is ", resp.Node.Value) log.Println("new values is ", resp.Node.Value)
err = json.Unmarshal([]byte(resp.Node.Value), &appConfig) err = json.Unmarshal([]byte(resp.Node.Value), &appConfig)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }
}() }()
} }
func initConfig() { func initConfig() {
resp, err = kapi.Get(context.Background(), configPath, nil) resp, err = kapi.Get(context.Background(), configPath, nil)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
err := json.Unmarshal(resp.Node.Value, &appConfig) err := json.Unmarshal(resp.Node.Value, &appConfig)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }
func getConfig() ConfigStruct { func getConfig() ConfigStruct {
return appConfig return appConfig
} }
func main() { func main() {
// init your app // init your app
} }
``` ```

View File

@ -16,56 +16,55 @@
package main package main
import ( import (
"fmt" "fmt"
"regexp" "regexp"
"time" "time"
"github.com/gocolly/colly" "github.com/gocolly/colly"
) )
var visited = map[string]bool{} var visited = map[string]bool{}
func main() { func main() {
// Instantiate default collector // Instantiate default collector
c := colly.NewCollector( c := colly.NewCollector(
colly.AllowedDomains("www.v2ex.com"), colly.AllowedDomains("www.v2ex.com"),
colly.MaxDepth(1), colly.MaxDepth(1),
) )
detailRegex, _ := regexp.Compile(`/go/go\?p=\d+$`) detailRegex, _ := regexp.Compile(`/go/go\?p=\d+$`)
listRegex, _ := regexp.Compile(`/t/\d+#\w+`) listRegex, _ := regexp.Compile(`/t/\d+#\w+`)
// On every a element which has href attribute call callback // On every a element which has href attribute call callback
c.OnHTML("a[href]", func(e *colly.HTMLElement) { c.OnHTML("a[href]", func(e *colly.HTMLElement) {
link := e.Attr("href") link := e.Attr("href")
// 已访问过的详情页或列表页,跳过 // 已访问过的详情页或列表页,跳过
if visited[link] && (detailRegex.Match([]byte(link)) || listRegex.Match([]byte(link))) { if visited[link] && (detailRegex.Match([]byte(link)) || listRegex.Match([]byte(link))) {
return return
} }
// 匹配下列两种 url 模式的,才去 visit // 匹配下列两种 url 模式的,才去 visit
// https://www.v2ex.com/go/go?p=2 // https://www.v2ex.com/go/go?p=2
// https://www.v2ex.com/t/472945#reply3 // https://www.v2ex.com/t/472945#reply3
if !detailRegex.Match([]byte(link)) && !listRegex.Match([]byte(link)) { if !detailRegex.Match([]byte(link)) && !listRegex.Match([]byte(link)) {
println("not match", link) println("not match", link)
return return
} }
time.Sleep(time.Second) time.Sleep(time.Second)
println("match", link) println("match", link)
visited[link] = true visited[link] = true
time.Sleep(time.Millisecond * 2) time.Sleep(time.Millisecond * 2)
c.Visit(e.Request.AbsoluteURL(link)) c.Visit(e.Request.AbsoluteURL(link))
}) })
err := c.Visit("https://www.v2ex.com/go/go") err := c.Visit("https://www.v2ex.com/go/go")
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
} }
``` ```
## 分布式爬虫 ## 分布式爬虫
@ -108,8 +107,8 @@ nats 的服务端项目是 gnatsd客户端与 gnatsd 的通信方式为基于
```go ```go
nc, err := nats.Connect(nats.DefaultURL) nc, err := nats.Connect(nats.DefaultURL)
if err != nil { if err != nil {
// log error // log error
return return
} }
// 指定 subject 为 tasks消息内容随意 // 指定 subject 为 tasks消息内容随意
@ -127,8 +126,8 @@ nc.Flush()
```go ```go
nc, err := nats.Connect(nats.DefaultURL) nc, err := nats.Connect(nats.DefaultURL)
if err != nil { if err != nil {
// log error // log error
return return
} }
// queue subscribe 相当于在消费者之间进行任务分发的分支均衡 // queue subscribe 相当于在消费者之间进行任务分发的分支均衡
@ -136,19 +135,19 @@ if err != nil {
// nats 中的 queue 概念上类似于 kafka 中的 consumer group // nats 中的 queue 概念上类似于 kafka 中的 consumer group
sub, err := nc.QueueSubscribeSync("tasks", "workers") sub, err := nc.QueueSubscribeSync("tasks", "workers")
if err != nil { if err != nil {
// log error // log error
return return
} }
var msg *nats.Msg var msg *nats.Msg
for { for {
msg, err = sub.NextMsg(time.Hour * 10000) msg, err = sub.NextMsg(time.Hour * 10000)
if err != nil { if err != nil {
// log error // log error
break break
} }
// 正确地消费到了消息 // 正确地消费到了消息
// 可用 nats.Msg 对象处理任务 // 可用 nats.Msg 对象处理任务
} }
``` ```
@ -160,10 +159,10 @@ for {
package main package main
import ( import (
"fmt" "fmt"
"net/url" "net/url"
"github.com/gocolly/colly" "github.com/gocolly/colly"
) )
var domain2Collector = map[string]*colly.Collector{} var domain2Collector = map[string]*colly.Collector{}
@ -172,62 +171,62 @@ var maxDepth = 10
var natsURL = "nats://localhost:4222" var natsURL = "nats://localhost:4222"
func factory(urlStr string) *colly.Collector { func factory(urlStr string) *colly.Collector {
u, _ := url.Parse(urlStr) u, _ := url.Parse(urlStr)
return domain2Collector[u.Host] return domain2Collector[u.Host]
} }
func initV2exCollector() *colly.Collector { func initV2exCollector() *colly.Collector {
c := colly.NewCollector( c := colly.NewCollector(
colly.AllowedDomains("www.v2ex.com"), colly.AllowedDomains("www.v2ex.com"),
colly.MaxDepth(maxDepth), colly.MaxDepth(maxDepth),
) )
c.OnResponse(func(resp *colly.Response) { c.OnResponse(func(resp *colly.Response) {
// 做一些爬完之后的善后工作 // 做一些爬完之后的善后工作
// 比如页面已爬完的确认存进 MySQL // 比如页面已爬完的确认存进 MySQL
}) })
c.OnHTML("a[href]", func(e *colly.HTMLElement) { c.OnHTML("a[href]", func(e *colly.HTMLElement) {
// 基本的反爬虫策略 // 基本的反爬虫策略
time.Sleep(time.Second * 2) time.Sleep(time.Second * 2)
// TODO, 正则 match 列表页的话,就 visit // TODO, 正则 match 列表页的话,就 visit
// TODO, 正则 match 落地页的话,就发消息队列 // TODO, 正则 match 落地页的话,就发消息队列
c.Visit(e.Request.AbsoluteURL(link)) c.Visit(e.Request.AbsoluteURL(link))
}) })
return c return c
} }
func initV2fxCollector() *colly.Collector { func initV2fxCollector() *colly.Collector {
c := colly.NewCollector( c := colly.NewCollector(
colly.AllowedDomains("www.v2fx.com"), colly.AllowedDomains("www.v2fx.com"),
colly.MaxDepth(maxDepth), colly.MaxDepth(maxDepth),
) )
c.OnHTML("a[href]", func(e *colly.HTMLElement) { c.OnHTML("a[href]", func(e *colly.HTMLElement) {
}) })
return c return c
} }
func init() { func init() {
domain2Collector["www.v2ex.com"] = initV2exCollector() domain2Collector["www.v2ex.com"] = initV2exCollector()
domain2Collector["www.v2fx.com"] = initV2fxCollector() domain2Collector["www.v2fx.com"] = initV2fxCollector()
var err error var err error
nc, err = nats.Connect(natsURL) nc, err = nats.Connect(natsURL)
if err != nil { if err != nil {
// log fatal // log fatal
os.Exit(1) os.Exit(1)
} }
} }
func main() { func main() {
urls := []string{"https://www.v2ex.com", "https://www.v2fx.com"} urls := []string{"https://www.v2ex.com", "https://www.v2fx.com"}
for _, url := range urls { for _, url := range urls {
instance := factory(url) instance := factory(url)
instance.Visit(url) instance.Visit(url)
} }
} }
``` ```
@ -238,10 +237,10 @@ func main() {
package main package main
import ( import (
"fmt" "fmt"
"net/url" "net/url"
"github.com/gocolly/colly" "github.com/gocolly/colly"
) )
var domain2Collector = map[string]*colly.Collector{} var domain2Collector = map[string]*colly.Collector{}
@ -250,73 +249,72 @@ var maxDepth = 10
var natsURL = "nats://localhost:4222" var natsURL = "nats://localhost:4222"
func factory(urlStr string) *colly.Collector { func factory(urlStr string) *colly.Collector {
u, _ := url.Parse(urlStr) u, _ := url.Parse(urlStr)
return domain2Collector[u.Host] return domain2Collector[u.Host]
} }
func initV2exCollector() *colly.Collector { func initV2exCollector() *colly.Collector {
c := colly.NewCollector( c := colly.NewCollector(
colly.AllowedDomains("www.v2ex.com"), colly.AllowedDomains("www.v2ex.com"),
colly.MaxDepth(maxDepth), colly.MaxDepth(maxDepth),
) )
return c return c
} }
func initV2fxCollector() *colly.Collector { func initV2fxCollector() *colly.Collector {
c := colly.NewCollector( c := colly.NewCollector(
colly.AllowedDomains("www.v2fx.com"), colly.AllowedDomains("www.v2fx.com"),
colly.MaxDepth(maxDepth), colly.MaxDepth(maxDepth),
) )
return c return c
} }
func init() { func init() {
domain2Collector["www.v2ex.com"] = initV2exCollector() domain2Collector["www.v2ex.com"] = initV2exCollector()
domain2Collector["www.v2fx.com"] = initV2fxCollector() domain2Collector["www.v2fx.com"] = initV2fxCollector()
var err error var err error
nc, err = nats.Connect(natsURL) nc, err = nats.Connect(natsURL)
if err != nil { if err != nil {
// log fatal // log fatal
os.Exit(1) os.Exit(1)
} }
} }
func startConsumer() { func startConsumer() {
nc, err := nats.Connect(nats.DefaultURL) nc, err := nats.Connect(nats.DefaultURL)
if err != nil { if err != nil {
// log error // log error
return return
} }
sub, err := nc.QueueSubscribeSync("tasks", "workers") sub, err := nc.QueueSubscribeSync("tasks", "workers")
if err != nil { if err != nil {
// log error // log error
return return
} }
var msg *nats.Msg var msg *nats.Msg
for { for {
msg, err = sub.NextMsg(time.Hour * 10000) msg, err = sub.NextMsg(time.Hour * 10000)
if err != nil { if err != nil {
// log error // log error
break break
} }
urlStr := string(msg.Data) urlStr := string(msg.Data)
ins := factory(urlStr) ins := factory(urlStr)
// 因为最下游拿到的一定是对应网站的落地页 // 因为最下游拿到的一定是对应网站的落地页
// 所以不用进行多余的判断了,直接爬内容即可 // 所以不用进行多余的判断了,直接爬内容即可
ins.Visit(urlStr) ins.Visit(urlStr)
} }
} }
func main() { func main() {
startConsumer() startConsumer()
} }
``` ```
从代码层面上来讲,这里的生产者和消费者其实本质上差不多。如果日后我们要灵活地支持增加、减少各种网站的爬取的话,应该思考如何将这些爬虫的策略、参数尽量地配置化。 从代码层面上来讲,这里的生产者和消费者其实本质上差不多。如果日后我们要灵活地支持增加、减少各种网站的爬取的话,应该思考如何将这些爬虫的策略、参数尽量地配置化。