Twitterデータ取得_Postgre導入と一旦の完成まで
注意:2018年5月くらいに書いていた記事を移設したものです。
phytonでのtwitterからのデータ取得とデータベース取得について一気にがーっと書いた後、 ある程度直してみました。
現在は、これからどう分析しようかなーと試行錯誤中。 形態素解析?っていうんですかね? MeCab導入して、分解して、元単語のポジネガでツイートのポジネガを見てみようかなーとやって見てます。
まぁ上のは完成がいつになるか、果たして完成するのかも分からないので、 これまでの成果物をまとめ。
Postgre導入
多分ここから書いていなかった
前回までで取れるようになったデータを保存するために
ローカルにDB構築
フリーの奴なら何でもよかったんですが
参考の多さ的にMySQLかPostgreかなーと
いろいろ情報見てると
「現状両方とも十分商用実用可能だし、これからさらに2つの差はなくなっていく」みたいな記事も多く
ただ、現状大量のデータを分析させるとかであればPostgreの方が得意かもねー位の書き込みを見たので
いったんPostgreで進めてみます。
ちなみに仕事ではほぼOracleしか触ったことないです。
<参考>
https://lets.postgresql.jp/documents/tutorial/windows/
https://qiita.com/rubytomato@github/items/be540f2c1bd3605cacae
管理者実行して、ロケールはCで設定
そのままクラスタの作成まで特に問題なく
作成後、下をたたいてみたけど、実行できず
pg_ctl status psql -U 管理者アカウント -l
環境変数の問題の様だったので、下記追加して、無事通りました
Path →ポスグレのパス\PostgreSQL\10\bin
PGDATA →ポスグレのパス\PostgreSQL\10\data
その後、下のコマンドを実行して、データベースのお試し作成とログインまで確認
サーバー起動 pg_ctl start ステータスの確認 pg_ctl status 新しいユーザーの作成 createuser -U 管理者 -P 新ユーザ 新ユーザーがオーナーのtest_dbデータベースを作成 createdb -E UTF8 -O 新ユーザ -U 管理者 test_db データベース一覧 psql -U postgres -l psqlでのログイン psql -d test_db -U 新ユーザ
とりあえず使える状態にはなった
取得からDB登録まで
参考
PythonでTwitterのデータを自動的に取得してデータベースに登録してみた - you diary
完成したものがあるので、それをはっちゃいますね。 ちなみにテーブル構造は参考サイトのまま
configファイルは張りませんが、 Twitter接続情報/DB接続情報/twitterのAPI制限が書いてます。
課題としては、
・ 過去のデータが十分に取れない
→日付を1日単位でsince/until指定したが10日以前のデータが取れなかった
・検索結果が多すぎる?と途中で検索が終わる
→「スタバ」ならOKだったけど、「FGO」とかで調べると、最初の100件とらないうちに検索が終わっちゃう。。
これはどうしたもんですかね。。
<メイン>
from twitter import Twitter, OAuth import datetime import psycopg2.extras from time import sleep import config as conf import common_functions as cf import twitter_functions as tf ''' main 検索ワードリストに基づき、Twitterからつぶやきを取得し、DBに登録する。 検索期間は過去1週間 ''' # 実行識別用 batch_tag = datetime.datetime.today().strftime('G%Y%m%d%H%M%S') # 検索ワードのリスト search_words = ['検索ワード1', '検索ワード2'] # 検索実行回数(APIリクエスト制限対策) count_exec_search = 0 # 検索開始年月日 1週間前までを検索 API仕様で1週間以前は取得不可 s_ymd = (datetime.datetime.today() + datetime.timedelta(weeks=-1)).strftime('%Y-%m-%d') # Twitter API接続用情報 twitter_obj = Twitter( auth=OAuth( conf.T_ACCESS_TOKEN, conf.T_ACCESS_TOKEN_SECRET, conf.T_CONSUMER_KEY, conf.T_CONSUMER_SECRET ) ) cf.echo_line() cf.echo('[{0}]ツイート取得バッチ開始'.format(batch_tag)) cf.echo_line() # 検索ワードリストでのループ for search_word in search_words: # これより過去のツイートを取得 m_id = -1 # 次回実行用パラーメータの保持有無 has_next_param = True # DBコネクション connection = psycopg2.connect( host=conf.DB_HOST, database=conf.DB_NAME, user=conf.DB_USER, password=conf.DB_PASS ) cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor) # 同検索ワードでの登録済み最新IDを取得し、検索開始IDとする s_id = tf.search_registered_max_id(cursor, search_word) cf.echo('[{0}]でのTweet検索'.format(search_word)) # APIで取得できる100件ごとのループ while has_next_param: # 実行回数を加算 count_exec_search += 1 # API制限に引っかかる場合、制限解除までスリープ if count_exec_search % conf.T_WSEARCH_LIMIT == 0: cf.echo('リクエスト回数制限のため、{}秒スリープ'.format(conf.T_LIMIT_SECONDS)) sleep(conf.T_LIMIT_SECONDS + 5) # Tweet検索 # TODO 検索結果の時間帯が集中してたり、件数が多い場合取得が途中で止まる(TwitterAPIの仕様のよう) # 修正する場合、取得済みの日時をUntilで指定し、それ以前に対してリトライをかけるようにする。 search_tweets = twitter_obj.search.tweets(q=search_word, count=100, since=s_ymd, since_id=s_id, max_id=m_id) # 取得した件数を表示 tweet_count = len(search_tweets['statuses']) cf.echo('新規ツイートを{}件取得しました'.format(tweet_count)) # 検索結果から次ループ実行用パラメータをセット m_id, has_next_param = tf.get_next_param(search_tweets) cf.echo('データベースにツイートを登録します') # 登録実行回数 count_registered_tweet = 1 # 取得したtweet分ループ for tweet_statuses in search_tweets['statuses']: tweet_id = tweet_statuses['id'] tweet_statuses['batch_tag'] = batch_tag tweet_statuses['search_word'] = search_word # 既に登録しているツイートはスキップ if tf.has_duplicated_tweet(cursor, tweet_id): cf.echo('[{0}/{1}] ツイートID:{2}は既に登録済みです'. format(*[count_registered_tweet, tweet_count, tweet_statuses['id']])) count_registered_tweet += 1 continue # tweet_statusesから登録用dict作成 tweet_data, place_data, meta_data, user_data = tf.make_register_from_statuses(tweet_statuses) # tweetデータの登録 tf.insert_tweet_data(cursor, tweet_data) # metadataの登録 tf.insert_meta_data(cursor, meta_data) # placeデータの登録 if tweet_statuses['place'] is not None: tf.insert_place_data(cursor, place_data) cf.echo('[{0}/{1}] ツイートID:{2}を登録しました'.format(*[count_registered_tweet, tweet_count, tweet_id])) # userデータの登録 user_id = user_data['id'] # 既に登録しているツイートはスキップ if tf.has_duplicated_user(cursor, user_id): cf.echo('[{0}/{1}] ユーザーID:{2}は既に登録済みです'. format(*[count_registered_tweet, tweet_count, user_id])) count_registered_tweet += 1 continue tf.insert_user_data(cursor, user_data) cf.echo('[{0}/{1}] ユーザーID:{2}を登録しました'.format(*[count_registered_tweet, tweet_count, user_id])) count_registered_tweet += 1 # コミット connection.commit() cf.echo_line() cf.echo('[{0}]ツイート取得バッチ終了'.format(batch_tag)) cf.echo_line()
<common_function>
from datetime import datetime """ 共通関数の定義 """ def echo(text): """ タイムスタンプ付与でのコンソール出力 """ print(datetime.now().strftime("%Y/%m/%d %H:%M:%S"), text) def echo_line(): """ ライン出力 """ print('--------------------------------------------------------------') def del_column(dect, key): """ dectから使用したカラムを削除 """ if key in dect.keys(): del dect[key]
<twitter_function>
import re import psycopg2 import psycopg2.extras from pytz import timezone from dateutil import parser import common_functions as cf """ Twitter用関数の定義 """ def search_registered_max_id(cursor, search_word): """ 検索キーワードでの登録済み最新レコードのIDを返却 引数 cursor:DB接続用 search_word:検索ワード 戻値 max_id(存在しない場合は-1) """ # SQL作成 statement = 'SELECT max(id) FROM t_tw_tweets WHERE search_word = %s' # バインドセット cursor.execute(statement, [search_word]) # 結果取り出し result = cursor.fetchone() # 取得済みのツイートのIDをSIDにセット if result[0] is not None: max_id = result[0] # 存在しない場合 -1 返却 else: max_id = -1 return max_id def get_next_param(search_tweets): """ 次回実行用のパラメータを取得 パラメータのリストが連結された文字列で返却されるため、 引数 search_tweets:APIでの検索結果 戻値 max_id(存在しない場合は-1) has_next_result(True/False) """ # next_resultsの存在確認 if 'next_results' in search_tweets['search_metadata'].keys(): text_next_parameters = search_tweets['search_metadata']['next_results'] # 1文字目"?"削除 text_next_parameters = text_next_parameters[1:] # "&"で分割 next_parameters = text_next_parameters.split('&') # 分割したパラメータ分ループし、max_idの検索 for next_parameter in next_parameters: # max_idがあればidと次回実行の判定をセットしループを抜ける if re.match("max_id=*", next_parameter): max_id = next_parameter[7:] has_next_param = True return max_id, has_next_param else: max_id = -1 has_next_param = False # max_idがない場合は次回実行の判定Falseで返却 return max_id, has_next_param # next_resultsが存在しない場合、次回実行の判定をFalseに else: # 次回実行の判定をFalseに max_id = -1 has_next_param = False return max_id, has_next_param def has_duplicated_tweet(cursor, tweet_id): """ 登録済みtweetとの重複チェック 引数 cursor:DB接続用 tweet_id:チェック対象のid 戻値 True(重複あり)/False(重複なし) """ statement = 'SELECT id FROM t_tw_tweets WHERE id = %s' cursor.execute(statement, [tweet_id]) result = cursor.fetchone() if result is not None: return True else: return False def has_duplicated_user(cursor, user_id): """ 登録済みuserとの重複チェック 引数 cursor:DB接続用 user_id:チェック対象のid 戻値 True(重複あり)/False(重複なし) """ statement = 'SELECT id FROM m_tw_users WHERE id = %s' cursor.execute(statement, [user_id]) result = cursor.fetchone() if result is not None: return True else: return False def make_register_from_statuses(tweet_statuses): """ APIで取得したデータからDB登録用のdict作成 引数 tweet_statuses:APIで取得したツイート tweet_id:ID 戻値 tweet_data:t_tw_tweets登録用dict place_data:t_tw_tweetplaces登録用dict meta_data:t_tw_metadata登録用dict user_data:m_tw_users登録用dict """ tweet_data = {} place_data = {} meta_data = {} user_data = {} tweet_id = tweet_statuses['id'] for tweet_status_key in tweet_statuses.keys(): # find実行用に型変換 tweet_status_value = str(tweet_statuses[tweet_status_key]) # 追加情報は登録しない if tweet_status_key == 'entities' or tweet_status_key == 'extended_entities': continue # tweet日時は日本時間に変換 elif tweet_status_key == 'created_at': utc_time = tweet_statuses[tweet_status_key] jst_time = parser.parse(utc_time).astimezone(timezone('Asia/Tokyo')) tweet_data[tweet_status_key] = jst_time # meta情報セット elif tweet_status_key == 'metadata': meta_data = tweet_statuses[tweet_status_key] meta_data['tweet_id'] = tweet_id continue # 位置情報セット elif tweet_status_key == 'place': if tweet_statuses[tweet_status_key] is not None: place_data = tweet_statuses[tweet_status_key] place_data['tweet_id'] = tweet_id cf.del_column(place_data, 'id') cf.del_column(place_data, 'url') cf.del_column(place_data, 'contained_within') cf.del_column(place_data, 'bounding_box') cf.del_column(place_data, 'attributes') continue # ユーザ情報セット elif tweet_status_key == 'user': user_data = tweet_statuses[tweet_status_key] #locationが100文字以上あるケースがあったため、切り出し user_data['location'] = (user_data['location'])[:99] tweet_data['user_id'] = user_data['id'] cf.del_column(user_data, 'id_str') cf.del_column(user_data, 'entities') cf.del_column(user_data, 'profile_image_url') cf.del_column(user_data, 'profile_banner_url') cf.del_column(user_data, 'profile_background_color') cf.del_column(user_data, 'profile_background_image_url') cf.del_column(user_data, 'profile_background_image_url_https') cf.del_column(user_data, 'profile_background_tile') cf.del_column(user_data, 'profile_image_url_https') cf.del_column(user_data, 'profile_link_color') cf.del_column(user_data, 'profile_sidebar_border_color') cf.del_column(user_data, 'profile_sidebar_fill_color') cf.del_column(user_data, 'profile_text_color') cf.del_column(user_data, 'profile_use_background_image') # {が見つかった場合、文字列に変換してセット elif tweet_status_value.find('{') > -1: tweet_data[tweet_status_key] = str(tweet_statuses[tweet_status_key]) # その他セット else: tweet_data[tweet_status_key] = tweet_statuses[tweet_status_key] return tweet_data, place_data, meta_data, user_data def insert_tweet_data(cursor, tweet_data): """ t_tw_tweetsへの登録 引数 cursor:DB接続用 tweet_data:t_tw_tweets登録用dict """ statement = 'INSERT INTO t_tw_tweets (%s) VALUES %s' cursor.execute(statement, (psycopg2.extensions.AsIs(','.join(tweet_data.keys())), tuple(tweet_data.values()))) def insert_place_data(cursor, place_data): """ t_tw_tweetplacesへの登録 引数 cursor:DB接続用 place_data:t_tw_tweetplaces登録用dict """ statement = 'INSERT INTO t_tw_tweetplaces (%s) VALUES %s' cursor.execute(statement, (psycopg2.extensions.AsIs(','.join(place_data.keys())), tuple(place_data.values()))) def insert_meta_data(cursor, meta_data): """ t_tw_tweetplacesへの登録 引数 cursor:DB接続用 meta_data:t_tw_metadata登録用dict """ statement = 'INSERT INTO t_tw_metadata (%s) VALUES %s' cursor.execute(statement, (psycopg2.extensions.AsIs(','.join(meta_data.keys())), tuple(meta_data.values()))) def insert_user_data(cursor, user_data): """ m_tw_usersへの登録 引数 cursor:DB接続用 user_data:m_tw_users登録用dict """ statement = 'INSERT INTO m_tw_users (%s) VALUES %s' cursor.execute(statement, (psycopg2.extensions.AsIs(','.join(user_data.keys())), tuple(user_data.values())))
ソースを全部張ると非常に見ずらいすね。。
個人開発用のgithubのアカウントも作ってみたので、こんどからそっちを張るようにしたい。 まぁまだ、使い方全然わかってないんだけど。
無料アカウントは 基本的には全体公開でプルは自由、プッシュとかは許可しないとかできるのかな。
Gitのお勉強もしないと。
それではまた