Transforms

Python

Combine shapefiles and convert to GeoJSON

How do I combine multiple shapefiles and convert them to GeoJSON format?

This code reads multiple shapefiles with geopandas, converts their geometries to GeoJSON, and combines them into a single PySpark DataFrame. The geospatial_tools library is used to normalize each geometry to EPSG:4326 and to compute the geohash of its centroid.

from transforms.api import transform, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import clean_geometry, centroid, geojson_to_geohash
import tempfile
import shutil
import geopandas as gpd
from pyspark.sql import types as T
from pyspark.sql import functions as F
import json
from shapely.geometry import mapping


@geospatial()
@transform(
    output=Output(),
    input_data=Input(),
)
def compute(ctx, input_data, output):
    fs = input_data.filesystem()
    schema = T.StructType([T.StructField("geoshape", T.StringType()),
                           T.StructField("name", T.StringType()),
                           T.StructField("centroid", T.StringType())])
    shapefiles = [f.path.replace('.shp', '') for f in fs.ls(glob='*.shp')]
    # Start from an empty DataFrame so each shapefile can be unioned in
    combined_data = ctx.spark_session.createDataFrame([], schema)
    for shapefile in shapefiles:  # NOQA
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Copy all files for the shapefile to the local filesystem.
            # There are multiple files associated with a shapefile, such as .prj and .cpg
            for shapefile_file in fs.ls(glob=f'{shapefile}.*'):
                with open(f'{tmp_dir}/{shapefile_file.path}', 'wb') as tmp_file:
                    with fs.open(shapefile_file.path, 'rb') as f:
                        shutil.copyfileobj(f, tmp_file)
            pdf = gpd.read_file(f'{tmp_dir}/{shapefile}.shp')
            # Capture the source CRS before the geometry column is
            # replaced with GeoJSON strings below
            crs = pdf.crs.to_string()
            # Create a GeoJSON geometry column
            pdf['geometry'] = pdf.geometry.apply(lambda x: json.dumps(mapping(x)))
            df = ctx.spark_session.createDataFrame(pdf)

            # Convert everything to the EPSG:4326 format expected by Foundry
            df = df.withColumn(
                "geoshape",
                clean_geometry('geometry', crs, lat_long=(crs != "EPSG:4326"))
            ).select("geoshape")
            df = df.withColumn('name', F.lit(shapefile))
            df = df.withColumn('centroid', geojson_to_geohash(centroid('geoshape')))
            combined_data = combined_data.unionByName(df)

    return output.write_dataframe(combined_data)
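For a quick sanity check outside Foundry, the core conversion can be sketched with plain geopandas and shapely. This is a minimal sketch, not the transform above: the input file parcels.shp is hypothetical, and pygeohash is one common open-source stand-in for geojson_to_geohash.

import json

import geopandas as gpd
import pygeohash  # illustrative stand-in for geojson_to_geohash
from shapely.geometry import mapping

gdf = gpd.read_file('parcels.shp')  # hypothetical local shapefile
if gdf.crs.to_string() != 'EPSG:4326':
    gdf = gdf.to_crs('EPSG:4326')   # reproject to the CRS Foundry expects

# GeoJSON string per feature, mirroring the 'geoshape' column above
gdf['geoshape'] = gdf.geometry.apply(lambda g: json.dumps(mapping(g)))

# Geohash of each centroid; note pygeohash takes (lat, lon) order
gdf['centroid'] = gdf.geometry.centroid.apply(
    lambda p: pygeohash.encode(p.y, p.x)
)

Note that centroids computed directly in EPSG:4326 are approximate; for exact centroids, compute them in a projected CRS first and reproject the results.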
  • Date submitted: 2024-05-23
  • Tags: geospatial, shapefile, geojson, geohash, pyspark, geopandas

Geospatial join with buffer in PySpark

How do I perform a geospatial join with a buffer in PySpark?

This code uses the geospatial_tools library to perform a geospatial join between two datasets, lines and points, with a 30,000-meter buffer around the points. It returns a DataFrame with the point_id and line_id columns.

from pyspark.sql import functions as F
from transforms.api import configure, transform_df, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import buffer, lat_long_to_geometry


@configure(profile=['GEOSPARK'])
@geospatial()
@transform_df(
    Output(),
    lines=Input(),
    points=Input(),
)
def compute(lines, points):
    lines = lines.select(F.col('id').alias('line_id'), 'geometry')
    # Build a point geometry from the latitude/longitude columns,
    # then buffer each point by 30 km
    points = (
        points
        .withColumn('geometry', lat_long_to_geometry('latitude', 'longitude', 'EPSG:4326'))
        .withColumn('geometry_buff', buffer('geometry', meters=30000))
        .select('point_id', 'geometry_buff')
    )
    # Left join: keep every point, attach the ids of intersecting lines
    df = points.spatial_join(
        lines,
        ('geometry_buff', 'geometry'),
        'left'
    ).select(points.point_id, lines.line_id)
    return df
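The same join can be prototyped locally with plain geopandas. This is a minimal sketch under stated assumptions: points.geojson and lines.geojson are hypothetical inputs carrying point_id and line_id columns, and EPSG:3857 stands in for a metric working CRS (its meters distort away from the equator), whereas the buffer function above takes meters directly.

import geopandas as gpd

points = gpd.read_file('points.geojson')  # hypothetical inputs with
lines = gpd.read_file('lines.geojson')    # point_id / line_id columns

# Buffering in meters needs a projected CRS; EPSG:3857 is a crude
# global choice whose meters stretch away from the equator
points_m = points.to_crs(epsg=3857)
points_m['geometry'] = points_m.geometry.buffer(30000)  # 30 km buffer

# Left spatial join: every point keeps a row; intersecting lines attach
joined = gpd.sjoin(points_m, lines.to_crs(epsg=3857),
                   how='left', predicate='intersects')
result = joined[['point_id', 'line_id']]

Buffering the points rather than the lines keeps the buffered side small; in the PySpark version, the GEOSPARK profile is what enables the distributed spatial_join.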
  • Date submitted: 2024-04-25
  • Tags: geospatial, pyspark, geospatial_tools, buffer, spatial_join