别再手动下载了!用Python脚本批量抓取并处理ESA CCI 300米土地覆盖数据(1992-2022)

发布时间:2026/6/20 8:42:09

别再手动下载了!用Python脚本批量抓取并处理ESA CCI 300米土地覆盖数据(1992-2022) 自动化获取与处理ESA CCI土地覆盖数据的Python实战指南当我们需要分析全球土地覆盖变化趋势时ESA CCI提供的300米分辨率数据集无疑是宝贵资源。但面对1992-2022年长达30年的时间序列数据手动下载和处理不仅耗时耗力还容易出错。本文将分享一套完整的Python自动化解决方案帮助您高效完成从数据获取到预处理的全流程。1. 环境准备与数据源概述在开始自动化流程前我们需要配置合适的Python环境并了解ESA CCI数据的基本特性。这套数据集由欧洲航天局气候变化倡议项目发布包含全球土地覆盖分类信息空间分辨率约为300米时间跨度为1992年至2022年。推荐Python环境配置# 创建conda环境推荐 conda create -n esa_cci python3.9 conda activate esa_cci # 安装核心依赖库 pip install xarray rioxarray rasterio geopandas requests dask netCDF4ESA CCI数据主要分为两个版本v2.0.71992-2015年数据v2.1.12016-2022年数据数据采用NetCDF格式存储每个文件包含以下关键变量lccs_class土地覆盖分类代码主要分析对象lat/lon经纬度坐标time时间维度通常为年度2. 自动化下载ESA CCI数据手动从官网逐个下载数十个数据文件效率极低。我们可以利用Python脚本实现批量下载这里提供两种方法通过官方API和直接HTTP下载。2.1 通过CDS API下载推荐欧洲气候数据存储(CDS)提供了API接口需先注册并获取API密钥import cdsapi c cdsapi.Client() # 下载单一年份数据示例 def download_year(year): c.retrieve( satellite-land-cover, { variable: all, year: str(year), version: v2.1.1 if year 2016 else v2.0.7, format: zip, }, fesa_cci_{year}.zip ) # 批量下载1992-2022年数据 for year in range(1992, 2023): download_year(year)2.2 直接HTTP下载备选方案当API不可用时可直接从镜像站点下载import requests from tqdm import tqdm def download_file(url, save_path): response requests.get(url, streamTrue) total_size int(response.headers.get(content-length, 0)) with open(save_path, wb) as f, tqdm( descsave_path, totaltotal_size, unitiB, unit_scaleTrue ) as bar: for data in response.iter_content(chunk_size1024): size f.write(data) bar.update(size) # 示例下载2020年数据 base_url https://cds.climate.copernicus.eu/data/satellite-land-cover file_url f{base_url}/2020/C3S-LC-L4-LCCS-Map-300m-P1Y-2020-v2.1.1.nc download_file(file_url, esa_cci_2020.nc)3. 批量处理NetCDF数据下载后的NetCDF文件需要转换为更易用的GeoTIFF格式并提取所需信息。下面介绍完整的处理流程。3.1 单文件转换示例import xarray as xr import rioxarray from pathlib import Path def convert_nc_to_tif(nc_path, output_dir): # 读取NetCDF文件 ds xr.open_dataset(nc_path) # 提取土地覆盖分类数据 lcc ds[lccs_class].isel(time0) # 设置坐标参考系统 lcc.rio.set_crs(EPSG:4326) # 构建输出路径 year nc_path.stem.split(_)[-2] # 从文件名提取年份 tif_path output_dir / fesa_cci_{year}.tif # 保存为GeoTIFF lcc.rio.to_raster(tif_path, dtypeuint8) print(f成功转换: {tif_path}) # 使用示例 nc_file Path(esa_cci_2020.nc) output_dir Path(processed_data) output_dir.mkdir(exist_okTrue) convert_nc_to_tif(nc_file, output_dir)3.2 批量处理脚本为提高效率我们可以并行处理多个年份数据from concurrent.futures import ThreadPoolExecutor import os def batch_convert(input_dir, output_dir, max_workers4): input_dir Path(input_dir) output_dir Path(output_dir) output_dir.mkdir(exist_okTrue) nc_files list(input_dir.glob(*.nc)) with ThreadPoolExecutor(max_workersmax_workers) as executor: for nc_file in nc_files: executor.submit(convert_nc_to_tif, nc_file, output_dir) # 使用示例 batch_convert(downloads, processed_data)4. 高级数据处理技巧获得基础土地覆盖数据后我们通常需要进一步处理以满足特定分析需求。4.1 提取特定土地覆盖类型ESA CCI使用LCCS分类系统下表列出主要类别代码分类代码土地覆盖类型10-40农田50-90森林110-130草地/灌丛140-150稀疏植被160-180湿地190城市区域提取特定类别的示例代码import numpy as np def extract_class(input_tif, output_tif, class_codes): 提取指定类别的土地覆盖数据 参数: input_tif: 输入TIFF文件路径 output_tif: 输出TIFF文件路径 class_codes: 需要提取的类别代码列表 with rasterio.open(input_tif) as src: data src.read(1) profile src.profile.copy() # 创建掩膜保留指定类别其他设为nodata mask np.isin(data, class_codes) result np.where(mask, data, profile[nodata]) # 更新元数据 profile.update(dtypeuint8, nodata255) with rasterio.open(output_tif, w, **profile) as dst: dst.write(result, 1) # 示例提取森林类别(代码50-90) extract_class( processed_data/esa_cci_2020.tif, extracted/forest_2020.tif, list(range(50, 91)) )4.2 计算各类别面积比例了解不同土地覆盖类型的面积占比是常见分析需求def calculate_class_area(input_tif, pixel_area_km20.09): 计算各土地覆盖类别的面积及占比 参数: input_tif: 输入TIFF文件路径 pixel_area_km2: 单个像元面积(300m分辨率约0.09km²) with rasterio.open(input_tif) as src: data src.read(1) nodata src.nodata # 排除nodata值 valid_data data[data ! nodata] # 统计各类别像元数 unique, counts np.unique(valid_data, return_countsTrue) # 计算面积 results {} for class_code, count in zip(unique, counts): area_km2 count * pixel_area_km2 results[class_code] { pixel_count: count, area_km2: round(area_km2, 2), percentage: round(count/len(valid_data)*100, 2) } return results # 使用示例 area_stats calculate_class_area(extracted/forest_2020.tif) print(area_stats)5. 时间序列分析与可视化处理完所有年份数据后我们可以分析土地覆盖变化趋势。5.1 构建时间序列数据集import pandas as pd def build_time_series(data_dir, class_codes): 构建指定类别的时间序列数据集 参数: data_dir: 包含各年份TIFF文件的目录 class_codes: 关注的类别代码列表 years [] areas [] for tif_file in Path(data_dir).glob(esa_cci_*.tif): year int(tif_file.stem.split(_)[-1]) stats calculate_class_area(tif_file) # 计算指定类别的总面积 total_area sum( v[area_km2] for k, v in stats.items() if k in class_codes ) years.append(year) areas.append(total_area) return pd.DataFrame({Year: years, Area_km2: areas}) # 示例分析森林面积变化(代码50-90) forest_df build_time_series(processed_data, list(range(50, 91))) forest_df.to_csv(forest_area_timeseries.csv, indexFalse)5.2 可视化土地覆盖变化使用matplotlib绘制变化趋势图import matplotlib.pyplot as plt def plot_timeseries(csv_path, title): df pd.read_csv(csv_path) plt.figure(figsize(12, 6)) plt.plot(df[Year], df[Area_km2], markero, linestyle-) plt.title(title) plt.xlabel(Year) plt.ylabel(Area (km²)) plt.grid(True) # 标注最大/最小值 max_idx df[Area_km2].idxmax() min_idx df[Area_km2].idxmin() plt.annotate( fMax: {df.loc[max_idx, Area_km2]:,.0f} km², xy(df.loc[max_idx, Year], df.loc[max_idx, Area_km2]), xytext(10, 10), textcoordsoffset points ) plt.annotate( fMin: {df.loc[min_idx, Area_km2]:,.0f} km², xy(df.loc[min_idx, Year], df.loc[min_idx, Area_km2]), xytext(10, -20), textcoordsoffset points ) plt.tight_layout() plt.savefig(f{title.replace( , _)}.png, dpi300) plt.close() # 使用示例 plot_timeseries(forest_area_timeseries.csv, Global Forest Area Change (1992-2022))6. 性能优化与错误处理处理大规模地理空间数据时性能优化和健壮性至关重要。6.1 使用Dask处理大数据对于内存不足的情况可以使用Dask进行分块处理import dask.array as da def process_large_nc(nc_path, chunk_size1024): # 使用Dask延迟加载大数据 ds xr.open_dataset(nc_path, chunks{lat: chunk_size, lon: chunk_size}) # Dask操作会延迟执行 lcc ds[lccs_class].isel(time0) # 计算时才会实际加载数据 forest_mask ((lcc 50) (lcc 90)).compute() return forest_mask # 使用示例 forest_mask process_large_nc(esa_cci_2020.nc)6.2 错误处理与重试机制网络请求和文件操作需要完善的错误处理from tenacity import retry, stop_after_attempt, wait_exponential retry(stopstop_after_attempt(3), waitwait_exponential(multiplier1, min4, max10)) def robust_download(url, save_path): try: response requests.get(url, streamTrue, timeout30) response.raise_for_status() with open(save_path, wb) as f: for chunk in response.iter_content(chunk_size8192): f.write(chunk) except Exception as e: print(f下载失败: {str(e)}) if Path(save_path).exists(): Path(save_path).unlink() raise # 使用示例 try: robust_download(file_url, esa_cci_2020.nc) except Exception as e: print(f最终下载失败: {str(e)})在实际项目中这套自动化流程将30年的数据处理时间从数天缩短到几小时且可复用于其他类似数据集。关键是要根据具体需求调整参数并做好异常处理和数据验证。

相关新闻