HGUC_システムエンジニア_1/1

HGUC_システムエンジニア_1/1

主にガンプラについて。gunpraでなくてgunplaなので気をつけよう

Twitterデータ取得_Postgre導入と一旦の完成まで

注意:2018年5月くらいに書いていた記事を移設したものです。

 

phytonでのtwitterからのデータ取得とデータベース取得について一気にがーっと書いた後、 ある程度直してみました。

現在は、これからどう分析しようかなーと試行錯誤中。 形態素解析?っていうんですかね? MeCab導入して、分解して、元単語のポジネガでツイートのポジネガを見てみようかなーとやって見てます。

まぁ上のは完成がいつになるか、果たして完成するのかも分からないので、 これまでの成果物をまとめ。

Postgre導入

多分ここから書いていなかった

前回までで取れるようになったデータを保存するために
ローカルにDB構築

フリーの奴なら何でもよかったんですが
参考の多さ的にMySQLかPostgreかなーと

いろいろ情報見てると
「現状両方とも十分商用実用可能だし、これからさらに2つの差はなくなっていく」みたいな記事も多く

ただ、現状大量のデータを分析させるとかであればPostgreの方が得意かもねー位の書き込みを見たので
いったんPostgreで進めてみます。

ちなみに仕事ではほぼOracleしか触ったことないです。

<参考>

https://lets.postgresql.jp/documents/tutorial/windows/
https://qiita.com/rubytomato@github/items/be540f2c1bd3605cacae

管理者実行して、ロケールはCで設定
そのままクラスタの作成まで特に問題なく

作成後、下をたたいてみたけど、実行できず

pg_ctl status
psql -U 管理者アカウント -l

環境変数の問題の様だったので、下記追加して、無事通りました
Path →ポスグレのパス\PostgreSQL\10\bin
PGDATA →ポスグレのパス\PostgreSQL\10\data

その後、下のコマンドを実行して、データベースのお試し作成とログインまで確認

サーバー起動
pg_ctl start

ステータスの確認
pg_ctl status

新しいユーザーの作成
createuser -U 管理者 -P 新ユーザ

新ユーザーがオーナーのtest_dbデータベースを作成
createdb -E UTF8 -O 新ユーザ -U 管理者 test_db

データベース一覧
psql -U postgres -l


psqlでのログイン
psql -d test_db -U 新ユーザ

とりあえず使える状態にはなった

取得からDB登録まで

参考

PythonでTwitterのデータを自動的に取得してデータベースに登録してみた - you diary

完成したものがあるので、それをはっちゃいますね。 ちなみにテーブル構造は参考サイトのまま

configファイルは張りませんが、 Twitter接続情報/DB接続情報/twitterAPI制限が書いてます。

課題としては、
・ 過去のデータが十分に取れない
 →日付を1日単位でsince/until指定したが10日以前のデータが取れなかった
・検索結果が多すぎる?と途中で検索が終わる
 →「スタバ」ならOKだったけど、「FGO」とかで調べると、最初の100件とらないうちに検索が終わっちゃう。。

これはどうしたもんですかね。。

<メイン>

from twitter import Twitter, OAuth
import datetime
import psycopg2.extras
from time import sleep
import config as conf
import common_functions as cf
import twitter_functions as tf
'''
main
    検索ワードリストに基づき、Twitterからつぶやきを取得し、DBに登録する。
    検索期間は過去1週間
'''


# 実行識別用
batch_tag = datetime.datetime.today().strftime('G%Y%m%d%H%M%S')

# 検索ワードのリスト
search_words = ['検索ワード1', '検索ワード2']

# 検索実行回数(APIリクエスト制限対策)
count_exec_search = 0

# 検索開始年月日 1週間前までを検索 API仕様で1週間以前は取得不可
s_ymd = (datetime.datetime.today() + datetime.timedelta(weeks=-1)).strftime('%Y-%m-%d')

# Twitter API接続用情報
twitter_obj = Twitter(
    auth=OAuth(
        conf.T_ACCESS_TOKEN,
        conf.T_ACCESS_TOKEN_SECRET,
        conf.T_CONSUMER_KEY,
        conf.T_CONSUMER_SECRET
    )
)

cf.echo_line()
cf.echo('[{0}]ツイート取得バッチ開始'.format(batch_tag))
cf.echo_line()

# 検索ワードリストでのループ
for search_word in search_words:

    # これより過去のツイートを取得
    m_id = -1
    # 次回実行用パラーメータの保持有無
    has_next_param = True

    # DBコネクション
    connection = psycopg2.connect(
        host=conf.DB_HOST,
        database=conf.DB_NAME,
        user=conf.DB_USER,
        password=conf.DB_PASS
    )
    cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)

    # 同検索ワードでの登録済み最新IDを取得し、検索開始IDとする
    s_id = tf.search_registered_max_id(cursor, search_word)

    cf.echo('[{0}]でのTweet検索'.format(search_word))

    # APIで取得できる100件ごとのループ
    while has_next_param:

        # 実行回数を加算
        count_exec_search += 1

        # API制限に引っかかる場合、制限解除までスリープ
        if count_exec_search % conf.T_WSEARCH_LIMIT == 0:
            cf.echo('リクエスト回数制限のため、{}秒スリープ'.format(conf.T_LIMIT_SECONDS))
            sleep(conf.T_LIMIT_SECONDS + 5)

        # Tweet検索
        # TODO 検索結果の時間帯が集中してたり、件数が多い場合取得が途中で止まる(TwitterAPIの仕様のよう)
        #      修正する場合、取得済みの日時をUntilで指定し、それ以前に対してリトライをかけるようにする。
        search_tweets = twitter_obj.search.tweets(q=search_word, count=100, since=s_ymd, since_id=s_id, max_id=m_id)

        # 取得した件数を表示
        tweet_count = len(search_tweets['statuses'])
        cf.echo('新規ツイートを{}件取得しました'.format(tweet_count))

        # 検索結果から次ループ実行用パラメータをセット
        m_id, has_next_param = tf.get_next_param(search_tweets)

        cf.echo('データベースにツイートを登録します')

        # 登録実行回数
        count_registered_tweet = 1

        # 取得したtweet分ループ
        for tweet_statuses in search_tweets['statuses']:

            tweet_id = tweet_statuses['id']
            tweet_statuses['batch_tag'] = batch_tag
            tweet_statuses['search_word'] = search_word

            # 既に登録しているツイートはスキップ
            if tf.has_duplicated_tweet(cursor, tweet_id):
                cf.echo('[{0}/{1}] ツイートID:{2}は既に登録済みです'.
                        format(*[count_registered_tweet, tweet_count, tweet_statuses['id']]))
                count_registered_tweet += 1
                continue

            # tweet_statusesから登録用dict作成
            tweet_data, place_data, meta_data, user_data = tf.make_register_from_statuses(tweet_statuses)

            # tweetデータの登録
            tf.insert_tweet_data(cursor, tweet_data)
            # metadataの登録
            tf.insert_meta_data(cursor, meta_data)
            # placeデータの登録
            if tweet_statuses['place'] is not None:
                tf.insert_place_data(cursor, place_data)

            cf.echo('[{0}/{1}] ツイートID:{2}を登録しました'.format(*[count_registered_tweet, tweet_count, tweet_id]))

            # userデータの登録
            user_id = user_data['id']
            # 既に登録しているツイートはスキップ
            if tf.has_duplicated_user(cursor, user_id):
                cf.echo('[{0}/{1}] ユーザーID:{2}は既に登録済みです'.
                        format(*[count_registered_tweet, tweet_count, user_id]))
                count_registered_tweet += 1
                continue
            tf.insert_user_data(cursor, user_data)
            cf.echo('[{0}/{1}] ユーザーID:{2}を登録しました'.format(*[count_registered_tweet, tweet_count, user_id]))

            count_registered_tweet += 1

        # コミット
        connection.commit()
cf.echo_line()
cf.echo('[{0}]ツイート取得バッチ終了'.format(batch_tag))
cf.echo_line()

<common_function>

from datetime import datetime
"""
共通関数の定義
"""


def echo(text):
    """
    タイムスタンプ付与でのコンソール出力
    """
    print(datetime.now().strftime("%Y/%m/%d %H:%M:%S"), text)


def echo_line():
    """
    ライン出力
    """
    print('--------------------------------------------------------------')


def del_column(dect, key):
    """
    dectから使用したカラムを削除
    """
    if key in dect.keys():
        del dect[key]

<twitter_function>

import re
import psycopg2
import psycopg2.extras
from pytz import timezone
from dateutil import parser
import common_functions as cf

"""
Twitter用関数の定義
"""


def search_registered_max_id(cursor, search_word):
    """
    検索キーワードでの登録済み最新レコードのIDを返却
    引数  cursor:DB接続用
          search_word:検索ワード
    戻値  max_id(存在しない場合は-1)
    """
    # SQL作成
    statement = 'SELECT max(id) FROM t_tw_tweets WHERE search_word = %s'
    # バインドセット
    cursor.execute(statement, [search_word])
    # 結果取り出し
    result = cursor.fetchone()
    # 取得済みのツイートのIDをSIDにセット
    if result[0] is not None:
        max_id = result[0]
    # 存在しない場合 -1 返却
    else:
        max_id = -1
    return max_id


def get_next_param(search_tweets):
    """
     次回実行用のパラメータを取得
     パラメータのリストが連結された文字列で返却されるため、
    引数  search_tweets:APIでの検索結果
    戻値  max_id(存在しない場合は-1)
          has_next_result(True/False)
    """
    # next_resultsの存在確認
    if 'next_results' in search_tweets['search_metadata'].keys():
        text_next_parameters = search_tweets['search_metadata']['next_results']

        # 1文字目"?"削除
        text_next_parameters = text_next_parameters[1:]
        # "&"で分割
        next_parameters = text_next_parameters.split('&')
        # 分割したパラメータ分ループし、max_idの検索
        for next_parameter in next_parameters:
            # max_idがあればidと次回実行の判定をセットしループを抜ける
            if re.match("max_id=*", next_parameter):
                max_id = next_parameter[7:]
                has_next_param = True
                return max_id, has_next_param
            else:
                max_id = -1
                has_next_param = False
        # max_idがない場合は次回実行の判定Falseで返却
        return max_id, has_next_param
    # next_resultsが存在しない場合、次回実行の判定をFalseに
    else:
        # 次回実行の判定をFalseに
        max_id = -1
        has_next_param = False
        return max_id, has_next_param


def has_duplicated_tweet(cursor, tweet_id):
    """
    登録済みtweetとの重複チェック
    引数  cursor:DB接続用
          tweet_id:チェック対象のid
    戻値  True(重複あり)/False(重複なし)
    """
    statement = 'SELECT id FROM t_tw_tweets WHERE id = %s'
    cursor.execute(statement, [tweet_id])
    result = cursor.fetchone()
    if result is not None:
        return True
    else:
        return False


def has_duplicated_user(cursor, user_id):
    """
    登録済みuserとの重複チェック
    引数  cursor:DB接続用
          user_id:チェック対象のid
    戻値  True(重複あり)/False(重複なし)
    """
    statement = 'SELECT id FROM m_tw_users WHERE id = %s'
    cursor.execute(statement, [user_id])
    result = cursor.fetchone()
    if result is not None:
        return True
    else:
        return False


def make_register_from_statuses(tweet_statuses):
    """
    APIで取得したデータからDB登録用のdict作成
    引数  tweet_statuses:APIで取得したツイート
          tweet_id:ID
    戻値  tweet_data:t_tw_tweets登録用dict
          place_data:t_tw_tweetplaces登録用dict
          meta_data:t_tw_metadata登録用dict
          user_data:m_tw_users登録用dict
    """
    tweet_data = {}
    place_data = {}
    meta_data = {}
    user_data = {}
    tweet_id = tweet_statuses['id']
    for tweet_status_key in tweet_statuses.keys():
        # find実行用に型変換
        tweet_status_value = str(tweet_statuses[tweet_status_key])

        # 追加情報は登録しない
        if tweet_status_key == 'entities' or tweet_status_key == 'extended_entities':
            continue
        # tweet日時は日本時間に変換
        elif tweet_status_key == 'created_at':
            utc_time = tweet_statuses[tweet_status_key]
            jst_time = parser.parse(utc_time).astimezone(timezone('Asia/Tokyo'))
            tweet_data[tweet_status_key] = jst_time

        # meta情報セット
        elif tweet_status_key == 'metadata':
            meta_data = tweet_statuses[tweet_status_key]
            meta_data['tweet_id'] = tweet_id
            continue

        # 位置情報セット
        elif tweet_status_key == 'place':
            if tweet_statuses[tweet_status_key] is not None:
                place_data = tweet_statuses[tweet_status_key]
                place_data['tweet_id'] = tweet_id
                cf.del_column(place_data, 'id')
                cf.del_column(place_data, 'url')
                cf.del_column(place_data, 'contained_within')
                cf.del_column(place_data, 'bounding_box')
                cf.del_column(place_data, 'attributes')
                continue

        # ユーザ情報セット
        elif tweet_status_key == 'user':
            user_data = tweet_statuses[tweet_status_key]
            #locationが100文字以上あるケースがあったため、切り出し
            user_data['location'] = (user_data['location'])[:99]
            tweet_data['user_id'] = user_data['id']
            cf.del_column(user_data, 'id_str')
            cf.del_column(user_data, 'entities')
            cf.del_column(user_data, 'profile_image_url')
            cf.del_column(user_data, 'profile_banner_url')
            cf.del_column(user_data, 'profile_background_color')
            cf.del_column(user_data, 'profile_background_image_url')
            cf.del_column(user_data, 'profile_background_image_url_https')
            cf.del_column(user_data, 'profile_background_tile')
            cf.del_column(user_data, 'profile_image_url_https')
            cf.del_column(user_data, 'profile_link_color')
            cf.del_column(user_data, 'profile_sidebar_border_color')
            cf.del_column(user_data, 'profile_sidebar_fill_color')
            cf.del_column(user_data, 'profile_text_color')
            cf.del_column(user_data, 'profile_use_background_image')

        # {が見つかった場合、文字列に変換してセット
        elif tweet_status_value.find('{') > -1:
            tweet_data[tweet_status_key] = str(tweet_statuses[tweet_status_key])
        # その他セット
        else:
            tweet_data[tweet_status_key] = tweet_statuses[tweet_status_key]

    return tweet_data, place_data, meta_data, user_data


def insert_tweet_data(cursor, tweet_data):
    """
    t_tw_tweetsへの登録
    引数  cursor:DB接続用
          tweet_data:t_tw_tweets登録用dict
    """
    statement = 'INSERT INTO t_tw_tweets (%s) VALUES %s'
    cursor.execute(statement, (psycopg2.extensions.AsIs(','.join(tweet_data.keys())), tuple(tweet_data.values())))


def insert_place_data(cursor, place_data):
    """
    t_tw_tweetplacesへの登録
    引数  cursor:DB接続用
          place_data:t_tw_tweetplaces登録用dict
    """
    statement = 'INSERT INTO t_tw_tweetplaces (%s) VALUES %s'
    cursor.execute(statement, (psycopg2.extensions.AsIs(','.join(place_data.keys())), tuple(place_data.values())))


def insert_meta_data(cursor, meta_data):
    """
    t_tw_tweetplacesへの登録
    引数  cursor:DB接続用
          meta_data:t_tw_metadata登録用dict
    """
    statement = 'INSERT INTO t_tw_metadata (%s) VALUES %s'
    cursor.execute(statement, (psycopg2.extensions.AsIs(','.join(meta_data.keys())), tuple(meta_data.values())))


def insert_user_data(cursor, user_data):
    """
    m_tw_usersへの登録
    引数  cursor:DB接続用
          user_data:m_tw_users登録用dict
    """
    statement = 'INSERT INTO m_tw_users (%s) VALUES %s'
    cursor.execute(statement, (psycopg2.extensions.AsIs(','.join(user_data.keys())), tuple(user_data.values())))

ソースを全部張ると非常に見ずらいすね。。

個人開発用のgithubのアカウントも作ってみたので、こんどからそっちを張るようにしたい。 まぁまだ、使い方全然わかってないんだけど。

無料アカウントは 基本的には全体公開でプルは自由、プッシュとかは許可しないとかできるのかな。

Gitのお勉強もしないと。

それではまた