"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import typepy
from tabledata import TableData
from .._constant import TableNameTemplate as tnt
from .._validator import TextValidator
from ..error import APIError, OpenError
from .core import SpreadSheetLoader
[docs]class GoogleSheetsTableLoader(SpreadSheetLoader):
"""
Concrete class of Google Spreadsheet loader.
.. py:attribute:: table_name
Table name string. Defaults to ``%(sheet)s``.
:param str file_path: Path to the Google Sheets credential JSON file.
:Dependency Packages:
- `gspread <https://github.com/burnash/gspread>`_
- `SimpleSQLite <https://github.com/thombashi/SimpleSQLite>`_
- `oauth2client <https://pypi.org/project/oauth2client>`_
- `pyOpenSSL <https://pypi.org/project/pyOpenSSL>`_
:Examples:
:ref:`example-gs-table-loader`
"""
@property
def _sheet_name(self):
return self._worksheet.title
@property
def _row_count(self):
return self._worksheet.row_count
@property
def _col_count(self):
return self._worksheet.col_count
def __init__(self, file_path=None, quoting_flags=None, type_hints=None, type_hint_rules=None):
super().__init__(file_path, quoting_flags, type_hints, type_hint_rules)
self.title = None
self.start_row = 0
self._validator = TextValidator(file_path)
self.__all_values = None
[docs] def load(self):
"""
Load table data from a Google Spreadsheet.
This method consider :py:attr:`.source` as a path to the
credential JSON file to access Google Sheets API.
The method automatically search the header row start from
:py:attr:`.start_row`. The condition of the header row is that
all of the columns have value (except empty columns).
:return:
Loaded table data. Return one |TableData| for each sheet in
the workbook. The table name for data will be determined by
:py:meth:`~.GoogleSheetsTableLoader.make_table_name`.
:rtype: iterator of |TableData|
:raises pytablereader.DataError:
If the header row is not found.
:raises pytablereader.OpenError:
If the spread sheet not found.
"""
import gspread
from oauth2client.service_account import ServiceAccountCredentials
self._validate_table_name()
self._validate_title()
scope = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"]
credentials = ServiceAccountCredentials.from_json_keyfile_name(self.source, scope)
gc = gspread.authorize(credentials)
try:
for worksheet in gc.open(self.title).worksheets():
self._worksheet = worksheet
self.__all_values = [row for row in worksheet.get_all_values()]
if self._is_empty_sheet():
continue
try:
self.__strip_empty_col()
except ValueError:
continue
value_matrix = self.__all_values[self._get_start_row_idx() :]
try:
headers = value_matrix[0]
rows = value_matrix[1:]
except IndexError:
continue
self.inc_table_count()
yield TableData(
self.make_table_name(),
headers,
rows,
dp_extractor=self.dp_extractor,
type_hints=self._extract_type_hints(headers),
)
except gspread.exceptions.SpreadsheetNotFound:
raise OpenError(f"spreadsheet '{self.title}' not found")
except gspread.exceptions.APIError as e:
raise APIError(e)
def _is_empty_sheet(self):
return len(self.__all_values) <= 1
def _get_start_row_idx(self):
row_idx = 0
for row_values in self.__all_values:
if all([typepy.is_not_null_string(value) for value in row_values]):
break
row_idx += 1
return self.start_row + row_idx
def _validate_title(self):
if typepy.is_null_string(self.title):
raise ValueError("spreadsheet title is empty")
def _make_table_name(self):
self._validate_title()
kv_mapping = self._get_basic_tablename_keyvalue_mapping()
kv_mapping[tnt.TITLE] = self.title
try:
kv_mapping[tnt.SHEET] = self._sheet_name
except AttributeError:
kv_mapping[tnt.SHEET] = ""
return self._expand_table_name_format(kv_mapping)
def __strip_empty_col(self):
from simplesqlite import connect_memdb
from simplesqlite.query import Attr, AttrList
con = connect_memdb()
tmp_table_name = "tmp"
headers = [f"a{i:d}" for i in range(len(self.__all_values[0]))]
con.create_table_from_data_matrix(tmp_table_name, headers, self.__all_values)
for col_idx, header in enumerate(headers):
result = con.select(select=Attr(header), table_name=tmp_table_name)
if any([typepy.is_not_null_string(record[0]) for record in result.fetchall()]):
break
strip_headers = headers[col_idx:]
if typepy.is_empty_sequence(strip_headers):
raise ValueError()
result = con.select(select=AttrList(strip_headers), table_name=tmp_table_name)
self.__all_values = result.fetchall()