diff --git a/apps/api_web/lib/api_web/controllers/stop_event_controller.ex b/apps/api_web/lib/api_web/controllers/stop_event_controller.ex new file mode 100644 index 00000000..e5ad7f7f --- /dev/null +++ b/apps/api_web/lib/api_web/controllers/stop_event_controller.ex @@ -0,0 +1,306 @@ +defmodule ApiWeb.StopEventController do + @moduledoc """ + Controller for Stop Events. Filterable by: + + * trip + * stop + * route + * vehicle + * direction_id + """ + use ApiWeb.Web, :api_controller + alias State.StopEvent + + @filters ~w(trip stop route vehicle direction_id) + @includes ~w(trip stop route vehicle) + @pagination_opts [:offset, :limit, :order_by] + @description """ + Stop events represent the actual arrival and departure times of vehicles at stops along their trips. + This is historical data showing when vehicles actually arrived at or departed from stops, as opposed + to predictions or scheduled times. + + Each stop event contains: + - The actual arrival time (as Unix epoch seconds) + - The actual departure time (as Unix epoch seconds) + - The stop sequence number + - Whether the trip was a revenue trip + + Stop events are identified by a composite key of trip_id, route_id, vehicle_id, and stop_sequence. + """ + + def state_module, do: State.StopEvent + + swagger_path :index do + get(path("stop_event", :index)) + + description(""" + List of stop events. + + #{@description} + """) + + common_index_parameters(__MODULE__, :stop_event) + + include_parameters() + + parameter( + "filter[trip]", + :query, + :string, + "Filter by trip ID. #{comma_separated_list()}.", + example: "73885810" + ) + + parameter( + "filter[stop]", + :query, + :string, + "Filter by stop ID. #{comma_separated_list()}.", + example: "2231" + ) + + parameter( + "filter[route]", + :query, + :string, + "Filter by route ID. #{comma_separated_list()}.", + example: "64" + ) + + parameter( + "filter[vehicle]", + :query, + :string, + "Filter by vehicle ID. #{comma_separated_list()}.", + example: "y2071" + ) + + filter_param(:direction_id) + + consumes("application/vnd.api+json") + produces("application/vnd.api+json") + response(200, "OK", Schema.ref(:StopEvents)) + response(400, "Bad Request", Schema.ref(:BadRequest)) + response(403, "Forbidden", Schema.ref(:Forbidden)) + response(429, "Too Many Requests", Schema.ref(:TooManyRequests)) + end + + def index_data(conn, params) do + with :ok <- Params.validate_includes(params, @includes, conn), + {:ok, filtered} <- Params.filter_params(params, @filters, conn) do + formatted_filters = format_filters(filtered) + + if map_size(formatted_filters) == 0 do + {:error, :filter_required} + else + formatted_filters + |> StopEvent.filter_by() + |> State.all(pagination_opts(params, conn)) + end + else + {:error, _, _} = error -> error + end + end + + @spec format_filters(%{optional(String.t()) => String.t()}) :: StopEvent.filters() + defp format_filters(filters) do + Enum.reduce(filters, %{}, fn + {"trip", trip_ids}, acc -> + Map.put(acc, :trip_ids, Params.split_on_comma(trip_ids)) + + {"stop", stop_ids}, acc -> + Map.put(acc, :stop_ids, Params.split_on_comma(stop_ids)) + + {"route", route_ids}, acc -> + Map.put(acc, :route_ids, Params.split_on_comma(route_ids)) + + {"vehicle", vehicle_ids}, acc -> + Map.put(acc, :vehicle_ids, Params.split_on_comma(vehicle_ids)) + + {"direction_id", direction_id}, acc -> + Map.put(acc, :direction_id, Params.direction_id(%{"direction_id" => direction_id})) + + _, acc -> + acc + end) + end + + defp pagination_opts(params, conn) do + opts = + params + |> Params.filter_opts(@pagination_opts, conn) + + if is_list(opts) do + Keyword.put_new(opts, :order_by, {:id, :asc}) + else + opts + |> Map.to_list() + |> Keyword.put_new(:order_by, {:id, :asc}) + end + end + + swagger_path :show do + get(path("stop_event", :show)) + + description(""" + Show a particular stop event by its composite ID. + + #{@description} + """) + + parameter( + :id, + :path, + :string, + "Unique identifier for stop event (trip_id-route_id-vehicle_id-stop_sequence)" + ) + + include_parameters() + + consumes("application/vnd.api+json") + produces("application/vnd.api+json") + + response(200, "OK", Schema.ref(:StopEvent)) + response(403, "Forbidden", Schema.ref(:Forbidden)) + response(404, "Not Found", Schema.ref(:NotFound)) + response(429, "Too Many Requests", Schema.ref(:TooManyRequests)) + end + + def show_data(_conn, %{"id" => id}) do + StopEvent.by_id(id) + end + + defp include_parameters(schema) do + ApiWeb.SwaggerHelpers.include_parameters( + schema, + @includes, + description: """ + | include | Description | + |-|-| + | `trip` | The trip associated with this stop event. | + | `stop` | The stop where the event occurred. | + | `route` | The route associated with this stop event. | + | `vehicle` | The vehicle that served this trip. | + """ + ) + end + + def swagger_definitions do + import PhoenixSwagger.JsonApi, except: [page: 1] + + %{ + StopEventResource: + resource do + description(""" + Actual arrival and departure times of vehicles at stops. + """) + + attributes do + vehicle_id( + :string, + """ + The vehicle ID that served this trip. + """, + example: "y2071" + ) + + start_date( + :string, + """ + The service date of the trip in YYYY-MM-DD format. + """, + example: "2026-02-24", + format: :date + ) + + trip_id( + :string, + """ + The trip ID associated with this stop event. + """, + example: "73885810" + ) + + direction_id( + :integer, + """ + Direction in which the trip is traveling: + - `0` - Travel in one direction (e.g. outbound travel) + - `1` - Travel in the opposite direction (e.g. inbound travel) + """, + enum: [0, 1], + example: 0 + ) + + route_id( + :string, + """ + The route ID associated with this stop event. + """, + example: "64" + ) + + start_time( + :string, + """ + The scheduled start time of the trip in HH:MM:SS format. + """, + example: "16:07:00" + ) + + revenue( + :string, + """ + Whether this stop event is for a revenue trip: + - `REVENUE` - A revenue trip + - `NON_REVENUE` - A non-revenue trip + """, + enum: ["REVENUE", "NON_REVENUE"], + example: "REVENUE" + ) + + stop_id( + :string, + """ + The stop ID where the event occurred. + """, + example: "2231" + ) + + stop_sequence( + :integer, + """ + The stop sequence number along the trip. Increases monotonically but values need not be consecutive. + """, + example: 1 + ) + + arrived( + [:integer, :null], + """ + When the vehicle arrived at the stop, as seconds since Unix epoch (UTC). `null` if the first stop on the trip. + """, + example: 1_771_966_486, + "x-nullable": true + ) + + departed( + [:integer, :null], + """ + When the vehicle departed from the stop, as seconds since Unix epoch (UTC). `null` if the last stop on the trip or if the vehicle has not yet departed. + """, + example: 1_771_967_246, + "x-nullable": true + ) + end + + relationship(:trip) + relationship(:stop) + relationship(:route) + relationship(:vehicle) + end, + StopEvents: page(:StopEventResource), + StopEvent: single(:StopEventResource) + } + end +end diff --git a/apps/api_web/lib/api_web/router.ex b/apps/api_web/lib/api_web/router.ex index 8963943c..d8be1b90 100644 --- a/apps/api_web/lib/api_web/router.ex +++ b/apps/api_web/lib/api_web/router.ex @@ -100,6 +100,8 @@ defmodule ApiWeb.Router do resources("/live_facilities", LiveFacilityController, only: [:index, :show]) resources("/live-facilities", LiveFacilityController, only: [:index, :show]) resources("/services", ServiceController, only: [:index, :show]) + resources("/stop_events", StopEventController, only: [:index, :show]) + resources("/stop-events", StopEventController, only: [:index, :show]) end scope "/docs/swagger" do diff --git a/apps/api_web/lib/api_web/views/stop_event_view.ex b/apps/api_web/lib/api_web/views/stop_event_view.ex new file mode 100644 index 00000000..00d89635 --- /dev/null +++ b/apps/api_web/lib/api_web/views/stop_event_view.ex @@ -0,0 +1,66 @@ +defmodule ApiWeb.StopEventView do + use ApiWeb.Web, :api_view + + location(:stop_event_location) + + def stop_event_location(stop_event, conn), + do: stop_event_path(conn, :show, stop_event.id) + + has_one( + :trip, + type: :trip, + serializer: ApiWeb.TripView, + field: :trip_id + ) + + has_one( + :stop, + type: :stop, + serializer: ApiWeb.StopView, + field: :stop_id + ) + + has_one( + :route, + type: :route, + serializer: ApiWeb.RouteView, + field: :route_id + ) + + has_one( + :vehicle, + type: :vehicle, + serializer: ApiWeb.VehicleView, + field: :vehicle_id + ) + + attributes([ + :vehicle_id, + :start_date, + :trip_id, + :direction_id, + :route_id, + :start_time, + :revenue, + :stop_id, + :stop_sequence, + :arrived, + :departed + ]) + + def trip(%{trip_id: trip_id}, conn) do + optional_relationship("trip", trip_id, &State.Trip.by_primary_id/1, conn) + end + + def stop(%{stop_id: stop_id}, conn) do + optional_relationship("stop", stop_id, &State.Stop.by_id/1, conn) + end + + def route(%{route_id: route_id}, conn) do + optional_relationship("route", route_id, &State.Route.by_id/1, conn) + end + + def vehicle(%{vehicle_id: vehicle_id}, conn) do + optional_relationship("vehicle", vehicle_id, &State.Vehicle.by_id/1, conn) + end +end diff --git a/apps/api_web/test/api_web/controllers/stop_event_controller_test.exs b/apps/api_web/test/api_web/controllers/stop_event_controller_test.exs new file mode 100644 index 00000000..fc2dd4a4 --- /dev/null +++ b/apps/api_web/test/api_web/controllers/stop_event_controller_test.exs @@ -0,0 +1,688 @@ +defmodule ApiWeb.StopEventControllerTest do + @moduledoc false + use ApiWeb.ConnCase + + alias Model.StopEvent + + setup %{conn: conn} do + {:ok, conn: put_req_header(conn, "accept", "application/json")} + end + + describe "index_data/2" do + test "returns 400 with no filters", %{conn: conn} do + State.StopEvent.new_state([ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + start_date: ~D[2026-02-24], + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + start_time: "10:00:00", + revenue: :REVENUE, + stop_id: "stop1", + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + }, + %StopEvent{ + id: "trip2-route2-v2-1", + vehicle_id: "v2", + start_date: ~D[2026-02-24], + trip_id: "trip2", + direction_id: 1, + route_id: "route2", + start_time: "11:00:00", + revenue: :NON_REVENUE, + stop_id: "stop2", + stop_sequence: 1, + arrived: 1_771_968_343, + departed: nil + } + ]) + + conn = get(conn, stop_event_path(conn, :index)) + + assert json_response(conn, 400)["errors"] == [ + %{ + "status" => "400", + "code" => "bad_request", + "detail" => "At least one filter[] is required." + } + ] + end + + test "conforms to swagger response", %{swagger_schema: schema, conn: conn} do + stop_event = %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + start_date: ~D[2026-02-24], + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + start_time: "10:00:00", + revenue: :REVENUE, + stop_id: "stop1", + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + } + + State.StopEvent.new_state([stop_event]) + + response = get(conn, stop_event_path(conn, :index, %{"filter" => %{"trip" => "trip1"}})) + assert validate_resp_schema(response, schema, "StopEvents") + end + + test "can filter by trip", %{conn: conn} do + State.StopEvent.new_state([ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + }, + %StopEvent{ + id: "trip2-route2-v2-1", + vehicle_id: "v2", + trip_id: "trip2", + direction_id: 0, + route_id: "route2", + stop_id: "stop2", + start_date: ~D[2026-02-24], + start_time: "11:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_968_343, + departed: nil + } + ]) + + conn = get(conn, stop_event_path(conn, :index, %{"filter" => %{"trip" => "trip1"}})) + + assert [%{"id" => "trip1-route1-v1-1"}] = json_response(conn, 200)["data"] + end + + test "can filter by stop", %{conn: conn} do + State.StopEvent.new_state([ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + }, + %StopEvent{ + id: "trip1-route1-v1-2", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop2", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 2, + arrived: 1_771_967_286, + departed: 1_771_967_333 + } + ]) + + conn = get(conn, stop_event_path(conn, :index, %{"filter" => %{"stop" => "stop2"}})) + + assert [%{"id" => "trip1-route1-v1-2"}] = json_response(conn, 200)["data"] + end + + test "can filter by route", %{conn: conn} do + State.StopEvent.new_state([ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + }, + %StopEvent{ + id: "trip2-route2-v2-1", + vehicle_id: "v2", + trip_id: "trip2", + direction_id: 0, + route_id: "route2", + stop_id: "stop2", + start_date: ~D[2026-02-24], + start_time: "11:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_968_343, + departed: nil + } + ]) + + conn = get(conn, stop_event_path(conn, :index, %{"filter" => %{"route" => "route1"}})) + + assert [%{"id" => "trip1-route1-v1-1"}] = json_response(conn, 200)["data"] + end + + test "can filter by direction_id", %{conn: conn} do + State.StopEvent.new_state([ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + }, + %StopEvent{ + id: "trip2-route2-v2-1", + vehicle_id: "v2", + trip_id: "trip2", + direction_id: 1, + route_id: "route2", + stop_id: "stop2", + start_date: ~D[2026-02-24], + start_time: "11:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_968_343, + departed: nil + } + ]) + + conn = + get(conn, stop_event_path(conn, :index, %{"filter" => %{"direction_id" => "0"}})) + + assert [%{"id" => "trip1-route1-v1-1"}] = json_response(conn, 200)["data"] + end + + test "can filter by vehicle", %{conn: conn} do + State.StopEvent.new_state([ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + }, + %StopEvent{ + id: "trip2-route2-v2-1", + vehicle_id: "v2", + trip_id: "trip2", + direction_id: 0, + route_id: "route2", + stop_id: "stop2", + start_date: ~D[2026-02-24], + start_time: "11:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_968_343, + departed: nil + } + ]) + + conn = get(conn, stop_event_path(conn, :index, %{"filter" => %{"vehicle" => "v1"}})) + + assert [%{"id" => "trip1-route1-v1-1"}] = json_response(conn, 200)["data"] + end + + test "can filter by vehicle and route simultaneously", %{conn: conn} do + State.StopEvent.new_state([ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + }, + %StopEvent{ + id: "trip2-route1-v2-1", + vehicle_id: "v2", + trip_id: "trip2", + direction_id: 0, + route_id: "route1", + stop_id: "stop2", + start_date: ~D[2026-02-24], + start_time: "11:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_968_343, + departed: nil + } + ]) + + conn = + get( + conn, + stop_event_path(conn, :index, %{ + "filter" => %{"vehicle" => "v1", "route" => "route1"} + }) + ) + + assert [%{"id" => "trip1-route1-v1-1"}] = json_response(conn, 200)["data"] + end + + test "can filter by route and direction_id simultaneously", %{conn: conn} do + State.StopEvent.new_state([ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + }, + %StopEvent{ + id: "trip2-route1-v2-2", + vehicle_id: "v2", + trip_id: "trip2", + direction_id: 1, + route_id: "route1", + stop_id: "stop2", + start_date: ~D[2026-02-24], + start_time: "11:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_968_343, + departed: nil + }, + %StopEvent{ + id: "trip3-route2-v3-3", + vehicle_id: "v3", + trip_id: "trip3", + direction_id: 0, + route_id: "route2", + stop_id: "stop3", + start_date: ~D[2026-02-24], + start_time: "12:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_969_000, + departed: 1_771_969_100 + } + ]) + + conn = + get( + conn, + stop_event_path(conn, :index, %{ + "filter" => %{"route" => "route1", "direction_id" => "0"} + }) + ) + + assert [%{"id" => "trip1-route1-v1-1"}] = json_response(conn, 200)["data"] + end + + test "can filter by trip and stop simultaneously", %{conn: conn} do + State.StopEvent.new_state([ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + }, + %StopEvent{ + id: "trip1-route1-v1-2", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop2", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 2, + arrived: 1_771_967_286, + departed: 1_771_967_333 + }, + %StopEvent{ + id: "trip2-route1-v2-2", + vehicle_id: "v2", + trip_id: "trip2", + direction_id: 0, + route_id: "route1", + stop_id: "stop2", + start_date: ~D[2026-02-24], + start_time: "11:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_968_343, + departed: nil + } + ]) + + conn = + get( + conn, + stop_event_path(conn, :index, %{"filter" => %{"trip" => "trip1", "stop" => "stop2"}}) + ) + + assert [%{"id" => "trip1-route1-v1-2"}] = json_response(conn, 200)["data"] + end + + test "can filter by route, stop, and direction_id simultaneously", %{conn: conn} do + State.StopEvent.new_state([ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + }, + %StopEvent{ + id: "trip2-route1-1", + trip_id: "trip2", + direction_id: 1, + route_id: "route1", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "11:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_968_343, + departed: nil + }, + %StopEvent{ + id: "trip3-route1-v3-2", + vehicle_id: "v3", + trip_id: "trip3", + direction_id: 0, + route_id: "route1", + stop_id: "stop2", + start_date: ~D[2026-02-24], + start_time: "12:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_969_000, + departed: 1_771_969_100 + }, + %StopEvent{ + id: "trip4-route2-v4-1", + vehicle_id: "v4", + trip_id: "trip4", + direction_id: 0, + route_id: "route2", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "13:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_970_000, + departed: 1_771_970_200 + } + ]) + + conn = + get( + conn, + stop_event_path(conn, :index, %{ + "filter" => %{"route" => "route1", "stop" => "stop1", "direction_id" => "0"} + }) + ) + + assert [%{"id" => "trip1-route1-v1-1"}] = json_response(conn, 200)["data"] + end + + test "can filter by multiple trips, routes, and stops simultaneously", %{conn: conn} do + State.StopEvent.new_state([ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + }, + %StopEvent{ + id: "trip2-route1-v2-2", + vehicle_id: "v2", + trip_id: "trip2", + direction_id: 0, + route_id: "route1", + stop_id: "stop2", + start_date: ~D[2026-02-24], + start_time: "11:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_968_343, + departed: nil + }, + %StopEvent{ + id: "trip3-route2-v3-3", + vehicle_id: "v3", + trip_id: "trip3", + direction_id: 0, + route_id: "route2", + stop_id: "stop3", + start_date: ~D[2026-02-24], + start_time: "12:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_969_000, + departed: 1_771_969_100 + }, + %StopEvent{ + id: "trip2-route2-v2-1", + vehicle_id: "v2", + trip_id: "trip2", + direction_id: 1, + route_id: "route2", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "13:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_970_000, + departed: 1_771_970_200 + } + ]) + + conn = + get( + conn, + stop_event_path(conn, :index, %{ + "filter" => %{"trip" => "trip1,trip2", "route" => "route1,route2", "stop" => "stop1"} + }) + ) + + response = json_response(conn, 200)["data"] + ids = response |> Enum.map(& &1["id"]) |> Enum.sort() + # Both trip1-route1-stop1 and trip2-route2-stop1 match the filters + assert ids == ["trip1-route1-v1-1", "trip2-route2-v2-1"] + end + + test "returns empty when filters match no records", %{conn: conn} do + State.StopEvent.new_state([ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + } + ]) + + conn = + get( + conn, + stop_event_path(conn, :index, %{ + "filter" => %{"route" => "route1", "direction_id" => "1"} + }) + ) + + assert [] = json_response(conn, 200)["data"] + end + + test "pagination works", %{conn: conn} do + State.StopEvent.new_state([ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop1", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + }, + %StopEvent{ + id: "trip1-route1-v1-2", + vehicle_id: "v1", + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + stop_id: "stop2", + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 2, + arrived: 1_771_967_286, + departed: 1_771_967_333 + } + ]) + + conn = + get( + conn, + stop_event_path(conn, :index, %{ + "filter" => %{"trip" => "trip1"}, + "page" => %{"limit" => "1", "offset" => "0"} + }) + ) + + response = json_response(conn, 200) + assert length(response["data"]) == 1 + assert response["links"]["next"] + end + end + + describe "show_data/2" do + test "shows chosen resource", %{conn: conn} do + stop_event = %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + start_date: ~D[2026-02-24], + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + start_time: "10:00:00", + revenue: :REVENUE, + stop_id: "stop1", + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + } + + State.StopEvent.new_state([stop_event]) + + conn = get(conn, stop_event_path(conn, :show, stop_event.id)) + assert json_response(conn, 200)["data"]["id"] == stop_event.id + end + + test "conforms to swagger response", %{swagger_schema: schema, conn: conn} do + stop_event = %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + start_date: ~D[2026-02-24], + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + start_time: "10:00:00", + revenue: :REVENUE, + stop_id: "stop1", + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + } + + State.StopEvent.new_state([stop_event]) + + response = get(conn, stop_event_path(conn, :show, stop_event.id)) + assert validate_resp_schema(response, schema, "StopEvent") + end + + test "does not show resource when id is nonexistent", %{conn: conn} do + conn = get(conn, stop_event_path(conn, :show, "nonexistent")) + assert json_response(conn, 404) + end + end +end diff --git a/apps/model/lib/model/stop_event.ex b/apps/model/lib/model/stop_event.ex new file mode 100644 index 00000000..2cf3d47f --- /dev/null +++ b/apps/model/lib/model/stop_event.ex @@ -0,0 +1,66 @@ +defmodule Model.StopEvent do + @moduledoc """ + The actual `arrival_time` and `departure_time` of a `vehicle_id` to/from a `stop_sequence` in a `trip_id`. + along a trip (`trip_id`) going a direction (`direction_id`) along a route (`route_id`). This is the actual time a vehicle arrived at or departed from a stop, as opposed to a prediction of when a vehicle will arrive at or depart from a stop (`Model.Prediction.t`) or the scheduled time of arrival or departure (`Model.Schedule.t`). + + For the predicted times, see `Model.Prediction.t`. + For the scheduled times, see `Model.Schedule.t`. + """ + + use Recordable, [ + :id, + :vehicle_id, + :start_date, + :trip_id, + :direction_id, + :route_id, + :start_time, + :revenue, + :stop_id, + :stop_sequence, + :arrived, + :departed + ] + + @typedoc """ + * `:id` - Composite key: `{trip_id}-{route_id}-{vehicle_id}-{stop_sequence}`. + * `:vehicle_id` - The vehicle serving this trip. See + [GTFS Realtime `FeedMessage` `FeedEntity` `VehiclePosition` `VehicleDescriptor` `id`](https://github.com/google/transit/blob/master/gtfs-realtime/spec/en/reference.md#message-vehicledescriptor). + * `:start_date` - The service date of the `trip_id`. + * `:trip_id` - The trip the `stop_id` is on. See [GTFS Realtime `FeedMesage` `FeedEntity` `TripUpdate` `TripDescriptor`](https://github.com/google/transit/blob/master/gtfs-realtime/spec/en/reference.md#message-tripdescriptor) + * `:direction_id` - Which direction along `route_id` the `trip_id` is going. See + [GTFS `trips.txt` `direction_id`](https://github.com/google/transit/blob/master/gtfs/spec/en/reference.md#tripstxt). + * `:route_id` - The route `trip_id` is on doing in `direction_id`. See + [GTFS `trips.txt` `route_id`](https://github.com/google/transit/blob/master/gtfs/spec/en/reference.md#tripstxt) + * `start_time` - The time the `trip_id` was scheduled to start. + * `:revenue` - Whether or not the stop event is for a revenue trip. + * `:stop_id` - Stop associated with arrived/departed. See + [GTFS Realtime `FeedMesage` `FeedEntity` `TripUpdate` `StopTimeUpdate` `stop_id`](https://github.com/google/transit/blob/master/gtfs-realtime/spec/en/reference.md#message-stoptimeupdate). + * `:stop_sequence` - The sequence of the stop along the `trip_id`. The stop sequence increases monotonically but values need not be consecutive. + See [GTFS `stop_times.txt` `stop_sequence`](https://github.com/google/transit/blob/master/gtfs/spec/en/reference.md#stop_timestxt). + * `:arrived` - When the vehicle arrived at the stop as seconds since Unix epoch (UTC). `nil` if the first stop (`stop_id`) on the `trip_id`. + * `:departed` - When the vehicle departed from the stop as seconds since Unix epoch (UTC). `nil` if the last stop (`stop_id`) on the `trip_id`. + """ + @type unix_timestamp :: non_neg_integer + + @type t :: %__MODULE__{ + id: String.t(), + vehicle_id: Model.Vehicle.id(), + start_date: Date.t(), + trip_id: Model.Trip.id(), + direction_id: Model.Direction.id(), + route_id: Model.Route.id(), + start_time: String.t(), + stop_id: Model.Stop.id(), + stop_sequence: non_neg_integer, + revenue: :REVENUE | :NON_REVENUE, + arrived: unix_timestamp | nil, + departed: unix_timestamp | nil + } + + @spec trip_id(t) :: Model.Trip.id() + def trip_id(%__MODULE__{trip_id: trip_id}), do: trip_id + + @spec vehicle_id(t) :: String.t() + def vehicle_id(%__MODULE__{vehicle_id: vehicle_id}), do: vehicle_id +end diff --git a/apps/parse/lib/parse/stop_events.ex b/apps/parse/lib/parse/stop_events.ex new file mode 100644 index 00000000..896e90f0 --- /dev/null +++ b/apps/parse/lib/parse/stop_events.ex @@ -0,0 +1,90 @@ +defmodule Parse.StopEvents do + @moduledoc """ + Parses stop_events new line-delimited JSON into a list of `%Model.StopEvent{}` structs. + """ + + require Logger + + @behaviour Parse + + @impl Parse + def parse(body) do + body + |> String.split("\n", trim: true) + |> Enum.map(&parse_line/1) + |> Enum.reject(&is_nil/1) + end + + defp parse_line(line) do + case Jason.decode(line) do + {:ok, record} -> + parse_record(record) + + e -> + Logger.warning("#{__MODULE__} decode_error e=#{inspect(e)}") + nil + end + end + + defp parse_record( + %{ + "start_date" => start_date, + "id" => id, + "trip_id" => trip_id, + "vehicle_id" => vehicle_id, + "direction_id" => direction_id, + "route_id" => route_id, + "start_time" => start_time, + "revenue" => revenue, + "stop_id" => stop_id, + "stop_sequence" => stop_sequence + } = record + ) do + case parse_date(start_date) do + {:ok, date} -> + case parse_revenue(revenue) do + {:ok, revenue_atom} -> + %Model.StopEvent{ + id: id, + vehicle_id: vehicle_id, + start_date: date, + trip_id: trip_id, + direction_id: direction_id, + route_id: route_id, + start_time: start_time, + revenue: revenue_atom, + stop_id: stop_id, + stop_sequence: stop_sequence, + arrived: Map.get(record, "arrived"), + departed: Map.get(record, "departed") + } + + {:error, reason} -> + Logger.warning("#{__MODULE__} parse_error error=#{reason} record=#{inspect(record)}") + nil + end + + {:error, reason} -> + Logger.warning("#{__MODULE__} parse_error error=#{reason} record=#{inspect(record)}") + nil + end + end + + defp parse_record(record) do + Logger.warning("#{__MODULE__} parse_error error=missing_fields #{inspect(record)}") + nil + end + + defp parse_date(<>) do + case Date.new(String.to_integer(year), String.to_integer(month), String.to_integer(day)) do + {:ok, date} -> {:ok, date} + _ -> {:error, :invalid_date} + end + end + + defp parse_date(_), do: {:error, :invalid_date} + + defp parse_revenue(true), do: {:ok, :REVENUE} + defp parse_revenue(false), do: {:ok, :NON_REVENUE} + defp parse_revenue(_), do: {:error, :invalid_revenue} +end diff --git a/apps/parse/test/parse/stop_events_test.exs b/apps/parse/test/parse/stop_events_test.exs new file mode 100644 index 00000000..e5f23b4c --- /dev/null +++ b/apps/parse/test/parse/stop_events_test.exs @@ -0,0 +1,160 @@ +defmodule Parse.StopEventsTest do + use ExUnit.Case + import ExUnit.CaptureLog + + import Parse.StopEvents + alias Model.StopEvent + + describe "parse" do + test "parses valid NDJSON data with multiple stop events" do + ndjson = """ + {"id":"73885810-64-y2071-1","timestamp":1771968343,"start_date":"20260224","trip_id":"73885810","vehicle_id":"y2071","direction_id":0,"route_id":"64","start_time":"16:07:00","revenue":true,"stop_id":"2231","stop_sequence":1,"arrived":1771966486,"departed":1771967246} + {"id":"73885810-64-y2071-2","timestamp":1771968343,"start_date":"20260224","trip_id":"73885810","vehicle_id":"y2071","direction_id":0,"route_id":"64","start_time":"16:07:00","revenue":true,"stop_id":"12232","stop_sequence":2,"arrived":1771967286,"departed":1771967333} + {"id":"73221192-Green-E-G-10077-4","timestamp":1771950045,"start_date":"20260224","trip_id":"73221192","vehicle_id":"G-10077","direction_id":0,"route_id":"Green-E","start_time":"10:16:00","revenue":true,"stop_id":"70512","stop_sequence":4,"arrived":1771946303,"departed":1771946479} + """ + + result = parse(ndjson) + + assert length(result) == 3 + + assert %StopEvent{ + id: "73885810-64-y2071-1", + vehicle_id: "y2071", + start_date: ~D[2026-02-24], + trip_id: "73885810", + direction_id: 0, + route_id: "64", + start_time: "16:07:00", + revenue: :REVENUE, + stop_id: "2231", + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + } in result + + assert %StopEvent{ + id: "73885810-64-y2071-2", + vehicle_id: "y2071", + start_date: ~D[2026-02-24], + trip_id: "73885810", + direction_id: 0, + route_id: "64", + start_time: "16:07:00", + revenue: :REVENUE, + stop_id: "12232", + stop_sequence: 2, + arrived: 1_771_967_286, + departed: 1_771_967_333 + } in result + + assert %StopEvent{ + id: "73221192-Green-E-G-10077-4", + vehicle_id: "G-10077", + start_date: ~D[2026-02-24], + trip_id: "73221192", + direction_id: 0, + route_id: "Green-E", + start_time: "10:16:00", + revenue: :REVENUE, + stop_id: "70512", + stop_sequence: 4, + arrived: 1_771_946_303, + departed: 1_771_946_479 + } in result + end + + test "handles null departed times for last stop" do + ndjson = """ + {"id":"test-trip-1","timestamp":1771968343,"start_date":"20260224","trip_id":"test","vehicle_id":"v1","direction_id":0,"route_id":"1","start_time":"10:00:00","revenue":true,"stop_id":"stop1","stop_sequence":1,"arrived":1771966486,"departed":null} + """ + + result = parse(ndjson) + + assert [%StopEvent{departed: nil}] = result + end + + test "handles null arrived times for first stop" do + ndjson = """ + {"id":"test-trip-1","timestamp":1771968343,"start_date":"20260224","trip_id":"test","vehicle_id":"v1","direction_id":0,"route_id":"1","start_time":"10:00:00","revenue":true,"stop_id":"stop1","stop_sequence":1,"arrived":null,"departed":1771967246} + """ + + result = parse(ndjson) + + assert [%StopEvent{arrived: nil}] = result + end + + test "handles non-revenue trips" do + ndjson = """ + {"id":"test-trip-1","timestamp":1771968343,"start_date":"20260224","trip_id":"test","vehicle_id":"v1","direction_id":0,"route_id":"1","start_time":"10:00:00","revenue":false,"stop_id":"stop1","stop_sequence":1,"arrived":1771966486,"departed":1771967246} + """ + + result = parse(ndjson) + + assert [%StopEvent{revenue: :NON_REVENUE}] = result + end + + test "ignores empty lines in NDJSON" do + ndjson = """ + + {"id":"test-trip-1","timestamp":1771968343,"start_date":"20260224","trip_id":"test","vehicle_id":"v1","direction_id":0,"route_id":"1","start_time":"10:00:00","revenue":true,"stop_id":"stop1","stop_sequence":1,"arrived":1771966486,"departed":1771967246} + + """ + + result = parse(ndjson) + + assert length(result) == 1 + end + + test "parses stop events with optional arrived/departed fields" do + ndjson = """ + {"id":"first-stop-1","timestamp":1771968343,"start_date":"20260224","trip_id":"arrived","vehicle_id":"v1","direction_id":0,"route_id":"1","start_time":"10:00:00","revenue":true,"stop_id":"stop1","stop_sequence":1,"departed":1771967246} + {"id":"last-stop-1","timestamp":1771968343,"start_date":"20260224","trip_id":"departed","vehicle_id":"v1","direction_id":0,"route_id":"1","start_time":"10:00:00","revenue":true,"stop_id":"stop1","stop_sequence":1,"arrived":1771966486} + {"id":"middle-stop-1","timestamp":1771968343,"start_date":"20260224","trip_id":"both-times","vehicle_id":"v2","direction_id":0,"route_id":"1","start_time":"10:00:00","revenue":true,"stop_id":"stop2","stop_sequence":1,"arrived":1771966486,"departed":1771967246} + """ + + result = parse(ndjson) + + assert [ + %StopEvent{trip_id: "arrived", arrived: nil, departed: 1_771_967_246}, + %StopEvent{trip_id: "departed", arrived: 1_771_966_486, departed: nil}, + %StopEvent{trip_id: "both-times", arrived: 1_771_966_486, departed: 1_771_967_246} + ] = result + end + + test "logs and ignores lines with missing required fields" do + ndjson = """ + {"id":"missing-stop-id-1","timestamp":1771968343,"start_date":"20260224","trip_id":"missing","vehicle_id":"v1","direction_id":0,"route_id":"1","start_time":"10:00:00","revenue":true,"stop_sequence":1,"arrived":1771966486,"departed":1771967246} + """ + + log = + capture_log(fn -> + result = parse(ndjson) + assert result == [] + end) + + assert log =~ "missing_fields" + end + + test "logs and ignores lines with invalid date format" do + ndjson = """ + {"id":"test-trip-1","timestamp":1771968343,"start_date":"invalid","trip_id":"test","vehicle_id":"v1","direction_id":0,"route_id":"1","start_time":"10:00:00","revenue":true,"stop_id":"stop1","stop_sequence":1,"arrived":1771966486,"departed":1771967246} + """ + + log = + capture_log(fn -> + assert parse(ndjson) == [] + end) + + assert log =~ "invalid_date" + end + + test "handles invalid JSON" do + log = + capture_log(fn -> + assert parse("{abc\n{def}") == [] + end) + + assert log =~ "decode_error" + end + end +end diff --git a/apps/state/lib/state.ex b/apps/state/lib/state.ex index 3638c4dc..a4d01c72 100644 --- a/apps/state/lib/state.ex +++ b/apps/state/lib/state.ex @@ -36,7 +36,8 @@ defmodule State do State.RoutesByService, State.Shape, State.Feed, - State.CommuterRailOccupancy + State.CommuterRailOccupancy, + State.StopEvent ] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html diff --git a/apps/state/lib/state/stop_event.ex b/apps/state/lib/state/stop_event.ex new file mode 100644 index 00000000..1b3b2475 --- /dev/null +++ b/apps/state/lib/state/stop_event.ex @@ -0,0 +1,186 @@ +defmodule State.StopEvent do + @moduledoc """ + State for stop events - actual arrival/departure times of vehicles at stops + """ + use State.Server, + indices: [:id, :trip_id, :stop_id, :route_id, :vehicle_id], + parser: Parse.StopEvents, + recordable: Model.StopEvent + + alias Model.Route + alias Model.Stop + alias Model.StopEvent + alias Model.Trip + alias Model.Vehicle + + @type filters :: %{ + optional(:trip_ids) => [Trip.id()], + optional(:stop_ids) => [Stop.id()], + optional(:route_ids) => [Route.id()], + optional(:vehicle_ids) => [Vehicle.id()], + optional(:direction_id) => Model.Direction.id() + } + + # Filter keys ordered by typical selectivity (most selective first) + @filter_keys [:trip_ids, :vehicle_ids, :stop_ids, :route_ids] + + @spec by_id(String.t()) :: StopEvent.t() | nil + def by_id(id) do + case super(id) do + [] -> nil + [stop_event] -> stop_event + end + end + + @doc """ + Filters stop events based on the provided filter criteria. + + At least one filter should be provided for efficient querying. The function + automatically selects the most selective index based on the number of values + in each filter list. + + ## Options + + Accepts the same options as `State.all/2`: + * `:limit` - Maximum number of results to return + * `:offset` - Number of results to skip + * `:order_by` - Field(s) to sort by, e.g. `{:arrived, :asc}` + + ## Examples + + filter_by(%{trip_ids: ["trip1"]}) + filter_by(%{route_ids: ["Red"], direction_id: 0}, limit: 10) + + """ + @spec filter_by(filters(), Keyword.t()) :: + [StopEvent.t()] | {[StopEvent.t()], State.Pagination.Offsets.t()} + def filter_by(filters, opts \\ []) + + def filter_by(%{} = filters, opts) when map_size(filters) == 0 do + all() + |> State.all(opts) + end + + def filter_by(filters, opts) do + case select_best_index(filters) do + {:direction_id_only, direction_id} -> + # direction_id alone requires full scan + all() + |> filter_by_direction(direction_id) + |> State.all(opts) + + {:single_filter, filter_key, values} -> + # Single indexed filter - use index directly, no MapSet needed + values + |> fetch_by_index(filter_key) + |> maybe_filter_direction(filters[:direction_id]) + |> State.all(opts) + + {:multi_filter, primary_key, primary_values, remaining_filters} -> + # Multiple filters - use best index, then apply remaining with MapSets + primary_values + |> fetch_by_index(primary_key) + |> apply_additional_filters(remaining_filters) + |> State.all(opts) + + :no_filters -> + [] + end + end + + # Selects the best index based on filter selectivity (smallest list first) + defp select_best_index(filters) do + indexed_filters = + @filter_keys + |> Enum.map(fn key -> {key, Map.get(filters, key)} end) + |> Enum.filter(fn {_key, values} -> is_list(values) and values != [] end) + |> Enum.sort_by(fn {_key, values} -> length(values) end) + + direction_id = filters[:direction_id] + + case {indexed_filters, direction_id} do + {[], nil} -> + :no_filters + + {[], direction_id} -> + {:direction_id_only, direction_id} + + {[{key, values}], nil} -> + {:single_filter, key, values} + + {[{key, values}], direction_id} -> + # Single indexed filter + direction_id + remaining = %{direction_id: direction_id} + {:multi_filter, key, values, remaining} + + {[{primary_key, primary_values} | rest], direction_id} -> + # Multiple indexed filters - build remaining filter map + remaining = + rest + |> Enum.into(%{}) + |> maybe_put_direction(direction_id) + + {:multi_filter, primary_key, primary_values, remaining} + end + end + + defp maybe_put_direction(map, nil), do: map + defp maybe_put_direction(map, direction_id), do: Map.put(map, :direction_id, direction_id) + + # Fetch records using the appropriate index + defp fetch_by_index(values, :trip_ids), do: by_trip_ids(values) + defp fetch_by_index(values, :stop_ids), do: by_stop_ids(values) + defp fetch_by_index(values, :route_ids), do: by_route_ids(values) + defp fetch_by_index(values, :vehicle_ids), do: by_vehicle_ids(values) + + # Simple direction filter for single-filter cases (no MapSet overhead) + defp filter_by_direction(events, direction_id) do + Enum.filter(events, fn %StopEvent{direction_id: d_id} -> d_id == direction_id end) + end + + defp maybe_filter_direction(events, nil), do: events + defp maybe_filter_direction(events, direction_id), do: filter_by_direction(events, direction_id) + + # Apply additional filters using pre-computed MapSets (for multi-filter cases) + defp apply_additional_filters(events, filters) when map_size(filters) == 0, do: events + + defp apply_additional_filters(events, filters) do + # Build all filter sets upfront to avoid repeated MapSet creation + filter_specs = build_filter_specs(filters) + + Enum.filter(events, fn event -> + Enum.all?(filter_specs, fn spec -> matches_filter?(event, spec) end) + end) + end + + # Build filter specifications with pre-computed MapSets + defp build_filter_specs(filters) do + [] + |> maybe_add_filter_spec(filters[:trip_ids], :trip_id) + |> maybe_add_filter_spec(filters[:stop_ids], :stop_id) + |> maybe_add_filter_spec(filters[:route_ids], :route_id) + |> maybe_add_filter_spec(filters[:vehicle_ids], :vehicle_id) + |> maybe_add_direction_spec(filters[:direction_id]) + end + + defp maybe_add_filter_spec(specs, nil, _field), do: specs + defp maybe_add_filter_spec(specs, [], _field), do: specs + + defp maybe_add_filter_spec(specs, values, field) when is_list(values) do + [{:set, field, MapSet.new(values)} | specs] + end + + defp maybe_add_direction_spec(specs, nil), do: specs + + defp maybe_add_direction_spec(specs, direction_id), + do: [{:eq, :direction_id, direction_id} | specs] + + # Pattern match on filter specification type for efficient dispatch + defp matches_filter?(event, {:set, field, set}) do + MapSet.member?(set, Map.get(event, field)) + end + + defp matches_filter?(event, {:eq, field, value}) do + Map.get(event, field) == value + end +end diff --git a/apps/state/test/state/stop_event_test.exs b/apps/state/test/state/stop_event_test.exs new file mode 100644 index 00000000..f69ff15e --- /dev/null +++ b/apps/state/test/state/stop_event_test.exs @@ -0,0 +1,414 @@ +defmodule State.StopEventTest do + use ExUnit.Case + + alias Model.StopEvent + import State.StopEvent + + describe "filter_by/1" do + setup do + stop_event1 = %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + start_date: ~D[2026-02-24], + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + start_time: "10:00:00", + revenue: :REVENUE, + stop_id: "stop1", + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + } + + stop_event2 = %StopEvent{ + id: "trip1-route1-v1-2", + vehicle_id: "v1", + start_date: ~D[2026-02-24], + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + start_time: "10:00:00", + revenue: :REVENUE, + stop_id: "stop2", + stop_sequence: 2, + arrived: 1_771_967_286, + departed: 1_771_967_333 + } + + stop_event3 = %StopEvent{ + id: "trip2-route2-v2-3", + vehicle_id: "v2", + start_date: ~D[2026-02-24], + trip_id: "trip2", + direction_id: 1, + route_id: "route2", + start_time: "11:00:00", + revenue: :NON_REVENUE, + stop_id: "stop3", + stop_sequence: 1, + arrived: 1_771_968_343, + departed: nil + } + + State.StopEvent.new_state([stop_event1, stop_event2, stop_event3]) + + {:ok, %{event1: stop_event1, event2: stop_event2, event3: stop_event3}} + end + + test "returns all events with empty filters", %{event1: e1, event2: e2, event3: e3} do + result = filter_by(%{}) + assert length(result) == 3 + assert e1 in result + assert e2 in result + assert e3 in result + end + + test "filters by trip_id", %{event1: e1, event2: e2, event3: e3} do + result = filter_by(%{trip_ids: ["trip1"]}) + assert length(result) == 2 + assert e1 in result + assert e2 in result + refute e3 in result + end + + test "filters by multiple trip_ids", %{event1: e1, event2: e2, event3: e3} do + result = filter_by(%{trip_ids: ["trip1", "trip2"]}) + assert length(result) == 3 + assert e1 in result + assert e2 in result + assert e3 in result + end + + test "filters by stop_id", %{event1: e1, event2: _e2} do + result = filter_by(%{stop_ids: ["stop1"]}) + assert result == [e1] + end + + test "filters by multiple stop_ids", %{event1: e1, event3: e3} do + result = filter_by(%{stop_ids: ["stop1", "stop3"]}) + assert length(result) == 2 + assert e1 in result + assert e3 in result + end + + test "filters by route_id", %{event1: e1, event2: e2, event3: e3} do + result = filter_by(%{route_ids: ["route1"]}) + assert length(result) == 2 + assert e1 in result + assert e2 in result + refute e3 in result + end + + test "filters by multiple route_ids", %{event1: e1, event2: e2, event3: e3} do + result = filter_by(%{route_ids: ["route1", "route2"]}) + assert length(result) == 3 + assert e1 in result + assert e2 in result + assert e3 in result + end + + test "filters by vehicle_id", %{event1: e1, event2: e2, event3: e3} do + result = filter_by(%{vehicle_ids: ["v1"]}) + assert length(result) == 2 + assert e1 in result + assert e2 in result + refute e3 in result + end + + test "filters by multiple vehicle_ids", %{event1: e1, event2: e2, event3: e3} do + result = filter_by(%{vehicle_ids: ["v1", "v2"]}) + assert length(result) == 3 + assert e1 in result + assert e2 in result + assert e3 in result + end + + test "filters by direction_id", %{event1: e1, event2: e2, event3: e3} do + result = filter_by(%{direction_id: 0}) + assert length(result) == 2 + assert e1 in result + assert e2 in result + refute e3 in result + + result = filter_by(%{direction_id: 1}) + assert result == [e3] + end + + test "filters by trip_id and stop_id", %{event1: e1} do + result = filter_by(%{trip_ids: ["trip1"], stop_ids: ["stop1"]}) + assert result == [e1] + end + + test "filters by route_id and direction_id", %{event1: e1, event2: e2} do + result = filter_by(%{route_ids: ["route1"], direction_id: 0}) + assert length(result) == 2 + assert e1 in result + assert e2 in result + end + + test "filters by trip_id, stop_id, and direction_id simultaneously", %{event1: e1} do + result = filter_by(%{trip_ids: ["trip1"], stop_ids: ["stop1"], direction_id: 0}) + assert result == [e1] + end + + test "filters by route_id, stop_id, and direction_id simultaneously", %{event1: e1} do + result = filter_by(%{route_ids: ["route1"], stop_ids: ["stop1"], direction_id: 0}) + assert result == [e1] + end + + test "filters by multiple values across all filter types" do + # Add more test data for this test + stop_event4 = %StopEvent{ + id: "trip2-route1-v2-1", + vehicle_id: "v2", + start_date: ~D[2026-02-24], + trip_id: "trip2", + direction_id: 0, + route_id: "route1", + start_time: "12:00:00", + revenue: :REVENUE, + stop_id: "stop1", + stop_sequence: 1, + arrived: 1_771_969_000, + departed: 1_771_969_100 + } + + all_events = State.StopEvent.all() + State.StopEvent.new_state(all_events ++ [stop_event4]) + + # Filter for trip1 OR trip2, route1, stop1, direction 0 + result = + filter_by(%{ + trip_ids: ["trip1", "trip2"], + route_ids: ["route1"], + stop_ids: ["stop1"], + direction_id: 0 + }) + + # Should return both trip1-route1-stop1 and trip2-route1-stop1 + assert length(result) == 2 + assert Enum.all?(result, fn e -> e.route_id == "route1" end) + assert Enum.all?(result, fn e -> e.stop_id == "stop1" end) + assert Enum.all?(result, fn e -> e.direction_id == 0 end) + assert Enum.all?(result, fn e -> e.trip_id in ["trip1", "trip2"] end) + end + + test "returns empty when combining filters that match no records", %{event1: _e1, event2: _e2} do + # event1 and event2 both have route1, but only event1 has stop1 + # Filtering for route1, stop2, and direction_id 1 should return nothing + result = filter_by(%{route_ids: ["route1"], stop_ids: ["stop2"], direction_id: 1}) + assert result == [] + end + + test "returns empty list for non-matching filters" do + assert filter_by(%{trip_ids: ["nonexistent"]}) == [] + assert filter_by(%{stop_ids: ["nonexistent"]}) == [] + assert filter_by(%{route_ids: ["nonexistent"]}) == [] + assert filter_by(%{direction_id: 2}) == [] + end + + test "returns empty list for empty id lists" do + assert filter_by(%{trip_ids: []}) == [] + assert filter_by(%{stop_ids: []}) == [] + assert filter_by(%{route_ids: []}) == [] + end + end + + describe "filter_by/2 with pagination" do + setup do + events = + for i <- 1..10 do + %StopEvent{ + id: "trip#{i}-route1-v1-#{i}", + vehicle_id: "v1", + start_date: ~D[2026-02-24], + trip_id: "trip#{i}", + direction_id: rem(i, 2), + route_id: "route1", + start_time: "10:00:00", + revenue: :REVENUE, + stop_id: "stop#{i}", + stop_sequence: i, + arrived: 1_771_966_486 + i * 100, + departed: 1_771_967_246 + i * 100 + } + end + + State.StopEvent.new_state(events) + {:ok, %{events: events}} + end + + test "supports limit option" do + {result, _pagination} = filter_by(%{route_ids: ["route1"]}, limit: 3) + assert length(result) == 3 + end + + test "supports offset option" do + all_results = filter_by(%{route_ids: ["route1"]}) + {offset_results, _pagination} = filter_by(%{route_ids: ["route1"]}, offset: 2, limit: 20) + + assert length(offset_results) == length(all_results) - 2 + end + + test "supports limit and offset together" do + {result, _pagination} = filter_by(%{route_ids: ["route1"]}, limit: 2, offset: 3) + assert length(result) == 2 + end + + test "supports order_by option" do + # Order by stop_sequence ascending + {result, _pagination} = + filter_by(%{route_ids: ["route1"]}, order_by: {:stop_sequence, :asc}, limit: 20) + + assert length(result) == 10 + assert hd(result).stop_sequence == 1 + assert List.last(result).stop_sequence == 10 + end + + test "combines pagination with filtering" do + # Filter by direction_id 0 (odd numbered trips: 1,3,5,7,9), limit 2 + {result, _pagination} = filter_by(%{direction_id: 0}, limit: 2) + assert length(result) == 2 + assert Enum.all?(result, fn e -> e.direction_id == 0 end) + end + end + + describe "filter_by/2 selectivity optimization" do + setup do + # Create data where vehicle_id is most selective (1 match), + # trip_id is medium (3 matches), route_id is least selective (5 matches) + events = [ + %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + trip_id: "trip1", + route_id: "route1", + stop_id: "stop1", + direction_id: 0, + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + }, + %StopEvent{ + id: "trip1-route1-v2-2", + vehicle_id: "v2", + trip_id: "trip1", + route_id: "route1", + stop_id: "stop2", + direction_id: 0, + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 2, + arrived: 1_771_966_586, + departed: 1_771_967_346 + }, + %StopEvent{ + id: "trip1-route1-v3-3", + vehicle_id: "v3", + trip_id: "trip1", + route_id: "route1", + stop_id: "stop3", + direction_id: 0, + start_date: ~D[2026-02-24], + start_time: "10:00:00", + revenue: :REVENUE, + stop_sequence: 3, + arrived: 1_771_966_686, + departed: 1_771_967_446 + }, + %StopEvent{ + id: "trip2-route1-v4-1", + vehicle_id: "v4", + trip_id: "trip2", + route_id: "route1", + stop_id: "stop1", + direction_id: 1, + start_date: ~D[2026-02-24], + start_time: "11:00:00", + revenue: :REVENUE, + stop_sequence: 1, + arrived: 1_771_968_000, + departed: 1_771_968_100 + }, + %StopEvent{ + id: "trip2-route1-v5-2", + vehicle_id: "v5", + trip_id: "trip2", + route_id: "route1", + stop_id: "stop2", + direction_id: 1, + start_date: ~D[2026-02-24], + start_time: "11:00:00", + revenue: :REVENUE, + stop_sequence: 2, + arrived: 1_771_968_100, + departed: 1_771_968_200 + } + ] + + State.StopEvent.new_state(events) + {:ok, %{}} + end + + test "selects most selective filter when multiple filters provided" do + # vehicle_id (1 match) should be chosen over route_id (5 matches) + result = filter_by(%{vehicle_ids: ["v1"], route_ids: ["route1"]}) + assert length(result) == 1 + assert hd(result).vehicle_id == "v1" + end + + test "handles single filter efficiently without MapSet overhead" do + # Single filter should work without creating MapSets + result = filter_by(%{vehicle_ids: ["v1"]}) + assert length(result) == 1 + assert hd(result).vehicle_id == "v1" + end + + test "handles single filter with direction_id" do + # Single indexed filter + direction should work efficiently + result = filter_by(%{vehicle_ids: ["v1"], direction_id: 0}) + assert length(result) == 1 + assert hd(result).vehicle_id == "v1" + assert hd(result).direction_id == 0 + end + + test "handles direction_id only filter" do + # Direction only should still work (full scan) + result = filter_by(%{direction_id: 0}) + assert length(result) == 3 + assert Enum.all?(result, fn e -> e.direction_id == 0 end) + end + end + + describe "by_id/1" do + test "returns stop event by id" do + stop_event = %StopEvent{ + id: "trip1-route1-v1-1", + vehicle_id: "v1", + start_date: ~D[2026-02-24], + trip_id: "trip1", + direction_id: 0, + route_id: "route1", + start_time: "10:00:00", + revenue: :REVENUE, + stop_id: "stop1", + stop_sequence: 1, + arrived: 1_771_966_486, + departed: 1_771_967_246 + } + + State.StopEvent.new_state([stop_event]) + + assert by_id("trip1-route1-v1-1") == stop_event + end + + test "returns nil for non-existent id" do + assert by_id("nonexistent") == nil + end + end +end diff --git a/apps/state_mediator/config/config.exs b/apps/state_mediator/config/config.exs index 56731792..21791154 100644 --- a/apps/state_mediator/config/config.exs +++ b/apps/state_mediator/config/config.exs @@ -15,6 +15,11 @@ config :state_mediator, :commuter_rail_crowding, s3_object: {:system, "CR_CROWDING_S3_OBJECT"}, source: {:system, "CR_CROWING_SOURCE", "s3"} +config :state_mediator, :stop_events, + enabled: {:system, "STOP_EVENTS_ENABLED", "false"}, + s3_bucket: {:system, "STOP_EVENTS_S3_BUCKET"}, + s3_object: {:system, "STOP_EVENTS_S3_OBJECT"} + config :state_mediator, Realtime, gtfs_url: {:system, "MBTA_GTFS_URL", "https://cdn.mbta.com/MBTA_GTFS.zip"}, alert_url: {:system, "ALERT_URL", "https://cdn.mbta.com/realtime/Alerts_enhanced.json"} diff --git a/apps/state_mediator/lib/state_mediator.ex b/apps/state_mediator/lib/state_mediator.ex index 64bff4a5..89a61492 100644 --- a/apps/state_mediator/lib/state_mediator.ex +++ b/apps/state_mediator/lib/state_mediator.ex @@ -14,7 +14,8 @@ defmodule StateMediator do crowding_children( app_value(:commuter_rail_crowding, :enabled) == "true", crowding_source - ) + ) ++ + stop_event_children(app_value(:stop_events, :enabled) == "true") # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # for other strategies and supported options @@ -155,6 +156,32 @@ defmodule StateMediator do [] end + @spec stop_event_children(boolean()) :: [ + :supervisor.child_spec() | {module(), term()} | module() + ] + defp stop_event_children(true) do + Logger.info("#{__MODULE__} STOP_EVENTS_ENABLED=true") + + [ + { + StateMediator.S3Mediator, + [ + spec_id: :stop_event_mediator, + bucket_arn: app_value(:stop_events, :s3_bucket), + object: app_value(:stop_events, :s3_object), + interval: 3 * 1_000, + sync_timeout: 30_000, + state: State.StopEvent + ] + } + ] + end + + defp stop_event_children(false) do + Logger.info("#{__MODULE__} STOP_EVENTS_ENABLED=false") + [] + end + @doc false def source_url(mod) do case Application.get_env(:state_mediator, mod)[:source] do diff --git a/config/runtime.exs b/config/runtime.exs index 624b915b..23b63e2d 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -82,6 +82,11 @@ if is_prod? and is_release? do s3_object: System.fetch_env!("CR_CROWDING_S3_OBJECT"), source: System.fetch_env!("CR_CROWDING_SOURCE") + config :state_mediator, :stop_events, + enabled: System.get_env("STOP_EVENTS_ENABLED", "false"), + s3_bucket: System.get_env("STOP_EVENTS_S3_BUCKET"), + s3_object: System.get_env("STOP_EVENTS_S3_OBJECT") + config :recaptcha, enabled: true, public_key: System.fetch_env!("RECAPTCHA_PUBLIC_KEY"),