diff options
| author | Mistivia <i@mistivia.com> | 2025-11-02 15:27:18 +0800 |
|---|---|---|
| committer | Mistivia <i@mistivia.com> | 2025-11-02 15:27:18 +0800 |
| commit | e9c24f4af7ed56760f6db7941827d09f6db9020b (patch) | |
| tree | 62128c43b883ce5e3148113350978755779bb5de /teleirc/matterbridge/vendor/go.mau.fi/whatsmeow/request.go | |
| parent | 58d5e7cfda4781d8a57ec52aefd02983835c301a (diff) | |
add matterbridge
Diffstat (limited to 'teleirc/matterbridge/vendor/go.mau.fi/whatsmeow/request.go')
| -rw-r--r-- | teleirc/matterbridge/vendor/go.mau.fi/whatsmeow/request.go | 221 |
1 files changed, 221 insertions, 0 deletions
diff --git a/teleirc/matterbridge/vendor/go.mau.fi/whatsmeow/request.go b/teleirc/matterbridge/vendor/go.mau.fi/whatsmeow/request.go new file mode 100644 index 0000000..6945fd7 --- /dev/null +++ b/teleirc/matterbridge/vendor/go.mau.fi/whatsmeow/request.go @@ -0,0 +1,221 @@ +// Copyright (c) 2021 Tulir Asokan +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package whatsmeow + +import ( + "context" + "fmt" + "strconv" + "sync/atomic" + "time" + + waBinary "go.mau.fi/whatsmeow/binary" + "go.mau.fi/whatsmeow/types" +) + +func (cli *Client) generateRequestID() string { + return cli.uniqueID + strconv.FormatUint(uint64(atomic.AddUint32(&cli.idCounter, 1)), 10) +} + +var xmlStreamEndNode = &waBinary.Node{Tag: "xmlstreamend"} + +func isDisconnectNode(node *waBinary.Node) bool { + return node == xmlStreamEndNode || node.Tag == "stream:error" +} + +// isAuthErrorDisconnect checks if the given disconnect node is an error that shouldn't cause retrying. +func isAuthErrorDisconnect(node *waBinary.Node) bool { + if node.Tag != "stream:error" { + return false + } + code, _ := node.Attrs["code"].(string) + conflict, _ := node.GetOptionalChildByTag("conflict") + conflictType := conflict.AttrGetter().OptionalString("type") + if code == "401" || conflictType == "replaced" || conflictType == "device_removed" { + return true + } + return false +} + +func (cli *Client) clearResponseWaiters(node *waBinary.Node) { + cli.responseWaitersLock.Lock() + for _, waiter := range cli.responseWaiters { + select { + case waiter <- node: + default: + close(waiter) + } + } + cli.responseWaiters = make(map[string]chan<- *waBinary.Node) + cli.responseWaitersLock.Unlock() +} + +func (cli *Client) waitResponse(reqID string) chan *waBinary.Node { + ch := make(chan *waBinary.Node, 1) + cli.responseWaitersLock.Lock() + cli.responseWaiters[reqID] = ch + cli.responseWaitersLock.Unlock() + return ch +} + +func (cli *Client) cancelResponse(reqID string, ch chan *waBinary.Node) { + cli.responseWaitersLock.Lock() + close(ch) + delete(cli.responseWaiters, reqID) + cli.responseWaitersLock.Unlock() +} + +func (cli *Client) receiveResponse(data *waBinary.Node) bool { + id, ok := data.Attrs["id"].(string) + if !ok || (data.Tag != "iq" && data.Tag != "ack") { + return false + } + cli.responseWaitersLock.Lock() + waiter, ok := cli.responseWaiters[id] + if !ok { + cli.responseWaitersLock.Unlock() + return false + } + delete(cli.responseWaiters, id) + cli.responseWaitersLock.Unlock() + waiter <- data + return true +} + +type infoQueryType string + +const ( + iqSet infoQueryType = "set" + iqGet infoQueryType = "get" +) + +type infoQuery struct { + Namespace string + Type infoQueryType + To types.JID + Target types.JID + ID string + Content interface{} + + Timeout time.Duration + NoRetry bool + Context context.Context +} + +func (cli *Client) sendIQAsyncAndGetData(query *infoQuery) (<-chan *waBinary.Node, []byte, error) { + if len(query.ID) == 0 { + query.ID = cli.generateRequestID() + } + waiter := cli.waitResponse(query.ID) + attrs := waBinary.Attrs{ + "id": query.ID, + "xmlns": query.Namespace, + "type": string(query.Type), + } + if !query.To.IsEmpty() { + attrs["to"] = query.To + } + if !query.Target.IsEmpty() { + attrs["target"] = query.Target + } + data, err := cli.sendNodeAndGetData(waBinary.Node{ + Tag: "iq", + Attrs: attrs, + Content: query.Content, + }) + if err != nil { + cli.cancelResponse(query.ID, waiter) + return nil, data, err + } + return waiter, data, nil +} + +func (cli *Client) sendIQAsync(query infoQuery) (<-chan *waBinary.Node, error) { + ch, _, err := cli.sendIQAsyncAndGetData(&query) + return ch, err +} + +func (cli *Client) sendIQ(query infoQuery) (*waBinary.Node, error) { + resChan, data, err := cli.sendIQAsyncAndGetData(&query) + if err != nil { + return nil, err + } + if query.Timeout == 0 { + query.Timeout = 75 * time.Second + } + if query.Context == nil { + query.Context = context.Background() + } + select { + case res := <-resChan: + if isDisconnectNode(res) { + if query.NoRetry { + return nil, &DisconnectedError{Action: "info query", Node: res} + } + res, err = cli.retryFrame("info query", query.ID, data, res, query.Context, query.Timeout) + if err != nil { + return nil, err + } + } + resType, _ := res.Attrs["type"].(string) + if res.Tag != "iq" || (resType != "result" && resType != "error") { + return res, &IQError{RawNode: res} + } else if resType == "error" { + return res, parseIQError(res) + } + return res, nil + case <-query.Context.Done(): + return nil, query.Context.Err() + case <-time.After(query.Timeout): + return nil, ErrIQTimedOut + } +} + +func (cli *Client) retryFrame(reqType, id string, data []byte, origResp *waBinary.Node, ctx context.Context, timeout time.Duration) (*waBinary.Node, error) { + if isAuthErrorDisconnect(origResp) { + cli.Log.Debugf("%s (%s) was interrupted by websocket disconnection (%s), not retrying as it looks like an auth error", id, reqType, origResp.XMLString()) + return nil, &DisconnectedError{Action: reqType, Node: origResp} + } + + cli.Log.Debugf("%s (%s) was interrupted by websocket disconnection (%s), waiting for reconnect to retry...", id, reqType, origResp.XMLString()) + if !cli.WaitForConnection(5 * time.Second) { + cli.Log.Debugf("Websocket didn't reconnect within 5 seconds of failed %s (%s)", reqType, id) + return nil, &DisconnectedError{Action: reqType, Node: origResp} + } + + cli.socketLock.RLock() + sock := cli.socket + cli.socketLock.RUnlock() + if sock == nil { + return nil, ErrNotConnected + } + + respChan := cli.waitResponse(id) + err := sock.SendFrame(data) + if err != nil { + cli.cancelResponse(id, respChan) + return nil, err + } + var resp *waBinary.Node + timeoutChan := make(<-chan time.Time, 1) + if timeout > 0 { + timeoutChan = time.After(timeout) + } + select { + case resp = <-respChan: + case <-ctx.Done(): + return nil, ctx.Err() + case <-timeoutChan: + // FIXME this error isn't technically correct (but works for now - the timeout param is only used from sendIQ) + return nil, ErrIQTimedOut + } + if isDisconnectNode(resp) { + cli.Log.Debugf("Retrying %s %s was interrupted by websocket disconnection (%v), not retrying anymore", reqType, id, resp.XMLString()) + return nil, &DisconnectedError{Action: fmt.Sprintf("%s (retry)", reqType), Node: resp} + } + return resp, nil +} |
