How do I combine multiple shapefiles and convert them to GeoJSON format?
This code uses the geospatial_tools library to read multiple shapefiles, convert their geometries to GeoJSON format, and combine them into a single PySpark DataFrame. It also computes the centroid of each geometry and converts it to a geohash.
from transforms.api import transform, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import clean_geometry, centroid, geojson_to_geohash
import tempfile
import shutil
import geopandas as gpd
from pyspark.sql import types as T
from pyspark.sql import functions as F
import json
from shapely.geometry import mapping


@geospatial()
@transform(
    output=Output(),
    input_data=Input(),
)
def compute(ctx, input_data, output):
    fs = input_data.filesystem()
    schema = T.StructType([
        T.StructField("geoshape", T.StringType()),
        T.StructField("name", T.StringType()),
        T.StructField("centroid", T.StringType()),
    ])
    shapefiles = [f.path.replace('.shp', '') for f in fs.ls(glob='*shp')]
    combined_data = ctx.spark_session.createDataFrame([], schema)
    for shapefile in shapefiles:  # NOQA
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Copy all files for the shapefile to the local filesystem.
            # There are multiple files associated with a shapefile, such as .prj and .cpg
            for shapefile_file in fs.ls(glob=f'{shapefile}.*'):
                with open(f'{tmp_dir}/{shapefile_file.path}', 'wb') as tmp_file:
                    with fs.open(shapefile_file.path, 'rb') as f:
                        shutil.copyfileobj(f, tmp_file)
            # Read the shapefile and capture its CRS before the geometry column is overwritten
            gdf = gpd.read_file(f'{tmp_dir}/{shapefile}.shp')
            crs = gdf.crs.to_string()
            # Create a GeoJSON geometry column
            gdf['geometry'] = gdf.geometry.apply(lambda x: json.dumps(mapping(x)))
            df = ctx.spark_session.createDataFrame(gdf)
            # Convert everything to the EPSG:4326 format expected by Foundry
            df = df.withColumn(
                "geoshape",
                clean_geometry('geometry', crs, lat_long=(crs != "EPSG:4326"))
            ).select("geoshape")
            df = df.withColumn('name', F.lit(shapefile))
            df = df.withColumn('centroid', geojson_to_geohash(centroid('geoshape')))
            combined_data = combined_data.unionByName(df)
    return output.write_dataframe(combined_data)
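The same combine-and-convert step can be prototyped outside Foundry with plain geopandas and shapely. Below is a minimal local sketch, not part of geospatial_tools: the data/*.shp path is a placeholder, and the third-party pygeohash package stands in for geojson_to_geohash.

# Minimal local sketch: combine shapefiles, store geometries as GeoJSON
# strings, and geohash each centroid. The input path and the choice of
# the pygeohash library are illustrative assumptions.
import glob
import json

import geopandas as gpd
import pandas as pd
import pygeohash  # third-party geohash library, stands in for geojson_to_geohash
from shapely.geometry import mapping

frames = []
for path in glob.glob('data/*.shp'):
    gdf = gpd.read_file(path).to_crs('EPSG:4326')  # normalize the CRS up front
    gdf['name'] = path
    frames.append(gdf)

combined = gpd.GeoDataFrame(pd.concat(frames, ignore_index=True))
# GeoJSON representation of each geometry
combined['geoshape'] = combined.geometry.apply(lambda g: json.dumps(mapping(g)))
# Geohash of each centroid; pygeohash.encode expects (latitude, longitude).
# Note that centroids computed in geographic coordinates are approximate.
combined['centroid'] = combined.geometry.centroid.apply(
    lambda c: pygeohash.encode(c.y, c.x, precision=12)
)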
geospatial, shapefile, geojson, geohash, pyspark, geopandas
How do I perform a geospatial join with a buffer in PySpark?
This code uses the geospatial_tools library to perform a geospatial join between two datasets, lines and points, with a 30,000-meter buffer around the points. It then returns a DataFrame with the point_id and line_id columns.
from pyspark.sql import functions as F
from transforms.api import configure, transform_df, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import buffer, lat_long_to_geometry


@configure(profile=['GEOSPARK'])
@geospatial()
@transform_df(
    Output(),
    lines=Input(),
    points=Input()
)
def compute(lines, points):
    lines = lines.select(F.col('id').alias('line_id'), 'geometry')
    points = points.withColumn(
        'geometry',
        lat_long_to_geometry('latitude', 'longitude', 'EPSG:4326')
    ).withColumn(
        'geometry_buff',
        buffer('geometry', meters=30000)
    ).select('point_id', 'geometry_buff')
    df = points.spatial_join(
        lines,
        ('geometry_buff', 'geometry'),
        'left'
    ).select(points.point_id, lines.line_id)
    return df
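The same buffered join can be reproduced outside the geospatial_tools API with geopandas alone. A minimal sketch under stated assumptions: points and lines are local GeoDataFrames in EPSG:4326 with point_id and line_id columns, and EPSG:32633 is a placeholder metric CRS chosen so the 30 km buffer can be expressed in meters.

# Minimal sketch of a buffered spatial join with plain geopandas.
# Column names and the CRS choice are illustrative assumptions.
import geopandas as gpd

# Buffering in meters needs a projected CRS; EPSG:32633 (a UTM zone) is a
# placeholder and should match the data's actual region.
points_m = points.to_crs('EPSG:32633')
lines_m = lines.to_crs('EPSG:32633')

# Replace each point with its 30 km buffer polygon
points_m['geometry'] = points_m.geometry.buffer(30000)

# Left join: keep every point, attach the ids of intersecting lines
joined = gpd.sjoin(points_m, lines_m, how='left', predicate='intersects')
result = joined[['point_id', 'line_id']]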
geospatial, pyspark, geospatial_tools, buffer, spatial_join