diff options
| author | Mistivia <i@mistivia.com> | 2025-11-02 15:27:18 +0800 |
|---|---|---|
| committer | Mistivia <i@mistivia.com> | 2025-11-02 15:27:18 +0800 |
| commit | e9c24f4af7ed56760f6db7941827d09f6db9020b (patch) | |
| tree | 62128c43b883ce5e3148113350978755779bb5de /teleirc/matterbridge/vendor/go.mau.fi/whatsmeow/client.go | |
| parent | 58d5e7cfda4781d8a57ec52aefd02983835c301a (diff) | |
add matterbridge
Diffstat (limited to 'teleirc/matterbridge/vendor/go.mau.fi/whatsmeow/client.go')
| -rw-r--r-- | teleirc/matterbridge/vendor/go.mau.fi/whatsmeow/client.go | 641 |
1 files changed, 641 insertions, 0 deletions
diff --git a/teleirc/matterbridge/vendor/go.mau.fi/whatsmeow/client.go b/teleirc/matterbridge/vendor/go.mau.fi/whatsmeow/client.go new file mode 100644 index 0000000..3f832c9 --- /dev/null +++ b/teleirc/matterbridge/vendor/go.mau.fi/whatsmeow/client.go @@ -0,0 +1,641 @@ +// Copyright (c) 2021 Tulir Asokan +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package whatsmeow implements a client for interacting with the WhatsApp web multidevice API. +package whatsmeow + +import ( + "context" + "crypto/rand" + "encoding/hex" + "errors" + "fmt" + "net/http" + "net/url" + "runtime/debug" + "sync" + "sync/atomic" + "time" + + "go.mau.fi/whatsmeow/appstate" + waBinary "go.mau.fi/whatsmeow/binary" + waProto "go.mau.fi/whatsmeow/binary/proto" + "go.mau.fi/whatsmeow/socket" + "go.mau.fi/whatsmeow/store" + "go.mau.fi/whatsmeow/types" + "go.mau.fi/whatsmeow/types/events" + "go.mau.fi/whatsmeow/util/keys" + waLog "go.mau.fi/whatsmeow/util/log" +) + +// EventHandler is a function that can handle events from WhatsApp. +type EventHandler func(evt interface{}) +type nodeHandler func(node *waBinary.Node) + +var nextHandlerID uint32 + +type wrappedEventHandler struct { + fn EventHandler + id uint32 +} + +// Client contains everything necessary to connect to and interact with the WhatsApp web API. +type Client struct { + Store *store.Device + Log waLog.Logger + recvLog waLog.Logger + sendLog waLog.Logger + + socket *socket.NoiseSocket + socketLock sync.RWMutex + socketWait chan struct{} + + isLoggedIn uint32 + expectedDisconnectVal uint32 + EnableAutoReconnect bool + LastSuccessfulConnect time.Time + AutoReconnectErrors int + + sendActiveReceipts uint32 + + // EmitAppStateEventsOnFullSync can be set to true if you want to get app state events emitted + // even when re-syncing the whole state. + EmitAppStateEventsOnFullSync bool + + appStateProc *appstate.Processor + appStateSyncLock sync.Mutex + + historySyncNotifications chan *waProto.HistorySyncNotification + historySyncHandlerStarted uint32 + + uploadPreKeysLock sync.Mutex + lastPreKeyUpload time.Time + + mediaConnCache *MediaConn + mediaConnLock sync.Mutex + + responseWaiters map[string]chan<- *waBinary.Node + responseWaitersLock sync.Mutex + + nodeHandlers map[string]nodeHandler + handlerQueue chan *waBinary.Node + eventHandlers []wrappedEventHandler + eventHandlersLock sync.RWMutex + + messageRetries map[string]int + messageRetriesLock sync.Mutex + + appStateKeyRequests map[string]time.Time + appStateKeyRequestsLock sync.RWMutex + + messageSendLock sync.Mutex + + privacySettingsCache atomic.Value + + groupParticipantsCache map[types.JID][]types.JID + groupParticipantsCacheLock sync.Mutex + userDevicesCache map[types.JID][]types.JID + userDevicesCacheLock sync.Mutex + + recentMessagesMap map[recentMessageKey]*waProto.Message + recentMessagesList [recentMessagesSize]recentMessageKey + recentMessagesPtr int + recentMessagesLock sync.RWMutex + + sessionRecreateHistory map[types.JID]time.Time + sessionRecreateHistoryLock sync.Mutex + // GetMessageForRetry is used to find the source message for handling retry receipts + // when the message is not found in the recently sent message cache. + GetMessageForRetry func(requester, to types.JID, id types.MessageID) *waProto.Message + // PreRetryCallback is called before a retry receipt is accepted. + // If it returns false, the accepting will be cancelled and the retry receipt will be ignored. + PreRetryCallback func(receipt *events.Receipt, id types.MessageID, retryCount int, msg *waProto.Message) bool + + // PrePairCallback is called before pairing is completed. If it returns false, the pairing will be cancelled and + // the client will disconnect. + PrePairCallback func(jid types.JID, platform, businessName string) bool + + // Should untrusted identity errors be handled automatically? If true, the stored identity and existing signal + // sessions will be removed on untrusted identity errors, and an events.IdentityChange will be dispatched. + // If false, decrypting a message from untrusted devices will fail. + AutoTrustIdentity bool + + // Should SubscribePresence return an error if no privacy token is stored for the user? + ErrorOnSubscribePresenceWithoutToken bool + + uniqueID string + idCounter uint32 + + proxy socket.Proxy + http *http.Client +} + +// Size of buffer for the channel that all incoming XML nodes go through. +// In general it shouldn't go past a few buffered messages, but the channel is big to be safe. +const handlerQueueSize = 2048 + +// NewClient initializes a new WhatsApp web client. +// +// The logger can be nil, it will default to a no-op logger. +// +// The device store must be set. A default SQL-backed implementation is available in the store/sqlstore package. +// +// container, err := sqlstore.New("sqlite3", "file:yoursqlitefile.db?_foreign_keys=on", nil) +// if err != nil { +// panic(err) +// } +// // If you want multiple sessions, remember their JIDs and use .GetDevice(jid) or .GetAllDevices() instead. +// deviceStore, err := container.GetFirstDevice() +// if err != nil { +// panic(err) +// } +// client := whatsmeow.NewClient(deviceStore, nil) +func NewClient(deviceStore *store.Device, log waLog.Logger) *Client { + if log == nil { + log = waLog.Noop + } + randomBytes := make([]byte, 2) + _, _ = rand.Read(randomBytes) + cli := &Client{ + http: &http.Client{ + Transport: (http.DefaultTransport.(*http.Transport)).Clone(), + }, + proxy: http.ProxyFromEnvironment, + Store: deviceStore, + Log: log, + recvLog: log.Sub("Recv"), + sendLog: log.Sub("Send"), + uniqueID: fmt.Sprintf("%d.%d-", randomBytes[0], randomBytes[1]), + responseWaiters: make(map[string]chan<- *waBinary.Node), + eventHandlers: make([]wrappedEventHandler, 0, 1), + messageRetries: make(map[string]int), + handlerQueue: make(chan *waBinary.Node, handlerQueueSize), + appStateProc: appstate.NewProcessor(deviceStore, log.Sub("AppState")), + socketWait: make(chan struct{}), + + historySyncNotifications: make(chan *waProto.HistorySyncNotification, 32), + + groupParticipantsCache: make(map[types.JID][]types.JID), + userDevicesCache: make(map[types.JID][]types.JID), + + recentMessagesMap: make(map[recentMessageKey]*waProto.Message, recentMessagesSize), + sessionRecreateHistory: make(map[types.JID]time.Time), + GetMessageForRetry: func(requester, to types.JID, id types.MessageID) *waProto.Message { return nil }, + appStateKeyRequests: make(map[string]time.Time), + + EnableAutoReconnect: true, + AutoTrustIdentity: true, + } + cli.nodeHandlers = map[string]nodeHandler{ + "message": cli.handleEncryptedMessage, + "receipt": cli.handleReceipt, + "call": cli.handleCallEvent, + "chatstate": cli.handleChatState, + "presence": cli.handlePresence, + "notification": cli.handleNotification, + "success": cli.handleConnectSuccess, + "failure": cli.handleConnectFailure, + "stream:error": cli.handleStreamError, + "iq": cli.handleIQ, + "ib": cli.handleIB, + // Apparently there's also an <error> node which can have a code=479 and means "Invalid stanza sent (smax-invalid)" + } + return cli +} + +// SetProxyAddress is a helper method that parses a URL string and calls SetProxy. +// +// Returns an error if url.Parse fails to parse the given address. +func (cli *Client) SetProxyAddress(addr string) error { + parsed, err := url.Parse(addr) + if err != nil { + return err + } + cli.SetProxy(http.ProxyURL(parsed)) + return nil +} + +// SetProxy sets the proxy to use for WhatsApp web websocket connections and media uploads/downloads. +// +// Must be called before Connect() to take effect in the websocket connection. +// If you want to change the proxy after connecting, you must call Disconnect() and then Connect() again manually. +// +// By default, the client will find the proxy from the https_proxy environment variable like Go's net/http does. +// +// To disable reading proxy info from environment variables, explicitly set the proxy to nil: +// +// cli.SetProxy(nil) +// +// To use a different proxy for the websocket and media, pass a function that checks the request path or headers: +// +// cli.SetProxy(func(r *http.Request) (*url.URL, error) { +// if r.URL.Host == "web.whatsapp.com" && r.URL.Path == "/ws/chat" { +// return websocketProxyURL, nil +// } else { +// return mediaProxyURL, nil +// } +// }) +func (cli *Client) SetProxy(proxy socket.Proxy) { + cli.proxy = proxy + cli.http.Transport.(*http.Transport).Proxy = proxy +} + +func (cli *Client) getSocketWaitChan() <-chan struct{} { + cli.socketLock.RLock() + ch := cli.socketWait + cli.socketLock.RUnlock() + return ch +} + +func (cli *Client) closeSocketWaitChan() { + cli.socketLock.Lock() + close(cli.socketWait) + cli.socketWait = make(chan struct{}) + cli.socketLock.Unlock() +} + +func (cli *Client) getOwnID() types.JID { + id := cli.Store.ID + if id == nil { + return types.EmptyJID + } + return *id +} + +func (cli *Client) WaitForConnection(timeout time.Duration) bool { + timeoutChan := time.After(timeout) + cli.socketLock.RLock() + for cli.socket == nil || !cli.socket.IsConnected() || !cli.IsLoggedIn() { + ch := cli.socketWait + cli.socketLock.RUnlock() + select { + case <-ch: + case <-timeoutChan: + return false + } + cli.socketLock.RLock() + } + cli.socketLock.RUnlock() + return true +} + +// Connect connects the client to the WhatsApp web websocket. After connection, it will either +// authenticate if there's data in the device store, or emit a QREvent to set up a new link. +func (cli *Client) Connect() error { + cli.socketLock.Lock() + defer cli.socketLock.Unlock() + if cli.socket != nil { + if !cli.socket.IsConnected() { + cli.unlockedDisconnect() + } else { + return ErrAlreadyConnected + } + } + + cli.resetExpectedDisconnect() + fs := socket.NewFrameSocket(cli.Log.Sub("Socket"), socket.WAConnHeader, cli.proxy) + if err := fs.Connect(); err != nil { + fs.Close(0) + return err + } else if err = cli.doHandshake(fs, *keys.NewKeyPair()); err != nil { + fs.Close(0) + return fmt.Errorf("noise handshake failed: %w", err) + } + go cli.keepAliveLoop(cli.socket.Context()) + go cli.handlerQueueLoop(cli.socket.Context()) + return nil +} + +// IsLoggedIn returns true after the client is successfully connected and authenticated on WhatsApp. +func (cli *Client) IsLoggedIn() bool { + return atomic.LoadUint32(&cli.isLoggedIn) == 1 +} + +func (cli *Client) onDisconnect(ns *socket.NoiseSocket, remote bool) { + ns.Stop(false) + cli.socketLock.Lock() + defer cli.socketLock.Unlock() + if cli.socket == ns { + cli.socket = nil + cli.clearResponseWaiters(xmlStreamEndNode) + if !cli.isExpectedDisconnect() && remote { + cli.Log.Debugf("Emitting Disconnected event") + go cli.dispatchEvent(&events.Disconnected{}) + go cli.autoReconnect() + } else if remote { + cli.Log.Debugf("OnDisconnect() called, but it was expected, so not emitting event") + } else { + cli.Log.Debugf("OnDisconnect() called after manual disconnection") + } + } else { + cli.Log.Debugf("Ignoring OnDisconnect on different socket") + } +} + +func (cli *Client) expectDisconnect() { + atomic.StoreUint32(&cli.expectedDisconnectVal, 1) +} + +func (cli *Client) resetExpectedDisconnect() { + atomic.StoreUint32(&cli.expectedDisconnectVal, 0) +} + +func (cli *Client) isExpectedDisconnect() bool { + return atomic.LoadUint32(&cli.expectedDisconnectVal) == 1 +} + +func (cli *Client) autoReconnect() { + if !cli.EnableAutoReconnect || cli.Store.ID == nil { + return + } + for { + autoReconnectDelay := time.Duration(cli.AutoReconnectErrors) * 2 * time.Second + cli.Log.Debugf("Automatically reconnecting after %v", autoReconnectDelay) + cli.AutoReconnectErrors++ + time.Sleep(autoReconnectDelay) + err := cli.Connect() + if errors.Is(err, ErrAlreadyConnected) { + cli.Log.Debugf("Connect() said we're already connected after autoreconnect sleep") + return + } else if err != nil { + cli.Log.Errorf("Error reconnecting after autoreconnect sleep: %v", err) + } else { + return + } + } +} + +// IsConnected checks if the client is connected to the WhatsApp web websocket. +// Note that this doesn't check if the client is authenticated. See the IsLoggedIn field for that. +func (cli *Client) IsConnected() bool { + cli.socketLock.RLock() + connected := cli.socket != nil && cli.socket.IsConnected() + cli.socketLock.RUnlock() + return connected +} + +// Disconnect disconnects from the WhatsApp web websocket. +// +// This will not emit any events, the Disconnected event is only used when the +// connection is closed by the server or a network error. +func (cli *Client) Disconnect() { + if cli.socket == nil { + return + } + cli.socketLock.Lock() + cli.unlockedDisconnect() + cli.socketLock.Unlock() +} + +// Disconnect closes the websocket connection. +func (cli *Client) unlockedDisconnect() { + if cli.socket != nil { + cli.socket.Stop(true) + cli.socket = nil + cli.clearResponseWaiters(xmlStreamEndNode) + } +} + +// Logout sends a request to unlink the device, then disconnects from the websocket and deletes the local device store. +// +// If the logout request fails, the disconnection and local data deletion will not happen either. +// If an error is returned, but you want to force disconnect/clear data, call Client.Disconnect() and Client.Store.Delete() manually. +// +// Note that this will not emit any events. The LoggedOut event is only used for external logouts +// (triggered by the user from the main device or by WhatsApp servers). +func (cli *Client) Logout() error { + ownID := cli.getOwnID() + if ownID.IsEmpty() { + return ErrNotLoggedIn + } + _, err := cli.sendIQ(infoQuery{ + Namespace: "md", + Type: "set", + To: types.ServerJID, + Content: []waBinary.Node{{ + Tag: "remove-companion-device", + Attrs: waBinary.Attrs{ + "jid": ownID, + "reason": "user_initiated", + }, + }}, + }) + if err != nil { + return fmt.Errorf("error sending logout request: %w", err) + } + cli.Disconnect() + err = cli.Store.Delete() + if err != nil { + return fmt.Errorf("error deleting data from store: %w", err) + } + return nil +} + +// AddEventHandler registers a new function to receive all events emitted by this client. +// +// The returned integer is the event handler ID, which can be passed to RemoveEventHandler to remove it. +// +// All registered event handlers will receive all events. You should use a type switch statement to +// filter the events you want: +// +// func myEventHandler(evt interface{}) { +// switch v := evt.(type) { +// case *events.Message: +// fmt.Println("Received a message!") +// case *events.Receipt: +// fmt.Println("Received a receipt!") +// } +// } +// +// If you want to access the Client instance inside the event handler, the recommended way is to +// wrap the whole handler in another struct: +// +// type MyClient struct { +// WAClient *whatsmeow.Client +// eventHandlerID uint32 +// } +// +// func (mycli *MyClient) register() { +// mycli.eventHandlerID = mycli.WAClient.AddEventHandler(mycli.myEventHandler) +// } +// +// func (mycli *MyClient) myEventHandler(evt interface{}) { +// // Handle event and access mycli.WAClient +// } +func (cli *Client) AddEventHandler(handler EventHandler) uint32 { + nextID := atomic.AddUint32(&nextHandlerID, 1) + cli.eventHandlersLock.Lock() + cli.eventHandlers = append(cli.eventHandlers, wrappedEventHandler{handler, nextID}) + cli.eventHandlersLock.Unlock() + return nextID +} + +// RemoveEventHandler removes a previously registered event handler function. +// If the function with the given ID is found, this returns true. +// +// N.B. Do not run this directly from an event handler. That would cause a deadlock because the +// event dispatcher holds a read lock on the event handler list, and this method wants a write lock +// on the same list. Instead run it in a goroutine: +// +// func (mycli *MyClient) myEventHandler(evt interface{}) { +// if noLongerWantEvents { +// go mycli.WAClient.RemoveEventHandler(mycli.eventHandlerID) +// } +// } +func (cli *Client) RemoveEventHandler(id uint32) bool { + cli.eventHandlersLock.Lock() + defer cli.eventHandlersLock.Unlock() + for index := range cli.eventHandlers { + if cli.eventHandlers[index].id == id { + if index == 0 { + cli.eventHandlers[0].fn = nil + cli.eventHandlers = cli.eventHandlers[1:] + return true + } else if index < len(cli.eventHandlers)-1 { + copy(cli.eventHandlers[index:], cli.eventHandlers[index+1:]) + } + cli.eventHandlers[len(cli.eventHandlers)-1].fn = nil + cli.eventHandlers = cli.eventHandlers[:len(cli.eventHandlers)-1] + return true + } + } + return false +} + +// RemoveEventHandlers removes all event handlers that have been registered with AddEventHandler +func (cli *Client) RemoveEventHandlers() { + cli.eventHandlersLock.Lock() + cli.eventHandlers = make([]wrappedEventHandler, 0, 1) + cli.eventHandlersLock.Unlock() +} + +func (cli *Client) handleFrame(data []byte) { + decompressed, err := waBinary.Unpack(data) + if err != nil { + cli.Log.Warnf("Failed to decompress frame: %v", err) + cli.Log.Debugf("Errored frame hex: %s", hex.EncodeToString(data)) + return + } + node, err := waBinary.Unmarshal(decompressed) + if err != nil { + cli.Log.Warnf("Failed to decode node in frame: %v", err) + cli.Log.Debugf("Errored frame hex: %s", hex.EncodeToString(decompressed)) + return + } + cli.recvLog.Debugf("%s", node.XMLString()) + if node.Tag == "xmlstreamend" { + if !cli.isExpectedDisconnect() { + cli.Log.Warnf("Received stream end frame") + } + // TODO should we do something else? + } else if cli.receiveResponse(node) { + // handled + } else if _, ok := cli.nodeHandlers[node.Tag]; ok { + select { + case cli.handlerQueue <- node: + default: + cli.Log.Warnf("Handler queue is full, message ordering is no longer guaranteed") + go func() { + cli.handlerQueue <- node + }() + } + } else { + cli.Log.Debugf("Didn't handle WhatsApp node %s", node.Tag) + } +} + +func (cli *Client) handlerQueueLoop(ctx context.Context) { + for { + select { + case node := <-cli.handlerQueue: + cli.nodeHandlers[node.Tag](node) + case <-ctx.Done(): + return + } + } +} + +func (cli *Client) sendNodeAndGetData(node waBinary.Node) ([]byte, error) { + cli.socketLock.RLock() + sock := cli.socket + cli.socketLock.RUnlock() + if sock == nil { + return nil, ErrNotConnected + } + + payload, err := waBinary.Marshal(node) + if err != nil { + return nil, fmt.Errorf("failed to marshal node: %w", err) + } + + cli.sendLog.Debugf("%s", node.XMLString()) + return payload, sock.SendFrame(payload) +} + +func (cli *Client) sendNode(node waBinary.Node) error { + _, err := cli.sendNodeAndGetData(node) + return err +} + +func (cli *Client) dispatchEvent(evt interface{}) { + cli.eventHandlersLock.RLock() + defer func() { + cli.eventHandlersLock.RUnlock() + err := recover() + if err != nil { + cli.Log.Errorf("Event handler panicked while handling a %T: %v\n%s", evt, err, debug.Stack()) + } + }() + for _, handler := range cli.eventHandlers { + handler.fn(evt) + } +} + +// ParseWebMessage parses a WebMessageInfo object into *events.Message to match what real-time messages have. +// +// The chat JID can be found in the Conversation data: +// +// chatJID, err := types.ParseJID(conv.GetId()) +// for _, historyMsg := range conv.GetMessages() { +// evt, err := cli.ParseWebMessage(chatJID, historyMsg.GetMessage()) +// yourNormalEventHandler(evt) +// } +func (cli *Client) ParseWebMessage(chatJID types.JID, webMsg *waProto.WebMessageInfo) (*events.Message, error) { + info := types.MessageInfo{ + MessageSource: types.MessageSource{ + Chat: chatJID, + IsFromMe: webMsg.GetKey().GetFromMe(), + IsGroup: chatJID.Server == types.GroupServer, + }, + ID: webMsg.GetKey().GetId(), + PushName: webMsg.GetPushName(), + Timestamp: time.Unix(int64(webMsg.GetMessageTimestamp()), 0), + } + var err error + if info.IsFromMe { + info.Sender = cli.getOwnID().ToNonAD() + if info.Sender.IsEmpty() { + return nil, ErrNotLoggedIn + } + } else if chatJID.Server == types.DefaultUserServer { + info.Sender = chatJID + } else if webMsg.GetParticipant() != "" { + info.Sender, err = types.ParseJID(webMsg.GetParticipant()) + } else if webMsg.GetKey().GetParticipant() != "" { + info.Sender, err = types.ParseJID(webMsg.GetKey().GetParticipant()) + } else { + return nil, fmt.Errorf("couldn't find sender of message %s", info.ID) + } + if err != nil { + return nil, fmt.Errorf("failed to parse sender of message %s: %v", info.ID, err) + } + evt := &events.Message{ + RawMessage: webMsg.GetMessage(), + Info: info, + } + evt.UnwrapRaw() + return evt, nil +} |
