diff options
Diffstat (limited to 'teleirc/matterbridge/bridge/slack')
| -rw-r--r-- | teleirc/matterbridge/bridge/slack/handlers.go | 415 | ||||
| -rw-r--r-- | teleirc/matterbridge/bridge/slack/helpers.go | 255 | ||||
| -rw-r--r-- | teleirc/matterbridge/bridge/slack/helpers_test.go | 36 | ||||
| -rw-r--r-- | teleirc/matterbridge/bridge/slack/legacy.go | 80 | ||||
| -rw-r--r-- | teleirc/matterbridge/bridge/slack/slack.go | 566 | ||||
| -rw-r--r-- | teleirc/matterbridge/bridge/slack/users_channels.go | 343 |
6 files changed, 1695 insertions, 0 deletions
diff --git a/teleirc/matterbridge/bridge/slack/handlers.go b/teleirc/matterbridge/bridge/slack/handlers.go new file mode 100644 index 0000000..2424442 --- /dev/null +++ b/teleirc/matterbridge/bridge/slack/handlers.go @@ -0,0 +1,415 @@ +package bslack + +import ( + "errors" + "fmt" + "html" + "time" + + "github.com/42wim/matterbridge/bridge/config" + "github.com/42wim/matterbridge/bridge/helper" + "github.com/slack-go/slack" +) + +// ErrEventIgnored is for events that should be ignored +var ErrEventIgnored = errors.New("this event message should ignored") + +func (b *Bslack) handleSlack() { + messages := make(chan *config.Message) + if b.GetString(incomingWebhookConfig) != "" && b.GetString(tokenConfig) == "" { + b.Log.Debugf("Choosing webhooks based receiving") + go b.handleMatterHook(messages) + } else { + b.Log.Debugf("Choosing token based receiving") + go b.handleSlackClient(messages) + } + time.Sleep(time.Second) + b.Log.Debug("Start listening for Slack messages") + for message := range messages { + // don't do any action on deleted/typing messages + if message.Event != config.EventUserTyping && message.Event != config.EventMsgDelete && + message.Event != config.EventFileDelete { + b.Log.Debugf("<= Sending message from %s on %s to gateway", message.Username, b.Account) + // cleanup the message + message.Text = b.replaceMention(message.Text) + message.Text = b.replaceVariable(message.Text) + message.Text = b.replaceChannel(message.Text) + message.Text = b.replaceURL(message.Text) + message.Text = b.replaceb0rkedMarkDown(message.Text) + message.Text = html.UnescapeString(message.Text) + + // Add the avatar + message.Avatar = b.users.getAvatar(message.UserID) + } + + b.Log.Debugf("<= Message is %#v", message) + b.Remote <- *message + } +} + +func (b *Bslack) handleSlackClient(messages chan *config.Message) { + for msg := range b.rtm.IncomingEvents { + if msg.Type != sUserTyping && msg.Type != sHello && msg.Type != sLatencyReport { + b.Log.Debugf("== Receiving event %#v", msg.Data) + } + switch ev := msg.Data.(type) { + case *slack.UserTypingEvent: + if !b.GetBool("ShowUserTyping") { + continue + } + rmsg, err := b.handleTypingEvent(ev) + if err == ErrEventIgnored { + continue + } else if err != nil { + b.Log.Errorf("%#v", err) + continue + } + + messages <- rmsg + case *slack.MessageEvent: + if b.skipMessageEvent(ev) { + b.Log.Debugf("Skipped message: %#v", ev) + continue + } + rmsg, err := b.handleMessageEvent(ev) + if err != nil { + b.Log.Errorf("%#v", err) + continue + } + messages <- rmsg + case *slack.FileDeletedEvent: + rmsg, err := b.handleFileDeletedEvent(ev) + if err != nil { + b.Log.Printf("%#v", err) + continue + } + messages <- rmsg + case *slack.OutgoingErrorEvent: + b.Log.Debugf("%#v", ev.Error()) + case *slack.ChannelJoinedEvent: + // When we join a channel we update the full list of users as + // well as the information for the channel that we joined as this + // should now tell that we are a member of it. + b.channels.registerChannel(ev.Channel) + case *slack.ConnectedEvent: + b.si = ev.Info + b.channels.populateChannels(true) + b.users.populateUsers(true) + case *slack.InvalidAuthEvent: + b.Log.Fatalf("Invalid Token %#v", ev) + case *slack.ConnectionErrorEvent: + b.Log.Errorf("Connection failed %#v %#v", ev.Error(), ev.ErrorObj) + case *slack.MemberJoinedChannelEvent: + b.users.populateUser(ev.User) + case *slack.HelloEvent, *slack.LatencyReport, *slack.ConnectingEvent: + continue + case *slack.UserChangeEvent: + b.users.invalidateUser(ev.User.ID) + default: + b.Log.Debugf("Unhandled incoming event: %T", ev) + } + } +} + +func (b *Bslack) handleMatterHook(messages chan *config.Message) { + for { + message := b.mh.Receive() + b.Log.Debugf("receiving from matterhook (slack) %#v", message) + if message.UserName == "slackbot" { + continue + } + messages <- &config.Message{ + Username: message.UserName, + Text: message.Text, + Channel: message.ChannelName, + } + } +} + +// skipMessageEvent skips event that need to be skipped :-) +func (b *Bslack) skipMessageEvent(ev *slack.MessageEvent) bool { + switch ev.SubType { + case sChannelLeave, sChannelJoin: + return b.GetBool(noSendJoinConfig) + case sPinnedItem, sUnpinnedItem: + return true + case sChannelTopic, sChannelPurpose: + // Skip the event if our bot/user account changed the topic/purpose + if ev.User == b.si.User.ID { + return true + } + } + + // Check for our callback ID + hasOurCallbackID := false + if len(ev.Blocks.BlockSet) == 1 { + block, ok := ev.Blocks.BlockSet[0].(*slack.SectionBlock) + hasOurCallbackID = ok && block.BlockID == "matterbridge_"+b.uuid + } + + if ev.SubMessage != nil { + // It seems ev.SubMessage.Edited == nil when slack unfurls. + // Do not forward these messages. See Github issue #266. + if ev.SubMessage.ThreadTimestamp != ev.SubMessage.Timestamp && + ev.SubMessage.Edited == nil { + return true + } + // see hidden subtypes at https://api.slack.com/events/message + // these messages are sent when we add a message to a thread #709 + if ev.SubType == "message_replied" && ev.Hidden { + return true + } + if len(ev.SubMessage.Blocks.BlockSet) == 1 { + block, ok := ev.SubMessage.Blocks.BlockSet[0].(*slack.SectionBlock) + hasOurCallbackID = ok && block.BlockID == "matterbridge_"+b.uuid + } + } + + // Skip any messages that we made ourselves or from 'slackbot' (see #527). + if ev.Username == sSlackBotUser || + (b.rtm != nil && ev.Username == b.si.User.Name) || hasOurCallbackID { + return true + } + + if len(ev.Files) > 0 { + return b.filesCached(ev.Files) + } + return false +} + +func (b *Bslack) filesCached(files []slack.File) bool { + for i := range files { + if !b.fileCached(&files[i]) { + return false + } + } + return true +} + +// handleMessageEvent handles the message events. Together with any called sub-methods, +// this method implements the following event processing pipeline: +// +// 1. Check if the message should be ignored. +// NOTE: This is not actually part of the method below but is done just before it +// is called via the 'skipMessageEvent()' method. +// 2. Populate the Matterbridge message that will be sent to the router based on the +// received event and logic that is common to all events that are not skipped. +// 3. Detect and handle any message that is "status" related (think join channel, etc.). +// This might result in an early exit from the pipeline and passing of the +// pre-populated message to the Matterbridge router. +// 4. Handle the specific case of messages that edit existing messages depending on +// configuration. +// 5. Handle any attachments of the received event. +// 6. Check that the Matterbridge message that we end up with after at the end of the +// pipeline is valid before sending it to the Matterbridge router. +func (b *Bslack) handleMessageEvent(ev *slack.MessageEvent) (*config.Message, error) { + rmsg, err := b.populateReceivedMessage(ev) + if err != nil { + return nil, err + } + + // Handle some message types early. + if b.handleStatusEvent(ev, rmsg) { + return rmsg, nil + } + + b.handleAttachments(ev, rmsg) + + // Verify that we have the right information and the message + // is well-formed before sending it out to the router. + if len(ev.Files) == 0 && (rmsg.Text == "" || rmsg.Username == "") { + if ev.BotID != "" { + // This is probably a webhook we couldn't resolve. + return nil, fmt.Errorf("message handling resulted in an empty bot message (probably an incoming webhook we couldn't resolve): %#v", ev) + } + if ev.SubMessage != nil { + return nil, fmt.Errorf("message handling resulted in an empty message: %#v with submessage %#v", ev, ev.SubMessage) + } + return nil, fmt.Errorf("message handling resulted in an empty message: %#v", ev) + } + return rmsg, nil +} + +func (b *Bslack) handleFileDeletedEvent(ev *slack.FileDeletedEvent) (*config.Message, error) { + if rawChannel, ok := b.cache.Get(cfileDownloadChannel + ev.FileID); ok { + channel, err := b.channels.getChannelByID(rawChannel.(string)) + if err != nil { + return nil, err + } + + return &config.Message{ + Event: config.EventFileDelete, + Text: config.EventFileDelete, + Channel: channel.Name, + Account: b.Account, + ID: ev.FileID, + Protocol: b.Protocol, + }, nil + } + + return nil, fmt.Errorf("channel ID for file ID %s not found", ev.FileID) +} + +func (b *Bslack) handleStatusEvent(ev *slack.MessageEvent, rmsg *config.Message) bool { + switch ev.SubType { + case sChannelJoined, sMemberJoined: + // There's no further processing needed on channel events + // so we return 'true'. + return true + case sChannelJoin, sChannelLeave: + rmsg.Username = sSystemUser + rmsg.Event = config.EventJoinLeave + case sChannelTopic, sChannelPurpose: + b.channels.populateChannels(false) + rmsg.Event = config.EventTopicChange + case sMessageChanged: + rmsg.Text = ev.SubMessage.Text + // handle deleted thread starting messages + if ev.SubMessage.Text == "This message was deleted." { + rmsg.Event = config.EventMsgDelete + return true + } + case sMessageDeleted: + rmsg.Text = config.EventMsgDelete + rmsg.Event = config.EventMsgDelete + rmsg.ID = ev.DeletedTimestamp + // If a message is being deleted we do not need to process + // the event any further so we return 'true'. + return true + case sMeMessage: + rmsg.Event = config.EventUserAction + } + return false +} + +func getMessageTitle(attach *slack.Attachment) string { + if attach.TitleLink != "" { + return fmt.Sprintf("[%s](%s)\n", attach.Title, attach.TitleLink) + } + return attach.Title +} + +func (b *Bslack) handleAttachments(ev *slack.MessageEvent, rmsg *config.Message) { + // File comments are set by the system (because there is no username given). + if ev.SubType == sFileComment { + rmsg.Username = sSystemUser + } + + // See if we have some text in the attachments. + if rmsg.Text == "" { + for i, attach := range ev.Attachments { + if attach.Text != "" { + if attach.Title != "" { + rmsg.Text = getMessageTitle(&ev.Attachments[i]) + } + rmsg.Text += attach.Text + if attach.Footer != "" { + rmsg.Text += "\n\n" + attach.Footer + } + } else { + rmsg.Text = attach.Fallback + } + } + } + + // Save the attachments, so that we can send them to other slack (compatible) bridges. + if len(ev.Attachments) > 0 { + rmsg.Extra[sSlackAttachment] = append(rmsg.Extra[sSlackAttachment], ev.Attachments) + } + + // If we have files attached, download them (in memory) and put a pointer to it in msg.Extra. + for i := range ev.Files { + // keep reference in cache on which channel we added this file + b.cache.Add(cfileDownloadChannel+ev.Files[i].ID, ev.Channel) + if err := b.handleDownloadFile(rmsg, &ev.Files[i], false); err != nil { + b.Log.Errorf("Could not download incoming file: %#v", err) + } + } +} + +func (b *Bslack) handleTypingEvent(ev *slack.UserTypingEvent) (*config.Message, error) { + if ev.User == b.si.User.ID { + return nil, ErrEventIgnored + } + channelInfo, err := b.channels.getChannelByID(ev.Channel) + if err != nil { + return nil, err + } + return &config.Message{ + Channel: channelInfo.Name, + Account: b.Account, + Event: config.EventUserTyping, + }, nil +} + +// handleDownloadFile handles file download +func (b *Bslack) handleDownloadFile(rmsg *config.Message, file *slack.File, retry bool) error { + if b.fileCached(file) { + return nil + } + // Check that the file is neither too large nor blacklisted. + if err := helper.HandleDownloadSize(b.Log, rmsg, file.Name, int64(file.Size), b.General); err != nil { + b.Log.WithError(err).Infof("Skipping download of incoming file.") + return nil + } + + // Actually download the file. + data, err := helper.DownloadFileAuth(file.URLPrivateDownload, "Bearer "+b.GetString(tokenConfig)) + if err != nil { + return fmt.Errorf("download %s failed %#v", file.URLPrivateDownload, err) + } + + if len(*data) != file.Size && !retry { + b.Log.Debugf("Data size (%d) is not equal to size declared (%d)\n", len(*data), file.Size) + time.Sleep(1 * time.Second) + return b.handleDownloadFile(rmsg, file, true) + } + + // If a comment is attached to the file(s) it is in the 'Text' field of the Slack messge event + // and should be added as comment to only one of the files. We reset the 'Text' field to ensure + // that the comment is not duplicated. + comment := rmsg.Text + rmsg.Text = "" + helper.HandleDownloadData2(b.Log, rmsg, file.Name, file.ID, comment, file.URLPrivateDownload, data, b.General) + return nil +} + +// handleGetChannelMembers handles messages containing the GetChannelMembers event +// Sends a message to the router containing *config.ChannelMembers +func (b *Bslack) handleGetChannelMembers(rmsg *config.Message) bool { + if rmsg.Event != config.EventGetChannelMembers { + return false + } + + cMembers := b.channels.getChannelMembers(b.users) + + extra := make(map[string][]interface{}) + extra[config.EventGetChannelMembers] = append(extra[config.EventGetChannelMembers], cMembers) + msg := config.Message{ + Extra: extra, + Event: config.EventGetChannelMembers, + Account: b.Account, + } + + b.Log.Debugf("sending msg to remote %#v", msg) + b.Remote <- msg + + return true +} + +// fileCached implements Matterbridge's caching logic for files +// shared via Slack. +// +// We consider that a file was cached if its ID was added in the last minute or +// it's name was registered in the last 10 seconds. This ensures that an +// identically named file but with different content will be uploaded correctly +// (the assumption is that such name collisions will not occur within the given +// timeframes). +func (b *Bslack) fileCached(file *slack.File) bool { + if ts, ok := b.cache.Get("file" + file.ID); ok && time.Since(ts.(time.Time)) < time.Minute { + return true + } else if ts, ok = b.cache.Get("filename" + file.Name); ok && time.Since(ts.(time.Time)) < 10*time.Second { + return true + } + return false +} diff --git a/teleirc/matterbridge/bridge/slack/helpers.go b/teleirc/matterbridge/bridge/slack/helpers.go new file mode 100644 index 0000000..e46e272 --- /dev/null +++ b/teleirc/matterbridge/bridge/slack/helpers.go @@ -0,0 +1,255 @@ +package bslack + +import ( + "fmt" + "regexp" + "strings" + "time" + + "github.com/42wim/matterbridge/bridge/config" + "github.com/sirupsen/logrus" + "github.com/slack-go/slack" +) + +// populateReceivedMessage shapes the initial Matterbridge message that we will forward to the +// router before we apply message-dependent modifications. +func (b *Bslack) populateReceivedMessage(ev *slack.MessageEvent) (*config.Message, error) { + // Use our own func because rtm.GetChannelInfo doesn't work for private channels. + channel, err := b.channels.getChannelByID(ev.Channel) + if err != nil { + return nil, err + } + + rmsg := &config.Message{ + Text: ev.Text, + Channel: channel.Name, + Account: b.Account, + ID: ev.Timestamp, + Extra: make(map[string][]interface{}), + ParentID: ev.ThreadTimestamp, + Protocol: b.Protocol, + } + if b.useChannelID { + rmsg.Channel = "ID:" + channel.ID + } + + // Handle 'edit' messages. + if ev.SubMessage != nil && !b.GetBool(editDisableConfig) { + rmsg.ID = ev.SubMessage.Timestamp + if ev.SubMessage.ThreadTimestamp != ev.SubMessage.Timestamp { + b.Log.Debugf("SubMessage %#v", ev.SubMessage) + rmsg.Text = ev.SubMessage.Text + b.GetString(editSuffixConfig) + } + } + + // For edits, only submessage has thread ts. + // Ensures edits to threaded messages maintain their prefix hint on the + // unthreaded end. + if ev.SubMessage != nil { + rmsg.ParentID = ev.SubMessage.ThreadTimestamp + } + + if err = b.populateMessageWithUserInfo(ev, rmsg); err != nil { + return nil, err + } + return rmsg, err +} + +func (b *Bslack) populateMessageWithUserInfo(ev *slack.MessageEvent, rmsg *config.Message) error { + if ev.SubType == sMessageDeleted || ev.SubType == sFileComment { + return nil + } + + // First, deal with bot-originating messages but only do so when not using webhooks: we + // would not be able to distinguish which bot would be sending them. + if err := b.populateMessageWithBotInfo(ev, rmsg); err != nil { + return err + } + + // Second, deal with "real" users if we have the necessary information. + var userID string + switch { + case ev.User != "": + userID = ev.User + case ev.SubMessage != nil && ev.SubMessage.User != "": + userID = ev.SubMessage.User + default: + return nil + } + + user := b.users.getUser(userID) + if user == nil { + return fmt.Errorf("could not find information for user with id %s", ev.User) + } + + rmsg.UserID = user.ID + rmsg.Username = user.Name + if user.Profile.DisplayName != "" { + rmsg.Username = user.Profile.DisplayName + } + if b.GetBool("UseFullName") && user.Profile.RealName != "" { + rmsg.Username = user.Profile.RealName + } + return nil +} + +func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config.Message) error { + if ev.BotID == "" || b.GetString(outgoingWebhookConfig) != "" { + return nil + } + + var err error + var bot *slack.Bot + for { + bot, err = b.rtm.GetBotInfo(ev.BotID) + if err == nil { + break + } + + if err = handleRateLimit(b.Log, err); err != nil { + b.Log.Errorf("Could not retrieve bot information: %#v", err) + return err + } + } + b.Log.Debugf("Found bot %#v", bot) + + if bot.Name != "" { + rmsg.Username = bot.Name + if ev.Username != "" { + rmsg.Username = ev.Username + } + rmsg.UserID = bot.ID + } + return nil +} + +var ( + mentionRE = regexp.MustCompile(`<@([a-zA-Z0-9]+)>`) + channelRE = regexp.MustCompile(`<#[a-zA-Z0-9]+\|(.+?)>`) + variableRE = regexp.MustCompile(`<!((?:subteam\^)?[a-zA-Z0-9]+)(?:\|@?(.+?))?>`) + urlRE = regexp.MustCompile(`<([^<\|]+)\|([^>]+)>`) + codeFenceRE = regexp.MustCompile(`(?m)^` + "```" + `\w+$`) + topicOrPurposeRE = regexp.MustCompile(`(?s)(@.+) (cleared|set)(?: the)? channel (topic|purpose)(?:: (.*))?`) +) + +func (b *Bslack) extractTopicOrPurpose(text string) (string, string) { + r := topicOrPurposeRE.FindStringSubmatch(text) + if len(r) == 5 { + action, updateType, extracted := r[2], r[3], r[4] + switch action { + case "set": + return updateType, extracted + case "cleared": + return updateType, "" + } + } + b.Log.Warnf("Encountered channel topic or purpose change message with unexpected format: %s", text) + return "unknown", "" +} + +// @see https://api.slack.com/docs/message-formatting#linking_to_channels_and_users +func (b *Bslack) replaceMention(text string) string { + replaceFunc := func(match string) string { + userID := strings.Trim(match, "@<>") + if username := b.users.getUsername(userID); userID != "" { + return "@" + username + } + return match + } + return mentionRE.ReplaceAllStringFunc(text, replaceFunc) +} + +// @see https://api.slack.com/docs/message-formatting#linking_to_channels_and_users +func (b *Bslack) replaceChannel(text string) string { + for _, r := range channelRE.FindAllStringSubmatch(text, -1) { + text = strings.Replace(text, r[0], "#"+r[1], 1) + } + return text +} + +// @see https://api.slack.com/docs/message-formatting#variables +func (b *Bslack) replaceVariable(text string) string { + for _, r := range variableRE.FindAllStringSubmatch(text, -1) { + if r[2] != "" { + text = strings.Replace(text, r[0], "@"+r[2], 1) + } else { + text = strings.Replace(text, r[0], "@"+r[1], 1) + } + } + return text +} + +// @see https://api.slack.com/docs/message-formatting#linking_to_urls +func (b *Bslack) replaceURL(text string) string { + return urlRE.ReplaceAllString(text, "[${2}](${1})") +} + +func (b *Bslack) replaceb0rkedMarkDown(text string) string { + // taken from https://github.com/mattermost/mattermost-server/blob/master/app/slackimport.go + // + regexReplaceAllString := []struct { + regex *regexp.Regexp + rpl string + }{ + // bold + { + regexp.MustCompile(`(^|[\s.;,])\*(\S[^*\n]+)\*`), + "$1**$2**", + }, + // strikethrough + { + regexp.MustCompile(`(^|[\s.;,])\~(\S[^~\n]+)\~`), + "$1~~$2~~", + }, + // single paragraph blockquote + // Slack converts > character to > + { + regexp.MustCompile(`(?sm)^>`), + ">", + }, + } + for _, rule := range regexReplaceAllString { + text = rule.regex.ReplaceAllString(text, rule.rpl) + } + return text +} + +func (b *Bslack) replaceCodeFence(text string) string { + return codeFenceRE.ReplaceAllString(text, "```") +} + +// getUsersInConversation returns an array of userIDs that are members of channelID +func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) { + channelMembers := []string{} + for { + queryParams := &slack.GetUsersInConversationParameters{ + ChannelID: channelID, + } + + members, nextCursor, err := b.sc.GetUsersInConversation(queryParams) + if err != nil { + if err = handleRateLimit(b.Log, err); err != nil { + return channelMembers, fmt.Errorf("Could not retrieve users in channels: %#v", err) + } + continue + } + + channelMembers = append(channelMembers, members...) + + if nextCursor == "" { + break + } + queryParams.Cursor = nextCursor + } + return channelMembers, nil +} + +func handleRateLimit(log *logrus.Entry, err error) error { + rateLimit, ok := err.(*slack.RateLimitedError) + if !ok { + return err + } + log.Infof("Rate-limited by Slack. Sleeping for %v", rateLimit.RetryAfter) + time.Sleep(rateLimit.RetryAfter) + return nil +} diff --git a/teleirc/matterbridge/bridge/slack/helpers_test.go b/teleirc/matterbridge/bridge/slack/helpers_test.go new file mode 100644 index 0000000..fe3ba41 --- /dev/null +++ b/teleirc/matterbridge/bridge/slack/helpers_test.go @@ -0,0 +1,36 @@ +package bslack + +import ( + "io/ioutil" + "testing" + + "github.com/42wim/matterbridge/bridge" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" +) + +func TestExtractTopicOrPurpose(t *testing.T) { + testcases := map[string]struct { + input string + wantChangeType string + wantOutput string + }{ + "success - topic type": {"@someone set channel topic: foo bar", "topic", "foo bar"}, + "success - purpose type": {"@someone set channel purpose: foo bar", "purpose", "foo bar"}, + "success - one line": {"@someone set channel topic: foo bar", "topic", "foo bar"}, + "success - multi-line": {"@someone set channel topic: foo\nbar", "topic", "foo\nbar"}, + "success - cleared": {"@someone cleared channel topic", "topic", ""}, + "error - unhandled": {"some unmatched message", "unknown", ""}, + } + + logger := logrus.New() + logger.SetOutput(ioutil.Discard) + cfg := &bridge.Config{Bridge: &bridge.Bridge{Log: logrus.NewEntry(logger)}} + b := newBridge(cfg) + for name, tc := range testcases { + gotChangeType, gotOutput := b.extractTopicOrPurpose(tc.input) + + assert.Equalf(t, tc.wantChangeType, gotChangeType, "This testcase failed: %s", name) + assert.Equalf(t, tc.wantOutput, gotOutput, "This testcase failed: %s", name) + } +} diff --git a/teleirc/matterbridge/bridge/slack/legacy.go b/teleirc/matterbridge/bridge/slack/legacy.go new file mode 100644 index 0000000..d89d286 --- /dev/null +++ b/teleirc/matterbridge/bridge/slack/legacy.go @@ -0,0 +1,80 @@ +package bslack + +import ( + "errors" + + "github.com/42wim/matterbridge/bridge" + "github.com/42wim/matterbridge/matterhook" + "github.com/slack-go/slack" +) + +type BLegacy struct { + *Bslack +} + +func NewLegacy(cfg *bridge.Config) bridge.Bridger { + b := &BLegacy{Bslack: newBridge(cfg)} + b.legacy = true + return b +} + +func (b *BLegacy) Connect() error { + b.RLock() + defer b.RUnlock() + if b.GetString(incomingWebhookConfig) != "" { + switch { + case b.GetString(outgoingWebhookConfig) != "": + b.Log.Info("Connecting using webhookurl (sending) and webhookbindaddress (receiving)") + b.mh = matterhook.New(b.GetString(outgoingWebhookConfig), matterhook.Config{ + InsecureSkipVerify: b.GetBool(skipTLSConfig), + BindAddress: b.GetString(incomingWebhookConfig), + }) + case b.GetString(tokenConfig) != "": + b.Log.Info("Connecting using token (sending)") + b.sc = slack.New(b.GetString(tokenConfig)) + b.rtm = b.sc.NewRTM() + go b.rtm.ManageConnection() + b.Log.Info("Connecting using webhookbindaddress (receiving)") + b.mh = matterhook.New(b.GetString(outgoingWebhookConfig), matterhook.Config{ + InsecureSkipVerify: b.GetBool(skipTLSConfig), + BindAddress: b.GetString(incomingWebhookConfig), + }) + default: + b.Log.Info("Connecting using webhookbindaddress (receiving)") + b.mh = matterhook.New(b.GetString(outgoingWebhookConfig), matterhook.Config{ + InsecureSkipVerify: b.GetBool(skipTLSConfig), + BindAddress: b.GetString(incomingWebhookConfig), + }) + } + go b.handleSlack() + return nil + } + if b.GetString(outgoingWebhookConfig) != "" { + b.Log.Info("Connecting using webhookurl (sending)") + b.mh = matterhook.New(b.GetString(outgoingWebhookConfig), matterhook.Config{ + InsecureSkipVerify: b.GetBool(skipTLSConfig), + DisableServer: true, + }) + if b.GetString(tokenConfig) != "" { + b.Log.Info("Connecting using token (receiving)") + b.sc = slack.New(b.GetString(tokenConfig), slack.OptionDebug(b.GetBool("debug"))) + b.channels = newChannelManager(b.Log, b.sc) + b.users = newUserManager(b.Log, b.sc) + b.rtm = b.sc.NewRTM() + go b.rtm.ManageConnection() + go b.handleSlack() + } + } else if b.GetString(tokenConfig) != "" { + b.Log.Info("Connecting using token (sending and receiving)") + b.sc = slack.New(b.GetString(tokenConfig), slack.OptionDebug(b.GetBool("debug"))) + b.channels = newChannelManager(b.Log, b.sc) + b.users = newUserManager(b.Log, b.sc) + b.rtm = b.sc.NewRTM() + go b.rtm.ManageConnection() + go b.handleSlack() + } + if b.GetString(incomingWebhookConfig) == "" && b.GetString(outgoingWebhookConfig) == "" && b.GetString(tokenConfig) == "" { + return errors.New("no connection method found. See that you have WebhookBindAddress, WebhookURL or Token configured") + } + return nil +} diff --git a/teleirc/matterbridge/bridge/slack/slack.go b/teleirc/matterbridge/bridge/slack/slack.go new file mode 100644 index 0000000..c39c608 --- /dev/null +++ b/teleirc/matterbridge/bridge/slack/slack.go @@ -0,0 +1,566 @@ +package bslack + +import ( + "bytes" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/42wim/matterbridge/bridge" + "github.com/42wim/matterbridge/bridge/config" + "github.com/42wim/matterbridge/bridge/helper" + "github.com/42wim/matterbridge/matterhook" + lru "github.com/hashicorp/golang-lru" + "github.com/rs/xid" + "github.com/slack-go/slack" +) + +type Bslack struct { + sync.RWMutex + *bridge.Config + + mh *matterhook.Client + sc *slack.Client + rtm *slack.RTM + si *slack.Info + + cache *lru.Cache + uuid string + useChannelID bool + + channels *channels + users *users + legacy bool +} + +const ( + sHello = "hello" + sChannelJoin = "channel_join" + sChannelLeave = "channel_leave" + sChannelJoined = "channel_joined" + sMemberJoined = "member_joined_channel" + sMessageChanged = "message_changed" + sMessageDeleted = "message_deleted" + sSlackAttachment = "slack_attachment" + sPinnedItem = "pinned_item" + sUnpinnedItem = "unpinned_item" + sChannelTopic = "channel_topic" + sChannelPurpose = "channel_purpose" + sFileComment = "file_comment" + sMeMessage = "me_message" + sUserTyping = "user_typing" + sLatencyReport = "latency_report" + sSystemUser = "system" + sSlackBotUser = "slackbot" + cfileDownloadChannel = "file_download_channel" + + tokenConfig = "Token" + incomingWebhookConfig = "WebhookBindAddress" + outgoingWebhookConfig = "WebhookURL" + skipTLSConfig = "SkipTLSVerify" + useNickPrefixConfig = "PrefixMessagesWithNick" + editDisableConfig = "EditDisable" + editSuffixConfig = "EditSuffix" + iconURLConfig = "iconurl" + noSendJoinConfig = "nosendjoinpart" + messageLength = 3000 +) + +func New(cfg *bridge.Config) bridge.Bridger { + // Print a deprecation warning for legacy non-bot tokens (#527). + token := cfg.GetString(tokenConfig) + if token != "" && !strings.HasPrefix(token, "xoxb") { + cfg.Log.Warn("Non-bot token detected. It is STRONGLY recommended to use a proper bot-token instead.") + cfg.Log.Warn("Legacy tokens may be deprecated by Slack at short notice. See the Matterbridge GitHub wiki for a migration guide.") + cfg.Log.Warn("See https://github.com/42wim/matterbridge/wiki/Slack-bot-setup") + return NewLegacy(cfg) + } + return newBridge(cfg) +} + +func newBridge(cfg *bridge.Config) *Bslack { + newCache, err := lru.New(5000) + if err != nil { + cfg.Log.Fatalf("Could not create LRU cache for Slack bridge: %v", err) + } + b := &Bslack{ + Config: cfg, + uuid: xid.New().String(), + cache: newCache, + } + return b +} + +func (b *Bslack) Command(cmd string) string { + return "" +} + +func (b *Bslack) Connect() error { + b.RLock() + defer b.RUnlock() + + if b.GetString(incomingWebhookConfig) == "" && b.GetString(outgoingWebhookConfig) == "" && b.GetString(tokenConfig) == "" { + return errors.New("no connection method found: WebhookBindAddress, WebhookURL or Token need to be configured") + } + + // If we have a token we use the Slack websocket-based RTM for both sending and receiving. + if token := b.GetString(tokenConfig); token != "" { + b.Log.Info("Connecting using token") + + b.sc = slack.New(token, slack.OptionDebug(b.GetBool("Debug"))) + + b.channels = newChannelManager(b.Log, b.sc) + b.users = newUserManager(b.Log, b.sc) + + b.rtm = b.sc.NewRTM() + go b.rtm.ManageConnection() + go b.handleSlack() + return nil + } + + // In absence of a token we fall back to incoming and outgoing Webhooks. + b.mh = matterhook.New( + "", + matterhook.Config{ + InsecureSkipVerify: b.GetBool("SkipTLSVerify"), + DisableServer: true, + }, + ) + if b.GetString(outgoingWebhookConfig) != "" { + b.Log.Info("Using specified webhook for outgoing messages.") + b.mh.Url = b.GetString(outgoingWebhookConfig) + } + if b.GetString(incomingWebhookConfig) != "" { + b.Log.Info("Setting up local webhook for incoming messages.") + b.mh.BindAddress = b.GetString(incomingWebhookConfig) + b.mh.DisableServer = false + go b.handleSlack() + } + return nil +} + +func (b *Bslack) Disconnect() error { + return b.rtm.Disconnect() +} + +// JoinChannel only acts as a verification method that checks whether Matterbridge's +// Slack integration is already member of the channel. This is because Slack does not +// allow apps or bots to join channels themselves and they need to be invited +// manually by a user. +func (b *Bslack) JoinChannel(channel config.ChannelInfo) error { + // We can only join a channel through the Slack API. + if b.sc == nil { + return nil + } + + // try to join a channel when in legacy + if b.legacy { + _, _, _, err := b.sc.JoinConversation(channel.Name) + if err != nil { + switch err.Error() { + case "name_taken", "restricted_action": + case "default": + return err + } + } + } + + b.channels.populateChannels(false) + + channelInfo, err := b.channels.getChannel(channel.Name) + if err != nil { + return fmt.Errorf("could not join channel: %#v", err) + } + + if strings.HasPrefix(channel.Name, "ID:") { + b.useChannelID = true + channel.Name = channelInfo.Name + } + + // we can't join a channel unless we are using legacy tokens #651 + if !channelInfo.IsMember && !b.legacy { + return fmt.Errorf("slack integration that matterbridge is using is not member of channel '%s', please add it manually", channelInfo.Name) + } + return nil +} + +func (b *Bslack) Reload(cfg *bridge.Config) (string, error) { + return "", nil +} + +func (b *Bslack) Send(msg config.Message) (string, error) { + // Too noisy to log like other events + if msg.Event != config.EventUserTyping { + b.Log.Debugf("=> Receiving %#v", msg) + } + + msg.Text = helper.ClipMessage(msg.Text, messageLength, b.GetString("MessageClipped")) + msg.Text = b.replaceCodeFence(msg.Text) + + // Make a action /me of the message + if msg.Event == config.EventUserAction { + msg.Text = "_" + msg.Text + "_" + } + + // Use webhook to send the message + if b.GetString(outgoingWebhookConfig) != "" && b.GetString(tokenConfig) == "" { + return "", b.sendWebhook(msg) + } + return b.sendRTM(msg) +} + +// sendWebhook uses the configured WebhookURL to send the message +func (b *Bslack) sendWebhook(msg config.Message) error { + // Skip events. + if msg.Event != "" { + return nil + } + + if b.GetBool(useNickPrefixConfig) { + msg.Text = msg.Username + msg.Text + } + + if msg.Extra != nil { + // This sends a message only if we received a config.EVENT_FILE_FAILURE_SIZE. + for _, rmsg := range helper.HandleExtra(&msg, b.General) { + rmsg := rmsg // scopelint + iconURL := config.GetIconURL(&rmsg, b.GetString(iconURLConfig)) + matterMessage := matterhook.OMessage{ + IconURL: iconURL, + Channel: msg.Channel, + UserName: rmsg.Username, + Text: rmsg.Text, + } + if err := b.mh.Send(matterMessage); err != nil { + b.Log.Errorf("Failed to send message: %v", err) + } + } + + // Webhook doesn't support file uploads, so we add the URL manually. + for _, f := range msg.Extra["file"] { + fi, ok := f.(config.FileInfo) + if !ok { + b.Log.Errorf("Received a file with unexpected content: %#v", f) + continue + } + if fi.URL != "" { + msg.Text += " " + fi.URL + } + } + } + + // If we have native slack_attachments add them. + var attachs []slack.Attachment + for _, attach := range msg.Extra[sSlackAttachment] { + attachs = append(attachs, attach.([]slack.Attachment)...) + } + + iconURL := config.GetIconURL(&msg, b.GetString(iconURLConfig)) + matterMessage := matterhook.OMessage{ + IconURL: iconURL, + Attachments: attachs, + Channel: msg.Channel, + UserName: msg.Username, + Text: msg.Text, + } + if msg.Avatar != "" { + matterMessage.IconURL = msg.Avatar + } + if err := b.mh.Send(matterMessage); err != nil { + b.Log.Errorf("Failed to send message via webhook: %#v", err) + return err + } + return nil +} + +func (b *Bslack) sendRTM(msg config.Message) (string, error) { + // Handle channelmember messages. + if handled := b.handleGetChannelMembers(&msg); handled { + return "", nil + } + + channelInfo, err := b.channels.getChannel(msg.Channel) + if err != nil { + return "", fmt.Errorf("could not send message: %v", err) + } + if msg.Event == config.EventUserTyping { + if b.GetBool("ShowUserTyping") { + b.rtm.SendMessage(b.rtm.NewTypingMessage(channelInfo.ID)) + } + return "", nil + } + + var handled bool + + // Handle topic/purpose updates. + if handled, err = b.handleTopicOrPurpose(&msg, channelInfo); handled { + return "", err + } + + // Handle prefix hint for unthreaded messages. + if msg.ParentNotFound() { + msg.ParentID = "" + msg.Text = fmt.Sprintf("[thread]: %s", msg.Text) + } + + // Handle message deletions. + if handled, err = b.deleteMessage(&msg, channelInfo); handled { + return msg.ID, err + } + + // Prepend nickname if configured. + if b.GetBool(useNickPrefixConfig) { + msg.Text = msg.Username + msg.Text + } + + // Handle message edits. + if handled, err = b.editMessage(&msg, channelInfo); handled { + return msg.ID, err + } + + // Upload a file if it exists. + if len(msg.Extra) > 0 { + extraMsgs := helper.HandleExtra(&msg, b.General) + for i := range extraMsgs { + rmsg := &extraMsgs[i] + rmsg.Text = rmsg.Username + rmsg.Text + _, err = b.postMessage(rmsg, channelInfo) + if err != nil { + b.Log.Error(err) + } + } + // Upload files if necessary (from Slack, Telegram or Mattermost). + return b.uploadFile(&msg, channelInfo.ID) + } + + // Post message. + return b.postMessage(&msg, channelInfo) +} + +func (b *Bslack) updateTopicOrPurpose(msg *config.Message, channelInfo *slack.Channel) error { + var updateFunc func(channelID string, value string) (*slack.Channel, error) + + incomingChangeType, text := b.extractTopicOrPurpose(msg.Text) + switch incomingChangeType { + case "topic": + updateFunc = b.rtm.SetTopicOfConversation + case "purpose": + updateFunc = b.rtm.SetPurposeOfConversation + default: + b.Log.Errorf("Unhandled type received from extractTopicOrPurpose: %s", incomingChangeType) + return nil + } + for { + _, err := updateFunc(channelInfo.ID, text) + if err == nil { + return nil + } + if err = handleRateLimit(b.Log, err); err != nil { + return err + } + } +} + +// handles updating topic/purpose and determining whether to further propagate update messages. +func (b *Bslack) handleTopicOrPurpose(msg *config.Message, channelInfo *slack.Channel) (bool, error) { + if msg.Event != config.EventTopicChange { + return false, nil + } + + if b.GetBool("SyncTopic") { + return true, b.updateTopicOrPurpose(msg, channelInfo) + } + + // Pass along to normal message handlers. + if b.GetBool("ShowTopicChange") { + return false, nil + } + + // Swallow message as handled no-op. + return true, nil +} + +func (b *Bslack) deleteMessage(msg *config.Message, channelInfo *slack.Channel) (bool, error) { + if msg.Event != config.EventMsgDelete { + return false, nil + } + + // Some protocols echo deletes, but with an empty ID. + if msg.ID == "" { + return true, nil + } + + for { + _, _, err := b.rtm.DeleteMessage(channelInfo.ID, msg.ID) + if err == nil { + return true, nil + } + + if err = handleRateLimit(b.Log, err); err != nil { + b.Log.Errorf("Failed to delete user message from Slack: %#v", err) + return true, err + } + } +} + +func (b *Bslack) editMessage(msg *config.Message, channelInfo *slack.Channel) (bool, error) { + if msg.ID == "" { + return false, nil + } + messageOptions := b.prepareMessageOptions(msg) + for { + _, _, _, err := b.rtm.UpdateMessage(channelInfo.ID, msg.ID, messageOptions...) + if err == nil { + return true, nil + } + + if err = handleRateLimit(b.Log, err); err != nil { + b.Log.Errorf("Failed to edit user message on Slack: %#v", err) + return true, err + } + } +} + +func (b *Bslack) postMessage(msg *config.Message, channelInfo *slack.Channel) (string, error) { + // don't post empty messages + if msg.Text == "" { + return "", nil + } + messageOptions := b.prepareMessageOptions(msg) + for { + _, id, err := b.rtm.PostMessage(channelInfo.ID, messageOptions...) + if err == nil { + return id, nil + } + + if err = handleRateLimit(b.Log, err); err != nil { + b.Log.Errorf("Failed to sent user message to Slack: %#v", err) + return "", err + } + } +} + +// uploadFile handles native upload of files +func (b *Bslack) uploadFile(msg *config.Message, channelID string) (string, error) { + var messageID string + for _, f := range msg.Extra["file"] { + fi, ok := f.(config.FileInfo) + if !ok { + b.Log.Errorf("Received a file with unexpected content: %#v", f) + continue + } + if msg.Text == fi.Comment { + msg.Text = "" + } + // Because the result of the UploadFile is slower than the MessageEvent from slack + // we can't match on the file ID yet, so we have to match on the filename too. + ts := time.Now() + b.Log.Debugf("Adding file %s to cache at %s with timestamp", fi.Name, ts.String()) + b.cache.Add("filename"+fi.Name, ts) + initialComment := fmt.Sprintf("File from %s", msg.Username) + if fi.Comment != "" { + initialComment += fmt.Sprintf(" with comment: %s", fi.Comment) + } + res, err := b.sc.UploadFile(slack.FileUploadParameters{ + Reader: bytes.NewReader(*fi.Data), + Filename: fi.Name, + Channels: []string{channelID}, + InitialComment: initialComment, + ThreadTimestamp: msg.ParentID, + }) + if err != nil { + b.Log.Errorf("uploadfile %#v", err) + return "", err + } + if res.ID != "" { + b.Log.Debugf("Adding file ID %s to cache with timestamp %s", res.ID, ts.String()) + b.cache.Add("file"+res.ID, ts) + + // search for message id by uploaded file in private/public channels, get thread timestamp from uploaded file + if v, ok := res.Shares.Private[channelID]; ok && len(v) > 0 { + messageID = v[0].Ts + } + if v, ok := res.Shares.Public[channelID]; ok && len(v) > 0 { + messageID = v[0].Ts + } + } + } + return messageID, nil +} + +func (b *Bslack) prepareMessageOptions(msg *config.Message) []slack.MsgOption { + params := slack.NewPostMessageParameters() + if b.GetBool(useNickPrefixConfig) { + params.AsUser = true + } + params.Username = msg.Username + params.LinkNames = 1 // replace mentions + params.IconURL = config.GetIconURL(msg, b.GetString(iconURLConfig)) + params.ThreadTimestamp = msg.ParentID + if msg.Avatar != "" { + params.IconURL = msg.Avatar + } + + var attachments []slack.Attachment + // add file attachments + attachments = append(attachments, b.createAttach(msg.Extra)...) + // add slack attachments (from another slack bridge) + if msg.Extra != nil { + for _, attach := range msg.Extra[sSlackAttachment] { + attachments = append(attachments, attach.([]slack.Attachment)...) + } + } + + var opts []slack.MsgOption + opts = append(opts, + // provide regular text field (fallback used in Slack notifications, etc.) + slack.MsgOptionText(msg.Text, false), + + // add a callback ID so we can see we created it + slack.MsgOptionBlocks(slack.NewSectionBlock( + slack.NewTextBlockObject(slack.MarkdownType, msg.Text, false, false), + nil, nil, + slack.SectionBlockOptionBlockID("matterbridge_"+b.uuid), + )), + + slack.MsgOptionEnableLinkUnfurl(), + ) + opts = append(opts, slack.MsgOptionAttachments(attachments...)) + opts = append(opts, slack.MsgOptionPostMessageParameters(params)) + return opts +} + +func (b *Bslack) createAttach(extra map[string][]interface{}) []slack.Attachment { + var attachements []slack.Attachment + for _, v := range extra["attachments"] { + entry := v.(map[string]interface{}) + s := slack.Attachment{ + Fallback: extractStringField(entry, "fallback"), + Color: extractStringField(entry, "color"), + Pretext: extractStringField(entry, "pretext"), + AuthorName: extractStringField(entry, "author_name"), + AuthorLink: extractStringField(entry, "author_link"), + AuthorIcon: extractStringField(entry, "author_icon"), + Title: extractStringField(entry, "title"), + TitleLink: extractStringField(entry, "title_link"), + Text: extractStringField(entry, "text"), + ImageURL: extractStringField(entry, "image_url"), + ThumbURL: extractStringField(entry, "thumb_url"), + Footer: extractStringField(entry, "footer"), + FooterIcon: extractStringField(entry, "footer_icon"), + } + attachements = append(attachements, s) + } + return attachements +} + +func extractStringField(data map[string]interface{}, field string) string { + if rawValue, found := data[field]; found { + if value, ok := rawValue.(string); ok { + return value + } + } + return "" +} diff --git a/teleirc/matterbridge/bridge/slack/users_channels.go b/teleirc/matterbridge/bridge/slack/users_channels.go new file mode 100644 index 0000000..85b944b --- /dev/null +++ b/teleirc/matterbridge/bridge/slack/users_channels.go @@ -0,0 +1,343 @@ +package bslack + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/42wim/matterbridge/bridge/config" + "github.com/sirupsen/logrus" + "github.com/slack-go/slack" +) + +const minimumRefreshInterval = 10 * time.Second + +type users struct { + log *logrus.Entry + sc *slack.Client + + users map[string]*slack.User + usersMutex sync.RWMutex + usersSyncPoints map[string]chan struct{} + + refreshInProgress bool + earliestRefresh time.Time + refreshMutex sync.Mutex +} + +func newUserManager(log *logrus.Entry, sc *slack.Client) *users { + return &users{ + log: log, + sc: sc, + users: make(map[string]*slack.User), + usersSyncPoints: make(map[string]chan struct{}), + earliestRefresh: time.Now(), + } +} + +func (b *users) getUser(id string) *slack.User { + b.usersMutex.RLock() + user, ok := b.users[id] + b.usersMutex.RUnlock() + if ok { + return user + } + b.populateUser(id) + b.usersMutex.RLock() + defer b.usersMutex.RUnlock() + + return b.users[id] +} + +func (b *users) getUsername(id string) string { + if user := b.getUser(id); user != nil { + if user.Profile.DisplayName != "" { + return user.Profile.DisplayName + } + return user.Name + } + b.log.Warnf("Could not find user with ID '%s'", id) + return "" +} + +func (b *users) getAvatar(id string) string { + if user := b.getUser(id); user != nil { + return user.Profile.Image48 + } + return "" +} + +func (b *users) populateUser(userID string) { + for { + b.usersMutex.Lock() + _, exists := b.users[userID] + if exists { + // already in cache + b.usersMutex.Unlock() + return + } + + if syncPoint, ok := b.usersSyncPoints[userID]; ok { + // Another goroutine is already populating this user for us so wait on it to finish. + b.usersMutex.Unlock() + <-syncPoint + // We do not return and iterate again to check that the entry does indeed exist + // in case the previous query failed for some reason. + } else { + b.usersSyncPoints[userID] = make(chan struct{}) + defer func() { + // Wake up any waiting goroutines and remove the synchronization point. + close(b.usersSyncPoints[userID]) + delete(b.usersSyncPoints, userID) + }() + break + } + } + + // Do not hold the lock while fetching information from Slack + // as this might take an unbounded amount of time. + b.usersMutex.Unlock() + + user, err := b.sc.GetUserInfo(userID) + if err != nil { + b.log.Debugf("GetUserInfo failed for %v: %v", userID, err) + return + } + + b.usersMutex.Lock() + defer b.usersMutex.Unlock() + + // Register user information. + b.users[userID] = user +} + +func (b *users) invalidateUser(userID string) { + b.usersMutex.Lock() + defer b.usersMutex.Unlock() + delete(b.users, userID) +} + +func (b *users) populateUsers(wait bool) { + b.refreshMutex.Lock() + if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) { + b.log.Debugf("Not refreshing user list as it was done less than %v ago.", minimumRefreshInterval) + b.refreshMutex.Unlock() + + return + } + for b.refreshInProgress { + b.refreshMutex.Unlock() + time.Sleep(time.Second) + b.refreshMutex.Lock() + } + b.refreshInProgress = true + b.refreshMutex.Unlock() + + newUsers := map[string]*slack.User{} + pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200)) + count := 0 + for { + var err error + pagination, err = pagination.Next(context.Background()) + time.Sleep(time.Second) + if err != nil { + if pagination.Done(err) { + break + } + + if err = handleRateLimit(b.log, err); err != nil { + b.log.Errorf("Could not retrieve users: %#v", err) + return + } + continue + } + + for i := range pagination.Users { + newUsers[pagination.Users[i].ID] = &pagination.Users[i] + } + b.log.Debugf("getting %d users", len(pagination.Users)) + count++ + // more > 2000 users, slack will complain and ratelimit. break + if count > 10 { + b.log.Info("Large slack detected > 2000 users, skipping loading complete userlist.") + break + } + } + + b.usersMutex.Lock() + defer b.usersMutex.Unlock() + b.users = newUsers + + b.refreshMutex.Lock() + defer b.refreshMutex.Unlock() + b.earliestRefresh = time.Now().Add(minimumRefreshInterval) + b.refreshInProgress = false +} + +type channels struct { + log *logrus.Entry + sc *slack.Client + + channelsByID map[string]*slack.Channel + channelsByName map[string]*slack.Channel + channelsMutex sync.RWMutex + + channelMembers map[string][]string + channelMembersMutex sync.RWMutex + + refreshInProgress bool + earliestRefresh time.Time + refreshMutex sync.Mutex +} + +func newChannelManager(log *logrus.Entry, sc *slack.Client) *channels { + return &channels{ + log: log, + sc: sc, + channelsByID: make(map[string]*slack.Channel), + channelsByName: make(map[string]*slack.Channel), + earliestRefresh: time.Now(), + } +} + +func (b *channels) getChannel(channel string) (*slack.Channel, error) { + if strings.HasPrefix(channel, "ID:") { + return b.getChannelByID(strings.TrimPrefix(channel, "ID:")) + } + return b.getChannelByName(channel) +} + +func (b *channels) getChannelByName(name string) (*slack.Channel, error) { + return b.getChannelBy(name, b.channelsByName) +} + +func (b *channels) getChannelByID(id string) (*slack.Channel, error) { + return b.getChannelBy(id, b.channelsByID) +} + +func (b *channels) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) { + b.channelsMutex.RLock() + defer b.channelsMutex.RUnlock() + + if channel, ok := lookupMap[lookupKey]; ok { + return channel, nil + } + return nil, fmt.Errorf("channel %s not found", lookupKey) +} + +func (b *channels) getChannelMembers(users *users) config.ChannelMembers { + b.channelMembersMutex.RLock() + defer b.channelMembersMutex.RUnlock() + + membersInfo := config.ChannelMembers{} + for channelID, members := range b.channelMembers { + for _, member := range members { + channelName := "" + userName := "" + userNick := "" + user := users.getUser(member) + if user != nil { + userName = user.Name + userNick = user.Profile.DisplayName + } + channel, _ := b.getChannelByID(channelID) + if channel != nil { + channelName = channel.Name + } + memberInfo := config.ChannelMember{ + Username: userName, + Nick: userNick, + UserID: member, + ChannelID: channelID, + ChannelName: channelName, + } + membersInfo = append(membersInfo, memberInfo) + } + } + return membersInfo +} + +func (b *channels) registerChannel(channel slack.Channel) { + b.channelsMutex.Lock() + defer b.channelsMutex.Unlock() + + b.channelsByID[channel.ID] = &channel + b.channelsByName[channel.Name] = &channel +} + +func (b *channels) populateChannels(wait bool) { + b.refreshMutex.Lock() + if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) { + b.log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.", minimumRefreshInterval) + b.refreshMutex.Unlock() + return + } + for b.refreshInProgress { + b.refreshMutex.Unlock() + time.Sleep(time.Second) + b.refreshMutex.Lock() + } + b.refreshInProgress = true + b.refreshMutex.Unlock() + + newChannelsByID := map[string]*slack.Channel{} + newChannelsByName := map[string]*slack.Channel{} + newChannelMembers := make(map[string][]string) + + // We only retrieve public and private channels, not IMs + // and MPIMs as those do not have a channel name. + queryParams := &slack.GetConversationsParameters{ + ExcludeArchived: true, + Types: []string{"public_channel,private_channel"}, + Limit: 1000, + } + for { + channels, nextCursor, err := b.sc.GetConversations(queryParams) + if err != nil { + if err = handleRateLimit(b.log, err); err != nil { + b.log.Errorf("Could not retrieve channels: %#v", err) + return + } + continue + } + + for i := range channels { + newChannelsByID[channels[i].ID] = &channels[i] + newChannelsByName[channels[i].Name] = &channels[i] + // also find all the members in every channel + // comment for now, issues on big slacks + /* + members, err := b.getUsersInConversation(channels[i].ID) + if err != nil { + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Could not retrieve channel members: %#v", err) + return + } + continue + } + newChannelMembers[channels[i].ID] = members + */ + } + + if nextCursor == "" { + break + } + queryParams.Cursor = nextCursor + } + + b.channelsMutex.Lock() + defer b.channelsMutex.Unlock() + b.channelsByID = newChannelsByID + b.channelsByName = newChannelsByName + + b.channelMembersMutex.Lock() + defer b.channelMembersMutex.Unlock() + b.channelMembers = newChannelMembers + + b.refreshMutex.Lock() + defer b.refreshMutex.Unlock() + b.earliestRefresh = time.Now().Add(minimumRefreshInterval) + b.refreshInProgress = false +} |
