Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions samcli/commands/sync/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ def do_cli(
region=region,
profile=profile,
use_json=False,
force_upload=True,
force_upload=False,
) as package_context:
# 500ms of sleep time between stack checks and describe stack events.
DEFAULT_POLL_DELAY = 0.5
Expand Down Expand Up @@ -395,7 +395,7 @@ def do_cli(
fail_on_empty_changeset=True,
confirm_changeset=False,
use_changeset=False,
force_upload=True,
force_upload=False,
signing_profiles=None,
disable_rollback=False,
poll_delay=poll_delay,
Expand Down
2 changes: 1 addition & 1 deletion samcli/lib/providers/sam_base_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def _warn_code_extraction(resource_type: str, resource_name: str, code_property:

@staticmethod
def _warn_imageuri_extraction(resource_type: str, resource_name: str, image_property: str) -> None:
LOG.warning(
LOG.debug(
"The resource %s '%s' has specified ECR registry image for %s. "
"It will not be built and SAM CLI does not support invoking it locally.",
resource_type,
Expand Down
39 changes: 31 additions & 8 deletions samcli/lib/sync/flows/layer_sync_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast

from samcli.lib.build.app_builder import ApplicationBuilder, ApplicationBuildResult
from samcli.lib.package.s3_uploader import S3Uploader
from samcli.lib.package.utils import make_zip_with_lambda_permissions
from samcli.lib.providers.provider import Function, LayerVersion, ResourceIdentifier, Stack, get_resource_by_id
from samcli.lib.providers.sam_function_provider import SamFunctionProvider
Expand All @@ -31,6 +32,7 @@

LOG = logging.getLogger(__name__)
FUNCTION_SLEEP = 1 # used to wait for lambda function configuration last update to be successful
MAXIMUM_LAYER_ZIP_SIZE = 50 * 1024 * 1024 # 50MB limit for Lambda direct ZIP upload


def get_latest_layer_version(lambda_client: Any, layer_arn: str) -> int:
Expand All @@ -47,6 +49,7 @@ class AbstractLayerSyncFlow(SyncFlow, ABC):
"""

_lambda_client: Any
_s3_client: Any
_layer_arn: Optional[str]
_old_layer_version: Optional[int]
_new_layer_version: Optional[int]
Expand Down Expand Up @@ -83,6 +86,7 @@ def __init__(
def set_up(self) -> None:
super().set_up()
self._lambda_client = self._boto_client("lambda")
self._s3_client = self._boto_client("s3")

@property
def sync_state_identifier(self) -> str:
Expand Down Expand Up @@ -110,11 +114,10 @@ def compare_remote(self) -> bool:

def sync(self) -> None:
"""
Publish new layer version, and delete the existing (old) one
Publish new layer version
"""
LOG.debug("%sPublishing new Layer Version", self.log_prefix)
self._new_layer_version = self._publish_new_layer_version()
self._delete_old_layer_version()

def gather_dependencies(self) -> List[SyncFlow]:
if self._zip_file and os.path.exists(self._zip_file):
Expand Down Expand Up @@ -149,13 +152,33 @@ def _publish_new_layer_version(self) -> int:
Publish new layer version and keep new layer version arn so that we can update related functions
"""
compatible_runtimes = self._get_compatible_runtimes()
with open(cast(str, self._zip_file), "rb") as zip_file:
data = zip_file.read()
layer_publish_result = self._lambda_client.publish_layer_version(
LayerName=self._layer_arn, Content={"ZipFile": data}, CompatibleRuntimes=compatible_runtimes
zip_file_path = cast(str, self._zip_file)
zip_file_size = os.path.getsize(zip_file_path)

if zip_file_size < MAXIMUM_LAYER_ZIP_SIZE:
LOG.debug("%sUploading Layer directly", self.log_prefix)
with open(zip_file_path, "rb") as zip_file:
data = zip_file.read()
content: Dict[str, Any] = {"ZipFile": data}
else:
LOG.debug("%sUploading Layer through S3", self.log_prefix)
uploader = S3Uploader(
s3_client=self._s3_client,
bucket_name=self._deploy_context.s3_bucket,
prefix=self._deploy_context.s3_prefix,
kms_key_id=self._deploy_context.kms_key_id,
force_upload=True,
no_progressbar=True,
)
LOG.debug("%sPublish Layer Version Result %s", self.log_prefix, layer_publish_result)
return int(layer_publish_result.get("Version"))
s3_url = uploader.upload_with_dedup(zip_file_path)
s3_key = s3_url[5:].split("/", 1)[1]
content = {"S3Bucket": self._deploy_context.s3_bucket, "S3Key": s3_key}

layer_publish_result = self._lambda_client.publish_layer_version(
LayerName=self._layer_arn, Content=content, CompatibleRuntimes=compatible_runtimes
)
LOG.debug("%sPublish Layer Version Result %s", self.log_prefix, layer_publish_result)
return int(layer_publish_result.get("Version"))

def _delete_old_layer_version(self) -> None:
"""
Expand Down
26 changes: 23 additions & 3 deletions samcli/lib/sync/watch_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@
from watchdog.events import EVENT_TYPE_MODIFIED, EVENT_TYPE_OPENED, FileSystemEvent

from samcli.lib.providers.exceptions import InvalidTemplateFile, MissingCodeUri, MissingLocalDefinition
from samcli.lib.providers.provider import ResourceIdentifier, Stack, get_all_resource_ids
from samcli.lib.providers.provider import ResourceIdentifier, Stack, get_all_resource_ids, get_full_path
from samcli.lib.samlib.resource_metadata_normalizer import ResourceMetadataNormalizer
from samcli.lib.providers.sam_stack_provider import SamLocalStackProvider
from samcli.lib.sync.continuous_sync_flow_executor import ContinuousSyncFlowExecutor
from samcli.lib.sync.exceptions import InfraSyncRequiredError, MissingPhysicalResourceError, SyncFlowException
from samcli.lib.sync.infra_sync_executor import InfraSyncExecutor, InfraSyncResult
from samcli.lib.sync.sync_flow_factory import SyncFlowFactory
from samcli.lib.utils.code_trigger_factory import CodeTriggerFactory
from samcli.lib.utils.resources import AWS_LAMBDA_LAYERVERSION, AWS_SERVERLESS_LAYERVERSION
from samcli.lib.utils.colors import Colored, Colors
from samcli.lib.utils.path_observer import HandlerObserver
from samcli.lib.utils.resource_trigger import OnChangeCallback, TemplateTrigger
from samcli.local.lambdafn.exceptions import ResourceNotFound
from samcli.local.lambdafn.exceptions import ResourceNotFound, FunctionNotFound

if TYPE_CHECKING: # pragma: no cover
from samcli.commands.build.build_context import BuildContext
Expand Down Expand Up @@ -152,7 +154,7 @@ def _add_code_triggers(self) -> None:
extra=dict(markup=True),
)
continue
except ResourceNotFound:
except (ResourceNotFound, FunctionNotFound):
LOG.warning(
self._color.color_log(
msg="CodeTrigger not created as %s is not found or is with a S3 Location.",
Expand Down Expand Up @@ -224,6 +226,9 @@ def start(self) -> None:
self.queue_infra_sync()
if self._disable_infra_syncs:
self._start_sync()
if self._stacks:
resource_ids = self._get_non_layer_resource_ids(self._stacks)
self._queue_up_code_syncs(resource_ids)
LOG.info(
self._color.color_log(msg="Sync watch started.", color=Colors.SUCCESS), extra=dict(markup=True)
)
Expand Down Expand Up @@ -318,6 +323,21 @@ def _queue_up_code_syncs(self, resource_ids_with_code_sync: Set[ResourceIdentifi
if sync_flow:
self._sync_flow_executor.add_delayed_sync_flow(sync_flow)

@staticmethod
def _get_non_layer_resource_ids(stacks: List[Stack]) -> Set[ResourceIdentifier]:
    """Collect the resource IDs of every non-layer resource across the given stacks.

    Layer resources are excluded because layer builds can be very slow and are
    skipped on the initial startup sync; the file watcher will still sync them
    once changes to their sources are detected.
    """
    excluded_types = {AWS_LAMBDA_LAYERVERSION, AWS_SERVERLESS_LAYERVERSION}
    return {
        ResourceIdentifier(
            get_full_path(
                stack.stack_path,
                ResourceMetadataNormalizer.get_resource_id(resource, logical_id),
            )
        )
        for stack in stacks
        for logical_id, resource in stack.resources.items()
        if resource.get("Type", "") not in excluded_types
    }

def _on_code_change_wrapper(self, resource_id: ResourceIdentifier) -> OnChangeCallback:
"""Wrapper method that generates a callback for code changes.

Expand Down
68 changes: 58 additions & 10 deletions tests/unit/lib/sync/flows/test_layer_sync_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def test_setup(self, client_provider_mock):
self.layer_sync_flow.set_up()

patched_super_setup.assert_called_once()
client_provider_mock.return_value.assert_called_with("lambda")
client_provider_mock.return_value.assert_any_call("lambda")
client_provider_mock.return_value.assert_any_call("s3")

@patch("samcli.lib.sync.sync_flow.get_boto_client_provider_from_session_with_config")
@patch("samcli.lib.sync.flows.layer_sync_flow.get_resource_by_id")
Expand All @@ -63,7 +64,8 @@ def test_setup_with_serverless_layer(self, get_resource_by_id_mock, client_provi
self.layer_sync_flow.set_up()

patched_super_setup.assert_called_once()
client_provider_mock.return_value.assert_called_with("lambda")
client_provider_mock.return_value.assert_any_call("lambda")
client_provider_mock.return_value.assert_any_call("s3")

self.assertEqual(self.layer_sync_flow._layer_arn, "layer_version_arn")

Expand Down Expand Up @@ -172,17 +174,18 @@ def test_compare_remote(self, patched_get_latest_layer_version):

def test_sync(self):
with patch.object(self.layer_sync_flow, "_publish_new_layer_version") as patched_publish_new_layer_version:
with patch.object(self.layer_sync_flow, "_delete_old_layer_version") as patched_delete_old_layer_version:
given_layer_version = Mock()
patched_publish_new_layer_version.return_value = given_layer_version
given_layer_version = Mock()
patched_publish_new_layer_version.return_value = given_layer_version

self.layer_sync_flow.sync()
self.assertEqual(self.layer_sync_flow._new_layer_version, given_layer_version)

self.layer_sync_flow.sync()
self.assertEqual(self.layer_sync_flow._new_layer_version, given_layer_version)
patched_publish_new_layer_version.assert_called_once()

patched_publish_new_layer_version.assert_called_once()
patched_delete_old_layer_version.assert_called_once()
@patch("samcli.lib.sync.flows.layer_sync_flow.os.path.getsize")
def test_publish_new_layer_version(self, patched_getsize):
patched_getsize.return_value = 1024 # Small file, direct upload

def test_publish_new_layer_version(self):
given_layer_name = Mock()

given_lambda_client = Mock()
Expand Down Expand Up @@ -212,6 +215,51 @@ def test_publish_new_layer_version(self):

self.assertEqual(result_version, given_publish_layer_result.get("Version"))

@patch("samcli.lib.sync.flows.layer_sync_flow.S3Uploader")
@patch("samcli.lib.sync.flows.layer_sync_flow.os.path.getsize")
def test_publish_new_layer_version_via_s3(self, patched_getsize, patched_s3_uploader):
    """Layers over the direct-upload size limit should be published through S3."""
    # Report a 60MB zip so the flow takes the S3 upload path instead of a direct ZipFile upload.
    patched_getsize.return_value = 60 * 1024 * 1024

    layer_name_mock = Mock()
    lambda_client_mock = Mock()
    s3_client_mock = Mock()

    flow = self.layer_sync_flow
    flow._lambda_client = lambda_client_mock
    flow._s3_client = s3_client_mock
    flow._zip_file = "/tmp/test.zip"
    flow._layer_arn = layer_name_mock

    self.deploy_context_mock.s3_bucket = "my-bucket"
    self.deploy_context_mock.s3_prefix = "my-prefix"
    self.deploy_context_mock.kms_key_id = "my-kms-key"

    uploader_instance = patched_s3_uploader.return_value
    uploader_instance.upload_with_dedup.return_value = "s3://my-bucket/my-prefix/abc123.zip"

    with patch.object(flow, "_get_resource") as patched_get_resource:
        lambda_client_mock.publish_layer_version.return_value = {"Version": 25}
        layer_resource_mock = Mock()
        patched_get_resource.return_value = layer_resource_mock

        published_version = flow._publish_new_layer_version()

        # The uploader must be constructed with the deploy context's S3 settings.
        patched_s3_uploader.assert_called_once_with(
            s3_client=s3_client_mock,
            bucket_name="my-bucket",
            prefix="my-prefix",
            kms_key_id="my-kms-key",
            force_upload=True,
            no_progressbar=True,
        )
        uploader_instance.upload_with_dedup.assert_called_once_with("/tmp/test.zip")
        # The layer must be published with an S3 location, not an inline ZipFile.
        lambda_client_mock.publish_layer_version.assert_called_with(
            LayerName=layer_name_mock,
            Content={"S3Bucket": "my-bucket", "S3Key": "my-prefix/abc123.zip"},
            CompatibleRuntimes=layer_resource_mock.get("Properties", {}).get("CompatibleRuntimes", []),
        )
        self.assertEqual(published_version, 25)

def test_delete_old_layer_version(self):
given_layer_name = Mock()
given_layer_version = Mock()
Expand Down
28 changes: 28 additions & 0 deletions tests/unit/lib/sync/test_watch_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,34 @@ def test_start(self):
self.path_observer.stop.assert_called_once_with()
stop_code_sync_mock.assert_called_once_with()

def test_start_with_code_flag(self):
    """When infra syncs are disabled, start() should queue up initial code syncs
    for all non-layer resources before entering the watch loop."""
    manager = self.watch_manager

    start_sync_mock = MagicMock()
    start_loop_mock = MagicMock()
    stop_code_sync_mock = MagicMock()
    queue_code_syncs_mock = MagicMock()
    non_layer_ids_mock = MagicMock()

    manager._disable_infra_syncs = True
    manager._start_sync = start_sync_mock
    manager._start = start_loop_mock
    manager._stop_code_sync = stop_code_sync_mock
    manager._queue_up_code_syncs = queue_code_syncs_mock
    manager._get_non_layer_resource_ids = non_layer_ids_mock
    manager._stacks = [MagicMock()]

    expected_ids = {ResourceIdentifier("Function")}
    non_layer_ids_mock.return_value = expected_ids

    # Break out of the watch loop immediately so start() returns.
    start_loop_mock.side_effect = KeyboardInterrupt()

    manager.start()

    start_sync_mock.assert_called_once()
    non_layer_ids_mock.assert_called_once_with(manager._stacks)
    queue_code_syncs_mock.assert_called_once_with(expected_ids)
    self.path_observer.stop.assert_called_once_with()
    stop_code_sync_mock.assert_called_once_with()

@parameterized.expand([(True, {ResourceIdentifier("Function")}), (False, set())])
@patch("samcli.lib.sync.watch_manager.time.sleep")
def test__start(self, executed, code_sync_resources, sleep_mock):
Expand Down