Appearance
Python Examples
Learn how to use SheetsToJson with Python using the requests library or built-in urllib.
Installation
Install the requests library (recommended):
bash
pip install requestsOr use Python's built-in urllib (no installation needed).
Basic Usage with Requests
Fetch All Rows
python
import requests
API_KEY = 'your_api_key_here'
SHEET_ID = 'your_sheet_id'
BASE_URL = 'https://api.sheetstojson.com'
def get_all_users():
url = f'{BASE_URL}/api/v1/{SHEET_ID}/Users'
headers = {'X-API-Key': API_KEY}
response = requests.get(url, headers=headers)
result = response.json()
if result['success']:
print(f"Found {len(result['data'])} users")
return result['data']
else:
raise Exception(result['message'])
# Usage
users = get_all_users()
for user in users:
print(f"{user['name']} - {user['email']}")Fetch Single Row
python
def get_user(user_id):
url = f'{BASE_URL}/api/v1/{SHEET_ID}/Users/{user_id}'
headers = {'X-API-Key': API_KEY}
response = requests.get(url, headers=headers)
result = response.json()
return result['data'] if result['success'] else None
# Usage
user = get_user('123')
if user:
print(f"Found user: {user['name']}")Create Row
python
def create_user(user_data):
url = f'{BASE_URL}/api/v1/{SHEET_ID}/Users'
headers = {
'X-API-Key': API_KEY,
'Content-Type': 'application/json'
}
response = requests.post(url, headers=headers, json=user_data)
result = response.json()
if result['success']:
created_user = result['data']['rows'][0]
print(f"User created: {created_user['id']}")
return created_user
else:
raise Exception(result['message'])
# Usage
new_user = create_user({
'name': 'Alice Johnson',
'email': '[email protected]',
'role': 'admin'
})Update Row
python
def update_user(user_id, updates):
url = f'{BASE_URL}/api/v1/{SHEET_ID}/Users/{user_id}'
headers = {
'X-API-Key': API_KEY,
'Content-Type': 'application/json'
}
response = requests.put(url, headers=headers, json=updates)
result = response.json()
return result['data'] if result['success'] else None
# Usage
from datetime import datetime
updated = update_user('123', {
'role': 'user',
'updated_at': datetime.now().isoformat()
})Delete Row
python
def delete_user(user_id):
url = f'{BASE_URL}/api/v1/{SHEET_ID}/Users/{user_id}'
headers = {'X-API-Key': API_KEY}
response = requests.delete(url, headers=headers)
result = response.json()
return result['success']
# Usage
if delete_user('123'):
print('User deleted successfully')Client Class
Create a reusable client class:
python
import requests
from typing import Optional, List, Dict, Any
class SheetsToJsonClient:
def __init__(self, api_key: str, sheet_id: str, base_url: str = 'https://api.sheetstojson.com'):
self.api_key = api_key
self.sheet_id = sheet_id
self.base_url = base_url
def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
url = f'{self.base_url}{endpoint}'
headers = kwargs.pop('headers', {})
headers['X-API-Key'] = self.api_key
response = requests.request(method, url, headers=headers, **kwargs)
response.raise_for_status()
return response.json()
def get_all(self, tab_name: str, limit: int = 100, offset: int = 0,
sort: Optional[str] = None, order: str = 'asc') -> List[Dict]:
params = {'limit': limit, 'offset': offset}
if sort:
params['sort'] = sort
params['order'] = order
endpoint = f'/api/v1/{self.sheet_id}/{tab_name}'
result = self._request('GET', endpoint, params=params)
return result['data'] if result['success'] else []
def get_one(self, tab_name: str, row_id: str) -> Optional[Dict]:
endpoint = f'/api/v1/{self.sheet_id}/{tab_name}/{row_id}'
result = self._request('GET', endpoint)
return result['data'] if result['success'] else None
def create(self, tab_name: str, data: Dict) -> Optional[Dict]:
endpoint = f'/api/v1/{self.sheet_id}/{tab_name}'
result = self._request('POST', endpoint, json=data)
return result['data']['rows'][0] if result['success'] else None
def update(self, tab_name: str, row_id: str, data: Dict) -> Optional[Dict]:
endpoint = f'/api/v1/{self.sheet_id}/{tab_name}/{row_id}'
result = self._request('PUT', endpoint, json=data)
return result['data'] if result['success'] else None
def delete(self, tab_name: str, row_id: str) -> bool:
endpoint = f'/api/v1/{self.sheet_id}/{tab_name}/{row_id}'
result = self._request('DELETE', endpoint)
return result['success']
# Usage
client = SheetsToJsonClient('your_api_key', 'your_sheet_id')
# Fetch users
users = client.get_all('Users', limit=50)
for user in users:
print(user['name'])
# Get specific user
user = client.get_one('Users', '123')
# Create user
new_user = client.create('Users', {
'name': 'Bob Smith',
'email': '[email protected]',
'role': 'user'
})
# Update user
updated = client.update('Users', '123', {'role': 'admin'})
# Delete user
client.delete('Users', '123')Type Hints with Dataclasses
Use Python dataclasses for type safety:
python
from dataclasses import dataclass, asdict
from typing import Optional, List
from datetime import datetime
@dataclass
class User:
name: str
email: str
role: str = 'user'
id: Optional[str] = None
created_at: Optional[str] = None
class TypedSheetsClient(SheetsToJsonClient):
def get_all_typed(self, tab_name: str, model_class, **kwargs) -> List:
data = self.get_all(tab_name, **kwargs)
return [model_class(**item) for item in data]
def create_typed(self, tab_name: str, instance) -> Optional:
data = asdict(instance)
result = self.create(tab_name, data)
return type(instance)(**result) if result else None
# Usage
client = TypedSheetsClient('your_api_key', 'your_sheet_id')
# Get typed users
users: List[User] = client.get_all_typed('Users', User, limit=50)
for user in users:
print(f"{user.name} ({user.role})")
# Create typed user
new_user = User(name='Alice', email='[email protected]', role='admin')
created = client.create_typed('Users', new_user)
print(f"Created user: {created.id}")Pagination
Iterate through large datasets:
python
def paginate_all(client, tab_name, page_size=100):
offset = 0
while True:
page = client.get_all(tab_name, limit=page_size, offset=offset)
if not page:
break
yield from page
offset += page_size
if len(page) < page_size:
break
# Usage
client = SheetsToJsonClient('your_api_key', 'your_sheet_id')
for user in paginate_all(client, 'Users', page_size=50):
print(user['name'])Error Handling
python
import requests
from typing import Optional
class SheetsAPIError(Exception):
def __init__(self, message: str, code: str, status: int):
super().__init__(message)
self.code = code
self.status = status
class SheetsToJsonClient:
# ... (previous code)
def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
url = f'{self.base_url}{endpoint}'
headers = kwargs.pop('headers', {})
headers['X-API-Key'] = self.api_key
try:
response = requests.request(method, url, headers=headers, **kwargs)
result = response.json()
if not result.get('success'):
raise SheetsAPIError(
result.get('message', 'Request failed'),
result.get('error', 'unknown'),
response.status_code
)
return result
except requests.exceptions.RequestException as e:
raise SheetsAPIError(str(e), 'network_error', 0)
# Usage with error handling
try:
users = client.get_all('Users')
print(users)
except SheetsAPIError as e:
if e.code == 'rate_limit_exceeded':
print('Rate limit hit, try again later')
elif e.code == 'unauthorized':
print('Invalid API key')
else:
print(f'Request failed: {e}')Rate Limiting with Backoff
python
import time
from functools import wraps
def retry_with_backoff(max_retries=3, backoff_factor=2):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except SheetsAPIError as e:
if e.code == 'rate_limit_exceeded' and attempt < max_retries - 1:
delay = backoff_factor ** attempt
print(f'Rate limited. Retrying in {delay}s...')
time.sleep(delay)
else:
raise
return wrapper
return decorator
class SheetsToJsonClient:
# ... (previous code)
@retry_with_backoff(max_retries=3)
def get_all(self, tab_name: str, **kwargs) -> List[Dict]:
# ... implementation
pass
# Usage - automatically retries on rate limit
client = SheetsToJsonClient('your_api_key', 'your_sheet_id')
users = client.get_all('Users') # Will retry if rate limitedBatch Operations
python
def create_users_batch(client, users_data, batch_size=100):
"""Create multiple users in batches"""
created_users = []
for i in range(0, len(users_data), batch_size):
batch = users_data[i:i + batch_size]
# API supports array of objects
result = client.create('Users', batch)
if result:
created_users.extend(result)
print(f"Created batch {i//batch_size + 1}: {len(batch)} users")
return created_users
# Usage
users_to_create = [
{'name': f'User {i}', 'email': f'user{i}@example.com'}
for i in range(500)
]
created = create_users_batch(client, users_to_create, batch_size=50)
print(f"Total created: {len(created)} users")Django Integration
python
# models.py
from django.db import models
class User(models.Model):
sheet_id = models.CharField(max_length=100)
name = models.CharField(max_length=200)
email = models.EmailField()
role = models.CharField(max_length=50)
synced_at = models.DateTimeField(auto_now=True)
# management/commands/sync_users.py
from django.core.management.base import BaseCommand
from myapp.models import User
from myapp.sheets_client import SheetsToJsonClient
class Command(BaseCommand):
help = 'Sync users from Google Sheets'
def handle(self, *args, **options):
client = SheetsToJsonClient(
settings.SHEETS_API_KEY,
settings.SHEET_ID
)
users_data = client.get_all('Users')
for user_data in users_data:
User.objects.update_or_create(
sheet_id=user_data['id'],
defaults={
'name': user_data['name'],
'email': user_data['email'],
'role': user_data.get('role', 'user')
}
)
self.stdout.write(
self.style.SUCCESS(f'Synced {len(users_data)} users')
)Flask API
python
from flask import Flask, jsonify, request
from sheets_client import SheetsToJsonClient
app = Flask(__name__)
client = SheetsToJsonClient(
app.config['SHEETS_API_KEY'],
app.config['SHEET_ID']
)
@app.route('/users', methods=['GET'])
def get_users():
limit = request.args.get('limit', 100, type=int)
offset = request.args.get('offset', 0, type=int)
users = client.get_all('Users', limit=limit, offset=offset)
return jsonify(users)
@app.route('/users/<user_id>', methods=['GET'])
def get_user(user_id):
user = client.get_one('Users', user_id)
return jsonify(user) if user else ('User not found', 404)
@app.route('/users', methods=['POST'])
def create_user():
data = request.json
user = client.create('Users', data)
return jsonify(user), 201
@app.route('/users/<user_id>', methods=['PUT'])
def update_user(user_id):
data = request.json
user = client.update('Users', user_id, data)
return jsonify(user) if user else ('User not found', 404)
@app.route('/users/<user_id>', methods=['DELETE'])
def delete_user(user_id):
success = client.delete('Users', user_id)
return ('', 204) if success else ('User not found', 404)Async with aiohttp
python
import asyncio
import aiohttp
class AsyncSheetsClient:
def __init__(self, api_key: str, sheet_id: str):
self.api_key = api_key
self.sheet_id = sheet_id
self.base_url = 'https://api.sheetstojson.com'
async def get_all(self, tab_name: str, session: aiohttp.ClientSession):
url = f'{self.base_url}/api/v1/{self.sheet_id}/{tab_name}'
headers = {'X-API-Key': self.api_key}
async with session.get(url, headers=headers) as response:
result = await response.json()
return result['data'] if result['success'] else []
async def main():
client = AsyncSheetsClient('your_api_key', 'your_sheet_id')
async with aiohttp.ClientSession() as session:
# Fetch multiple tabs concurrently
users_task = client.get_all('Users', session)
orders_task = client.get_all('Orders', session)
users, orders = await asyncio.gather(users_task, orders_task)
print(f"Users: {len(users)}, Orders: {len(orders)}")
# Run
asyncio.run(main())
