Python Examples

Backend integration with Python

Setup

Install the requests library using pip:

pip install requests

💡 Tip: Use python-dotenv

Install python-dotenv to load your API keys from a .env file instead of hard-coding them: pip install python-dotenv

1. Simple GET Request

Fetch account information:

import requests
import os

# Get API key from environment; the second argument is only a placeholder
# fallback that is used when CONNECT2PRINT_API_KEY is not set.
api_key = os.getenv('CONNECT2PRINT_API_KEY', 'c2p_live_your_api_key_here')
# Root URL that the endpoint paths in this example are appended to
base_url = 'https://app1.connect2print.com/api/v1'

def get_account(timeout=10):
    """Fetch the account record and print its company name and email.

    Args:
        timeout: Seconds to wait for the server before giving up. Without
            a timeout, ``requests`` may block indefinitely.

    Returns:
        The dict found under the response's ``data`` key, or ``None`` when
        the request fails or the API does not report ``success``.
    """
    url = f'{base_url}/account'
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Accept': 'application/json'
    }

    # Only the network call and decoding can raise request errors, so the
    # try body is limited to them.
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.HTTPError as e:
        print(f"HTTP Error: {e}")
        print(f"Response: {e.response.text}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        return None

    if not data.get('success'):
        # Report the failure instead of returning None without explanation
        print(f"API Error: {data.get('error', {}).get('message')}")
        return None

    account = data['data']
    print(f"Account: {account['company_name']}")
    print(f"Email: {account['email']}")
    return account

# Usage: runs only when this file is executed directly, not when imported
if __name__ == '__main__':
    get_account()

2. List Orders with Pagination

Fetch all orders with pagination:

import requests
import time

# NOTE(review): placeholder key hard-coded for brevity; real code should read
# it from the environment rather than keeping it in source.
api_key = 'c2p_live_your_api_key_here'
# Root URL that the endpoint paths in this example are appended to
base_url = 'https://app1.connect2print.com/api/v1'

def make_request(url, headers, timeout=10):
    """Send a GET request and return the decoded JSON body.

    Args:
        url: Full URL to request.
        headers: Dict of HTTP headers to send.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout, ``requests`` may block indefinitely.

    Returns:
        The response body parsed as JSON.

    Raises:
        requests.exceptions.HTTPError: For 4xx/5xx responses.
        requests.exceptions.RequestException: For connection problems or
            timeouts.
    """
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json()

def fetch_all_orders():
    """Collect every order by requesting pages until the API reports no more.

    Returns:
        A list with the ``data`` items of every page, in page order.

    Raises:
        requests.exceptions.RequestException: Re-raised after being logged
            when a page request fails.
        Exception: When a page's payload does not report ``success``.
    """
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Accept': 'application/json'
    }

    all_orders = []
    per_page = 50
    page = 1
    has_more = True

    while has_more:
        url = f'{base_url}/orders?page={page}&per_page={per_page}'

        # Only the request itself can raise RequestException
        try:
            data = make_request(url, headers)
        except requests.exceptions.RequestException as e:
            print(f"Error fetching orders: {e}")
            raise

        if not data.get('success'):
            raise Exception(f"API Error: {data.get('error', {}).get('message')}")

        # Accumulate this page, then ask the payload whether to continue
        all_orders.extend(data['data'])
        has_more = data['pagination']['has_more']

        print(f"Fetched page {page}, total orders: {len(all_orders)}")

        if has_more:
            page += 1
            # Rate limit protection: 100ms pause between pages
            time.sleep(0.1)

    return all_orders

# Usage: runs only when this file is executed directly, not when imported
if __name__ == '__main__':
    orders = fetch_all_orders()
    print(f"Total orders fetched: {len(orders)}")

3. Create a Customer

Create a new customer with validation:

import requests

# NOTE(review): placeholder key hard-coded for brevity; real code should read
# it from the environment rather than keeping it in source.
api_key = 'c2p_live_your_api_key_here'
# Root URL that the endpoint paths in this example are appended to
base_url = 'https://app1.connect2print.com/api/v1'

def create_customer(customer_data, timeout=10):
    """Create a customer and return the created record.

    Args:
        customer_data: Dict sent as the JSON request body.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout, ``requests`` may block indefinitely.

    Returns:
        The dict under the response's ``data`` key when the API answers
        201, or ``None`` for any other non-error status.

    Raises:
        ValueError: When the API answers 400; the per-field errors are
            printed first.
        requests.exceptions.RequestException: For other HTTP errors and
            for connection problems (logged, then re-raised).
    """
    url = f'{base_url}/customers'
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }

    try:
        response = requests.post(
            url, json=customer_data, headers=headers, timeout=timeout
        )

        if response.status_code == 400:
            # A 400 with a non-JSON body is still a validation failure;
            # do not let the decode error masquerade as a transport error.
            try:
                data = response.json()
            except ValueError:
                data = {}
            error = data.get('error') if isinstance(data, dict) else None
            details = error.get('details') if isinstance(error, dict) else None

            print("Validation errors:")
            if isinstance(details, dict):
                for field, errors in details.items():
                    # Accept a single message as well as a list, and
                    # stringify items so join() cannot raise TypeError.
                    if isinstance(errors, str):
                        errors = [errors]
                    print(f"  {field}: {', '.join(str(err) for err in errors)}")
            raise ValueError("Validation failed")

        # Raises HTTPError for every remaining 4xx/5xx status
        response.raise_for_status()

        if response.status_code != 201:
            # Unexpected non-error status: nothing was reported as created
            return None

        customer = response.json()['data']
        print("Customer created successfully!")
        print(f"Customer ID: {customer['id']}")
        print(f"Email: {customer['email']}")
        return customer

    except requests.exceptions.RequestException as e:
        print(f"Error creating customer: {e}")
        if getattr(e, 'response', None) is not None:
            print(f"Response: {e.response.text}")
        raise

# Usage: runs only when this file is executed directly, not when imported
if __name__ == '__main__':
    # Example payload that is sent as the JSON body of the create request
    new_customer = {
        'email': 'john.doe@example.com',
        'company_name': 'Acme Corporation',
        'contact_name': 'John Doe',
        'phone': '+45 12 34 56 78',
        'billing_address': {
            'street_address': 'Main Street 123',
            'city': 'Copenhagen',
            'postal_code': '1000',
            'country': 'DK'
        }
    }

    # create_customer re-raises request errors and raises ValueError on
    # validation failure; this top-level handler reports either one.
    try:
        customer = create_customer(new_customer)
        print(f"Created customer: {customer}")
    except Exception as e:
        print(f"Failed: {e}")

4. Update an Order

Update an existing order using PATCH:

import requests

# NOTE(review): placeholder key hard-coded for brevity; real code should read
# it from the environment rather than keeping it in source.
api_key = 'c2p_live_your_api_key_here'
# Root URL that the endpoint paths in this example are appended to
base_url = 'https://app1.connect2print.com/api/v1'

def update_order(order_id, updates, timeout=10):
    """Apply a partial update to an order with PATCH.

    Args:
        order_id: Identifier placed in the ``/orders/{order_id}`` path.
        updates: Dict of fields to change, sent as the JSON request body.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout, ``requests`` may block indefinitely.

    Returns:
        The dict under the response's ``data`` key when the API answers
        200, or ``None`` when the order is not found (404) or the API
        answers with any other non-error status.

    Raises:
        requests.exceptions.RequestException: For other HTTP errors and
            for connection problems (logged, then re-raised).
    """
    url = f'{base_url}/orders/{order_id}'
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }

    try:
        response = requests.patch(
            url, json=updates, headers=headers, timeout=timeout
        )

        if response.status_code == 404:
            print("Order not found")
            return None

        # Raises HTTPError for every remaining 4xx/5xx status
        response.raise_for_status()

        if response.status_code != 200:
            # Unexpected non-error status: no updated order to return
            return None

        order = response.json()['data']
        print("Order updated successfully!")
        print(f"New status: {order['status']}")
        return order

    except requests.exceptions.RequestException as e:
        print(f"Error updating order: {e}")
        if getattr(e, 'response', None) is not None:
            print(f"Response: {e.response.text}")
        raise

# Usage: runs only when this file is executed directly, not when imported
if __name__ == '__main__':
    # Fields to change on the order; sent as the PATCH request body
    updates = {
        'status': 'processing',
        'internal_notes': 'Printing started on machine #2'
    }

    # update_order returns None when the order does not exist
    order = update_order(123, updates)
    if order:
        print(f"Order {order['id']} status: {order['status']}")

5. Upload a File

Upload a production file:

import requests
import os

# NOTE(review): placeholder key hard-coded for brevity; real code should read
# it from the environment rather than keeping it in source.
api_key = 'c2p_live_your_api_key_here'
# Root URL that the endpoint paths in this example are appended to
base_url = 'https://app1.connect2print.com/api/v1'

def upload_file(file_path, order_id, content_type=None, timeout=60):
    """Upload a file and attach it to an order.

    Args:
        file_path: Path of the local file to upload.
        order_id: Sent as ``related_id`` with ``related_type`` ``'order'``.
        content_type: MIME type to declare for the file. When omitted it
            is guessed from the file name, falling back to
            ``application/octet-stream``.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout, ``requests`` may block indefinitely.

    Returns:
        The dict under the response's ``data`` key when the API answers
        201, or ``None`` for any other non-error status.

    Raises:
        FileNotFoundError: When ``file_path`` does not exist.
        requests.exceptions.RequestException: For HTTP errors and
            connection problems (logged, then re-raised).
    """
    import mimetypes

    # Fail fast before building any request state
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    if content_type is None:
        # Do not label every upload as PDF; derive the type from the name
        guessed_type = mimetypes.guess_type(file_path)[0]
        content_type = guessed_type or 'application/octet-stream'

    url = f'{base_url}/files/upload'
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Accept': 'application/json'
    }

    try:
        # The file must stay open while requests streams the multipart body
        with open(file_path, 'rb') as f:
            files = {
                'file': (os.path.basename(file_path), f, content_type)
            }
            data = {
                'related_type': 'order',
                'related_id': order_id
            }

            response = requests.post(
                url, files=files, data=data, headers=headers, timeout=timeout
            )

        # Raises HTTPError for every 4xx/5xx status
        response.raise_for_status()

        if response.status_code != 201:
            # Unexpected non-error status: nothing was reported as created
            return None

        file_data = response.json()['data']
        print("File uploaded successfully!")
        print(f"File ID: {file_data['id']}")
        print(f"Filename: {file_data['filename']}")
        print(f"Size: {file_data['file_size'] / 1024:.2f} KB")
        return file_data

    except requests.exceptions.RequestException as e:
        print(f"Error uploading file: {e}")
        if getattr(e, 'response', None) is not None:
            print(f"Response: {e.response.text}")
        raise

# Usage: runs only when this file is executed directly, not when imported
if __name__ == '__main__':
    # Uploads the file and attaches it to order 123
    file_data = upload_file('./documents/brochure.pdf', 123)
    print(f"Uploaded file: {file_data}")

6. Reusable API Client Class

Create a comprehensive API client:

import requests
import time
from typing import Optional, Dict, List, Generator

class Connect2PrintAPI:
    """Client for the Connect2Print REST API.

    Holds a ``requests.Session`` carrying the bearer token and JSON
    ``Accept`` header, and turns HTTP error statuses into the ``APIError``
    subclasses defined in this module.
    """

    # Seconds to wait on each request unless the caller overrides it;
    # without a timeout, requests may block indefinitely.
    DEFAULT_TIMEOUT = 30

    def __init__(self, api_key: str, base_url: Optional[str] = None,
                 timeout: Optional[float] = DEFAULT_TIMEOUT):
        """Create a client.

        Args:
            api_key: Key sent as ``Authorization: Bearer <api_key>``.
            base_url: API root; defaults to the hosted v1 endpoint.
            timeout: Default per-request timeout in seconds.
        """
        self.api_key = api_key
        # Drop a trailing slash so joining with '/endpoint' never yields '//'
        self.base_url = (
            base_url or 'https://app1.connect2print.com/api/v1'
        ).rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {self.api_key}',
            'Accept': 'application/json'
        })

    @staticmethod
    def _error_body(response) -> Dict:
        """Return the response's JSON object, or {} if it has none."""
        try:
            body = response.json()
        except ValueError:
            # Error pages from proxies and gateways are often not JSON
            return {}
        return body if isinstance(body, dict) else {}

    def _request(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Send a request and return the decoded JSON body.

        Args:
            method: HTTP method name.
            endpoint: Path appended to ``base_url`` (starts with ``/``).
            **kwargs: Passed through to ``Session.request``; ``timeout``
                defaults to the client's timeout.

        Returns:
            The parsed JSON body, or ``{}`` when a successful response has
            no body (for example 204).

        Raises:
            ValidationError: On 400.
            AuthenticationError: On 401.
            AuthorizationError: On 403.
            NotFoundError: On 404.
            RateLimitError: On 429.
            ServerError: On 5xx.
            NetworkError: When the request fails without a response.
            requests.exceptions.HTTPError: On any other 4xx status.
        """
        url = f'{self.base_url}{endpoint}'
        kwargs.setdefault('timeout', self.timeout)

        try:
            response = self.session.request(method, url, **kwargs)
            status = response.status_code

            if 200 <= status < 300:
                # Body-less successes must not come back as None
                if status == 204 or not response.content:
                    return {}
                return response.json()

            if status == 400:
                error = self._error_body(response).get('error')
                if not isinstance(error, dict):
                    error = {}
                raise ValidationError(
                    error.get('message', 'Validation error'),
                    error.get('details', {})
                )

            if status == 401:
                raise AuthenticationError("Invalid API key")

            if status == 403:
                raise AuthorizationError("Insufficient permissions")

            if status == 404:
                raise NotFoundError("Resource not found")

            if status == 429:
                # Prefer the body's value, then the standard header
                retry_after = self._error_body(response).get('retry_after')
                if retry_after is None:
                    retry_after = response.headers.get('Retry-After', 60)
                raise RateLimitError(
                    f"Rate limit exceeded. Retry after {retry_after}s"
                )

            if status >= 500:
                raise ServerError(f"Server error ({status})")

            # Remaining 4xx statuses raise HTTPError here
            response.raise_for_status()
            return {}

        except requests.exceptions.RequestException as e:
            # No response attached means the failure was at transport level
            if getattr(e, 'response', None) is None:
                raise NetworkError(str(e)) from e
            raise

    # Orders
    def get_orders(self, **params) -> Dict:
        """Return one page of orders; ``params`` become the query string."""
        return self._request('GET', '/orders', params=params)

    def get_order(self, order_id: int) -> Dict:
        """Return the ``data`` of a single order."""
        result = self._request('GET', f'/orders/{order_id}')
        return result['data']

    def create_order(self, order_data: Dict) -> Dict:
        """Create an order and return the ``data`` of the response."""
        result = self._request('POST', '/orders', json=order_data)
        return result['data']

    def update_order(self, order_id: int, updates: Dict) -> Dict:
        """PATCH an order and return the ``data`` of the response."""
        result = self._request('PATCH', f'/orders/{order_id}', json=updates)
        return result['data']

    # Customers
    def get_customers(self, **params) -> Dict:
        """Return one page of customers; ``params`` become the query string."""
        return self._request('GET', '/customers', params=params)

    def get_customer(self, customer_id: int) -> Dict:
        """Return the ``data`` of a single customer."""
        result = self._request('GET', f'/customers/{customer_id}')
        return result['data']

    def create_customer(self, customer_data: Dict) -> Dict:
        """Create a customer and return the ``data`` of the response."""
        result = self._request('POST', '/customers', json=customer_data)
        return result['data']

    # Pagination helper
    def paginate_all(self, endpoint: str, **params) -> Generator[Dict, None, None]:
        """Yield every item of a paginated endpoint, page by page.

        Args:
            endpoint: Path of the list endpoint.
            **params: Extra query parameters; ``per_page`` defaults to 100.
        """
        page = 1
        per_page = params.pop('per_page', 100)

        while True:
            result = self._request('GET', endpoint, params={
                **params,
                'page': page,
                'per_page': per_page
            })

            yield from result['data']

            if not result['pagination']['has_more']:
                break

            page += 1

            # Rate limit protection
            time.sleep(0.1)

# Custom exceptions
class APIError(Exception):
    """Common base class of every exception defined for the API client."""

class ValidationError(APIError):
    """Signals an HTTP 400 response.

    The optional ``details`` value given at construction is kept on the
    instance unchanged.
    """

    def __init__(self, message: str, details: Optional[Dict] = None):
        self.details = details
        super().__init__(message)

class AuthenticationError(APIError):
    """Signals an HTTP 401 response."""

class AuthorizationError(APIError):
    """Signals an HTTP 403 response."""

class NotFoundError(APIError):
    """Signals an HTTP 404 response."""

class RateLimitError(APIError):
    """Signals an HTTP 429 response."""

class ServerError(APIError):
    """Signals an HTTP 5xx response."""

class NetworkError(APIError):
    """Signals a network or connection failure."""

# Usage example: runs only when this file is executed directly
if __name__ == '__main__':
    api = Connect2PrintAPI('c2p_live_your_api_key_here')

    try:
        # Get a single order
        order = api.get_order(123)
        print(f"Order: {order['id']}")

        # Create a customer
        customer = api.create_customer({
            'email': 'test@example.com',
            'company_name': 'Test Company'
        })
        print(f"Customer ID: {customer['id']}")

        # Fetch all orders; paginate_all is a generator, so pages are
        # requested as the loop consumes them
        print("Fetching all orders...")
        for order in api.paginate_all('/orders'):
            print(f"Order {order['id']}: {order['total']}")

    # Most specific handlers first; APIError is the base class of the others
    except ValidationError as e:
        print(f"Validation failed: {e.details}")
    except RateLimitError as e:
        print(f"Rate limited: {e}")
    except APIError as e:
        print(f"API error: {e}")

7. Webhook Handler (Flask)

Receive and validate webhook notifications:

from flask import Flask, request, jsonify
import hmac
import hashlib
import os

app = Flask(__name__)

# Shared secret used to verify webhook signatures.
# NOTE(review): the fallback is a placeholder used when WEBHOOK_SECRET is
# unset; a deployment should set the variable rather than rely on it.
WEBHOOK_SECRET = os.getenv('WEBHOOK_SECRET', 'your_webhook_secret_here')

@app.route('/webhooks/connect2print', methods=['POST'])
def handle_webhook():
    """Verify and dispatch an incoming Connect2Print webhook.

    The request must carry an ``X-Webhook-Signature`` header equal to the
    hex HMAC-SHA256 of the raw request body keyed with ``WEBHOOK_SECRET``.

    Returns:
        401 when the signature is missing or wrong, 400 when the body is
        not a JSON object with ``event`` and ``data`` keys, otherwise 200
        with ``{'success': True}`` (also for unknown event names).
    """
    # Get signature from headers
    signature = request.headers.get('X-Webhook-Signature')

    if not signature:
        return jsonify({'error': 'Missing signature'}), 401

    # Sign the exact bytes received: decoding to text and re-encoding can
    # alter bodies that are not valid UTF-8 and break verification.
    body = request.get_data()

    expected_signature = hmac.new(
        WEBHOOK_SECRET.encode('utf-8'),
        body,
        hashlib.sha256
    ).hexdigest()

    # Compare as bytes: compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, which an attacker-supplied header may.
    if not hmac.compare_digest(
        signature.encode('utf-8'),
        expected_signature.encode('utf-8')
    ):
        return jsonify({'error': 'Invalid signature'}), 401

    # Parse webhook data; silent=True yields None instead of raising
    webhook = request.get_json(silent=True)
    if (
        not isinstance(webhook, dict)
        or 'event' not in webhook
        or 'data' not in webhook
    ):
        return jsonify({'error': 'Malformed payload'}), 400

    event = webhook['event']
    data = webhook['data']

    print(f"Webhook received: {event}")

    # Handle different events; built at call time so the handler functions
    # defined further down the module are already bound.
    handlers = {
        'order.created': handle_order_created,
        'order.completed': handle_order_completed,
        'invoice.paid': handle_invoice_paid,
        'shipment.delivered': handle_shipment_delivered,
    }
    # isinstance guard: an unhashable event value would make .get() raise
    handler = handlers.get(event) if isinstance(event, str) else None

    if handler is None:
        print(f"Unknown event: {event}")
    else:
        handler(data)

    # Always return 200 OK for verified, well-formed webhooks
    return jsonify({'success': True})

# Event handlers
def handle_order_created(order: dict) -> None:
    """Log a newly created order.

    Reads ``order['id']`` and ``order['customer']['company_name']``.
    """
    print(f"New order #{order['id']} from {order['customer']['company_name']}")
    # Send notification, update database, etc.

def handle_order_completed(order: dict) -> None:
    """Log a completed order; reads ``order['id']``."""
    print(f"Order #{order['id']} completed")
    # Trigger fulfillment process

def handle_invoice_paid(invoice: dict) -> None:
    """Log a paid invoice; reads ``invoice['id']``."""
    print(f"Invoice #{invoice['id']} paid")
    # Update accounting system

def handle_shipment_delivered(shipment: dict) -> None:
    """Log a delivered shipment; reads ``shipment['id']``."""
    print(f"Shipment #{shipment['id']} delivered")
    # Send delivery confirmation

if __name__ == '__main__':
    # Debug mode enables the interactive Werkzeug debugger, which lets
    # anyone who can reach the server run arbitrary code. Keep it off by
    # default and opt in locally with FLASK_DEBUG=1.
    app.run(port=3000, debug=os.getenv('FLASK_DEBUG') == '1')

Best Practices

1. Use environment variables for secrets

Load API keys and webhook secrets with os.getenv() or python-dotenv; never hard-code them in source files or commit them to version control.

2. Use type hints

Add type hints to functions for better code documentation and IDE support.

3. Create custom exception classes

Define specific exceptions for different error types to handle them appropriately.

4. Use context managers for files

Always use 'with' statements when working with files to ensure proper cleanup.

5. Implement logging

Use Python's logging module instead of print() for production code.