前の記事でGoogleフォームに入力されたデータをGASを使ってXML形式に加工し、Gmailアカウントに送信しました。
今回は、そのメールをPythonとDjnagoを使って処理し、データベースに保存するまでを行います。
メール本文はXML宣言のない不完全なXML形式なのでXML宣言を追加してPythonで処理し、CSVファイルとデータベースに保存します。
送信したメールの宛先はGmailアカウントなので、二段階認証の処理も行います。
環境
・Python 3.9.5
・Django 3.2.4
仕様
・Djangoのカスタムコマンド機能を使ってcronで動かします。(django-crontabは使っていない。)
・初回の実行時に二段階認証のURLが表示されるので、受信するGmailアカウントで認証します。
・Gailから二段階認証でメールを受信してメール本文を解析します。
・Gmailで設定しているラベルを取得します。
・指定したラベルが存在しなければ作成します。
・処理したメールにラベルを付けます。
・CSVファイルに保存します。
・データベースに保存します。
XML形式のメール本文
Pythonの「xml.etree.ElementTree」を使って処理します。
Googleフォームから送信するときに完全なXML形式にしておけば良いのですが、今回は諸事情でこの仕様にしています。
取得したメール本文は以下のようになっているので
<name>xxxx</name> <age>oooo</age>
XML宣言を付加し、データ部分を<data>と</data>で囲んでから処理します。
<?xml version="1.0" encoding="utf-8"?> <data> <name>xxxx</name> <age>oooo</age> </data>
cronの設定
「django-crontab」は使っていないのでシステムのcronを使います。
pyenvで仮想環境を構築しているので、pythonのパスは以下の通りとなります。
「manage.py」の引数は、作成したスクリプト名をtest.pyとすると「.py」を除いたtestとなります。
root権限で動かしたい場合は「sudo」で指定します。
sudo crontab -e /home/hoge/.pyenv/shims/python manage.py /home/www/wsgi/project/manage.py test
Djangoのカスタムコマンド
今回のアプリ名はcronで動かすので「cron」とします。
「settings.py」の「INSTALLED_APPS」に「cron」を追加します。
「cron」アプリを作成して反映させます。
アプリの作成
python manage.py startapp cron python manage.py makemigrations python manage.py migrate
構成
構成は以下の通りです。
「management」と「commands」ディレクトリは自分で作成します。
project
┣ cron
┣ management
┣ commands
┣ 作成したスクリプト.py
┣ credentials.json
┣ token.pickle
「credentials.json」の取得
Google Search Consoleで「credentials.json」を取得、認証を受けて「token.pickle」を作成します。
- 送信先のGmailアカウントでhttps://console.cloud.google.com/?hl=jaにアクセスします。
- 新規にプロジェクトを作成して選択します。
- 「APIとサービス」→「ライブラリ」で[Gmail API」を有効にします。
- 「APIとサービス」→「認証情報」をクリックします。
- 「認証情報を作成」→[OAuthクライアントID」を選択します。
- 「アプリケーションの種類」を「デスクトップアプリ」にして適当な「名前」を入力、「作成」をクリックします。
- 「OAuth同意画面」の設定は環境に合わせて設定します。
- 「APIとサービス」→「認証情報」→「OAuth2.0クライアントID」に作成したIDがありますので、右端の↓マークをクリックして認証情報をダウンロードします。
- ダウンロードしたファイルを名前を「credentials.json」に変更してスクリプト同じディレクトリに置きます。
スクリプト
cronで動かす場合は「credentials.json」と「token.pickle」は絶対パスで指定します。
# coding: utf-8
import base64
import os
import csv
import datetime
import argparse
import xml.etree.ElementTree as ET
from django.utils.timezone import make_aware
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
# Commandクラス
from django.core.management.base import BaseCommand, CommandError
# モデルの読み込み
from xxx.models import XXX
# メールの検索条件
# 「subject」を実験ごとに変更します。
# https://support.google.com/mail/answer/7190
SEARCH_CRITERIA = {
'subject': "xxxxxxx",
'has': "nouserlabels", # ラベル無しのメール
'from': "xxxxx@xxx.xx",
'to': "xxxxx@xxx.xx"
}
# データ保存用ファイル
CSV_FILE = './xxxxx.csv'
# Gmailで振り分けるラベル
GMAIL_LABEL = 'xxxxx'
# OAuth2 for google
SCOPES = ['https://www.googleapis.com/auth/gmail.modify']
CLIENT_SECRETS_FILE = '/home/www/wsgi/xxx/cron/management/commands/credentials.json'
TOKEN_PICKLE_FILE = '/home/www/wsgi/xxx/cron/management/commands/token.pickle'
#
# クラス
#
class Command(BaseCommand):
# 「python manage.py help reguser」で表示されるメッセージ
help = 'xxxxxxxx'
# コマンドが実行された際に呼ばれるメソッド
def handle(self, *args, **options):
user_id = 'me'
creds = self.get_credentials()
#self.service = build("gmail", "v1", credentials=creds, cache_discovery=False)
self.service = build('gmail', 'v1', credentials=creds)
query = self.build_search_criteria(SEARCH_CRITERIA)
messages = self.get_mail_list(query, user_id)
#print('Start script')
if messages:
label_dict = self.get_label_dict(user_id)
for mes in messages:
mes_id = mes['id']
self.modify_label(mes_id, label_dict, user_id)
result = self.get_subject_message(mes_id, user_id)
list = self.xml_tag_removal(result['message'])
self.csv_write(CSV_FILE, list)
self.db_write(list)
else:
print('No messages list.')
# OAuth2(Authentication credential)
def get_credentials(self):
creds = None
if os.path.exists(TOKEN_PICKLE_FILE):
creds = Credentials.from_authorized_user_file(TOKEN_PICKLE_FILE, SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES)
creds = flow.run_console()
with open(TOKEN_PICKLE_FILE, 'w') as token:
token.write(creds.to_json())
return creds
# メールリスト用のquery構築
def build_search_criteria(self, query_dict):
query_string = ''
for key, value in query_dict.items():
if value:
query_string += key + ':' + value + ' '
return query_string
# メールのリストを取得
def get_mail_list(self, query, user):
try:
results = self.service.users().messages().list(userId=user, q=query).execute()
except HttpError as error:
print('get_mail_list is error occurred: %s' % error)
messages = results.get('messages', [])
return messages
# メール本文の取得
def get_subject_message(self, id, user):
try:
res = self.service.users().messages().get(userId=user, id=id).execute()
except HttpError as err:
print('action=get_message error={err}')
raise
result = {}
subject = [d.get('value') for d in res['payload']['headers'] if d.get('name') == 'Subject'][0]
result['subject'] = subject
# text/plain
if 'data' in res['payload']['body']:
b64_message = res['payload']['body']['data']
# text/html
elif res['payload']['parts'] is not None:
b64_message = res['payload']['parts'][0]['body']['data']
message = self.base64_decode(b64_message)
result['message'] = message
return result
# base64デコード
def base64_decode(self, b64_message):
#message = base64.urlsafe_b64decode(b64_message + '=' * (-len(b64_message) % 4)).decode(encoding='utf-8')
message = base64.urlsafe_b64decode(b64_message).decode(encoding='utf-8')
return message
# XMLタグを除去してデータを取り出す
def xml_tag_removal(self, xml):
xml = '<?xml version="1.0" encoding="utf-8"?>\n' + '<data>\n' + xml + '</data>\n'
# 全角(半角)文字の「?」が入っているとXMLタグとして解析できないので削除
xml = xml.replace('?', '')
#xml = xml.replace('?', '')
# 全角(半角)文字の「&」が入っているとXMLタグとして解析できないので削除
#xml = xml.replace('&', '')
xml = xml.replace('&', '')
# 全角空白を半角空白に変換します
xml = xml.replace(' ', ' ')
root = ET.fromstring(xml)
ct = 0
list = []
# rootの要素数だけforで回してデータにアクセスします。
for data in root:
list.append(root[ct].text)
ct += 1
return list
# データをCSVファイルに出力します
def csv_write(self, csvfile, list):
with open(csvfile, 'a') as f:
w = csv.writer(f)
w.writerow(list)
# メールにラベルを付加します
def modify_label(self, msg_id, dict, user):
try:
message = self.service.users().messages().modify(userId=user, id=msg_id, body={'addLabelIds':dict[GMAIL_LABEL]}).execute()
except errors.HttpError as error:
raise('An error occurred: %s' % error)
# Gmailに設定されているラベルidを取得します
# 指定したラベルが存在しない場合は作成します。
def get_label_dict(self, user):
results = self.service.users().labels().list(userId=user).execute()
labellist = results.get('labels', [])
# ラベルの一覧を作成します
if labellist:
dict = {}
for i in labellist:
dict[ i['name'] ] = i['id']
# 指定したラベルが無い場合は作成して追加します。
try:
flg = dict[GMAIL_LABEL]
except KeyError:
flg = 0
if not flg:
label_obj = {'name': GMAIL_LABEL}
label = self.service.users().labels().create(userId=user, body=label_obj).execute()
dict[GMAIL_LABEL] = label['id']
return dict
# データをデータベースに保存します
def db_write(self, list):
reg = XXX(
title = list[0],
# タイムゾーン付に変換
regist_datetime = make_aware(datetime.datetime.strptime(list[1], '%Y/%m/%d %H:%M:%S')),
xxx = list[2],
)
reg.save()
おすすめ。


Comments