GoogleフォームでGASを使ってメール送信する方法(処理編)

Google

前の記事でGoogleフォームに入力されたデータをGASを使ってXML形式に加工し、Gmailアカウントに送信しました。
今回は、そのメールをPythonとDjnagoを使って処理し、データベースに保存するまでを行います。

メール本文はXML宣言のない不完全なXML形式なのでXML宣言を追加してPythonで処理し、CSVファイルとデータベースに保存します。
送信したメールの宛先はGmailアカウントなので、二段階認証の処理も行います。

環境

・Python 3.9.5
・Django 3.2.4

仕様

・Djangoのカスタムコマンド機能を使ってcronで動かします。(django-crontabは使っていない。)
・初回の実行時に二段階認証のURLが表示されるので、受信するGmailアカウントで認証します。
・Gailから二段階認証でメールを受信してメール本文を解析します。
・Gmailで設定しているラベルを取得します。
・指定したラベルが存在しなければ作成します。
・処理したメールにラベルを付けます。
・CSVファイルに保存します。
・データベースに保存します。

XML形式のメール本文

Pythonの「xml.etree.ElementTree」を使って処理します。
Googleフォームから送信するときに完全なXML形式にしておけば良いのですが、今回は諸事情でこの仕様にしています。
取得したメール本文は以下のようになっているので

<name>xxxx</name>
<age>oooo</age>

XML宣言を付加し、データ部分を<data>と</data>で囲んでから処理します。

<?xml version="1.0" encoding="utf-8"?>
<data>
<name>xxxx</name>
<age>oooo</age>
</data>

cronの設定

「django-crontab」は使っていないのでシステムのcronを使います。
pyenvで仮想環境を構築しているので、pythonのパスは以下の通りとなります。
「manage.py」の引数は、作成したスクリプト名をtest.pyとすると「.py」を除いたtestとなります。
root権限で動かしたい場合は「sudo」で指定します。

sudo crontab -e

/home/hoge/.pyenv/shims/python manage.py /home/www/wsgi/project/manage.py test

Djangoのカスタムコマンド

今回のアプリ名はcronで動かすので「cron」とします。
「settings.py」の「INSTALLED_APPS」に「cron」を追加します。
「cron」アプリを作成して反映させます。

アプリの作成

python manage.py startapp cron
python manage.py makemigrations
python manage.py migrate

構成

構成は以下の通りです。
「management」と「commands」ディレクトリは自分で作成します。

project
 ┣ cron
    ┣ management
	    ┣ commands
            ┣ 作成したスクリプト.py
            ┣ credentials.json
            ┣ token.pickle

「credentials.json」の取得

Google Search Consoleで「credentials.json」を取得、認証を受けて「token.pickle」を作成します。

  1. 送信先のGmailアカウントでhttps://console.cloud.google.com/?hl=jaにアクセスします。
  2. 新規にプロジェクトを作成して選択します。
  3. 「APIとサービス」→「ライブラリ」で[Gmail API」を有効にします。
  4. 「APIとサービス」→「認証情報」をクリックします。
  5. 「認証情報を作成」→[OAuthクライアントID」を選択します。
  6. 「アプリケーションの種類」を「デスクトップアプリ」にして適当な「名前」を入力、「作成」をクリックします。
  7. 「OAuth同意画面」の設定は環境に合わせて設定します。
  8. 「APIとサービス」→「認証情報」→「OAuth2.0クライアントID」に作成したIDがありますので、右端の↓マークをクリックして認証情報をダウンロードします。
  9. ダウンロードしたファイルを名前を「credentials.json」に変更してスクリプト同じディレクトリに置きます。

スクリプト

cronで動かす場合は「credentials.json」と「token.pickle」は絶対パスで指定します。

# coding: utf-8

import base64
import os
import csv
import datetime
import argparse
import xml.etree.ElementTree as ET
from django.utils.timezone import make_aware

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials

# Commandクラス
from django.core.management.base import BaseCommand, CommandError

# モデルの読み込み
from xxx.models import XXX

# メールの検索条件
# 「subject」を実験ごとに変更します。
# https://support.google.com/mail/answer/7190
SEARCH_CRITERIA = {
    'subject': "xxxxxxx",
    'has': "nouserlabels", # ラベル無しのメール
    'from': "xxxxx@xxx.xx",
    'to': "xxxxx@xxx.xx"
}

# データ保存用ファイル
CSV_FILE = './xxxxx.csv'

# Gmailで振り分けるラベル
GMAIL_LABEL = 'xxxxx'

# OAuth2 for google
SCOPES              = ['https://www.googleapis.com/auth/gmail.modify']
CLIENT_SECRETS_FILE = '/home/www/wsgi/xxx/cron/management/commands/credentials.json'
TOKEN_PICKLE_FILE   = '/home/www/wsgi/xxx/cron/management/commands/token.pickle'

#
# クラス
#
class Command(BaseCommand):
    # 「python manage.py help reguser」で表示されるメッセージ
    help = 'xxxxxxxx'

    # コマンドが実行された際に呼ばれるメソッド
    def handle(self, *args, **options):
        user_id      = 'me'
        creds        = self.get_credentials()
       #self.service = build("gmail", "v1", credentials=creds, cache_discovery=False)
        self.service = build('gmail', 'v1', credentials=creds)
        query        = self.build_search_criteria(SEARCH_CRITERIA)
        messages     = self.get_mail_list(query, user_id)


        #print('Start script')
        if messages:
            label_dict = self.get_label_dict(user_id)
            for mes in messages:
                mes_id = mes['id']
                self.modify_label(mes_id, label_dict, user_id)
                result = self.get_subject_message(mes_id, user_id)
                list = self.xml_tag_removal(result['message'])
                self.csv_write(CSV_FILE, list)
                self.db_write(list)
        else:
            print('No messages list.')

    # OAuth2(Authentication credential)
    def get_credentials(self):
        creds = None
        if os.path.exists(TOKEN_PICKLE_FILE):
            creds = Credentials.from_authorized_user_file(TOKEN_PICKLE_FILE, SCOPES)
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES)
                creds = flow.run_console()
            with open(TOKEN_PICKLE_FILE, 'w') as token:
                token.write(creds.to_json())
        return creds

    # メールリスト用のquery構築
    def build_search_criteria(self, query_dict):
        query_string = ''
        for key, value in query_dict.items():
            if value:
                query_string += key + ':' + value + ' '
        return query_string

    # メールのリストを取得
    def get_mail_list(self, query, user):
        try:
            results = self.service.users().messages().list(userId=user, q=query).execute()
        except HttpError as error:
            print('get_mail_list is error occurred: %s' % error)
        messages = results.get('messages', [])
        return messages

    # メール本文の取得
    def get_subject_message(self, id, user):
        try:
            res = self.service.users().messages().get(userId=user, id=id).execute()
        except HttpError as err:
            print('action=get_message error={err}')
            raise

        result  = {}
        subject = [d.get('value') for d in res['payload']['headers'] if d.get('name') == 'Subject'][0]
        result['subject'] = subject
        # text/plain
        if 'data' in res['payload']['body']:
            b64_message = res['payload']['body']['data']
        # text/html
        elif res['payload']['parts'] is not None:
            b64_message = res['payload']['parts'][0]['body']['data']
        message = self.base64_decode(b64_message)
        result['message'] = message
        return result

    # base64デコード
    def base64_decode(self, b64_message):
        #message = base64.urlsafe_b64decode(b64_message + '=' * (-len(b64_message) % 4)).decode(encoding='utf-8')
        message = base64.urlsafe_b64decode(b64_message).decode(encoding='utf-8')
        return message

    # XMLタグを除去してデータを取り出す
    def xml_tag_removal(self, xml):
        xml = '<?xml version="1.0" encoding="utf-8"?>\n' + '<data>\n' + xml + '</data>\n'
        # 全角(半角)文字の「?」が入っているとXMLタグとして解析できないので削除
        xml = xml.replace('?', '')
        #xml = xml.replace('?', '')
        # 全角(半角)文字の「&」が入っているとXMLタグとして解析できないので削除
        #xml = xml.replace('&', '')
        xml = xml.replace('&', '')
        # 全角空白を半角空白に変換します
        xml = xml.replace(' ', ' ')
        root = ET.fromstring(xml)
        ct   = 0
        list = []
        # rootの要素数だけforで回してデータにアクセスします。
        for data in root:
            list.append(root[ct].text)
            ct += 1
        return list

    # データをCSVファイルに出力します
    def csv_write(self, csvfile, list):
        with open(csvfile, 'a') as f:
            w = csv.writer(f)
            w.writerow(list)

    # メールにラベルを付加します
    def modify_label(self, msg_id, dict, user):
        try:
            message = self.service.users().messages().modify(userId=user, id=msg_id, body={'addLabelIds':dict[GMAIL_LABEL]}).execute()
        except errors.HttpError as error:
            raise('An error occurred: %s' % error)

    # Gmailに設定されているラベルidを取得します
    # 指定したラベルが存在しない場合は作成します。
    def get_label_dict(self, user):
        results = self.service.users().labels().list(userId=user).execute()
        labellist = results.get('labels', [])
        # ラベルの一覧を作成します
        if labellist:
            dict = {}
            for i in labellist:
                dict[ i['name'] ] = i['id']
        # 指定したラベルが無い場合は作成して追加します。
        try:
           flg = dict[GMAIL_LABEL]
        except KeyError:
           flg = 0
        if not flg:
            label_obj = {'name': GMAIL_LABEL}
            label = self.service.users().labels().create(userId=user, body=label_obj).execute()
            dict[GMAIL_LABEL] = label['id']
        return dict

    # データをデータベースに保存します
    def db_write(self, list):
        reg = XXX(
            title = list[0],
            # タイムゾーン付に変換
            regist_datetime = make_aware(datetime.datetime.strptime(list[1], '%Y/%m/%d %H:%M:%S')),
            xxx = list[2],
        )
        reg.save()

おすすめ。

Comments