のえら

技術備忘とかメモとか.間違いはつっこんでいただきたい所存.アフィリエイトはやっていません.

Google Cloud Bigquery で非同期処理

require 'google/cloud/bigquery'

bigquery_project = Google::Cloud::Bigquery.new(
  # Bigqueryの接続情報を指定
  project: 'test_project_id',
  keyfile: './service_account.json'
)

sample_query = <<-QUERY
  SELECT
    repository.*
  FROM
    `bigquery-public-data.samples.github_nested`
  ORDER BY repository.forks desc
  LIMIT 1000
QUERY

puts '###QUERY###'
puts sample_query

puts 'Running query'
query_job = bigquery_project.query_job(sample_query) # (1)

puts 'Waiting for query to complete'
query_job.wait_until_done! # (2)

puts 'Query results'
if !query_job.failed? # (3)
  puts query_job.query_results.first
else
  puts query_job.error
end


(1) 指定の project に対してクエリを投げてジョブを作成する

  • クエリを実行するジョブを Bigquery で作ってその情報を返す

ここで戻るのは Google::Cloud::Bigquery::QueryJob クラス

  • Bigquery 側では、ジョブの作成と実行・ジョブ処理に使用する一時テーブル(約24時間後に消える)の作成が行われる
  • レスポンスにクエリの問い合わせ結果は含まれない

(2) Bigquery 内で実行しているジョブに対してリクエストを投げて、状態を確認する

  • 2秒待機→リクエストを、クエリ実行ステータスが「完了(停止)」になるまで繰り返す(5回)
  • 終わるまで後続の処理は実行されない

(3) ジョブの実行結果にエラーが含まれるかチェックする

  • #done? は対象のジョブが停止していたら true を返すだけなので、成功/失敗は判断できない

エラーチェックしたい場合は #failed? を使う

  • エラーがある場合ジョブの error にエラー理由とメッセージが入っている


実行結果


$ bundle exec ruby async_query_sample.rb

###QUERY###
SELECT
repository.*
FROM
`bigquery-public-data.samples.github_nested`
ORDER BY repository.forks desc
LIMIT 1000
Running query
Waiting for query to complete
Query results:
{:url=>"https://github.com/octocat/Spoon-Knife", :has_downloads=>true, :created_at=>"2011/01/27 11:30:43 -0800", :has_issues=>true, :description=>"This repo is for demonstration purposes only. Comments and issues may or may not be responded to.", :forks=>6718, :fork=>false, :has_wiki=>false, :homepage=>"", :integrate_branch=>nil, :master_branch=>nil, :size=>284, :private=>false, :name=>"Spoon-Knife", :organization=>nil, :owner=>"octocat", :open_issues=>139, :watchers=>8025, :pushed_at=>"2011/10/31 07:34:52 -0700", :language=>nil}


(1)でクエリをぶん投げたあとは Bigquery 側で処理を進めているので、重いクエリを実行するときや集計をしている間にプログラム側で何かしたいとき、複数のクエリを投げて並列で集計させたいときなどに使える。