Важно! Обязательно обновите Rockstat перед началом!
# Rodin helpers: набор полезных функций для вывода данных
!pip install -U git+https://github.com/madiedinro/rodin_helpers_py
import rodin_helpers as rh
Кабинет разработчика в VK
Документация по API. Будем использовать Ads и Wall
Статья про OAuth - авторизацию, которую использует VK и другие.
Нам нужно получить статистику в разрезе UTM меток, чтобы можно было сопоставить с Google Analytics / Yandex Metrika. Сразу скажу, что ссылок в статистике нет и придется совершить путешествие, чтобы найти ссылки и достать из них utm_ метки или шаблон, по которому они строятся.
Шаги:
Импортируем необходимые зависимости
import requests
import os
import json
import arrow
from pprint import pprint
from collections import defaultdict
from urllib.parse import urlparse, parse_qs, quote
from dotenv import load_dotenv
Итак, вы получили от VK параметры приложения, подставьте их в код или запишите в файл creds_fn
rh.video('Pthq7BvJJco')
conf_dir = '../../'
# вам вероятнее лучше подойдет просто '', что будет означать: текущая директория
# conf_dir = ''
# Файлы с переменными окружения. Используются для конфигурации приложений
creds_fn = f'{conf_dir}.env.vk'
token_fn = f'{conf_dir}.env.vk_token'
# версия API, требуется ее знать и передавать в запросах
vk_v = '5.92'
# Загрузка данных из файла, если таковой имеется. Если файл отсутствует это нормально и ошибки не будет
# load_dotenv(creds_fn)
load_dotenv(token_fn)
# Можно подставить ручками
os.environ["VK_APP_ID"] = '6833460'
os.environ["VK_APP_SECRET"] = 'nlw3m2OxfgnZqPrsKp7g'
# Подсказка для самих себя. Дальше поймете
if os.environ.get('VK_ACC_TOKEN'):
print('Токен уже есть, получать не требуется')
# Дата завершения (сегодня - D1 дней)
D1_AGO = 0
# Дата начала (сегодня - D2 дней)
D2_AGO = 10
# Если нужно переполучить токен
# del os.environ['VK_ACC_TOKEN']
Требуется пройти по ссылке, она отфутболит на страницу где можно взять код (вообще задумано что автоматом должно получаться). Его обработка не сделана, поэтому просто возьмите его из строки адреса браузера
# пропускаем ячейку, если токен уже есть
if not os.environ.get('VK_ACC_TOKEN'):
vk_redir = f'https://oauth.vk.com/blank.html'
vk_auth_url = f'https://oauth.vk.com/authorize?client_id={os.environ["VK_APP_ID"]}&redirect_uri={vk_redir}&scope=ads,offline&response_type=code&v={vk_v}'
print('redirect to:', vk_redir)
print('-----------------------')
print(rh.show_link(vk_auth_url))
code = '32319670c10ce7aa46'
# пропускаем ячейку, если токен уже есть
if not os.environ.get('VK_ACC_TOKEN'):
token_url= f'https://oauth.vk.com/access_token?client_id={os.environ["VK_APP_ID"]}&client_secret={os.environ["VK_APP_SECRET"]}&code={code}&redirect_uri={vk_redir}'
responce = requests.get(token_url)
if responce.status_code == 200:
auth_result = responce.json()
access_token = auth_result['access_token']
user_id = auth_result['user_id']
# Обрежем токен, чтобы удостовериться, что он есть, но не палить целиком
print('Ваши ключи для доступа сохранены в переменную.', user_id, access_token[:20])
# Сохраняем токен в файл, чтобы в следующий раз не требовалось
# проходить процедуру получения токена через подтверждение кодом
with open(token_fn, 'w') as f:
f.write(f'VK_ACC_TOKEN={access_token}')
# Кроме того запишем токен в переменую оружения.
# В следующий раз он сразу загрузится туда из файла
os.environ["VK_ACC_TOKEN"] = access_token
# Если код ответа отличается от 200, печатаем сообщение с ошибкой
else:
print('error!', responce.status_code, responce.text)
Запросим информацию о рекламном аккаунте
# Словарь базовых параметров: авторизационный токен и нужная версия API. Требуется передавать со всеми запросами.
common_params = {
'access_token': os.environ["VK_ACC_TOKEN"],
'v': vk_v
}
vk_accounts_url = 'https://api.vk.com/method/ads.getAccounts'
resp = requests.get(vk_accounts_url, params=common_params)
resp.json()
Легко, не правда ли?)
API VK всегда выдает инфоммацию в виде {'response': ...
Ф-ция и переменные, которые потребуются для дальнейшего общения с VK
# Шаблон для адресов методов VK (можно узнать в документации, ссылки на которую в самом верху)
vk_call_tmpl = 'https://api.vk.com/method/{method}'
# Ставим пометку, нужно ли запрашивать удаленные элементы
vk_include_deleted = 0
# Функция выполнения запроса к VK
def vk_req(method, params=None, data = None, http_method = 'get'):
# мы специально не используем дефолтовые параметры = {}, там есть подводные камни,
# которые мы не будем разбирать. Просто поверьте.
if params is None:
params = {}
# Что касается http запросов, что такое data, что такое params смело
# ступайте в документацию по протоколу HTTP и библиотеки requests :) или на курсы digital god :)
if data is None:
data = {}
# Если запрашиваем постом, то перемещаем параметры в тело запроса.
if http_method == 'post':
# Дополняем data содержимым params
data.update(params)
params = {}
# Но в параметрах все равно передадутся базовые параметры (версия и токен)
final_params = {**common_params, **params}
# выполняем запрос, который вернет объект ответа Response
resp = requests.request(http_method, vk_call_tmpl.format(method=method), data=data, params=final_params)
# Воспользуемся ислючениями для остановки выполнения программы.
# Это позволит избежать можества дополнительных проверок дальше
if resp.status_code != 200:
print(resp.status_code, resp.text())
raise Exception('Wrong response code')
# преобразовываем текстовый json в формат данных python
result = resp.json()
# API всегда выдает ответ в {'response':, можно это сразу и обработать
if 'response' not in result:
print(resp.status_code, result)
raise Exception('No response in result')
return result['response']
Запрашиваем аккаунты текущего пользователя, чтобы удостовериться, что все так
vk_accs = vk_req('ads.getAccounts')
account_id = vk_accs[0]['account_id']
vk_accs
Интересующие я заранее пометил префиксом ||dg|| в интерфейсе VK, так в обработку попадут только нужные мне кампании
# Словарь, куда будем дописывать структуру кампаний
camps = {}
# Отдельно создадим список с айдишками кампаний, он нам дальше пригодится
vk_camps_ids = []
# Выполняем запрос
vk_camps = vk_req('ads.getCampaigns', params={'account_id': account_id, 'include_deleted': vk_include_deleted})
for camp in vk_camps:
if camp.get('name', '').startswith('||dg||'):
vk_camps_ids.append(camp['id'])
camps[camp['id']] = camp
print(vk_camps_ids)
# вернуть только первую кампанию
vk_camps[0]
Они нужны чтобы достать оттуда ссылки
# В этом словаре будет вся-вся инфа по объявлениям
ads = {}
# Отдельно сохраним айдишки объявлений в списке
vk_ads_ids = []
# Строим запрос к методу
vk_ads_req_params = {
'account_id': account_id,
'include_deleted': vk_include_deleted,
'campaign_ids': json.dumps(vk_camps_ids)
}
print('vk_ads_req_params=',vk_ads_req_params)
# Выполняем запрос
vk_ads_resp = vk_req('ads.getAds', params=vk_ads_req_params)
for i, ad in enumerate(vk_ads_resp):
vk_ads_ids.append(ad['id'])
ad_id = ad['id']
ads[ad_id] = {
# Сохраняем структуру объявления
'ad': ad,
# Дописываем прямо к объявлению структуру кампании
'camp': camps.get(ad['campaign_id'])
}
print(f'processed {i} ads')
print('-----------------------')
# list(ads.keys())[0] - получить первый ключ справочника с объявлениями. Такая вот сложность, чтобы вывести всего один элемент
pprint(ads[list(ads.keys())[0]])
Только вот ссылок тут нет. Ищем дальше и находим метод вads.getAdsLayout
Чтобы каждый раз не париться с выводом первого элемента (я это делаю для наглядности, а если выводить все, то придется очень много скроллить) напишем специальную ф-цию.
def first(mydict):
return mydict[list(mydict.keys())[0]]
Как оказалось их тут тоже нет, но есть ссылки на специально созданные под эти объявления рекламные
посты.
Дополнительно придется построить справочник соответсвия постов и объявлений wall_ids
.
Layout объявлениея просто допишем к словарю ads
.
# Маска, по которой можно отличить, что объявление является постом
POST_PREFIX = 'http://vk.com/wall'
wall_ids = defaultdict(list)
al_params={
'account_id': account_id,
'include_deleted': vk_include_deleted,
'campaign_ids': json.dumps(vk_camps_ids)
}
vk_ads_lay = vk_req('ads.getAdsLayout', params=al_params)
# Обходим результаты запроса
for i, ad in enumerate(vk_ads_lay):
ad_id = ad.get('id')
camp_id = ad.get('campaign_id')
# Проверяем что у нас есть такая кампания
if not camp_id or camp_id not in vk_camps_ids:
continue
# Проверяем что такое объявдение получено
if ad_id not in ads:
continue
# Достаем ссылку из словаря по ключу link_url
link_url = ad.get('link_url')
if link_url and link_url.startswith(POST_PREFIX):
# Находим ID записи на стене
wall_id = link_url[len(POST_PREFIX):]
wall_ids[wall_id].append(ad_id)
# Сохраняем в общем справочнике объявлений
ad['wall_id'] = wall_id
ads[ad_id]['layout'] = ad
print(f'processed {i} ads')
print('-----------------------')
pprint(first(ads))
print('-----------------------')
pprint(wall_ids)
Делаем это только по подготовленной таблице соответствия постов (словарь, где ключ это id поста).
# Как и с выводом первого элемента, берем ключи словаря и преобразовываем их в список [*wall_ids.keys()]
vk_posts = vk_req('wall.getById', {'posts': ",".join([*wall_ids.keys()])})
for post in vk_posts:
post_id = '{from_id}_{id}'.format(**post)
# По ранее составленному справочнику связей постов и объявлений находим соответствие
for ad_id in wall_ids[post_id]:
ads[ad_id]['post'] = post
# Ищем ссылку в аттачах
for attach in post.get('attachments'):
if attach['type'] == 'link':
link = attach['link']
# Дописываем к объявлению
ads[ad_id]['url'] = link.get('url')
print(f'processed {i} posts')
print('-----------------------')
pprint(post)
Как видите в посте содержится оооочень много всего. Но нас интересует лишь ссылка, ее то мы и достали
В итоге по каждому объявлению получили вот такую структуру
pprint(first(ads))
Поехали дальше. Доставать разметку
Пример разметки:
utm_source=vk&utm_medium=cpm&utm_campaign={campaign_id}&utm_content={ad_id}
Ф-ция set_params
получает на вход ссылку с шаблоном и объявление и на выходе дает
полностью сформированную ссылку. Пример:
https://....utm_source=vk&utm_medium=cpm&utm_campaign={campaign_id}&utm_content={ad_id}
-> https://....utm_source=vk&utm_medium=cpm&utm_campaign=1010819423&utm_content=49726084
Другая ф-ция extract_utms
разбирает готовую ссылку и достает из нее utm_ параметры.
Возвращает в виде словаря.
https://....utm_source=vk&utm_medium=cpm&utm_campaign=1010819423&utm_content=49726084
-> {'utm_source': 'vk', 'utm_medium': 'cpm', 'utm_campaign': '1010819423', 'utm_content': '49726084'}
# Соответствия между параметрами статистики и шаблона
params_map = {
'campaign_name': '{camp[name]}',
'campaign_id': '{camp[id]}',
'ad_id': '{ad[id]}'
}
# Функция, которая получает на вход ссылку с шаблоном и объявление а на выходе дает полностью сформированную ссылку
def set_params(url, ad):
for prop, value in params_map.items():
key_map = params_map.get(prop, prop.lower())
value_esc = value.format(**ad)
url = url.replace('{'+ key_map + '}', value_esc)
return url
# Из ссылки достает utm_* метки.
def extract_utms(url):
parsed_url = urlparse(url)
query = parse_qs(parsed_url.query)
return {k: v[0] for k,v in query.items() if k.startswith('utm_')}
# Форматируем даты в формат, требуемый VK: 2019-01-01
d1 = arrow.now().shift(days=-D2_AGO).format('YYYY-MM-DD')
d2 = arrow.now().shift(days=-D1_AGO).format('YYYY-MM-DD')
# Формируем объект запроса к API
stat_params = {
'period': 'day',
'account_id': account_id,
'ids_type': 'ad',
'ids': ",".join(vk_ads_ids),
'date_from': d1.format('YYYY-MM-DD'),
'date_to': d2.format('YYYY-MM-DD'),
}
print('----------------------- params ')
pprint(stat_params)
print('-----------------------')
vk_stats = vk_req('ads.getStatistics', stat_params)
# Список под статистику, ведь статистика это набор однотипных записей неизвестного кол-ва
vk_ads_stats = []
for ad_stats in vk_stats:
# Находим объявление в заготовленном справочнике
ad_id = str(ad_stats['id'])
# Если у нас такое объявление есть
ad = ads.get(ad_id)
if not ad:
continue
# Проверяем, что у объявления имеется url
url = ad.get('url', None)
if not url:
continue
# Подставляет шаблонные параметры, если имеются
final_url = set_params(url, ad)
# Достаем utm_*
utms = extract_utms(final_url)
# Проверяем что есть хоть какая-то статистика
if not ad_stats.get('stats'):
continue
# pop - забрать из словаря (прям забрать) значение по ключу.
ad_stat = ad_stats.pop('stats')
# Обходим пришедшие по объявлению данные
for day_stat in ad_stat:
# Готовим дополнение для словаря записи строки статистики
upd = {
'date': day_stat.pop('day'),
'ad_id': ad_id,
'campaign_id': ads[ad_id]['camp']['id'],
'account_id': account_id
}
# Объединяем все данные
rec = {**utms, **day_stat,**upd}
vk_ads_stats.append(rec)
Осмотрим структуры, которые были у нас по пути
print('----------------------- статистика одного объявления ')
pprint(ad_stat)
print('----------------------- дополнение к одной из записей в стате объявления')
pprint(upd)
print('----------------------- финальная версия одной записи в стате ')
pprint(rec)
Вот что получили в итоге. Статистика + метки, все что нужно!
rh.print_rows(vk_ads_stats, limit=10)
Импортируем библиотеку simplech и создаем инстанс клиента. Сразу воспользуемся асинхронной версией, чтобы потом меньше переделывать.
from simplech import AsyncClickHouse
ch = AsyncClickHouse()
Воспользуемся TableDiscovery из модуля simplech для автоматической генерации схемы БД
td = ch.discover('vk_stat', vk_ads_stats)
# Укажем какие поля метрики, где дата и что будет первичным ключем
td.date('date').idx('account_id', 'date').metrics('spent', 'impressions', 'reach', 'clicks')
Подробрнее о библиотеке: https://github.com/madiedinro/simple-clickhouse
Если коротко:
td = ch.discovery(table, list_data)
будут проанализированы записи и подобрана структура хранения
Но требуется указать:
Важно! Работает только с метриками, которые можно спокойно складывать/вычитать т.е. показатели вроде CTR, CPC нельзя держать в таблице, только вычислять на лету
td.date('date').idx('account_id', 'date').metrics('spent', 'impressions', 'reach', 'clicks')
Все методы конфигурации поддерживают точечный синтаксис для выставления следующего параметра
with td.difference(d1, d2, data_list) as d:
for row in d:
td.push(row)
print(row)
sql = td.merge_tree()
print(sql)
Выполняем SQL запрос
await td.merge_tree(execute=True)
query = f'SELECT count() FROM {td.table}'
print('query:', query)
result = await ch.select(query)
print('result:', result)
При использовании td.difference(d1, d2) и td.push нет необходимости самостоятельно вызывать ch.flush в конце, он выполнится автоматически
# в целях наглядности запишем сюда то, что отправилось на запись в БД
written = []
# инициализируем оброаботчик разницы
async with td.difference(d1, d2, vk_ads_stats) as d:
# он является асинхронным генератором и его следует обойти циклом
async for row in d:
# добавляем в буфер на запись
td.push(row)
# пишем себе, чтобы потом поглядеть
written.append(row)
rh.print_rows(written, limit=10)
Посмотрим что в таблице
query = 'SELECT * FROM vk_stat ORDER BY date'
rows = [r async for r in ch.objects_stream(query)]
rh.print_rows(rows, limit=10)
У нас все получилось. Данные были получены и записаны в ClickHouse. Обрабатывается "дописывание" свежих данных. Многим и этого хватит а кому не хватит, пошли дальше!
Теперь надо превратить это из формата блокнота, где надо все выполнять руками в удобный набор функций, которые можно будет перенести в сервис, где они будут выполняться по расписанию. Более того, будет автоматически обрабатываться получение токена.
Заранее прошу прощения, но подробно расписывать не буду, только какие-то специфичные места. Для всего остального есть документация (ее мало, но есть) и доки питона.
Соберем вместе все, что нам понадобится. Просто соберем, тут не все сможет запуститься
import asyncio
from band import expose, cleanup, worker, settings as cfg, logger, response
import aiohttp
from collections import defaultdict
from urllib.parse import urlparse, parse_qs, quote
import json
from simplech import AsyncClickHouse
import arrow
from aiocron import crontab
vk_v = '5.92'
# Превращаем в обычный шаблон, чтобы применить параметры в момент использования
token_url_tmpl = 'https://oauth.vk.com/access_token?client_id={id}&client_secret={secret}&code={code}&redirect_uri={redir}'
# ссылка, куда отправлять для авторизации
vk_auth = f'https://oauth.vk.com/authorize?client_id={cfg.app_id}&redirect_uri={cfg.app_redir}&scope=ads,offline&response_type=code&v={vk_v}'
vk_call_tmpl = 'https://api.vk.com/method/{method}'
# Шаблон для адресов методов VK (можно узнать в документации, ссылки на которую в самом верху)
vk_call_tmpl = 'https://api.vk.com/method/{method}'
# Ставим пометку, нужно ли запрашивать удаленные элементы
vk_include_deleted = 0
# Папка с конфигами
conf_dir = ''
# Файлы с переменными окружения. Использются для конфигурации приложений
# creds_fn = f'{conf_dir}.env.vk' - в сервисах рокстат есть специальные конфиги
token_fn = f'{conf_dir}.env.vk_token'
POST_PREFIX = 'http://vk.com/wall'
# Соответствия между параметрами статистики и шаблона
params_map = {
'campaign_name': '{camp[name]}',
'campaign_id': '{camp[id]}',
'ad_id': '{ad[id]}'
}
# С редиректом тут будет отдельная история, можно сразу его обработать, не копируя ключ из строки адреса
# https://{{DOMAIN}}/vk_collect/get_code
# Тут:
# {{DOMAIN}} - домен с рокстат - подставляется автоматом
# vk_collect - название сервиса в рокстат
# get_code - название метода в сервисе
# >>> Пометка: не забыть поместить в конфиг APP_ID, APP_SECRET
common_params = {
'access_token': None,
'v': vk_v
}
# создаем клиента ClickHouse
ch = AsyncClickHouse()
app_redir: https://{{DOMAIN}}/vk_collect/get_code
app_id: {{APP_ID}}
app_secret: {{APP_SECRET}}
APP_ID=ваши данные от приложения vk
APP_SECRET=ваши данные от приложения vk
Перепишем функцию выполнения запроса к API в асинхронный формат
async def vk_req_async(method, params=None, data=None, http_method='get'):
if params is None:
params = {}
if http_method == 'post':
data = data or {}
data.update(params)
params = {}
final_params = {**common_params, **params}
url = vk_call_tmpl.format(method=method)
logger.debug('requesting', u=url, p=final_params)
async with aiohttp.ClientSession() as session:
async with session.request(http_method, url, json=data, params=final_params) as res:
if res.status != 200:
print(res.status, await res.text())
raise Exception('Wrong response code')
result = await res.json()
if 'response' not in result:
print(res.status, result)
raise Exception('No response in result')
return result['response']
Пример использования
await vk_req_async('ads.getAccounts')
[{'account_id': 1603421955,
'account_type': 'general',
'account_status': 1,
'account_name': 'Личный кабинет',
'access_role': 'admin'}]
Набор необходимых функций для хранения токена и разбора url
def save_token(access_token):
with open(token_fn, 'w') as f:
f.write(access_token)
def load_token():
try:
with open(token_fn) as f:
# read читает все содержимое файла. strip убирает переносы строк по краям
return f.read().strip()
except Exception:
logger.exception('ex')
# Функция, которая получает на вход ссылку с шаблоном и объявление и на выходе дает полностью сформированную ссылку
def set_params(url, ad):
for prop, value in params_map.items():
key_map = params_map.get(prop, prop.lower())
value_esc = value.format(**ad)
url = url.replace('{'+ key_map + '}', value_esc)
return url
# Из ссылки достает utm_* метки.
def extract_utms(url):
parsed_url = urlparse(url)
query = parse_qs(parsed_url.query)
return {k: v[0] for k,v in query.items() if k.startswith('utm_')}
async def load_data(d1, d2):
# Аккаунт
vk_accs = await vk_req_async('ads.getAccounts')
account_id = vk_accs[0]['account_id']
# Кампании
camps = {}
vk_camps_ids = []
vk_camps = await vk_req_async('ads.getCampaigns', params={'account_id': account_id, 'include_deleted': vk_include_deleted})
for camp in vk_camps:
if camp.get('name', '').startswith('||dg||'):
vk_camps_ids.append(camp['id'])
camps[camp['id']] = camp
# Немного подождем
await asyncio.sleep(0.5)
# Объявления
ads = {}
vk_ads_ids = []
vk_ads_req_params = {
'account_id': account_id,
'include_deleted': vk_include_deleted,
'campaign_ids': json.dumps(vk_camps_ids)
}
vk_ads_resp = await vk_req_async('ads.getAds', params=vk_ads_req_params)
for i, ad in enumerate(vk_ads_resp):
vk_ads_ids.append(ad['id'])
ad_id = ad['id']
ads[ad_id] = {
'ad': ad,
'camp': camps.get(ad['campaign_id'])
}
wall_ids = defaultdict(list)
al_params = {
'account_id': account_id,
'include_deleted': vk_include_deleted,
'campaign_ids': json.dumps(vk_camps_ids)
}
await asyncio.sleep(1)
vk_ads_lay = await vk_req_async('ads.getAdsLayout', params=al_params)
for i, ad in enumerate(vk_ads_lay):
ad_id = ad.get('id')
camp_id = ad.get('campaign_id')
if not camp_id or camp_id not in vk_camps_ids:
continue
if ad_id not in ads:
continue
link_url = ad.get('link_url')
if link_url and link_url.startswith(POST_PREFIX):
wall_id = link_url[len(POST_PREFIX):]
wall_ids[wall_id].append(ad_id)
ad['wall_id'] = wall_id
ads[ad_id]['layout'] = ad
await asyncio.sleep(1)
vk_posts = await vk_req_async('wall.getById', {'posts': ",".join([*wall_ids.keys()])})
for post in vk_posts:
post_id = '{from_id}_{id}'.format(**post)
for ad_id in wall_ids[post_id]:
ads[ad_id]['post'] = post
for attach in post.get('attachments'):
if attach['type'] == 'link':
link = attach['link']
ads[ad_id]['url'] = link.get('url')
# Формируем объект запроса к API
stat_params = {
'period': 'day',
'account_id': account_id,
'ids_type': 'ad',
'ids': ",".join(vk_ads_ids),
'date_from': d1.format('YYYY-MM-DD'),
'date_to': d2.format('YYYY-MM-DD'),
}
await asyncio.sleep(1)
vk_stats = await vk_req_async('ads.getStatistics', stat_params)
# Список под статистику, ведь статистика это набор однотипных записей неизвестного кол-ва
vk_ads_stats = []
for ad_stats in vk_stats:
# Находим объявление в заготовленном справочнике
ad_id = str(ad_stats['id'])
# Если у нас такое объявление есть
ad = ads.get(ad_id)
if not ad:
continue
# Проверяем, что у объявления имеется url
url = ad.get('url', None)
if not url:
continue
# Подставляет шаблонные параметры, если имеются
final_url = set_params(url, ad)
# Достаем utm_*
utms = extract_utms(final_url)
# Проверяем что есть хоть какая-то статистика
if not ad_stats.get('stats'):
continue
# pop - забрать из словаря (прям забрать) значение по ключу.
ad_stat = ad_stats.pop('stats')
# Обходим пришедшие по объявлению данные
for day_stat in ad_stat:
# Готовим дополнение для словаря записи строки статистики
upd = {
'date': day_stat.pop('day'),
'ad_id': ad_id,
'campaign_id': ads[ad_id]['camp']['id'],
'account_id': account_id
}
# Объединяем все данные
rec = {**utms, **day_stat,**upd}
vk_ads_stats.append(rec)
print(vk_ads_stats)
return vk_ads_stats
Сделаем обработчик авторизации (возврата по ссылке, с кодом для получения токена)
# @expose.handler() - помечает фунуцию обработчиком внешних запросов.
# Она будет доступна по адресу, который мы указали в vk_redir = f'https://<ваш домен>/vk_collect/get_code'
# при условии, что название сервиса будет vk_collect
@expose.handler()
async def get_code(data, **params):
# так работают обработчики в рокстат. Все входящие данные приходят в data
# а вот **params это приемник всех остальных параметров, заранее неизвестно сколько их будет
# поэтому у обработчиков запросов надо всегда добавлять словарь, куда они попадут
code = data.get('code')
token_url = token_url_tmpl.format(
id=cfg.app_id,
secret=cfg.app_secret,
code=code,
redir=cfg.app_redir)
async with aiohttp.ClientSession() as session:
async with session.get(token_url) as res:
if res.status == 200:
result = await res.json()
print(result)
common_params['access_token'] = result['access_token']
save_token(result['access_token'])
else:
print(result.status, await result.text())
Осталось дело за сохранением. Получаем описание таблицы и копируем его в код
print(td.pycode())
td_vk_stat = ch.discover('vk_stat', columns={
'utm_source': 'String',
'utm_campaign': 'String',
'utm_content': 'String',
'utm_term': 'String',
'impressions': 'Int64',
'reach': 'Int64',
'date': 'Date',
'ad_id': 'Int64',
'campaign_id': 'Int64',
'account_id': 'Int64',
'spent': 'Float64',
'clicks': 'Int64'})\
.metrics('clicks', 'reach', 'impressions', 'spent')\
.dimensions('ad_id', 'utm_content', 'account_id', 'utm_term', 'utm_campaign', 'campaign_id', 'date', 'utm_source')\
.date('date')\
.idx('account_id', 'date')
async def save_data(d1, d2, data):
await td_vk_stat.merge_tree(execute=True)
# инициализируем оброаботчик разницы
async with td_vk_stat.difference(d1, d2, data) as d:
# он является асинхронным генератором и его следует обойти циклом
async for row in d:
# добавляем в буффер на запись
td_vk_stat.push(row)
# Смотрем статистику, что у нас получилось
logger.info('diff stat', stat=d.stat)
Воркер
# @crontab('5 5 * * *') - стандартрный формат кронтаб,
# где описывается расписание запуска. Тут указано запускать 5:05 ежедневно
@crontab('5 5 * * *')
# @worker() - делает функцию "воркером", если в ней имеется бесконечный цикл. Запускает при старте
@worker()
async def work():
logger.info('auth link:', link=vk_auth)
token = load_token()
if not token:
logger.warn('authorize please')
return
print(token)
common_params['access_token'] = token
d1 = arrow.now().shift(days=-2).format('YYYY-MM-DD')
d2 = arrow.now().shift(days=-1).format('YYYY-MM-DD')
data = await load_data(d1, d2)
await save_data(d1, d2, data)
Достаем свои параметры
Будьте аккуратны, aiohttp очень быстрый, можно легко попасть на
{'error': {'error_code': 9,
'error_msg': 'Flood control',
'request_params': [{'key': 'oauth', 'value': '1'},
{'key': 'method', 'value': 'ads.getAds'},
{'key': 'v', 'value': '5.92'},
{'key': 'account_id', 'value': '1603421955'},
{'key': 'include_deleted', 'value': '0'},
{'key': 'campaign_ids',
'value': '[1010819423, 1010920266]'}]}}
`
В этом случае используйте
await asyncio.sleep(1)
длительность ожидания придется подобрать
Скринкаст полного цикла создания сервиса
rh.video('IvtuAGHR9HA')
rh.video('RtBNnE_VBH8')
Запросы:
SELECT
$timeSeries as t,
sum(impressions)
FROM $table
WHERE $timeFilter
GROUP BY t
ORDER BY t
---
SELECT
$timeSeries as t,
sum(spent)
FROM $table
WHERE $timeFilter
GROUP BY t
ORDER BY t
---
$columns(
utm_content,
sum(impressions) c)
FROM $table
---
$columns(
utm_term,
sum(impressions) c)
FROM $table
На этом все. Ждите новых серий :)