summaryrefslogtreecommitdiff
path: root/teleirc/matterbridge/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go
diff options
context:
space:
mode:
Diffstat (limited to 'teleirc/matterbridge/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go')
-rw-r--r--teleirc/matterbridge/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go633
1 files changed, 633 insertions, 0 deletions
diff --git a/teleirc/matterbridge/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go b/teleirc/matterbridge/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go
new file mode 100644
index 0000000..b524810
--- /dev/null
+++ b/teleirc/matterbridge/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go
@@ -0,0 +1,633 @@
+package kbchat
+
+import (
+ "bufio"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "math"
+ "os"
+ "os/exec"
+ "runtime"
+ "sync"
+ "time"
+
+ "github.com/keybase/go-keybase-chat-bot/kbchat/types/chat1"
+ "github.com/keybase/go-keybase-chat-bot/kbchat/types/keybase1"
+ "github.com/keybase/go-keybase-chat-bot/kbchat/types/stellar1"
+)
+
+// SubscriptionMessage contains a message and conversation object
+type SubscriptionMessage struct {
+ Message chat1.MsgSummary
+ Conversation chat1.ConvSummary
+}
+
+type SubscriptionConversation struct {
+ Conversation chat1.ConvSummary
+}
+
+type SubscriptionWalletEvent struct {
+ Payment stellar1.PaymentDetailsLocal
+}
+
+// Subscription has methods to control the background message fetcher loop
+type Subscription struct {
+ *DebugOutput
+ sync.Mutex
+
+ newMsgsCh chan SubscriptionMessage
+ newConvsCh chan SubscriptionConversation
+ newWalletCh chan SubscriptionWalletEvent
+ errorCh chan error
+ running bool
+ shutdownCh chan struct{}
+}
+
+func NewSubscription() *Subscription {
+ newMsgsCh := make(chan SubscriptionMessage, 250)
+ newConvsCh := make(chan SubscriptionConversation, 250)
+ newWalletCh := make(chan SubscriptionWalletEvent, 250)
+ errorCh := make(chan error, 250)
+ shutdownCh := make(chan struct{})
+ return &Subscription{
+ DebugOutput: NewDebugOutput("Subscription"),
+ newMsgsCh: newMsgsCh,
+ newConvsCh: newConvsCh,
+ newWalletCh: newWalletCh,
+ shutdownCh: shutdownCh,
+ errorCh: errorCh,
+ running: true,
+ }
+}
+
+// Read blocks until a new message arrives
+func (m *Subscription) Read() (msg SubscriptionMessage, err error) {
+ defer m.Trace(&err, "Read")()
+ select {
+ case msg = <-m.newMsgsCh:
+ return msg, nil
+ case err = <-m.errorCh:
+ return SubscriptionMessage{}, err
+ case <-m.shutdownCh:
+ return SubscriptionMessage{}, errors.New("Subscription shutdown")
+ }
+}
+
+func (m *Subscription) ReadNewConvs() (conv SubscriptionConversation, err error) {
+ defer m.Trace(&err, "ReadNewConvs")()
+ select {
+ case conv = <-m.newConvsCh:
+ return conv, nil
+ case err = <-m.errorCh:
+ return SubscriptionConversation{}, err
+ case <-m.shutdownCh:
+ return SubscriptionConversation{}, errors.New("Subscription shutdown")
+ }
+}
+
+// Read blocks until a new message arrives
+func (m *Subscription) ReadWallet() (msg SubscriptionWalletEvent, err error) {
+ defer m.Trace(&err, "ReadWallet")()
+ select {
+ case msg = <-m.newWalletCh:
+ return msg, nil
+ case err = <-m.errorCh:
+ return SubscriptionWalletEvent{}, err
+ case <-m.shutdownCh:
+ return SubscriptionWalletEvent{}, errors.New("Subscription shutdown")
+ }
+}
+
+// Shutdown terminates the background process
+func (m *Subscription) Shutdown() {
+ defer m.Trace(nil, "Shutdown")()
+ m.Lock()
+ defer m.Unlock()
+ if m.running {
+ close(m.shutdownCh)
+ m.running = false
+ }
+}
+
+type ListenOptions struct {
+ Wallet bool
+ Convs bool
+}
+
+type PaymentHolder struct {
+ Payment stellar1.PaymentDetailsLocal `json:"notification"`
+}
+
+type TypeHolder struct {
+ Type string `json:"type"`
+}
+
+type OneshotOptions struct {
+ Username string
+ PaperKey string
+}
+
+type RunOptions struct {
+ KeybaseLocation string
+ HomeDir string
+ Oneshot *OneshotOptions
+ StartService bool
+ // Have the bot send/receive typing notifications
+ EnableTyping bool
+ // Disable bot lite mode
+ DisableBotLiteMode bool
+ // Number of processes to spin up to connect to the keybase service
+ NumPipes int
+}
+
+func (r RunOptions) Location() string {
+ if r.KeybaseLocation == "" {
+ return "keybase"
+ }
+ return r.KeybaseLocation
+}
+
+func (r RunOptions) Command(args ...string) *exec.Cmd {
+ var cmd []string
+ if r.HomeDir != "" {
+ cmd = append(cmd, "--home", r.HomeDir)
+ }
+ cmd = append(cmd, args...)
+ return exec.Command(r.Location(), cmd...)
+}
+
+// Start fires up the Keybase JSON API in stdin/stdout mode
+func Start(runOpts RunOptions, opts ...func(*API)) (*API, error) {
+ api := NewAPI(runOpts, opts...)
+ if err := api.startPipes(); err != nil {
+ return nil, err
+ }
+ return api, nil
+}
+
+type apiPipe struct {
+ sync.Mutex
+ input io.Writer
+ output *bufio.Reader
+ cmd *exec.Cmd
+}
+
+// API is the main object used for communicating with the Keybase JSON API
+type API struct {
+ sync.Mutex
+ *DebugOutput
+ // Round robin hand out API pipes to allow concurrent API requests.
+ pipeIdx int
+ pipes []*apiPipe
+ username string
+ runOpts RunOptions
+ subscriptions []*Subscription
+ Timeout time.Duration
+ LogSendBytes int
+}
+
+func CustomTimeout(timeout time.Duration) func(*API) {
+ return func(a *API) {
+ a.Timeout = timeout
+ }
+}
+
+func NewAPI(runOpts RunOptions, opts ...func(*API)) *API {
+ api := &API{
+ DebugOutput: NewDebugOutput("API"),
+ runOpts: runOpts,
+ Timeout: 5 * time.Second,
+ LogSendBytes: 1024 * 1024 * 5, // request 5MB so we don't get killed
+ }
+ for _, opt := range opts {
+ opt(api)
+ }
+ return api
+}
+
+func (a *API) Command(args ...string) *exec.Cmd {
+ return a.runOpts.Command(args...)
+}
+
+func (a *API) getUsername(runOpts RunOptions) (username string, err error) {
+ p := runOpts.Command("whoami", "-json")
+ output, err := p.StdoutPipe()
+ if err != nil {
+ return "", err
+ }
+ if runtime.GOOS != "windows" {
+ p.ExtraFiles = []*os.File{output.(*os.File)}
+ }
+ if err = p.Start(); err != nil {
+ return "", err
+ }
+
+ doneCh := make(chan error)
+ go func() {
+ defer func() { close(doneCh) }()
+ statusJSON, err := io.ReadAll(output)
+ if err != nil {
+ doneCh <- fmt.Errorf("error reading whoami output: %v", err)
+ return
+ }
+ var status keybase1.CurrentStatus
+ if err := json.Unmarshal(statusJSON, &status); err != nil {
+ doneCh <- fmt.Errorf("invalid whoami JSON %q: %v", statusJSON, err)
+ return
+ }
+ if status.LoggedIn && status.User != nil {
+ username = status.User.Username
+ doneCh <- nil
+ } else {
+ doneCh <- fmt.Errorf("unable to authenticate to keybase service: logged in: %v user: %+v", status.LoggedIn, status.User)
+ }
+ // Cleanup the command
+ if err := p.Wait(); err != nil {
+ a.Debug("unable to wait for cmd: %v", err)
+ }
+ }()
+
+ select {
+ case err = <-doneCh:
+ if err != nil {
+ return "", err
+ }
+ case <-time.After(a.Timeout):
+ return "", errors.New("unable to run Keybase command")
+ }
+
+ return username, nil
+}
+
+func (a *API) auth() (string, error) {
+ username, err := a.getUsername(a.runOpts)
+ if err == nil {
+ return username, nil
+ }
+ if a.runOpts.Oneshot == nil {
+ return "", err
+ }
+ username = ""
+ // If a paper key is specified, then login with oneshot mode (logout first)
+ if a.runOpts.Oneshot != nil {
+ if username == a.runOpts.Oneshot.Username {
+ // just get out if we are on the desired user already
+ return username, nil
+ }
+ if err := a.runOpts.Command("logout", "-f").Run(); err != nil {
+ return "", err
+ }
+ if err := a.runOpts.Command("oneshot", "--username", a.runOpts.Oneshot.Username, "--paperkey",
+ a.runOpts.Oneshot.PaperKey).Run(); err != nil {
+ return "", err
+ }
+ username = a.runOpts.Oneshot.Username
+ return username, nil
+ }
+ return "", errors.New("unable to auth")
+}
+
+func (a *API) startPipes() (err error) {
+ a.Lock()
+ defer a.Unlock()
+ for _, pipe := range a.pipes {
+ if pipe.cmd != nil {
+ if err := pipe.cmd.Process.Kill(); err != nil {
+ return fmt.Errorf("unable to kill previous API command %v", err)
+ }
+ }
+ pipe.cmd = nil
+ }
+ a.pipes = nil
+
+ if a.runOpts.StartService {
+ args := []string{fmt.Sprintf("-enable-bot-lite-mode=%v", a.runOpts.DisableBotLiteMode), "service"}
+ if err := a.runOpts.Command(args...).Start(); err != nil {
+ return fmt.Errorf("unable to start service %v", err)
+ }
+ }
+
+ if a.username, err = a.auth(); err != nil {
+ return fmt.Errorf("unable to auth: %v", err)
+ }
+
+ cmd := a.runOpts.Command("chat", "notification-settings", fmt.Sprintf("-disable-typing=%v", !a.runOpts.EnableTyping))
+ if err = cmd.Run(); err != nil {
+ // This is a performance optimization but isn't a fatal error.
+ a.Debug("unable to set notifiation settings %v", err)
+ }
+
+ // Startup NumPipes processes to the keybase chat api
+ for i := 0; i < int(math.Max(float64(a.runOpts.NumPipes), 1)); i++ {
+ pipe := apiPipe{}
+ pipe.cmd = a.runOpts.Command("chat", "api")
+ if pipe.input, err = pipe.cmd.StdinPipe(); err != nil {
+ return fmt.Errorf("unable to get api stdin: %v", err)
+ }
+ output, err := pipe.cmd.StdoutPipe()
+ if err != nil {
+ return fmt.Errorf("unable to get api stdout: %v", err)
+ }
+ if runtime.GOOS != "windows" {
+ pipe.cmd.ExtraFiles = []*os.File{output.(*os.File)}
+ }
+ if err := pipe.cmd.Start(); err != nil {
+ return fmt.Errorf("unable to run chat api cmd: %v", err)
+ }
+ pipe.output = bufio.NewReader(output)
+ a.pipes = append(a.pipes, &pipe)
+ }
+ return nil
+}
+
+func (a *API) getAPIPipes() (*apiPipe, error) {
+ a.Lock()
+ defer a.Unlock()
+ idx := a.pipeIdx % len(a.pipes)
+ a.pipeIdx++
+ pipe := a.pipes[idx]
+ if pipe.cmd == nil {
+ return nil, errAPIDisconnected
+ }
+ return pipe, nil
+}
+
+func (a *API) GetUsername() string {
+ return a.username
+}
+
+func (a *API) doSend(arg interface{}) (resp SendResponse, err error) {
+ bArg, err := json.Marshal(arg)
+ if err != nil {
+ return SendResponse{}, fmt.Errorf("unable to send arg: %+v: %v", arg, err)
+ }
+ pipe, err := a.getAPIPipes()
+ if err != nil {
+ return SendResponse{}, err
+ }
+ pipe.Lock()
+ defer pipe.Unlock()
+
+ if _, err := io.WriteString(pipe.input, string(bArg)); err != nil {
+ return SendResponse{}, err
+ }
+ responseRaw, err := pipe.output.ReadBytes('\n')
+ if err != nil {
+ return SendResponse{}, err
+ }
+ if err := json.Unmarshal(responseRaw, &resp); err != nil {
+ return resp, fmt.Errorf("failed to decode API response: %v %v", responseRaw, err)
+ } else if resp.Error != nil {
+ return resp, errors.New(resp.Error.Message)
+ }
+ return resp, nil
+}
+
+func (a *API) doFetch(apiInput string) ([]byte, error) {
+ pipe, err := a.getAPIPipes()
+ if err != nil {
+ return nil, err
+ }
+ pipe.Lock()
+ defer pipe.Unlock()
+
+ if _, err := io.WriteString(pipe.input, apiInput); err != nil {
+ return nil, err
+ }
+ byteOutput, err := pipe.output.ReadBytes('\n')
+ if err != nil {
+ return nil, err
+ }
+
+ return byteOutput, nil
+}
+
+// ListenForNewTextMessages proxies to Listen without wallet events
+func (a *API) ListenForNewTextMessages() (*Subscription, error) {
+ opts := ListenOptions{Wallet: false}
+ return a.Listen(opts)
+}
+
+func (a *API) registerSubscription(sub *Subscription) {
+ a.Lock()
+ defer a.Unlock()
+ a.subscriptions = append(a.subscriptions, sub)
+}
+
+// Listen fires of a background loop and puts chat messages and wallet
+// events into channels
+func (a *API) Listen(opts ListenOptions) (*Subscription, error) {
+ done := make(chan struct{})
+ sub := NewSubscription()
+ a.registerSubscription(sub)
+ pause := 2 * time.Second
+ readScanner := func(boutput *bufio.Scanner) {
+ defer func() { done <- struct{}{} }()
+ for {
+ select {
+ case <-sub.shutdownCh:
+ a.Debug("readScanner: received shutdown")
+ return
+ default:
+ }
+ boutput.Scan()
+ t := boutput.Text()
+ submitErr := func(err error) {
+ if len(sub.errorCh)*2 > cap(sub.errorCh) {
+ a.Debug("large errorCh queue: len: %d cap: %d ", len(sub.errorCh), cap(sub.errorCh))
+ }
+ sub.errorCh <- err
+ }
+ var typeHolder TypeHolder
+ if err := json.Unmarshal([]byte(t), &typeHolder); err != nil {
+ submitErr(fmt.Errorf("err: %v, data: %v", err, t))
+ break
+ }
+ switch typeHolder.Type {
+ case "chat":
+ var notification chat1.MsgNotification
+ if err := json.Unmarshal([]byte(t), &notification); err != nil {
+ submitErr(fmt.Errorf("err: %v, data: %v", err, t))
+ break
+ }
+ if notification.Error != nil {
+ a.Debug("error message received: %s", *notification.Error)
+ } else if notification.Msg != nil {
+ subscriptionMessage := SubscriptionMessage{
+ Message: *notification.Msg,
+ Conversation: chat1.ConvSummary{
+ Id: notification.Msg.ConvID,
+ Channel: notification.Msg.Channel,
+ },
+ }
+ if len(sub.newMsgsCh)*2 > cap(sub.newMsgsCh) {
+ a.Debug("large newMsgsCh queue: len: %d cap: %d ", len(sub.newMsgsCh), cap(sub.newMsgsCh))
+ }
+ sub.newMsgsCh <- subscriptionMessage
+ }
+ case "chat_conv":
+ var notification chat1.ConvNotification
+ if err := json.Unmarshal([]byte(t), &notification); err != nil {
+ submitErr(fmt.Errorf("err: %v, data: %v", err, t))
+ break
+ }
+ if notification.Error != nil {
+ a.Debug("error message received: %s", *notification.Error)
+ } else if notification.Conv != nil {
+ subscriptionConv := SubscriptionConversation{
+ Conversation: *notification.Conv,
+ }
+ if len(sub.newConvsCh)*2 > cap(sub.newConvsCh) {
+ a.Debug("large newConvsCh queue: len: %d cap: %d ", len(sub.newConvsCh), cap(sub.newConvsCh))
+ }
+ sub.newConvsCh <- subscriptionConv
+ }
+ case "wallet":
+ var holder PaymentHolder
+ if err := json.Unmarshal([]byte(t), &holder); err != nil {
+ submitErr(fmt.Errorf("err: %v, data: %v", err, t))
+ break
+ }
+ subscriptionPayment := SubscriptionWalletEvent(holder)
+ if len(sub.newWalletCh)*2 > cap(sub.newWalletCh) {
+ a.Debug("large newWalletCh queue: len: %d cap: %d ", len(sub.newWalletCh), cap(sub.newWalletCh))
+ }
+ sub.newWalletCh <- subscriptionPayment
+ default:
+ continue
+ }
+ }
+ }
+
+ attempts := 0
+ maxAttempts := 30
+ go func() {
+ defer func() {
+ close(sub.newMsgsCh)
+ close(sub.newConvsCh)
+ close(sub.newWalletCh)
+ close(sub.errorCh)
+ }()
+ for {
+ select {
+ case <-sub.shutdownCh:
+ a.Debug("Listen: received shutdown")
+ return
+ default:
+ }
+
+ if attempts >= maxAttempts {
+ if err := a.LogSend("Listen: failed to auth, giving up"); err != nil {
+ a.Debug("Listen: logsend failed to send: %v", err)
+ }
+ panic("Listen: failed to auth, giving up")
+ }
+ attempts++
+ if _, err := a.auth(); err != nil {
+ a.Debug("Listen: failed to auth: %s", err)
+ time.Sleep(pause)
+ continue
+ }
+ cmdElements := []string{"chat", "api-listen"}
+ if opts.Wallet {
+ cmdElements = append(cmdElements, "--wallet")
+ }
+ if opts.Convs {
+ cmdElements = append(cmdElements, "--convs")
+ }
+ p := a.runOpts.Command(cmdElements...)
+ output, err := p.StdoutPipe()
+ if err != nil {
+ a.Debug("Listen: failed to listen: %s", err)
+ time.Sleep(pause)
+ continue
+ }
+ stderr, err := p.StderrPipe()
+ if err != nil {
+ a.Debug("Listen: failed to listen to stderr: %s", err)
+ time.Sleep(pause)
+ continue
+ }
+ if runtime.GOOS != "windows" {
+ p.ExtraFiles = []*os.File{stderr.(*os.File), output.(*os.File)}
+ }
+ boutput := bufio.NewScanner(output)
+ if err := p.Start(); err != nil {
+ a.Debug("Listen: failed to make listen scanner: %s", err)
+ time.Sleep(pause)
+ continue
+ }
+ attempts = 0
+ go readScanner(boutput)
+ select {
+ case <-sub.shutdownCh:
+ a.Debug("Listen: received shutdown")
+ return
+ case <-done:
+ }
+ if err := p.Wait(); err != nil {
+ stderrBytes, rerr := io.ReadAll(stderr)
+ if rerr != nil {
+ stderrBytes = []byte(fmt.Sprintf("failed to get stderr: %v", rerr))
+ }
+ a.Debug("Listen: failed to Wait for command, restarting pipes: %s (```%s```)", err, stderrBytes)
+ if err := a.startPipes(); err != nil {
+ a.Debug("Listen: failed to restart pipes: %v", err)
+ }
+ }
+ time.Sleep(pause)
+ }
+ }()
+ return sub, nil
+}
+
+func (a *API) LogSend(feedback string) error {
+ feedback = "go-keybase-chat-bot log send\n" +
+ "username: " + a.GetUsername() + "\n" +
+ feedback
+
+ args := []string{
+ "log", "send",
+ "--no-confirm",
+ "--feedback", feedback,
+ "-n", fmt.Sprintf("%d", a.LogSendBytes),
+ }
+ return a.runOpts.Command(args...).Run()
+}
+
+func (a *API) Shutdown() (err error) {
+ defer a.Trace(&err, "Shutdown")()
+ a.Lock()
+ defer a.Unlock()
+ for _, sub := range a.subscriptions {
+ sub.Shutdown()
+ }
+ for _, pipe := range a.pipes {
+ if pipe.cmd != nil {
+ a.Debug("waiting for API command")
+ if err := pipe.cmd.Wait(); err != nil {
+ return err
+ }
+ }
+ }
+
+ if a.runOpts.Oneshot != nil {
+ a.Debug("logging out")
+ err := a.runOpts.Command("logout", "--force").Run()
+ if err != nil {
+ return err
+ }
+ }
+
+ if a.runOpts.StartService {
+ a.Debug("stopping service")
+ err := a.runOpts.Command("ctl", "stop", "--shutdown").Run()
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}