diff --git a/LICENSE b/LICENSE index ca4c77642da66745cb4bc80b180758677a22e938..91cac9f03cf4256d4fef85b60ec589f0965ac6f9 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,7 @@ MIT License Copyright (c) 2017 Dmitri Shuralyov +Copyright (c) 2020 Hasura Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 6fa03bd4cdef5ac892a1b4d06065370d2cd5e204..0aeb27b8f88c1d3be9dd4209ff9dd12b333835a2 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,9 @@ -graphql +go-graphql-client ======= -[](https://travis-ci.org/shurcooL/graphql) [](https://godoc.org/github.com/shurcooL/graphql) +[](https://travis-ci.org/hasura/go-graphql-client.svg?branch=master) [](https://pkg.go.dev/github.com/hasura/go-graphql-client) + +**Preface:** This is a fork of `https://github.com/shurcooL/graphql` with extended features (subscription client, named operation) Package `graphql` provides a GraphQL client implementation. @@ -12,10 +14,10 @@ For more information, see package [`github.com/shurcooL/githubv4`](https://githu Installation ------------ -`graphql` requires Go version 1.8 or later. +`go-graphql-client` requires Go version 1.13 or later. ```bash -go get -u github.com/shurcooL/graphql +go get -u github.com/hasura/go-graphql-client ``` Usage @@ -278,6 +280,130 @@ fmt.Printf("Created a %v star review: %v\n", m.CreateReview.Stars, m.CreateRevie // Created a 5 star review: This is a great movie! ``` +### Subcriptions + +Usage +----- + +Construct a Subscription client, specifying the GraphQL server URL. + +```Go +client := graphql.NewSubscriptionClient("wss://example.com/graphql") +defer client.Close() + +// Subscribe subscriptions +// ... +// finally run the client +client.Run() +``` + +#### Subscribe + +To make a GraphQL subscription, you need to define a corresponding Go type. + +For example, to make the following GraphQL query: + +```GraphQL +subscription { + me { + name + } +} +``` + +You can define this variable: + +```Go +var subscription struct { + Me struct { + Name graphql.String + } +} +``` + +Then call `client.Subscribe`, passing a pointer to it: + +```Go +subscriptionId, err := client.Subscribe(&query, nil, func(dataValue *json.RawMessage, errValue error) error { + if errValue != nil { + // handle error + // if returns error, it will failback to `onError` event + return nil + } + data := query{} + err := json.Unmarshal(dataValue, &data) + + fmt.Println(query.Me.Name) + + // Output: Luke Skywalker +}) + +if err != nil { + // Handle error. +} + +// you can unsubscribe the subscription while the client is running +client.Unsubscribe(subscriptionId) +``` + +#### Authentication + +The subscription client is authenticated with GraphQL server through connection params: + +```Go +client := graphql.NewSubscriptionClient("wss://example.com/graphql"). + WithConnectionParams(map[string]interface{}{ + "headers": map[string]string{ + "authentication": "...", + }, + }) + +``` + +#### Options + +```Go +client. + // write timeout of websocket client + WithTimeout(time.Minute). + // When the websocket server was stopped, the client will retry connecting every second until timeout + WithRetryTimeout(time.Minute). + // sets loging function to print out received messages. By default, nothing is printed + WithLog(log.Println). + // max size of response message + WithReadLimit(10*1024*1024). + // these operation event logs won't be printed + WithoutLogTypes(graphql.GQL_DATA, graphql.GQL_CONNECTION_KEEP_ALIVE) + +``` + +#### Events + +```Go +// OnConnected event is triggered when the websocket connected to GraphQL server sucessfully +client.OnConnected(fn func()) + +// OnDisconnected event is triggered when the websocket server was stil down after retry timeout +client.OnDisconnected(fn func()) + +// OnConnected event is triggered when there is any connection error. This is bottom exception handler level +// If this function is empty, or returns nil, the error is ignored +// If returns error, the websocket connection will be terminated +client.OnError(onError func(sc *SubscriptionClient, err error) error) +``` + +### With operation name + +Operatiion name is still on API decision plan https://github.com/shurcooL/graphql/issues/12. However, in my opinion separate methods are easier choice to avoid breaking changes + +```Go +func (c *Client) NamedQuery(ctx context.Context, name string, q interface{}, variables map[string]interface{}) + +func (c *Client) NamedMutate(ctx context.Context, name string, q interface{}, variables map[string]interface{}) + +func (sc *SubscriptionClient) NamedSubscribe(name string, v interface{}, variables map[string]interface{}, handler func(message *json.RawMessage, err error) error) (string, error) +``` + Directories ----------- diff --git a/example/graphqldev/main.go b/example/graphqldev/main.go index ffc8302d460a79d6a5a7bb9282d411aaf894f6ea..3a6ef3cc7a7d898d23ea6b68a62d2269dfc02fd1 100644 --- a/example/graphqldev/main.go +++ b/example/graphqldev/main.go @@ -17,7 +17,7 @@ import ( graphqlserver "github.com/graph-gophers/graphql-go" "github.com/graph-gophers/graphql-go/example/starwars" "github.com/graph-gophers/graphql-go/relay" - "github.com/shurcooL/graphql" + graphql "github.com/hasura/go-graphql-client" ) func main() { diff --git a/example/subscription/main.go b/example/subscription/main.go new file mode 100644 index 0000000000000000000000000000000000000000..4f35042fefe48ba84174c128a852c4bf3817e14f --- /dev/null +++ b/example/subscription/main.go @@ -0,0 +1,97 @@ +// subscription is a test program currently being used for developing graphql package. +// It performs queries against a local test GraphQL server instance. +// +// It's not meant to be a clean or readable example. But it's functional. +// Better, actual examples will be created in the future. +package main + +import ( + "encoding/json" + "flag" + "log" + "os" + "time" + + graphql "github.com/hasura/go-graphql-client" +) + +func main() { + flag.Parse() + + err := run() + if err != nil { + panic(err) + } +} + +func run() error { + url := flag.Arg(0) + client := graphql.NewSubscriptionClient(url). + WithConnectionParams(map[string]interface{}{ + "headers": map[string]string{ + "x-hasura-admin-secret": "hasura", + }, + }).WithLog(log.Println). + WithoutLogTypes(graphql.GQL_DATA, graphql.GQL_CONNECTION_KEEP_ALIVE). + OnError(func(sc *graphql.SubscriptionClient, err error) error { + log.Print("err", err) + return err + }) + + defer client.Close() + + /* + subscription($limit: Int!) { + users(limit: $limit) { + id + name + } + } + */ + var sub struct { + User struct { + ID graphql.ID + Name graphql.String + } `graphql:"users(limit: $limit, order_by: { id: desc })"` + } + type Int int + variables := map[string]interface{}{ + "limit": Int(10), + } + _, err := client.Subscribe(sub, variables, func(data *json.RawMessage, err error) error { + + if err != nil { + return nil + } + + time.Sleep(10 * time.Second) + return nil + }) + + if err != nil { + panic(err) + } + + go func() { + for { + time.Sleep(5 * time.Second) + log.Println("reseting...") + go client.Reset() + } + }() + + go client.Run() + + time.Sleep(time.Minute) + return nil +} + +// print pretty prints v to stdout. It panics on any error. +func print(v interface{}) { + w := json.NewEncoder(os.Stdout) + w.SetIndent("", "\t") + err := w.Encode(v) + if err != nil { + panic(err) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000000000000000000000000000000000000..21170fa480c1da92960327433f3db7a291b7e26f --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module github.com/hasura/go-graphql-client + +go 1.14 + +require ( + github.com/google/uuid v1.1.2 + github.com/graph-gophers/graphql-go v0.0.0-20200819123640-3b5ddcd884ae + golang.org/x/net v0.0.0-20200822124328-c89045814202 + nhooyr.io/websocket v1.8.6 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000000000000000000000000000000000000..d2fe0a023bc68e71fe9d6879fd8a3ba29591053a --- /dev/null +++ b/go.sum @@ -0,0 +1,82 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= +github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= +github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= +github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= +github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= +github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/graph-gophers/graphql-go v0.0.0-20200819123640-3b5ddcd884ae h1:TQuRfD07N7uHp+CW7rCfR579o6PDnwJacRBJH74RMq0= +github.com/graph-gophers/graphql-go v0.0.0-20200819123640-3b5ddcd884ae/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= +github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8= +github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= +github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k= +nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= diff --git a/graphql.go b/graphql.go index 85209562c66dd5320dd79e377b5346f96d588707..045394aed046d9a6e63cb142854e7235beabad51 100644 --- a/graphql.go +++ b/graphql.go @@ -8,7 +8,7 @@ import ( "io/ioutil" "net/http" - "github.com/shurcooL/graphql/internal/jsonutil" + "github.com/hasura/go-graphql-client/internal/jsonutil" "golang.org/x/net/context/ctxhttp" ) @@ -34,24 +34,34 @@ func NewClient(url string, httpClient *http.Client) *Client { // with a query derived from q, populating the response into it. // q should be a pointer to struct that corresponds to the GraphQL schema. func (c *Client) Query(ctx context.Context, q interface{}, variables map[string]interface{}) error { - return c.do(ctx, queryOperation, q, variables) + return c.do(ctx, queryOperation, q, variables, "") +} + +// NamedQuery executes a single GraphQL query request, with operation name +func (c *Client) NamedQuery(ctx context.Context, name string, q interface{}, variables map[string]interface{}) error { + return c.do(ctx, queryOperation, q, variables, name) } // Mutate executes a single GraphQL mutation request, // with a mutation derived from m, populating the response into it. // m should be a pointer to struct that corresponds to the GraphQL schema. func (c *Client) Mutate(ctx context.Context, m interface{}, variables map[string]interface{}) error { - return c.do(ctx, mutationOperation, m, variables) + return c.do(ctx, mutationOperation, m, variables, "") +} + +// NamedMutate executes a single GraphQL mutation request, , with operation name +func (c *Client) NamedMutate(ctx context.Context, name string, m interface{}, variables map[string]interface{}) error { + return c.do(ctx, mutationOperation, m, variables, name) } // do executes a single GraphQL operation. -func (c *Client) do(ctx context.Context, op operationType, v interface{}, variables map[string]interface{}) error { +func (c *Client) do(ctx context.Context, op operationType, v interface{}, variables map[string]interface{}, name string) error { var query string switch op { case queryOperation: - query = constructQuery(v, variables) + query = constructQuery(v, variables, name) case mutationOperation: - query = constructMutation(v, variables) + query = constructMutation(v, variables, name) } in := struct { Query string `json:"query"` diff --git a/graphql_test.go b/graphql_test.go index e09dcc94db33f5bfa80e169031615d3ba03be33b..c0ce717812974ebb827d322ec4ca0dd871bd27d9 100644 --- a/graphql_test.go +++ b/graphql_test.go @@ -8,7 +8,7 @@ import ( "net/http/httptest" "testing" - "github.com/shurcooL/graphql" + "github.com/hasura/go-graphql-client" ) func TestClient_Query_partialDataWithErrorResponse(t *testing.T) { diff --git a/ident/ident_test.go b/ident/ident_test.go index 9ee1b47ca0a87c05323a61e4bfdbd11486e1b04e..6402dbb1704fa04cff8a68eeb574171b6df9a253 100644 --- a/ident/ident_test.go +++ b/ident/ident_test.go @@ -5,7 +5,7 @@ import ( "reflect" "testing" - "github.com/shurcooL/graphql/ident" + "github.com/hasura/go-graphql-client/ident" ) func Example_lowerCamelCaseToMixedCaps() { diff --git a/internal/jsonutil/benchmark_test.go b/internal/jsonutil/benchmark_test.go index f8788b169d98419a23f2bb7e7e2ed351ad7d166f..a50f331d8a8f45ea81572ece79ee60fb9e7b11f1 100644 --- a/internal/jsonutil/benchmark_test.go +++ b/internal/jsonutil/benchmark_test.go @@ -8,8 +8,8 @@ import ( "testing" "time" - "github.com/shurcooL/graphql" - "github.com/shurcooL/graphql/internal/jsonutil" + graphql "github.com/hasura/go-graphql-client" + "github.com/hasura/go-graphql-client/internal/jsonutil" ) func TestUnmarshalGraphQL_benchmark(t *testing.T) { diff --git a/internal/jsonutil/graphql_test.go b/internal/jsonutil/graphql_test.go index 6329ed833f4937bc3a3a365d691af8afbc4fcf13..fa42d060d70c3b0dd57ba63e2ca2791a1da44fc8 100644 --- a/internal/jsonutil/graphql_test.go +++ b/internal/jsonutil/graphql_test.go @@ -5,8 +5,8 @@ import ( "testing" "time" - "github.com/shurcooL/graphql" - "github.com/shurcooL/graphql/internal/jsonutil" + graphql "github.com/hasura/go-graphql-client" + "github.com/hasura/go-graphql-client/internal/jsonutil" ) func TestUnmarshalGraphQL(t *testing.T) { diff --git a/query.go b/query.go index e10b77189bea397054a93ced5e274fcd945ef3b6..28b6b80e3c26fc9537ad10b7adbd960b283b7e95 100644 --- a/query.go +++ b/query.go @@ -7,25 +7,43 @@ import ( "reflect" "sort" - "github.com/shurcooL/graphql/ident" + "github.com/hasura/go-graphql-client/ident" ) -func constructQuery(v interface{}, variables map[string]interface{}) string { +func constructQuery(v interface{}, variables map[string]interface{}, name string) string { query := query(v) if len(variables) > 0 { - return "query(" + queryArguments(variables) + ")" + query + return "query " + name + "(" + queryArguments(variables) + ")" + query + } + + if name != "" { + return "query " + name + query } return query } -func constructMutation(v interface{}, variables map[string]interface{}) string { +func constructMutation(v interface{}, variables map[string]interface{}, name string) string { query := query(v) if len(variables) > 0 { - return "mutation(" + queryArguments(variables) + ")" + query + return "mutation " + name + "(" + queryArguments(variables) + ")" + query + } + if name != "" { + return "mutation " + name + query } return "mutation" + query } +func constructSubscription(v interface{}, variables map[string]interface{}, name string) string { + query := query(v) + if len(variables) > 0 { + return "subscription " + name + "(" + queryArguments(variables) + ")" + query + } + if name != "" { + return "subscription " + name + query + } + return "subscription" + query +} + // queryArguments constructs a minified arguments string for variables. // // E.g., map[string]interface{}{"a": Int(123), "b": NewBoolean(true)} -> "$a:Int!$b:Boolean". diff --git a/query_test.go b/query_test.go index 4de8cb5f21a4dd5ea2d26003a8bd6915be75449b..1706c9e1365c599cff0550543db6215c531bf47e 100644 --- a/query_test.go +++ b/query_test.go @@ -8,6 +8,7 @@ import ( func TestConstructQuery(t *testing.T) { tests := []struct { + name string inV interface{} inVariables map[string]interface{} want string @@ -30,6 +31,7 @@ func TestConstructQuery(t *testing.T) { want: `{viewer{login,createdAt,id,databaseId},rateLimit{cost,limit,remaining,resetAt}}`, }, { + name: "GetRepository", inV: struct { Repository struct { DatabaseID Int @@ -53,7 +55,7 @@ func TestConstructQuery(t *testing.T) { } `graphql:"issue(number:1)"` } `graphql:"repository(owner:\"shurcooL-test\"name:\"test-repo\")"` }{}, - want: `{repository(owner:"shurcooL-test"name:"test-repo"){databaseId,url,issue(number:1){comments(first:1after:"Y3Vyc29yOjE5NTE4NDI1Ng=="){edges{node{body,author{login},editor{login}},cursor}}}}}`, + want: `query GetRepository{repository(owner:"shurcooL-test"name:"test-repo"){databaseId,url,issue(number:1){comments(first:1after:"Y3Vyc29yOjE5NTE4NDI1Ng=="){edges{node{body,author{login},editor{login}},cursor}}}}}`, }, { inV: func() interface{} { @@ -165,9 +167,10 @@ func TestConstructQuery(t *testing.T) { "repositoryName": String("test-repo"), "issueNumber": Int(1), }, - want: `query($issueNumber:Int!$repositoryName:String!$repositoryOwner:String!){repository(owner: $repositoryOwner, name: $repositoryName){issue(number: $issueNumber){body}}}`, + want: `query ($issueNumber:Int!$repositoryName:String!$repositoryOwner:String!){repository(owner: $repositoryOwner, name: $repositoryName){issue(number: $issueNumber){body}}}`, }, { + name: "SearchRepository", inV: struct { Repository struct { Issue struct { @@ -186,7 +189,7 @@ func TestConstructQuery(t *testing.T) { "repositoryName": String("test-repo"), "issueNumber": Int(1), }, - want: `query($issueNumber:Int!$repositoryName:String!$repositoryOwner:String!){repository(owner: $repositoryOwner, name: $repositoryName){issue(number: $issueNumber){reactionGroups{users(first:10){nodes{login}}}}}}`, + want: `query SearchRepository($issueNumber:Int!$repositoryName:String!$repositoryOwner:String!){repository(owner: $repositoryOwner, name: $repositoryName){issue(number: $issueNumber){reactionGroups{users(first:10){nodes{login}}}}}}`, }, // Embedded structs without graphql tag should be inlined in query. { @@ -229,7 +232,7 @@ func TestConstructQuery(t *testing.T) { }, } for _, tc := range tests { - got := constructQuery(tc.inV, tc.inVariables) + got := constructQuery(tc.inV, tc.inVariables, tc.name) if got != tc.want { t.Errorf("\ngot: %q\nwant: %q\n", got, tc.want) } @@ -264,7 +267,240 @@ func TestConstructMutation(t *testing.T) { }, } for _, tc := range tests { - got := constructMutation(tc.inV, tc.inVariables) + got := constructMutation(tc.inV, tc.inVariables, "") + if got != tc.want { + t.Errorf("\ngot: %q\nwant: %q\n", got, tc.want) + } + } +} + +func TestConstructSubscription(t *testing.T) { + tests := []struct { + name string + inV interface{} + inVariables map[string]interface{} + want string + }{ + { + inV: struct { + Viewer struct { + Login String + CreatedAt DateTime + ID ID + DatabaseID Int + } + RateLimit struct { + Cost Int + Limit Int + Remaining Int + ResetAt DateTime + } + }{}, + want: `subscription{viewer{login,createdAt,id,databaseId},rateLimit{cost,limit,remaining,resetAt}}`, + }, + { + name: "GetRepository", + inV: struct { + Repository struct { + DatabaseID Int + URL URI + + Issue struct { + Comments struct { + Edges []struct { + Node struct { + Body String + Author struct { + Login String + } + Editor struct { + Login String + } + } + Cursor String + } + } `graphql:"comments(first:1after:\"Y3Vyc29yOjE5NTE4NDI1Ng==\")"` + } `graphql:"issue(number:1)"` + } `graphql:"repository(owner:\"shurcooL-test\"name:\"test-repo\")"` + }{}, + want: `subscription GetRepository{repository(owner:"shurcooL-test"name:"test-repo"){databaseId,url,issue(number:1){comments(first:1after:"Y3Vyc29yOjE5NTE4NDI1Ng=="){edges{node{body,author{login},editor{login}},cursor}}}}}`, + }, + { + inV: func() interface{} { + type actor struct { + Login String + AvatarURL URI + URL URI + } + + return struct { + Repository struct { + DatabaseID Int + URL URI + + Issue struct { + Comments struct { + Edges []struct { + Node struct { + DatabaseID Int + Author actor + PublishedAt DateTime + LastEditedAt *DateTime + Editor *actor + Body String + ViewerCanUpdate Boolean + } + Cursor String + } + } `graphql:"comments(first:1)"` + } `graphql:"issue(number:1)"` + } `graphql:"repository(owner:\"shurcooL-test\"name:\"test-repo\")"` + }{} + }(), + want: `subscription{repository(owner:"shurcooL-test"name:"test-repo"){databaseId,url,issue(number:1){comments(first:1){edges{node{databaseId,author{login,avatarUrl,url},publishedAt,lastEditedAt,editor{login,avatarUrl,url},body,viewerCanUpdate},cursor}}}}}`, + }, + { + inV: func() interface{} { + type actor struct { + Login String + AvatarURL URI `graphql:"avatarUrl(size:72)"` + URL URI + } + + return struct { + Repository struct { + Issue struct { + Author actor + PublishedAt DateTime + LastEditedAt *DateTime + Editor *actor + Body String + ReactionGroups []struct { + Content ReactionContent + Users struct { + TotalCount Int + } + ViewerHasReacted Boolean + } + ViewerCanUpdate Boolean + + Comments struct { + Nodes []struct { + DatabaseID Int + Author actor + PublishedAt DateTime + LastEditedAt *DateTime + Editor *actor + Body String + ReactionGroups []struct { + Content ReactionContent + Users struct { + TotalCount Int + } + ViewerHasReacted Boolean + } + ViewerCanUpdate Boolean + } + PageInfo struct { + EndCursor String + HasNextPage Boolean + } + } `graphql:"comments(first:1)"` + } `graphql:"issue(number:1)"` + } `graphql:"repository(owner:\"shurcooL-test\"name:\"test-repo\")"` + }{} + }(), + want: `subscription{repository(owner:"shurcooL-test"name:"test-repo"){issue(number:1){author{login,avatarUrl(size:72),url},publishedAt,lastEditedAt,editor{login,avatarUrl(size:72),url},body,reactionGroups{content,users{totalCount},viewerHasReacted},viewerCanUpdate,comments(first:1){nodes{databaseId,author{login,avatarUrl(size:72),url},publishedAt,lastEditedAt,editor{login,avatarUrl(size:72),url},body,reactionGroups{content,users{totalCount},viewerHasReacted},viewerCanUpdate},pageInfo{endCursor,hasNextPage}}}}}`, + }, + { + inV: struct { + Repository struct { + Issue struct { + Body String + } `graphql:"issue(number: 1)"` + } `graphql:"repository(owner:\"shurcooL-test\"name:\"test-repo\")"` + }{}, + want: `subscription{repository(owner:"shurcooL-test"name:"test-repo"){issue(number: 1){body}}}`, + }, + { + inV: struct { + Repository struct { + Issue struct { + Body String + } `graphql:"issue(number: $issueNumber)"` + } `graphql:"repository(owner: $repositoryOwner, name: $repositoryName)"` + }{}, + inVariables: map[string]interface{}{ + "repositoryOwner": String("shurcooL-test"), + "repositoryName": String("test-repo"), + "issueNumber": Int(1), + }, + want: `subscription ($issueNumber:Int!$repositoryName:String!$repositoryOwner:String!){repository(owner: $repositoryOwner, name: $repositoryName){issue(number: $issueNumber){body}}}`, + }, + { + name: "SearchRepository", + inV: struct { + Repository struct { + Issue struct { + ReactionGroups []struct { + Users struct { + Nodes []struct { + Login String + } + } `graphql:"users(first:10)"` + } + } `graphql:"issue(number: $issueNumber)"` + } `graphql:"repository(owner: $repositoryOwner, name: $repositoryName)"` + }{}, + inVariables: map[string]interface{}{ + "repositoryOwner": String("shurcooL-test"), + "repositoryName": String("test-repo"), + "issueNumber": Int(1), + }, + want: `subscription SearchRepository($issueNumber:Int!$repositoryName:String!$repositoryOwner:String!){repository(owner: $repositoryOwner, name: $repositoryName){issue(number: $issueNumber){reactionGroups{users(first:10){nodes{login}}}}}}`, + }, + // Embedded structs without graphql tag should be inlined in query. + { + inV: func() interface{} { + type actor struct { + Login String + AvatarURL URI + URL URI + } + type event struct { // Common fields for all events. + Actor actor + CreatedAt DateTime + } + type IssueComment struct { + Body String + } + return struct { + event // Should be inlined. + IssueComment `graphql:"... on IssueComment"` // Should not be, because of graphql tag. + CurrentTitle String + PreviousTitle String + Label struct { + Name String + Color String + } + }{} + }(), + want: `subscription{actor{login,avatarUrl,url},createdAt,... on IssueComment{body},currentTitle,previousTitle,label{name,color}}`, + }, + { + inV: struct { + Viewer struct { + Login string + CreatedAt time.Time + ID interface{} + DatabaseID int + } + }{}, + want: `subscription{viewer{login,createdAt,id,databaseId}}`, + }, + } + for _, tc := range tests { + got := constructSubscription(tc.inV, tc.inVariables, tc.name) if got != tc.want { t.Errorf("\ngot: %q\nwant: %q\n", got, tc.want) } diff --git a/scalar_test.go b/scalar_test.go index 8334b960167f0c0fad559f23c734b6f73e4c57d8..bdb14a15a9bdc03c4afc5abfd5fcbb58338eaa16 100644 --- a/scalar_test.go +++ b/scalar_test.go @@ -3,7 +3,7 @@ package graphql_test import ( "testing" - "github.com/shurcooL/graphql" + "github.com/hasura/go-graphql-client" ) func TestNewScalars(t *testing.T) { diff --git a/subscription.go b/subscription.go new file mode 100644 index 0000000000000000000000000000000000000000..139a349495dcaccc518c3100fcd115bb3c63c00e --- /dev/null +++ b/subscription.go @@ -0,0 +1,589 @@ +package graphql + +import ( + "context" + "encoding/json" + "fmt" + "io" + "strings" + "sync" + "time" + + "github.com/google/uuid" + "nhooyr.io/websocket" + "nhooyr.io/websocket/wsjson" +) + +// Subscription transport follow Apollo's subscriptions-transport-ws protocol specification +// https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md + +// OperationMessageType +type OperationMessageType string + +const ( + // Client sends this message after plain websocket connection to start the communication with the server + GQL_CONNECTION_INIT OperationMessageType = "connection_init" + // The server may responses with this message to the GQL_CONNECTION_INIT from client, indicates the server rejected the connection. + GQL_CONNECTION_ERROR OperationMessageType = "conn_err" + // Client sends this message to execute GraphQL operation + GQL_START OperationMessageType = "start" + // Client sends this message in order to stop a running GraphQL operation execution (for example: unsubscribe) + GQL_STOP OperationMessageType = "stop" + // Server sends this message upon a failing operation, before the GraphQL execution, usually due to GraphQL validation errors (resolver errors are part of GQL_DATA message, and will be added as errors array) + GQL_ERROR OperationMessageType = "error" + // The server sends this message to transfter the GraphQL execution result from the server to the client, this message is a response for GQL_START message. + GQL_DATA OperationMessageType = "data" + // Server sends this message to indicate that a GraphQL operation is done, and no more data will arrive for the specific operation. + GQL_COMPLETE OperationMessageType = "complete" + // Server message that should be sent right after each GQL_CONNECTION_ACK processed and then periodically to keep the client connection alive. + // The client starts to consider the keep alive message only upon the first received keep alive message from the server. + GQL_CONNECTION_KEEP_ALIVE OperationMessageType = "ka" + // The server may responses with this message to the GQL_CONNECTION_INIT from client, indicates the server accepted the connection. May optionally include a payload. + GQL_CONNECTION_ACK OperationMessageType = "connection_ack" + // Client sends this message to terminate the connection. + GQL_CONNECTION_TERMINATE OperationMessageType = "connection_terminate" + // Unknown operation type, for logging only + GQL_UNKNOWN OperationMessageType = "unknown" + // Internal status, for logging only + GQL_INTERNAL OperationMessageType = "internal" +) + +type OperationMessage struct { + ID string `json:"id,omitempty"` + Type OperationMessageType `json:"type"` + Payload json.RawMessage `json:"payload,omitempty"` +} + +func (om OperationMessage) String() string { + bs, _ := json.Marshal(om) + + return string(bs) +} + +// WebsocketHandler abstracts WebSocket connecton functions +// ReadJSON and WriteJSON data of a frame from the WebSocket connection. +// Close the WebSocket connection. +type WebsocketConn interface { + ReadJSON(v interface{}) error + WriteJSON(v interface{}) error + Close() error + // SetReadLimit sets the maximum size in bytes for a message read from the peer. If a + // message exceeds the limit, the connection sends a close message to the peer + // and returns ErrReadLimit to the application. + SetReadLimit(limit int64) +} + +type handlerFunc func(data *json.RawMessage, err error) error +type subscription struct { + query string + variables map[string]interface{} + handler func(data *json.RawMessage, err error) + started Boolean +} + +// SubscriptionClient is a GraphQL subscription client. +type SubscriptionClient struct { + url string + conn WebsocketConn + connectionParams map[string]interface{} + context context.Context + subscriptions map[string]*subscription + cancel context.CancelFunc + subscribersMu sync.Mutex + timeout time.Duration + isRunning Boolean + readLimit int64 // max size of response message. Default 10 MB + log func(args ...interface{}) + createConn func(sc *SubscriptionClient) (WebsocketConn, error) + retryTimeout time.Duration + onConnected func() + onDisconnected func() + onError func(sc *SubscriptionClient, err error) error + errorChan chan error + disabledLogTypes []OperationMessageType +} + +func NewSubscriptionClient(url string) *SubscriptionClient { + return &SubscriptionClient{ + url: url, + timeout: time.Minute, + readLimit: 10 * 1024 * 1024, // set default limit 10MB + subscriptions: make(map[string]*subscription), + createConn: newWebsocketConn, + retryTimeout: time.Minute, + errorChan: make(chan error), + } +} + +// GetURL returns GraphQL server's URL +func (sc *SubscriptionClient) GetURL() string { + return sc.url +} + +// GetContext returns current context of subscription client +func (sc *SubscriptionClient) GetContext() context.Context { + return sc.context +} + +// GetContext returns write timeout of websocket client +func (sc *SubscriptionClient) GetTimeout() time.Duration { + return sc.timeout +} + +// WithWebSocket replaces customized websocket client constructor +// In default, subscription client uses https://github.com/nhooyr/websocket +func (sc *SubscriptionClient) WithWebSocket(fn func(sc *SubscriptionClient) (WebsocketConn, error)) *SubscriptionClient { + sc.createConn = fn + return sc +} + +// WithConnectionParams updates connection params for sending to server through GQL_CONNECTION_INIT event +// It's usually used for authentication handshake +func (sc *SubscriptionClient) WithConnectionParams(params map[string]interface{}) *SubscriptionClient { + sc.connectionParams = params + return sc +} + +// WithTimeout updates write timeout of websocket client +func (sc *SubscriptionClient) WithTimeout(timeout time.Duration) *SubscriptionClient { + sc.timeout = timeout + return sc +} + +// WithRetryTimeout updates reconnecting timeout. When the websocket server was stopped, the client will retry connecting every second until timeout +func (sc *SubscriptionClient) WithRetryTimeout(timeout time.Duration) *SubscriptionClient { + sc.retryTimeout = timeout + return sc +} + +// WithLog sets loging function to print out received messages. By default, nothing is printed +func (sc *SubscriptionClient) WithLog(logger func(args ...interface{})) *SubscriptionClient { + sc.log = logger + return sc +} + +// WithoutLogTypes these operation types won't be printed +func (sc *SubscriptionClient) WithoutLogTypes(types ...OperationMessageType) *SubscriptionClient { + sc.disabledLogTypes = types + return sc +} + +// WithReadLimit set max size of response message +func (sc *SubscriptionClient) WithReadLimit(limit int64) *SubscriptionClient { + sc.readLimit = limit + return sc +} + +// OnConnected event is triggered when there is any connection error. This is bottom exception handler level +// If this function is empty, or returns nil, the error is ignored +// If returns error, the websocket connection will be terminated +func (sc *SubscriptionClient) OnError(onError func(sc *SubscriptionClient, err error) error) *SubscriptionClient { + sc.onError = onError + return sc +} + +// OnConnected event is triggered when the websocket connected to GraphQL server sucessfully +func (sc *SubscriptionClient) OnConnected(fn func()) *SubscriptionClient { + sc.onConnected = fn + return sc +} + +// OnDisconnected event is triggered when the websocket server was stil down after retry timeout +func (sc *SubscriptionClient) OnDisconnected(fn func()) *SubscriptionClient { + sc.onDisconnected = fn + return sc +} + +func (sc *SubscriptionClient) setIsRunning(value Boolean) { + sc.subscribersMu.Lock() + sc.isRunning = value + sc.subscribersMu.Unlock() +} + +func (sc *SubscriptionClient) init() error { + + now := time.Now() + ctx, cancel := context.WithCancel(context.Background()) + sc.context = ctx + sc.cancel = cancel + + for { + var err error + var conn WebsocketConn + // allow custom websocket client + if sc.conn == nil { + conn, err = newWebsocketConn(sc) + if err == nil { + sc.conn = conn + } + } + + if err == nil { + sc.conn.SetReadLimit(sc.readLimit) + // send connection init event to the server + err = sc.sendConnectionInit() + } + + if err == nil { + return nil + } + + if now.Add(sc.retryTimeout).Before(time.Now()) { + if sc.onDisconnected != nil { + sc.onDisconnected() + } + return err + } + sc.printLog(err.Error()+". retry in second....", GQL_INTERNAL) + time.Sleep(time.Second) + } +} + +func (sc *SubscriptionClient) printLog(message interface{}, opType OperationMessageType) { + if sc.log == nil { + return + } + for _, ty := range sc.disabledLogTypes { + if ty == opType { + return + } + } + + sc.log(message) +} + +func (sc *SubscriptionClient) sendConnectionInit() (err error) { + var bParams []byte = nil + if sc.connectionParams != nil { + + bParams, err = json.Marshal(sc.connectionParams) + if err != nil { + return + } + } + + // send connection_init event to the server + msg := OperationMessage{ + Type: GQL_CONNECTION_INIT, + Payload: bParams, + } + + sc.printLog(msg, GQL_CONNECTION_INIT) + return sc.conn.WriteJSON(msg) +} + +// Subscribe sends start message to server and open a channel to receive data. +// The handler callback function will receive raw message data or error. If the call return error, onError event will be triggered +// The function returns subscription ID and error. You can use subscription ID to unsubscribe the subscription +func (sc *SubscriptionClient) Subscribe(v interface{}, variables map[string]interface{}, handler func(message *json.RawMessage, err error) error) (string, error) { + return sc.do(v, variables, handler, "") +} + +// NamedSubscribe sends start message to server and open a channel to receive data, with operation name +func (sc *SubscriptionClient) NamedSubscribe(name string, v interface{}, variables map[string]interface{}, handler func(message *json.RawMessage, err error) error) (string, error) { + return sc.do(v, variables, handler, name) +} + +func (sc *SubscriptionClient) do(v interface{}, variables map[string]interface{}, handler func(message *json.RawMessage, err error) error, name string) (string, error) { + id := uuid.New().String() + query := constructSubscription(v, variables, name) + + sub := subscription{ + query: query, + variables: variables, + handler: sc.wrapHandler(handler), + } + + // if the websocket client is running, start subscription immediately + if sc.isRunning { + if err := sc.startSubscription(id, &sub); err != nil { + return "", err + } + } + + sc.subscribersMu.Lock() + sc.subscriptions[id] = &sub + sc.subscribersMu.Unlock() + + return id, nil +} + +// Subscribe sends start message to server and open a channel to receive data +func (sc *SubscriptionClient) startSubscription(id string, sub *subscription) error { + if sub == nil || sub.started { + return nil + } + + in := struct { + Query string `json:"query"` + Variables map[string]interface{} `json:"variables,omitempty"` + }{ + Query: sub.query, + Variables: sub.variables, + } + + payload, err := json.Marshal(in) + if err != nil { + return err + } + + // send stop message to the server + msg := OperationMessage{ + ID: id, + Type: GQL_START, + Payload: payload, + } + + sc.printLog(msg, GQL_START) + if err := sc.conn.WriteJSON(msg); err != nil { + return err + } + + sub.started = true + return nil +} + +func (sc *SubscriptionClient) wrapHandler(fn handlerFunc) func(data *json.RawMessage, err error) { + return func(data *json.RawMessage, err error) { + if errValue := fn(data, err); errValue != nil { + sc.errorChan <- errValue + } + } +} + +// Run start websocket client and subscriptions. If this function is run with goroutine, it can be stopped after closed +func (sc *SubscriptionClient) Run() error { + if err := sc.init(); err != nil { + return fmt.Errorf("retry timeout. exiting...") + } + + // lazily start subscriptions + for k, v := range sc.subscriptions { + if err := sc.startSubscription(k, v); err != nil { + sc.Unsubscribe(k) + return err + } + } + sc.setIsRunning(true) + + for sc.isRunning { + select { + case <-sc.context.Done(): + return nil + case e := <-sc.errorChan: + if sc.onError != nil { + if err := sc.onError(sc, e); err != nil { + return err + } + } + default: + + var message OperationMessage + if err := sc.conn.ReadJSON(&message); err != nil { + // manual EOF check + if err == io.EOF || strings.Contains(err.Error(), "EOF") { + return sc.Reset() + } + closeStatus := websocket.CloseStatus(err) + if closeStatus == websocket.StatusNormalClosure { + // close event from websocket client, exiting... + return nil + } + if closeStatus != -1 { + sc.printLog(fmt.Sprintf("%s. Retry connecting...", err), GQL_INTERNAL) + return sc.Reset() + } + + if sc.onError != nil { + if err = sc.onError(sc, err); err != nil { + return err + } + } + continue + } + + switch message.Type { + case GQL_ERROR: + sc.printLog(message, GQL_ERROR) + fallthrough + case GQL_DATA: + sc.printLog(message, GQL_DATA) + id, err := uuid.Parse(message.ID) + if err != nil { + continue + } + sub, ok := sc.subscriptions[id.String()] + if !ok { + continue + } + var out struct { + Data *json.RawMessage + Errors errors + //Extensions interface{} // Unused. + } + + err = json.Unmarshal(message.Payload, &out) + if err != nil { + go sub.handler(nil, err) + continue + } + if len(out.Errors) > 0 { + go sub.handler(nil, out.Errors) + continue + } + + go sub.handler(out.Data, nil) + case GQL_CONNECTION_ERROR: + sc.printLog(message, GQL_CONNECTION_ERROR) + case GQL_COMPLETE: + sc.printLog(message, GQL_COMPLETE) + sc.Unsubscribe(message.ID) + case GQL_CONNECTION_KEEP_ALIVE: + sc.printLog(message, GQL_CONNECTION_KEEP_ALIVE) + case GQL_CONNECTION_ACK: + sc.printLog(message, GQL_CONNECTION_ACK) + if sc.onConnected != nil { + sc.onConnected() + } + default: + sc.printLog(message, GQL_UNKNOWN) + } + } + } + + // if the running status is false, stop retrying + if !sc.isRunning { + return nil + } + + return sc.Reset() +} + +// Unsubscribe sends stop message to server and close subscription channel +// The input parameter is subscription ID that is returned from Subscribe function +func (sc *SubscriptionClient) Unsubscribe(id string) error { + _, ok := sc.subscriptions[id] + if !ok { + return fmt.Errorf("subscription id %s doesn't not exist", id) + } + + err := sc.stopSubscription(id) + + sc.subscribersMu.Lock() + delete(sc.subscriptions, id) + sc.subscribersMu.Unlock() + return err +} + +func (sc *SubscriptionClient) stopSubscription(id string) error { + if sc.conn != nil { + // send stop message to the server + msg := OperationMessage{ + ID: id, + Type: GQL_STOP, + } + + sc.printLog(msg, GQL_STOP) + if err := sc.conn.WriteJSON(msg); err != nil { + return err + } + + } + + return nil +} + +func (sc *SubscriptionClient) terminate() error { + if sc.conn != nil { + // send terminate message to the server + msg := OperationMessage{ + Type: GQL_CONNECTION_TERMINATE, + } + + sc.printLog(msg, GQL_CONNECTION_TERMINATE) + return sc.conn.WriteJSON(msg) + } + + return nil +} + +// Reset restart websocket connection and subscriptions +func (sc *SubscriptionClient) Reset() error { + if !sc.isRunning { + return nil + } + + for id, sub := range sc.subscriptions { + _ = sc.stopSubscription(id) + sub.started = false + } + + if sc.conn != nil { + _ = sc.terminate() + _ = sc.conn.Close() + sc.conn = nil + } + sc.cancel() + + return sc.Run() +} + +// Close closes all subscription channel and websocket as well +func (sc *SubscriptionClient) Close() (err error) { + sc.setIsRunning(false) + for id := range sc.subscriptions { + if err = sc.Unsubscribe(id); err != nil { + sc.cancel() + return err + } + } + if sc.conn != nil { + _ = sc.terminate() + err = sc.conn.Close() + sc.conn = nil + } + sc.cancel() + + return +} + +// default websocket handler implementation using https://github.com/nhooyr/websocket +type websocketHandler struct { + ctx context.Context + timeout time.Duration + *websocket.Conn +} + +func (wh *websocketHandler) WriteJSON(v interface{}) error { + ctx, cancel := context.WithTimeout(wh.ctx, wh.timeout) + defer cancel() + + return wsjson.Write(ctx, wh.Conn, v) +} + +func (wh *websocketHandler) ReadJSON(v interface{}) error { + ctx, cancel := context.WithTimeout(wh.ctx, wh.timeout) + defer cancel() + return wsjson.Read(ctx, wh.Conn, v) +} + +func (wh *websocketHandler) Close() error { + return wh.Conn.Close(websocket.StatusNormalClosure, "close websocket") +} + +func newWebsocketConn(sc *SubscriptionClient) (WebsocketConn, error) { + + options := &websocket.DialOptions{ + Subprotocols: []string{"graphql-ws"}, + } + c, _, err := websocket.Dial(sc.GetContext(), sc.GetURL(), options) + if err != nil { + return nil, err + } + + return &websocketHandler{ + ctx: sc.GetContext(), + Conn: c, + timeout: sc.GetTimeout(), + }, nil +}