基础组件(线程池、内存池、异步请求池、Mysql连接池)

2023-09-21 17:17:12

1、概述

池化技术,减少了资源创建次数,提高了程序响应性能,特别是在高并发场景下,当程序7*24小时运行,创建资源可能会出现耗时较长和失败等问题,池化技术,主要是程序初始化之前创建多个可用连接,集中管理起来,后续直接使用,使用完并归还。

2、线程池

线程池主要解决问题:
1、解决线程创建和消耗所浪费的资源和时间
2、控制线程数量,减少资源竞争

代码解读

一共由三个接口组成:
1、CreatePthreadPool创建工作线程
初始化线程池结构下的条件变量和互斥锁,创建多个线程通过头插法,插入到线程池结构worker成员中
2、thread_func线程入口函数
进入while循环,第一件事先拿锁,然后再去判断,任务队列中是否有数据,要是没有则进入循环中的条件等待,进入条件等待会先释放锁,所以初始化完成之后所有线程全部都会进入条件等待,然后一旦条件满足则会去拿锁。
3、threadPoolQueue将任务加入到任务链表中
先加锁,然后通过头插法插入到线程池结构中的wait_jobs成员中,通知条件变量,然后解锁
4、threadPoolShutdown回收线程池中线程
将工作线程中的标识位全部置为一,然后通知所有线程,然后等待回收所有线程并释放每个节点

总结

线程池就是一个很典型的生产者消费者模型,不断地向任务队列中投放任务,工作线程不断地去队伍队列中去取任务消费掉。

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

#define LL_ADD(item, list) do { 	\
	item->prev = NULL;				\
	item->next = list;				\
	list = item;					\
} while(0)

#define LL_REMOVE(item, list) do {						\
	if (item->prev != NULL) item->prev->next = item->next;	\
	if (item->next != NULL) item->next->prev = item->prev;	\
	if (list == item) list = item->next;					\
	item->prev = item->next = NULL;							\
} while(0)

// 工作线程结构体
typedef struct NWORKER{
    pthread_t id;               // 线程id
    int terminate;              // 终止条件
    struct NTHREADPOLL *worqueue;
    struct NWORKER *prev;       // 前向指针
    struct NWORKER *next;       // 后项指针
}nWorker;

// 任务队列
typedef struct NJOB{
    void (*job_function)(struct NJOB* job);
    void* user_data;
    struct NJOB *prev;
    struct NJOB *next;
}nJob;

typedef struct NTHREADPOLL{
    struct NWORKER *workers;
    struct NJOB *wait_jobs;

    pthread_cond_t cond;
    pthread_mutex_t mtx;
}nThreadpoll;

// 操作线程池函数
#define MAX_PTHREAD 80
#define MAX_JOB 8

// 毁掉函数
void job_fun(nJob *job){
    if(job == NULL) return;
    int *data = (int*)job->user_data;
    printf("user_data:%d-thread_id:%ld,line:%d\n",*data,pthread_self(),__LINE__);

    free(job->user_data);
    job->user_data = NULL;
    free(job);
    job = NULL;
    return;
}

// 线程入口函数
void* thread_func(void* data){
    nWorker *worker = (nWorker*)data;
    while (1)
    {
        pthread_mutex_lock(&(worker->worqueue->mtx));
        while(worker->worqueue->wait_jobs == NULL){
            if(worker->terminate) break;
            pthread_cond_wait(&(worker->worqueue->cond),&(worker->worqueue->mtx));
        }
        if(worker->terminate){
            pthread_mutex_unlock(&(worker->worqueue->mtx));
            break;
        }
        nJob *job = worker->worqueue->wait_jobs;
        if(job != NULL)
            LL_REMOVE(job,worker->worqueue->wait_jobs);
        pthread_mutex_unlock(&(worker->worqueue->mtx));
        if(job == NULL) continue;
        job->job_function(job);
    }
    // free(worker);
    // worker = NULL;
    pthread_exit(0);
}

// 创造线程
int CreatePthreadPool(nThreadpoll* threadpoll,int max_pthread)
{
    if(max_pthread <= 0) max_pthread = MAX_PTHREAD;
    memset(threadpoll,0x00,sizeof(nThreadpoll));
    
	pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
	memcpy(&threadpoll->cond, &blank_cond, sizeof(threadpoll->cond));
	
	pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
	memcpy(&threadpoll->mtx, &blank_mutex, sizeof(threadpoll->mtx));
    int i;
    for(i=0; i<max_pthread; i++){
         
        nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
        if(worker == NULL){
            perror("threadpoll->workers");
            return -1;
        }
        memset(worker,0x00,sizeof(nWorker));
        worker->worqueue = threadpoll;
        worker->terminate = 0;
        int ret = pthread_create(&(worker->id),NULL,thread_func,(void*)worker);
        if(ret != 0){
            perror("pthread_create");
            free(worker);
            worker = NULL;
            return -1;
        }
        LL_ADD(worker,worker->worqueue->workers);
    }
    return 0;
}

// 向线程中添加任务,唤醒一个线程去处理
void threadPoolQueue(nThreadpoll* threadpool,nJob *job){
    pthread_mutex_lock(&(threadpool->mtx));
    
    LL_ADD(job,threadpool->wait_jobs);
    
    pthread_cond_signal(&(threadpool->cond));
    pthread_mutex_unlock(&(threadpool->mtx));
    return;
}

void threadPoolShutdown(nThreadpoll *workerqueue){
    nWorker *woker = NULL;
    for(woker = workerqueue->workers;woker!=NULL;woker = woker->next){
        woker->terminate = 1;
    }
	pthread_cond_broadcast(&workerqueue->cond);
    struct NWORKER *tmp = workerqueue->workers;
    while (tmp)
    {
        pthread_join(tmp->id,NULL);
        if(tmp){
            free(tmp);
            tmp = NULL;
        }
        tmp = tmp->next;
    }
    pthread_cond_destroy(&workerqueue->cond);
    pthread_mutex_destroy(&workerqueue->mtx);
    workerqueue->workers = NULL;
	workerqueue->wait_jobs = NULL;
}

int main(){
    nThreadpoll workerQueue;
    if(CreatePthreadPool(&workerQueue,MAX_PTHREAD) != 0){
        perror("CreatePthreadPool faile");
        return -1;
    }
    int i;
    for(i=0;i<MAX_JOB;i++){
        nJob *job = (nJob*)malloc(sizeof(nJob));
        if(job == NULL){
            perror("nJob malloc faile");
            exit(-1);
        }
        job->job_function = job_fun;
        job->user_data = (int*)malloc(sizeof(int));
        *(int*)job->user_data = i;
        threadPoolQueue(&workerQueue,job);
    }
    getchar();
    return 0;
}

2、异步请求池

当我们向数据库或者DNS服务器发送请求时,需要同步等待他的返回结果,这个过程是比较耗时的,所以衍生出异步发起请求,当发出请求之后,不等待请求的响应,而是通过一个线程去监听,主线程去做其他事情,当请求响应之后如何去做其他事情。
在这里插入图片描述
在这里插入图片描述

同步向DNS服务器发送请求




#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <errno.h>
#include <fcntl.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <sys/epoll.h>
#include <netdb.h>
#include <arpa/inet.h>

#include <pthread.h>


#define DNS_SVR				"114.114.114.114"


#define DNS_HOST			0x01
#define DNS_CNAME			0x05



struct dns_header {
	unsigned short id;
	unsigned short flags;
	unsigned short qdcount;
	unsigned short ancount;
	unsigned short nscount;
	unsigned short arcount;
};

struct dns_question {
	int length;
	unsigned short qtype;
	unsigned short qclass;
	char *qname;
};

struct dns_item {
	char *domain;
	char *ip;
};

int dns_create_header(struct dns_header *header) {

	if (header == NULL) return -1;
	memset(header, 0, sizeof(struct dns_header));

	srandom(time(NULL));

	header->id = random();
	header->flags |= htons(0x0100);
	header->qdcount = htons(1);

	return 0;
}

int dns_create_question(struct dns_question *question, const char *hostname) {

	if (question == NULL) return -1;
	memset(question, 0, sizeof(struct dns_question));

	question->qname = (char*)malloc(strlen(hostname) + 2);
	if (question->qname == NULL) return -2;

	question->length = strlen(hostname) + 2;

	question->qtype = htons(1);
	question->qclass = htons(1);

	const char delim[2] = ".";

	char *hostname_dup = strdup(hostname);
	char *token = strtok(hostname_dup, delim);

	char *qname_p = question->qname;

	while (token != NULL) {

		size_t len = strlen(token);

		*qname_p = len;
		qname_p ++;

		strncpy(qname_p, token, len+1);
		qname_p += len;

		token = strtok(NULL, delim);
	}

	free(hostname_dup);

	return 0;
	
}

int dns_build_request(struct dns_header *header, struct dns_question *question, char *request) {

	int header_s = sizeof(struct dns_header);
	int question_s = question->length + sizeof(question->qtype) + sizeof(question->qclass);

	int length = question_s + header_s;

	int offset = 0;
	memcpy(request+offset, header, sizeof(struct dns_header));
	offset += sizeof(struct dns_header);

	memcpy(request+offset, question->qname, question->length);
	offset += question->length;

	memcpy(request+offset, &question->qtype, sizeof(question->qtype));
	offset += sizeof(question->qtype);

	memcpy(request+offset, &question->qclass, sizeof(question->qclass));

	return length;
	
}

static int is_pointer(int in) {
	return ((in & 0xC0) == 0xC0);
}


static void dns_parse_name(unsigned char *chunk, unsigned char *ptr, char *out, int *len) {

	int flag = 0, n = 0, alen = 0;
	char *pos = out + (*len);

	while (1) {

		flag = (int)ptr[0];
		if (flag == 0) break;

		if (is_pointer(flag)) {
			
			n = (int)ptr[1];
			ptr = chunk + n;
			dns_parse_name(chunk, ptr, out, len);
			break;
			
		} else {

			ptr ++;
			memcpy(pos, ptr, flag);
			pos += flag;
			ptr += flag;

			*len += flag;
			if ((int)ptr[0] != 0) {
				memcpy(pos, ".", 1);
				pos += 1;
				(*len) += 1;
			}
		}
	
	}
	
}


static int dns_parse_response(char *buffer, struct dns_item **domains) {

	int i = 0;
	unsigned char *ptr = buffer;

	ptr += 4;
	int querys = ntohs(*(unsigned short*)ptr);

	ptr += 2;
	int answers = ntohs(*(unsigned short*)ptr);

	ptr += 6;
	for (i = 0;i < querys;i ++) {
		while (1) {
			int flag = (int)ptr[0];
			ptr += (flag + 1);

			if (flag == 0) break;
		}
		ptr += 4;
	}

	char cname[128], aname[128], ip[20], netip[4];
	int len, type, ttl, datalen;

	int cnt = 0;
	struct dns_item *list = (struct dns_item*)calloc(answers, sizeof(struct dns_item));
	if (list == NULL) {
		return -1;
	}

	for (i = 0;i < answers;i ++) {
		
		bzero(aname, sizeof(aname));
		len = 0;

		dns_parse_name(buffer, ptr, aname, &len);
		ptr += 2;

		type = htons(*(unsigned short*)ptr);
		ptr += 4;

		ttl = htons(*(unsigned short*)ptr);
		ptr += 4;

		datalen = ntohs(*(unsigned short*)ptr);
		ptr += 2;

		if (type == DNS_CNAME) {

			bzero(cname, sizeof(cname));
			len = 0;
			dns_parse_name(buffer, ptr, cname, &len);
			ptr += datalen;
			
		} else if (type == DNS_HOST) {

			bzero(ip, sizeof(ip));

			if (datalen == 4) {
				memcpy(netip, ptr, datalen);
				inet_ntop(AF_INET , netip , ip , sizeof(struct sockaddr));

				printf("%s has address %s\n" , aname, ip);
				printf("\tTime to live: %d minutes , %d seconds\n", ttl / 60, ttl % 60);

				list[cnt].domain = (char *)calloc(strlen(aname) + 1, 1);
				memcpy(list[cnt].domain, aname, strlen(aname));
				
				list[cnt].ip = (char *)calloc(strlen(ip) + 1, 1);
				memcpy(list[cnt].ip, ip, strlen(ip));
				
				cnt ++;
			}
			
			ptr += datalen;
		}
	}

	*domains = list;
	ptr += 2;

	return cnt;
	
}


int dns_client_commit(const char *domain) {

	int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
	if (sockfd < 0) {
		perror("create socket failed\n");
		exit(-1);
	}

	printf("url:%s\n", domain);

	struct sockaddr_in dest;
	bzero(&dest, sizeof(dest));
	dest.sin_family = AF_INET;
	dest.sin_port = htons(53);
	dest.sin_addr.s_addr = inet_addr(DNS_SVR);
	
	int ret = connect(sockfd, (struct sockaddr*)&dest, sizeof(dest));
	printf("connect :%d\n", ret);

	struct dns_header header = {0};
	dns_create_header(&header);

	struct dns_question question = {0};
	dns_create_question(&question, domain);

	char request[1024] = {0};
	int req_len = dns_build_request(&header, &question, request);
	int slen = sendto(sockfd, request, req_len, 0, (struct sockaddr*)&dest, sizeof(struct sockaddr));

	char buffer[1024] = {0};
	struct sockaddr_in addr;
	size_t addr_len = sizeof(struct sockaddr_in);
		
	int n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len);
		
	printf("recvfrom n : %d\n", n);
	struct dns_item *domains = NULL;
	dns_parse_response(buffer, &domains);

	return 0;
}

char *domain[] = {
//	"www.ntytcp.com",
	"bojing.wang",
	"www.baidu.com",
	"tieba.baidu.com",
	"news.baidu.com",
	"zhidao.baidu.com",
	"music.baidu.com",
	"image.baidu.com",
	"v.baidu.com",
	"map.baidu.com",
	"baijiahao.baidu.com",
	"xueshu.baidu.com",
	"cloud.baidu.com",
	"www.163.com",
	"open.163.com",
	"auto.163.com",
	"gov.163.com",
	"money.163.com",
	"sports.163.com",
	"tech.163.com",
	"edu.163.com",
	"www.taobao.com",
	"q.taobao.com",
	"sf.taobao.com",
	"yun.taobao.com",
	"baoxian.taobao.com",
	"www.tmall.com",
	"suning.tmall.com",
	"www.tencent.com",
	"www.qq.com",
	"www.aliyun.com",
	"www.ctrip.com",
	"hotels.ctrip.com",
	"hotels.ctrip.com",
	"vacations.ctrip.com",
	"flights.ctrip.com",
	"trains.ctrip.com",
	"bus.ctrip.com",
	"car.ctrip.com",
	"piao.ctrip.com",
	"tuan.ctrip.com",
	"you.ctrip.com",
	"g.ctrip.com",
	"lipin.ctrip.com",
	"ct.ctrip.com"
};



int main(int argc, char *argv[]) {

	int count = sizeof(domain) / sizeof(domain[0]);
	int i = 0;

	for (i = 0;i < count;i ++) {
		dns_client_commit(domain[i]);
	}

	getchar();
	
}

异步发送请求

1、dns_async_client_init初始化函数
创建epoll、创建线程,用来监控dns的响应消息
2、dns_async_client_commit创建连接,发送请求,将fd加入到epoll中进行监听
3、dns_async_client_proc线程入口函数
通过epoll_wait进行监听,调用取出数据,调用对应的回调函数,epoll_ctl删除节点。




#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <errno.h>
#include <fcntl.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <sys/epoll.h>
#include <netdb.h>
#include <arpa/inet.h>

#include <pthread.h>


#define DNS_SVR				"114.114.114.114"


#define DNS_HOST			0x01
#define DNS_CNAME			0x05

#define ASYNC_CLIENT_NUM		1024


struct dns_header {
	unsigned short id;
	unsigned short flags;
	unsigned short qdcount;
	unsigned short ancount;
	unsigned short nscount;
	unsigned short arcount;
};

struct dns_question {
	int length;
	unsigned short qtype;
	unsigned short qclass;
	char *qname;
};

struct dns_item {
	char *domain;
	char *ip;
};

typedef void (*async_result_cb)(struct dns_item *list, int count);


struct async_context {
	int epfd;
};

struct ep_arg {
	int sockfd;
	async_result_cb cb;
};

int dns_create_header(struct dns_header *header) {

	if (header == NULL) return -1;
	memset(header, 0, sizeof(struct dns_header));

	srandom(time(NULL));

	header->id = random();
	header->flags |= htons(0x0100);
	header->qdcount = htons(1);

	return 0;
}

int dns_create_question(struct dns_question *question, const char *hostname) {

	if (question == NULL) return -1;
	memset(question, 0, sizeof(struct dns_question));

	question->qname = (char*)malloc(strlen(hostname) + 2);
	if (question->qname == NULL) return -2;

	question->length = strlen(hostname) + 2;

	question->qtype = htons(1);
	question->qclass = htons(1);

	const char delim[2] = ".";

	char *hostname_dup = strdup(hostname);
	char *token = strtok(hostname_dup, delim);

	char *qname_p = question->qname;

	while (token != NULL) {

		size_t len = strlen(token);

		*qname_p = len;
		qname_p ++;

		strncpy(qname_p, token, len+1);
		qname_p += len;

		token = strtok(NULL, delim);
	}

	free(hostname_dup);

	return 0;
	
}

int dns_build_request(struct dns_header *header, struct dns_question *question, char *request) {

	int header_s = sizeof(struct dns_header);
	int question_s = question->length + sizeof(question->qtype) + sizeof(question->qclass);

	int length = question_s + header_s;

	int offset = 0;
	memcpy(request+offset, header, sizeof(struct dns_header));
	offset += sizeof(struct dns_header);

	memcpy(request+offset, question->qname, question->length);
	offset += question->length;

	memcpy(request+offset, &question->qtype, sizeof(question->qtype));
	offset += sizeof(question->qtype);

	memcpy(request+offset, &question->qclass, sizeof(question->qclass));

	return length;
	
}

static int is_pointer(int in) {
	return ((in & 0xC0) == 0xC0);
}

static int set_block(int fd, int block) {
	int flags = fcntl(fd, F_GETFL, 0);
	if (flags < 0) return flags;

	if (block) {        
		flags &= ~O_NONBLOCK;    
	} else {        
		flags |= O_NONBLOCK;    
	}

	if (fcntl(fd, F_SETFL, flags) < 0) return -1;

	return 0;
}

static void dns_parse_name(unsigned char *chunk, unsigned char *ptr, char *out, int *len) {

	int flag = 0, n = 0, alen = 0;
	char *pos = out + (*len);

	while (1) {

		flag = (int)ptr[0];
		if (flag == 0) break;

		if (is_pointer(flag)) {
			
			n = (int)ptr[1];
			ptr = chunk + n;
			dns_parse_name(chunk, ptr, out, len);
			break;
			
		} else {

			ptr ++;
			memcpy(pos, ptr, flag);
			pos += flag;
			ptr += flag;

			*len += flag;
			if ((int)ptr[0] != 0) {
				memcpy(pos, ".", 1);
				pos += 1;
				(*len) += 1;
			}
		}
	
	}
	
}


static int dns_parse_response(char *buffer, struct dns_item **domains) {

	int i = 0;
	unsigned char *ptr = buffer;

	ptr += 4;
	int querys = ntohs(*(unsigned short*)ptr);

	ptr += 2;
	int answers = ntohs(*(unsigned short*)ptr);

	ptr += 6;
	for (i = 0;i < querys;i ++) {
		while (1) {
			int flag = (int)ptr[0];
			ptr += (flag + 1);

			if (flag == 0) break;
		}
		ptr += 4;
	}

	char cname[128], aname[128], ip[20], netip[4];
	int len, type, ttl, datalen;

	int cnt = 0;
	struct dns_item *list = (struct dns_item*)calloc(answers, sizeof(struct dns_item));
	if (list == NULL) {
		return -1;
	}

	for (i = 0;i < answers;i ++) {
		
		bzero(aname, sizeof(aname));
		len = 0;

		dns_parse_name(buffer, ptr, aname, &len);
		ptr += 2;

		type = htons(*(unsigned short*)ptr);
		ptr += 4;

		ttl = htons(*(unsigned short*)ptr);
		ptr += 4;

		datalen = ntohs(*(unsigned short*)ptr);
		ptr += 2;

		if (type == DNS_CNAME) {

			bzero(cname, sizeof(cname));
			len = 0;
			dns_parse_name(buffer, ptr, cname, &len);
			ptr += datalen;
			
		} else if (type == DNS_HOST) {

			bzero(ip, sizeof(ip));

			if (datalen == 4) {
				memcpy(netip, ptr, datalen);
				inet_ntop(AF_INET , netip , ip , sizeof(struct sockaddr));

				printf("%s has address %s\n" , aname, ip);
				printf("\tTime to live: %d minutes , %d seconds\n", ttl / 60, ttl % 60);

				list[cnt].domain = (char *)calloc(strlen(aname) + 1, 1);
				memcpy(list[cnt].domain, aname, strlen(aname));
				
				list[cnt].ip = (char *)calloc(strlen(ip) + 1, 1);
				memcpy(list[cnt].ip, ip, strlen(ip));
				
				cnt ++;
			}
			
			ptr += datalen;
		}
	}

	*domains = list;
	ptr += 2;

	return cnt;
	
}


int dns_client_commit(const char *domain) {

	int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
	if (sockfd < 0) {
		perror("create socket failed\n");
		exit(-1);
	}

	printf("url:%s\n", domain);

	set_block(sockfd, 0); //nonblock

	struct sockaddr_in dest;
	bzero(&dest, sizeof(dest));
	dest.sin_family = AF_INET;
	dest.sin_port = htons(53);
	dest.sin_addr.s_addr = inet_addr(DNS_SVR);
	
	int ret = connect(sockfd, (struct sockaddr*)&dest, sizeof(dest));
	//printf("connect :%d\n", ret);

	struct dns_header header = {0};
	dns_create_header(&header);

	struct dns_question question = {0};
	dns_create_question(&question, domain);

	char request[1024] = {0};
	int req_len = dns_build_request(&header, &question, request);
	int slen = sendto(sockfd, request, req_len, 0, (struct sockaddr*)&dest, sizeof(struct sockaddr));

	while (1) {
		char buffer[1024] = {0};
		struct sockaddr_in addr;
		size_t addr_len = sizeof(struct sockaddr_in);
	
		int n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len);
		if (n <= 0) continue;
		
		printf("recvfrom n : %d\n", n);
		struct dns_item *domains = NULL;
		dns_parse_response(buffer, &domains);

		break;
	}

	return 0;
}

void dns_async_client_free_domains(struct dns_item *list, int count) {
	int i = 0;

	for (i = 0;i < count;i ++) {
		free(list[i].domain);
		free(list[i].ip);
	}

	free(list);
}


//dns_async_client_proc()
//epoll_wait
//result callback
static void* dns_async_client_proc(void *arg) {
	struct async_context *ctx = (struct async_context*)arg;

	int epfd = ctx->epfd;

	while (1) {

		struct epoll_event events[ASYNC_CLIENT_NUM] = {0};

		int nready = epoll_wait(epfd, events, ASYNC_CLIENT_NUM, -1);
		if (nready < 0) {
			if (errno == EINTR || errno == EAGAIN) {
				continue;
			} else {
				break;
			}
		} else if (nready == 0) {
			continue;
		}

		printf("nready:%d\n", nready);
		int i = 0;
		for (i = 0;i < nready;i ++) {

			struct ep_arg *data = (struct ep_arg*)events[i].data.ptr;
			int sockfd = data->sockfd;

			char buffer[1024] = {0};
			struct sockaddr_in addr;
			size_t addr_len = sizeof(struct sockaddr_in);
			int n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len);

			struct dns_item *domain_list = NULL;
			int count = dns_parse_response(buffer, &domain_list);

			data->cb(domain_list, count); //call cb
			
			int ret = epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
			//printf("epoll_ctl DEL --> sockfd:%d\n", sockfd);

			close(sockfd); /

			dns_async_client_free_domains(domain_list, count);
			free(data);

		}
		
	}
	
}



//dns_async_client_init()
//epoll init
//thread init
struct async_context *dns_async_client_init(void) {

	int epfd = epoll_create(1); // 
	if (epfd < 0) return NULL;

	struct async_context *ctx = calloc(1, sizeof(struct async_context));
	if (ctx == NULL) {
		close(epfd);
		return NULL;
	}
	ctx->epfd = epfd;

	pthread_t thread_id;
	int ret = pthread_create(&thread_id, NULL, dns_async_client_proc, ctx);
	if (ret) {
		perror("pthread_create");
		return NULL;
	}
	usleep(1); //child go first

	return ctx;
}


//dns_async_client_commit(ctx, domain)
//socket init
//dns_request
//sendto dns send
int dns_async_client_commit(struct async_context* ctx, const char *domain, async_result_cb cb) {

	int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
	if (sockfd < 0) {
		perror("create socket failed\n");
		exit(-1);
	}

	printf("url:%s\n", domain);

	set_block(sockfd, 0); //nonblock

	struct sockaddr_in dest;
	bzero(&dest, sizeof(dest));
	dest.sin_family = AF_INET;
	dest.sin_port = htons(53);
	dest.sin_addr.s_addr = inet_addr(DNS_SVR);
	
	int ret = connect(sockfd, (struct sockaddr*)&dest, sizeof(dest));
	//printf("connect :%d\n", ret);

	struct dns_header header = {0};
	dns_create_header(&header);

	struct dns_question question = {0};
	dns_create_question(&question, domain);

	char request[1024] = {0};
	int req_len = dns_build_request(&header, &question, request);
	int slen = sendto(sockfd, request, req_len, 0, (struct sockaddr*)&dest, sizeof(struct sockaddr));

	struct ep_arg *eparg = (struct ep_arg*)calloc(1, sizeof(struct ep_arg));
	if (eparg == NULL) return -1;
	eparg->sockfd = sockfd;
	eparg->cb = cb;

	struct epoll_event ev;
	ev.data.ptr = eparg;
	ev.events = EPOLLIN;

	ret = epoll_ctl(ctx->epfd, EPOLL_CTL_ADD, sockfd, &ev); 
	//printf(" epoll_ctl ADD: sockfd->%d, ret:%d\n", sockfd, ret);

	return ret;


	
}


char *domain[] = {
	"www.ntytcp.com",
	"bojing.wang",
	"www.baidu.com",
	"tieba.baidu.com",
	"news.baidu.com",
	"zhidao.baidu.com",
	"music.baidu.com",
	"image.baidu.com",
	"v.baidu.com",
	"map.baidu.com",
	"baijiahao.baidu.com",
	"xueshu.baidu.com",
	"cloud.baidu.com",
	"www.163.com",
	"open.163.com",
	"auto.163.com",
	"gov.163.com",
	"money.163.com",
	"sports.163.com",
	"tech.163.com",
	"edu.163.com",
	"www.taobao.com",
	"q.taobao.com",
	"sf.taobao.com",
	"yun.taobao.com",
	"baoxian.taobao.com",
	"www.tmall.com",
	"suning.tmall.com",
	"www.tencent.com",
	"www.qq.com",
	"www.aliyun.com",
	"www.ctrip.com",
	"hotels.ctrip.com",
	"hotels.ctrip.com",
	"vacations.ctrip.com",
	"flights.ctrip.com",
	"trains.ctrip.com",
	"bus.ctrip.com",
	"car.ctrip.com",
	"piao.ctrip.com",
	"tuan.ctrip.com",
	"you.ctrip.com",
	"g.ctrip.com",
	"lipin.ctrip.com",
	"ct.ctrip.com"
};

static void dns_async_client_result_callback(struct dns_item *list, int count) {
	int i = 0;

	for (i = 0;i < count;i ++) {
		printf("name:%s, ip:%s\n", list[i].domain, list[i].ip);
	}
}


int main(int argc, char *argv[]) {
#if 0
	dns_client_commit(argv[1]);
#else

	struct async_context *ctx = dns_async_client_init();
	if (ctx == NULL) return -2;

	int count = sizeof(domain) / sizeof(domain[0]);
	int i = 0;

	for (i = 0;i < count;i ++) {
		dns_async_client_commit(ctx, domain[i], dns_async_client_result_callback);
		//sleep(2);
	}

	getchar();
#endif
	
}

3、内存池

内存池主要是解决,频繁开辟和释放内存,造成的内存碎片问题,大量的内存碎片,会导致我们分配内存失败。
内存池是通过,申请比较大的内存块,后面慢慢的去使用他,避免重复申请和释放内存带来的效率消耗和大量内存碎片




#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <fcntl.h>



#define MP_ALIGNMENT       		32
#define MP_PAGE_SIZE			4096
#define MP_MAX_ALLOC_FROM_POOL	(MP_PAGE_SIZE-1)

#define mp_align(n, alignment) (((n)+(alignment-1)) & ~(alignment-1))
#define mp_align_ptr(p, alignment) (void *)((((size_t)p)+(alignment-1)) & ~(alignment-1))





struct mp_large_s {
	struct mp_large_s *next;
	void *alloc;
};

struct mp_node_s {

	unsigned char *last;
	unsigned char *end;
	
	struct mp_node_s *next;
	size_t failed;
};

struct mp_pool_s {

	size_t max;

	struct mp_node_s *current;
	struct mp_large_s *large;

	struct mp_node_s head[0];

};

struct mp_pool_s *mp_create_pool(size_t size);
void mp_destory_pool(struct mp_pool_s *pool);
void *mp_alloc(struct mp_pool_s *pool, size_t size);
void *mp_nalloc(struct mp_pool_s *pool, size_t size);
void *mp_calloc(struct mp_pool_s *pool, size_t size);
void mp_free(struct mp_pool_s *pool, void *p);


struct mp_pool_s *mp_create_pool(size_t size) {

	struct mp_pool_s *p;
	int ret = posix_memalign((void **)&p, MP_ALIGNMENT, size + sizeof(struct mp_pool_s) + sizeof(struct mp_node_s));
	if (ret) {
		return NULL;
	}
	
	p->max = (size < MP_MAX_ALLOC_FROM_POOL) ? size : MP_MAX_ALLOC_FROM_POOL;
	p->current = p->head;
	p->large = NULL;

	p->head->last = (unsigned char *)p + sizeof(struct mp_pool_s) + sizeof(struct mp_node_s);
	p->head->end = p->head->last + size;

	p->head->failed = 0;

	return p;

}

void mp_destory_pool(struct mp_pool_s *pool) {

	struct mp_node_s *h, *n;
	struct mp_large_s *l;

	for (l = pool->large; l; l = l->next) {
		if (l->alloc) {
			free(l->alloc);
		}
	}

	h = pool->head->next;

	while (h) {
		n = h->next;
		free(h);
		h = n;
	}

	free(pool);

}

void mp_reset_pool(struct mp_pool_s *pool) {

	struct mp_node_s *h;
	struct mp_large_s *l;

	for (l = pool->large; l; l = l->next) {
		if (l->alloc) {
			free(l->alloc);
		}
	}

	pool->large = NULL;

	for (h = pool->head; h; h = h->next) {
		h->last = (unsigned char *)h + sizeof(struct mp_node_s);
	}

}

static void *mp_alloc_block(struct mp_pool_s *pool, size_t size) {

	unsigned char *m;
	struct mp_node_s *h = pool->head;
	size_t psize = (size_t)(h->end - (unsigned char *)h);
	
	int ret = posix_memalign((void **)&m, MP_ALIGNMENT, psize);
	if (ret) return NULL;

	struct mp_node_s *p, *new_node, *current;
	new_node = (struct mp_node_s*)m;

	new_node->end = m + psize;
	new_node->next = NULL;
	new_node->failed = 0;

	m += sizeof(struct mp_node_s);
	m = mp_align_ptr(m, MP_ALIGNMENT);
	new_node->last = m + size;

	current = pool->current;

	for (p = current; p->next; p = p->next) {
		if (p->failed++ > 4) {
			current = p->next;
		}
	}
	p->next = new_node;

	pool->current = current ? current : new_node;

	return m;

}

static void *mp_alloc_large(struct mp_pool_s *pool, size_t size) {

	void *p = malloc(size);
	if (p == NULL) return NULL;

	size_t n = 0;
	struct mp_large_s *large;
	for (large = pool->large; large; large = large->next) {
		if (large->alloc == NULL) {
			large->alloc = p;
			return p;
		}
		if (n ++ > 3) break;
	}

	large = mp_alloc(pool, sizeof(struct mp_large_s));
	if (large == NULL) {
		free(p);
		return NULL;
	}

	large->alloc = p;
	large->next = pool->large;
	pool->large = large;

	return p;
}

void *mp_memalign(struct mp_pool_s *pool, size_t size, size_t alignment) {

	void *p;
	
	int ret = posix_memalign(&p, alignment, size);
	if (ret) {
		return NULL;
	}

	struct mp_large_s *large = mp_alloc(pool, sizeof(struct mp_large_s));
	if (large == NULL) {
		free(p);
		return NULL;
	}

	large->alloc = p;
	large->next = pool->large;
	pool->large = large;

	return p;
}




void *mp_alloc(struct mp_pool_s *pool, size_t size) {

	unsigned char *m;
	struct mp_node_s *p;

	if (size <= pool->max) {

		p = pool->current;

		do {
			
			m = mp_align_ptr(p->last, MP_ALIGNMENT);
			if ((size_t)(p->end - m) >= size) {
				p->last = m + size;
				return m;
			}
			p = p->next;
		} while (p);

		return mp_alloc_block(pool, size);
	}

	return mp_alloc_large(pool, size);
	
}


void *mp_nalloc(struct mp_pool_s *pool, size_t size) {

	unsigned char *m;
	struct mp_node_s *p;

	if (size <= pool->max) {
		p = pool->current;

		do {
			m = p->last;
			if ((size_t)(p->end - m) >= size) {
				p->last = m+size;
				return m;
			}
			p = p->next;
		} while (p);

		return mp_alloc_block(pool, size);
	}

	return mp_alloc_large(pool, size);
	
}

void *mp_calloc(struct mp_pool_s *pool, size_t size) {

	void *p = mp_alloc(pool, size);
	if (p) {
		memset(p, 0, size);
	}

	return p;
	
}

void mp_free(struct mp_pool_s *pool, void *p) {

	struct mp_large_s *l;
	for (l = pool->large; l; l = l->next) {
		if (p == l->alloc) {
			free(l->alloc);
			l->alloc = NULL;

			return ;
		}
	}
	
}


int main(int argc, char *argv[]) {

	int size = 1 << 12;

	struct mp_pool_s *p = mp_create_pool(size);

	int i = 0;
	for (i = 0;i < 10;i ++) {

		void *mp = mp_alloc(p, 512);
//		mp_free(mp);
	}

	//printf("mp_create_pool: %ld\n", p->max);
	printf("mp_align(123, 32): %d, mp_align(17, 32): %d\n", mp_align(24, 32), mp_align(17, 32));
	//printf("mp_align_ptr(p->current, 32): %lx, p->current: %lx, mp_align(p->large, 32): %lx, p->large: %lx\n", mp_align_ptr(p->current, 32), p->current, mp_align_ptr(p->large, 32), p->large);

	int j = 0;
	for (i = 0;i < 5;i ++) {

		char *pp = mp_calloc(p, 32);
		for (j = 0;j < 32;j ++) {
			if (pp[j]) {
				printf("calloc wrong\n");
			}
			printf("calloc success\n");
		}
	}

	//printf("mp_reset_pool\n");

	for (i = 0;i < 5;i ++) {
		void *l = mp_alloc(p, 8192);
		mp_free(p, l);
	}

	mp_reset_pool(p);

	//printf("mp_destory_pool\n");
	for (i = 0;i < 58;i ++) {
		mp_alloc(p, 256);
	}

	mp_destory_pool(p);

	return 0;

}




更多推荐

mybatis简介&idea导入mybatis

mybatis简介Mybatis是Apache的一个Java开源项目,是一个支持动态Sql语句的持久层框架。Mybatis可以将Sql语句配置在XML文件中,避免将Sql语句硬编码在Java类中。与JDBC相比:1)Mybatis通过参数映射方式,可以将参数灵活的配置在SQL语句中的配置文件中,避免在Java类中配置参

设计模式:解释器模式

目录组件代码示例优缺点总结解释器模式(InterpreterPattern)是一种行为型设计模式,它定义了一种语言的文法,并且定义了该语言中各个元素的解释器。通过使用解释器,可以解析和执行特定的语言表达式。解释器模式的核心思想是将一个语言的文法表示为一个类的层次结构,并使用该类的实例来表示语言中的各个元素。每个元素都有

优化代码,提升代码性能

文章目录一、方法1.尽量指定类、方法的final修饰符二、变量1.循环内不要不断创建对象引用2.基本类型转换成字符串3.如果变量的初值会被覆盖,就没有必要给变量赋初值4.尽量使用基本数据类型,避免不必要的装箱、拆箱和空指针判断三、常量1.将常量声明为staticfinal,并以大写命名2.禁止使用JSON转化对象四、对

nvme prp模型代码处理流程分析

以下函数是prp相关的源码。/**prp模型,除了第一个dmaaddr不是page_size对齐的其余的dmaaddr都要求是page_size对齐的*/staticblk_status_tnvme_pci_setup_prps(structnvme_dev*dev,structrequest*req,structnv

Google Data Fusion构建数据ETL任务

Google云平台提供了一个DataFusion的产品,是基于开源的CDAP做的一个图形化的编辑工具,可以很方便的来完成数据处理的任务,而无需编写代码。假设我们现在要构建一个ETL的任务,从Kafka中消费一些数据,经过处理之后把数据存放到Bigquery中。首先我们要准备一些测试数据发送到Kafka。这里我是在GKE

2023年腾讯云轻量应用服务器16核32G28M配置测评

腾讯云轻量应用服务器16核32G28M配置优惠价3468元15个月(支持免费续3个月/送同配置3个月),轻量应用服务器具有100%CPU性能,系统盘为380GBSSD盘,28M带宽下载速度3584KB/秒,月流量6000GB,折合每天200GB流量,超出月流量包的流量按照0.8元每GB的价格支付流量费,地域节点可选广州

【自学开发之旅】Flask-restful-Jinjia页面编写template-回顾(五)

restful是web编程里重要的概念–一种接口规范也是一种接口设计风格设计接口:要考虑:数据返回、接收数据的方式、url、方法统一风格rest–表现层状态转移web–每一类数据–资源资源通过http的动作来实现状态转移GET、PUT、POST、DELETEpath组成:/{version}/{resources}/{

分布式运用之企业级日志ELFK+logstash的过滤模块

一、ELFK集群部署(Filebeat+ELK)在搭建ELK的基础上安装Filebeat服务,Filebeat服务可以布置在以下任意一台主机,本次实验将布置在apache服务器的节点上步骤一:安装Filebeat(在apache节点操作)#上传软件包filebeat-6.7.2-linux-x86_64.tar.gz到

面向对象进阶

文章目录面向对象进阶一.static1.静态变量2.静态方法3.static的注意事项二.继承1.概述2.特点3.子类可以继承父类中的内容4.继承中成员变量的访问特点5.继承中成员方法的访问特点6.继承中构造方法的访问特点7.this和super使用总结三.多态1.认识多态2.多态中调用成员的特点3.多态的优势和弊端四

Bigemap如何添加谷歌历史影像

工具Bigemapgisoffice地图软件BIGEMAPGISOffice-全能版BigemapAPP_卫星地图APP_高清卫星地图APP很多粉丝私信都在问怎么才可以看到谷歌的历史影像,其实这个图源目前是没有对大陆网络ip进行开放,所以如果需要查看,也是需要看你当前的网络是否允许查看,如果可以查看的话,就可以通过bi

AI也需要透明度?是的,需要

文章目录什么是AI透明度为什么需要AI透明度AI透明度的弱点如何做好AI透明度推荐阅读什么是AI透明度AI透明度指的是人工智能(AI)系统的工作原理和决策过程能够被理解、解释和追踪的程度。它包括以下几个方面:可解释性(Explainability)追踪性(Traceability)公平性和偏见检测(Fairnessan

热文推荐