{
 "cells": [
  {
   "cell_type": "markdown",
   "source": [
    "# Semantic segmentation of the trip video\n",
    "\n",
    "This notebook map-matches a GPS trace onto the road network, downloads a Google Street View picture for every node of the matched path, highlights one semantic class (cars by default) with a pretrained FCN-ResNet50 model, and assembles the segmented frames into a video."
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%% md\n"
    }
   }
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true,
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "# Import libraries\n",
    "\n",
    "# Generic library\n",
    "import io\n",
    "import os\n",
    "\n",
    "import cv2\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "from PIL import Image\n",
    "from natsort import natsorted\n",
    "\n",
    "# Semantic segmentation model\n",
    "import torch\n",
    "from torchvision.models.segmentation import fcn_resnet50, FCN_ResNet50_Weights\n",
    "from torchvision.transforms.functional import to_pil_image, resize, pil_to_tensor\n",
    "from torchvision.utils import draw_segmentation_masks\n",
    "\n",
    "# Progress bar and in-notebook video player\n",
    "from IPython.display import Video\n",
    "from tqdm import tqdm\n",
    "\n",
    "# Creation of the video\n",
    "from pytrack.analytics import plugins\n",
    "from pytrack.analytics import video\n",
    "from pytrack.graph import distance\n",
    "\n",
    "# Creation of matched path\n",
    "from pytrack.graph import graph\n",
    "from pytrack.graph import utils\n",
    "from pytrack.matching import candidate, mpmatching_utils, mpmatching"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "def image_to_byte_array(image, format=\"PNG\"):\n",
    "    \"\"\"Serialize a PIL image into raw bytes encoded in ``format`` (PNG by default).\"\"\"\n",
    "    # BytesIO is an in-memory file; image.save expects a file-like object\n",
    "    with io.BytesIO() as buffer:\n",
    "        image.save(buffer, format=format)\n",
    "        # Copy the buffer content out as a bytes object before the buffer is closed\n",
    "        return buffer.getvalue()\n",
    "\n",
    "\n",
    "class Segmenter(plugins.Segmenter):\n",
    "    \"\"\"\n",
    "    Overlay the mask of one semantic class on an image using a torchvision segmentation model.\n",
    "\n",
    "    For more information see: https://pytorch.org/vision/stable/models.html\n",
    "\n",
    "    Parameters\n",
    "    ----------\n",
    "    model : callable\n",
    "        Model builder accepting a ``weights`` keyword (e.g. ``fcn_resnet50``).\n",
    "    weights : enum class\n",
    "        Weights enum exposing ``DEFAULT`` (e.g. ``FCN_ResNet50_Weights``).\n",
    "    category : str, optional\n",
    "        Name of the class to highlight; must be listed in ``weights.DEFAULT.meta[\"categories\"]``.\n",
    "\n",
    "    Raises\n",
    "    ------\n",
    "    ValueError\n",
    "        If ``category`` is not one of the categories known to the weights.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, model, weights, category=\"car\"):\n",
    "        # Initialize model with the weights and switch it to inference mode\n",
    "        self.weights = weights.DEFAULT\n",
    "        self.model = model(weights=self.weights)\n",
    "        self.model.eval()\n",
    "\n",
    "        # Build the inference transforms and the class lookup once, not on every frame\n",
    "        self._preprocess = self.weights.transforms()\n",
    "        self._class_to_idx = {cls: idx for idx, cls in enumerate(self.weights.meta[\"categories\"])}\n",
    "\n",
    "        if category not in self._class_to_idx:\n",
    "            raise ValueError(f\"Unknown category {category!r}; available: {sorted(self._class_to_idx)}\")\n",
    "        self.category = category\n",
    "\n",
    "    def processing(self, img):\n",
    "        \"\"\"Apply the inference preprocessing transforms of the weights to an image tensor.\"\"\"\n",
    "        return self._preprocess(img)\n",
    "\n",
    "    def run(self, img):\n",
    "        \"\"\"Segment an encoded image and return PNG bytes with ``self.category`` tinted green.\n",
    "\n",
    "        Parameters\n",
    "        ----------\n",
    "        img : bytes\n",
    "            Encoded image in any format PIL can open.\n",
    "\n",
    "        Returns\n",
    "        -------\n",
    "        bytes\n",
    "            PNG-encoded image with the segmentation mask drawn on top.\n",
    "        \"\"\"\n",
    "        # Force RGB: the model expects exactly three channels, while PNG/GIF inputs may carry\n",
    "        # an alpha channel, a palette or a single grey channel\n",
    "        img = pil_to_tensor(Image.open(io.BytesIO(img)).convert(\"RGB\"))\n",
    "        batch = self.processing(img).unsqueeze(0)\n",
    "\n",
    "        # Inference only: skip autograd bookkeeping to save memory and time\n",
    "        with torch.no_grad():\n",
    "            prediction = self.model(batch)[\"out\"]\n",
    "\n",
    "        # argmax over the raw scores selects the same class as argmax over their softmax\n",
    "        mask = prediction.argmax(1) == self._class_to_idx[self.category]\n",
    "\n",
    "        # Resize the picture to the spatial size of the prediction so that image and mask\n",
    "        # always agree, whatever resize the preprocessing transforms applied\n",
    "        canvas = resize(img, list(mask.shape[-2:]))\n",
    "        pred = draw_segmentation_masks(canvas, masks=mask, alpha=0.6, colors=\"green\")\n",
    "\n",
    "        return image_to_byte_array(to_pil_image(pred))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "outputs": [],
   "source": [
    "# Initialize segmentation model\n",
    "model = fcn_resnet50\n",
    "weights = FCN_ResNet50_Weights\n",
    "segmenter = Segmenter(model, weights)"
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%%\n"
    }
   }
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Downloaded 448.33kB\n"
     ]
    }
   ],
   "source": [
    "df = pd.read_excel(\"dataset.xlsx\")\n",
    "\n",
    "latitude = df[\"latitude\"].to_list()\n",
    "longitude = df[\"longitude\"].to_list()\n",
    "\n",
    "# Only the first N_POINTS GPS samples of the dataset are used\n",
    "N_POINTS = 30\n",
    "points = list(zip(latitude[:N_POINTS], longitude[:N_POINTS]))\n",
    "\n",
    "# Create BBOX\n",
    "coords = np.array(points)\n",
    "north, east = coords.max(axis=0)\n",
    "south, west = coords.min(axis=0)\n",
    "\n",
    "# Extract road graph\n",
    "G = graph.graph_from_bbox(*distance.enlarge_bbox(north, south, west, east, 500), simplify=True, network_type='drive')\n",
    "\n",
    "# Add to G a geometry attribute describing the geometry of both nodes and edges.\n",
    "# TODO: create an autonomous method.\n",
    "nodes, edges = utils.graph_to_gdfs(G)\n",
    "\n",
    "# Extract candidates\n",
    "G_interp, candidates = candidate.get_candidates(G, points, interp_dist=5, closest=True, radius=30)\n",
    "\n",
    "# Extract trellis DAG graph\n",
    "trellis = mpmatching_utils.create_trellis(candidates)\n",
    "\n",
    "# Perform the map-matching process\n",
    "path_prob, predecessor = mpmatching.viterbi_search(G_interp, trellis, \"start\", \"target\")\n",
    "\n",
    "# Path expressed through a list of nodes (lat, lng)\n",
    "_, path = mpmatching_utils.create_matched_path(G_interp, trellis, predecessor)"
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%%\n"
    }
   }
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 347/347 [03:19<00:00, 1.74it/s]\n"
     ]
    }
   ],
   "source": [
    "root_dir = \"SV_panoramas\"  # Directory where the Google Street View panoramas are saved\n",
    "# Prefer the GOOGLE_API_KEY environment variable so the key is never stored in the notebook\n",
    "api_key = os.environ.get(\"GOOGLE_API_KEY\", \"Insert your private API key for Google services\")\n",
    "\n",
    "os.makedirs(root_dir, exist_ok=True)\n",
    "\n",
    "for i in tqdm(range(len(path))):\n",
    "    pano_dir = os.path.join(root_dir, str(i))\n",
    "    if os.path.isdir(pano_dir):\n",
    "        continue  # Already downloaded by a previous run\n",
    "\n",
    "    point = path[i]\n",
    "    if i > 0:\n",
    "        # Look in the direction of travel: from the previous node towards the current one\n",
    "        prec_point = path[i - 1]\n",
    "        head = distance.get_bearing(prec_point[0], prec_point[1], point[0], point[1])\n",
    "    elif len(path) > 1:\n",
    "        # The first node has no predecessor: look towards the next node instead\n",
    "        succ_point = path[i + 1]\n",
    "        head = distance.get_bearing(point[0], point[1], succ_point[0], succ_point[1])\n",
    "    else:\n",
    "        # Single-node path: no direction of travel can be derived, fall back to heading 0\n",
    "        head = 0\n",
    "\n",
    "    pic, meta = video.extract_streetview_pic(point, api_key, size=\"520x520\", heading=head, pitch=-10)\n",
    "\n",
    "    if pic is not None:\n",
    "        video.save_streetview(pic, meta, pano_dir, model=segmenter)"
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%%\n"
    }
   }
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "outputs": [],
   "source": [
    "# Create video of the path\n",
    "root_dir = \"SV_panoramas\"\n",
    "\n",
    "# Collect every segmented frame; natural sorting keeps folder 10 after folder 2\n",
    "images = natsorted(\n",
    "    os.path.join(root, file)\n",
    "    for root, _, files in os.walk(root_dir)\n",
    "    for file in files\n",
    "    if file.endswith(\"pic_seg.png\")\n",
    ")\n",
    "\n",
    "fourcc = cv2.VideoWriter_fourcc(*\"avc1\")\n",
    "video_path = \"video_seg.mp4\"\n",
    "\n",
    "video.make_video(images, video_path, fourcc, fps=16, size=(520, 520), is_color=True)"
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%%\n"
    }
   }
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "outputs": [
    {
     "data": {
      "text/plain": "",
      "text/html": ""
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Play the video written by the previous cell\n",
    "Video(video_path, embed=True, width=520, height=520)"
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%%\n"
    }
   }
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}