# Celery Periodic Tasks Configuration and OAuth Token Refresh

> This document explains how the MaiAgent platform uses the Celery Beat scheduler to execute periodic tasks, with a particular focus on the implementation of automatic OAuth Token refresh.

## 1. The Role of Celery in MaiAgent

Celery is a distributed task queue. MaiAgent relies on it for four kinds of background work:

| Task Type              | Description                                                                        | Execution Method                           |
| ---------------------- | ---------------------------------------------------------------------------------- | ------------------------------------------ |
| **Asynchronous Tasks** | Time-consuming background processing such as document parsing and vectorization    | Celery Workers receive and execute tasks   |
| **Periodic Tasks**     | Scheduled maintenance jobs such as Token refresh and data cleanup                  | Celery Beat triggers tasks on schedule     |
| **Delayed Tasks**      | Tasks that need to run at a specific time, such as sending scheduled notifications | Scheduled with the `countdown` or `eta` arguments |
| **Task Chains**        | Complex workflows requiring sequential execution of multiple steps                 | Multiple tasks linked with Celery's `chain` primitive |

Common use cases:

* **Knowledge Base Processing**: Time-consuming operations like document parsing, chunking, and vectorization after file upload
* **Scheduled Maintenance**: OAuth Token refresh, expired data cleanup, system health checks
* **Report Generation**: Periodic generation of usage statistics, conversation quality analysis, and other reports
* **Notification Delivery**: Batch email notifications, Webhook callbacks

***

## 2. Celery Architecture and Configuration

{% @mermaid/diagram content="flowchart LR
App\["Django Application"]
Beat\["Celery Beat<br/>(Scheduler)"]
Broker\["Message Broker<br/>(Redis/RabbitMQ)"]
Worker1\["Worker 1"]
Worker2\["Worker 2"]
WorkerN\["Worker N"]
DB\[("Database")]

App -- "Submit Task" --> Broker
Beat -- "Scheduled Trigger" --> Broker
Broker -- "Dispatch Task" --> Worker1
Broker -- "Dispatch Task" --> Worker2
Broker -- "Dispatch Task" --> WorkerN
Worker1 -- "Read/Write Data" --> DB
Worker2 -- "Read/Write Data" --> DB
WorkerN -- "Read/Write Data" --> DB" %}

### 2.1 Core Component Overview

* **Celery Beat (Scheduler)**: Triggers periodic tasks according to the configured schedule
* **Message Broker**: Redis or RabbitMQ; holds queued task messages until a Worker picks them up
* **Celery Workers**: Worker processes that execute tasks; can be scaled horizontally
* **Result Backend**: Stores task execution results, typically using Redis or a database

### 2.2 Periodic Task Configuration

MaiAgent uses Django Celery Beat to manage periodic tasks:

| Task Name           | Schedule          | Description                                                   |
| ------------------- | ----------------- | ------------------------------------------------------------- |
| OAuth Token Refresh | Every hour        | Automatically refreshes OAuth Tokens that are about to expire |
| Session Cleanup     | Daily at midnight | Removes expired user sessions                                 |

**Schedule expression types**:

* **crontab**: Unix cron-like expressions with minute, hour, day-of-week, day-of-month, and month-of-year fields
* **timedelta**: Fixed-interval execution, e.g., every 30 minutes
* **solar**: Scheduling based on solar events such as sunrise and sunset at a given latitude and longitude
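
As a rough illustration of the interval and crontab schedule types, the two tasks in the table above could be declared in a static `beat_schedule`. This is only a sketch: the app name `maiagent` and both task paths are hypothetical, and since MaiAgent manages schedules through Django Celery Beat, the real entries most likely live in the database rather than in code.

```python
from datetime import timedelta

from celery import Celery
from celery.schedules import crontab

# The app name is an assumption made for this sketch.
app = Celery("maiagent")

app.conf.beat_schedule = {
    "refresh-oauth-tokens": {
        # Hypothetical task path; interval schedule, runs once per hour.
        "task": "tokens.tasks.refresh_oauth_tokens",
        "schedule": timedelta(hours=1),
    },
    "cleanup-expired-sessions": {
        # Hypothetical task path; crontab schedule, runs daily at 00:00
        # in the timezone configured for the Celery app.
        "task": "sessions.tasks.cleanup_expired_sessions",
        "schedule": crontab(minute=0, hour=0),
    },
}
```

Each key is a human-readable entry name, `task` is the registered task name, and `schedule` accepts a `timedelta`, a `crontab`, or a `solar` instance.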

***

## 3. Automatic OAuth Token Refresh Task

### 3.1 Task Execution Flow

{% @mermaid/diagram content="sequenceDiagram
participant Beat as Celery Beat
participant Broker as Redis Queue
participant Worker as Celery Worker
participant DB as Database
participant OAuth as OAuth Provider

Note over Beat: Triggered once per hour
Beat->>Broker: Send refresh task
Broker->>Worker: Dispatch task to available Worker

Worker->>DB: Query Tokens about to expire<br/>(remaining validity < 1 hour)
DB-->>Worker: Return list of Tokens to refresh

loop Process each Token
    Worker->>DB: Check if Token has client credentials
    alt Has complete credentials
        Worker->>OAuth: Request new Token using Refresh Token
        OAuth-->>Worker: Return new Access Token
        Worker->>DB: Update Token information
    else Missing credentials
        Worker->>DB: Mark as legacy Token, skip processing
    end
end

Worker->>Broker: Report task completion" %}
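
The loop in the diagram can be sketched in plain Python as follows. This is a simplified illustration, not MaiAgent's actual task: the `OAuthToken` fields, the `is_legacy` flag, and the injected `request_new_token` callable (standing in for the HTTP call to the OAuth provider) are all assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Dict, Iterable, Optional, Tuple


@dataclass
class OAuthToken:
    """Hypothetical token record; field names are assumptions."""
    id: int
    access_token: str
    refresh_token: str
    expires_at: datetime
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    is_legacy: bool = False


# Stand-in for the provider call: returns (new_access_token, lifetime_seconds).
RefreshFn = Callable[[OAuthToken], Tuple[str, int]]


def refresh_tokens(
    tokens: Iterable[OAuthToken],
    request_new_token: RefreshFn,
    now: datetime,
) -> Dict[str, int]:
    """Refresh each token that has client credentials; flag the rest as legacy."""
    summary = {"refreshed": 0, "skipped": 0}
    for token in tokens:
        if not (token.client_id and token.client_secret):
            # "Missing credentials" branch: mark as legacy and skip.
            token.is_legacy = True
            summary["skipped"] += 1
            continue
        # "Has complete credentials" branch: exchange the refresh token.
        access_token, expires_in = request_new_token(token)
        token.access_token = access_token
        token.expires_at = now + timedelta(seconds=expires_in)
        summary["refreshed"] += 1
    return summary


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
good = OAuthToken(1, "old", "r1", now, client_id="cid", client_secret="secret")
legacy = OAuthToken(2, "old", "r2", now)
result = refresh_tokens([good, legacy], lambda t: ("new", 3600), now)
print(result)  # prints {'refreshed': 1, 'skipped': 1}
```

Passing the provider call in as a function keeps the loop testable without network access; in a real task it would wrap the HTTP request to the provider's token endpoint.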

### 3.2 Query Optimization Strategies

MaiAgent applies several optimizations to keep the Token refresh task efficient and easy to troubleshoot:

* **Legacy Token Filtering**: Excludes legacy Tokens that lack client\_id and client\_secret
* **Time Window Queries**: Only queries Tokens expiring within the next hour
* **Batch Processing**: Queries multiple Tokens to refresh at once, reducing database query count
* **Error Handling**: Records detailed error information for failed Token refreshes to facilitate troubleshooting
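
The legacy filter and the time window above can be combined into a single selection step. The sketch below expresses that step over in-memory rows; `TokenRow` and its fields are hypothetical, and in a Django codebase the same conditions would more likely be written as one ORM filter so that the database does the work.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class TokenRow:
    """Hypothetical stand-in for a row in the token table."""
    id: int
    expires_at: datetime
    client_id: Optional[str]
    client_secret: Optional[str]


def tokens_due_for_refresh(
    rows: Iterable[TokenRow],
    now: datetime,
    window: timedelta = timedelta(hours=1),
) -> List[TokenRow]:
    """Return rows that expire before now + window and have full credentials."""
    cutoff = now + window
    return [
        row
        for row in rows
        # Legacy filter: both client_id and client_secret must be present.
        if row.client_id and row.client_secret
        # Time window: already-expired rows are included as well.
        and row.expires_at < cutoff
    ]


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
rows = [
    TokenRow(1, now + timedelta(minutes=30), "cid", "secret"),  # due
    TokenRow(2, now + timedelta(hours=2), "cid", "secret"),     # not yet due
    TokenRow(3, now + timedelta(minutes=10), "cid", None),      # legacy
]
due = tokens_due_for_refresh(rows, now)
print([row.id for row in due])  # prints [1]
```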

### 3.3 Error Handling Mechanisms

The Token refresh process may encounter the following error scenarios:

| Error Type                  | Possible Cause                                      | Handling Approach                                      |
| --------------------------- | --------------------------------------------------- | ------------------------------------------------------ |
| **Invalid Refresh Token**   | User has revoked authorization or Token has expired | Mark as requiring re-authorization, notify user        |
| **Network Timeout**         | OAuth service provider is temporarily unavailable   | Automatic retry up to 3 times with exponential backoff |
| **Rate Limiting**           | Exceeded the OAuth service's request rate limit     | Delayed retry to avoid being blocked                   |
| **Client Credential Error** | Incorrect client\_id or client\_secret              | Log error, notify administrator to check configuration |
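
The "retry up to 3 times with exponential backoff" policy for transient failures can be sketched with a small helper. The exception class, the 2-second base delay, and the helper itself are assumptions made for illustration; the table does not specify the actual delays.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientOAuthError(Exception):
    """Hypothetical marker for retryable failures such as network timeouts."""


def call_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying transient failures with exponentially growing delays."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except TransientOAuthError:
            if attempt == max_retries:
                raise  # retries exhausted; let the caller log and alert
            # Delay doubles each time: 2s, 4s, 8s with the defaults.
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable when max_retries >= 0")


# Usage: a call that fails twice, then succeeds. Delays are recorded
# instead of slept so the example runs instantly.
delays = []
calls = {"n": 0}


def flaky_refresh() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientOAuthError("timeout")
    return "new-access-token"


token = call_with_backoff(flaky_refresh, sleep=delays.append)
print(token, delays)  # prints new-access-token [2.0, 4.0]
```

Inside a Celery task the same policy is usually configured declaratively with task options such as `autoretry_for`, `retry_backoff`, and `max_retries` rather than hand-written loops. Non-transient errors, such as an invalid Refresh Token, should not be retried at all.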

***

## 4. Task Monitoring and Debugging

### 4.1 Task Status Monitoring

MaiAgent provides multiple ways to monitor Celery task status:

* **Flower Monitoring Dashboard**: Web interface for real-time task execution status and Worker health
* **Logging**: Detailed records of each task's execution time, parameters, and results
* **Metrics Collection**: Integration with Prometheus for collecting task execution metrics such as success rate and execution time
* **Alerting**: Automatic alerts when tasks fail consecutively or execution time is abnormal
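
To make the logging point concrete, here is a minimal sketch of a decorator that records each task's duration and outcome. The logger name `maiagent.tasks` and the decorator are hypothetical; Celery also emits its own task lifecycle logs and signals, which may make a custom wrapper unnecessary.

```python
import functools
import logging
import time
from typing import Any, Callable

# Hypothetical logger name for this sketch.
logger = logging.getLogger("maiagent.tasks")


def log_execution(func: Callable[..., Any]) -> Callable[..., Any]:
    """Log how long the wrapped function took and whether it succeeded."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception:
            # logger.exception includes the traceback for troubleshooting.
            logger.exception(
                "task %s failed after %.3fs",
                func.__name__,
                time.perf_counter() - start,
            )
            raise
        logger.info(
            "task %s succeeded in %.3fs",
            func.__name__,
            time.perf_counter() - start,
        )
        return result

    return wrapper


@log_execution
def count_refreshed(token_ids: list) -> int:
    """Hypothetical task body used only to demonstrate the decorator."""
    return len(token_ids)


refreshed = count_refreshed([1, 2, 3])
```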

### 4.2 Performance Optimization Recommendations

**Worker Pool Configuration**:

* **Concurrency Tuning**: Adjust Worker concurrency based on CPU core count and task type
* **Queue Separation**: Route urgent and non-urgent tasks to different queues
* **Task Priority**: Set higher priority for critical tasks
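
The concurrency and queue-separation advice maps onto a handful of Celery settings. The values, queue names, and task paths below are illustrative assumptions, not MaiAgent's actual configuration.

```python
# Settings dictionary that could be applied with app.conf.update(celery_settings).
celery_settings = {
    # Number of worker processes; a common starting point is the CPU core
    # count for CPU-bound tasks and a higher value for I/O-bound ones.
    "worker_concurrency": 4,
    # Reserve one message per process at a time so long-running tasks
    # do not hold back short ones.
    "worker_prefetch_multiplier": 1,
    # Queue used by tasks that match no route.
    "task_default_queue": "default",
    # Route task names (glob patterns are allowed) to dedicated queues.
    "task_routes": {
        "tokens.tasks.refresh_oauth_tokens": {"queue": "maintenance"},
        "knowledge.tasks.*": {"queue": "documents"},
    },
}
```

A Worker can then be bound to a subset of queues with the `-Q` option of `celery worker`, so that slow document processing cannot delay Token refresh.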

**Task Design Principles**:

* **Idempotency**: Ensure a task can be retried or delivered twice without duplicating its side effects
* **Time Sensitivity**: Set reasonable task expiration times (for example, with the `expires` option) to avoid executing stale tasks
* **Batch Processing**: For large data sets, process records in fixed-size batches to keep memory usage bounded
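
A minimal sketch of the batching principle: a generator that yields fixed-size chunks from any iterable, so a task never has to hold the full data set in memory. The helper name and the batch size are assumptions.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def in_batches(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        # islice pulls only the next batch_size items from the iterator.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


batches = list(in_batches(range(5), 2))
print(batches)  # prints [[0, 1], [2, 3], [4]]
```

Because the helper consumes its input lazily, it also works with database cursors or other streaming sources.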

***

## 5. Technical Advantages of MaiAgent's Celery Configuration

### 5.1 Reliability and Stability

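* **Task Persistence**: Task messages are held in the Message Broker and survive restarts, provided the broker itself is configured for persistence (for example, durable queues in RabbitMQ or persistence enabled in Redis)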
* **Automatic Retry**: Supports automatic retry mechanisms for failed tasks
* **Graceful Shutdown**: On a warm shutdown (SIGTERM), Workers finish in-progress tasks before exiting
* **Health Checks**: Periodic monitoring of Worker and Beat runtime status

### 5.2 Scalability and Performance

* **Horizontal Scaling**: Easily add more Workers to handle increased traffic
* **Task Routing**: Route different task types to dedicated Worker pools
* **Asynchronous Execution**: Does not block the main application, improving system responsiveness
* **Batch Optimization**: Fetching and processing records in batches reduces database query count

### 5.3 Operations Friendliness

* **Visual Monitoring**: Flower provides an intuitive monitoring dashboard
* **Detailed Logging**: Complete records of task execution for troubleshooting
* **Dynamic Configuration**: Django Celery Beat stores schedules in the database, so they can be adjusted without restarting the system
* **Alert Integration**: Integrates with enterprise monitoring systems for timely anomaly detection

***

## 6. Related Documentation

* [OAuth 2.0 Integration and Automatic Token Refresh Mechanism](https://docs.maiagent.ai/tech/maiagent-tech-en/advanced-genai-tech/oauth-integration) - Learn more about OAuth Token refresh logic
* [Deployment Architecture](https://docs.maiagent.ai/tech/maiagent-tech-en/platform-development/architecture) - Understand Celery's role in the overall system architecture

### Reference Links

* [Celery Documentation](https://docs.celeryq.dev/)
* [Django Celery Beat](https://django-celery-beat.readthedocs.io/)
* [Flower - Celery Monitoring Tool](https://flower.readthedocs.io/)
