diff options
Diffstat (limited to 'teleirc/matterbridge/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go')
| -rw-r--r-- | teleirc/matterbridge/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go | 633 |
1 files changed, 633 insertions, 0 deletions
diff --git a/teleirc/matterbridge/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go b/teleirc/matterbridge/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go new file mode 100644 index 0000000..b524810 --- /dev/null +++ b/teleirc/matterbridge/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go @@ -0,0 +1,633 @@ +package kbchat + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io" + "math" + "os" + "os/exec" + "runtime" + "sync" + "time" + + "github.com/keybase/go-keybase-chat-bot/kbchat/types/chat1" + "github.com/keybase/go-keybase-chat-bot/kbchat/types/keybase1" + "github.com/keybase/go-keybase-chat-bot/kbchat/types/stellar1" +) + +// SubscriptionMessage contains a message and conversation object +type SubscriptionMessage struct { + Message chat1.MsgSummary + Conversation chat1.ConvSummary +} + +type SubscriptionConversation struct { + Conversation chat1.ConvSummary +} + +type SubscriptionWalletEvent struct { + Payment stellar1.PaymentDetailsLocal +} + +// Subscription has methods to control the background message fetcher loop +type Subscription struct { + *DebugOutput + sync.Mutex + + newMsgsCh chan SubscriptionMessage + newConvsCh chan SubscriptionConversation + newWalletCh chan SubscriptionWalletEvent + errorCh chan error + running bool + shutdownCh chan struct{} +} + +func NewSubscription() *Subscription { + newMsgsCh := make(chan SubscriptionMessage, 250) + newConvsCh := make(chan SubscriptionConversation, 250) + newWalletCh := make(chan SubscriptionWalletEvent, 250) + errorCh := make(chan error, 250) + shutdownCh := make(chan struct{}) + return &Subscription{ + DebugOutput: NewDebugOutput("Subscription"), + newMsgsCh: newMsgsCh, + newConvsCh: newConvsCh, + newWalletCh: newWalletCh, + shutdownCh: shutdownCh, + errorCh: errorCh, + running: true, + } +} + +// Read blocks until a new message arrives +func (m *Subscription) Read() (msg SubscriptionMessage, err error) { + defer m.Trace(&err, "Read")() + select { + case msg = <-m.newMsgsCh: + return msg, nil + case err = <-m.errorCh: + return SubscriptionMessage{}, err + case <-m.shutdownCh: + return SubscriptionMessage{}, errors.New("Subscription shutdown") + } +} + +func (m *Subscription) ReadNewConvs() (conv SubscriptionConversation, err error) { + defer m.Trace(&err, "ReadNewConvs")() + select { + case conv = <-m.newConvsCh: + return conv, nil + case err = <-m.errorCh: + return SubscriptionConversation{}, err + case <-m.shutdownCh: + return SubscriptionConversation{}, errors.New("Subscription shutdown") + } +} + +// Read blocks until a new message arrives +func (m *Subscription) ReadWallet() (msg SubscriptionWalletEvent, err error) { + defer m.Trace(&err, "ReadWallet")() + select { + case msg = <-m.newWalletCh: + return msg, nil + case err = <-m.errorCh: + return SubscriptionWalletEvent{}, err + case <-m.shutdownCh: + return SubscriptionWalletEvent{}, errors.New("Subscription shutdown") + } +} + +// Shutdown terminates the background process +func (m *Subscription) Shutdown() { + defer m.Trace(nil, "Shutdown")() + m.Lock() + defer m.Unlock() + if m.running { + close(m.shutdownCh) + m.running = false + } +} + +type ListenOptions struct { + Wallet bool + Convs bool +} + +type PaymentHolder struct { + Payment stellar1.PaymentDetailsLocal `json:"notification"` +} + +type TypeHolder struct { + Type string `json:"type"` +} + +type OneshotOptions struct { + Username string + PaperKey string +} + +type RunOptions struct { + KeybaseLocation string + HomeDir string + Oneshot *OneshotOptions + StartService bool + // Have the bot send/receive typing notifications + EnableTyping bool + // Disable bot lite mode + DisableBotLiteMode bool + // Number of processes to spin up to connect to the keybase service + NumPipes int +} + +func (r RunOptions) Location() string { + if r.KeybaseLocation == "" { + return "keybase" + } + return r.KeybaseLocation +} + +func (r RunOptions) Command(args ...string) *exec.Cmd { + var cmd []string + if r.HomeDir != "" { + cmd = append(cmd, "--home", r.HomeDir) + } + cmd = append(cmd, args...) + return exec.Command(r.Location(), cmd...) +} + +// Start fires up the Keybase JSON API in stdin/stdout mode +func Start(runOpts RunOptions, opts ...func(*API)) (*API, error) { + api := NewAPI(runOpts, opts...) + if err := api.startPipes(); err != nil { + return nil, err + } + return api, nil +} + +type apiPipe struct { + sync.Mutex + input io.Writer + output *bufio.Reader + cmd *exec.Cmd +} + +// API is the main object used for communicating with the Keybase JSON API +type API struct { + sync.Mutex + *DebugOutput + // Round robin hand out API pipes to allow concurrent API requests. + pipeIdx int + pipes []*apiPipe + username string + runOpts RunOptions + subscriptions []*Subscription + Timeout time.Duration + LogSendBytes int +} + +func CustomTimeout(timeout time.Duration) func(*API) { + return func(a *API) { + a.Timeout = timeout + } +} + +func NewAPI(runOpts RunOptions, opts ...func(*API)) *API { + api := &API{ + DebugOutput: NewDebugOutput("API"), + runOpts: runOpts, + Timeout: 5 * time.Second, + LogSendBytes: 1024 * 1024 * 5, // request 5MB so we don't get killed + } + for _, opt := range opts { + opt(api) + } + return api +} + +func (a *API) Command(args ...string) *exec.Cmd { + return a.runOpts.Command(args...) +} + +func (a *API) getUsername(runOpts RunOptions) (username string, err error) { + p := runOpts.Command("whoami", "-json") + output, err := p.StdoutPipe() + if err != nil { + return "", err + } + if runtime.GOOS != "windows" { + p.ExtraFiles = []*os.File{output.(*os.File)} + } + if err = p.Start(); err != nil { + return "", err + } + + doneCh := make(chan error) + go func() { + defer func() { close(doneCh) }() + statusJSON, err := io.ReadAll(output) + if err != nil { + doneCh <- fmt.Errorf("error reading whoami output: %v", err) + return + } + var status keybase1.CurrentStatus + if err := json.Unmarshal(statusJSON, &status); err != nil { + doneCh <- fmt.Errorf("invalid whoami JSON %q: %v", statusJSON, err) + return + } + if status.LoggedIn && status.User != nil { + username = status.User.Username + doneCh <- nil + } else { + doneCh <- fmt.Errorf("unable to authenticate to keybase service: logged in: %v user: %+v", status.LoggedIn, status.User) + } + // Cleanup the command + if err := p.Wait(); err != nil { + a.Debug("unable to wait for cmd: %v", err) + } + }() + + select { + case err = <-doneCh: + if err != nil { + return "", err + } + case <-time.After(a.Timeout): + return "", errors.New("unable to run Keybase command") + } + + return username, nil +} + +func (a *API) auth() (string, error) { + username, err := a.getUsername(a.runOpts) + if err == nil { + return username, nil + } + if a.runOpts.Oneshot == nil { + return "", err + } + username = "" + // If a paper key is specified, then login with oneshot mode (logout first) + if a.runOpts.Oneshot != nil { + if username == a.runOpts.Oneshot.Username { + // just get out if we are on the desired user already + return username, nil + } + if err := a.runOpts.Command("logout", "-f").Run(); err != nil { + return "", err + } + if err := a.runOpts.Command("oneshot", "--username", a.runOpts.Oneshot.Username, "--paperkey", + a.runOpts.Oneshot.PaperKey).Run(); err != nil { + return "", err + } + username = a.runOpts.Oneshot.Username + return username, nil + } + return "", errors.New("unable to auth") +} + +func (a *API) startPipes() (err error) { + a.Lock() + defer a.Unlock() + for _, pipe := range a.pipes { + if pipe.cmd != nil { + if err := pipe.cmd.Process.Kill(); err != nil { + return fmt.Errorf("unable to kill previous API command %v", err) + } + } + pipe.cmd = nil + } + a.pipes = nil + + if a.runOpts.StartService { + args := []string{fmt.Sprintf("-enable-bot-lite-mode=%v", a.runOpts.DisableBotLiteMode), "service"} + if err := a.runOpts.Command(args...).Start(); err != nil { + return fmt.Errorf("unable to start service %v", err) + } + } + + if a.username, err = a.auth(); err != nil { + return fmt.Errorf("unable to auth: %v", err) + } + + cmd := a.runOpts.Command("chat", "notification-settings", fmt.Sprintf("-disable-typing=%v", !a.runOpts.EnableTyping)) + if err = cmd.Run(); err != nil { + // This is a performance optimization but isn't a fatal error. + a.Debug("unable to set notifiation settings %v", err) + } + + // Startup NumPipes processes to the keybase chat api + for i := 0; i < int(math.Max(float64(a.runOpts.NumPipes), 1)); i++ { + pipe := apiPipe{} + pipe.cmd = a.runOpts.Command("chat", "api") + if pipe.input, err = pipe.cmd.StdinPipe(); err != nil { + return fmt.Errorf("unable to get api stdin: %v", err) + } + output, err := pipe.cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("unable to get api stdout: %v", err) + } + if runtime.GOOS != "windows" { + pipe.cmd.ExtraFiles = []*os.File{output.(*os.File)} + } + if err := pipe.cmd.Start(); err != nil { + return fmt.Errorf("unable to run chat api cmd: %v", err) + } + pipe.output = bufio.NewReader(output) + a.pipes = append(a.pipes, &pipe) + } + return nil +} + +func (a *API) getAPIPipes() (*apiPipe, error) { + a.Lock() + defer a.Unlock() + idx := a.pipeIdx % len(a.pipes) + a.pipeIdx++ + pipe := a.pipes[idx] + if pipe.cmd == nil { + return nil, errAPIDisconnected + } + return pipe, nil +} + +func (a *API) GetUsername() string { + return a.username +} + +func (a *API) doSend(arg interface{}) (resp SendResponse, err error) { + bArg, err := json.Marshal(arg) + if err != nil { + return SendResponse{}, fmt.Errorf("unable to send arg: %+v: %v", arg, err) + } + pipe, err := a.getAPIPipes() + if err != nil { + return SendResponse{}, err + } + pipe.Lock() + defer pipe.Unlock() + + if _, err := io.WriteString(pipe.input, string(bArg)); err != nil { + return SendResponse{}, err + } + responseRaw, err := pipe.output.ReadBytes('\n') + if err != nil { + return SendResponse{}, err + } + if err := json.Unmarshal(responseRaw, &resp); err != nil { + return resp, fmt.Errorf("failed to decode API response: %v %v", responseRaw, err) + } else if resp.Error != nil { + return resp, errors.New(resp.Error.Message) + } + return resp, nil +} + +func (a *API) doFetch(apiInput string) ([]byte, error) { + pipe, err := a.getAPIPipes() + if err != nil { + return nil, err + } + pipe.Lock() + defer pipe.Unlock() + + if _, err := io.WriteString(pipe.input, apiInput); err != nil { + return nil, err + } + byteOutput, err := pipe.output.ReadBytes('\n') + if err != nil { + return nil, err + } + + return byteOutput, nil +} + +// ListenForNewTextMessages proxies to Listen without wallet events +func (a *API) ListenForNewTextMessages() (*Subscription, error) { + opts := ListenOptions{Wallet: false} + return a.Listen(opts) +} + +func (a *API) registerSubscription(sub *Subscription) { + a.Lock() + defer a.Unlock() + a.subscriptions = append(a.subscriptions, sub) +} + +// Listen fires of a background loop and puts chat messages and wallet +// events into channels +func (a *API) Listen(opts ListenOptions) (*Subscription, error) { + done := make(chan struct{}) + sub := NewSubscription() + a.registerSubscription(sub) + pause := 2 * time.Second + readScanner := func(boutput *bufio.Scanner) { + defer func() { done <- struct{}{} }() + for { + select { + case <-sub.shutdownCh: + a.Debug("readScanner: received shutdown") + return + default: + } + boutput.Scan() + t := boutput.Text() + submitErr := func(err error) { + if len(sub.errorCh)*2 > cap(sub.errorCh) { + a.Debug("large errorCh queue: len: %d cap: %d ", len(sub.errorCh), cap(sub.errorCh)) + } + sub.errorCh <- err + } + var typeHolder TypeHolder + if err := json.Unmarshal([]byte(t), &typeHolder); err != nil { + submitErr(fmt.Errorf("err: %v, data: %v", err, t)) + break + } + switch typeHolder.Type { + case "chat": + var notification chat1.MsgNotification + if err := json.Unmarshal([]byte(t), ¬ification); err != nil { + submitErr(fmt.Errorf("err: %v, data: %v", err, t)) + break + } + if notification.Error != nil { + a.Debug("error message received: %s", *notification.Error) + } else if notification.Msg != nil { + subscriptionMessage := SubscriptionMessage{ + Message: *notification.Msg, + Conversation: chat1.ConvSummary{ + Id: notification.Msg.ConvID, + Channel: notification.Msg.Channel, + }, + } + if len(sub.newMsgsCh)*2 > cap(sub.newMsgsCh) { + a.Debug("large newMsgsCh queue: len: %d cap: %d ", len(sub.newMsgsCh), cap(sub.newMsgsCh)) + } + sub.newMsgsCh <- subscriptionMessage + } + case "chat_conv": + var notification chat1.ConvNotification + if err := json.Unmarshal([]byte(t), ¬ification); err != nil { + submitErr(fmt.Errorf("err: %v, data: %v", err, t)) + break + } + if notification.Error != nil { + a.Debug("error message received: %s", *notification.Error) + } else if notification.Conv != nil { + subscriptionConv := SubscriptionConversation{ + Conversation: *notification.Conv, + } + if len(sub.newConvsCh)*2 > cap(sub.newConvsCh) { + a.Debug("large newConvsCh queue: len: %d cap: %d ", len(sub.newConvsCh), cap(sub.newConvsCh)) + } + sub.newConvsCh <- subscriptionConv + } + case "wallet": + var holder PaymentHolder + if err := json.Unmarshal([]byte(t), &holder); err != nil { + submitErr(fmt.Errorf("err: %v, data: %v", err, t)) + break + } + subscriptionPayment := SubscriptionWalletEvent(holder) + if len(sub.newWalletCh)*2 > cap(sub.newWalletCh) { + a.Debug("large newWalletCh queue: len: %d cap: %d ", len(sub.newWalletCh), cap(sub.newWalletCh)) + } + sub.newWalletCh <- subscriptionPayment + default: + continue + } + } + } + + attempts := 0 + maxAttempts := 30 + go func() { + defer func() { + close(sub.newMsgsCh) + close(sub.newConvsCh) + close(sub.newWalletCh) + close(sub.errorCh) + }() + for { + select { + case <-sub.shutdownCh: + a.Debug("Listen: received shutdown") + return + default: + } + + if attempts >= maxAttempts { + if err := a.LogSend("Listen: failed to auth, giving up"); err != nil { + a.Debug("Listen: logsend failed to send: %v", err) + } + panic("Listen: failed to auth, giving up") + } + attempts++ + if _, err := a.auth(); err != nil { + a.Debug("Listen: failed to auth: %s", err) + time.Sleep(pause) + continue + } + cmdElements := []string{"chat", "api-listen"} + if opts.Wallet { + cmdElements = append(cmdElements, "--wallet") + } + if opts.Convs { + cmdElements = append(cmdElements, "--convs") + } + p := a.runOpts.Command(cmdElements...) + output, err := p.StdoutPipe() + if err != nil { + a.Debug("Listen: failed to listen: %s", err) + time.Sleep(pause) + continue + } + stderr, err := p.StderrPipe() + if err != nil { + a.Debug("Listen: failed to listen to stderr: %s", err) + time.Sleep(pause) + continue + } + if runtime.GOOS != "windows" { + p.ExtraFiles = []*os.File{stderr.(*os.File), output.(*os.File)} + } + boutput := bufio.NewScanner(output) + if err := p.Start(); err != nil { + a.Debug("Listen: failed to make listen scanner: %s", err) + time.Sleep(pause) + continue + } + attempts = 0 + go readScanner(boutput) + select { + case <-sub.shutdownCh: + a.Debug("Listen: received shutdown") + return + case <-done: + } + if err := p.Wait(); err != nil { + stderrBytes, rerr := io.ReadAll(stderr) + if rerr != nil { + stderrBytes = []byte(fmt.Sprintf("failed to get stderr: %v", rerr)) + } + a.Debug("Listen: failed to Wait for command, restarting pipes: %s (```%s```)", err, stderrBytes) + if err := a.startPipes(); err != nil { + a.Debug("Listen: failed to restart pipes: %v", err) + } + } + time.Sleep(pause) + } + }() + return sub, nil +} + +func (a *API) LogSend(feedback string) error { + feedback = "go-keybase-chat-bot log send\n" + + "username: " + a.GetUsername() + "\n" + + feedback + + args := []string{ + "log", "send", + "--no-confirm", + "--feedback", feedback, + "-n", fmt.Sprintf("%d", a.LogSendBytes), + } + return a.runOpts.Command(args...).Run() +} + +func (a *API) Shutdown() (err error) { + defer a.Trace(&err, "Shutdown")() + a.Lock() + defer a.Unlock() + for _, sub := range a.subscriptions { + sub.Shutdown() + } + for _, pipe := range a.pipes { + if pipe.cmd != nil { + a.Debug("waiting for API command") + if err := pipe.cmd.Wait(); err != nil { + return err + } + } + } + + if a.runOpts.Oneshot != nil { + a.Debug("logging out") + err := a.runOpts.Command("logout", "--force").Run() + if err != nil { + return err + } + } + + if a.runOpts.StartService { + a.Debug("stopping service") + err := a.runOpts.Command("ctl", "stop", "--shutdown").Run() + if err != nil { + return err + } + } + + return nil +} |
