summaryrefslogtreecommitdiff
path: root/teleirc/matterbridge/vendor/github.com/gopackage
diff options
context:
space:
mode:
authorMistivia <i@mistivia.com>2025-11-02 15:27:18 +0800
committerMistivia <i@mistivia.com>2025-11-02 15:27:18 +0800
commite9c24f4af7ed56760f6db7941827d09f6db9020b (patch)
tree62128c43b883ce5e3148113350978755779bb5de /teleirc/matterbridge/vendor/github.com/gopackage
parent58d5e7cfda4781d8a57ec52aefd02983835c301a (diff)
add matterbridge
Diffstat (limited to 'teleirc/matterbridge/vendor/github.com/gopackage')
-rw-r--r--teleirc/matterbridge/vendor/github.com/gopackage/ddp/.gitignore28
-rw-r--r--teleirc/matterbridge/vendor/github.com/gopackage/ddp/LICENSE13
-rw-r--r--teleirc/matterbridge/vendor/github.com/gopackage/ddp/README.md5
-rw-r--r--teleirc/matterbridge/vendor/github.com/gopackage/ddp/client.go634
-rw-r--r--teleirc/matterbridge/vendor/github.com/gopackage/ddp/collection.go245
-rw-r--r--teleirc/matterbridge/vendor/github.com/gopackage/ddp/doc.go6
-rw-r--r--teleirc/matterbridge/vendor/github.com/gopackage/ddp/messages.go128
-rw-r--r--teleirc/matterbridge/vendor/github.com/gopackage/ddp/stats.go170
-rw-r--r--teleirc/matterbridge/vendor/github.com/gopackage/ddp/time.go49
-rw-r--r--teleirc/matterbridge/vendor/github.com/gopackage/ddp/utils.go77
10 files changed, 1355 insertions, 0 deletions
diff --git a/teleirc/matterbridge/vendor/github.com/gopackage/ddp/.gitignore b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/.gitignore
new file mode 100644
index 0000000..e71cbee
--- /dev/null
+++ b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/.gitignore
@@ -0,0 +1,28 @@
+# Compiled Object files, Static and Dynamic libs (Shared Objects)
+*.o
+*.a
+*.so
+
+# Folders
+_obj
+_test
+
+# Architecture specific extensions/prefixes
+*.[568vq]
+[568vq].out
+
+*.cgo1.go
+*.cgo2.c
+_cgo_defun.c
+_cgo_gotypes.go
+_cgo_export.*
+
+_testmain.go
+
+*.exe
+*.test
+*.prof
+
+# Editors
+.idea/
+.vscode/
diff --git a/teleirc/matterbridge/vendor/github.com/gopackage/ddp/LICENSE b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/LICENSE
new file mode 100644
index 0000000..98a0fe7
--- /dev/null
+++ b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/LICENSE
@@ -0,0 +1,13 @@
+Copyright (c) 2021, Metamech LLC.
+
+Permission to use, copy, modify, and/or distribute this software for any
+purpose with or without fee is hereby granted, provided that the above
+copyright notice and this permission notice appear in all copies.
+
+THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
diff --git a/teleirc/matterbridge/vendor/github.com/gopackage/ddp/README.md b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/README.md
new file mode 100644
index 0000000..7ae713b
--- /dev/null
+++ b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/README.md
@@ -0,0 +1,5 @@
+# DDP in Go
+
+[Meteor](https://meteor.com) DDP library for Go. This library allows Go applications to connect to Meteor applications, subscribe to Meteor publications, read from a cached Collection (similar to minimongo), and call Meteor methods on the server.
+
+See `ddp/_examples` for some tips and an example app that walks through all the features of the library. \ No newline at end of file
diff --git a/teleirc/matterbridge/vendor/github.com/gopackage/ddp/client.go b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/client.go
new file mode 100644
index 0000000..205dc43
--- /dev/null
+++ b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/client.go
@@ -0,0 +1,634 @@
+package ddp
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "sync"
+ "time"
+
+ "github.com/apex/log"
+ "golang.org/x/net/websocket"
+)
+
+const (
+ DISCONNECTED = iota
+ DIALING
+ CONNECTING
+ CONNECTED
+)
+
+type ConnectionListener interface {
+ Connected()
+}
+
+type ConnectionNotifier interface {
+ AddConnectionListener(listener ConnectionListener)
+}
+
+type StatusListener interface {
+ Status(status int)
+}
+
+type StatusNotifier interface {
+ AddStatusListener(listener StatusListener)
+}
+
+// Client represents a DDP client connection. The DDP client establish a DDP
+// session and acts as a message pump for other tools.
+type Client struct {
+ // HeartbeatInterval is the time between heartbeats to send
+ HeartbeatInterval time.Duration
+ // HeartbeatTimeout is the time for a heartbeat ping to timeout
+ HeartbeatTimeout time.Duration
+ // ReconnectInterval is the time between reconnections on bad connections
+ ReconnectInterval time.Duration
+
+ // writeSocketStats controls statistics gathering for current websocket writes.
+ writeSocketStats *WriterStats
+ // writeStats controls statistics gathering for overall client writes.
+ writeStats *WriterStats
+ // readSocketStats controls statistics gathering for current websocket reads.
+ readSocketStats *ReaderStats
+ // readStats controls statistics gathering for overall client reads.
+ readStats *ReaderStats
+ // reconnects in the number of reconnections the client has made
+ reconnects int64
+ // pingsIn is the number of pings received from the server
+ pingsIn int64
+ // pingsOut is te number of pings sent by the client
+ pingsOut int64
+
+ // session contains the DDP session token (can be used for reconnects and debugging).
+ session string
+ // version contains the negotiated DDP protocol version in use.
+ version string
+ // serverID the cluster node ID for the server we connected to
+ serverID string
+ // ws is the underlying websocket being used.
+ ws *websocket.Conn
+ // encoder is a JSON encoder to send outgoing packets to the websocket.
+ encoder *json.Encoder
+ // url the websocket is connected to
+ url string
+ // origin is the origin for the websocket connection
+ origin string
+ // inbox is an incoming message channel
+ inbox chan map[string]interface{}
+ // errors is an incoming errors channel
+ errors chan error
+ // pingTimer is a timer for sending regular pings to the server
+ pingTimer *time.Timer
+ // pings tracks inflight pings based on each ping ID.
+ pings map[string][]*PingTracker
+ // calls tracks method invocations that are still in flight
+ calls map[string]*Call
+ // subs tracks active subscriptions. Map contains name->args
+ subs map[string]*Call
+ // collections contains all the collections currently subscribed
+ collections map[string]Collection
+ // connectionStatus is the current connection status of the client
+ connectionStatus int
+ // reconnectTimer is the timer tracking reconnections
+ reconnectTimer *time.Timer
+ // reconnectLock protects access to reconnection
+ reconnectLock *sync.Mutex
+
+ // statusListeners will be informed when the connection status of the client changes
+ statusListeners []StatusListener
+ // connectionListeners will be informed when a connection to the server is established
+ connectionListeners []ConnectionListener
+
+ // KeyManager tracks IDs for ddp messages
+ KeyManager
+}
+
+// NewClient creates a default client (using an internal websocket) to the
+// provided URL using the origin for the connection. The client will
+// automatically connect, upgrade to a websocket, and establish a DDP
+// connection session before returning the client. The client will
+// automatically and internally handle heartbeats and reconnects.
+//
+// TBD create an option to use an external websocket (aka htt.Transport)
+// TBD create an option to substitute heartbeat and reconnect behavior (aka http.Transport)
+// TBD create an option to hijack the connection (aka http.Hijacker)
+// TBD create profiling features (aka net/http/pprof)
+func NewClient(url, origin string) *Client {
+ c := &Client{
+ HeartbeatInterval: time.Minute, // Meteor impl default + 10 (we ping last)
+ HeartbeatTimeout: 15 * time.Second, // Meteor impl default
+ ReconnectInterval: 5 * time.Second,
+ collections: map[string]Collection{},
+ url: url,
+ origin: origin,
+ inbox: make(chan map[string]interface{}, 100),
+ errors: make(chan error, 100),
+ pings: map[string][]*PingTracker{},
+ calls: map[string]*Call{},
+ subs: map[string]*Call{},
+ connectionStatus: DISCONNECTED,
+ reconnectLock: &sync.Mutex{},
+
+ // Stats
+ writeSocketStats: NewWriterStats(nil),
+ writeStats: NewWriterStats(nil),
+ readSocketStats: NewReaderStats(nil),
+ readStats: NewReaderStats(nil),
+
+ KeyManager: *NewKeyManager(),
+ }
+ c.encoder = json.NewEncoder(c.writeStats)
+
+ // We spin off an inbox processing goroutine
+ go c.inboxManager()
+
+ return c
+}
+
+// Session returns the negotiated session token for the connection.
+func (c *Client) Session() string {
+ return c.session
+}
+
+// Version returns the negotiated protocol version in use by the client.
+func (c *Client) Version() string {
+ return c.version
+}
+
+// AddStatusListener in order to receive status change updates.
+func (c *Client) AddStatusListener(listener StatusListener) {
+ c.statusListeners = append(c.statusListeners, listener)
+}
+
+// AddConnectionListener in order to receive connection updates.
+func (c *Client) AddConnectionListener(listener ConnectionListener) {
+ c.connectionListeners = append(c.connectionListeners, listener)
+}
+
+// status updates all status listeners with the new client status.
+func (c *Client) status(status int) {
+ if c.connectionStatus == status {
+ return
+ }
+ c.connectionStatus = status
+ for _, listener := range c.statusListeners {
+ listener.Status(status)
+ }
+}
+
+// Connect attempts to connect the client to the server.
+func (c *Client) Connect() error {
+ c.status(DIALING)
+ ws, err := websocket.Dial(c.url, "", c.origin)
+ if err != nil {
+ c.Close()
+ log.WithError(err).Debug("dial error")
+ c.reconnectLater()
+ return err
+ }
+ log.Debug("dialed")
+ // Start DDP connection
+ c.start(ws, NewConnect())
+ return nil
+}
+
+// Reconnect attempts to reconnect the client to the server on the existing
+// DDP session.
+//
+// TODO needs a reconnect backoff so we don't trash a down server
+// TODO reconnect should not allow more reconnects while a reconnection is already in progress.
+func (c *Client) Reconnect() {
+ func() {
+ c.reconnectLock.Lock()
+ defer c.reconnectLock.Unlock()
+ if c.reconnectTimer != nil {
+ c.reconnectTimer.Stop()
+ c.reconnectTimer = nil
+ }
+ }()
+
+ c.Close()
+
+ c.reconnects++
+
+ // Reconnect
+ c.status(DIALING)
+ ws, err := websocket.Dial(c.url, "", c.origin)
+ if err != nil {
+ c.Close()
+ log.WithError(err).Debug("Dial error")
+ c.reconnectLater()
+ return
+ }
+
+ c.start(ws, NewReconnect(c.session))
+
+ // --------------------------------------------------------------------
+ // We resume inflight or ongoing subscriptions - we don't have to wait
+ // for connection confirmation (messages can be pipelined).
+ // --------------------------------------------------------------------
+
+ // Send calls that haven't been confirmed - may not have been sent
+ // and effects should be idempotent
+ for _, call := range c.calls {
+ IgnoreErr(c.Send(NewMethod(call.ID, call.ServiceMethod, call.Args.([]interface{}))), "resend method")
+ }
+
+ // Resend subscriptions and patch up collections
+ for _, sub := range c.subs {
+ IgnoreErr(c.Send(NewSub(sub.ID, sub.ServiceMethod, sub.Args.([]interface{}))), "resend sub")
+ }
+}
+
+// Subscribe to data updates.
+func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{}) *Call {
+
+ if args == nil {
+ args = []interface{}{}
+ }
+ call := new(Call)
+ call.ID = c.Next()
+ call.ServiceMethod = subName
+ call.Args = args
+ call.Owner = c
+
+ if done == nil {
+ done = make(chan *Call, 10) // buffered.
+ } else {
+ // If caller passes done != nil, it must arrange that
+ // done has enough buffer for the number of simultaneous
+ // RPCs that will be using that channel. If the channel
+ // is totally unbuffered, it's best not to run at all.
+ if cap(done) == 0 {
+ log.Fatal("ddp.rpc: done channel is unbuffered")
+ }
+ }
+ call.Done = done
+ c.subs[call.ID] = call
+
+ // Save this subscription to the client so we can reconnect
+ subArgs := make([]interface{}, len(args))
+ copy(subArgs, args)
+
+ IgnoreErr(c.Send(NewSub(call.ID, subName, args)), "send sub")
+
+ return call
+}
+
+// Sub sends a synchronous subscription request to the server.
+func (c *Client) Sub(subName string, args ...interface{}) error {
+ call := <-c.Subscribe(subName, make(chan *Call, 1), args...).Done
+ return call.Error
+}
+
+// Go invokes the function asynchronously. It returns the Call structure representing
+// the invocation. The done channel will signal when the call is complete by returning
+// the same Call object. If done is nil, Go will allocate a new channel.
+// If non-nil, done must be buffered or Go will deliberately crash.
+//
+// Go and Call are modeled after the standard `net/rpc` package versions.
+func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{}) *Call {
+
+ if args == nil {
+ args = []interface{}{}
+ }
+ call := new(Call)
+ call.ID = c.Next()
+ call.ServiceMethod = serviceMethod
+ call.Args = args
+ call.Owner = c
+ if done == nil {
+ done = make(chan *Call, 10) // buffered.
+ } else {
+ // If caller passes done != nil, it must arrange that
+ // done has enough buffer for the number of simultaneous
+ // RPCs that will be using that channel. If the channel
+ // is totally unbuffered, it's best not to run at all.
+ if cap(done) == 0 {
+ log.Fatal("ddp.rpc: done channel is unbuffered")
+ }
+ }
+ call.Done = done
+ c.calls[call.ID] = call
+
+ IgnoreErr(c.Send(NewMethod(call.ID, serviceMethod, args)), "send method")
+
+ return call
+}
+
+// Call invokes the named function, waits for it to complete, and returns its error status.
+func (c *Client) Call(serviceMethod string, args ...interface{}) (interface{}, error) {
+ call := <-c.Go(serviceMethod, make(chan *Call, 1), args...).Done
+ return call.Reply, call.Error
+}
+
+// Ping sends a heartbeat signal to the server. The Ping doesn't look for
+// a response but may trigger the connection to reconnect if the ping times out.
+// This is primarily useful for reviving an unresponsive Client connection.
+func (c *Client) Ping() {
+ c.PingPong(c.Next(), c.HeartbeatTimeout, func(err error) {
+ if err != nil {
+ // Is there anything else we should or can do?
+ c.reconnectLater()
+ }
+ })
+}
+
+// PingPong sends a heartbeat signal to the server and calls the provided
+// function when a pong is received. An optional id can be sent to help
+// track the responses - or an empty string can be used. It is the
+// responsibility of the caller to respond to any errors that may occur.
+func (c *Client) PingPong(id string, timeout time.Duration, handler func(error)) {
+ err := c.Send(NewPing(id))
+ if err != nil {
+ handler(err)
+ return
+ }
+ c.pingsOut++
+ pings, ok := c.pings[id]
+ if !ok {
+ pings = make([]*PingTracker, 0, 5)
+ }
+ tracker := &PingTracker{handler: handler, timeout: timeout, timer: time.AfterFunc(timeout, func() {
+ handler(fmt.Errorf("ping timeout"))
+ })}
+ c.pings[id] = append(pings, tracker)
+}
+
+// Send transmits messages to the server. The msg parameter must be json
+// encoder compatible.
+func (c *Client) Send(msg interface{}) error {
+ return c.encoder.Encode(msg)
+}
+
+// Close implements the io.Closer interface.
+func (c *Client) Close() {
+ // Shutdown out all outstanding pings
+ if c.pingTimer != nil {
+ c.pingTimer.Stop()
+ c.pingTimer = nil
+ }
+
+ // Close websocket
+ if c.ws != nil {
+ IgnoreErr(c.ws.Close(), "close ws")
+ c.ws = nil
+ }
+ for _, collection := range c.collections {
+ collection.reset()
+ }
+ c.status(DISCONNECTED)
+}
+
+// ResetStats resets the statistics for the client.
+func (c *Client) ResetStats() {
+ c.readSocketStats.Reset()
+ c.readStats.Reset()
+ c.writeSocketStats.Reset()
+ c.writeStats.Reset()
+ c.reconnects = 0
+ c.pingsIn = 0
+ c.pingsOut = 0
+}
+
+// Stats returns the read and write statistics of the client.
+func (c *Client) Stats() *ClientStats {
+ return &ClientStats{
+ Reads: c.readSocketStats.Snapshot(),
+ TotalReads: c.readStats.Snapshot(),
+ Writes: c.writeSocketStats.Snapshot(),
+ TotalWrites: c.writeStats.Snapshot(),
+ Reconnects: c.reconnects,
+ PingsSent: c.pingsOut,
+ PingsRecv: c.pingsIn,
+ }
+}
+
+// CollectionByName retrieves a collection by its name.
+func (c *Client) CollectionByName(name string) Collection {
+ collection, ok := c.collections[name]
+ if !ok {
+ collection = NewCollection(name)
+ c.collections[name] = collection
+ }
+ return collection
+}
+
+// CollectionStats returns a snapshot of statistics for the currently known collections.
+func (c *Client) CollectionStats() []CollectionStats {
+ stats := make([]CollectionStats, 0, len(c.collections))
+ for name, collection := range c.collections {
+ stats = append(stats, CollectionStats{Name: name, Count: len(collection.FindAll())})
+ }
+ return stats
+}
+
+// start a new client connection on the provided websocket
+func (c *Client) start(ws *websocket.Conn, connect *Connect) {
+
+ c.status(CONNECTING)
+
+ c.ws = ws
+ c.writeSocketStats = NewWriterStats(c.ws)
+ c.writeStats.Writer = c.writeSocketStats
+ c.readSocketStats = NewReaderStats(c.ws)
+ c.readStats.Reader = c.readSocketStats
+
+ // We spin off an inbox stuffing goroutine
+ go c.inboxWorker(c.readStats)
+
+ IgnoreErr(c.Send(connect), "send connect")
+}
+
+// inboxManager pulls messages from the inbox and routes them to appropriate
+// handlers.
+func (c *Client) inboxManager() {
+ for {
+ select {
+ case msg := <-c.inbox:
+ // Message!
+ //log.Println("Got message", msg)
+ msgType, ok := msg["msg"]
+ if ok {
+ log.WithField("msg", msgType).Debug("recv")
+ switch msgType.(string) {
+ // Connection management
+ case "connected":
+ c.status(CONNECTED)
+ for _, collection := range c.collections {
+ collection.init()
+ }
+ c.version = "1" // "1" is the only version we support
+ c.session = msg["session"].(string)
+ // Start automatic heartbeats
+ c.pingTimer = time.AfterFunc(c.HeartbeatInterval, func() {
+ c.Ping()
+ c.pingTimer.Reset(c.HeartbeatInterval)
+ })
+ // Notify connection listeners
+ for _, listener := range c.connectionListeners {
+ go listener.Connected()
+ }
+ case "failed":
+ log.Fatalf("IM Failed to connect, we support version 1 but server supports %s", msg["version"])
+
+ // Heartbeats
+ case "ping":
+ // We received a ping - need to respond with a pong
+ id, ok := msg["id"]
+ if ok {
+ IgnoreErr(c.Send(NewPong(id.(string))), "send id ping")
+ } else {
+ IgnoreErr(c.Send(NewPong("")), "send empty ping")
+ }
+ c.pingsIn++
+ case "pong":
+ // We received a pong - we can clear the ping tracker and call its handler
+ id, ok := msg["id"]
+ var key string
+ if ok {
+ key = id.(string)
+ }
+ pings, ok := c.pings[key]
+ if ok && len(pings) > 0 {
+ ping := pings[0]
+ pings = pings[1:]
+ if len(key) == 0 || len(pings) > 0 {
+ c.pings[key] = pings
+ }
+ ping.timer.Stop()
+ ping.handler(nil)
+ }
+
+ // Live Data
+ case "nosub":
+ log.WithField("msg", msg).Debug("sub returned a nosub error")
+ // Clear related subscriptions
+ sub, ok := msg["id"]
+ if ok {
+ id := sub.(string)
+ runningSub := c.subs[id]
+
+ if runningSub != nil {
+ runningSub.Error = errors.New("sub returned a nosub error")
+ runningSub.done()
+ delete(c.subs, id)
+ }
+ }
+ case "ready":
+ // Run 'done' callbacks on all ready subscriptions
+ subs, ok := msg["subs"]
+ if ok {
+ for _, sub := range subs.([]interface{}) {
+ call, ok := c.subs[sub.(string)]
+ if ok {
+ call.done()
+ }
+ }
+ }
+ case "added":
+ c.collectionBy(msg).added(msg)
+ case "changed":
+ c.collectionBy(msg).changed(msg)
+ case "removed":
+ c.collectionBy(msg).removed(msg)
+ case "addedBefore":
+ c.collectionBy(msg).addedBefore(msg)
+ case "movedBefore":
+ c.collectionBy(msg).movedBefore(msg)
+
+ // RPC
+ case "result":
+ id, ok := msg["id"]
+ if ok {
+ call := c.calls[id.(string)]
+ delete(c.calls, id.(string))
+ e, ok := msg["error"]
+ if ok {
+ txt, _ := json.Marshal(e)
+ call.Error = fmt.Errorf(string(txt))
+ call.Reply = e
+ } else {
+ call.Reply = msg["result"]
+ }
+ call.done()
+ }
+ case "updated":
+ // We currently don't do anything with updated status
+
+ default:
+ // Ignore?
+ log.WithField("msg", msg).Debug("Server sent unexpected message")
+ }
+ } else {
+ // Current Meteor server sends an undocumented DDP message
+ // (looks like clustering "hint"). We will register and
+ // ignore rather than log an error.
+ serverID, ok := msg["server_id"]
+ if ok {
+ switch ID := serverID.(type) {
+ case string:
+ c.serverID = ID
+ default:
+ log.WithField("id", serverID).Debug("Server cluster node")
+ }
+ } else {
+ log.WithField("msg", msg).Debug("Server sent message with no `msg` field")
+ }
+ }
+ case err := <-c.errors:
+ log.WithError(err).Warn("Websocket error")
+ }
+ }
+}
+
+func (c *Client) collectionBy(msg map[string]interface{}) Collection {
+ n, ok := msg["collection"]
+ if !ok {
+ return NewMockCollection()
+ }
+ switch name := n.(type) {
+ case string:
+ return c.CollectionByName(name)
+ default:
+ return NewMockCollection()
+ }
+}
+
+// inboxWorker pulls messages from a websocket, decodes JSON packets, and
+// stuffs them into a message channel.
+func (c *Client) inboxWorker(ws io.Reader) {
+ dec := json.NewDecoder(ws)
+ for {
+ var event interface{}
+
+ if err := dec.Decode(&event); err == io.EOF {
+ break
+ } else if err != nil {
+ c.errors <- err
+ }
+ if c.pingTimer != nil {
+ c.pingTimer.Reset(c.HeartbeatInterval)
+ }
+ if event == nil {
+ log.Debug("Inbox worker found nil event. May be due to broken websocket. Reconnecting.")
+ break
+ } else {
+ c.inbox <- event.(map[string]interface{})
+ }
+ }
+
+ c.reconnectLater()
+}
+
+// reconnectLater schedules a reconnect action for later. We need to make sure that we don't
+// block, and that we don't reconnect more frequently than once every c.ReconnectInterval
+func (c *Client) reconnectLater() {
+ c.Close()
+ c.reconnectLock.Lock()
+ defer c.reconnectLock.Unlock()
+ if c.reconnectTimer == nil {
+ c.reconnectTimer = time.AfterFunc(c.ReconnectInterval, c.Reconnect)
+ }
+}
diff --git a/teleirc/matterbridge/vendor/github.com/gopackage/ddp/collection.go b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/collection.go
new file mode 100644
index 0000000..f417e68
--- /dev/null
+++ b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/collection.go
@@ -0,0 +1,245 @@
+package ddp
+
+// ----------------------------------------------------------------------
+// Collection
+// ----------------------------------------------------------------------
+
+type Update map[string]interface{}
+type UpdateListener interface {
+ CollectionUpdate(collection, operation, id string, doc Update)
+}
+
+// Collection managed cached collection data sent from the server in a
+// livedata subscription.
+//
+// It would be great to build an entire mongo compatible local store (minimongo)
+type Collection interface {
+
+ // FindOne queries objects and returns the first match.
+ FindOne(id string) Update
+ // FindAll returns a map of all items in the cache - this is a hack
+ // until we have time to build out a real minimongo interface.
+ FindAll() map[string]Update
+ // AddUpdateListener adds a channel that receives update messages.
+ AddUpdateListener(listener UpdateListener)
+
+ // livedata updates
+ added(msg Update)
+ changed(msg Update)
+ removed(msg Update)
+ addedBefore(msg Update)
+ movedBefore(msg Update)
+ init() // init informs the collection that the connection to the server has begun/resumed
+ reset() // reset informs the collection that the connection to the server has been lost
+}
+
+// NewMockCollection creates an empty collection that does nothing.
+func NewMockCollection() Collection {
+ return &MockCache{}
+}
+
+// NewCollection creates a new collection - always KeyCache.
+func NewCollection(name string) Collection {
+ return &KeyCache{name, make(map[string]Update), nil}
+}
+
+// KeyCache caches items keyed on unique ID.
+type KeyCache struct {
+ // The name of the collection
+ Name string
+ // items contains collection items by ID
+ items map[string]Update
+ // listeners contains all the listeners that should be notified of collection updates.
+ listeners []UpdateListener
+ // TODO(badslug): do we need to protect from multiple threads
+}
+
+func (c *KeyCache) added(msg Update) {
+ id, fields := parseUpdate(msg)
+ if fields != nil {
+ c.items[id] = fields
+ c.notify("create", id, fields)
+ }
+}
+
+func (c *KeyCache) changed(msg Update) {
+ id, fields := parseUpdate(msg)
+ if fields != nil {
+ item, ok := c.items[id]
+ if ok {
+ for key, value := range fields {
+ item[key] = value
+ }
+ c.items[id] = item
+ c.notify("update", id, item)
+ }
+ }
+}
+
+func (c *KeyCache) removed(msg Update) {
+ id, _ := parseUpdate(msg)
+ if len(id) > 0 {
+ delete(c.items, id)
+ c.notify("remove", id, nil)
+ }
+}
+
+func (c *KeyCache) addedBefore(msg Update) {
+ // for keyed cache, ordered commands are a noop
+}
+
+func (c *KeyCache) movedBefore(msg Update) {
+ // for keyed cache, ordered commands are a noop
+}
+
+// init prepares the collection for data updates (called when a new connection is
+// made or a connection/session is resumed).
+func (c *KeyCache) init() {
+ // TODO start to patch up the current data with fresh server state
+}
+
+func (c *KeyCache) reset() {
+ // TODO we should mark the collection but maintain it's contents and then
+ // patch up the current contents with the new contents when we receive them.
+ //c.items = nil
+ c.notify("reset", "", nil)
+}
+
+// notify sends a Update to all UpdateListener's which should never block.
+func (c *KeyCache) notify(operation, id string, doc Update) {
+ for _, listener := range c.listeners {
+ listener.CollectionUpdate(c.Name, operation, id, doc)
+ }
+}
+
+// FindOne returns the item with matching id.
+func (c *KeyCache) FindOne(id string) Update {
+ return c.items[id]
+}
+
+// FindAll returns a dump of all items in the collection
+func (c *KeyCache) FindAll() map[string]Update {
+ return c.items
+}
+
+// AddUpdateListener adds a listener for changes on a collection.
+func (c *KeyCache) AddUpdateListener(listener UpdateListener) {
+ c.listeners = append(c.listeners, listener)
+}
+
+// OrderedCache caches items based on list order.
+// This is a placeholder, currently not implemented as the Meteor server
+// does not transmit ordered collections over DDP yet.
+type OrderedCache struct {
+ // ranks contains ordered collection items for ordered collections
+ items []interface{}
+}
+
+func (c *OrderedCache) added(msg Update) {
+ // for ordered cache, key commands are a noop
+}
+
+func (c *OrderedCache) changed(msg Update) {
+
+}
+
+func (c *OrderedCache) removed(msg Update) {
+
+}
+
+func (c *OrderedCache) addedBefore(msg Update) {
+
+}
+
+func (c *OrderedCache) movedBefore(msg Update) {
+
+}
+
+func (c *OrderedCache) init() {
+
+}
+
+func (c *OrderedCache) reset() {
+
+}
+
+// FindOne returns the item with matching id.
+func (c *OrderedCache) FindOne(id string) Update {
+ return nil
+}
+
+// FindAll returns a dump of all items in the collection
+func (c *OrderedCache) FindAll() map[string]Update {
+ return map[string]Update{}
+}
+
+// AddUpdateListener does nothing.
+func (c *OrderedCache) AddUpdateListener(ch UpdateListener) {
+}
+
+// MockCache implements the Collection interface but does nothing with the data.
+type MockCache struct {
+}
+
+func (c *MockCache) added(msg Update) {
+
+}
+
+func (c *MockCache) changed(msg Update) {
+
+}
+
+func (c *MockCache) removed(msg Update) {
+
+}
+
+func (c *MockCache) addedBefore(msg Update) {
+
+}
+
+func (c *MockCache) movedBefore(msg Update) {
+
+}
+
+func (c *MockCache) init() {
+
+}
+
+func (c *MockCache) reset() {
+
+}
+
+// FindOne returns the item with matching id.
+func (c *MockCache) FindOne(id string) Update {
+ return nil
+}
+
+// FindAll returns a dump of all items in the collection
+func (c *MockCache) FindAll() map[string]Update {
+ return map[string]Update{}
+}
+
+// AddUpdateListener does nothing.
+func (c *MockCache) AddUpdateListener(ch UpdateListener) {
+}
+
+// parseUpdate returns the ID and fields from a DDP Update document.
+func parseUpdate(up Update) (ID string, Fields Update) {
+ key, ok := up["id"]
+ if ok {
+ switch id := key.(type) {
+ case string:
+ updates, ok := up["fields"]
+ if ok {
+ switch fields := updates.(type) {
+ case map[string]interface{}:
+ return id, Update(fields)
+ default:
+ // Don't know what to do...
+ }
+ }
+ return id, nil
+ }
+ }
+ return "", nil
+}
diff --git a/teleirc/matterbridge/vendor/github.com/gopackage/ddp/doc.go b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/doc.go
new file mode 100644
index 0000000..97f1b63
--- /dev/null
+++ b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/doc.go
@@ -0,0 +1,6 @@
+// Package ddp implements the MeteorJS DDP protocol over websockets. Fallback
+// to long polling is NOT supported (and is not planned on ever being supported
+// by this library). We will try to model the library after `net/http` - right
+// now the library is bare bones and doesn't provide the plug-ability of http.
+// However, that's the goal for the package eventually.
+package ddp
diff --git a/teleirc/matterbridge/vendor/github.com/gopackage/ddp/messages.go b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/messages.go
new file mode 100644
index 0000000..fc127ce
--- /dev/null
+++ b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/messages.go
@@ -0,0 +1,128 @@
+package ddp
+
+import (
+ "crypto/sha256"
+ "encoding/hex"
+ "io"
+)
+
+// ------------------------------------------------------------
+// DDP Messages
+//
+// Go structs representing common DDP raw messages ready for JSON
+// encoding.
+// ------------------------------------------------------------
+
+// Message contains the common fields that all DDP messages use.
+type Message struct {
+ Type string `json:"msg"`
+ ID string `json:"id,omitempty"`
+}
+
+// Connect represents a DDP connect message.
+type Connect struct {
+ Message
+ Version string `json:"version"`
+ Support []string `json:"support"`
+ Session string `json:"session,omitempty"`
+}
+
+// NewConnect creates a new connect message
+func NewConnect() *Connect {
+ return &Connect{Message: Message{Type: "connect"}, Version: "1", Support: []string{"1"}}
+}
+
+// NewReconnect creates a new connect message with a session ID to resume.
+func NewReconnect(session string) *Connect {
+ c := NewConnect()
+ c.Session = session
+ return c
+}
+
+// Ping represents a DDP ping message.
+type Ping Message
+
+// NewPing creates a new ping message with optional ID.
+func NewPing(id string) *Ping {
+ return &Ping{Type: "ping", ID: id}
+}
+
+// Pong represents a DDP pong message.
+type Pong Message
+
+// NewPong creates a new pong message with optional ID.
+func NewPong(id string) *Pong {
+ return &Pong{Type: "pong", ID: id}
+}
+
+// Method is used to send a remote procedure call to the server.
+type Method struct {
+ Message
+ ServiceMethod string `json:"method"`
+ Args []interface{} `json:"params"`
+}
+
+// NewMethod creates a new method invocation object.
+func NewMethod(id, serviceMethod string, args []interface{}) *Method {
+ return &Method{
+ Message: Message{Type: "method", ID: id},
+ ServiceMethod: serviceMethod,
+ Args: args,
+ }
+}
+
+// Sub is used to send a subscription request to the server.
+type Sub struct {
+ Message
+ SubName string `json:"name"`
+ Args []interface{} `json:"params"`
+}
+
+// NewSub creates a new sub object.
+func NewSub(id, subName string, args []interface{}) *Sub {
+ return &Sub{
+ Message: Message{Type: "sub", ID: id},
+ SubName: subName,
+ Args: args,
+ }
+}
+
+
+// Login provides a Meteor.Accounts password login support
+type Login struct {
+ User *User `json:"user"`
+ Password *Password `json:"password"`
+}
+
+func NewEmailLogin(email, pass string) *Login {
+ return &Login{User: &User{Email: email}, Password: NewPassword(pass)}
+}
+
+func NewUsernameLogin(user, pass string) *Login {
+ return &Login{User: &User{Username: user}, Password: NewPassword(pass)}
+}
+
+type LoginResume struct {
+ Token string `json:"resume"`
+}
+
+func NewLoginResume(token string) *LoginResume {
+ return &LoginResume{Token: token}
+}
+
+type User struct {
+ Email string `json:"email,omitempty"`
+ Username string `json:"username,omitempty"`
+}
+
+type Password struct {
+ Digest string `json:"digest"`
+ Algorithm string `json:"algorithm"`
+}
+
+func NewPassword(pass string) *Password {
+ sha := sha256.New()
+ io.WriteString(sha, pass)
+ digest := sha.Sum(nil)
+ return &Password{Digest: hex.EncodeToString(digest), Algorithm: "sha-256"}
+}
diff --git a/teleirc/matterbridge/vendor/github.com/gopackage/ddp/stats.go b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/stats.go
new file mode 100644
index 0000000..b254809
--- /dev/null
+++ b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/stats.go
@@ -0,0 +1,170 @@
+package ddp
+
+import (
+ "fmt"
+ "io"
+ "sync"
+ "time"
+)
+
+// Gather statistics about a DDP connection.
+
+// Stats tracks statistics for i/o operations.
+type Stats struct {
+ // Bytes is the total number of bytes transferred.
+ Bytes int64
+ // Ops is the total number of i/o operations performed.
+ Ops int64
+ // Errors is the total number of i/o errors encountered.
+ Errors int64
+ // Runtime is the duration that stats have been gathered.
+ Runtime time.Duration
+}
+
+// ClientStats displays combined statistics for the Client.
+type ClientStats struct {
+ // Reads provides statistics on the raw i/o network reads for the current connection.
+ Reads *Stats
+ // Reads provides statistics on the raw i/o network reads for the all client connections.
+ TotalReads *Stats
+ // Writes provides statistics on the raw i/o network writes for the current connection.
+ Writes *Stats
+ // Writes provides statistics on the raw i/o network writes for all the client connections.
+ TotalWrites *Stats
+ // Reconnects is the number of reconnections the client has made.
+ Reconnects int64
+ // PingsSent is the number of pings sent by the client
+ PingsSent int64
+ // PingsRecv is the number of pings received by the client
+ PingsRecv int64
+}
+
+// String produces a compact string representation of the client stats.
+func (stats *ClientStats) String() string {
+ i := stats.Reads
+ ti := stats.TotalReads
+ o := stats.Writes
+ to := stats.TotalWrites
+ totalRun := (ti.Runtime * 1000000) / 1000000
+ run := (i.Runtime * 1000000) / 1000000
+ return fmt.Sprintf("bytes: %d/%d##%d/%d ops: %d/%d##%d/%d err: %d/%d##%d/%d reconnects: %d pings: %d/%d uptime: %v##%v",
+ i.Bytes, o.Bytes,
+ ti.Bytes, to.Bytes,
+ i.Ops, o.Ops,
+ ti.Ops, to.Ops,
+ i.Errors, o.Errors,
+ ti.Errors, to.Errors,
+ stats.Reconnects,
+ stats.PingsRecv, stats.PingsSent,
+ run, totalRun)
+}
+
+// CollectionStats combines statistics about a collection.
+type CollectionStats struct {
+ Name string // Name of the collection
+ Count int // Count is the total number of documents in the collection
+}
+
+// String produces a compact string representation of the collection stat.
+func (s *CollectionStats) String() string {
+ return fmt.Sprintf("%s[%d]", s.Name, s.Count)
+}
+
+// StatsTracker provides the basic tooling for tracking i/o stats.
+type StatsTracker struct {
+ bytes int64
+ ops int64
+ errors int64
+ start time.Time
+ lock sync.Mutex
+}
+
+// NewStatsTracker create a new tracker with start time set to now.
+func NewStatsTracker() *StatsTracker {
+ return &StatsTracker{start: time.Now()}
+}
+
+// Op records an i/o operation. The parameters are passed through to
+// allow easy chaining.
+func (t *StatsTracker) Op(n int, err error) (int, error) {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+ t.ops++
+ if err == nil {
+ t.bytes += int64(n)
+ } else {
+ if err == io.EOF {
+ // I don't think we should log EOF stats as an error
+ } else {
+ t.errors++
+ }
+ }
+
+ return n, err
+}
+
+// Snapshot takes a snapshot of the current Reader statistics.
+func (t *StatsTracker) Snapshot() *Stats {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+ return t.snap()
+}
+
+// Reset all stats to initial values.
+func (t *StatsTracker) Reset() *Stats {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ stats := t.snap()
+ t.bytes = 0
+ t.ops = 0
+ t.errors = 0
+ t.start = time.Now()
+
+ return stats
+}
+
+func (t *StatsTracker) snap() *Stats {
+ return &Stats{Bytes: t.bytes, Ops: t.ops, Errors: t.errors, Runtime: time.Since(t.start)}
+}
+
+// ReaderStats tracks statistics on any io.Reader.
+// ReaderStats wraps a Reader and passes data to the actual data consumer.
+type ReaderStats struct {
+ StatsTracker
+ Reader io.Reader
+}
+
+// NewReaderStats creates a ReaderStats object for the provided Reader.
+func NewReaderStats(reader io.Reader) *ReaderStats {
+ r := &ReaderStats{Reader: reader}
+ r.Reset()
+ return r
+}
+
+// Read passes through a read collecting statistics and logging activity.
+func (r *ReaderStats) Read(p []byte) (int, error) {
+ return r.Op(r.Reader.Read(p))
+}
+
+// WriterStats tracks statistics on any io.Writer.
+// WriterStats wraps a Writer and passes data to the actual data producer.
+type WriterStats struct {
+ StatsTracker
+ Writer io.Writer
+}
+
+// NewWriterStats creates a WriterStats object for the provided Writer.
+func NewWriterStats(writer io.Writer) *WriterStats {
+ w := &WriterStats{Writer: writer}
+ w.Reset()
+ return w
+}
+
+// Write collects Writer statistics.
+func (w *WriterStats) Write(p []byte) (int, error) {
+ if w.Writer != nil {
+ return w.Op(w.Writer.Write(p))
+ }
+ return 0, nil
+}
diff --git a/teleirc/matterbridge/vendor/github.com/gopackage/ddp/time.go b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/time.go
new file mode 100644
index 0000000..584f9ce
--- /dev/null
+++ b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/time.go
@@ -0,0 +1,49 @@
+package ddp
+
+import (
+ "encoding/json"
+ "io"
+ "strconv"
+ "time"
+)
+
+// utcOffset in milliseconds for the current local time (east of UTC).
+var utcOffset int64
+
+func init() {
+ _, offsetSeconds := time.Now().Zone()
+ utcOffset = int64(offsetSeconds * 1000)
+}
+
+// Time is an alias for time.Time with custom json marshalling implementations to support ejson.
+type Time struct {
+ time.Time
+}
+
+// UnixMilli creates a new Time from the given unix millis but in UTC (as opposed to time.UnixMilli which returns
+// time in the local time zone). This supports the proper loading of times from EJSON $date objects.
+func UnixMilli(i int64) Time {
+ return Time{Time: time.UnixMilli(i - utcOffset)}
+}
+
+func (t *Time) UnmarshalJSON(b []byte) error {
+ var data map[string]float64
+ err := json.Unmarshal(b, &data)
+ if err != nil {
+ return err
+ }
+ val, ok := data["$date"]
+ if !ok {
+ return io.ErrUnexpectedEOF
+ }
+ // The time MUST be UTC but time.UnixMilli uses local time.
+ // We see what time it is in local time and calculate the offset to UTC
+ *t = UnixMilli(int64(val))
+
+ return nil
+}
+
+func (t Time) MarshalJSON() ([]byte, error) {
+ return []byte("{\"$date\":" + strconv.FormatInt(t.UnixMilli(), 10) + "}"), nil
+}
+
diff --git a/teleirc/matterbridge/vendor/github.com/gopackage/ddp/utils.go b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/utils.go
new file mode 100644
index 0000000..7ff16a2
--- /dev/null
+++ b/teleirc/matterbridge/vendor/github.com/gopackage/ddp/utils.go
@@ -0,0 +1,77 @@
+package ddp
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/apex/log"
+)
+
+// Contains common utility types.
+
+// -------------------------------------------------------------------
+
+// KeyManager provides simple incrementing IDs for ddp messages.
+type KeyManager struct {
+ // nextID is the next ID for API calls
+ nextID uint64
+ // idMutex is a mutex to protect ID updates
+ idMutex *sync.Mutex
+}
+
+// NewKeyManager creates a new instance and sets up resources.
+func NewKeyManager() *KeyManager {
+ return &KeyManager{idMutex: new(sync.Mutex)}
+}
+
+// Next issues a new ID for use in calls.
+func (id *KeyManager) Next() string {
+ id.idMutex.Lock()
+ next := id.nextID
+ id.nextID++
+ id.idMutex.Unlock()
+ return fmt.Sprintf("%x", next)
+}
+
+// -------------------------------------------------------------------
+
+// PingTracker tracks in-flight pings.
+type PingTracker struct {
+ handler func(error)
+ timeout time.Duration
+ timer *time.Timer
+}
+
+// -------------------------------------------------------------------
+
+// Call represents an active RPC call.
+type Call struct {
+ ID string // The uuid for this method call
+ ServiceMethod string // The name of the service and method to call.
+ Args interface{} // The argument to the function (*struct).
+ Reply interface{} // The reply from the function (*struct).
+ Error error // After completion, the error status.
+ Done chan *Call // Strobes when call is complete.
+ Owner *Client // Client that owns the method call
+}
+
+// done removes the call from any owners and strobes the done channel with itself.
+func (call *Call) done() {
+ delete(call.Owner.calls, call.ID)
+ select {
+ case call.Done <- call:
+ // ok
+ default:
+ // We don't want to block here. It is the caller's responsibility to make
+ // sure the channel has enough buffer space. See comment in Go().
+ log.Debug("rpc: discarding Call reply due to insufficient Done chan capacity")
+ }
+}
+
+// IgnoreErr logs an error if it occurs and ignores it.
+func IgnoreErr(err error, msg string) {
+ if err != nil {
+ log.WithError(err).Debug(msg)
+ }
+} \ No newline at end of file