Source code for singlet.io.googleapi.googleapi

# vim: fdm=indent
# author:     Fabio Zanini
# date:       09/01/17
# content:    Access Google Sheets via the REST API.
# Modules
import os
import numpy as np
import pandas as pd



# Classes / functions
[docs]class GoogleIOError(IOError): pass
[docs]class GoogleAPI: # If modifying these scopes, delete your previously saved credentials # NOTE: readonly for now scopes = 'https://www.googleapis.com/auth/spreadsheets.readonly' application_name = 'Google Sheet API to singlet' MAX_COLUMN = 'AZ' def __init__(self, spreadsheetId, spreadsheetname, client_id_filename): self.spreadsheetname = spreadsheetname self.client_id_filename = client_id_filename self.spreadsheetId = spreadsheetId self.set_service()
[docs] def get_credentials(self): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ from oauth2client import client from oauth2client import tools from oauth2client.file import Storage # Cached credentials client_secret_filename = ''.join([ os.getenv('HOME'), '/.credentials/sheets.googleapis.com-singlet-', self.spreadsheetname, '.json' ]) credential_dir = os.path.dirname(client_secret_filename) if not os.path.exists(credential_dir): os.makedirs(credential_dir) store = Storage(client_secret_filename) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(self.client_id_filename, self.scopes) flow.user_agent = self.application_name print('Storing credentials to '+client_secret_filename) credentials = tools.run_flow(flow, store) print('Stored credentials into:', client_secret_filename) return credentials
[docs] def set_service(self): import httplib2 from apiclient import discovery credentials = self.get_credentials() http = credentials.authorize(httplib2.Http()) discoveryUrl = ('https://sheets.googleapis.com/$discovery/rest?' 'version=v4') self.service = discovery.build( 'sheets', 'v4', http=http, discoveryServiceUrl=discoveryUrl)
[docs] def get_sheet_shape(self, sheetname): '''Get the data range of the sheet''' result = self.service.spreadsheets().get( spreadsheetId=self.spreadsheetId).execute() for sheet in result['sheets']: if sheet['properties']['title'] == sheetname: break else: raise GoogleIOError('Sheet not found') props = sheet['properties']['gridProperties'] return [props['rowCount'], props['columnCount']]
[docs] def get_last_column(self, sheetname): return self._convert_col_index_to_A1( self.get_sheet_shape(sheetname=sheetname)[1] - 1 )
[docs] def get_header_columns_indices(self, colnames, sheetname): '''Get the indices of columns in A1 notation''' # Get the column indices if self.MAX_COLUMN is None: last_col = self.get_last_column(sheetname=sheetname) else: last_col = self.MAX_COLUMN rangeName = sheetname+'!A1:'+last_col result = self.service.spreadsheets().values().get( spreadsheetId=self.spreadsheetId, range=rangeName).execute() header = result.get('values', [])[0] if not header: raise GoogleIOError('No header data found.') icols = {} for colname in colnames: icols[colname] = self._convert_col_index_to_A1(header.index(colname)) return icols
[docs] def get_data(self, sheetname, ranges=()): '''Get the whole sheet as a pandas dataframe''' if not ranges: # The max row number can be anything, Google figures it out if self.MAX_COLUMN is None: last_col = self.get_last_column(sheetname=sheetname) else: last_col = self.MAX_COLUMN ranges = 'A1:'+last_col+'100000' if isinstance(ranges, str): ranges = [ranges] squeeze = True else: squeeze = False ranges = [sheetname+'!'+r for r in ranges] # Get the data results = [] for r in ranges: result = self.service.spreadsheets().values().get( spreadsheetId=self.spreadsheetId, range=r).execute() values = result.get('values', []) results.append(values) if squeeze: results = results[0] return results
[docs] def set_sheet(self, sheetname, values): if self.MAX_COLUMN is None: last_col = self.get_last_column(sheetname=sheetname) else: last_col = self.MAX_COLUMN rangeName = sheetname+'!A1:'+last_col+'100000' self.service.spreadsheets().values().update( spreadsheetId=self.spreadsheetId, range=rangeName, valueInputOption='RAW', body={'values': values}, ).execute()
[docs] def get_sheetnames(self): spreadsheet = self.service.spreadsheets().get( spreadsheetId=self.spreadsheetId ).execute() sheetnames = {s['properties']['sheetId']: s['properties']['title'] for s in spreadsheet['sheets']} return sheetnames
[docs] def add_sheet(self, sheetname): self.service.spreadsheets().batchUpdate( spreadsheetId=self.spreadsheetId, body={'requests': [ {'addSheet': {'properties': {'title': sheetname}}} ]} ).execute()
[docs] def delete_sheet(self, sheetname): raise NotImplementedError('Function missing ON PURPOSE! Please delete sheets by hand.')
[docs] def get_sheet_id(self, sheetname): for sid, sname in self.get_sheetnames().items(): if sname == sheetname: return sid raise ValueError('Sheet not found')
[docs] def get_named_ranges(self, sheetname=None, fmt='dict', convert=True): sheetnames = self.get_sheetnames() response = self.service.spreadsheets().get( spreadsheetId=self.spreadsheetId, includeGridData=False, ).execute() # FIXME: Google does not return a sheetID when it is zero (?!) for r in response['namedRanges']: if 'sheetId' not in r['range']: r['range']['sheetId'] = 0 # restrict to that sheet named_ranges = [r for r in response['namedRanges'] if (sheetname is None) or (sheetnames[r['range']['sheetId']] == sheetname)] if fmt == 'list': if convert: for r in named_ranges: r['range'] = self.convert_range_json_to_A1(r['range']) return named_ranges elif fmt == 'dict': named_ranges = {r['name']: r['range'] for r in named_ranges} if convert: named_ranges = {k: self.convert_range_json_to_A1(r) for k, r in named_ranges.items()} return named_ranges else: raise ValueError('Format of named ranges not understood')
[docs] def get_cell_property(self, sheetname, field_string, ranges=()): if not ranges: ranges = 'A1:'+self.MAX_COLUMN+'100000' if isinstance(ranges, str): ranges = [ranges] squeeze = True else: squeeze = False ranges = ["'"+sheetname+"'!"+r for r in ranges] response = self.service.spreadsheets().get( spreadsheetId=self.spreadsheetId, ranges=ranges, includeGridData=True, ).execute() fields = field_string.split('.') data_ranges = [] for data_raw in response['sheets'][0]['data']: data = [] for row in data_raw['rowData']: data.append([]) if 'values' not in row: continue row = row['values'] for cell in row: datum = None for field in fields: if field not in cell: break cell = cell[field] else: # RGB to HEX if 'color' in fields[-1].lower(): cell = tuple([cell['red'], cell['green'], cell['blue']]) cell = self.rgb_to_hex(cell) datum = cell data[-1].append(datum) data_ranges.append(data) if squeeze: data_ranges = data_ranges[0] return data_ranges
[docs] def set_cell_property(self, sheetname, field_string, data, start_range=(0, 0)): sheet_id = self.get_sheet_id(sheetname) fields = field_string.split('.') # Convert to Google JSON data_json = [] for row in data: data_json.append({}) if not row: continue data_row = [] data_json[-1]['values'] = data_row for cell in row: data_cell = {} data_row.append(data_cell) if not cell: continue for field in fields[:-1]: data_cell[field] = {} data_cell = data_cell[field] field = fields[-1] # HEX to RGB if 'color' in field.lower(): cell = self.hex_to_rgb(cell) cell = {'red': cell[0], 'green': cell[1], 'blue': cell[2]} data_cell[field] = cell # Upload self.service.spreadsheets().batchUpdate( spreadsheetId=self.spreadsheetId, body={'requests': [ {'updateCells': { 'start': { 'sheetId': sheet_id, 'rowIndex': start_range[0], 'columnIndex': start_range[1], }, 'fields': field_string, 'rows': data_json, }}, ]} ).execute()
[docs] def set_dimension_size(self, sheetname, dimension, pixel_size, dim_range=(None, None)): if dimension not in ('rows', 'columns'): raise ValueError('Dimension must be in [\'rows\', \'columns\']') sheet_id = self.get_sheet_id(sheetname) r = {'sheetId': sheet_id, 'dimension': dimension.upper(), } if dim_range[0] is not None: r['startIndex'] = dim_range[0] if dim_range[1] is not None: r['endIndex'] = dim_range[1] self.service.spreadsheets().batchUpdate( spreadsheetId=self.spreadsheetId, body={'requests': [ {'updateDimensionProperties': { 'range': r, 'properties': {'pixelSize': pixel_size}, 'fields': 'pixelSize', }}, ]} ).execute()
[docs] def get_backgrounds(self, sheetname): return self.get_cell_property( sheetname, 'effectiveFormat.backgroundColor')
[docs] def set_backgrounds(self, sheetname, bkgs, start_range=(0, 0)): return self.set_cell_property( sheetname, 'userEnteredFormat.backgroundColor', bkgs, start_range=start_range)
[docs] def get_text_format(self, sheetname, prop, ranges=()): return self.get_cell_property( sheetname, 'effectiveFormat.textFormat.'+prop, ranges=ranges)
[docs] def set_text_format(self, sheetname, prop, fmts, start_range=(0, 0)): return self.set_cell_property( sheetname, 'userEnteredFormat.textFormat.'+prop, fmts, start_range=start_range)
[docs] def convert_range_json_to_A1(self, r): sheetname = self.get_sheetnames()[r['sheetId']] start = self.convert_row_col_to_A1( r['startRowIndex'], r['startColumnIndex']) end = self.convert_row_col_to_A1( r['endRowIndex'] - 1, r['endColumnIndex'] - 1) return "'"+sheetname+"'!"+start+':'+end
@staticmethod def _convert_col_index_to_A1(index): def convert(index): return chr(ord('A')+index) if index < 26: return convert(index) else: return convert(index // 26 - 1)+convert(index % 26) @classmethod
[docs] def convert_row_col_to_A1(self, row, col): return self._convert_col_index_to_A1(col)+str(row+1)
@staticmethod
[docs] def convert_row_col_to_A24(row, col): return chr(ord('A')+row)+str(col+1)
@staticmethod
[docs] def convert_A24_to_row_col(well): return (ord(well[0]) - ord('A'), int(well[1:]) - 1)
@staticmethod
[docs] def hex_to_rgb(value): """Return (red, green, blue) for the color given as #rrggbb.""" value = value.lstrip('#') lv = len(value) return tuple(1.0 / 255 * int(value[i:i + lv // 3], 16) for i in range(0, lv, lv // 3))
@staticmethod
[docs] def rgb_to_hex(rgb): """Return color as #rrggbb for the given color values.""" return '#%02x%02x%02x' % tuple(int(c * 255) for c in rgb)