作者|KIDGINBROOK 更新|潘丽晨
NCCL是英伟达开源的GPU通信库,支持集合通信和点对点通信。
看下官方给的一个demo:
(相关资料图)
在上边的示例中,rank0会执行ncclGetUniqueId获取Id,然后通过mpi广播给其他rank,接下来看下UniqueId是怎么产生的。 然后看下ncclInit。 首先执行initEnv,设置环境变量。 然后执行initNet,用来初始化nccl所需要的网络,包括两个,一个是bootstrap网络,另外一个是数据通信网络,bootstrap网络主要用于初始化时交换一些简单的信息,比如每个机器的ip端口,由于数据量很小,而且主要是在初始化阶段执行一次,因此bootstrap使用的是tcp;而通信网络是用于实际数据的传输,因此会优先使用rdma(支持gdr的话会优先使用gdr)。 bootstrapNetInit就是bootstrap网络的初始化,主要就是通过findInterfaces遍历机器上所有的网卡信息,通过prefixList匹配选择使用哪些网卡,将可用网卡的信息保存下来,将ifa_name保存到全局的bootstrapNetIfNames,ip地址保存到全局bootstrapNetIfAddrs,默认除了docker和lo其他的网卡都可以使用。 例如在测试机器上有三张网卡,分别是xgbe0、xgbe1、xgbe2,那么就会把这三个ifaname和对应的ip地址保存下来,另外nccl提供了环境变量NCCL_SOCKET_IFNAME可以用来指定想用的网卡名,例如通过export NCCL_SOCKET_IFNAME=xgbe0来指定使用xgbe0,其实就是通过prefixList来匹配做到的。 开始初始化通信网络。 ncclNet_t结构体是一系列的函数指针,比如初始化,发送,接收等;socket,IB等通信方式都实现了自己的ncclNet_t,如ncclNetSocket,ncclNetIb,初始化通信网络的过程就是依次看哪个通信模式可用,然后赋值给全局的ncclNet。 首先执行initNetPlugin,查看是否有libnccl-net.so,测试环境没有这个so,所以直接返回。 然后尝试使用IB网络: 首先执行ncclNetIb的init函数,就是ncclIbInit。 首先第三行通过wrap_ibv_symbols加载动态库libibverbs.so,然后获取动态库的各个函数。 然后通过wrap_ibv_fork_init避免fork引起rdma网卡读写出错。 后面会讲到ib网络也会用到socket进行带外网络的传输,所以这里也通过findInterfaces获取一个可用的网卡保存到ncclIbIfAddr。 通过ibv_get_device_list获取所有rdma设备到devices中,遍历devices的每个device,因为每个HCA可能有多个物理port,所以对每个device遍历每一个物理port,获取每个port的信息。 然后将相关信息保存到全局的ncclIbDevs中,比如是哪个device的哪个port,使用的是IB还是ROCE,device的pci路径,maxqp,device的name等,注意这里也有类似bootstrap网络NCCL_SOCKET_IFNAME的环境变量,叫NCCL_IB_HCA,可以指定使用哪个IB HCA。 到这里整个初始化的过程就完成了,一句话总结就是,获取了当前机器上所有可用的IB网卡和普通以太网卡之后保存下来。 然后开始生成UniqueId。 ncclNetHandle_t也是一个字符数组,然后执行bootstrapNetListen。 依次看下这三个函数,通过bootstrapNetGetSocketAddr获取一个可用的ip地址。 此时dev是0, bootstrapNetIfs是初始化bootstrap网络的时候一共找到了几个可用的网卡,这里就是获取了第0个可用的ip地址。 然后通过bootstrapNetNewComm创建bootstrapNetComm,bootstrapNetComm其实就是fd,bootstrapNetNewComm其实就是new了一个bootstrapNetComm。 通过createListenSocket启动socker server。 创建监听fd,ip由localaddr指定,初始端口为0,bind时随机找一个可用端口,并通过getsockname(sockfd, &localAddr->sa, &size)将ip端口写回到localaddr,这里localaddr就是UniqueId。 到这里UniqueId也就产生了,其实就是当前机器的ip和port。 (本文经授权后由OneFlow发布。原文:https://blog.csdn.net/KIDGIN7439/article/details/126712106?spm=1001.2014.3001.5502) 其他人都在看 One-YOLOv5 v1.2.0发布 超越ChatGPT:大模型的智能极限 对抗软件系统复杂性③:恰当分层,不多不少 ChatGPT作者Schulman:我们成功的秘密武器 比快更快,开源Stable Diffusion刷新作图速度 OneEmbedding:单卡训练TB级推荐模型不是梦 GLM训练加速:性能最高提升3倍,显存节省1/3 欢迎Star、试用OneFlow新版本:#include
ncclResult_t ncclGetUniqueId(ncclUniqueId* out) { NCCLCHECK(ncclInit()); NCCLCHECK(PtrCheck(out, "GetUniqueId", "out")); return bootstrapGetUniqueId(out);}
ncclResult_t initNet() { // Always initialize bootstrap network NCCLCHECK(bootstrapNetInit()); NCCLCHECK(initNetPlugin(&ncclNet, &ncclCollNet)); if (ncclNet != NULL) return ncclSuccess; if (initNet(&ncclNetIb) == ncclSuccess) { ncclNet = &ncclNetIb; } else { NCCLCHECK(initNet(&ncclNetSocket)); ncclNet = &ncclNetSocket; } return ncclSuccess;}
static int findInterfaces(const char* prefixList, char* names, union socketAddress *addrs, int sock_family, int maxIfNameSize, int maxIfs) { struct netIf userIfs[MAX_IFS]; bool searchNot = prefixList && prefixList[0] == "^"; if (searchNot) prefixList++; bool searchExact = prefixList && prefixList[0] == "="; if (searchExact) prefixList++; int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS); int found = 0; struct ifaddrs *interfaces, *interface; getifaddrs(&interfaces); for (interface = interfaces; interface && found < maxIfs; interface = interface->ifa_next) { if (interface->ifa_addr == NULL) continue; int family = interface->ifa_addr->sa_family; if (family != AF_INET && family != AF_INET6) continue; if (sock_family != -1 && family != sock_family) continue; if (family == AF_INET6) { struct sockaddr_in6* sa = (struct sockaddr_in6*)(interface->ifa_addr); if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue; } if (!(matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) { continue; } bool duplicate = false; for (int i = 0; i < found; i++) { if (strcmp(interface->ifa_name, names+i*maxIfNameSize) == 0) { duplicate = true; break; } } if (!duplicate) { strncpy(names+found*maxIfNameSize, interface->ifa_name, maxIfNameSize); int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6); memcpy(addrs+found, interface->ifa_addr, salen); found++; } } freeifaddrs(interfaces); return found;}
ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) { static int shownIbHcaEnv = 0; if(wrap_ibv_symbols() != ncclSuccess) { return ncclInternalError; } if (ncclParamIbDisable()) return ncclInternalError; if (ncclNIbDevs == -1) { pthread_mutex_lock(&ncclIbLock); wrap_ibv_fork_init(); if (ncclNIbDevs == -1) { ncclNIbDevs = 0; if (findInterfaces(ncclIbIfName, &ncclIbIfAddr, MAX_IF_NAME_SIZE, 1) != 1) { WARN("NET/IB : No IP interface found."); return ncclInternalError; } // Detect IB cards int nIbDevs; struct ibv_device** devices; // Check if user defined which IB device:port to use char* userIbEnv = getenv("NCCL_IB_HCA"); if (userIbEnv != NULL && shownIbHcaEnv++ == 0) INFO(NCCL_NET|NCCL_ENV, "NCCL_IB_HCA set to %s", userIbEnv); struct netIf userIfs[MAX_IB_DEVS]; bool searchNot = userIbEnv && userIbEnv[0] == "^"; if (searchNot) userIbEnv++; bool searchExact = userIbEnv && userIbEnv[0] == "="; if (searchExact) userIbEnv++; int nUserIfs = parseStringList(userIbEnv, userIfs, MAX_IB_DEVS); if (ncclSuccess != wrap_ibv_get_device_list(&devices, &nIbDevs)) return ncclInternalError; for (int d=0; d
ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) { ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id; void* listenComm; NCCLCHECK(bootstrapNetListen(idFromEnv ? dontCareIf : 0, netHandle, &listenComm)); pthread_t thread; pthread_create(&thread, NULL, bootstrapRoot, listenComm); return ncclSuccess;}
static ncclResult_t bootstrapNetListen(int dev, ncclNetHandle_t* netHandle, void** listenComm) { union socketAddress* connectAddr = (union socketAddress*) netHandle; static_assert(sizeof(union socketAddress) < NCCL_NET_HANDLE_MAXSIZE, "union socketAddress size is too large"); // if dev >= 0, listen based on dev if (dev >= 0) { NCCLCHECK(bootstrapNetGetSocketAddr(dev, connectAddr)); } else if (dev == findSubnetIf) { ... } // Otherwise, handle stores a local address struct bootstrapNetComm* comm; NCCLCHECK(bootstrapNetNewComm(&comm)); NCCLCHECK(createListenSocket(&comm->fd, connectAddr)); *listenComm = comm; return ncclSuccess;}
static ncclResult_t bootstrapNetGetSocketAddr(int dev, union socketAddress* addr) { if (dev >= bootstrapNetIfs) return ncclInternalError; memcpy(addr, bootstrapNetIfAddrs+dev, sizeof(*addr)); return ncclSuccess;}
struct bootstrapNetComm { int fd; };
static ncclResult_t createListenSocket(int *fd, union socketAddress *localAddr) { /* IPv4/IPv6 support */ int family = localAddr->sa.sa_family; int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6); /* Create socket and bind it to a port */ int sockfd = socket(family, SOCK_STREAM, 0); if (sockfd == -1) { WARN("Net : Socket creation failed : %s", strerror(errno)); return ncclSystemError; } if (socketToPort(&localAddr->sa)) { // Port is forced by env. Make sure we get the port. int opt = 1;#if defined(SO_REUSEPORT) SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt");#else SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), "setsockopt");#endif } // localAddr port should be 0 (Any port) SYSCHECK(bind(sockfd, &localAddr->sa, salen), "bind"); /* Get the assigned Port */ socklen_t size = salen; SYSCHECK(getsockname(sockfd, &localAddr->sa, &size), "getsockname"); #ifdef ENABLE_TRACE char line[1024]; TRACE(NCCL_INIT|NCCL_NET,"Listening on socket %s", socketToString(&localAddr->sa, line));#endif /* Put the socket in listen mode * NB: The backlog will be silently truncated to the value in /proc/sys/net/core/somaxconn */ SYSCHECK(listen(sockfd, 16384), "listen"); *fd = sockfd; return ncclSuccess;}