summaryrefslogtreecommitdiff
path: root/teleirc/matterbridge/bridge/slack/users_channels.go
diff options
context:
space:
mode:
Diffstat (limited to 'teleirc/matterbridge/bridge/slack/users_channels.go')
-rw-r--r--teleirc/matterbridge/bridge/slack/users_channels.go343
1 files changed, 343 insertions, 0 deletions
diff --git a/teleirc/matterbridge/bridge/slack/users_channels.go b/teleirc/matterbridge/bridge/slack/users_channels.go
new file mode 100644
index 0000000..85b944b
--- /dev/null
+++ b/teleirc/matterbridge/bridge/slack/users_channels.go
@@ -0,0 +1,343 @@
+package bslack
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/42wim/matterbridge/bridge/config"
+ "github.com/sirupsen/logrus"
+ "github.com/slack-go/slack"
+)
+
+const minimumRefreshInterval = 10 * time.Second
+
+type users struct {
+ log *logrus.Entry
+ sc *slack.Client
+
+ users map[string]*slack.User
+ usersMutex sync.RWMutex
+ usersSyncPoints map[string]chan struct{}
+
+ refreshInProgress bool
+ earliestRefresh time.Time
+ refreshMutex sync.Mutex
+}
+
+func newUserManager(log *logrus.Entry, sc *slack.Client) *users {
+ return &users{
+ log: log,
+ sc: sc,
+ users: make(map[string]*slack.User),
+ usersSyncPoints: make(map[string]chan struct{}),
+ earliestRefresh: time.Now(),
+ }
+}
+
+func (b *users) getUser(id string) *slack.User {
+ b.usersMutex.RLock()
+ user, ok := b.users[id]
+ b.usersMutex.RUnlock()
+ if ok {
+ return user
+ }
+ b.populateUser(id)
+ b.usersMutex.RLock()
+ defer b.usersMutex.RUnlock()
+
+ return b.users[id]
+}
+
+func (b *users) getUsername(id string) string {
+ if user := b.getUser(id); user != nil {
+ if user.Profile.DisplayName != "" {
+ return user.Profile.DisplayName
+ }
+ return user.Name
+ }
+ b.log.Warnf("Could not find user with ID '%s'", id)
+ return ""
+}
+
+func (b *users) getAvatar(id string) string {
+ if user := b.getUser(id); user != nil {
+ return user.Profile.Image48
+ }
+ return ""
+}
+
+func (b *users) populateUser(userID string) {
+ for {
+ b.usersMutex.Lock()
+ _, exists := b.users[userID]
+ if exists {
+ // already in cache
+ b.usersMutex.Unlock()
+ return
+ }
+
+ if syncPoint, ok := b.usersSyncPoints[userID]; ok {
+ // Another goroutine is already populating this user for us so wait on it to finish.
+ b.usersMutex.Unlock()
+ <-syncPoint
+ // We do not return and iterate again to check that the entry does indeed exist
+ // in case the previous query failed for some reason.
+ } else {
+ b.usersSyncPoints[userID] = make(chan struct{})
+ defer func() {
+ // Wake up any waiting goroutines and remove the synchronization point.
+ close(b.usersSyncPoints[userID])
+ delete(b.usersSyncPoints, userID)
+ }()
+ break
+ }
+ }
+
+ // Do not hold the lock while fetching information from Slack
+ // as this might take an unbounded amount of time.
+ b.usersMutex.Unlock()
+
+ user, err := b.sc.GetUserInfo(userID)
+ if err != nil {
+ b.log.Debugf("GetUserInfo failed for %v: %v", userID, err)
+ return
+ }
+
+ b.usersMutex.Lock()
+ defer b.usersMutex.Unlock()
+
+ // Register user information.
+ b.users[userID] = user
+}
+
+func (b *users) invalidateUser(userID string) {
+ b.usersMutex.Lock()
+ defer b.usersMutex.Unlock()
+ delete(b.users, userID)
+}
+
+func (b *users) populateUsers(wait bool) {
+ b.refreshMutex.Lock()
+ if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) {
+ b.log.Debugf("Not refreshing user list as it was done less than %v ago.", minimumRefreshInterval)
+ b.refreshMutex.Unlock()
+
+ return
+ }
+ for b.refreshInProgress {
+ b.refreshMutex.Unlock()
+ time.Sleep(time.Second)
+ b.refreshMutex.Lock()
+ }
+ b.refreshInProgress = true
+ b.refreshMutex.Unlock()
+
+ newUsers := map[string]*slack.User{}
+ pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200))
+ count := 0
+ for {
+ var err error
+ pagination, err = pagination.Next(context.Background())
+ time.Sleep(time.Second)
+ if err != nil {
+ if pagination.Done(err) {
+ break
+ }
+
+ if err = handleRateLimit(b.log, err); err != nil {
+ b.log.Errorf("Could not retrieve users: %#v", err)
+ return
+ }
+ continue
+ }
+
+ for i := range pagination.Users {
+ newUsers[pagination.Users[i].ID] = &pagination.Users[i]
+ }
+ b.log.Debugf("getting %d users", len(pagination.Users))
+ count++
+ // more > 2000 users, slack will complain and ratelimit. break
+ if count > 10 {
+ b.log.Info("Large slack detected > 2000 users, skipping loading complete userlist.")
+ break
+ }
+ }
+
+ b.usersMutex.Lock()
+ defer b.usersMutex.Unlock()
+ b.users = newUsers
+
+ b.refreshMutex.Lock()
+ defer b.refreshMutex.Unlock()
+ b.earliestRefresh = time.Now().Add(minimumRefreshInterval)
+ b.refreshInProgress = false
+}
+
+type channels struct {
+ log *logrus.Entry
+ sc *slack.Client
+
+ channelsByID map[string]*slack.Channel
+ channelsByName map[string]*slack.Channel
+ channelsMutex sync.RWMutex
+
+ channelMembers map[string][]string
+ channelMembersMutex sync.RWMutex
+
+ refreshInProgress bool
+ earliestRefresh time.Time
+ refreshMutex sync.Mutex
+}
+
+func newChannelManager(log *logrus.Entry, sc *slack.Client) *channels {
+ return &channels{
+ log: log,
+ sc: sc,
+ channelsByID: make(map[string]*slack.Channel),
+ channelsByName: make(map[string]*slack.Channel),
+ earliestRefresh: time.Now(),
+ }
+}
+
+func (b *channels) getChannel(channel string) (*slack.Channel, error) {
+ if strings.HasPrefix(channel, "ID:") {
+ return b.getChannelByID(strings.TrimPrefix(channel, "ID:"))
+ }
+ return b.getChannelByName(channel)
+}
+
+func (b *channels) getChannelByName(name string) (*slack.Channel, error) {
+ return b.getChannelBy(name, b.channelsByName)
+}
+
+func (b *channels) getChannelByID(id string) (*slack.Channel, error) {
+ return b.getChannelBy(id, b.channelsByID)
+}
+
+func (b *channels) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) {
+ b.channelsMutex.RLock()
+ defer b.channelsMutex.RUnlock()
+
+ if channel, ok := lookupMap[lookupKey]; ok {
+ return channel, nil
+ }
+ return nil, fmt.Errorf("channel %s not found", lookupKey)
+}
+
+func (b *channels) getChannelMembers(users *users) config.ChannelMembers {
+ b.channelMembersMutex.RLock()
+ defer b.channelMembersMutex.RUnlock()
+
+ membersInfo := config.ChannelMembers{}
+ for channelID, members := range b.channelMembers {
+ for _, member := range members {
+ channelName := ""
+ userName := ""
+ userNick := ""
+ user := users.getUser(member)
+ if user != nil {
+ userName = user.Name
+ userNick = user.Profile.DisplayName
+ }
+ channel, _ := b.getChannelByID(channelID)
+ if channel != nil {
+ channelName = channel.Name
+ }
+ memberInfo := config.ChannelMember{
+ Username: userName,
+ Nick: userNick,
+ UserID: member,
+ ChannelID: channelID,
+ ChannelName: channelName,
+ }
+ membersInfo = append(membersInfo, memberInfo)
+ }
+ }
+ return membersInfo
+}
+
+func (b *channels) registerChannel(channel slack.Channel) {
+ b.channelsMutex.Lock()
+ defer b.channelsMutex.Unlock()
+
+ b.channelsByID[channel.ID] = &channel
+ b.channelsByName[channel.Name] = &channel
+}
+
+func (b *channels) populateChannels(wait bool) {
+ b.refreshMutex.Lock()
+ if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) {
+ b.log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.", minimumRefreshInterval)
+ b.refreshMutex.Unlock()
+ return
+ }
+ for b.refreshInProgress {
+ b.refreshMutex.Unlock()
+ time.Sleep(time.Second)
+ b.refreshMutex.Lock()
+ }
+ b.refreshInProgress = true
+ b.refreshMutex.Unlock()
+
+ newChannelsByID := map[string]*slack.Channel{}
+ newChannelsByName := map[string]*slack.Channel{}
+ newChannelMembers := make(map[string][]string)
+
+ // We only retrieve public and private channels, not IMs
+ // and MPIMs as those do not have a channel name.
+ queryParams := &slack.GetConversationsParameters{
+ ExcludeArchived: true,
+ Types: []string{"public_channel,private_channel"},
+ Limit: 1000,
+ }
+ for {
+ channels, nextCursor, err := b.sc.GetConversations(queryParams)
+ if err != nil {
+ if err = handleRateLimit(b.log, err); err != nil {
+ b.log.Errorf("Could not retrieve channels: %#v", err)
+ return
+ }
+ continue
+ }
+
+ for i := range channels {
+ newChannelsByID[channels[i].ID] = &channels[i]
+ newChannelsByName[channels[i].Name] = &channels[i]
+ // also find all the members in every channel
+ // comment for now, issues on big slacks
+ /*
+ members, err := b.getUsersInConversation(channels[i].ID)
+ if err != nil {
+ if err = b.handleRateLimit(err); err != nil {
+ b.Log.Errorf("Could not retrieve channel members: %#v", err)
+ return
+ }
+ continue
+ }
+ newChannelMembers[channels[i].ID] = members
+ */
+ }
+
+ if nextCursor == "" {
+ break
+ }
+ queryParams.Cursor = nextCursor
+ }
+
+ b.channelsMutex.Lock()
+ defer b.channelsMutex.Unlock()
+ b.channelsByID = newChannelsByID
+ b.channelsByName = newChannelsByName
+
+ b.channelMembersMutex.Lock()
+ defer b.channelMembersMutex.Unlock()
+ b.channelMembers = newChannelMembers
+
+ b.refreshMutex.Lock()
+ defer b.refreshMutex.Unlock()
+ b.earliestRefresh = time.Now().Add(minimumRefreshInterval)
+ b.refreshInProgress = false
+}