import vk
import time
import requests
import wget
import os
class VkPolling:
    """Stoppable polling loop that forwards new VK messages to a chat.

    ``run`` repeatedly asks ``vk_user`` for new messages and passes any
    non-empty result to ``handle_updates``; ``terminate`` asks the loop to stop.
    """

    def __init__(self):
        # Flag checked by run(); cleared by terminate().
        self._running = True

    def terminate(self):
        """Request that a running ``run`` loop stops at its next check."""
        self._running = False

    def run(self, vk_user, bot, chat_id):
        """Poll ``vk_user.get_new_messages()`` until ``terminate`` is called.

        Errors raised while fetching are printed and treated as "no updates".
        Errors raised by ``handle_updates`` are not caught here.
        """
        while self._running:
            try:
                updates = vk_user.get_new_messages()
            except Exception as e:
                print('Error: {}'.format(e))
                updates = []
            if updates:
                print(updates)
                handle_updates(vk_user, bot, chat_id, updates)
            # Pause between polls in up to 45 slices of 0.1 s, re-checking the
            # stop flag before each slice so terminate() takes effect quickly.
            slices_done = 0
            while slices_done < 45 and self._running:
                time.sleep(0.1)
                slices_done += 1
def handle_messages(m, vk_user, bot, chat_id):
    """Forward one incoming VK message dict ``m`` to the chat ``chat_id``.

    The sender is looked up through the VK API using ``m["uid"]``. Depending
    on which of the ``body`` / ``attachment`` / ``geo`` keys are present, the
    message is sent as plain text, passed to ``attachment_handler``, and/or
    sent as a header message followed by a venue that replies to it.
    """
    api = vk.API(vk_user.session)
    user = api.users.get(user_ids=m["uid"], fields=[])[0]

    has_attachment = 'attachment' in m
    has_geo = 'geo' in m

    # Text-only message: no attachment and no location.
    if 'body' in m and not has_attachment and not has_geo:
        text = add_reply_info(m, user["first_name"], user["last_name"])
        bot.send_message(chat_id, text, parse_mode='HTML',
                         disable_notification=check_notification(m)).wait()

    if has_attachment:
        attachment_handler(m, user, bot, chat_id)

    if has_geo:
        text = add_reply_info(m, user["first_name"], user["last_name"])
        header_msg = bot.send_message(chat_id, text, parse_mode='HTML',
                                      disable_notification=check_notification(m)).wait()
        # 'coordinates' is a space-separated pair; the two parts are passed
        # on in that order as the venue's position arguments.
        parts = m['geo']['coordinates'].split(' ')
        first_coord = parts[0]
        second_coord = parts[1]
        place = m['geo']['place']
        bot.send_venue(chat_id, first_coord, second_coord,
                       place['title'], place['city'],
                       disable_notification=check_notification(m),
                       reply_to_message_id=header_msg.message_id).wait()
def handle_updates(vk_user, bot, chat_id, updates):
    """Pass every update whose ``'out'`` value is falsy to ``handle_messages``."""
    for update in updates:
        if update['out']:
            # Truthy 'out': skipped, not forwarded.
            continue
        handle_messages(update, vk_user, bot, chat_id)
# Document extensions that are forwarded as a plain HTML link, mapped to the
# link label shown in the chat.
_DOC_LINK_LABELS = {
    'gif': 'GIF',
    'pdf': 'Документ',
    'zip': 'Документ',
    'ogg': 'Аудио',
}


def _html_link(url, label):
    """Return an HTML anchor pointing at *url* with visible text *label*."""
    return '<a href="{}">{}</a>'.format(url, label)


def _send_html(bot, chat_id, m, text):
    """Send *text* as an HTML message and wait for the result."""
    return bot.send_message(chat_id, text, parse_mode='HTML',
                            disable_notification=check_notification(m)).wait()


def _forward_doc(doc, m, user, bot, chat_id):
    """Forward one VK document attachment, falling back to a plain link."""
    ext = doc['doc']['ext']
    is_image = ext in ('jpg', 'png')
    is_office = ext in ('doc', 'docx')
    if ext not in _DOC_LINK_LABELS and not is_image and not is_office:
        # Unknown extension: only a link is sent; errors propagate as before.
        send_doc_link(doc, m, user, bot, chat_id)
        return

    try:
        header = add_reply_info(m, user["first_name"], user["last_name"])
        if ext in _DOC_LINK_LABELS:
            _send_html(bot, chat_id, m,
                       header + _html_link(doc['doc']['url'], _DOC_LINK_LABELS[ext]))
        elif is_image:
            # Images are uploaded by URL as a reply to the header message.
            link = doc['doc']['url']
            notification = _send_html(bot, chat_id, m, header + 'Документ')
            uploading = bot.send_chat_action(chat_id, 'upload_document')
            bot.send_document(chat_id, link,
                              reply_to_message_id=notification.message_id,
                              disable_notification=check_notification(m)).wait()
            uploading.wait()
        else:
            # doc/docx: download to a local file, upload it, then clean up.
            notification = _send_html(bot, chat_id, m, header + 'Документ')
            uploading = bot.send_chat_action(chat_id, 'upload_document')
            # wget.download returns the local path (it is passed to open()
            # and os.remove()), so it is a string, not a file object.
            path = wget.download(requests.get(doc['doc']['url']).url)
            try:
                # Keep the handle open until the upload has been waited for.
                with open(path, 'rb') as fh:
                    bot.send_document(chat_id, fh,
                                      reply_to_message_id=notification.message_id,
                                      disable_notification=check_notification(m)).wait()
                uploading.wait()
            finally:
                # Always remove the temporary download, even on failure.
                os.remove(path)
    except Exception:
        # Best effort: if rich forwarding fails, at least send a link.
        send_doc_link(doc, m, user, bot, chat_id)


def attachment_handler(m, user, bot, chat_id):
    """Forward the attachments of VK message ``m`` to the chat ``chat_id``.

    Photos, videos and documents are sent as HTML links (some documents are
    uploaded instead), audio as an "artist - title" line, and a sticker as a
    link to its 512px image. Each entry of ``m['attachments']`` is dispatched
    on its own ``'type'``, so messages with mixed attachment types no longer
    fail on the first entry of a different type. Unsupported types are
    ignored.

    Args:
        m: VK message dict containing ``'attachment'`` and usually
            ``'attachments'``.
        user: dict with the sender's ``'first_name'`` and ``'last_name'``.
        bot: bot client used for sending.
        chat_id: target chat identifier.
    """
    first = m['attachment']
    if first['type'] == 'sticker':
        # Stickers are read from the single 'attachment' entry.
        header = add_reply_info(m, user["first_name"], user["last_name"])
        _send_html(bot, chat_id, m,
                   header + _html_link(first['sticker']['photo_512'], 'Стикер'))
        return

    # Fall back to the single attachment when there is no list.
    attachments = m.get('attachments') or [first]
    for att in attachments:
        # NOTE(review): assumes list entries carry their own 'type' like
        # m['attachment'] does; entries without one use the first type.
        kind = att.get('type', first['type'])
        if kind == 'doc':
            _forward_doc(att, m, user, bot, chat_id)
            continue
        if kind == 'photo':
            text = _html_link(get_max_src(att['photo']), 'Фото')
        elif kind == 'video':
            video = att['video']
            url = 'https://vk.com/video{}_{}'.format(video['owner_id'], video['vid'])
            text = _html_link(url, 'Видео')
        elif kind == 'audio':
            audio = att['audio']
            text = '🎵 {} - {}\n'.format(audio['artist'], audio['title'])
        else:
            # TODO: Wall Posts and comments
            continue
        header = add_reply_info(m, user['first_name'], user['last_name'])
        _send_html(bot, chat_id, m, header + text)
def send_doc_link(doc, m, user, bot, chat_id):
    """Send a VK document to the chat as an HTML link labelled with its name.

    The previous format string had one placeholder for two arguments, so the
    file name was silently dropped; it is now used as the link text.

    Args:
        doc: attachment dict whose ``'doc'`` entry has ``'url'``, ``'title'``
            and ``'ext'``.
        m: VK message dict the attachment belongs to.
        user: dict with the sender's ``'first_name'`` and ``'last_name'``.
        bot: bot client used for sending.
        chat_id: target chat identifier.
    """
    info = doc['doc']
    link = info['url']
    file_name = info['title'] + '.' + info['ext']
    header = add_reply_info(m, user["first_name"], user["last_name"])
    data = header + 'Документ\n<a href="{}">{}</a>'.format(link, file_name)
    bot.send_message(chat_id, data, parse_mode='HTML',
                     disable_notification=check_notification(m)).wait()
def add_reply_info(m, first_name, last_name):
    """Build the text header (and body, if any) for a forwarded VK message.

    The result is ``"First Last:\\n"`` for a private message or
    ``"First Last @ Title:\\n"`` when ``m`` has a ``'chat_id'``; a non-empty
    ``m['body']`` is appended followed by a newline.

    The previous format strings had fewer placeholders than arguments, so the
    ids were rendered in the name slots and the title/body were shifted or
    dropped. Arguments are now bound to the slots they describe.

    NOTE(review): the old calls also passed ``m['uid']`` / ``m['chat_id']``
    with no matching placeholder. Presumably they fed markup that is no
    longer in the literal; confirm whether anything downstream needs the ids
    embedded in the message before relying on this header.

    Args:
        m: VK message dict; ``'title'`` is read when ``'chat_id'`` is present.
        first_name: sender's first name.
        last_name: sender's last name.

    Returns:
        The header string, with the message body appended when non-empty.
    """
    sender = '{} {}'.format(first_name, last_name)
    if 'chat_id' in m:
        header = '{} @ {}:\n'.format(sender, m['title'])
    else:
        header = '{}:\n'.format(sender)

    body = m.get('body')
    if not body:
        return header

    # TODO: Handle forwarded messages
    # The original replace() pattern was a literal split across two lines
    # (a syntax error, and a newline-for-newline no-op).
    # NOTE(review): presumably the lost pattern was '<br>'; confirm.
    return '{}{}\n'.format(header, body.replace('<br>', '\n'))
def check_notification(value):
    """Return True when ``value`` contains ``'push_settings'``, else False.

    Callers pass the result as ``disable_notification``.
    """
    return 'push_settings' in value
def get_max_src(attachment):
    """Return the value of the first present source key, or None.

    Keys are tried in the order ``src_xxbig``, ``src_xbig``, ``src_big``,
    ``src``.
    """
    for key in ('src_xxbig', 'src_xbig', 'src_big', 'src'):
        if key in attachment:
            return attachment[key]
    return None
class VkMessage:
    """A VK session plus the ``ts``/``pts`` values used to fetch new messages."""

    def __init__(self, token):
        """Open a session for ``token`` and read the initial ``ts`` and ``pts``."""
        self.session = get_session(token)
        self.ts, self.pts = get_tses(self.session)

    def get_new_messages(self):
        """Fetch messages via ``messages.getLongPollHistory`` and return them.

        ``self.pts`` is advanced to the returned ``new_pts``. The first
        element of the ``messages`` payload is a count; the remaining
        elements are returned as a list (empty when the count is 0).
        """
        history = vk.API(self.session).messages.getLongPollHistory(ts=self.ts, pts=self.pts)
        payload = history['messages']
        self.pts = history["new_pts"]
        if payload[0] == 0:
            return []
        return list(payload[1:])
def get_session(token):
    """Create a ``vk.Session`` using ``token`` as its access token."""
    session = vk.Session(access_token=token)
    return session
def get_tses(session):
    """Return the ``(ts, pts)`` pair from ``messages.getLongPollServer``.

    The server is queried with ``need_pts=1``.
    """
    server = vk.API(session).messages.getLongPollServer(need_pts=1)
    return server['ts'], server['pts']