
aclnnMatmulAllReduceV3【免费下载链接】ops-transformer本项目是CANN提供的transformer类大模型算子库实现网络在NPU上加速计算。项目地址: https://gitcode.com/cann/ops-transformer 查看源码产品支持情况产品是否支持Ascend 950PR/Ascend 950DT√Atlas A3 训练系列产品/Atlas A3 推理系列产品×Atlas A2 训练系列产品/Atlas A2 推理系列产品√Atlas 200I/500 A2 推理产品×Atlas 推理系列产品×Atlas 训练系列产品×说明使用该接口时请确保驱动固件包和CANN包都为配套的8.0.RC2版本或者配套的更高版本否则将会引发报错比如Bus Error等。功能说明接口功能完成MatMul计算与AllReduce通信融合兼容aclnnMatmulAllReduceV2支持的功能新增通信引擎参数commMode支持自选通信引擎。计算公式$$ output AllReduce(x1 x2 bias x3) $$函数原型每个算子分为两段式接口必须先调用aclnnMatmulAllReduceV3GetWorkspaceSize接口获取计算所需workspace大小以及包含了算子计算流程的执行器再调用aclnnMatmulAllReduceV3接口执行计算。aclnnStatus aclnnMatmulAllReduceV3GetWorkspaceSize( const aclTensor *x1, const aclTensor *x2, const aclTensor *bias, const aclTensor *x3, const char *group, const char *reduceOp, const char *commMode, int64_t commTurn, int64_t streamMode, const aclTensor *output, uint64_t *workspaceSize, aclOpExecutor **executor)aclnnStatus aclnnMatmulAllReduceV3( void *workspace, uint64_t workspaceSize, aclOpExecutor *executor, const aclrtStream stream)aclnnMatmulAllReduceV3GetWorkspaceSize参数说明参数名输入/输出描述使用说明数据类型数据格式维度(shape)非连续Tensorx1输入MatMul计算的左矩阵即计算公式中的x1。当前版本仅支持二维或者三维输入。支持不转置场景。FLOAT16、BFLOAT16ND2-3×x2输入MatMul计算的右矩阵即计算公式中的x2。当前版本仅支持两维输入。支持转置/不转置场景。ND格式下支持最后两轴转置情况下的非连续的tensor其他非连续tensor不支持FLOAT16、BFLOAT16ND2×x3输入MatMul计算后的add计算即计算公式中的x3。shape与MatMul计算后的shape一致。FLOAT16、BFLOAT16ND2√bias输入对应计算公式中的bias偏移。当前版本仅支持一维输入。FLOAT16、BFLOAT16ND0-1√group输入通信域名称。通过Hccl提供的接口“extern HcclResult HcclGetCommName(HcclComm comm, char* commName);”获取其中commName即为group。String---reduceOp输入reduce操作类型。当前版本仅支持输入sum。String---commMode输入通信引擎参数。用于指定通信引擎根据芯片型号有不同输入限制。通信引擎简单介绍和输入约束详见约束说明。String---commTurn输入通信数据切分数即总数据量/单次通信量。当前版本仅支持输入0。INT64---streamMode输入AscendCL流模式的枚举。当前版本仅支持枚举值1。INT64---output输出MatMul计算与AllReduce通信的结果即计算公式中的output。output的维数与x1一致。FLOAT16、BFLOAT16ND2-3√workspaceSize输出返回需要在Device侧申请的workspace大小。-----executor输出返回op执行器包含了算子计算流程。-----返回值返回aclnnStatus状态码具体参见aclnn返回码。第一阶段接口完成入参校验出现以下场景报错返回值错误码描述ACLNN_ERR_PARAM_NULLPTR161001传入的x1、x2或output是空指针。ACLNN_ERR_PARAM_INVALID161002x1、x2、bias、x3或output的数据类型不在支持的范围之内。reduceOp、streamMode不在合法范围内。x1、x2、bias、x3或output的shape不符合约束要求。aclnnMatmulAllReduceV3参数说明参数名输入/输出描述workspace输入在Device侧申请的workspace内存地址。workspaceSize输入在Device侧申请的workspace大小由第一段接口aclnnMatmulAllReduceV3GetWorkspaceSize获取。executor输入op执行器包含了算子计算流程。stream输入指定执行任务的Stream。返回值返回aclnnStatus状态码具体参见aclnn返回码。约束说明通信引擎commMode支持度Atlas A2 训练系列产品/Atlas A2 推理系列产品 目前不支持指定通信引擎commMode仅允许输入为空字符串默认使用AICPU。Ascend 950PR/Ascend 950DT 目前通信引擎支持AICPU和CCUcommMode允许输入为ai_cpu、ccu和空字符串前两者直接指定AICPU和CCU空字符串表示自适应模式优先选择最佳的通信引擎当前版本默认使用CCU作为通信引擎。同一条通信链路内只能选择同一种通信引擎。AICPU和CCU通信引擎简单介绍AICPU不占用计算核通信效率高但通信静态开销较大对小数据量通信场景不友好。适用于大数据高带宽场景。CCU能够减少访存带宽与计算核占用但受限于片上资源支持的通信域数量有限。适用于高带宽、低时延的通信场景。更详细的通信引擎介绍请参考通信引擎。确定性计算Atlas A2 训练系列产品/Atlas A2 推理系列产品 aclnnMatmulAllReduceV3默认非确定性实现支持通过配置HCCL_DETERMINISTIC环境变量为true开启确定性计算。Ascend 950PR/Ascend 950DT aclnnMatmulAllReduceV3默认确定性实现。增量场景不使能MC2全量场景使能MC2。输入x1可为二维或者三维其shape为(b, s, k)或者(m, k)。x2必须是二维其shape为(k, n)轴满足mm算子入参要求k轴相等。bias若非空其shape为(n)。b*s、m、k、n的值均不得超过2147483647(INT32_MAX)。当输入x1的shape为(b, s, k)时输出output的shape为(b, s, n)当输入x1的shape为(m, k)时输出output的shape为(m, n)。x1、x2、bias计算输入的数据类型要和output计算输出的数据类型一致传入的x1、x2、x3或者output不为空指针。仅支持HCCS链路all mesh组网。Atlas A2 训练系列产品/Atlas A2 推理系列产品 支持1、2、4、8卡。Ascend 950PR/Ascend 950DT 支持1、2、4、8、16、32、64卡。Atlas A2 训练系列产品/Atlas A2 推理系列产品 一个模型中的通算融合MC2算子仅支持相同通信域。空tensor支持度支持k为0的场景输出为bias x3。支持bs/m/n为0此时传入的输出也应该是空tensor此场景不进入kernel计算直接返回。调用示例示例代码如下仅供参考具体编译和执行过程请参考编译与运行样例。说明本示例代码调用了部分HCCL集合通信库接口HcclGetCommName、HcclCommInitAll、HcclCommDestroy, 请参考HCCL API (C)。Atlas A2 训练系列产品/Atlas A2 推理系列产品 、 Ascend 950PR/Ascend 950DT #include iostream #include vector #include thread #include hccl/hccl.h #include aclnn/opdev/fp16_t.h #include aclnnop/aclnn_matmul_all_reduce_v2.h int ndev 2; #define CHECK_RET(cond, return_expr) \ do { \ if (!(cond)) { \ return_expr; \ } \ } while (0) #define LOG_PRINT(message, ...) \ do { \ printf(message, ##__VA_ARGS__); \ } while (0) int64_t GetShapeSize(const std::vectorint64_t shape) { int64_t shapeSize 1; for (auto i: shape) { shapeSize * i; } return shapeSize; } templatetypename T int CreateAclTensor(const std::vectorT hostData, const std::vectorint64_t shape, void **deviceAddr, aclDataType dataType, aclTensor **tensor) { auto size GetShapeSize(shape) * sizeof(T); // 调用aclrtMalloc申请device侧内存 auto ret aclrtMalloc(deviceAddr, size, ACL_MEM_MALLOC_HUGE_FIRST); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT(aclrtMalloc failed. ERROR: %d\n, ret); return ret); // 调用aclrtMemcpy将host侧数据拷贝到device侧内存上 ret aclrtMemcpy(*deviceAddr, size, hostData.data(), size, ACL_MEMCPY_HOST_TO_DEVICE); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT(aclrtMemcpy failed. ERROR: %d\n, ret); return ret); // 计算连续tensor的strides std::vectorint64_t strides(shape.size(), 1); for (int64_t i shape.size() - 2; i 0; i--) { strides[i] shape[i 1] * strides[i 1]; } // 调用aclCreateTensor接口创建aclTensor *tensor aclCreateTensor(shape.data(), shape.size(), dataType, strides.data(), 0, aclFormat::ACL_FORMAT_ND, shape.data(), shape.size(), *deviceAddr); return 0; } struct Args { uint32_t rankId; HcclComm hcclComm; aclrtStream stream; aclrtContext context; }; int launchOneThreadMatmulAllReduce(Args args) { int ret; ret aclrtSetCurrentContext(args.context); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT(aclrtSetCurrentContext failed. ERROR: %d\n, ret); return ret); char hcom_name[128]; ret HcclGetCommName(args.hcclComm, hcom_name); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT([ERROR] HcclGetCommName failed. ret %d \n, ret); return -1); LOG_PRINT([INFO] rank %d hcom: %s stream: %p, context : %p\n, args.rankId, hcom_name, args.stream, args.context); std::vectorint64_t x1Shape {32, 64}; std::vectorint64_t x2Shape {64, 128}; std::vectorint64_t biasShape {128}; std::vectorint64_t x3Shape {32, 128}; std::vectorint64_t outShape {32, 128}; void *x1DeviceAddr nullptr; void *x2DeviceAddr nullptr; void *biasDeviceAddr nullptr; void *x3DeviceAddr nullptr; void *outDeviceAddr nullptr; aclTensor *x1 nullptr; aclTensor *x2 nullptr; aclTensor *bias nullptr; aclTensor *x3 nullptr; aclTensor *out nullptr; int64_t commTurn 0; int64_t streamMode 1; uint64_t workspaceSize 0; aclOpExecutor *executor; void *workspaceAddr nullptr; long long x1ShapeSize GetShapeSize(x1Shape); long long x2ShapeSize GetShapeSize(x2Shape); long long biasShapeSize GetShapeSize(biasShape); long long x3ShapeSize GetShapeSize(x3Shape); long long outShapeSize GetShapeSize(outShape); std::vectorop::fp16_t x1HostData(x1ShapeSize, 1); std::vectorop::fp16_t x2HostData(x2ShapeSize, 1); std::vectorop::fp16_t biasHostData(biasShapeSize, 1); std::vectorop::fp16_t x3HostData(x3ShapeSize, 1); std::vectorop::fp16_t outHostData(outShapeSize, 0); // 创建 tensor ret CreateAclTensor(x1HostData, x1Shape, x1DeviceAddr, aclDataType::ACL_FLOAT16, x1); CHECK_RET(ret ACL_SUCCESS, return ret); ret CreateAclTensor(x2HostData, x2Shape, x2DeviceAddr, aclDataType::ACL_FLOAT16, x2); CHECK_RET(ret ACL_SUCCESS, return ret); ret CreateAclTensor(biasHostData, biasShape, biasDeviceAddr, aclDataType::ACL_FLOAT16, bias); CHECK_RET(ret ACL_SUCCESS, return ret); ret CreateAclTensor(x3HostData, x3Shape, x3DeviceAddr, aclDataType::ACL_FLOAT16, x3); CHECK_RET(ret ACL_SUCCESS, return ret); ret CreateAclTensor(outHostData, outShape, outDeviceAddr, aclDataType::ACL_FLOAT16, out); CHECK_RET(ret ACL_SUCCESS, return ret); // 调用第一段接口, x3位置传入out ret aclnnMatmulAllReduceV3GetWorkspaceSize(x1, x2, bias, x3, hcom_name, sum, , commTurn, streamMode, out, workspaceSize, executor); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT(aclnnMatmulAllReduceV3GetWorkspaceSize failed. ERROR: %d\n, ret); return ret); // 根据第一段接口计算出的workspaceSize申请device内存 if (workspaceSize 0) { ret aclrtMalloc(workspaceAddr, workspaceSize, ACL_MEM_MALLOC_HUGE_FIRST); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT(allocate workspace failed. ERROR: %d\n, ret); return ret); } // 调用第二段接口 ret aclnnMatmulAllReduceV3(workspaceAddr, workspaceSize, executor, args.stream); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT(aclnnMatmulAllReduceV3 failed. ERROR: %d\n, ret); return ret); //固定写法同步等待任务执行结束 ret aclrtSynchronizeStreamWithTimeout(args.stream, 10000); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT(aclrtSynchronizeStream failed. ERROR: %d\n, ret); return ret); LOG_PRINT(device%d aclnnMatmulAllReduceV3 execute success \n, args.rankId); // 释放device资源需要根据具体API的接口定义修改 if (x1 ! nullptr) { aclDestroyTensor(x1); } if (x2 ! nullptr) { aclDestroyTensor(x2); } if (bias ! nullptr) { aclDestroyTensor(bias); } if (x3 ! nullptr) { aclDestroyTensor(x3); } if (out ! nullptr) { aclDestroyTensor(out); } if (x1DeviceAddr ! nullptr) { aclrtFree(x1DeviceAddr); } if (x2DeviceAddr ! nullptr) { aclrtFree(x2DeviceAddr); } if (biasDeviceAddr ! nullptr) { aclrtFree(biasDeviceAddr); } if (x3DeviceAddr ! nullptr) { aclrtFree(x3DeviceAddr); } if (outDeviceAddr ! nullptr) { aclrtFree(outDeviceAddr); } if (workspaceSize 0) { aclrtFree(workspaceAddr); } aclrtDestroyStream(args.stream); HcclCommDestroy(args.hcclComm); aclrtDestroyContext(args.context); aclrtResetDevice(args.rankId); return 0; } int main(int argc, char *argv[]) { int ret; int32_t devices[ndev]; for (int i 0; i ndev; i) { devices[i] i; } HcclComm comms[128]; ret aclInit(nullptr); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT(aclInit failed. ERROR: %d\n, ret); return ret); // 初始化集合通信域 for (int i 0; i ndev; i) { ret aclrtSetDevice(devices[i]); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT(aclrtSetDevice failed. ERROR: %d\n, ret); return ret); } ret HcclCommInitAll(ndev, devices, comms); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT(HcclCommInitAll failed. ERROR: %d\n, ret); return ret); Args args[ndev]; aclrtStream stream[ndev]; aclrtContext context[ndev]; for (uint32_t rankId 0; rankId ndev; rankId) { ret aclrtSetDevice(rankId); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT(aclrtSetDevice failed. ERROR: %d\n, ret); return ret); ret aclrtCreateContext(context[rankId], rankId); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT(aclrtCreateContext failed. ERROR: %d\n, ret); return ret); ret aclrtCreateStream(stream[rankId]); CHECK_RET(ret ACL_SUCCESS, LOG_PRINT(aclrtCreateStream failed. ERROR: %d\n, ret); return ret); } // 启动多线程 std::vectorstd::unique_ptrstd::thread threads(ndev); for (uint32_t rankId 0; rankId ndev; rankId) { args[rankId].rankId rankId; args[rankId].hcclComm comms[rankId]; args[rankId].stream stream[rankId]; args[rankId].context context[rankId]; threads[rankId].reset(new(std::nothrow) std::thread(launchOneThreadMatmulAllReduce, std::ref(args[rankId]))); } for (uint32_t rankId 0; rankId ndev; rankId) { threads[rankId]-join(); } aclFinalize(); return 0; }【免费下载链接】ops-transformer本项目是CANN提供的transformer类大模型算子库实现网络在NPU上加速计算。项目地址: https://gitcode.com/cann/ops-transformer创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考