Price Monitoring Guide

Track competitor prices and automate pricing strategies

What You'll Learn

  • Set up automated price tracking systems
  • Monitor competitor pricing in real time
  • Detect price changes and trends
  • Build dynamic pricing strategies
  • Generate price alerts and reports

Basic Price Tracking

track_prices.py
python
from scrapehub import ScrapeHubClient
import pandas as pd
from datetime import datetime

client = ScrapeHubClient(api_key="sk_live_xxxx_449x")

# Products to monitor
products = [
    "https://store.com/product/laptop-pro-15",
    "https://store.com/product/smartphone-x",
    "https://store.com/product/tablet-plus"
]

# Track current prices
price_data = []

for url in products:
    result = client.scrape(
        url=url,
        engine="neural-x1"
    )

    if result.data:
        product = result.data[0]
        price_data.append({
            'timestamp': datetime.now(),
            'product_name': product['name'],
            'url': url,
            'price': product['price'],
            'currency': product.get('currency', 'USD'),
            'in_stock': product.get('in_stock', True)
        })

# Save to CSV
df = pd.DataFrame(price_data)
df.to_csv('price_snapshot.csv', index=False)

print(f"Tracked {len(price_data)} products")
for item in price_data:
    print(f"{item['product_name']}: {item['currency']} {item['price']}")

Automated Price Monitor with Scheduling

automated_monitor.py
python
from scrapehub import ScrapeHubClient
import pandas as pd
from datetime import datetime
import schedule
import time

class PriceMonitor:
    def __init__(self, api_key, products):
        self.client = ScrapeHubClient(api_key=api_key)
        self.products = products
        self.history_file = 'price_history.csv'

    def check_prices(self):
        """Check current prices for all products"""
        print(f"\n[{datetime.now()}] Checking prices...")

        current_prices = []

        for url in self.products:
            try:
                result = self.client.scrape(
                    url=url,
                    engine="neural-x1"
                )

                if result.data:
                    product = result.data[0]
                    current_prices.append({
                        'timestamp': datetime.now(),
                        'url': url,
                        'name': product['name'],
                        'price': float(product['price']),
                        'in_stock': product.get('in_stock', True)
                    })

                    print(f"  ✓ {product['name']}: ${product['price']}")

            except Exception as e:
                print(f"  ✗ Error scraping {url}: {e}")

        # Save to history
        self.save_history(current_prices)

        # Check for changes
        self.detect_changes(current_prices)

        return current_prices

    def save_history(self, new_data):
        """Append to price history file"""
        df_new = pd.DataFrame(new_data)

        try:
            df_existing = pd.read_csv(self.history_file)
            df_combined = pd.concat([df_existing, df_new], ignore_index=True)
        except FileNotFoundError:
            df_combined = df_new

        df_combined.to_csv(self.history_file, index=False)
        print(f"  Saved {len(new_data)} records to {self.history_file}")

    def detect_changes(self, current_prices):
        """Detect price changes since last check"""
        try:
            df_history = pd.read_csv(self.history_file)
            df_history['timestamp'] = pd.to_datetime(df_history['timestamp'])

            for current in current_prices:
                # Get last recorded price for this URL
                url_history = df_history[df_history['url'] == current['url']]

                if len(url_history) > 1:
                    url_history = url_history.sort_values('timestamp')
                    previous = url_history.iloc[-2]
                    current_price = current['price']
                    previous_price = previous['price']

                    if current_price != previous_price:
                        change = current_price - previous_price
                        percent = (change / previous_price) * 100

                        print(f"  🔔 PRICE CHANGE: {current['name']}")
                        print(f"     Previous: $${previous_price:.2f}")
                        print(f"     Current:  $${current_price:.2f}")
                        print(f"     Change:   $${change:.2f} ({percent:+.1f}%)")

                        # Send alert (email, Slack, etc.)
                        self.send_alert(current, previous_price, current_price)

        except Exception as e:
            print(f"  Error detecting changes: {e}")

    def send_alert(self, product, old_price, new_price):
        """Send alert for price change"""
        # Implement your alert logic here (email, Slack, Discord, SMS, etc.)
        # A minimal email sketch is shown after this example
        pass

    def start_monitoring(self, interval_hours=1):
        """Start continuous monitoring"""
        print(f"Starting price monitor (checking every {interval_hours} hour(s))")

        # Run immediately
        self.check_prices()

        # Schedule periodic checks
        schedule.every(interval_hours).hours.do(self.check_prices)

        while True:
            schedule.run_pending()
            time.sleep(60)

# Usage
products_to_monitor = [
    "https://competitor1.com/product/item-a",
    "https://competitor2.com/product/item-a",
    "https://competitor3.com/product/item-a"
]

monitor = PriceMonitor("sk_live_xxxx_449x", products_to_monitor)

# Check every hour
monitor.start_monitoring(interval_hours=1)
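
The send_alert stub in the class above is left for you to fill in. As one possible implementation, here is a minimal email sketch using Python's standard smtplib; the SMTP host, port, credentials, and addresses are placeholders to replace with your provider's values.

python
import smtplib
from email.message import EmailMessage

# Drop-in body for PriceMonitor.send_alert
def send_alert(self, product, old_price, new_price):
    """Email a price-change alert (SMTP settings below are placeholders)"""
    msg = EmailMessage()
    msg['Subject'] = f"Price change: {product['name']}"
    msg['From'] = "alerts@example.com"   # placeholder sender
    msg['To'] = "you@example.com"        # placeholder recipient
    msg.set_content(
        f"{product['name']} moved from ${old_price:.2f} to ${new_price:.2f}\n"
        f"URL: {product['url']}"
    )

    # Host, port, and login are assumptions; substitute your SMTP provider's
    with smtplib.SMTP("smtp.example.com", 587) as smtp:
        smtp.starttls()
        smtp.login("alerts@example.com", "app-password")
        smtp.send_message(msg)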

Multi-Competitor Price Comparison

competitor_comparison.py
python
from scrapehub import AsyncScrapeHubClient
import asyncio
import pandas as pd

async def compare_competitor_prices(product_name, urls):
    """Compare prices across multiple competitors"""
    client = AsyncScrapeHubClient(api_key="sk_live_xxxx_449x")

    # Scrape all competitors concurrently
    tasks = [client.scrape(url=url, engine="neural-x1") for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Collect prices
    prices = []

    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"Error scraping {url}: {result}")
            continue

        if result.data:
            product = result.data[0]
            prices.append({
                'competitor': url.split('/')[2],  # Extract domain
                'name': product['name'],
                'price': float(product['price']),
                'in_stock': product.get('in_stock', True),
                'url': url
            })

    return pd.DataFrame(prices)

# Product URLs from different competitors
laptop_urls = [
    "https://store-a.com/laptop-pro-15",
    "https://store-b.com/laptop-pro-15",
    "https://store-c.com/laptop-pro-15",
    "https://store-d.com/laptop-pro-15"
]

# Compare prices
df = asyncio.run(compare_competitor_prices("Laptop Pro 15", laptop_urls))

# Analysis
print("\n=== Price Comparison ===")
print(df[['competitor', 'price', 'in_stock']])

print(f"\nLowest price: ${df['price'].min():.2f} at {df.loc[df['price'].idxmin(), 'competitor']}")
print(f"Highest price: ${df['price'].max():.2f} at {df.loc[df['price'].idxmax(), 'competitor']}")
print(f"Average price: ${df['price'].mean():.2f}")
print(f"Price spread: ${df['price'].max() - df['price'].min():.2f}")

# Export to Excel (pandas needs the openpyxl package for .xlsx files)
df.to_excel('competitor_prices.xlsx', index=False)

Price History & Trends

price_trends.py
python
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

class PriceTrendAnalyzer:
    def __init__(self, history_file='price_history.csv'):
        self.df = pd.read_csv(history_file)
        self.df['timestamp'] = pd.to_datetime(self.df['timestamp'])

    def plot_price_trend(self, product_url, days=30):
        """Plot price trend for a product"""
        # Filter data
        cutoff = datetime.now() - timedelta(days=days)
        product_data = self.df[
            (self.df['url'] == product_url) &
            (self.df['timestamp'] >= cutoff)
        ].sort_values('timestamp')

        if len(product_data) == 0:
            print(f"No data found for {product_url}")
            return

        # Create plot
        plt.figure(figsize=(12, 6))
        plt.plot(product_data['timestamp'], product_data['price'], marker='o')
        plt.title(f"Price Trend: {product_data.iloc[0]['name']}")
        plt.xlabel('Date')
        plt.ylabel('Price ($)')
        plt.grid(True, alpha=0.3)
        plt.xticks(rotation=45)
        plt.tight_layout()
        chart_file = f'price_trend_{product_url.split("/")[-1]}.png'
        plt.savefig(chart_file)
        print(f"Trend chart saved to {chart_file}")

    def get_statistics(self, product_url, days=30):
        """Calculate price statistics"""
        cutoff = datetime.now() - timedelta(days=days)
        product_data = self.df[
            (self.df['url'] == product_url) &
            (self.df['timestamp'] >= cutoff)
        ].sort_values('timestamp')  # chronological order so iloc[-1] is the latest

        if len(product_data) == 0:
            return None

        return {
            'product_name': product_data.iloc[0]['name'],
            'current_price': product_data.iloc[-1]['price'],
            'min_price': product_data['price'].min(),
            'max_price': product_data['price'].max(),
            'avg_price': product_data['price'].mean(),
            'std_dev': product_data['price'].std(),
            # shift() makes the first row compare against NaN, so subtract 1
            'price_changes': max(int((product_data['price'] != product_data['price'].shift()).sum()) - 1, 0),
            'volatility': (product_data['price'].std() / product_data['price'].mean()) * 100
        }

    def find_best_time_to_buy(self, product_url):
        """Analyze when prices are typically lowest"""
        product_data = self.df[self.df['url'] == product_url].copy()

        if len(product_data) == 0:
            return None

        # Group by day of week
        product_data['day_of_week'] = product_data['timestamp'].dt.day_name()
        avg_by_day = product_data.groupby('day_of_week')['price'].mean().sort_values()

        # Group by hour
        product_data['hour'] = product_data['timestamp'].dt.hour
        avg_by_hour = product_data.groupby('hour')['price'].mean().sort_values()

        return {
            'best_day': avg_by_day.index[0],
            'best_day_avg': avg_by_day.iloc[0],
            'best_hour': avg_by_hour.index[0],
            'best_hour_avg': avg_by_hour.iloc[0]
        }

# Usage
analyzer = PriceTrendAnalyzer('price_history.csv')

product_url = "https://store.com/product/laptop-pro-15"

# Get statistics
stats = analyzer.get_statistics(product_url, days=30)
if stats:
    print("\n=== Price Statistics (Last 30 Days) ===")
    print(f"Product: {stats['product_name']}")
    print(f"Current: ${stats['current_price']:.2f}")
    print(f"Min:     ${stats['min_price']:.2f}")
    print(f"Max:     ${stats['max_price']:.2f}")
    print(f"Average: ${stats['avg_price']:.2f}")
    print(f"Volatility: {stats['volatility']:.1f}%")
    print(f"Price changes: {stats['price_changes']}")

# Best time to buy
best_time = analyzer.find_best_time_to_buy(product_url)
if best_time:
    print(f"\nBest day to buy: {best_time['best_day']} (avg ${best_time['best_day_avg']:.2f})")

# Plot trend
analyzer.plot_price_trend(product_url, days=30)

Real-time Alerts with Webhooks

webhook_alerts.py
python
from scrapehub import ScrapeHubClient
from flask import Flask, request, jsonify
import requests

client = ScrapeHubClient(api_key="sk_live_xxxx_449x")
app = Flask(__name__)

# Price thresholds
ALERT_RULES = {
    "https://store.com/product/laptop": {
        "target_price": 999.99,
        "alert_on_drop": True
    },
    "https://store.com/product/phone": {
        "target_price": 699.99,
        "alert_on_drop": True
    }
}

def create_monitoring_job(url):
    """Create async job with webhook"""
    job = client.create_job(
        url=url,
        engine="neural-x1",
        webhook_url="https://your-server.com/price-webhook"
    )
    return job.id

@app.route('/price-webhook', methods=['POST'])
def handle_price_webhook():
    """Handle webhook from ScrapeHub"""
    data = request.json

    if data['event'] == 'job.completed':
        job_id = data['job_id']

        # Fetch results
        job = client.get_job(job_id)
        results = job.get_results()

        if results:
            product = results[0]
            url = job.target_url
            current_price = float(product['price'])

            # Check alert rules
            if url in ALERT_RULES:
                rule = ALERT_RULES[url]

                if rule['alert_on_drop'] and current_price <= rule['target_price']:
                    send_price_alert(
                        product_name=product['name'],
                        current_price=current_price,
                        target_price=rule['target_price'],
                        url=url
                    )

    return jsonify({'status': 'received'}), 200

def send_price_alert(product_name, current_price, target_price, url):
    """Send alert via Slack"""
    slack_webhook = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

    message = {
        "text": f"🔔 Price Alert!",
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{product_name}* is now at your target price!"
                }
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*Current Price:*\n${current_price:.2f}"},
                    {"type": "mrkdwn", "text": f"*Target Price:*\n${target_price:.2f}"}
                ]
            },
            {
                "type": "actions",
                "elements": [
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "View Product"},
                        "url": url
                    }
                ]
            }
        ]
    }

    requests.post(slack_webhook, json=message)
    print(f"Alert sent for {product_name}")

# Start monitoring
for url in ALERT_RULES.keys():
    job_id = create_monitoring_job(url)
    print(f"Started monitoring: {url} (Job: {job_id})")

# Run Flask server
if __name__ == '__main__':
    app.run(port=5000)
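
Note that create_job as used here presumably enqueues a single scrape that fires the webhook once. To keep alerts flowing, one option is to re-create the jobs on a schedule using the schedule library from earlier; a sketch, meant to run in a separate process from the Flask server:

python
import schedule
import time

def refresh_monitoring_jobs():
    """Re-queue a one-shot scrape job for every monitored URL"""
    for url in ALERT_RULES:
        job_id = create_monitoring_job(url)
        print(f"Re-queued monitoring for {url} (Job: {job_id})")

# Re-create the jobs every hour
schedule.every(1).hours.do(refresh_monitoring_jobs)

while True:
    schedule.run_pending()
    time.sleep(60)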

Dynamic Pricing Strategy

dynamic_pricing.py
python
from scrapehub import ScrapeHubClient
import pandas as pd

class DynamicPricingEngine:
    def __init__(self, api_key):
        self.client = ScrapeHubClient(api_key=api_key)

    def get_competitor_prices(self, product_urls):
        """Get current competitor prices"""
        prices = []

        for url in product_urls:
            result = self.client.scrape(url=url, engine="neural-x1")
            if result.data:
                product = result.data[0]
                prices.append(float(product['price']))

        return prices

    def calculate_optimal_price(self, my_cost, competitor_prices, strategy='competitive'):
        """Calculate optimal price based on strategy"""

        if not competitor_prices:
            return my_cost * 1.3  # 30% markup if no competitor data

        min_competitor = min(competitor_prices)
        avg_competitor = sum(competitor_prices) / len(competitor_prices)
        max_competitor = max(competitor_prices)

        if strategy == 'aggressive':
            # Undercut lowest competitor by 5%
            optimal = min_competitor * 0.95

        elif strategy == 'competitive':
            # Match average competitor price
            optimal = avg_competitor

        elif strategy == 'premium':
            # Price slightly below highest competitor
            optimal = max_competitor * 0.95

        else:  # balanced
            # Between average and lowest
            optimal = (avg_competitor + min_competitor) / 2

        # Ensure minimum margin
        min_price = my_cost * 1.15  # At least 15% margin
        return max(optimal, min_price)

    def recommend_price(self, product_name, my_cost, competitor_urls):
        """Get price recommendation"""

        print(f"\nAnalyzing prices for: {product_name}")
        print(f"Your cost: ${my_cost:.2f}")

        # Get competitor prices
        competitor_prices = self.get_competitor_prices(competitor_urls)

        if competitor_prices:
            print(f"\nCompetitor prices: {[f'${p:.2f}' for p in competitor_prices]}")
            print(f"Range: ${min(competitor_prices):.2f} - ${max(competitor_prices):.2f}")
        else:
            print("\nNo competitor prices retrieved; using cost-plus fallback")

        # Calculate recommendations
        strategies = ['aggressive', 'competitive', 'premium', 'balanced']
        recommendations = {}

        for strategy in strategies:
            price = self.calculate_optimal_price(my_cost, competitor_prices, strategy)
            margin = ((price - my_cost) / my_cost) * 100
            recommendations[strategy] = {
                'price': price,
                'margin': margin
            }

        return recommendations

# Usage
engine = DynamicPricingEngine("sk_live_xxxx_449x")

# Your product
my_product_cost = 450.00

# Competitor URLs
competitors = [
    "https://competitor1.com/laptop-pro-15",
    "https://competitor2.com/laptop-pro-15",
    "https://competitor3.com/laptop-pro-15"
]

# Get recommendations
recommendations = engine.recommend_price(
    product_name="Laptop Pro 15",
    my_cost=my_product_cost,
    competitor_urls=competitors
)

# Display recommendations
print("\n=== Pricing Recommendations ===")
for strategy, data in recommendations.items():
    print(f"\n{strategy.capitalize()} Strategy:")
    print(f"  Price: ${data['price']:.2f}")
    print(f"  Margin: {data['margin']:.1f}%")

Best Practices

  • Check prices at consistent intervals (hourly, daily) for accurate trends
  • Store historical data to identify patterns and seasonality
  • Set up multiple alert thresholds (% change, absolute price); see the sketch after this list
  • Monitor both price and availability changes
  • Track MAP (Minimum Advertised Price) compliance
  • Use webhooks for real-time notifications on critical changes
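
As a concrete example of layered thresholds, here is a minimal sketch that flags a price move when it crosses either a percentage rule or an absolute-dollar rule; the threshold values are illustrative.

python
def should_alert(previous_price, current_price,
                 pct_threshold=5.0, abs_threshold=20.0):
    """True if the move crosses either threshold (values are illustrative)"""
    change = current_price - previous_price
    pct_change = abs(change) / previous_price * 100
    return pct_change >= pct_threshold or abs(change) >= abs_threshold

# A $15 drop on a $200 item is 7.5%, so the percentage rule fires
print(should_alert(200.00, 185.00))  # True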

Key Metrics to Track

Price Metrics

  • Current price vs. historical average
  • Price volatility and standard deviation
  • Frequency of price changes
  • Price positioning vs. competitors
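
Most of these can be computed directly from the price_history.csv written by the monitor above. A minimal sketch, assuming that file's timestamp/url/price columns:

python
import pandas as pd

df = pd.read_csv('price_history.csv', parse_dates=['timestamp'])
prices = df[df['url'] == 'https://store.com/product/laptop-pro-15']['price']

current_vs_avg = prices.iloc[-1] - prices.mean()     # current vs. historical average
volatility_pct = prices.std() / prices.mean() * 100  # relative volatility
change_count = (prices != prices.shift()).sum() - 1  # frequency of price changes

print(f"Current vs. average: {current_vs_avg:+.2f}")
print(f"Volatility: {volatility_pct:.1f}%")
print(f"Observed price changes: {change_count}")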

Market Insights

  • Competitive price gaps
  • Market price ranges
  • Discount patterns and cycles
  • Stock availability correlation
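
Competitive price gaps and market ranges fall out of the DataFrame returned by compare_competitor_prices above. A short positioning sketch; my_price is an assumed input, not something the API returns:

python
my_price = 1049.00  # assumption: your current listed price

# df is the DataFrame returned by compare_competitor_prices
gap_to_cheapest = my_price - df['price'].min()   # positive = you are more expensive
rank = (df['price'] < my_price).sum() + 1        # 1 = you are the cheapest seller

print(f"Gap to cheapest competitor: ${gap_to_cheapest:+.2f}")
print(f"Your price ranks #{rank} of {len(df) + 1} sellers")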