前の記事でGoogleフォームに入力されたデータをGASを使ってXML形式に加工し、Gmailアカウントに送信しました。
今回は、そのメールをPythonとDjnagoを使って処理し、データベースに保存するまでを行います。
メール本文はXML宣言のない不完全なXML形式なのでXML宣言を追加してPythonで処理し、CSVファイルとデータベースに保存します。
送信したメールの宛先はGmailアカウントなので、二段階認証の処理も行います。
環境
・Python 3.9.5
・Django 3.2.4
仕様
・Djangoのカスタムコマンド機能を使ってcronで動かします。(django-crontabは使っていない。)
・初回の実行時に二段階認証のURLが表示されるので、受信するGmailアカウントで認証します。
・Gailから二段階認証でメールを受信してメール本文を解析します。
・Gmailで設定しているラベルを取得します。
・指定したラベルが存在しなければ作成します。
・処理したメールにラベルを付けます。
・CSVファイルに保存します。
・データベースに保存します。
XML形式のメール本文
Pythonの「xml.etree.ElementTree」を使って処理します。
Googleフォームから送信するときに完全なXML形式にしておけば良いのですが、今回は諸事情でこの仕様にしています。
取得したメール本文は以下のようになっているので
<name>xxxx</name> <age>oooo</age>
XML宣言を付加し、データ部分を<data>と</data>で囲んでから処理します。
<?xml version="1.0" encoding="utf-8"?> <data> <name>xxxx</name> <age>oooo</age> </data>
cronの設定
「django-crontab」は使っていないのでシステムのcronを使います。
pyenvで仮想環境を構築しているので、pythonのパスは以下の通りとなります。
「manage.py」の引数は、作成したスクリプト名をtest.pyとすると「.py」を除いたtestとなります。
root権限で動かしたい場合は「sudo」で指定します。
sudo crontab -e /home/hoge/.pyenv/shims/python manage.py /home/www/wsgi/project/manage.py test
Djangoのカスタムコマンド
今回のアプリ名はcronで動かすので「cron」とします。
「settings.py」の「INSTALLED_APPS」に「cron」を追加します。
「cron」アプリを作成して反映させます。
アプリの作成
python manage.py startapp cron python manage.py makemigrations python manage.py migrate
構成
構成は以下の通りです。
「management」と「commands」ディレクトリは自分で作成します。
project ┣ cron ┣ management ┣ commands ┣ 作成したスクリプト.py ┣ credentials.json ┣ token.pickle
「credentials.json」の取得
Google Search Consoleで「credentials.json」を取得、認証を受けて「token.pickle」を作成します。
- 送信先のGmailアカウントでhttps://console.cloud.google.com/?hl=jaにアクセスします。
- 新規にプロジェクトを作成して選択します。
- 「APIとサービス」→「ライブラリ」で[Gmail API」を有効にします。
- 「APIとサービス」→「認証情報」をクリックします。
- 「認証情報を作成」→[OAuthクライアントID」を選択します。
- 「アプリケーションの種類」を「デスクトップアプリ」にして適当な「名前」を入力、「作成」をクリックします。
- 「OAuth同意画面」の設定は環境に合わせて設定します。
- 「APIとサービス」→「認証情報」→「OAuth2.0クライアントID」に作成したIDがありますので、右端の↓マークをクリックして認証情報をダウンロードします。
- ダウンロードしたファイルを名前を「credentials.json」に変更してスクリプト同じディレクトリに置きます。
スクリプト
cronで動かす場合は「credentials.json」と「token.pickle」は絶対パスで指定します。
# coding: utf-8 import base64 import os import csv import datetime import argparse import xml.etree.ElementTree as ET from django.utils.timezone import make_aware from googleapiclient.discovery import build from googleapiclient.errors import HttpError from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials # Commandクラス from django.core.management.base import BaseCommand, CommandError # モデルの読み込み from xxx.models import XXX # メールの検索条件 # 「subject」を実験ごとに変更します。 # https://support.google.com/mail/answer/7190 SEARCH_CRITERIA = { 'subject': "xxxxxxx", 'has': "nouserlabels", # ラベル無しのメール 'from': "xxxxx@xxx.xx", 'to': "xxxxx@xxx.xx" } # データ保存用ファイル CSV_FILE = './xxxxx.csv' # Gmailで振り分けるラベル GMAIL_LABEL = 'xxxxx' # OAuth2 for google SCOPES = ['https://www.googleapis.com/auth/gmail.modify'] CLIENT_SECRETS_FILE = '/home/www/wsgi/xxx/cron/management/commands/credentials.json' TOKEN_PICKLE_FILE = '/home/www/wsgi/xxx/cron/management/commands/token.pickle' # # クラス # class Command(BaseCommand): # 「python manage.py help reguser」で表示されるメッセージ help = 'xxxxxxxx' # コマンドが実行された際に呼ばれるメソッド def handle(self, *args, **options): user_id = 'me' creds = self.get_credentials() #self.service = build("gmail", "v1", credentials=creds, cache_discovery=False) self.service = build('gmail', 'v1', credentials=creds) query = self.build_search_criteria(SEARCH_CRITERIA) messages = self.get_mail_list(query, user_id) #print('Start script') if messages: label_dict = self.get_label_dict(user_id) for mes in messages: mes_id = mes['id'] self.modify_label(mes_id, label_dict, user_id) result = self.get_subject_message(mes_id, user_id) list = self.xml_tag_removal(result['message']) self.csv_write(CSV_FILE, list) self.db_write(list) else: print('No messages list.') # OAuth2(Authentication credential) def get_credentials(self): creds = None if os.path.exists(TOKEN_PICKLE_FILE): creds = Credentials.from_authorized_user_file(TOKEN_PICKLE_FILE, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES) creds = flow.run_console() with open(TOKEN_PICKLE_FILE, 'w') as token: token.write(creds.to_json()) return creds # メールリスト用のquery構築 def build_search_criteria(self, query_dict): query_string = '' for key, value in query_dict.items(): if value: query_string += key + ':' + value + ' ' return query_string # メールのリストを取得 def get_mail_list(self, query, user): try: results = self.service.users().messages().list(userId=user, q=query).execute() except HttpError as error: print('get_mail_list is error occurred: %s' % error) messages = results.get('messages', []) return messages # メール本文の取得 def get_subject_message(self, id, user): try: res = self.service.users().messages().get(userId=user, id=id).execute() except HttpError as err: print('action=get_message error={err}') raise result = {} subject = [d.get('value') for d in res['payload']['headers'] if d.get('name') == 'Subject'][0] result['subject'] = subject # text/plain if 'data' in res['payload']['body']: b64_message = res['payload']['body']['data'] # text/html elif res['payload']['parts'] is not None: b64_message = res['payload']['parts'][0]['body']['data'] message = self.base64_decode(b64_message) result['message'] = message return result # base64デコード def base64_decode(self, b64_message): #message = base64.urlsafe_b64decode(b64_message + '=' * (-len(b64_message) % 4)).decode(encoding='utf-8') message = base64.urlsafe_b64decode(b64_message).decode(encoding='utf-8') return message # XMLタグを除去してデータを取り出す def xml_tag_removal(self, xml): xml = '<?xml version="1.0" encoding="utf-8"?>\n' + '<data>\n' + xml + '</data>\n' # 全角(半角)文字の「?」が入っているとXMLタグとして解析できないので削除 xml = xml.replace('?', '') #xml = xml.replace('?', '') # 全角(半角)文字の「&」が入っているとXMLタグとして解析できないので削除 #xml = xml.replace('&', '') xml = xml.replace('&', '') # 全角空白を半角空白に変換します xml = xml.replace(' ', ' ') root = ET.fromstring(xml) ct = 0 list = [] # rootの要素数だけforで回してデータにアクセスします。 for data in root: list.append(root[ct].text) ct += 1 return list # データをCSVファイルに出力します def csv_write(self, csvfile, list): with open(csvfile, 'a') as f: w = csv.writer(f) w.writerow(list) # メールにラベルを付加します def modify_label(self, msg_id, dict, user): try: message = self.service.users().messages().modify(userId=user, id=msg_id, body={'addLabelIds':dict[GMAIL_LABEL]}).execute() except errors.HttpError as error: raise('An error occurred: %s' % error) # Gmailに設定されているラベルidを取得します # 指定したラベルが存在しない場合は作成します。 def get_label_dict(self, user): results = self.service.users().labels().list(userId=user).execute() labellist = results.get('labels', []) # ラベルの一覧を作成します if labellist: dict = {} for i in labellist: dict[ i['name'] ] = i['id'] # 指定したラベルが無い場合は作成して追加します。 try: flg = dict[GMAIL_LABEL] except KeyError: flg = 0 if not flg: label_obj = {'name': GMAIL_LABEL} label = self.service.users().labels().create(userId=user, body=label_obj).execute() dict[GMAIL_LABEL] = label['id'] return dict # データをデータベースに保存します def db_write(self, list): reg = XXX( title = list[0], # タイムゾーン付に変換 regist_datetime = make_aware(datetime.datetime.strptime(list[1], '%Y/%m/%d %H:%M:%S')), xxx = list[2], ) reg.save()
おすすめ。
Comments