forked from LonamiWebs/Telethon
-
Notifications
You must be signed in to change notification settings - Fork 1
/
interactive_telegram_client.py
405 lines (344 loc) · 16.4 KB
/
interactive_telegram_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
import asyncio
import os
import sys
import time
from getpass import getpass
from telethon import TelegramClient, events
from telethon.errors import SessionPasswordNeededError
from telethon.network import ConnectionTcpAbridged
from telethon.utils import get_display_name
# Create a global variable to hold the loop we will be using.
#
# ``asyncio.get_event_loop()`` is deprecated when no loop is current, and
# newer Python versions raise RuntimeError in that situation instead of
# creating one implicitly. Fall back to creating (and installing) a loop
# ourselves so the rest of this module keeps working either way.
try:
    loop = asyncio.get_event_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
def sprint(string, *args, **kwargs):
    """Safe print: retries with an ASCII-only string on encoding failure.

    Some terminals cannot encode every character and make ``print()`` raise
    ``UnicodeEncodeError``. When that happens the text is stripped down to
    its ASCII characters and printed again with the same extra arguments.

    :param string: The text to print.
    :param args: Extra positional arguments forwarded to ``print()``.
    :param kwargs: Keyword arguments forwarded to ``print()``.
    """
    try:
        print(string, *args, **kwargs)
    except UnicodeEncodeError:
        # Round-trip through UTF-8 bytes and decode as ASCII while ignoring
        # errors: every non-ASCII character is silently dropped.
        utf8_bytes = string.encode('utf-8', errors='ignore')
        ascii_only = utf8_bytes.decode('ascii', errors='ignore')
        print(ascii_only, *args, **kwargs)
def print_title(title):
    """Print ``title`` to the console framed by a box of ``=`` characters.

    :param title: The text to show inside the frame.
    """
    # Leave some vertical space before the banner.
    sprint('\n')
    # The top and bottom borders are identical: as wide as the title plus
    # the two-character margin on each side.
    border = '=={}=='.format('=' * len(title))
    sprint(border)
    sprint('= {} ='.format(title))
    sprint(border)
def bytes_to_string(byte_count):
    """Converts a byte count to a human readable string (in KB, MB...).

    The count is divided by 1024 until it drops below 1024 or the largest
    known suffix (TB) is reached, so very large values are reported as a
    big number of TB rather than failing.

    :param byte_count: Number of bytes (int or float).
    :return: The value with two decimals followed by its unit suffix,
             e.g. ``'1.50KB'`` or ``'12.00 bytes'``.
    """
    suffixes = (' bytes', 'KB', 'MB', 'GB', 'TB')
    last_index = len(suffixes) - 1

    suffix_index = 0
    # Advance one unit per division; stop at the last suffix so the index
    # can never run past the end of ``suffixes``.
    while byte_count >= 1024 and suffix_index < last_index:
        byte_count /= 1024
        suffix_index += 1

    return '{:.2f}{}'.format(byte_count, suffixes[suffix_index])
async def async_input(prompt):
    """
    Asynchronous replacement for ``input()``.

    The built-in ``input()`` blocks, so the global event loop could not
    make progress while waiting for the user. Here the blocking read of
    standard input is handed to the loop's default executor instead, which
    lets the loop keep running until a line is available.

    :param prompt: Text shown to the user before reading (no newline added).
    :return: The line that was read, with trailing whitespace removed.
    """
    print(prompt, end='', flush=True)
    line = await loop.run_in_executor(None, sys.stdin.readline)
    return line.rstrip()
def get_env(name, message, cast=str):
    """Helper to get environment variables interactively.

    If the variable ``name`` is set in the environment its value is used,
    otherwise the user is prompted with ``message`` until a valid value is
    entered. In both cases the value goes through ``cast``, so the caller
    always receives the same type regardless of where the value came from.

    :param name: Name of the environment variable to look up.
    :param message: Prompt shown when the value has to be typed in.
    :param cast: Callable used to convert the string value; it should raise
                 ``ValueError`` to reject invalid input.
    :return: The converted value.
    """
    if name in os.environ:
        try:
            return cast(os.environ[name])
        except ValueError as e:
            # A malformed environment value should not abort the program:
            # report it and fall back to asking the user.
            print('Ignoring invalid value in {}: {}'.format(name, e),
                  file=sys.stderr)

    while True:
        value = input(message)
        try:
            return cast(value)
        except ValueError as e:
            print(e, file=sys.stderr)
            # Short pause before prompting again.
            time.sleep(1)
class InteractiveTelegramClient(TelegramClient):
    """Full featured Telegram client, meant to be used on an interactive
       session to see what Telethon is capable of.

       This client allows the user to perform some basic interaction with
       Telegram through Telethon, such as listing dialogs (open chats),
       talking to people, downloading media, and receiving updates.
    """

    def __init__(self, session_user_id, api_id, api_hash,
                 proxy=None):
        """
        Initializes the InteractiveTelegramClient.

        Connects to Telegram (retrying once on ``IOError``) and, if the
        session is not authorized yet, interactively asks for the phone,
        the login code and, when required, the two-step password.

        :param session_user_id: Name of the *.session file.
        :param api_id: Telegram's api_id acquired through my.telegram.org.
        :param api_hash: Telegram's api_hash.
        :param proxy: Optional proxy tuple/dictionary.
        """
        print_title('Initialization')

        print('Initializing interactive example...')

        # The first step is to initialize the TelegramClient, as we are
        # subclassing it, we need to call super().__init__(). On a more
        # normal case you would want 'client = TelegramClient(...)'
        super().__init__(
            # These parameters should be passed always, session name and API
            session_user_id, api_id, api_hash,

            # You can optionally change the connection mode by passing a
            # type or an instance of it. This changes how the sent packets
            # look (low-level concept you normally shouldn't worry about).
            # Default is ConnectionTcpFull, smallest is ConnectionTcpAbridged.
            connection=ConnectionTcpAbridged,

            # If you're using a proxy, set it here.
            proxy=proxy
        )

        # Store {message.id: message} map here so that we can download
        # media knowing the message ID, for every message having media.
        self.found_media = {}

        # Calling .connect() may raise a connection error, so you need to
        # except those before continuing. Here we simply retry once.
        print('Connecting to Telegram servers...')
        try:
            loop.run_until_complete(self.connect())
        except IOError:
            # We handle IOError and not ConnectionError because
            # PySocks' errors do not subclass ConnectionError
            # (so this will work with and without proxies).
            print('Initial connection failed. Retrying...')
            loop.run_until_complete(self.connect())

        # If the user hasn't called .sign_in() or .sign_up() yet, they won't
        # be authorized. The first thing you must do is authorize. Calling
        # .sign_in() should only be done once as the information is saved on
        # the *.session file so you don't need to enter the code every time.
        if not loop.run_until_complete(self.is_user_authorized()):
            print('First run. Sending code request...')
            user_phone = input('Enter your phone: ')
            loop.run_until_complete(self.sign_in(user_phone))

            self_user = None
            while self_user is None:
                code = input('Enter the code you just received: ')
                try:
                    self_user =\
                        loop.run_until_complete(self.sign_in(code=code))

                # Two-step verification may be enabled, and .sign_in will
                # raise this error. If that's the case ask for the password.
                # Note that getpass() may not work on PyCharm due to a bug,
                # if that's the case simply change it for input().
                except SessionPasswordNeededError:
                    pw = getpass('Two step verification is enabled. '
                                 'Please enter your password: ')

                    self_user =\
                        loop.run_until_complete(self.sign_in(password=pw))

    async def run(self):
        """Main loop of the TelegramClient, will wait for user action.

        Repeatedly shows the top dialogs, lets the user pick one, and then
        processes chat commands for it until the user quits or logs out.
        """

        # Once everything is ready, we can add an event handler.
        #
        # Events are an abstraction over Telegram's "Updates" and
        # are much easier to use.
        self.add_event_handler(self.message_handler, events.NewMessage)

        # Enter a while loop to chat as long as the user wants
        while True:
            # Retrieve the top dialogs. You can set the limit to None to
            # retrieve all of them if you wish, but beware that may take
            # a long time if you have hundreds of them.
            dialog_count = 15

            # Entities represent the user, chat or channel
            # corresponding to the dialog on the same index.
            dialogs = await self.get_dialogs(limit=dialog_count)

            choice = None
            while choice is None:
                print_title('Dialogs window')

                # Display them so the user can choose
                for number, dialog in enumerate(dialogs, start=1):
                    sprint('{}. {}'.format(
                        number, get_display_name(dialog.entity)))

                # Let the user decide who they want to talk to
                print()
                print('> Who do you want to send messages to?')
                print('> Available commands:')
                print(' !q: Quits the dialogs window and exits.')
                print(' !l: Logs out, terminating this session.')
                print()
                answer = await async_input('Enter dialog ID or a command: ')
                if answer == '!q':
                    return
                if answer == '!l':
                    # Logging out will cause the user to need to reenter the
                    # code next time they want to use the library, and will
                    # also delete the *.session file off the filesystem.
                    #
                    # This is not the same as simply calling .disconnect(),
                    # which simply shuts down everything gracefully.
                    await self.log_out()
                    return

                try:
                    # Empty input becomes 0, i.e. index -1, which is out of
                    # bounds and therefore asks again.
                    choice = int(answer if answer else 0) - 1
                except ValueError:
                    choice = None
                else:
                    # Ensure it is inside the bounds, otherwise retry. Fewer
                    # dialogs than requested may have been returned, so
                    # check against what we actually got.
                    if not 0 <= choice < len(dialogs):
                        choice = None

            # Retrieve the selected user (or chat, or channel)
            entity = dialogs[choice].entity

            # Show some information
            print_title('Chat with "{}"'.format(get_display_name(entity)))
            print('Available commands:')
            print(' !q: Quits the current chat.')
            print(' !Q: Quits the current chat and exits.')
            print(' !h: prints the latest messages (message History).')
            print(' !up <path>: Uploads and sends the Photo from path.')
            print(' !uf <path>: Uploads and sends the File from path.')
            print(' !d <msg-id>: Deletes a message by its id')
            print(' !dm <msg-id>: Downloads the given message Media (if any).')
            print(' !dp: Downloads the current dialog Profile picture.')
            print(' !i: Prints information about this chat..')
            print()

            # And start a while loop to chat
            while True:
                msg = await async_input('Enter a message: ')
                # Quit
                if msg == '!q':
                    break
                elif msg == '!Q':
                    return

                # History
                elif msg == '!h':
                    # First retrieve the messages and some information
                    messages = await self.get_messages(entity, limit=10)

                    # Iterate over all (in reverse order so the latest appear
                    # the last in the console) and print them with format:
                    # "[hh:mm] Sender: Message"
                    for message in reversed(messages):
                        # Note how we access .sender here. Since we made an
                        # API call using the self client, it will always have
                        # information about the sender. This is different to
                        # events, where Telegram may not always send the user.
                        name = get_display_name(message.sender)

                        # Format the message content
                        if getattr(message, 'media', None):
                            # Remember it so "!dm <msg-id>" can find it later
                            self.found_media[message.id] = message
                            content = '<{}> {}'.format(
                                type(message.media).__name__,
                                message.message)

                        elif hasattr(message, 'message'):
                            content = message.message
                        elif hasattr(message, 'action'):
                            content = str(message.action)
                        else:
                            # Unknown message, simply print its class name
                            content = type(message).__name__

                        # And print it to the user (zero-padded hh:mm)
                        sprint('[{:02}:{:02}] (ID={}) {}: {}'.format(
                            message.date.hour, message.date.minute,
                            message.id, name, content))

                # Send photo
                elif msg.startswith('!up '):
                    # Slice the message to get the path
                    path = msg[len('!up '):]
                    await self.send_photo(path=path, entity=entity)

                # Send file (document)
                elif msg.startswith('!uf '):
                    # Slice the message to get the path
                    path = msg[len('!uf '):]
                    await self.send_document(path=path, entity=entity)

                # Delete messages
                elif msg.startswith('!d '):
                    # Slice the message to get message ID, and make sure it
                    # really is a number before asking Telegram to delete it
                    try:
                        message_id = int(msg[len('!d '):])
                    except ValueError:
                        print('Invalid message ID given!')
                    else:
                        deleted_msg = await self.delete_messages(
                            entity, message_id)
                        print('Deleted {}'.format(deleted_msg))

                # Download media
                elif msg.startswith('!dm '):
                    # Slice the message to get message ID
                    await self.download_media_by_id(msg[len('!dm '):])

                # Download profile photo
                elif msg == '!dp':
                    print('Downloading profile picture to usermedia/...')
                    os.makedirs('usermedia', exist_ok=True)
                    output = await self.download_profile_photo(entity,
                                                               'usermedia')
                    if output:
                        print('Profile picture downloaded to', output)
                    else:
                        print('No profile picture found for this user!')

                elif msg == '!i':
                    attributes = list(entity.to_dict().items())
                    # Align the values by padding names to the longest one
                    pad = max(len(x) for x, _ in attributes)
                    for name, val in attributes:
                        print("{:<{width}} : {}".format(name, val, width=pad))

                # Send chat message (if any)
                elif msg:
                    await self.send_message(entity, msg, link_preview=False)

    async def send_photo(self, path, entity):
        """Sends the file located at path to the desired entity as a photo.

        :param path: Path of the file to upload.
        :param entity: The user, chat or channel that will receive it.
        """
        await self.send_file(
            entity, path,
            progress_callback=self.upload_progress_callback
        )
        print('Photo sent!')

    async def send_document(self, path, entity):
        """Sends the file located at path to the desired entity as a document.

        :param path: Path of the file to upload.
        :param entity: The user, chat or channel that will receive it.
        """
        await self.send_file(
            entity, path,
            force_document=True,
            progress_callback=self.upload_progress_callback
        )
        print('Document sent!')

    async def download_media_by_id(self, media_id):
        """Given a message ID, finds the media this message contained and
           downloads it.

        Only messages previously stored in ``self.found_media`` (those
        listed through the history command) can be downloaded.

        :param media_id: The message ID, as typed by the user.
        """
        try:
            msg = self.found_media[int(media_id)]
        except (ValueError, KeyError):
            # ValueError when parsing, KeyError when accessing dictionary
            print('Invalid media ID given or message not found!')
            return

        print('Downloading media to usermedia/...')
        os.makedirs('usermedia', exist_ok=True)
        output = await self.download_media(
            msg.media,
            file='usermedia/',
            progress_callback=self.download_progress_callback
        )
        print('Media downloaded to {}!'.format(output))

    @staticmethod
    def download_progress_callback(downloaded_bytes, total_bytes):
        """Progress callback used while downloading media."""
        InteractiveTelegramClient.print_progress(
            'Downloaded', downloaded_bytes, total_bytes
        )

    @staticmethod
    def upload_progress_callback(uploaded_bytes, total_bytes):
        """Progress callback used while uploading files."""
        InteractiveTelegramClient.print_progress(
            'Uploaded', uploaded_bytes, total_bytes
        )

    @staticmethod
    def print_progress(progress_type, downloaded_bytes, total_bytes):
        """Prints how much of a transfer has been completed.

        :param progress_type: Label for the line ('Downloaded', 'Uploaded').
        :param downloaded_bytes: Bytes transferred so far.
        :param total_bytes: Total size of the transfer; when it is zero or
                            missing, only the transferred amount is shown
                            since no percentage can be computed.
        """
        if total_bytes:
            print('{} {} out of {} ({:.2%})'.format(
                progress_type, bytes_to_string(downloaded_bytes),
                bytes_to_string(total_bytes), downloaded_bytes / total_bytes)
            )
        else:
            # Avoid dividing by zero when the total size is unknown
            print('{} {}'.format(
                progress_type, bytes_to_string(downloaded_bytes)))

    async def message_handler(self, event):
        """Callback method for received events.NewMessage.

        Prints a one-line summary of every incoming or outgoing message.

        :param event: The ``events.NewMessage`` event that was received.
        """

        # Note that message_handler is called when a Telegram update occurs
        # and an event is created. Telegram may not always send information
        # about the ``.sender`` or the ``.chat``, so if you *really* want it
        # you should use ``get_chat()`` and ``get_sender()`` while working
        # with events. Since they are methods, you know they may make an API
        # call, which can be expensive.
        chat = await event.get_chat()
        if event.is_group:
            if event.out:
                sprint('>> sent "{}" to chat {}'.format(
                    event.text, get_display_name(chat)
                ))
            else:
                sprint('<< {} @ {} sent "{}"'.format(
                    get_display_name(await event.get_sender()),
                    get_display_name(chat),
                    event.text
                ))
        else:
            if event.out:
                sprint('>> "{}" to user {}'.format(
                    event.text, get_display_name(chat)
                ))
            else:
                sprint('<< {} sent "{}"'.format(
                    get_display_name(chat), event.text
                ))
if __name__ == '__main__':
    # Script entry point: collect the settings (from the environment when
    # available, otherwise by asking the user), build the client and hand
    # control to its interactive main loop until it returns.
    SESSION = os.environ.get('TG_SESSION', 'interactive')
    API_ID = get_env('TG_API_ID', 'Enter your API ID: ', cast=int)
    API_HASH = get_env('TG_API_HASH', 'Enter your API hash: ')

    client = InteractiveTelegramClient(
        session_user_id=SESSION,
        api_id=API_ID,
        api_hash=API_HASH,
    )
    main_coroutine = client.run()
    loop.run_until_complete(main_coroutine)