from pritunl.helpers import *
from pritunl import mongo
from pritunl import cache
from pritunl import utils

import pymongo
import time
def publish(channels, message, extra=None, transaction=None):
    if cache.has_cache:
        return cache.publish(channels, message, extra=extra)

    collection = mongo.get_collection('messages')
    doc = {
        'message': message,
        'timestamp': utils.now(),
    }

    if extra:
        for key, val in extra.items():
            doc[key] = val

    # ObjectId must be set by server and ObjectId order must match $natural
    # order. Docs sent in order on client are not guaranteed to match $natural
    # order on server. Nonce is added to force an insert from upsert where
    # insert is not supported.
    # When using inserts manipulate=False must be set to prevent pymongo
    # from setting ObjectId locally.
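    # Illustration (hypothetical values): the upsert filter below uses a
    # freshly generated ObjectId as 'nonce', which can never match an
    # existing document, so the upsert always inserts and the server assigns
    # the _id. The stored document also carries the nonce from the filter,
    # roughly:
    #     {'_id': <server ObjectId>, 'nonce': <client ObjectId>,
    #      'channel': 'events', 'message': 'server_restart',
    #      'timestamp': <utils.now()>}
    # subscribe() strips 'nonce' before yielding the document.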
    if transaction:
        tran_collection = transaction.collection(collection.name_str)

        if isinstance(channels, str):
            doc['channel'] = channels
            tran_collection.update({
                'nonce': utils.ObjectId(),
            }, {
                '$set': doc,
            }, upsert=True)
        else:
            for channel in channels:
                doc_copy = doc.copy()
                doc_copy['channel'] = channel

                tran_collection.bulk().find({
                    'nonce': utils.ObjectId(),
                }).upsert().update({
                    '$set': doc_copy,
                })
            tran_collection.bulk_execute()
    else:
        if isinstance(channels, str):
            doc['channel'] = channels
            collection.insert(doc, manipulate=False)
        else:
            docs = []

            for channel in channels:
                doc_copy = doc.copy()
                doc_copy['channel'] = channel
                docs.append(doc_copy)

            collection.insert(docs, manipulate=False)

def get_cursor_id(channels):
    if cache.has_cache:
        if not isinstance(channels, str):
            raise TypeError(
                'Cannot get cache cursor_id for multiple channels')
        return cache.get_cursor_id(channels)

    collection = mongo.get_collection('messages')
    spec = {}

    if isinstance(channels, str):
        spec['channel'] = channels
    else:
        spec['channel'] = {'$in': channels}

    # The newest message id is used as the subscription cursor. If the
    # channel is empty, publish a null message once and retry.
    for i in range(2):
        try:
            return collection.find(spec).sort(
                '$natural', pymongo.DESCENDING)[0]['_id']
        except IndexError:
            if i:
                raise
            else:
                publish(channels, None)
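
# Typical pattern (illustrative; 'events' is a hypothetical channel name):
# capture a cursor id before doing work that may trigger publishes, then
# subscribe from that cursor so nothing published in between is missed.
#
#     cursor_id = get_cursor_id('events')
#     ...  # work that can race with new publishes
#     for doc in subscribe('events', cursor_id=cursor_id):
#         ...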

@interrupter_generator
def subscribe(channels, cursor_id=None, timeout=None, yield_delay=None,
        yield_app_server=False):
    if cache.has_cache:
        for msg in cache.subscribe(channels, cursor_id=cursor_id,
                timeout=timeout, yield_delay=yield_delay,
                yield_app_server=yield_app_server):
            yield msg
        return

    collection = mongo.get_collection('messages')
    start_time = time.time()
    cursor_id = cursor_id or get_cursor_id(channels)

    while True:
        try:
            spec = {}

            if isinstance(channels, str):
                spec['channel'] = channels
            else:
                spec['channel'] = {'$in': channels}

            if cursor_id:
                spec['_id'] = {'$gt': cursor_id}

            yield

            # Tailable await cursor on the capped messages collection blocks
            # on the server waiting for new documents.
            cursor = collection.find(
                spec,
                cursor_type=pymongo.cursor.CursorType.TAILABLE_AWAIT,
            ).sort('$natural', pymongo.ASCENDING)

            yield

            while cursor.alive:
                for doc in cursor:
                    cursor_id = doc['_id']

                    yield

                    if doc.get('message') is not None:
                        doc.pop('nonce', None)
                        yield doc

                        if yield_delay:
                            time.sleep(yield_delay)

                            # Drain any messages that arrived during the
                            # delay with a regular query, then stop.
                            spec = spec.copy()
                            spec['_id'] = {'$gt': cursor_id}
                            cursor = collection.find(spec).sort(
                                '$natural', pymongo.ASCENDING)

                            for doc in cursor:
                                if doc.get('message') is not None:
                                    doc.pop('nonce', None)
                                    yield doc

                            return

                if yield_app_server and check_app_server_interrupt():
                    return

                if timeout and time.time() - start_time >= timeout:
                    return

                yield
        except pymongo.errors.AutoReconnect:
            time.sleep(0.2)
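
# Minimal usage sketch (illustrative; 'events' and 'server_restart' are
# hypothetical names and a configured MongoDB connection is assumed):
#
#     from pritunl import messenger
#
#     # Fan a message out to a channel, attaching extra fields to the doc.
#     messenger.publish('events', 'server_restart', extra={'server_id': 'x1'})
#
#     # Tail the channel; each yielded doc carries 'message', 'timestamp',
#     # 'channel' and any extra fields. Guard for falsy checkpoint yields in
#     # case they surface through the interrupter_generator wrapper.
#     for doc in messenger.subscribe('events', timeout=30):
#         if not doc:
#             continue
#         print(doc['message'], doc.get('server_id'))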