Job Queue

Last Updated on : 2023-12-22 08:11:30download

This topic describes the job queue service.

Concepts

TuyaOS provides a standard data queue service based on callbacks. It is used for asynchronously processing events that need caching and depend on handlers. When there are many events cached and executed in the TuyaOS application, you can use the job queue service for application development.

Features

Run a job immediately, with a delay, or according to a set schedule.

How it works

Initialize job queue

WORKQ_SYSTEM
WORKQ_HIGHTPRI
Initialize job queue
Create thread wq_system and
prioritize THREAD_PRIO_2
Create thread wq_highpri and
prioritize THREAD_PRIO_1
Create queue
Create queue
Initialization completes

Process jobs

workq_initworkq_threadworkq_scheduleInitialize resourcestal_semaphore_wait (Suspend semaphore)tuya_queue_input (Create a member in the linked list,and send semaphore)work_item.cb (Invoke the callback for reading the linked list on receiving semaphore.)workq_initworkq_threadworkq_schedule

Development guide

Runtime environment

  • Development framework

    The general features provided by the Wi-Fi Development Framework.

  • Components

    tal_system_service

  • App version

    No special requirements.

  • Others

    No special requirements.

How to use

WORKQ_HIGHTPRI and WORKQ_SYSTEM are used to enqueue high-priority jobs and medium-priority jobs, respectively. For jobs with a delay, see the example.

Type definition

/**
 * @brief TuyaOS provides you with two workqueue services for convenience.
 */
typedef enum {
    /**
     * low priority workqueue (block operations are allowed)
     */
    WORKQ_SYSTEM,
    /**
     * high priority workqueue (block operations are not allowed)
     */
    WORKQ_HIGHTPRI
}WORKQ_SERVICE_E;

/**
 * @brief delayed work entry definition
 */
typedef struct {
    DELAYED_WORK_HANDLE *delayed_work; // delayed work handle
    WORKQUEUE_CB delayed_work_cb;      // delayed work callback
}DELAYED_WORK_ENTRY_S;

typedef VOID_T* WORKQUEUE_HANDLE;
typedef VOID_T* DELAYED_WORK_HANDLE;

Callback

typedef VOID_T (*WORKQUEUE_CB)(VOID_T *data);

API description

Initialize job queue

/**
 * @brief init ty work queue service
 *
 * @return OPRT_OK on success. Others on error, please refer to tuya_error_code.h
 */
OPERATE_RET tal_workq_init(VOID_T);

Get the job queue handle

/**
 * @brief get the handle of workqueue service
 *
 * @param[in] service the workqueue service
 *
 * @return WORKQUEUE_HANDLE handle of workqueue service
 */
WORKQUEUE_HANDLE tal_workq_get_handle(WORKQ_SERVICE_E service);

Enqueue a job

/**
 * @brief add work to work queue
 *
 * @param[in] service the workqueue service
 * @param[in] cb, call back of work
 * @param[in] data, parameter of call back
 *
 * @return OPRT_OK on success. Others on error, please refer to tuya_error_code.h
 */
OPERATE_RET tal_workq_schedule(WORKQ_SERVICE_E service, WORKQUEUE_CB cb, VOID_T *data);

Enqueue a job with the immediate job dequeued first

/**
 * @brief put work task in workqueue, instant tasks will be dequeued first
 *
 * @param[in] service the workqueue service
 * @param[in] cb the work callback
 * @param[in] data the work data
 *
 * @return OPRT_OK on success. Others on error, please refer to tuya_error_code.h
 */
OPERATE_RET tal_workq_schedule_instant(WORKQ_SERVICE_E service, WORKQUEUE_CB cb, VOID_T *data);

Cancel a job in the queue

/**
 * @brief cancel work task in workqueue
 *
 * @param[in] service the workqueue service
 * @param[in] cb the work callback
 * @param[in] data the work data
 *
 * @return OPRT_OK on success. Others on error, please refer to tuya_error_code.h
 */
OPERATE_RET tal_workq_cancel(WORKQ_SERVICE_E service, WORKQUEUE_CB cb, VOID_T *data);

Get the number of jobs in the queue

/**
 * @brief get current work number in work queue
 *
 * @param[in] service the workqueue service
 *
 * @note This API is used to get the current work number in work queue
 *
 * @return current work number in the work queue
 */
UINT16_T tal_workq_get_num(WORKQ_SERVICE_E service);

Dump all the jobs in the queue

/**
 * @brief dump all work in work queue.
 *
 * @param[in] service the workqueue service
 */
VOID_T tal_workq_dump(WORKQ_SERVICE_E service);

Initialize delayed job queue

/**
 * @brief init delayed work task
 *
 * @param[in] service the workqueue service
 * @param[in] cb the work callback
 * @param[in] data the work data
 * @param[out] delayed_work handle of delayed work
 *
 * @return OPRT_OK on success. Others on error, please refer to tuya_error_code.h
 */
OPERATE_RET tal_workq_init_delayed(WORKQ_SERVICE_E service, WORKQUEUE_CB cb, VOID_T *data, DELAYED_WORK_HANDLE *delayed_work);

Start delay

/**
 * @brief put work task in workqueue after delay
 *
 * @param[in] delayed_work handle of delayed work
 * @param[in] interval number of ms to wait or 0 for immediate execution
 * @param[in] type see @LOOP_TYPE
 *
 * @return OPRT_OK on success. Others on error, please refer to tuya_error_code.h
 */
OPERATE_RET tal_workq_start_delayed(DELAYED_WORK_HANDLE delayed_work,
    TIME_MS interval, LOOP_TYPE type);

Stop delay

/**
 * @brief stop delayed work
 *
 * @param[in] delayed_work handle of delayed work
 *
 * @return OPRT_OK on success. Others on error, please refer to tuya_error_code.h
 */
OPERATE_RET tal_workq_stop_delayed(DELAYED_WORK_HANDLE delayed_work);

Cancel delayed job queue

/**
 * @brief cancel delay work
 *
 * @param[in] delayed_work handle of delayed work
 *
 * @return OPRT_OK on success. Others on error, please refer to tuya_error_code.h
 */
OPERATE_RET tal_workq_cancel_delayed(DELAYED_WORK_HANDLE delayed_work);

Example

// Job queue callback
VOID_T work_queue_cb(VOID_T *data)
{
    return;
}

STATIC VOID_T wq_demo(VOID_T)
{
    OPERATE_RET op_ret = OPRT_OK;
    WORKQUEUE_HANDLE wq_hand = NULL;

    // tal_workq_init is executed when the application calls tuya_iot_init_params
    op_ret = tal_workq_init();
    if (OPRT_OK != op_ret) {
        return;
    }

    op_ret = tal_workq_schedule(WORKQ_HIGHTPRI, work_queue_cb, NULL);
    if (OPRT_OK != op_ret) {
        PR_ERR("bt work queue add fail:%d", op_ret);
        return;
    }

    return;
}

STATIC VOID_T wq_dalayed_demo(VOID_T)
{
    OPERATE_RET op_ret = OPRT_OK;
    DELAYED_WORK_HANDLE dl_wq_hand = NULL;

    op_ret = tal_workq_init_delayed(WORKQ_HIGHTPRI, work_queue_cb, NULL, &dl_wq_hand);
    if (OPRT_OK != op_ret) {
        PR_ERR("op_ret:%d", op_ret);
        return;
    }

    // The task is started after a 1s delay
    op_ret = tal_workq_start_delayed(dl_wq_hand, 1000, LOOP_ONCE);
    if (OPRT_OK != op_ret) {
        PR_ERR("op_ret:%d", op_ret);
        return;
    }
}

Things to note

Try to avoid long blocking queues so that other jobs can be executed properly.

FAQs

What is the maximum number of jobs in a queue?

A maximum number of 100 tasks for each type can be enqueued.