1、集群在啟動時會初始化出異步線程連接隊列,其中cluster->async_max_conns_per_node為每個階段異步最大連接數,可配。
2、
1)先申請as_event_loop_capacity(為)個異步隊列空間
2)將最大連接數分配到每個異步隊列中。Max=最大連接/隊列個數,rem=0
3、初始化每個隊列。隊列為空
4、
//1、
as_node_create:
if (as_event_loop_capacity > 0) {
node->async_conn_qs=
as_node_create_async_queues(cluster->async_max_conns_per_node);
node->pipe_conn_qs= as_node_create_async_queues(cluster->pipe_max_conns_per_node);
}
//2、
as_node_create_async_queues(uint32_tmax_conns_per_node):
as_queue* queues = cf_malloc(sizeof(as_queue)* as_event_loop_capacity);
uint32_t max = max_conns_per_node /as_event_loop_capacity;
uint32_t rem = max_conns_per_node - (max *as_event_loop_capacity);
uint32_t capacity;
for (uint32_t i = 0; i <as_event_loop_capacity; i++) {
capacity= i < rem ? max + 1 : max;
as_queue_init(&queues[i],sizeof(void*), capacity);
}
3、
Bool as_queue_init(as_queue* queue,uint32_t item_size, uint32_t capacity)
{
queue->data= cf_malloc(capacity * item_size);
if(! queue->data) {
returnfalse;
}
queue->capacity= capacity;//capacity為每個隊列的容量
queue->head= queue->tail = 0;//隊列為空
queue->item_size= item_size;//item大學
queue->total= 0;
queue->flags= ITEMS_ON_HEAP;
returntrue;
}
4、異步,連接server之前,需要先創建eventloop。該函數入參即為as_event_loop_capacity。
每個event_loop有一個as_ev_worker線程
as_event_create_loops
5、aerospike_key_put_async異步接口,有個入參指定event_loop,
調用aerospike_key_put_async_ex
5.1、異步接口會調用as_async_write_command_create,拼接CMD
該cmd->event_loop為接口中傳進的值,如果不從接口傳入,采用輪詢方式分配
cmd->event_loop= as_event_assign(event_loop);
returnevent_loop ? event_loop : as_event_loop_get();
5.2、異步接口拼接完cmd后,調用as_event_command_execute(cmd,err)去執行
5.3、as_event_command_execute執行方法:
1)如果已經在該線程中,則直接執行
2)否則保存到event_loop隊列,并喚起event_loop線程進行處理
if (cmd->event_loop->thread == pthread_self()) {
//We are already in event loop thread, so start PRocessing.
as_event_command_begin(cmd);
}
else{
if(cmd->timeout_ms) {
//Store current time in first 8 bytes which is not used yet.
*(uint64_t*)cmd= cf_getms();
}
//Send command through queue so it can be executed in event loop thread.
if(! as_event_send(cmd)) {
as_event_command_free(cmd);
returnas_error_set_message(err, AEROSPIKE_ERR_CLIENT, "Failed to queuecommand");
}
}
as_event_send
as_event_loop*event_loop = cmd->event_loop;
pthread_mutex_lock(&event_loop->lock);
boolqueued = as_queue_push(&event_loop->queue, &cmd);
pthread_mutex_unlock(&event_loop->lock);
if(queued) {
ev_async_send(event_loop->loop,&event_loop->wakeup);
}
6、執行event的函數as_event_command_begin
6.1、獲取連接
as_connection_statusstatus = cmd->pipe_listener != NULL ? as_pipe_get_connection(cmd) :as_event_get_connection(cmd);
6.2、執行
if(status == AS_CONNECTION_FROM_POOL) {
as_ev_command_write_start(cmd);
}
elseif (status == AS_CONNECTION_NEW) {
as_ev_connect(cmd);//創建新連接后,進行連接
}
6.3、獲取連接的方法:
1)as_queue*queue = &cmd->node->async_conn_qs[cmd->event_loop->index];得到異步連接隊列
2)如果隊列不為空,則從中pop一個連接(怎么不使用鎖保護?)
3)否則新創建一個連接。這里有個限制。
每新創建一個連接queue->total++;
當queue->total>=queue->capicity的時候報連接耗盡錯誤。
6.4、
aerospike_key_put_async->aerospike_key_put_async_ex->as_async_write_command_create
中有個回調函數as_event_command_parse_header
6.5、該回調函數調用as_event_response_complete(cmd);將異步新創建的連接放到池子里
as_event_response_complete->as_event_put_connection
新聞熱點
疑難解答