# Celery Periodic Tasks Configuration and OAuth Token Refresh

> This document explains how the MaiAgent platform uses the Celery Beat scheduler to execute periodic tasks, with a particular focus on the implementation of automatic OAuth Token refresh.

## 1. The Role of Celery in MaiAgent

Celery is a distributed task queue. The MaiAgent platform relies on it for four kinds of work:

| Task Type              | Description                                                                        | Execution Method                           |
| ---------------------- | ---------------------------------------------------------------------------------- | ------------------------------------------ |
| **Asynchronous Tasks** | Time-consuming background processing such as document parsing and vectorization    | Celery Workers receive and execute tasks   |
| **Periodic Tasks**     | Scheduled maintenance jobs such as Token refresh and data cleanup                  | Celery Beat triggers tasks on schedule     |
| **Delayed Tasks**      | Tasks that need to run at a specific time, such as sending scheduled notifications | Executed with countdown or eta parameters  |
| **Task Chains**        | Complex workflows requiring sequential execution of multiple steps                 | Multiple tasks combined using Celery Chain |

Common use cases:

* **Knowledge Base Processing**: Time-consuming operations like document parsing, chunking, and vectorization after file upload
* **Scheduled Maintenance**: OAuth Token refresh, expired data cleanup, system health checks
* **Report Generation**: Periodic generation of usage statistics, conversation quality analysis, and other reports
* **Notification Delivery**: Batch email notifications, Webhook callbacks

***

## 2. Celery Architecture and Configuration

{% @mermaid/diagram content="flowchart LR
App["Django Application"]
Beat["Celery Beat<br/>(Scheduler)"]
Broker["Message Broker<br/>(Redis/RabbitMQ)"]
Worker1["Worker 1"]
Worker2["Worker 2"]
WorkerN["Worker N"]
DB[("Database")]
App -- "Submit Task" --> Broker
Beat -- "Scheduled Trigger" --> Broker
Broker -- "Dispatch Task" --> Worker1
Broker -- "Dispatch Task" --> Worker2
Broker -- "Dispatch Task" --> WorkerN
Worker1 -- "Read/Write Data" --> DB
Worker2 -- "Read/Write Data" --> DB
WorkerN -- "Read/Write Data" --> DB" %}

### 2.1 Core Component Overview

* **Celery Beat (Scheduler)**: Triggers periodic tasks according to the configured schedule
* **Message Broker**: Redis or RabbitMQ, which holds queued tasks until a Worker picks them up
* **Celery Workers**: Worker processes that execute tasks; can be scaled horizontally
* **Result Backend**: Stores task execution results, typically using Redis or a database

### 2.2 Periodic Task Configuration

MaiAgent uses Django Celery Beat to manage periodic tasks:

| Task Name           | Schedule          | Description                                                   |
| ------------------- | ----------------- | ------------------------------------------------------------- |
| OAuth Token Refresh | Every hour        | Automatically refreshes OAuth Tokens that are about to expire |
| Session Cleanup     | Daily at midnight | Removes expired user sessions                                 |

**Schedule expression types**:

* **crontab**: Unix cron-style expressions with minute, hour, day-of-week, day-of-month, and month-of-year fields
* **timedelta**: Fixed-interval execution, e.g., every 30 minutes
* **solar**: Scheduling based on solar events such as sunrise and sunset at a given location

***

## 3. Automatic OAuth Token Refresh Task

### 3.1 Task Execution Flow

{% @mermaid/diagram content="sequenceDiagram
participant Beat as Celery Beat
participant Broker as Redis Queue
participant Worker as Celery Worker
participant DB as Database
participant OAuth as OAuth Provider
Note over Beat: Triggered once per hour
Beat->>Broker: Send refresh task
Broker->>Worker: Dispatch task to available Worker
Worker->>DB: Query Tokens about to expire<br/>(remaining validity < 1 hour)
DB-->>Worker: Return list of Tokens to refresh
loop Process each Token
    Worker->>DB: Check if Token has client credentials
    alt Has complete credentials
        Worker->>OAuth: Request new Token using Refresh Token
        OAuth-->>Worker: Return new Access Token
        Worker->>DB: Update Token information
    else Missing credentials
        Worker->>DB: Mark as legacy Token, skip processing
    end
end
Worker->>Broker: Report task completion" %}
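
The per-Token loop in the diagram can be sketched in plain Python. The `OAuthToken` class, `refresh_tokens`, and `fake_provider` are hypothetical stand-ins for the database record, the task body, and the HTTP call to the OAuth provider. None of them are taken from MaiAgent's code.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Iterable, Optional, Tuple


@dataclass
class OAuthToken:
    """Hypothetical in-memory stand-in for the stored Token record."""
    access_token: str
    refresh_token: str
    expires_at: datetime
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    is_legacy: bool = False


def refresh_tokens(
    tokens: Iterable[OAuthToken],
    request_new_token: Callable[[OAuthToken], Tuple[str, int]],
    now: datetime,
) -> dict:
    """Process each Token as in the loop of the sequence diagram."""
    summary = {"refreshed": 0, "skipped_legacy": 0}
    for token in tokens:
        if token.client_id and token.client_secret:
            # Complete credentials: exchange the Refresh Token for a new one.
            access_token, expires_in = request_new_token(token)
            token.access_token = access_token
            token.expires_at = now + timedelta(seconds=expires_in)
            summary["refreshed"] += 1
        else:
            # Missing credentials: mark as legacy and skip.
            token.is_legacy = True
            summary["skipped_legacy"] += 1
    return summary


def fake_provider(token: OAuthToken) -> Tuple[str, int]:
    """Stand-in for the OAuth provider: returns (access_token, expires_in)."""
    return f"new-{token.refresh_token}", 3600


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
tokens = [
    OAuthToken("old-a", "r-a", now + timedelta(minutes=20), "id", "secret"),
    OAuthToken("old-b", "r-b", now + timedelta(minutes=40)),
]
summary = refresh_tokens(tokens, fake_provider, now)
```

Passing the provider call in as a function keeps the loop testable without network access.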

### 3.2 Query Optimization Strategies

MaiAgent applies the following measures to keep each Token refresh run efficient:

* **Legacy Token Filtering**: Excludes legacy Tokens that lack a `client_id` or `client_secret`, since they cannot be refreshed
* **Time Window Queries**: Only queries Tokens expiring within the next hour
* **Batch Processing**: Fetches all Tokens due for refresh in one query instead of querying per Token
* **Error Handling**: Records detailed error information for each failed refresh to facilitate troubleshooting
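
The filtering and time-window rules above amount to a single selection predicate. The sketch below expresses it over in-memory rows. `TokenRow` and `select_tokens_to_refresh` are hypothetical names, and in Django the same conditions would more likely be written as one ORM filter. Including already-expired Tokens in the result is an assumption of this sketch.

```python
from datetime import datetime, timedelta, timezone
from typing import List, NamedTuple, Optional


class TokenRow(NamedTuple):
    """Hypothetical subset of Token columns used for selection."""
    id: int
    expires_at: datetime
    client_id: Optional[str]
    client_secret: Optional[str]


def select_tokens_to_refresh(
    rows: List[TokenRow],
    now: datetime,
    window: timedelta = timedelta(hours=1),
) -> List[TokenRow]:
    """Return rows with complete credentials that expire within the window."""
    cutoff = now + window
    return [
        row
        for row in rows
        if row.client_id and row.client_secret  # legacy Token filtering
        and row.expires_at < cutoff             # time window (< 1 hour left)
    ]


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
rows = [
    TokenRow(1, now + timedelta(minutes=30), "id", "secret"),  # due soon
    TokenRow(2, now + timedelta(hours=3), "id", "secret"),     # still valid
    TokenRow(3, now + timedelta(minutes=10), None, None),      # legacy
]
selected = select_tokens_to_refresh(rows, now)
```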

### 3.3 Error Handling Mechanisms

The Token refresh process may encounter the following error scenarios:

| Error Type                  | Possible Cause                                      | Handling Approach                                      |
| --------------------------- | --------------------------------------------------- | ------------------------------------------------------ |
| **Invalid Refresh Token**   | User has revoked authorization or Token has expired | Mark as requiring re-authorization, notify user        |
| **Network Timeout**         | OAuth service provider is temporarily unavailable   | Automatic retry up to 3 times with exponential backoff |
| **Rate Limiting**           | Exceeded the OAuth service's request rate limit     | Delayed retry to avoid being blocked                   |
| **Client Credential Error** | Incorrect client\_id or client\_secret              | Log error, notify administrator to check configuration |
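
One way to read the table is as a decision function that maps an error type and attempt number to the next action. The sketch below is illustrative only: the enum, the action names, the 2-second base delay, and the 60-second rate-limit delay are all assumptions, while the limit of 3 retries comes from the table.

```python
from enum import Enum
from typing import Optional, Tuple


class RefreshError(Enum):
    INVALID_REFRESH_TOKEN = "invalid_refresh_token"
    NETWORK_TIMEOUT = "network_timeout"
    RATE_LIMITED = "rate_limited"
    CLIENT_CREDENTIAL_ERROR = "client_credential_error"


MAX_RETRIES = 3  # from the table: retry up to 3 times


def backoff_delay(attempt: int, base_seconds: float = 2.0) -> float:
    """Exponential backoff: base, 2*base, 4*base, ... (attempt starts at 0)."""
    return base_seconds * (2 ** attempt)


def next_action(
    error: RefreshError, attempt: int, retry_after: float = 60.0
) -> Tuple[str, Optional[float]]:
    """Return (action, delay_seconds) for a failed refresh."""
    if error is RefreshError.INVALID_REFRESH_TOKEN:
        return ("mark_reauthorization_required", None)
    if error is RefreshError.CLIENT_CREDENTIAL_ERROR:
        return ("notify_administrator", None)
    if error is RefreshError.RATE_LIMITED:
        # Delayed retry so the provider's rate limit can reset.
        return ("retry", retry_after)
    # Network timeout: back off exponentially until retries run out.
    if attempt < MAX_RETRIES:
        return ("retry", backoff_delay(attempt))
    return ("give_up", None)
```

In a Celery task, the retry branch would typically be expressed with the task options `autoretry_for`, `retry_backoff`, and `max_retries`, or with an explicit `self.retry(countdown=delay)` on a bound task.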

***

## 4. Task Monitoring and Debugging

### 4.1 Task Status Monitoring

MaiAgent provides multiple ways to monitor Celery task status:

* **Flower Monitoring Dashboard**: Web interface for real-time task execution status and Worker health
* **Logging**: Detailed records of each task's execution time, parameters, and results
* **Metrics Collection**: Integration with Prometheus for collecting task execution metrics such as success rate and execution time
* **Alerting**: Automatic alerts when a task fails repeatedly or its execution time is abnormal

### 4.2 Performance Optimization Recommendations

**Worker Pool Configuration**:

* **Concurrency Tuning**: Adjust Worker concurrency based on CPU core count and task type
* **Queue Separation**: Route urgent and non-urgent tasks to different queues
* **Task Priority**: Set higher priority for critical tasks
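
These three settings might look as follows in a Django settings module. This is a sketch that assumes Celery reads settings with the `CELERY` namespace; the queue names, task paths, and numeric values are hypothetical and would need tuning per deployment.

```python
# Django settings fragment (assumes app.config_from_object(..., namespace="CELERY")).
# Queue names and task paths are hypothetical.

# Queue separation: keep maintenance jobs away from user-facing work.
CELERY_TASK_DEFAULT_QUEUE = "default"
CELERY_TASK_ROUTES = {
    "oauth.tasks.refresh_expiring_tokens": {"queue": "maintenance"},
    "knowledge.tasks.parse_document": {"queue": "documents"},
}

# Concurrency tuning: number of child processes per Worker. A value near
# the CPU core count is a common starting point for CPU-bound tasks.
CELERY_WORKER_CONCURRENCY = 4

# Task priority: default priority for published tasks. Whether a higher or
# lower number wins depends on the broker in use.
CELERY_TASK_DEFAULT_PRIORITY = 5
```

A Worker dedicated to one queue can then be started with the `-Q` option, for example `celery -A proj worker -Q maintenance --concurrency=2`, where `proj` is a placeholder project name.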

**Task Design Principles**:

* **Idempotency**: Ensure a task can be retried safely, so that running it twice does not duplicate its side effects
* **Time Sensitivity**: Set a reasonable expiration time on tasks so that stale tasks are dropped rather than executed
* **Batch Processing**: Process large data sets in batches to avoid exhausting memory
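
Idempotency and batching can be combined in a small sketch. The function names and the in-memory `already_done` set are assumptions for illustration; a real task would record completion in the database or a cache.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Set


def batched(items: Iterable[int], size: int) -> Iterator[List[int]]:
    """Yield successive lists of at most `size` items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def process_once(
    item_ids: Iterable[int], already_done: Set[int], batch_size: int = 2
) -> List[List[int]]:
    """Process ids in batches, skipping ids handled by an earlier run."""
    processed_batches = []
    for batch in batched(item_ids, batch_size):
        pending = [item for item in batch if item not in already_done]
        already_done.update(pending)  # record completion so a retry is a no-op
        if pending:
            processed_batches.append(pending)
    return processed_batches


done: Set[int] = set()
first_run = process_once(range(5), done)
retry_run = process_once(range(5), done)  # simulates a retried task
```

For time sensitivity, Celery's `apply_async` accepts an `expires` option, so a task that is still queued after its deadline is discarded instead of executed.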

***

## 5. Technical Advantages of MaiAgent's Celery Configuration

### 5.1 Reliability and Stability

* **Task Persistence**: Queued tasks are stored in the Message Broker and survive a system restart, provided the broker itself is configured for persistence
* **Automatic Retry**: Supports automatic retry mechanisms for failed tasks
* **Graceful Shutdown**: Workers complete in-progress tasks before shutting down
* **Health Checks**: Periodic monitoring of Worker and Beat runtime status

### 5.2 Scalability and Performance

* **Horizontal Scaling**: Easily add more Workers to handle increased traffic
* **Task Routing**: Route different task types to dedicated Worker pools
* **Asynchronous Execution**: Does not block the main application, improving system responsiveness
* **Batch Optimization**: Batching related work reduces the number of database queries

### 5.3 Operations Friendliness

* **Visual Monitoring**: Flower provides an intuitive monitoring dashboard
* **Detailed Logging**: Complete records of task execution for troubleshooting
* **Dynamic Configuration**: Schedules managed through Django Celery Beat can be adjusted at runtime without restarting the system
* **Alert Integration**: Integrates with enterprise monitoring systems for timely anomaly detection

***

## 6. Related Documentation

* [OAuth 2.0 Integration and Automatic Token Refresh Mechanism](/tech/maiagent-tech-en/advanced-genai-tech/oauth-integration.md) - Learn more about OAuth Token refresh logic
* [Deployment Architecture](/tech/maiagent-tech-en/platform-development/architecture.md) - Understand Celery's role in the overall system architecture

### Reference Links

* [Celery Documentation](https://docs.celeryq.dev/)
* [Django Celery Beat](https://django-celery-beat.readthedocs.io/)
* [Flower - Celery Monitoring Tool](https://flower.readthedocs.io/)


